use std::{ env, io, path::{Path, PathBuf}, sync::Arc, }; use axum::body::Bytes; use futures::{Stream, StreamExt}; use sha2::Digest as _; use tokio::{ fs, io::{AsyncWrite, AsyncWriteExt, BufWriter}, }; use tokio_util::io::StreamReader; use uuid::Uuid; #[derive(Clone)] pub struct FileStorage(Arc); impl FileStorage { pub fn new() -> anyhow::Result { let var = env::var("DRIVE_STORAGE_PATH"); let path_str = match var { Ok(ref string) => string, Err(err) => { tracing::info!( %err, "Error getting DRIVE_STORAGE_PATH variable. Defaulting to ./files" ); "./files" } }; let path = Path::new(path_str); match path.metadata() { Ok(meta) => anyhow::ensure!(meta.is_dir(), "Expected path to a directory"), Err(err) if err.kind() == io::ErrorKind::NotFound => { std::fs::create_dir_all(path)?; } Err(err) => return Err(err.into()), }; Ok(FileStorage(path.into())) } fn path_for_file(&self, file_id: Uuid) -> PathBuf { let file_name = file_id.as_hyphenated().to_string(); self.0.join(file_name) } async fn create_inner(&self, file_id: Uuid) -> anyhow::Result { fs::File::create_new(self.path_for_file(file_id)) .await .map_err(Into::into) } pub async fn create(&self) -> anyhow::Result<(Uuid, impl tokio::io::AsyncWrite)> { let mut error = anyhow::anyhow!("Error creating a file"); for _ in 0..3 { let file_id = Uuid::new_v4(); match self.create_inner(file_id).await { Ok(file) => return Ok((file_id, file)), Err(err) => error = error.context(err), } } Err(error) } pub async fn read(&self, file_id: Uuid) -> anyhow::Result { fs::File::open(self.path_for_file(file_id)) .await .map_err(Into::into) } pub async fn write(&self, file_id: Uuid) -> anyhow::Result> { match fs::File::create(self.path_for_file(file_id)).await { Ok(file) => Ok(Some(file)), Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None), Err(err) => Err(err.into()), } } pub async fn delete(&self, file_id: Uuid) -> anyhow::Result { match fs::remove_file(self.path_for_file(file_id)).await { Ok(()) => Ok(true), Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(false), Err(err) => Err(err.into()), } } pub async fn write_to_file(file: F, stream: S) -> io::Result<(Vec, i64)> where F: AsyncWrite + Unpin, S: Stream> + Unpin, E: Into>, { const BUF_CAP: usize = 64 * 1024 * 1024; // 64 MiB let mut hash = sha2::Sha512::new(); let mut size: i64 = 0; let stream = stream.map(|value| { let bytes = value.map_err(io::Error::other)?; hash.update(&bytes); size = i64::try_from(bytes.len()) .ok() .and_then(|part_size| size.checked_add(part_size)) .ok_or_else(|| io::Error::other(anyhow::anyhow!("Size calculation overflow")))?; io::Result::Ok(bytes) }); let mut reader = StreamReader::new(stream); let mut writer = BufWriter::with_capacity(BUF_CAP, file); tokio::io::copy_buf(&mut reader, &mut writer).await?; writer.flush().await?; let hash = hash.finalize().to_vec(); Ok((hash, size)) } }