This repository was archived on 2024-08-23. You can view files and clone it, but you cannot push or open issues or pull requests.
project/src/file_storage.rs
2024-08-04 12:34:46 +03:00

118 lines
3.8 KiB
Rust

use std::{
env, io,
path::{Path, PathBuf},
sync::Arc,
};
use axum::body::Bytes;
use sha2::Digest as _;
use tokio::{
fs,
io::{AsyncWrite, AsyncWriteExt, BufWriter},
};
use tokio_util::io::StreamReader;
use crate::prelude::*;
/// Handle to the directory in which file contents are stored.
///
/// The directory path is held in an `Arc`, so cloning the handle copies a
/// reference-counted pointer rather than the path itself.
#[derive(Clone)]
pub struct FileStorage(Arc<Path>);
impl FileStorage {
/// Creates a storage handle rooted at the directory named by the
/// `DRIVE_STORAGE_PATH` environment variable.
///
/// The value is trimmed of surrounding whitespace. When the variable cannot
/// be read, or is empty after trimming, `./files` is used instead. A blank
/// value must not be used as-is: it yields an empty path, which passes the
/// checks below and would place files relative to the current working
/// directory.
///
/// The directory (including missing parents) is created if it does not exist.
///
/// # Errors
///
/// Returns an error if the path exists but is not a directory, if its
/// metadata cannot be read, or if the directory cannot be created.
pub fn new() -> anyhow::Result<Self> {
    const DEFAULT_PATH: &str = "./files";

    let var = env::var("DRIVE_STORAGE_PATH");
    let path_str = match &var {
        Ok(value) => match value.trim() {
            "" => {
                tracing::info!("DRIVE_STORAGE_PATH variable is empty. Defaulting to ./files");
                DEFAULT_PATH
            }
            trimmed => trimmed,
        },
        Err(err) => {
            tracing::info!(
                %err,
                "Error getting DRIVE_STORAGE_PATH variable. Defaulting to ./files"
            );
            DEFAULT_PATH
        }
    };
    let path = Path::new(path_str);
    match path.metadata() {
        Ok(meta) => anyhow::ensure!(meta.is_dir(), "Expected path to a directory"),
        // Only a missing path is recoverable; any other metadata failure is reported.
        Err(err) if err.kind() == io::ErrorKind::NotFound => {
            std::fs::create_dir_all(path)?;
        }
        Err(err) => return Err(err.into()),
    };
    Ok(FileStorage(path.into()))
}
fn path_for_file(&self, file_id: Uuid) -> PathBuf {
let file_name = file_id.as_hyphenated().to_string();
self.0.join(file_name)
}
/// Creates the file backing `file_id` via `File::create_new`, so an already
/// existing file results in an error rather than being overwritten.
async fn create_inner(&self, file_id: Uuid) -> anyhow::Result<impl tokio::io::AsyncWrite> {
    let target = self.path_for_file(file_id);
    let file = fs::File::create_new(target).await?;
    Ok(file)
}
pub async fn create(&self) -> anyhow::Result<(Uuid, impl tokio::io::AsyncWrite)> {
let mut error = anyhow::anyhow!("Error creating a file");
for _ in 0..3 {
let file_id = Uuid::now_v7();
match self.create_inner(file_id).await {
Ok(file) => return Ok((file_id, file)),
Err(err) => error = error.context(err),
}
}
Err(error)
}
/// Opens the file stored for `file_id` for reading.
///
/// # Errors
///
/// Returns any I/O error raised while opening the file.
pub async fn read(&self, file_id: Uuid) -> anyhow::Result<impl tokio::io::AsyncRead> {
    let source = self.path_for_file(file_id);
    let file = fs::File::open(source).await?;
    Ok(file)
}
/// Opens the existing file stored for `file_id` for writing, truncating its
/// current contents.
///
/// Returns `Ok(None)` when no file exists for `file_id`.
///
/// The file is opened without the `create` flag on purpose. `File::create`
/// would create a file for an unknown id instead of reporting `NotFound`,
/// so the `None` result would not be produced for a missing file.
///
/// # Errors
///
/// Returns any I/O error other than `NotFound` raised while opening the file.
pub async fn write(&self, file_id: Uuid) -> anyhow::Result<Option<impl tokio::io::AsyncWrite>> {
    let opened = fs::OpenOptions::new()
        .write(true)
        .truncate(true)
        .open(self.path_for_file(file_id))
        .await;
    match opened {
        Ok(file) => Ok(Some(file)),
        Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
        Err(err) => Err(err.into()),
    }
}
/// Removes the file stored for `file_id`.
///
/// Returns `Ok(true)` when a file was removed and `Ok(false)` when there was
/// no file to remove.
///
/// # Errors
///
/// Returns any I/O error other than `NotFound` raised while removing the file.
pub async fn delete(&self, file_id: Uuid) -> anyhow::Result<bool> {
    let target = self.path_for_file(file_id);
    if let Err(err) = fs::remove_file(target).await {
        // A missing file is an expected outcome, not a failure.
        if err.kind() == io::ErrorKind::NotFound {
            return Ok(false);
        }
        return Err(err.into());
    }
    Ok(true)
}
/// Copies every chunk of `stream` into `file` and returns the SHA-512 digest
/// of the streamed bytes together with their total length.
///
/// Writes go through a 64 MiB buffered writer, which is flushed before
/// returning.
///
/// # Errors
///
/// Returns an `io::Error` when the stream yields an error (wrapped with
/// `io::Error::other`), when the running byte count does not fit in an
/// `i64`, or when writing to or flushing `file` fails.
pub async fn write_to_file<F, S, E>(file: F, stream: S) -> io::Result<(Vec<u8>, i64)>
where
    F: AsyncWrite + Unpin,
    S: Stream<Item = Result<Bytes, E>> + Unpin,
    E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    const BUF_CAP: usize = 64 * 1024 * 1024; // 64 MiB

    let mut hasher = sha2::Sha512::new();
    let mut total_size: i64 = 0;

    // Hash and count each chunk as it passes through on its way to the writer.
    let tracked = stream.map(|item| -> io::Result<Bytes> {
        let chunk = item.map_err(io::Error::other)?;
        hasher.update(&chunk);
        let chunk_len = i64::try_from(chunk.len()).ok();
        total_size = chunk_len
            .and_then(|len| total_size.checked_add(len))
            .ok_or_else(|| io::Error::other(anyhow::anyhow!("Size calculation overflow")))?;
        Ok(chunk)
    });

    let mut source = StreamReader::new(tracked);
    let mut sink = BufWriter::with_capacity(BUF_CAP, file);
    tokio::io::copy_buf(&mut source, &mut sink).await?;
    sink.flush().await?;

    let digest = hasher.finalize().to_vec();
    Ok((digest, total_size))
}
}