diff --git a/src/__init__.py b/src/__init__.py index 81bd10e..ad44c78 100644 --- a/src/__init__.py +++ b/src/__init__.py @@ -10,8 +10,6 @@ engine: Engine def main() -> None: - global engine - load_dotenv("./.env") engine = database.prepare.get_engine( host=os.getenv("DB_HOST"), @@ -20,5 +18,5 @@ def main() -> None: db=os.getenv("DB_NAME"), ) # type: ignore database.prepare.prepare(engine) - bot_ = bot.create_bot(os.getenv("TG_TOKEN"), con) # type: ignore + bot_ = bot.create_bot(os.getenv("TG_TOKEN"), engine) # type: ignore bot_.infinity_polling() diff --git a/src/bot/__init__.py b/src/bot/__init__.py index e69de29..2980e98 100644 --- a/src/bot/__init__.py +++ b/src/bot/__init__.py @@ -0,0 +1,34 @@ +import functools + +import mariadb +import telebot +from sqlalchemy.future import Engine + +from . import handlers + +__all__ = ["handlers"] + + +def create_bot(token: str, engine: mariadb.Connection) -> telebot.TeleBot: + bot = telebot.TeleBot(token) + bot.register_message_handler( + functools.partial(handlers.set_master_password, bot, engine), + commands=["set_master_pass"], + ) + bot.register_message_handler( + functools.partial(handlers.get_account, bot, engine), commands=["get_account"] + ) + bot.register_message_handler( + functools.partial(handlers.get_accounts, bot, engine), commands=["get_accounts"] + ) + bot.register_message_handler( + functools.partial(handlers.add_record, bot, engine), commands=["add_account"] + ) + bot.register_message_handler( + functools.partial(handlers.delete_all, bot, engine), commands=["delete_all"] + ) + bot.register_message_handler( + functools.partial(handlers.reset_master_pass, bot, engine), + commands=["reset_master_pass"], + ) + return bot diff --git a/src/bot/handlers.py b/src/bot/handlers.py new file mode 100644 index 0000000..e76ce33 --- /dev/null +++ b/src/bot/handlers.py @@ -0,0 +1,142 @@ +import gc +import shlex +import time + +import telebot +from sqlalchemy.future import Engine + +from .. import cryptography, database + + +def _send_tmp_message( + bot: telebot.TeleBot, chat_id: telebot.types.Message, text: str, timeout: int = 5 +) -> None: + bot_mes = bot.send_message(chat_id, text) + time.sleep(timeout) + bot.delete_message(chat_id, bot_mes.id) + + +def add_record( + bot: telebot.TeleBot, engine: Engine, mes: telebot.types.Message +) -> None: + data = shlex.split(mes.text) + bot.delete_message(mes.chat.id, mes.id) + if len(data) != 5: + return _send_tmp_message(bot, mes.chat.id, "Неправильное кол-во аргументов") + + master_password = data[4] + master_password_from_db = database.get.get_master_pass(engine, mes.from_user.id) + if master_password is None: + return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") + + master_salt, hash_pass = master_password_from_db + if ( + cryptography.master_pass.encrypt_master_pass_known_salt( + master_password, master_salt + ) + != hash_pass + ): + return _send_tmp_message(bot, mes.chat.id, "Не подходит главный пароль") + + enc_login, enc_pass, salt = cryptography.other_accounts.encrypt_account_info( + data[2], data[3], master_password.encode("utf-8") + ) + result = database.add.add_account( + engine, mes.from_user.id, data[1], salt, enc_login, enc_pass + ) + _send_tmp_message( + bot, + mes.chat.id, + "Успех" if result else "Ошибка, вероятно аккаунт с таким же именем существует", + ) + del data, master_password, mes + gc.collect() + + +def get_accounts( + bot: telebot.TeleBot, engine: Engine, mes: telebot.types.Message +) -> None: + accounts = database.get.get_accounts(engine, mes.from_user.id) + bot.delete_message(mes.chat.id, mes.id) + + return _send_tmp_message( + bot, + mes.chat.id, + "Ваши аккаунты:\n" + "\n".join(accounts) if accounts else "У вас нет аккаунтов", + ) + + +def set_master_password( + bot: telebot.TeleBot, engine: Engine, mes: telebot.types.Message +) -> None: + data = shlex.split(mes.text) + bot.delete_message(mes.chat.id, mes.id) + + if database.get.get_master_pass(engine, mes.from_user.id) is not None: + return _send_tmp_message(bot, mes.chat.id, "У вас уже установлен мастер пароль") + + if len(data) != 2: + return _send_tmp_message(bot, mes.chat.id, "Неправильное количество аргументов") + + pass_, salt = cryptography.master_pass.encrypt_master_pass(data[1]) + result = database.add.add_master_pass(engine, mes.from_user.id, salt, pass_) + _send_tmp_message( + bot, + mes.chat.id, + "Успех" if result else "Ошибка, вероятно аккаунт с таким же именем существует", + ) + + del data, mes + gc.collect() + + +def get_account( + bot: telebot.TeleBot, engine: Engine, mes: telebot.types.Message +) -> None: + data = shlex.split(mes.text) + bot.delete_message(mes.chat.id, mes.id) + + if len(data) != 3: + return _send_tmp_message(bot, mes.chat.id, "Неправильное количество аргументов") + + if data[1] not in database.get.get_accounts(engine, mes.from_user.id): + return _send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта") + + master_salt, hash_pass = database.get.get_master_pass(engine, mes.from_user.id) + if ( + cryptography.master_pass.encrypt_master_pass_known_salt(data[2], master_salt) + != hash_pass + ): + return _send_tmp_message(bot, mes.chat.id, "Не подходит главный пароль") + + salt, enc_login, enc_pass = database.get.get_account_info( + engine, mes.from_user.id, data[1] + ) + login, passwd = cryptography.other_accounts.decrypt_account_info( + enc_login, enc_pass, data[2].encode("utf-8"), salt + ) + bot.send_message(mes.chat.id, f"Логин:\n{login}\nПароль:\n{passwd}") + + del data, mes, passwd, login + gc.collect() + + +def delete_all( + bot: telebot.TeleBot, engine: Engine, mes: telebot.types.Message +) -> None: + database.delete.purge_accounts(engine, mes.from_user.id) + database.delete.delete_master_pass(engine, mes.from_user.id) + bot.send_message(mes.chat.id, "Все ваши данные удалены из базы данных") + + +def reset_master_pass( + bot: telebot.TeleBot, engine: Engine, mes: telebot.types.Message +) -> None: + data = shlex.split(mes.text) + master_password = data[1] + enc_pass, salt = cryptography.master_pass.encrypt_master_pass(master_password) + database.delete.purge_accounts(engine, mes.from_user.id) + database.change.change_master_pass(engine, mes.from_user.id, salt, enc_pass) + _send_tmp_message( + bot, mes.chat.id, "Все ваши аккаунты удалены, а мастер пароль изменён" + )