From 2c69882b130edae7a65457dddfa3c4c1c90795ab Mon Sep 17 00:00:00 2001 From: StNicolay Date: Sun, 14 May 2023 13:57:44 +0300 Subject: [PATCH] import.rs now uses the FuturesUnordered to remove a mutex around the vector of failed accounts --- src/handlers/commands/import.rs | 59 +++++++++++++++------------------ 1 file changed, 26 insertions(+), 33 deletions(-) diff --git a/src/handlers/commands/import.rs b/src/handlers/commands/import.rs index a201f85..89018b0 100644 --- a/src/handlers/commands/import.rs +++ b/src/handlers/commands/import.rs @@ -3,13 +3,13 @@ use crate::{ handlers::{markups::deletion_markup, utils::package_handler, MainDialogue, State}, models::{DecryptedAccount, User}, }; -use futures::{stream, StreamExt, TryStreamExt}; +use futures::{future, stream::FuturesUnordered, StreamExt, TryStreamExt}; use itertools::Itertools; use sea_orm::prelude::*; use serde_json::from_slice; use std::sync::Arc; use teloxide::{adaptors::Throttle, net::Download, prelude::*}; -use tokio::{sync::Mutex, task::spawn_blocking}; +use tokio::task::spawn_blocking; /// Gets the master password, encryptes and adds the accounts to the DB async fn get_master_pass( @@ -21,40 +21,33 @@ async fn get_master_pass( accounts: Vec, ) -> crate::Result<()> { let user_id = msg.from().ok_or(NoUserInfo)?.id.0; - let master_pass: Arc = master_pass.into(); - let failed = Arc::new(Mutex::new(Vec::new())); - stream::iter(accounts) - .for_each_concurrent(None, |account| { - let master_pass = Arc::clone(&master_pass); - let failed = Arc::clone(&failed); - let db = db.clone(); - let name = account.name.clone(); - async move { - let result = spawn_blocking(move || { + let failed: Vec = { + let master_pass: Arc = master_pass.into(); + let db = &db; + let futures: FuturesUnordered<_> = accounts + .into_iter() + .map(|account| { + let master_pass = Arc::clone(&master_pass); + async move { if !account.validate() { - return Err(()); + return Err(account.name); + } + let name = account.name.clone(); + match spawn_blocking(move || account.into_account(user_id, &master_pass)).await + { + Ok(Ok(account)) => match account.insert(db).await { + Ok(_) => Ok(()), + Err(_) => Err(name), + }, + _ => Err(name), } - account.into_account(user_id, &master_pass).map_err(|_| ()) - }) - .await; - match result { - Ok(Ok(account)) => match account.insert(&db).await { - Ok(_) => (), - Err(_) => failed.lock().await.push(name), - }, - _ => failed.lock().await.push(name), } - } - }) - .await; - drop(master_pass); - let failed = match Arc::try_unwrap(failed) { - Ok(accounts) => accounts.into_inner(), - Err(_) => { - return Err(crate::Error::msg( - "Couldn't get accounts from Arc in import.rs", - )) - } + }) + .collect(); + futures + .filter_map(|result| future::ready(result.err())) + .collect() + .await }; let message = if failed.is_empty() { "Success".to_owned()