diff --git a/requirements.txt b/requirements.txt index 19063d5..a24bafe 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,4 @@ python-dotenv pyTelegramBotAPI sqlmodel pydantic +aiohttp diff --git a/src/__init__.py b/src/__init__.py index 372432d..472ca7c 100644 --- a/src/__init__.py +++ b/src/__init__.py @@ -1,3 +1,4 @@ +import asyncio import os from dotenv import load_dotenv @@ -33,4 +34,4 @@ def main() -> None: ) database.prepare.prepare(engine) bot_ = bot.create_bot(os.getenv("TG_TOKEN"), engine) - bot_.infinity_polling() + asyncio.run(bot_.infinity_polling()) diff --git a/src/bot/__init__.py b/src/bot/__init__.py index a832d04..db386ad 100644 --- a/src/bot/__init__.py +++ b/src/bot/__init__.py @@ -1,15 +1,15 @@ import functools -import telebot from sqlalchemy.future import Engine +from telebot.async_telebot import AsyncTeleBot from . import handlers __all__ = ["handlers"] -def create_bot(token: str, engine: Engine) -> telebot.TeleBot: - bot = telebot.TeleBot(token) +def create_bot(token: str, engine: Engine) -> AsyncTeleBot: + bot = AsyncTeleBot(token) bot.register_message_handler( functools.partial(handlers.set_master_password, bot, engine), commands=["set_master_pass"], @@ -56,4 +56,19 @@ def create_bot(token: str, engine: Engine) -> telebot.TeleBot: functools.partial(handlers.gen_password, bot), commands=["gen_password"], ) + bot.register_message_handler( + functools.partial(handlers.message_handler, bot), + content_types=[ + "text", + "audio", + "document", + "photo", + "sticker", + "video", + "video_note", + "voice", + "location", + "contact", + ], + ) return bot diff --git a/src/bot/handlers.py b/src/bot/handlers.py index 8476daf..4fd81af 100644 --- a/src/bot/handlers.py +++ b/src/bot/handlers.py @@ -1,9 +1,11 @@ +import asyncio import functools import gc -import time +from typing import Any, Awaitable, Callable import telebot from sqlalchemy.future import Engine +from telebot.async_telebot import AsyncTeleBot from .. import database, encryption, generate_password from ..account_checks import ( @@ -16,49 +18,68 @@ from ..account_parsing import accounts_to_json, json_to_accounts from ..decrypted_account import DecryptedAccount Message = telebot.types.Message +states: dict[tuple[int, int], Callable[[Message], Awaitable[Any]]] = {} -def _delete_message(bot: telebot.TeleBot, mes: Message) -> bool: +def _register_state( + message: Message, + handler: Callable[[Message], Any], +) -> None: + states[(message.chat.id, message.from_user.id)] = handler + + +async def _delete_message( + bot: AsyncTeleBot, + mes: Message, + *, + sleep_time: int = 0, +) -> bool: try: - bot.delete_message(mes.chat.id, mes.id) + if sleep_time != 0: + await asyncio.sleep(sleep_time) + await bot.delete_message(mes.chat.id, mes.id) except telebot.apihelper.ApiException: return False else: return True -def _send_tmp_message( - bot: telebot.TeleBot, +async def _send_tmp_message( + bot: AsyncTeleBot, chat_id: telebot.types.Message, text: str, - timeout: int = 5, + sleep_time: int = 5, ) -> None: - bot_mes = bot.send_message(chat_id, text, parse_mode="MarkdownV2") - time.sleep(timeout) - _delete_message(bot, bot_mes) + bot_mes = await bot.send_message(chat_id, text, parse_mode="MarkdownV2") + await _delete_message(bot, bot_mes, sleep_time=sleep_time) -def _base_handler( - bot: telebot.TeleBot, mes: Message, prev_mes: Message | None = None +async def _base_handler( + bot: AsyncTeleBot, mes: Message, prev_mes: Message | None = None ) -> None: - _delete_message(bot, mes) + states.pop((mes.chat.id, mes.from_user.id), None) + await _delete_message(bot, mes) if prev_mes is not None: - _delete_message(bot, prev_mes) + await _delete_message(bot, prev_mes) -def get_accounts(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: - _base_handler(bot, mes) +async def get_accounts( + bot: AsyncTeleBot, + engine: Engine, + mes: Message, +) -> None: + await _base_handler(bot, mes) accounts = database.get.get_accounts( engine, mes.from_user.id, to_sort=True, ) if not accounts: - return _send_tmp_message(bot, mes.chat.id, "У вас нет аккаунтов") + return await _send_tmp_message(bot, mes.chat.id, "У вас нет аккаунтов") # Make accounts copyable and escape special chars accounts = [f"`{account}`" for account in accounts] - _send_tmp_message( + await _send_tmp_message( bot, mes.chat.id, "Ваши аккаунты:\n" @@ -69,61 +90,65 @@ def get_accounts(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: ) -def delete_all(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: - _base_handler(bot, mes) - bot_mes = bot.send_message( +async def delete_all(bot: AsyncTeleBot, engine: Engine, mes: Message) -> None: + await _base_handler(bot, mes) + bot_mes = await bot.send_message( mes.chat.id, "Вы действительно хотите удалить все ваши аккаунты? Это действие " "нельзя отменить. " "Отправьте YES для подтверждения", ) - bot.register_next_step_handler( - mes, functools.partial(_delete_all2, bot, engine, bot_mes) - ) + _register_state(mes, functools.partial(_delete_all2, bot, engine, bot_mes)) -def _delete_all2( - bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message +async def _delete_all2( + bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message ) -> None: - _base_handler(bot, mes, prev_mes) + await _base_handler(bot, mes, prev_mes) text = mes.text.strip() if text == "YES": database.delete.purge_accounts(engine, mes.from_user.id) database.delete.delete_master_pass(engine, mes.from_user.id) - _send_tmp_message(bot, mes.chat.id, "Всё успешно удалено", timeout=10) + await _send_tmp_message( + bot, + mes.chat.id, + "Всё успешно удалено", + sleep_time=10, + ) else: - _send_tmp_message( + await _send_tmp_message( bot, mes.chat.id, "Вы отправили не YES, ничего не удалено", ) -def set_master_password( - bot: telebot.TeleBot, +async def set_master_password( + bot: AsyncTeleBot, engine: Engine, mes: Message, ) -> None: - _base_handler(bot, mes, None) + await _base_handler(bot, mes, None) if database.get.get_master_pass(engine, mes.from_user.id) is not None: - return _send_tmp_message( + return await _send_tmp_message( bot, mes.chat.id, "Мастер пароль уже существует", ) - bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль") - bot.register_next_step_handler( - mes, functools.partial(_set_master_pass2, bot, engine, bot_mes) + bot_mes = await bot.send_message(mes.chat.id, "Отправьте мастер пароль") + _register_state( + mes, + functools.partial(_set_master_pass2, bot, engine, bot_mes), ) -def _set_master_pass2( - bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message +async def _set_master_pass2( + bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message ) -> None: - _base_handler(bot, mes, prev_mes) + await _base_handler(bot, mes, prev_mes) text = mes.text.strip() if text == "/cancel": - return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") + return await _send_tmp_message(bot, mes.chat.id, "Успешная отмена") master_password = encryption.master_pass.encrypt_master_pass( mes.from_user.id, @@ -131,39 +156,44 @@ def _set_master_pass2( ) database.add.add_master_pass(engine, master_password) - _send_tmp_message(bot, mes.chat.id, "Успех") + await _send_tmp_message(bot, mes.chat.id, "Успех") del mes, text gc.collect() -def reset_master_pass( - bot: telebot.TeleBot, +async def reset_master_pass( + bot: AsyncTeleBot, engine: Engine, mes: Message, ) -> None: - _base_handler(bot, mes) + await _base_handler(bot, mes) if database.get.get_master_pass(engine, mes.from_user.id) is None: - return _send_tmp_message(bot, mes.chat.id, "Мастер пароль не задан") + return await _send_tmp_message( + bot, + mes.chat.id, + "Мастер пароль не задан", + ) - bot_mes = bot.send_message( + bot_mes = await bot.send_message( mes.chat.id, "Отправьте новый мастер пароль, осторожно, все текущие аккаунты " "будут удалены навсегда", ) - bot.register_next_step_handler( - mes, functools.partial(_reset_master_pass2, bot, engine, bot_mes) + _register_state( + mes, + functools.partial(_reset_master_pass2, bot, engine, bot_mes), ) -def _reset_master_pass2( - bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message +async def _reset_master_pass2( + bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message ) -> None: - _base_handler(bot, mes, prev_mes) + await _base_handler(bot, mes, prev_mes) text = mes.text.strip() if text == "/cancel": - return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") + return await _send_tmp_message(bot, mes.chat.id, "Успешная отмена") master_password = encryption.master_pass.encrypt_master_pass( mes.from_user.id, @@ -172,118 +202,132 @@ def _reset_master_pass2( database.delete.purge_accounts(engine, mes.from_user.id) database.change.change_master_pass(engine, master_password) - _send_tmp_message( + await _send_tmp_message( bot, mes.chat.id, "Все ваши аккаунты удалены, а мастер пароль изменён" ) del mes, text gc.collect() -def add_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: - _base_handler(bot, mes) +async def add_account(bot: AsyncTeleBot, engine: Engine, mes: Message) -> None: + await _base_handler(bot, mes) master_password_from_db = database.get.get_master_pass( engine, mes.from_user.id, ) if master_password_from_db is None: - return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") + return await _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") - bot_mes = bot.send_message(mes.chat.id, "Отправьте название аккаунта") + bot_mes = await bot.send_message( + mes.chat.id, + "Отправьте название аккаунта", + ) - bot.register_next_step_handler( - mes, functools.partial(_add_account2, bot, engine, bot_mes) + _register_state( + mes, + functools.partial(_add_account2, bot, engine, bot_mes), ) -def _add_account2( - bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message +async def _add_account2( + bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message ) -> None: - _base_handler(bot, mes, prev_mes) + await _base_handler(bot, mes, prev_mes) text = mes.text.strip() if text == "/cancel": - return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") + return await _send_tmp_message(bot, mes.chat.id, "Успешная отмена") if not check_account_name(text): - return _send_tmp_message( + return await _send_tmp_message( bot, mes.chat.id, "Не корректное название аккаунта", ) if text in database.get.get_accounts(engine, mes.from_user.id): - return _send_tmp_message( + return await _send_tmp_message( bot, mes.chat.id, "Аккаунт с таким именем уже существует" ) - bot_mes = bot.send_message(mes.chat.id, "Отправьте логин") + bot_mes = await bot.send_message(mes.chat.id, "Отправьте логин") data = {"name": text} - bot.register_next_step_handler( - mes, functools.partial(_add_account3, bot, engine, bot_mes, data) + _register_state( + mes, + functools.partial(_add_account3, bot, engine, bot_mes, data), ) -def _add_account3( - bot: telebot.TeleBot, +async def _add_account3( + bot: AsyncTeleBot, engine: Engine, prev_mes: Message, data: dict[str, str], mes: Message, ) -> None: - _base_handler(bot, mes, prev_mes) + await _base_handler(bot, mes, prev_mes) text = mes.text.strip() if text == "/cancel": - return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") + return await _send_tmp_message(bot, mes.chat.id, "Успешная отмена") if not check_login(text): - return _send_tmp_message(bot, mes.chat.id, "Не корректный логин") + return await _send_tmp_message(bot, mes.chat.id, "Не корректный логин") data["login"] = text - bot_mes = bot.send_message(mes.chat.id, "Отправьте пароль от аккаунта") + bot_mes = await bot.send_message( + mes.chat.id, + "Отправьте пароль от аккаунта", + ) - bot.register_next_step_handler( - mes, functools.partial(_add_account4, bot, engine, bot_mes, data) + _register_state( + mes, + functools.partial(_add_account4, bot, engine, bot_mes, data), ) -def _add_account4( - bot: telebot.TeleBot, +async def _add_account4( + bot: AsyncTeleBot, engine: Engine, prev_mes: Message, data: dict[str, str], mes: Message, ) -> None: - _base_handler(bot, mes, prev_mes) + await _base_handler(bot, mes, prev_mes) text = mes.text.strip() if text == "/cancel": - return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") + return await _send_tmp_message(bot, mes.chat.id, "Успешная отмена") if not check_password(text): - return _send_tmp_message(bot, mes.chat.id, "Не корректный пароль") + return await _send_tmp_message( + bot, + mes.chat.id, + "Не корректный пароль", + ) data["passwd"] = text - bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль") + bot_mes = await bot.send_message(mes.chat.id, "Отправьте мастер пароль") - bot.register_next_step_handler( - mes, functools.partial(_add_account5, bot, engine, bot_mes, data) + _register_state( + mes, + functools.partial(_add_account5, bot, engine, bot_mes, data), ) -def _add_account5( - bot: telebot.TeleBot, +async def _add_account5( + bot: AsyncTeleBot, engine: Engine, prev_mes: Message, data: dict[str, str], mes: Message, ) -> None: - _base_handler(bot, mes, prev_mes) + await _base_handler(bot, mes, prev_mes) text = mes.text.strip() if text == "/cancel": - return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") + return await _send_tmp_message(bot, mes.chat.id, "Успешная отмена") master_password = database.get.get_master_pass(engine, mes.from_user.id) if not encryption.master_pass.check_master_pass(text, master_password): - return _send_tmp_message( + return await _send_tmp_message( bot, mes.chat.id, "Не подходит главный пароль", @@ -307,7 +351,7 @@ def _add_account5( encrypted_account, ) - _send_tmp_message( + await _send_tmp_message( bot, mes.chat.id, "Успех" if result else "Произошла не предвиденная ошибка", @@ -318,47 +362,52 @@ def _add_account5( gc.collect() -def get_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: - _base_handler(bot, mes) +async def get_account(bot: AsyncTeleBot, engine: Engine, mes: Message) -> None: + await _base_handler(bot, mes) master_pass = database.get.get_master_pass(engine, mes.from_user.id) if master_pass is None: - return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") + return await _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") - bot_mes = bot.send_message(mes.chat.id, "Отправьте название аккаунта") - bot.register_next_step_handler( - mes, functools.partial(_get_account2, bot, engine, bot_mes) + bot_mes = await bot.send_message( + mes.chat.id, + "Отправьте название аккаунта", + ) + _register_state( + mes, + functools.partial(_get_account2, bot, engine, bot_mes), ) -def _get_account2( - bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message +async def _get_account2( + bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message ) -> None: - _base_handler(bot, mes, prev_mes) + await _base_handler(bot, mes, prev_mes) text = mes.text.strip() if text == "/cancel": - return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") + return await _send_tmp_message(bot, mes.chat.id, "Успешная отмена") if text not in database.get.get_accounts(engine, mes.from_user.id): - return _send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта") + return await _send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта") - bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль") - bot.register_next_step_handler( - mes, functools.partial(_get_account3, bot, engine, bot_mes, text) + bot_mes = await bot.send_message(mes.chat.id, "Отправьте мастер пароль") + _register_state( + mes, + functools.partial(_get_account3, bot, engine, bot_mes, text), ) -def _get_account3( - bot: telebot.TeleBot, +async def _get_account3( + bot: AsyncTeleBot, engine: Engine, prev_mes: Message, name: str, mes: Message, ) -> None: - _base_handler(bot, mes, prev_mes) + await _base_handler(bot, mes, prev_mes) text = mes.text.strip() if text == "/cancel": - return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") + return await _send_tmp_message(bot, mes.chat.id, "Успешная отмена") master_password = database.get.get_master_pass( engine, @@ -366,14 +415,18 @@ def _get_account3( ) if not encryption.master_pass.check_master_pass(text, master_password): - return _send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль") + return await _send_tmp_message( + bot, + mes.chat.id, + "Не подходит мастер пароль", + ) account = database.get.get_account_info(engine, mes.from_user.id, name) account = encryption.other_accounts.decrypt( account, text, ) - _send_tmp_message( + await _send_tmp_message( bot, mes.chat.id, f"Логин:\n`{account.login}`\nПароль:\n`{account.password}`\nНажмите " @@ -385,64 +438,69 @@ def _get_account3( gc.collect() -def delete_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: - _base_handler(bot, mes) +async def delete_account( + bot: AsyncTeleBot, + engine: Engine, + mes: Message, +) -> None: + await _base_handler(bot, mes) master_pass = database.get.get_master_pass(engine, mes.from_user.id) if master_pass is None: - return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") + return await _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") - bot_mes = bot.send_message( + bot_mes = await bot.send_message( mes.chat.id, "Отправьте название аккаунта, который вы хотите удалить" ) - bot.register_next_step_handler( - mes, functools.partial(_delete_account2, bot, engine, bot_mes) + _register_state( + mes, + functools.partial(_delete_account2, bot, engine, bot_mes), ) -def _delete_account2( - bot: telebot.TeleBot, +async def _delete_account2( + bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message, ): - _base_handler(bot, mes, prev_mes) + await _base_handler(bot, mes, prev_mes) text = mes.text.strip() if text == "/cancel": - return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") + return await _send_tmp_message(bot, mes.chat.id, "Успешная отмена") if text not in database.get.get_accounts(engine, mes.from_user.id): - return _send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта") + return await _send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта") - bot_mes = bot.send_message( + bot_mes = await bot.send_message( mes.from_user.id, f'Вы уверены, что хотите удалить аккаунт "{text}"?\nОтправьте YES для ' "подтверждения", ) - bot.register_next_step_handler( + _register_state( mes, functools.partial(_delete_account3, bot, engine, bot_mes, text) ) -def _delete_account3( - bot: telebot.TeleBot, +async def _delete_account3( + bot: AsyncTeleBot, engine: Engine, prev_mes: Message, account_name: str, mes: Message, ) -> None: - _base_handler(bot, mes, prev_mes) + await _base_handler(bot, mes, prev_mes) text = mes.text.strip() if text != "YES": - return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") + return await _send_tmp_message(bot, mes.chat.id, "Успешная отмена") database.delete.delete_account(engine, mes.from_user.id, account_name) - _send_tmp_message(bot, mes.chat.id, "Аккаунт удалён") + await _send_tmp_message(bot, mes.chat.id, "Аккаунт удалён") -def help_command(bot: telebot.TeleBot, mes: Message) -> None: +async def help_command(bot: AsyncTeleBot, mes: Message) -> None: message = """Команды: /set_master_pass - установить мастер пароль /add_account - создать аккаунт @@ -457,137 +515,145 @@ def help_command(bot: telebot.TeleBot, mes: Message) -> None: /import - импортировать пароли из json в файле в таком же формате, \ как из /export /gen_password - создать 10 надёжных паролей""" - bot.send_message(mes.chat.id, message) + await bot.send_message(mes.chat.id, message) -def cancel(bot: telebot.TeleBot, mes: Message) -> None: - _base_handler(bot, mes) - _send_tmp_message(bot, mes.chat.id, "Нет активного действия") +async def cancel(bot: AsyncTeleBot, mes: Message) -> None: + await _base_handler(bot, mes) + await _send_tmp_message(bot, mes.chat.id, "Нет активного действия") -def export(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: - _base_handler(bot, mes) +async def export(bot: AsyncTeleBot, engine: Engine, mes: Message) -> None: + await _base_handler(bot, mes) master_password_from_db = database.get.get_master_pass( engine, mes.from_user.id, ) if master_password_from_db is None: - return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") + return await _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") if not database.get.get_accounts(engine, mes.from_user.id): - return _send_tmp_message(bot, mes.chat.id, "Нет аккаунтов") + return await _send_tmp_message(bot, mes.chat.id, "Нет аккаунтов") - bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль") + bot_mes = await bot.send_message(mes.chat.id, "Отправьте мастер пароль") - bot.register_next_step_handler( - mes, functools.partial(_export2, bot, engine, bot_mes) - ) + _register_state(mes, functools.partial(_export2, bot, engine, bot_mes)) -def _export2( - bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message +async def _export2( + bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message ) -> None: - _base_handler(bot, mes, prev_mes) + await _base_handler(bot, mes, prev_mes) text = mes.text.strip() if text == "/cancel": - return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") + return await _send_tmp_message(bot, mes.chat.id, "Успешная отмена") master_password = database.get.get_master_pass( engine, mes.from_user.id, ) if not encryption.master_pass.check_master_pass(text, master_password): - return _send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль") + return await _send_tmp_message( + bot, + mes.chat.id, + "Не подходит мастер пароль", + ) accounts = database.get.get_all_accounts(engine, mes.from_user.id) accounts = encryption.other_accounts.decrypt_multiple(accounts, text) json_io = accounts_to_json(accounts) - bot_mes = bot.send_document(mes.chat.id, json_io) + bot_mes = await bot.send_document(mes.chat.id, json_io) del text, accounts, json_io gc.collect() - time.sleep(30) - _delete_message(bot, bot_mes) + await _delete_message(bot, bot_mes, sleep_time=30) -def import_accounts( - bot: telebot.TeleBot, +async def import_accounts( + bot: AsyncTeleBot, engine: Engine, mes: Message, ) -> None: - _base_handler(bot, mes) + await _base_handler(bot, mes) master_password_from_db = database.get.get_master_pass( engine, mes.from_user.id, ) if master_password_from_db is None: - return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") + return await _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") - bot_mes = bot.send_message(mes.chat.id, "Отправьте json файл") + bot_mes = await bot.send_message(mes.chat.id, "Отправьте json файл") - bot.register_next_step_handler( - mes, functools.partial(_import2, bot, engine, bot_mes) - ) + _register_state(mes, functools.partial(_import2, bot, engine, bot_mes)) -def _import2( - bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message +async def _import2( + bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message ) -> None: - _base_handler(bot, mes, prev_mes) + await _base_handler(bot, mes, prev_mes) if mes.text is not None: text = mes.text.strip() if text == "/cancel": - return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") + return await _send_tmp_message(bot, mes.chat.id, "Успешная отмена") if mes.document is None: - return _send_tmp_message( + return await _send_tmp_message( bot, mes.chat.id, "Вы должны отправить документ", ) if mes.document.file_size > 102_400: # If file size is bigger than 100 MB - return _send_tmp_message(bot, mes.chat.id, "Файл слишком большой") + return await _send_tmp_message( + bot, + mes.chat.id, + "Файл слишком большой", + ) - file_info = bot.get_file(mes.document.file_id) - downloaded_file = bot.download_file(file_info.file_path) + file_info = await bot.get_file(mes.document.file_id) + downloaded_file = await bot.download_file(file_info.file_path) try: accounts = json_to_accounts( downloaded_file.decode("utf-8"), mes.from_user.id, ) except Exception: - return _send_tmp_message( + return await _send_tmp_message( bot, mes.chat.id, "Ошибка во время работы с файлом", ) - bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль") - bot.register_next_step_handler( - mes, functools.partial(_import3, bot, engine, bot_mes, accounts) + bot_mes = await bot.send_message(mes.chat.id, "Отправьте мастер пароль") + _register_state( + mes, + functools.partial(_import3, bot, engine, bot_mes, accounts), ) -def _import3( - bot: telebot.TeleBot, +async def _import3( + bot: AsyncTeleBot, engine: Engine, prev_mes: Message, accounts: list[DecryptedAccount], mes: Message, ) -> None: - _base_handler(bot, mes, prev_mes) + await _base_handler(bot, mes, prev_mes) text = mes.text.strip() if text == "/cancel": - return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") + return await _send_tmp_message(bot, mes.chat.id, "Успешная отмена") master_password = database.get.get_master_pass( engine, mes.from_user.id, ) if not encryption.master_pass.check_master_pass(text, master_password): - return _send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль") + return await _send_tmp_message( + bot, + mes.chat.id, + "Не подходит мастер пароль", + ) # List of names of accounts, which failed to be added to the database # or failed the tests @@ -609,13 +675,13 @@ def _import3( else: mes_text = "Успех" - _send_tmp_message(bot, mes.chat.id, mes_text, 10) + await _send_tmp_message(bot, mes.chat.id, mes_text, 10) del text, mes, accounts gc.collect() -def gen_password(bot: telebot.TeleBot, mes: Message) -> None: - _base_handler(bot, mes) +async def gen_password(bot: AsyncTeleBot, mes: Message) -> None: + await _base_handler(bot, mes) # Generate 10 passwords and put 'em in the backticks passwords = (f"`{generate_password.gen_password()}`" for _ in range(10)) text = ( @@ -623,4 +689,17 @@ def gen_password(bot: telebot.TeleBot, mes: Message) -> None: + "\n".join(passwords) + "\nНажмите на пароль, чтобы его скопировать" ) - _send_tmp_message(bot, mes.chat.id, text, 15) + await _send_tmp_message(bot, mes.chat.id, text, 15) + + +async def message_handler(bot: AsyncTeleBot, mes: Message) -> None: + handler = states.get((mes.chat.id, mes.from_user.id)) + if handler is None: + await _delete_message(bot, mes) + await _send_tmp_message( + bot, + mes.chat.id, + "Вы отправили не корректное сообщение", + ) + return + await handler(mes) diff --git a/src/encryption/other_accounts.py b/src/encryption/other_accounts.py index 52b69c7..30daea6 100644 --- a/src/encryption/other_accounts.py +++ b/src/encryption/other_accounts.py @@ -7,8 +7,8 @@ from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC -from ..decrypted_account import DecryptedAccount from ..database.models import Account +from ..decrypted_account import DecryptedAccount def _generate_key(salt: bytes, master_pass: bytes) -> bytes: