import.rs now uses the FuturesUnordered to remove a mutex around the vector of failed accounts

This commit is contained in:
StNicolay 2023-05-14 13:57:44 +03:00
parent 7f949e3cdc
commit 2c69882b13
Signed by: StNicolay
GPG Key ID: 9693D04DCD962B0D

View File

@ -3,13 +3,13 @@ use crate::{
handlers::{markups::deletion_markup, utils::package_handler, MainDialogue, State}, handlers::{markups::deletion_markup, utils::package_handler, MainDialogue, State},
models::{DecryptedAccount, User}, models::{DecryptedAccount, User},
}; };
use futures::{stream, StreamExt, TryStreamExt}; use futures::{future, stream::FuturesUnordered, StreamExt, TryStreamExt};
use itertools::Itertools; use itertools::Itertools;
use sea_orm::prelude::*; use sea_orm::prelude::*;
use serde_json::from_slice; use serde_json::from_slice;
use std::sync::Arc; use std::sync::Arc;
use teloxide::{adaptors::Throttle, net::Download, prelude::*}; use teloxide::{adaptors::Throttle, net::Download, prelude::*};
use tokio::{sync::Mutex, task::spawn_blocking}; use tokio::task::spawn_blocking;
/// Gets the master password, encryptes and adds the accounts to the DB /// Gets the master password, encryptes and adds the accounts to the DB
async fn get_master_pass( async fn get_master_pass(
@ -21,40 +21,33 @@ async fn get_master_pass(
accounts: Vec<DecryptedAccount>, accounts: Vec<DecryptedAccount>,
) -> crate::Result<()> { ) -> crate::Result<()> {
let user_id = msg.from().ok_or(NoUserInfo)?.id.0; let user_id = msg.from().ok_or(NoUserInfo)?.id.0;
let master_pass: Arc<str> = master_pass.into(); let failed: Vec<String> = {
let failed = Arc::new(Mutex::new(Vec::new())); let master_pass: Arc<str> = master_pass.into();
stream::iter(accounts) let db = &db;
.for_each_concurrent(None, |account| { let futures: FuturesUnordered<_> = accounts
let master_pass = Arc::clone(&master_pass); .into_iter()
let failed = Arc::clone(&failed); .map(|account| {
let db = db.clone(); let master_pass = Arc::clone(&master_pass);
let name = account.name.clone(); async move {
async move {
let result = spawn_blocking(move || {
if !account.validate() { if !account.validate() {
return Err(()); return Err(account.name);
}
let name = account.name.clone();
match spawn_blocking(move || account.into_account(user_id, &master_pass)).await
{
Ok(Ok(account)) => match account.insert(db).await {
Ok(_) => Ok(()),
Err(_) => Err(name),
},
_ => Err(name),
} }
account.into_account(user_id, &master_pass).map_err(|_| ())
})
.await;
match result {
Ok(Ok(account)) => match account.insert(&db).await {
Ok(_) => (),
Err(_) => failed.lock().await.push(name),
},
_ => failed.lock().await.push(name),
} }
} })
}) .collect();
.await; futures
drop(master_pass); .filter_map(|result| future::ready(result.err()))
let failed = match Arc::try_unwrap(failed) { .collect()
Ok(accounts) => accounts.into_inner(), .await
Err(_) => {
return Err(crate::Error::msg(
"Couldn't get accounts from Arc in import.rs",
))
}
}; };
let message = if failed.is_empty() { let message = if failed.is_empty() {
"Success".to_owned() "Success".to_owned()