From 8af4d0ab12869b5e78b98cbe2d26662719df18fc Mon Sep 17 00:00:00 2001 From: StNicolay Date: Sat, 6 May 2023 19:21:40 +0300 Subject: [PATCH] Updated import and export commands to work better --- src/entity/account.rs | 4 +-- src/handlers/commands/export.rs | 26 +++++++++++-------- src/handlers/commands/import.rs | 45 +++++++++++++++++---------------- 3 files changed, 41 insertions(+), 34 deletions(-) diff --git a/src/entity/account.rs b/src/entity/account.rs index 771388c..f7eac1f 100644 --- a/src/entity/account.rs +++ b/src/entity/account.rs @@ -96,7 +96,7 @@ impl Entity { .filter(Column::UserId.eq(user_id)) .stream(db) .await?; - Ok(result.map_err(Into::into)) + Ok(result.err_into()) } /// Gets a list of account names of a user @@ -113,7 +113,7 @@ impl Entity { select = select.order_by_asc(Column::Name); } let result = select.into_tuple().stream(db).await?; - Ok(result.map_err(Into::into)) + Ok(result.err_into()) } pub async fn exists( diff --git a/src/handlers/commands/export.rs b/src/handlers/commands/export.rs index 2bf732c..4da9b0e 100644 --- a/src/handlers/commands/export.rs +++ b/src/handlers/commands/export.rs @@ -8,7 +8,7 @@ use sea_orm::DatabaseConnection; use serde_json::to_string_pretty; use std::sync::Arc; use teloxide::{adaptors::Throttle, prelude::*, types::InputFile}; -use tokio::task::JoinSet; +use tokio::{sync::Mutex, task::spawn_blocking}; async fn get_master_pass( bot: Throttle, @@ -21,21 +21,27 @@ async fn get_master_pass( let _ = bot.delete_message(previous.chat.id, previous.id).await; let master_pass: Arc = master_pass.into(); let user_id = msg.from().unwrap().id.0; - let mut join_set = JoinSet::new(); - let mut accounts = Vec::new(); + let accounts = Arc::new(Mutex::new(Vec::new())); Account::get_all(user_id, &db) .await? - .try_for_each(|account| { + .try_for_each_concurrent(None, |account| { let master_pass = Arc::clone(&master_pass); - join_set.spawn_blocking(move || DecryptedAccount::from_account(account, &master_pass)); - async { crate::Result::Ok(()) } + let accounts = Arc::clone(&accounts); + async move { + let account = + spawn_blocking(move || DecryptedAccount::from_account(account, &master_pass)) + .await??; + accounts.lock().await.push(account); + Ok(()) + } }) .await?; drop(master_pass); - while let Some(account) = join_set.join_next().await.transpose()?.transpose()? { - accounts.push(account) - } - accounts.sort_by(|this, other| this.name.cmp(&other.name)); + let mut accounts = match Arc::try_unwrap(accounts) { + Ok(account) => account.into_inner(), + Err(_) => unreachable!(), + }; + accounts.sort_unstable_by(|this, other| this.name.cmp(&other.name)); let json = to_string_pretty(&User { accounts })?; let file = InputFile::memory(json).file_name("accounts.json"); bot.send_document(msg.chat.id, file).await?; diff --git a/src/handlers/commands/import.rs b/src/handlers/commands/import.rs index 4d02cd3..40756df 100644 --- a/src/handlers/commands/import.rs +++ b/src/handlers/commands/import.rs @@ -2,13 +2,13 @@ use crate::{ handlers::{utils::package_handler, MainDialogue, State}, models::{DecryptedAccount, User}, }; -use futures::TryStreamExt; +use futures::{stream, StreamExt, TryStreamExt}; use itertools::Itertools; use sea_orm::prelude::*; use serde_json::from_slice; use std::sync::Arc; use teloxide::{adaptors::Throttle, net::Download, prelude::*}; -use tokio::task::{spawn_blocking, JoinSet}; +use tokio::{sync::Mutex, task::spawn_blocking}; async fn get_master_pass( bot: Throttle, @@ -22,28 +22,29 @@ async fn get_master_pass( let _ = bot.delete_message(previous.chat.id, previous.id).await; let user_id = msg.from().unwrap().id.0; let master_pass: Arc = master_pass.into(); - let mut join_set = JoinSet::new(); - let mut failed = Vec::new(); - for account in accounts { - let master_pass = Arc::clone(&master_pass); - let db = db.clone(); - join_set.spawn(async move { + let failed = Arc::new(Mutex::new(Vec::new())); + stream::iter(accounts) + .for_each_concurrent(None, |account| { + let master_pass = Arc::clone(&master_pass); + let failed = Arc::clone(&failed); + let db = db.clone(); let name = account.name.clone(); - let account = + async move { match spawn_blocking(move || account.into_account(user_id, &master_pass)).await { - Ok(Ok(account)) => account, - _ => return Err(name), - }; - account.insert(&db).await.map_err(|_| name)?; - Ok(()) - }); - } - while let Some(result) = join_set.join_next().await.transpose()? { - match result { - Ok(()) => (), - Err(name) => failed.push(name), - } - } + Ok(Ok(account)) => match account.insert(&db).await { + Ok(_) => (), + Err(_) => failed.lock().await.push(name), + }, + _ => failed.lock().await.push(name), + } + } + }) + .await; + drop(master_pass); + let failed = match Arc::try_unwrap(failed) { + Ok(accounts) => accounts.into_inner(), + Err(_) => unreachable!(), + }; if failed.is_empty() { bot.send_message(msg.chat.id, "Success").await?; } else {