From cc02d91c030dd72748ea30b40d3ac3149a0e62f0 Mon Sep 17 00:00:00 2001 From: StNicolay Date: Fri, 9 Jun 2023 18:05:45 +0300 Subject: [PATCH] Added limits for the amount of workers for /export and /import and moved out async blocks out of them into sepparate functions --- Cargo.lock | 90 ++++++++++++++++++++++++++++++++++++------ Cargo.toml | 1 + src/commands/export.rs | 32 +++++++++------ src/commands/import.rs | 68 +++++++++++++++++-------------- 4 files changed, 138 insertions(+), 53 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a817aa9..e8a821d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -420,6 +420,40 @@ dependencies = [ "libc", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce6fd6f855243022dcecf8702fef0c297d4338e226845fe067f6341ad9fa0cef" +dependencies = [ + "cfg-if", + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46bd5f3f85273295a9d14aedfb86f6aadbff6d8f5295c4a9edb08e819dcf5695" +dependencies = [ + "autocfg", + "cfg-if", + "crossbeam-utils", + "memoffset", + "scopeguard", +] + [[package]] name = "crossbeam-queue" version = "0.3.8" @@ -951,7 +985,7 @@ dependencies = [ "hyper", "rustls 0.21.1", "tokio", - "tokio-rustls 0.24.0", + "tokio-rustls 0.24.1", ] [[package]] @@ -1141,6 +1175,15 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +[[package]] +name = "memoffset" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d61c719bcfbcf5d62b3a09efa6088de8c54bc0bfcd3ea7ae39fcc186108b8de1" +dependencies = [ + "autocfg", +] + [[package]] name = "migration" version = "0.2.0" @@ -1347,6 +1390,7 @@ dependencies = [ "log", "migration", "pretty_env_logger", + "rayon", "sea-orm", "serde", "serde_json", @@ -1513,9 +1557,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.59" +version = "1.0.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6aeca18b86b413c660b781aa319e4e2648a3e6f9eadc9b47e9038e6fe9f3451b" +checksum = "dec2b086b7a862cf4de201096214fa870344cf922b2b30c167badb3af3195406" dependencies = [ "unicode-ident", ] @@ -1585,6 +1629,28 @@ dependencies = [ "getrandom", ] +[[package]] +name = "rayon" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d2df5196e37bcc87abebc0053e20787d73847bb33134a69841207dd0a47f03b" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b8f95bd6966f5c87776639160a66bd8ab9895d9d4ab01ddba9fc60661aebe8d" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-utils", + "num_cpus", +] + [[package]] name = "rc-box" version = "1.2.0" @@ -1674,7 +1740,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "tokio", - "tokio-rustls 0.24.0", + "tokio-rustls 0.24.1", "tokio-util", "tower-service", "url", @@ -2059,18 +2125,18 @@ checksum = "bebd363326d05ec3e2f532ab7660680f3b02130d780c299bca73469d521bc0ed" [[package]] name = "serde" -version = "1.0.163" +version = "1.0.164" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2113ab51b87a539ae008b5c6c02dc020ffa39afd2d83cffcb3f4eb2722cebec2" +checksum = "9e8c8cf938e98f769bc164923b06dce91cea1751522f46f8466461af04c9027d" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.163" +version = "1.0.164" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c805777e3930c8883389c602315a24224bcc738b63905ef87cd1420353ea93e" +checksum = "d9735b638ccc51c28bf6914d90a2e9725b377144fc612c49a611fddd1b631d68" dependencies = [ "proc-macro2", "quote", @@ -2484,9 +2550,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.21" +version = "0.3.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f3403384eaacbca9923fa06940178ac13e4edb725486d70e8e15881d0c836cc" +checksum = "ea9e1b3cf1243ae005d9e74085d4d542f3125458f3a81af210d901dcd7411efd" dependencies = [ "itoa", "serde", @@ -2566,9 +2632,9 @@ dependencies = [ [[package]] name = "tokio-rustls" -version = "0.24.0" +version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0d409377ff5b1e3ca6437aa86c1eb7d40c134bfec254e44c830defa92669db5" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ "rustls 0.21.1", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 3b806c2..8a518f6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ itertools = "0.10.5" log = "0.4.17" migration = { version = "0.2.0", path = "migration" } pretty_env_logger = "0.5.0" +rayon = "1.7.0" sea-orm = { version = "0.11.3", features = ["sqlx-mysql", "runtime-tokio-rustls"] } serde = "1.0.163" serde_json = "1.0.96" diff --git a/src/commands/export.rs b/src/commands/export.rs index 24201a1..a0c51de 100644 --- a/src/commands/export.rs +++ b/src/commands/export.rs @@ -12,6 +12,19 @@ use std::sync::Arc; use teloxide::{adaptors::Throttle, prelude::*, types::InputFile}; use tokio::{sync::Mutex, task::spawn_blocking}; +/// Decryptes the account on a worker thread and adds it to the accounts vector +#[inline] +async fn decrypt_account( + account: account::Model, + master_pass: Arc, + accounts: &Mutex<&mut Vec>, +) -> crate::Result<()> { + let account = + spawn_blocking(move || DecryptedAccount::from_account(account, &master_pass)).await??; + accounts.lock().await.push(account); + Ok(()) +} + /// Gets the master password, decryptes the account and sends the json file to the user async fn get_master_pass( bot: Throttle, @@ -22,28 +35,25 @@ async fn get_master_pass( ) -> crate::Result<()> { let user_id = msg.from().ok_or(NoUserInfo)?.id.0; let mut accounts = Vec::new(); + { - let accounts = &Mutex::new(&mut accounts); + let accounts = Mutex::new(&mut accounts); let master_pass: Arc = master_pass.into(); + Account::get_all(user_id, &db) .await? .err_into::() - .try_for_each_concurrent(None, |account| { - let master_pass = Arc::clone(&master_pass); - async move { - let account = spawn_blocking(move || { - DecryptedAccount::from_account(account, &master_pass) - }) - .await??; - accounts.lock().await.push(account); - Ok(()) - } + .try_for_each_concurrent(3, |account| { + decrypt_account(account, master_pass.clone(), &accounts) }) .await?; } + accounts.sort_unstable_by(|this, other| this.name.cmp(&other.name)); + let json = to_vec_pretty(&User { accounts })?; let file = InputFile::memory(json).file_name("accounts.json"); + bot.send_document(msg.chat.id, file) .reply_markup(deletion_markup()) .await?; diff --git a/src/commands/import.rs b/src/commands/import.rs index b49600d..0650a1f 100644 --- a/src/commands/import.rs +++ b/src/commands/import.rs @@ -1,12 +1,39 @@ use crate::{ - errors::NoUserInfo, markups::deletion_markup, models::User, Handler, MainDialogue, State, + errors::NoUserInfo, + markups::deletion_markup, + models::{DecryptedAccount, User}, + Handler, MainDialogue, State, }; -use futures::{future, stream::FuturesUnordered, StreamExt}; +use futures::{stream, StreamExt}; use itertools::Itertools; use sea_orm::prelude::*; use std::sync::Arc; use teloxide::{adaptors::Throttle, prelude::*}; -use tokio::task::spawn_blocking; +use tokio::{sync::Mutex, task::spawn_blocking}; + +/// Ecryptes the account and adds it to the database +/// If any of these steps fail, the account name will be added to the failed vector +#[inline] +async fn encrypt_account( + account: DecryptedAccount, + user_id: u64, + db: &DatabaseConnection, + master_pass: Arc, + failed: &Mutex<&mut Vec>, +) { + if !account.validate() { + failed.lock().await.push(account.name); + return; + } + let name = account.name.clone(); + match spawn_blocking(move || account.into_account(user_id, &master_pass)).await { + Ok(Ok(account)) => match account.insert(db).await { + Ok(_) => (), + Err(_) => failed.lock().await.push(name), + }, + _ => failed.lock().await.push(name), + } +} /// Gets the master password, encryptes and adds the accounts to the DB async fn get_master_pass( @@ -18,38 +45,19 @@ async fn get_master_pass( user: User, ) -> crate::Result<()> { let user_id = msg.from().ok_or(NoUserInfo)?.id.0; - let accounts = user.accounts; + let mut failed = Vec::new(); - let failed: Vec = { + { + let failed = Mutex::new(&mut failed); let master_pass: Arc = master_pass.into(); - let db = &db; - let futures: FuturesUnordered<_> = accounts - .into_iter() - .map(|account| { - let master_pass = Arc::clone(&master_pass); - async move { - if !account.validate() { - return Err(account.name); - } - let name = account.name.clone(); - match spawn_blocking(move || account.into_account(user_id, &master_pass)).await - { - Ok(Ok(account)) => match account.insert(db).await { - Ok(_) => Ok(()), - Err(_) => Err(name), - }, - _ => Err(name), - } - } + stream::iter(user.accounts) + .for_each_concurrent(3, |account| { + encrypt_account(account, user_id, &db, master_pass.clone(), &failed) }) - .collect(); + .await; + } - futures - .filter_map(|result| future::ready(result.err())) - .collect() - .await - }; let message = if failed.is_empty() { "Success".to_owned() } else {