From dbf27d401e0371f41ec8777d483dd3a357ccfce5 Mon Sep 17 00:00:00 2001 From: StNicolay Date: Tue, 1 Nov 2022 09:46:09 +0300 Subject: [PATCH] Added ability to export accounts to json --- README.md | 1 + requirements.txt | 3 +- src/bot/__init__.py | 7 ++- src/bot/handlers.py | 143 ++++++++++++++++++++++++++------------------ src/bot/utils.py | 77 ++++++++++++++++++++++++ 5 files changed, 170 insertions(+), 61 deletions(-) create mode 100644 src/bot/utils.py diff --git a/README.md b/README.md index 7a6b626..14f3b85 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ - /reset_master_pass- удалить все аккаунты и изменить мастер пароль - /cancel - отмена текущего действия - /help - помощь +- /export - получить пароли в json формате ### Настройка diff --git a/requirements.txt b/requirements.txt index 61e83cd..b6a95cf 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,4 +3,5 @@ cryptography mariadb python-dotenv pyTelegramBotAPI -sqlmodel \ No newline at end of file +sqlmodel +pydantic \ No newline at end of file diff --git a/src/bot/__init__.py b/src/bot/__init__.py index bea8d75..12df942 100644 --- a/src/bot/__init__.py +++ b/src/bot/__init__.py @@ -3,9 +3,9 @@ import functools import mariadb import telebot -from . import handlers +from . import handlers, utils -__all__ = ["handlers"] +__all__ = ["handlers", "utils"] def create_bot(token: str, engine: mariadb.Connection) -> telebot.TeleBot: @@ -40,4 +40,7 @@ def create_bot(token: str, engine: mariadb.Connection) -> telebot.TeleBot: bot.register_message_handler( functools.partial(handlers.cancel, bot), commands=["cancel"] ) + bot.register_message_handler( + functools.partial(handlers.export, bot, engine), commands=["export"] + ) return bot diff --git a/src/bot/handlers.py b/src/bot/handlers.py index b85da91..903e199 100644 --- a/src/bot/handlers.py +++ b/src/bot/handlers.py @@ -6,25 +6,18 @@ import telebot from sqlalchemy.future import Engine from .. import cryptography, database +from .utils import accounts_to_json, base_handler, get_all_accounts, send_tmp_message Message = telebot.types.Message -def _send_tmp_message( - bot: telebot.TeleBot, chat_id: telebot.types.Message, text: str, timeout: int = 5 -) -> None: - bot_mes = bot.send_message(chat_id, text, parse_mode="MarkdownV2") - time.sleep(timeout) - bot.delete_message(chat_id, bot_mes.id) - - def get_accounts( bot: telebot.TeleBot, engine: Engine, mes: telebot.types.Message ) -> None: accounts = database.get.get_accounts(engine, mes.from_user.id) bot.delete_message(mes.chat.id, mes.id) - return _send_tmp_message( + return send_tmp_message( bot, mes.chat.id, "Ваши аккаунты:\n" + "\n".join(accounts) if accounts else "У вас нет аккаунтов", @@ -32,16 +25,10 @@ def get_accounts( ) -def _base(bot: telebot.TeleBot, mes: Message, prev_mes: Message | None = None) -> None: - bot.delete_message(mes.chat.id, mes.id) - if prev_mes is not None: - bot.delete_message(prev_mes.chat.id, prev_mes.id) - - def delete_all( bot: telebot.TeleBot, engine: Engine, mes: telebot.types.Message ) -> None: - _base(bot, mes) + base_handler(bot, mes) bot_mes = bot.send_message( mes.chat.id, "Вы действительно хотите удалить все ваши аккаунты? Это действие нельзя отменить. Отправьте YES для подтверждения", @@ -54,20 +41,20 @@ def delete_all( def _delete_all( bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message ) -> None: - _base(bot, mes, prev_mes) + base_handler(bot, mes, prev_mes) text = mes.text.strip() if text == "YES": database.delete.purge_accounts(engine, mes.from_user.id) database.delete.delete_master_pass(engine, mes.from_user.id) - _send_tmp_message(bot, mes.chat.id, "Всё успешно удалено", timeout=10) + send_tmp_message(bot, mes.chat.id, "Всё успешно удалено", timeout=10) else: - _send_tmp_message(bot, mes.chat.id, "Вы отправили не YES, ничего не удалено") + send_tmp_message(bot, mes.chat.id, "Вы отправили не YES, ничего не удалено") def set_master_password(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: - _base(bot, mes, None) + base_handler(bot, mes, None) if database.get.get_master_pass(engine, mes.from_user.id) is not None: - return _send_tmp_message(bot, mes.chat.id, "Мастер пароль уже существует") + return send_tmp_message(bot, mes.chat.id, "Мастер пароль уже существует") bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль") bot.register_next_step_handler( mes, functools.partial(_set_master_pass2, bot, engine, bot_mes) @@ -77,22 +64,22 @@ def set_master_password(bot: telebot.TeleBot, engine: Engine, mes: Message) -> N def _set_master_pass2( bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message ) -> None: - _base(bot, mes, prev_mes) + base_handler(bot, mes, prev_mes) text = mes.text.strip() if text == "/cancel": - return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") + return send_tmp_message(bot, mes.chat.id, "Успешная отмена") hash_, salt = cryptography.master_pass.encrypt_master_pass(text) database.add.add_master_pass(engine, mes.from_user.id, salt, hash_) - _send_tmp_message(bot, mes.chat.id, "Успех") + send_tmp_message(bot, mes.chat.id, "Успех") del mes, text gc.collect() def reset_master_pass(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: - _base(bot, mes) + base_handler(bot, mes) if database.get.get_master_pass(engine, mes.from_user.id) is None: - return _send_tmp_message(bot, mes.chat.id, "Мастер пароль не задан") + return send_tmp_message(bot, mes.chat.id, "Мастер пароль не задан") bot_mes = bot.send_message( mes.chat.id, "Отправьте новый мастер пароль, осторожно, все текущие аккаунты будут удалены навсегда", @@ -105,15 +92,15 @@ def reset_master_pass(bot: telebot.TeleBot, engine: Engine, mes: Message) -> Non def _reset_master_pass2( bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message ) -> None: - _base(bot, mes, prev_mes) + base_handler(bot, mes, prev_mes) text = mes.text.strip() if text == "/cancel": - return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") + return send_tmp_message(bot, mes.chat.id, "Успешная отмена") hash_, salt = cryptography.master_pass.encrypt_master_pass(text) database.delete.purge_accounts(engine, mes.from_user.id) database.change.change_master_pass(engine, mes.from_user.id, salt, hash_) - _send_tmp_message( + send_tmp_message( bot, mes.chat.id, "Все ваши аккаунты удалены, а мастер пароль изменён" ) del mes, text @@ -121,11 +108,11 @@ def _reset_master_pass2( def add_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: - _base(bot, mes) + base_handler(bot, mes) master_password_from_db = database.get.get_master_pass(engine, mes.from_user.id) if master_password_from_db is None: - return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") + return send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") bot_mes = bot.send_message(mes.chat.id, "Отправьте название аккаунта") @@ -137,13 +124,13 @@ def add_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: def _add_account2( bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message ) -> None: - _base(bot, mes, prev_mes) + base_handler(bot, mes, prev_mes) text = mes.text.strip() if text == "/cancel": - return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") + return send_tmp_message(bot, mes.chat.id, "Успешная отмена") if text in database.get.get_accounts(engine, mes.from_user.id): - return _send_tmp_message( + return send_tmp_message( bot, mes.chat.id, "Аккаунт с таким именем уже существует" ) @@ -162,10 +149,10 @@ def _add_account3( data: dict[str, str], mes: Message, ) -> None: - _base(bot, mes, prev_mes) + base_handler(bot, mes, prev_mes) text = mes.text.strip() if text == "/cancel": - return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") + return send_tmp_message(bot, mes.chat.id, "Успешная отмена") data["login"] = text @@ -183,10 +170,10 @@ def _add_account4( data: dict[str, str], mes: Message, ) -> None: - _base(bot, mes, prev_mes) + base_handler(bot, mes, prev_mes) text = mes.text.strip() if text == "/cancel": - return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") + return send_tmp_message(bot, mes.chat.id, "Успешная отмена") data["passwd"] = text @@ -204,14 +191,14 @@ def _add_account5( data: dict[str, str], mes: Message, ) -> None: - _base(bot, mes, prev_mes) + base_handler(bot, mes, prev_mes) text = mes.text.strip() if text == "/cancel": - return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") + return send_tmp_message(bot, mes.chat.id, "Успешная отмена") salt, hash_ = database.get.get_master_pass(engine, mes.from_user.id) if cryptography.master_pass.encrypt_master_pass(text, salt) != hash_: - return _send_tmp_message(bot, mes.chat.id, "Не подходит главный пароль") + return send_tmp_message(bot, mes.chat.id, "Не подходит главный пароль") name, login, passwd = data["name"], data["login"], data["passwd"] @@ -223,7 +210,7 @@ def _add_account5( engine, mes.from_user.id, name, salt, enc_login, enc_pass ) - _send_tmp_message( + send_tmp_message( bot, mes.chat.id, "Успех" if result else "Произошла не предвиденная ошибка" ) @@ -233,7 +220,7 @@ def _add_account5( def get_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: - _base(bot, mes) + base_handler(bot, mes) bot_mes = bot.send_message(mes.chat.id, "Отправьте название аккаунта") bot.register_next_step_handler( mes, functools.partial(_get_account2, bot, engine, bot_mes) @@ -243,13 +230,13 @@ def get_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: def _get_account2( bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message ) -> None: - _base(bot, mes, prev_mes) + base_handler(bot, mes, prev_mes) text = mes.text.strip() if text == "/cancel": - return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") + return send_tmp_message(bot, mes.chat.id, "Успешная отмена") if text not in database.get.get_accounts(engine, mes.from_user.id): - return _send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта") + return send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта") bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль") bot.register_next_step_handler( @@ -260,19 +247,19 @@ def _get_account2( def _get_account3( bot: telebot.TeleBot, engine: Engine, prev_mes: Message, name: str, mes: Message ) -> None: - _base(bot, mes, prev_mes) + base_handler(bot, mes, prev_mes) text = mes.text.strip() if text == "/cancel": - return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") + return send_tmp_message(bot, mes.chat.id, "Успешная отмена") master_pass = database.get.get_master_pass(engine, mes.from_user.id) if master_pass is None: - return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") + return send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") master_salt, hash_pass = master_pass if cryptography.master_pass.encrypt_master_pass(text, master_salt) != hash_pass: - return _send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль") + return send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль") salt, enc_login, enc_pass = database.get.get_account_info( engine, mes.from_user.id, name @@ -280,7 +267,7 @@ def _get_account3( login, passwd = cryptography.other_accounts.decrypt_account_info( enc_login, enc_pass, text.encode("utf-8"), salt ) - _send_tmp_message( + send_tmp_message( bot, mes.chat.id, f"Логин:\n`{login}`\nПароль:\n`{passwd}`\nНажмите на логин или пароль, чтобы скопировать", @@ -292,7 +279,7 @@ def _get_account3( def delete_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: - _base(bot, mes) + base_handler(bot, mes) bot_mes = bot.send_message( mes.chat.id, "Отправьте название аккаунта, который вы хотите удалить" ) @@ -305,16 +292,16 @@ def delete_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: def _delete_account2( bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message ) -> None: - _base(bot, mes, prev_mes) + base_handler(bot, mes, prev_mes) text = mes.text.strip() if text == "/cancel": - return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") + return send_tmp_message(bot, mes.chat.id, "Успешная отмена") if text not in database.get.get_accounts(engine, mes.from_user.id): - return _send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта") + return send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта") database.delete.delete_account(engine, mes.from_user.id, text) - _send_tmp_message(bot, mes.chat.id, "Аккаунт удалён") + send_tmp_message(bot, mes.chat.id, "Аккаунт удалён") def help(bot: telebot.TeleBot, mes: telebot.types.Message) -> None: @@ -327,9 +314,49 @@ def help(bot: telebot.TeleBot, mes: telebot.types.Message) -> None: /delete_all - удалить все аккаунты и мастер пароль /reset_master_pass - удалить все аккаунты и изменить мастер пароль /cancel - отмена текущего действия -/help - помощь""" +/help - помощь +/export - получить пароли в json формате""" bot.send_message(mes.chat.id, message) def cancel(bot: telebot.TeleBot, mes: Message) -> None: - _send_tmp_message(bot, mes.chat.id, "Нет активного действия") + send_tmp_message(bot, mes.chat.id, "Нет активного действия") + + +def export(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: + base_handler(bot, mes) + master_password_from_db = database.get.get_master_pass(engine, mes.from_user.id) + + if master_password_from_db is None: + return send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") + + if not database.get.get_accounts(engine, mes.from_user.id): + return send_tmp_message(bot, mes.chat.id, "Нет аккаунтов") + + bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль") + + bot.register_next_step_handler( + mes, functools.partial(_export2, bot, engine, bot_mes) + ) + + +def _export2( + bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message +) -> None: + base_handler(bot, mes, prev_mes) + text = mes.text.strip() + if text == "/cancel": + return send_tmp_message(bot, mes.chat.id, "Успешная отмена") + + master_salt, hash_pass = database.get.get_master_pass(engine, mes.from_user.id) + if cryptography.master_pass.encrypt_master_pass(text, master_salt) != hash_pass: + return send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль") + + accounts = get_all_accounts(engine, mes.from_user.id, text) + json_io = accounts_to_json(accounts) + bot_mes = bot.send_document(mes.chat.id, json_io) + + del text, accounts, json_io + gc.collect() + time.sleep(30) + bot.delete_message(bot_mes.chat.id, bot_mes.id) diff --git a/src/bot/utils.py b/src/bot/utils.py new file mode 100644 index 0000000..bf8f49b --- /dev/null +++ b/src/bot/utils.py @@ -0,0 +1,77 @@ +import io +import time +from typing import Self, Type + +import pydantic +import telebot +from sqlalchemy.future import Engine + +from .. import database, cryptography + + +class Account(pydantic.BaseModel): + name: str + login: str + passwd: str + + @classmethod + def from_tuple(cls: Type[Self], tuple_: tuple[str, str, str]) -> Self: + return cls(name=tuple_[0], login=tuple_[1], passwd=tuple_[2]) + + def as_tuple(self: Self) -> tuple[str, str, str]: + return (self.name, self.login, self.passwd) + + +class _Accounts(pydantic.BaseModel): + accounts: list[Account] = pydantic.Field(default_factory=list) + + +def _accounts_list_to_json(accounts: list[tuple[str, str, str]]) -> str: + accounts = _Accounts(accounts=[Account.from_tuple(i) for i in accounts]) + return accounts.json() + + +def json_to_accounts(json_: str) -> list[tuple[str, str, str]]: + accounts = _Accounts.parse_raw(json_) + return [i.as_tuple() for i in accounts.accounts] + + +Message = telebot.types.Message + + +def send_tmp_message( + bot: telebot.TeleBot, chat_id: telebot.types.Message, text: str, timeout: int = 5 +) -> None: + bot_mes = bot.send_message(chat_id, text, parse_mode="MarkdownV2") + time.sleep(timeout) + bot.delete_message(chat_id, bot_mes.id) + + +def base_handler( + bot: telebot.TeleBot, mes: Message, prev_mes: Message | None = None +) -> None: + bot.delete_message(mes.chat.id, mes.id) + if prev_mes is not None: + bot.delete_message(prev_mes.chat.id, prev_mes.id) + + +def get_all_accounts( + engine: Engine, user_id: int, master_pass: str +) -> list[tuple[str, str, str]]: + accounts: list[tuple[str, str, str]] = [] + master_pass = master_pass.encode("utf-8") + for account_name in database.get.get_accounts(engine, user_id): + salt, enc_login, enc_passwd = database.get.get_account_info( + engine, user_id, account_name + ) + login, passwd = cryptography.other_accounts.decrypt_account_info( + enc_login, enc_passwd, master_pass, salt + ) + accounts.append((account_name, login, passwd)) + return accounts + + +def accounts_to_json(accounts: list[tuple[str, str, str]]) -> io.StringIO: + file = io.StringIO(_accounts_list_to_json(accounts)) + file.name = "passwords.json" + return file