Added limits for the amount of workers for /export and /import and moved out async blocks out of them into sepparate functions
This commit is contained in:
		@@ -12,6 +12,19 @@ use std::sync::Arc;
 | 
			
		||||
use teloxide::{adaptors::Throttle, prelude::*, types::InputFile};
 | 
			
		||||
use tokio::{sync::Mutex, task::spawn_blocking};
 | 
			
		||||
 | 
			
		||||
/// Decryptes the account on a worker thread and adds it to the accounts vector
 | 
			
		||||
#[inline]
 | 
			
		||||
async fn decrypt_account(
 | 
			
		||||
    account: account::Model,
 | 
			
		||||
    master_pass: Arc<str>,
 | 
			
		||||
    accounts: &Mutex<&mut Vec<DecryptedAccount>>,
 | 
			
		||||
) -> crate::Result<()> {
 | 
			
		||||
    let account =
 | 
			
		||||
        spawn_blocking(move || DecryptedAccount::from_account(account, &master_pass)).await??;
 | 
			
		||||
    accounts.lock().await.push(account);
 | 
			
		||||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Gets the master password, decryptes the account and sends the json file to the user
 | 
			
		||||
async fn get_master_pass(
 | 
			
		||||
    bot: Throttle<Bot>,
 | 
			
		||||
@@ -22,28 +35,25 @@ async fn get_master_pass(
 | 
			
		||||
) -> crate::Result<()> {
 | 
			
		||||
    let user_id = msg.from().ok_or(NoUserInfo)?.id.0;
 | 
			
		||||
    let mut accounts = Vec::new();
 | 
			
		||||
 | 
			
		||||
    {
 | 
			
		||||
        let accounts = &Mutex::new(&mut accounts);
 | 
			
		||||
        let accounts = Mutex::new(&mut accounts);
 | 
			
		||||
        let master_pass: Arc<str> = master_pass.into();
 | 
			
		||||
 | 
			
		||||
        Account::get_all(user_id, &db)
 | 
			
		||||
            .await?
 | 
			
		||||
            .err_into::<crate::Error>()
 | 
			
		||||
            .try_for_each_concurrent(None, |account| {
 | 
			
		||||
                let master_pass = Arc::clone(&master_pass);
 | 
			
		||||
                async move {
 | 
			
		||||
                    let account = spawn_blocking(move || {
 | 
			
		||||
                        DecryptedAccount::from_account(account, &master_pass)
 | 
			
		||||
                    })
 | 
			
		||||
                    .await??;
 | 
			
		||||
                    accounts.lock().await.push(account);
 | 
			
		||||
                    Ok(())
 | 
			
		||||
                }
 | 
			
		||||
            .try_for_each_concurrent(3, |account| {
 | 
			
		||||
                decrypt_account(account, master_pass.clone(), &accounts)
 | 
			
		||||
            })
 | 
			
		||||
            .await?;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    accounts.sort_unstable_by(|this, other| this.name.cmp(&other.name));
 | 
			
		||||
 | 
			
		||||
    let json = to_vec_pretty(&User { accounts })?;
 | 
			
		||||
    let file = InputFile::memory(json).file_name("accounts.json");
 | 
			
		||||
 | 
			
		||||
    bot.send_document(msg.chat.id, file)
 | 
			
		||||
        .reply_markup(deletion_markup())
 | 
			
		||||
        .await?;
 | 
			
		||||
 
 | 
			
		||||
@@ -1,12 +1,39 @@
 | 
			
		||||
use crate::{
 | 
			
		||||
    errors::NoUserInfo, markups::deletion_markup, models::User, Handler, MainDialogue, State,
 | 
			
		||||
    errors::NoUserInfo,
 | 
			
		||||
    markups::deletion_markup,
 | 
			
		||||
    models::{DecryptedAccount, User},
 | 
			
		||||
    Handler, MainDialogue, State,
 | 
			
		||||
};
 | 
			
		||||
use futures::{future, stream::FuturesUnordered, StreamExt};
 | 
			
		||||
use futures::{stream, StreamExt};
 | 
			
		||||
use itertools::Itertools;
 | 
			
		||||
use sea_orm::prelude::*;
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use teloxide::{adaptors::Throttle, prelude::*};
 | 
			
		||||
use tokio::task::spawn_blocking;
 | 
			
		||||
use tokio::{sync::Mutex, task::spawn_blocking};
 | 
			
		||||
 | 
			
		||||
/// Ecryptes the account and adds it to the database
 | 
			
		||||
/// If any of these steps fail, the account name will be added to the failed vector
 | 
			
		||||
#[inline]
 | 
			
		||||
async fn encrypt_account(
 | 
			
		||||
    account: DecryptedAccount,
 | 
			
		||||
    user_id: u64,
 | 
			
		||||
    db: &DatabaseConnection,
 | 
			
		||||
    master_pass: Arc<str>,
 | 
			
		||||
    failed: &Mutex<&mut Vec<String>>,
 | 
			
		||||
) {
 | 
			
		||||
    if !account.validate() {
 | 
			
		||||
        failed.lock().await.push(account.name);
 | 
			
		||||
        return;
 | 
			
		||||
    }
 | 
			
		||||
    let name = account.name.clone();
 | 
			
		||||
    match spawn_blocking(move || account.into_account(user_id, &master_pass)).await {
 | 
			
		||||
        Ok(Ok(account)) => match account.insert(db).await {
 | 
			
		||||
            Ok(_) => (),
 | 
			
		||||
            Err(_) => failed.lock().await.push(name),
 | 
			
		||||
        },
 | 
			
		||||
        _ => failed.lock().await.push(name),
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Gets the master password, encryptes and adds the accounts to the DB
 | 
			
		||||
async fn get_master_pass(
 | 
			
		||||
@@ -18,38 +45,19 @@ async fn get_master_pass(
 | 
			
		||||
    user: User,
 | 
			
		||||
) -> crate::Result<()> {
 | 
			
		||||
    let user_id = msg.from().ok_or(NoUserInfo)?.id.0;
 | 
			
		||||
    let accounts = user.accounts;
 | 
			
		||||
    let mut failed = Vec::new();
 | 
			
		||||
 | 
			
		||||
    let failed: Vec<String> = {
 | 
			
		||||
    {
 | 
			
		||||
        let failed = Mutex::new(&mut failed);
 | 
			
		||||
        let master_pass: Arc<str> = master_pass.into();
 | 
			
		||||
        let db = &db;
 | 
			
		||||
 | 
			
		||||
        let futures: FuturesUnordered<_> = accounts
 | 
			
		||||
            .into_iter()
 | 
			
		||||
            .map(|account| {
 | 
			
		||||
                let master_pass = Arc::clone(&master_pass);
 | 
			
		||||
                async move {
 | 
			
		||||
                    if !account.validate() {
 | 
			
		||||
                        return Err(account.name);
 | 
			
		||||
                    }
 | 
			
		||||
                    let name = account.name.clone();
 | 
			
		||||
                    match spawn_blocking(move || account.into_account(user_id, &master_pass)).await
 | 
			
		||||
                    {
 | 
			
		||||
                        Ok(Ok(account)) => match account.insert(db).await {
 | 
			
		||||
                            Ok(_) => Ok(()),
 | 
			
		||||
                            Err(_) => Err(name),
 | 
			
		||||
                        },
 | 
			
		||||
                        _ => Err(name),
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
        stream::iter(user.accounts)
 | 
			
		||||
            .for_each_concurrent(3, |account| {
 | 
			
		||||
                encrypt_account(account, user_id, &db, master_pass.clone(), &failed)
 | 
			
		||||
            })
 | 
			
		||||
            .collect();
 | 
			
		||||
            .await;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
        futures
 | 
			
		||||
            .filter_map(|result| future::ready(result.err()))
 | 
			
		||||
            .collect()
 | 
			
		||||
            .await
 | 
			
		||||
    };
 | 
			
		||||
    let message = if failed.is_empty() {
 | 
			
		||||
        "Success".to_owned()
 | 
			
		||||
    } else {
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user