115 lines
3.8 KiB
Rust
115 lines
3.8 KiB
Rust
use std::{
|
|
env, io,
|
|
path::{Path, PathBuf},
|
|
sync::Arc,
|
|
};
|
|
|
|
use axum::body::Bytes;
|
|
use futures::{Stream, StreamExt};
|
|
use sha2::Digest as _;
|
|
use tokio::{
|
|
fs,
|
|
io::{AsyncWrite, AsyncWriteExt, BufWriter},
|
|
};
|
|
use tokio_util::io::StreamReader;
|
|
use uuid::Uuid;
|
|
|
|
#[derive(Clone)]
|
|
pub struct FileStorage(Arc<Path>);
|
|
|
|
impl FileStorage {
|
|
pub fn new() -> anyhow::Result<Self> {
|
|
let var = env::var("DRIVE_STORAGE_PATH");
|
|
let path_str = match var {
|
|
Ok(ref string) => string.trim(),
|
|
Err(err) => {
|
|
tracing::info!(
|
|
%err,
|
|
"Error getting DRIVE_STORAGE_PATH variable. Defaulting to ./files"
|
|
);
|
|
"./files"
|
|
}
|
|
};
|
|
let path = Path::new(path_str);
|
|
match path.metadata() {
|
|
Ok(meta) => anyhow::ensure!(meta.is_dir(), "Expected path to a directory"),
|
|
Err(err) if err.kind() == io::ErrorKind::NotFound => {
|
|
std::fs::create_dir_all(path)?;
|
|
}
|
|
Err(err) => return Err(err.into()),
|
|
};
|
|
Ok(FileStorage(path.into()))
|
|
}
|
|
|
|
fn path_for_file(&self, file_id: Uuid) -> PathBuf {
|
|
let file_name = file_id.as_hyphenated().to_string();
|
|
self.0.join(file_name)
|
|
}
|
|
|
|
async fn create_inner(&self, file_id: Uuid) -> anyhow::Result<impl tokio::io::AsyncWrite> {
|
|
fs::File::create_new(self.path_for_file(file_id))
|
|
.await
|
|
.map_err(Into::into)
|
|
}
|
|
|
|
pub async fn create(&self) -> anyhow::Result<(Uuid, impl tokio::io::AsyncWrite)> {
|
|
let mut error = anyhow::anyhow!("Error creating a file");
|
|
for _ in 0..3 {
|
|
let file_id = Uuid::now_v7();
|
|
match self.create_inner(file_id).await {
|
|
Ok(file) => return Ok((file_id, file)),
|
|
Err(err) => error = error.context(err),
|
|
}
|
|
}
|
|
Err(error)
|
|
}
|
|
|
|
pub async fn read(&self, file_id: Uuid) -> anyhow::Result<impl tokio::io::AsyncRead> {
|
|
fs::File::open(self.path_for_file(file_id))
|
|
.await
|
|
.map_err(Into::into)
|
|
}
|
|
|
|
pub async fn write(&self, file_id: Uuid) -> anyhow::Result<Option<impl tokio::io::AsyncWrite>> {
|
|
match fs::File::create(self.path_for_file(file_id)).await {
|
|
Ok(file) => Ok(Some(file)),
|
|
Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
|
|
Err(err) => Err(err.into()),
|
|
}
|
|
}
|
|
|
|
pub async fn delete(&self, file_id: Uuid) -> anyhow::Result<bool> {
|
|
match fs::remove_file(self.path_for_file(file_id)).await {
|
|
Ok(()) => Ok(true),
|
|
Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(false),
|
|
Err(err) => Err(err.into()),
|
|
}
|
|
}
|
|
|
|
pub async fn write_to_file<F, S, E>(file: F, stream: S) -> io::Result<(Vec<u8>, i64)>
|
|
where
|
|
F: AsyncWrite + Unpin,
|
|
S: Stream<Item = Result<Bytes, E>> + Unpin,
|
|
E: Into<Box<dyn std::error::Error + Send + Sync>>,
|
|
{
|
|
const BUF_CAP: usize = 64 * 1024 * 1024; // 64 MiB
|
|
let mut hash = sha2::Sha512::new();
|
|
let mut size: i64 = 0;
|
|
let stream = stream.map(|value| {
|
|
let bytes = value.map_err(io::Error::other)?;
|
|
hash.update(&bytes);
|
|
size = i64::try_from(bytes.len())
|
|
.ok()
|
|
.and_then(|part_size| size.checked_add(part_size))
|
|
.ok_or_else(|| io::Error::other(anyhow::anyhow!("Size calculation overflow")))?;
|
|
io::Result::Ok(bytes)
|
|
});
|
|
let mut reader = StreamReader::new(stream);
|
|
let mut writer = BufWriter::with_capacity(BUF_CAP, file);
|
|
tokio::io::copy_buf(&mut reader, &mut writer).await?;
|
|
writer.flush().await?;
|
|
let hash = hash.finalize().to_vec();
|
|
Ok((hash, size))
|
|
}
|
|
}
|