Made handlers asynchrounos

Switched from TeleBot class to AsyncTeleBot
This commit is contained in:
StNicolay 2022-12-28 20:36:25 +03:00
parent 8858aa09a7
commit 0026e3321a
5 changed files with 278 additions and 182 deletions

View File

@ -4,3 +4,4 @@ python-dotenv
pyTelegramBotAPI pyTelegramBotAPI
sqlmodel sqlmodel
pydantic pydantic
aiohttp

View File

@ -1,3 +1,4 @@
import asyncio
import os import os
from dotenv import load_dotenv from dotenv import load_dotenv
@ -33,4 +34,4 @@ def main() -> None:
) )
database.prepare.prepare(engine) database.prepare.prepare(engine)
bot_ = bot.create_bot(os.getenv("TG_TOKEN"), engine) bot_ = bot.create_bot(os.getenv("TG_TOKEN"), engine)
bot_.infinity_polling() asyncio.run(bot_.infinity_polling())

View File

@ -1,15 +1,15 @@
import functools import functools
import telebot
from sqlalchemy.future import Engine from sqlalchemy.future import Engine
from telebot.async_telebot import AsyncTeleBot
from . import handlers from . import handlers
__all__ = ["handlers"] __all__ = ["handlers"]
def create_bot(token: str, engine: Engine) -> telebot.TeleBot: def create_bot(token: str, engine: Engine) -> AsyncTeleBot:
bot = telebot.TeleBot(token) bot = AsyncTeleBot(token)
bot.register_message_handler( bot.register_message_handler(
functools.partial(handlers.set_master_password, bot, engine), functools.partial(handlers.set_master_password, bot, engine),
commands=["set_master_pass"], commands=["set_master_pass"],
@ -56,4 +56,19 @@ def create_bot(token: str, engine: Engine) -> telebot.TeleBot:
functools.partial(handlers.gen_password, bot), functools.partial(handlers.gen_password, bot),
commands=["gen_password"], commands=["gen_password"],
) )
bot.register_message_handler(
functools.partial(handlers.message_handler, bot),
content_types=[
"text",
"audio",
"document",
"photo",
"sticker",
"video",
"video_note",
"voice",
"location",
"contact",
],
)
return bot return bot

View File

@ -1,9 +1,11 @@
import asyncio
import functools import functools
import gc import gc
import time from typing import Any, Awaitable, Callable
import telebot import telebot
from sqlalchemy.future import Engine from sqlalchemy.future import Engine
from telebot.async_telebot import AsyncTeleBot
from .. import database, encryption, generate_password from .. import database, encryption, generate_password
from ..account_checks import ( from ..account_checks import (
@ -16,49 +18,68 @@ from ..account_parsing import accounts_to_json, json_to_accounts
from ..decrypted_account import DecryptedAccount from ..decrypted_account import DecryptedAccount
Message = telebot.types.Message Message = telebot.types.Message
states: dict[tuple[int, int], Callable[[Message], Awaitable[Any]]] = {}
def _delete_message(bot: telebot.TeleBot, mes: Message) -> bool: def _register_state(
message: Message,
handler: Callable[[Message], Any],
) -> None:
states[(message.chat.id, message.from_user.id)] = handler
async def _delete_message(
bot: AsyncTeleBot,
mes: Message,
*,
sleep_time: int = 0,
) -> bool:
try: try:
bot.delete_message(mes.chat.id, mes.id) if sleep_time != 0:
await asyncio.sleep(sleep_time)
await bot.delete_message(mes.chat.id, mes.id)
except telebot.apihelper.ApiException: except telebot.apihelper.ApiException:
return False return False
else: else:
return True return True
def _send_tmp_message( async def _send_tmp_message(
bot: telebot.TeleBot, bot: AsyncTeleBot,
chat_id: telebot.types.Message, chat_id: telebot.types.Message,
text: str, text: str,
timeout: int = 5, sleep_time: int = 5,
) -> None: ) -> None:
bot_mes = bot.send_message(chat_id, text, parse_mode="MarkdownV2") bot_mes = await bot.send_message(chat_id, text, parse_mode="MarkdownV2")
time.sleep(timeout) await _delete_message(bot, bot_mes, sleep_time=sleep_time)
_delete_message(bot, bot_mes)
def _base_handler( async def _base_handler(
bot: telebot.TeleBot, mes: Message, prev_mes: Message | None = None bot: AsyncTeleBot, mes: Message, prev_mes: Message | None = None
) -> None: ) -> None:
_delete_message(bot, mes) states.pop((mes.chat.id, mes.from_user.id), None)
await _delete_message(bot, mes)
if prev_mes is not None: if prev_mes is not None:
_delete_message(bot, prev_mes) await _delete_message(bot, prev_mes)
def get_accounts(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: async def get_accounts(
_base_handler(bot, mes) bot: AsyncTeleBot,
engine: Engine,
mes: Message,
) -> None:
await _base_handler(bot, mes)
accounts = database.get.get_accounts( accounts = database.get.get_accounts(
engine, engine,
mes.from_user.id, mes.from_user.id,
to_sort=True, to_sort=True,
) )
if not accounts: if not accounts:
return _send_tmp_message(bot, mes.chat.id, "У вас нет аккаунтов") return await _send_tmp_message(bot, mes.chat.id, "У вас нет аккаунтов")
# Make accounts copyable and escape special chars # Make accounts copyable and escape special chars
accounts = [f"`{account}`" for account in accounts] accounts = [f"`{account}`" for account in accounts]
_send_tmp_message( await _send_tmp_message(
bot, bot,
mes.chat.id, mes.chat.id,
"Ваши аккаунты:\n" "Ваши аккаунты:\n"
@ -69,61 +90,65 @@ def get_accounts(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
) )
def delete_all(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: async def delete_all(bot: AsyncTeleBot, engine: Engine, mes: Message) -> None:
_base_handler(bot, mes) await _base_handler(bot, mes)
bot_mes = bot.send_message( bot_mes = await bot.send_message(
mes.chat.id, mes.chat.id,
"Вы действительно хотите удалить все ваши аккаунты? Это действие " "Вы действительно хотите удалить все ваши аккаунты? Это действие "
"нельзя отменить. " "нельзя отменить. "
"Отправьте YES для подтверждения", "Отправьте YES для подтверждения",
) )
bot.register_next_step_handler( _register_state(mes, functools.partial(_delete_all2, bot, engine, bot_mes))
mes, functools.partial(_delete_all2, bot, engine, bot_mes)
)
def _delete_all2( async def _delete_all2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None: ) -> None:
_base_handler(bot, mes, prev_mes) await _base_handler(bot, mes, prev_mes)
text = mes.text.strip() text = mes.text.strip()
if text == "YES": if text == "YES":
database.delete.purge_accounts(engine, mes.from_user.id) database.delete.purge_accounts(engine, mes.from_user.id)
database.delete.delete_master_pass(engine, mes.from_user.id) database.delete.delete_master_pass(engine, mes.from_user.id)
_send_tmp_message(bot, mes.chat.id, "Всё успешно удалено", timeout=10) await _send_tmp_message(
bot,
mes.chat.id,
"Всё успешно удалено",
sleep_time=10,
)
else: else:
_send_tmp_message( await _send_tmp_message(
bot, bot,
mes.chat.id, mes.chat.id,
"Вы отправили не YES, ничего не удалено", "Вы отправили не YES, ничего не удалено",
) )
def set_master_password( async def set_master_password(
bot: telebot.TeleBot, bot: AsyncTeleBot,
engine: Engine, engine: Engine,
mes: Message, mes: Message,
) -> None: ) -> None:
_base_handler(bot, mes, None) await _base_handler(bot, mes, None)
if database.get.get_master_pass(engine, mes.from_user.id) is not None: if database.get.get_master_pass(engine, mes.from_user.id) is not None:
return _send_tmp_message( return await _send_tmp_message(
bot, bot,
mes.chat.id, mes.chat.id,
"Мастер пароль уже существует", "Мастер пароль уже существует",
) )
bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль") bot_mes = await bot.send_message(mes.chat.id, "Отправьте мастер пароль")
bot.register_next_step_handler( _register_state(
mes, functools.partial(_set_master_pass2, bot, engine, bot_mes) mes,
functools.partial(_set_master_pass2, bot, engine, bot_mes),
) )
def _set_master_pass2( async def _set_master_pass2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None: ) -> None:
_base_handler(bot, mes, prev_mes) await _base_handler(bot, mes, prev_mes)
text = mes.text.strip() text = mes.text.strip()
if text == "/cancel": if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") return await _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_password = encryption.master_pass.encrypt_master_pass( master_password = encryption.master_pass.encrypt_master_pass(
mes.from_user.id, mes.from_user.id,
@ -131,39 +156,44 @@ def _set_master_pass2(
) )
database.add.add_master_pass(engine, master_password) database.add.add_master_pass(engine, master_password)
_send_tmp_message(bot, mes.chat.id, "Успех") await _send_tmp_message(bot, mes.chat.id, "Успех")
del mes, text del mes, text
gc.collect() gc.collect()
def reset_master_pass( async def reset_master_pass(
bot: telebot.TeleBot, bot: AsyncTeleBot,
engine: Engine, engine: Engine,
mes: Message, mes: Message,
) -> None: ) -> None:
_base_handler(bot, mes) await _base_handler(bot, mes)
if database.get.get_master_pass(engine, mes.from_user.id) is None: if database.get.get_master_pass(engine, mes.from_user.id) is None:
return _send_tmp_message(bot, mes.chat.id, "Мастер пароль не задан") return await _send_tmp_message(
bot,
mes.chat.id,
"Мастер пароль не задан",
)
bot_mes = bot.send_message( bot_mes = await bot.send_message(
mes.chat.id, mes.chat.id,
"Отправьте новый мастер пароль, осторожно, все текущие аккаунты " "Отправьте новый мастер пароль, осторожно, все текущие аккаунты "
"будут удалены навсегда", "будут удалены навсегда",
) )
bot.register_next_step_handler( _register_state(
mes, functools.partial(_reset_master_pass2, bot, engine, bot_mes) mes,
functools.partial(_reset_master_pass2, bot, engine, bot_mes),
) )
def _reset_master_pass2( async def _reset_master_pass2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None: ) -> None:
_base_handler(bot, mes, prev_mes) await _base_handler(bot, mes, prev_mes)
text = mes.text.strip() text = mes.text.strip()
if text == "/cancel": if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") return await _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_password = encryption.master_pass.encrypt_master_pass( master_password = encryption.master_pass.encrypt_master_pass(
mes.from_user.id, mes.from_user.id,
@ -172,118 +202,132 @@ def _reset_master_pass2(
database.delete.purge_accounts(engine, mes.from_user.id) database.delete.purge_accounts(engine, mes.from_user.id)
database.change.change_master_pass(engine, master_password) database.change.change_master_pass(engine, master_password)
_send_tmp_message( await _send_tmp_message(
bot, mes.chat.id, "Все ваши аккаунты удалены, а мастер пароль изменён" bot, mes.chat.id, "Все ваши аккаунты удалены, а мастер пароль изменён"
) )
del mes, text del mes, text
gc.collect() gc.collect()
def add_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: async def add_account(bot: AsyncTeleBot, engine: Engine, mes: Message) -> None:
_base_handler(bot, mes) await _base_handler(bot, mes)
master_password_from_db = database.get.get_master_pass( master_password_from_db = database.get.get_master_pass(
engine, engine,
mes.from_user.id, mes.from_user.id,
) )
if master_password_from_db is None: if master_password_from_db is None:
return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") return await _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
bot_mes = bot.send_message(mes.chat.id, "Отправьте название аккаунта") bot_mes = await bot.send_message(
mes.chat.id,
"Отправьте название аккаунта",
)
bot.register_next_step_handler( _register_state(
mes, functools.partial(_add_account2, bot, engine, bot_mes) mes,
functools.partial(_add_account2, bot, engine, bot_mes),
) )
def _add_account2( async def _add_account2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None: ) -> None:
_base_handler(bot, mes, prev_mes) await _base_handler(bot, mes, prev_mes)
text = mes.text.strip() text = mes.text.strip()
if text == "/cancel": if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") return await _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if not check_account_name(text): if not check_account_name(text):
return _send_tmp_message( return await _send_tmp_message(
bot, bot,
mes.chat.id, mes.chat.id,
"Не корректное название аккаунта", "Не корректное название аккаунта",
) )
if text in database.get.get_accounts(engine, mes.from_user.id): if text in database.get.get_accounts(engine, mes.from_user.id):
return _send_tmp_message( return await _send_tmp_message(
bot, mes.chat.id, "Аккаунт с таким именем уже существует" bot, mes.chat.id, "Аккаунт с таким именем уже существует"
) )
bot_mes = bot.send_message(mes.chat.id, "Отправьте логин") bot_mes = await bot.send_message(mes.chat.id, "Отправьте логин")
data = {"name": text} data = {"name": text}
bot.register_next_step_handler( _register_state(
mes, functools.partial(_add_account3, bot, engine, bot_mes, data) mes,
functools.partial(_add_account3, bot, engine, bot_mes, data),
) )
def _add_account3( async def _add_account3(
bot: telebot.TeleBot, bot: AsyncTeleBot,
engine: Engine, engine: Engine,
prev_mes: Message, prev_mes: Message,
data: dict[str, str], data: dict[str, str],
mes: Message, mes: Message,
) -> None: ) -> None:
_base_handler(bot, mes, prev_mes) await _base_handler(bot, mes, prev_mes)
text = mes.text.strip() text = mes.text.strip()
if text == "/cancel": if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") return await _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if not check_login(text): if not check_login(text):
return _send_tmp_message(bot, mes.chat.id, "Не корректный логин") return await _send_tmp_message(bot, mes.chat.id, "Не корректный логин")
data["login"] = text data["login"] = text
bot_mes = bot.send_message(mes.chat.id, "Отправьте пароль от аккаунта") bot_mes = await bot.send_message(
mes.chat.id,
"Отправьте пароль от аккаунта",
)
bot.register_next_step_handler( _register_state(
mes, functools.partial(_add_account4, bot, engine, bot_mes, data) mes,
functools.partial(_add_account4, bot, engine, bot_mes, data),
) )
def _add_account4( async def _add_account4(
bot: telebot.TeleBot, bot: AsyncTeleBot,
engine: Engine, engine: Engine,
prev_mes: Message, prev_mes: Message,
data: dict[str, str], data: dict[str, str],
mes: Message, mes: Message,
) -> None: ) -> None:
_base_handler(bot, mes, prev_mes) await _base_handler(bot, mes, prev_mes)
text = mes.text.strip() text = mes.text.strip()
if text == "/cancel": if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") return await _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if not check_password(text): if not check_password(text):
return _send_tmp_message(bot, mes.chat.id, "Не корректный пароль") return await _send_tmp_message(
bot,
mes.chat.id,
"Не корректный пароль",
)
data["passwd"] = text data["passwd"] = text
bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль") bot_mes = await bot.send_message(mes.chat.id, "Отправьте мастер пароль")
bot.register_next_step_handler( _register_state(
mes, functools.partial(_add_account5, bot, engine, bot_mes, data) mes,
functools.partial(_add_account5, bot, engine, bot_mes, data),
) )
def _add_account5( async def _add_account5(
bot: telebot.TeleBot, bot: AsyncTeleBot,
engine: Engine, engine: Engine,
prev_mes: Message, prev_mes: Message,
data: dict[str, str], data: dict[str, str],
mes: Message, mes: Message,
) -> None: ) -> None:
_base_handler(bot, mes, prev_mes) await _base_handler(bot, mes, prev_mes)
text = mes.text.strip() text = mes.text.strip()
if text == "/cancel": if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") return await _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_password = database.get.get_master_pass(engine, mes.from_user.id) master_password = database.get.get_master_pass(engine, mes.from_user.id)
if not encryption.master_pass.check_master_pass(text, master_password): if not encryption.master_pass.check_master_pass(text, master_password):
return _send_tmp_message( return await _send_tmp_message(
bot, bot,
mes.chat.id, mes.chat.id,
"Не подходит главный пароль", "Не подходит главный пароль",
@ -307,7 +351,7 @@ def _add_account5(
encrypted_account, encrypted_account,
) )
_send_tmp_message( await _send_tmp_message(
bot, bot,
mes.chat.id, mes.chat.id,
"Успех" if result else "Произошла не предвиденная ошибка", "Успех" if result else "Произошла не предвиденная ошибка",
@ -318,47 +362,52 @@ def _add_account5(
gc.collect() gc.collect()
def get_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: async def get_account(bot: AsyncTeleBot, engine: Engine, mes: Message) -> None:
_base_handler(bot, mes) await _base_handler(bot, mes)
master_pass = database.get.get_master_pass(engine, mes.from_user.id) master_pass = database.get.get_master_pass(engine, mes.from_user.id)
if master_pass is None: if master_pass is None:
return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") return await _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
bot_mes = bot.send_message(mes.chat.id, "Отправьте название аккаунта") bot_mes = await bot.send_message(
bot.register_next_step_handler( mes.chat.id,
mes, functools.partial(_get_account2, bot, engine, bot_mes) "Отправьте название аккаунта",
)
_register_state(
mes,
functools.partial(_get_account2, bot, engine, bot_mes),
) )
def _get_account2( async def _get_account2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None: ) -> None:
_base_handler(bot, mes, prev_mes) await _base_handler(bot, mes, prev_mes)
text = mes.text.strip() text = mes.text.strip()
if text == "/cancel": if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") return await _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if text not in database.get.get_accounts(engine, mes.from_user.id): if text not in database.get.get_accounts(engine, mes.from_user.id):
return _send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта") return await _send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта")
bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль") bot_mes = await bot.send_message(mes.chat.id, "Отправьте мастер пароль")
bot.register_next_step_handler( _register_state(
mes, functools.partial(_get_account3, bot, engine, bot_mes, text) mes,
functools.partial(_get_account3, bot, engine, bot_mes, text),
) )
def _get_account3( async def _get_account3(
bot: telebot.TeleBot, bot: AsyncTeleBot,
engine: Engine, engine: Engine,
prev_mes: Message, prev_mes: Message,
name: str, name: str,
mes: Message, mes: Message,
) -> None: ) -> None:
_base_handler(bot, mes, prev_mes) await _base_handler(bot, mes, prev_mes)
text = mes.text.strip() text = mes.text.strip()
if text == "/cancel": if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") return await _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_password = database.get.get_master_pass( master_password = database.get.get_master_pass(
engine, engine,
@ -366,14 +415,18 @@ def _get_account3(
) )
if not encryption.master_pass.check_master_pass(text, master_password): if not encryption.master_pass.check_master_pass(text, master_password):
return _send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль") return await _send_tmp_message(
bot,
mes.chat.id,
"Не подходит мастер пароль",
)
account = database.get.get_account_info(engine, mes.from_user.id, name) account = database.get.get_account_info(engine, mes.from_user.id, name)
account = encryption.other_accounts.decrypt( account = encryption.other_accounts.decrypt(
account, account,
text, text,
) )
_send_tmp_message( await _send_tmp_message(
bot, bot,
mes.chat.id, mes.chat.id,
f"Логин:\n`{account.login}`\nПароль:\n`{account.password}`\nНажмите " f"Логин:\n`{account.login}`\nПароль:\n`{account.password}`\nНажмите "
@ -385,64 +438,69 @@ def _get_account3(
gc.collect() gc.collect()
def delete_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: async def delete_account(
_base_handler(bot, mes) bot: AsyncTeleBot,
engine: Engine,
mes: Message,
) -> None:
await _base_handler(bot, mes)
master_pass = database.get.get_master_pass(engine, mes.from_user.id) master_pass = database.get.get_master_pass(engine, mes.from_user.id)
if master_pass is None: if master_pass is None:
return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") return await _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
bot_mes = bot.send_message( bot_mes = await bot.send_message(
mes.chat.id, "Отправьте название аккаунта, который вы хотите удалить" mes.chat.id, "Отправьте название аккаунта, который вы хотите удалить"
) )
bot.register_next_step_handler( _register_state(
mes, functools.partial(_delete_account2, bot, engine, bot_mes) mes,
functools.partial(_delete_account2, bot, engine, bot_mes),
) )
def _delete_account2( async def _delete_account2(
bot: telebot.TeleBot, bot: AsyncTeleBot,
engine: Engine, engine: Engine,
prev_mes: Message, prev_mes: Message,
mes: Message, mes: Message,
): ):
_base_handler(bot, mes, prev_mes) await _base_handler(bot, mes, prev_mes)
text = mes.text.strip() text = mes.text.strip()
if text == "/cancel": if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") return await _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if text not in database.get.get_accounts(engine, mes.from_user.id): if text not in database.get.get_accounts(engine, mes.from_user.id):
return _send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта") return await _send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта")
bot_mes = bot.send_message( bot_mes = await bot.send_message(
mes.from_user.id, mes.from_user.id,
f'Вы уверены, что хотите удалить аккаунт "{text}"?\nОтправьте YES для ' f'Вы уверены, что хотите удалить аккаунт "{text}"?\nОтправьте YES для '
"подтверждения", "подтверждения",
) )
bot.register_next_step_handler( _register_state(
mes, functools.partial(_delete_account3, bot, engine, bot_mes, text) mes, functools.partial(_delete_account3, bot, engine, bot_mes, text)
) )
def _delete_account3( async def _delete_account3(
bot: telebot.TeleBot, bot: AsyncTeleBot,
engine: Engine, engine: Engine,
prev_mes: Message, prev_mes: Message,
account_name: str, account_name: str,
mes: Message, mes: Message,
) -> None: ) -> None:
_base_handler(bot, mes, prev_mes) await _base_handler(bot, mes, prev_mes)
text = mes.text.strip() text = mes.text.strip()
if text != "YES": if text != "YES":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") return await _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
database.delete.delete_account(engine, mes.from_user.id, account_name) database.delete.delete_account(engine, mes.from_user.id, account_name)
_send_tmp_message(bot, mes.chat.id, "Аккаунт удалён") await _send_tmp_message(bot, mes.chat.id, "Аккаунт удалён")
def help_command(bot: telebot.TeleBot, mes: Message) -> None: async def help_command(bot: AsyncTeleBot, mes: Message) -> None:
message = """Команды: message = """Команды:
/set_master_pass - установить мастер пароль /set_master_pass - установить мастер пароль
/add_account - создать аккаунт /add_account - создать аккаунт
@ -457,137 +515,145 @@ def help_command(bot: telebot.TeleBot, mes: Message) -> None:
/import - импортировать пароли из json в файле в таком же формате, \ /import - импортировать пароли из json в файле в таком же формате, \
как из /export как из /export
/gen_password - создать 10 надёжных паролей""" /gen_password - создать 10 надёжных паролей"""
bot.send_message(mes.chat.id, message) await bot.send_message(mes.chat.id, message)
def cancel(bot: telebot.TeleBot, mes: Message) -> None: async def cancel(bot: AsyncTeleBot, mes: Message) -> None:
_base_handler(bot, mes) await _base_handler(bot, mes)
_send_tmp_message(bot, mes.chat.id, "Нет активного действия") await _send_tmp_message(bot, mes.chat.id, "Нет активного действия")
def export(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: async def export(bot: AsyncTeleBot, engine: Engine, mes: Message) -> None:
_base_handler(bot, mes) await _base_handler(bot, mes)
master_password_from_db = database.get.get_master_pass( master_password_from_db = database.get.get_master_pass(
engine, engine,
mes.from_user.id, mes.from_user.id,
) )
if master_password_from_db is None: if master_password_from_db is None:
return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") return await _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
if not database.get.get_accounts(engine, mes.from_user.id): if not database.get.get_accounts(engine, mes.from_user.id):
return _send_tmp_message(bot, mes.chat.id, "Нет аккаунтов") return await _send_tmp_message(bot, mes.chat.id, "Нет аккаунтов")
bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль") bot_mes = await bot.send_message(mes.chat.id, "Отправьте мастер пароль")
bot.register_next_step_handler( _register_state(mes, functools.partial(_export2, bot, engine, bot_mes))
mes, functools.partial(_export2, bot, engine, bot_mes)
)
def _export2( async def _export2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None: ) -> None:
_base_handler(bot, mes, prev_mes) await _base_handler(bot, mes, prev_mes)
text = mes.text.strip() text = mes.text.strip()
if text == "/cancel": if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") return await _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_password = database.get.get_master_pass( master_password = database.get.get_master_pass(
engine, engine,
mes.from_user.id, mes.from_user.id,
) )
if not encryption.master_pass.check_master_pass(text, master_password): if not encryption.master_pass.check_master_pass(text, master_password):
return _send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль") return await _send_tmp_message(
bot,
mes.chat.id,
"Не подходит мастер пароль",
)
accounts = database.get.get_all_accounts(engine, mes.from_user.id) accounts = database.get.get_all_accounts(engine, mes.from_user.id)
accounts = encryption.other_accounts.decrypt_multiple(accounts, text) accounts = encryption.other_accounts.decrypt_multiple(accounts, text)
json_io = accounts_to_json(accounts) json_io = accounts_to_json(accounts)
bot_mes = bot.send_document(mes.chat.id, json_io) bot_mes = await bot.send_document(mes.chat.id, json_io)
del text, accounts, json_io del text, accounts, json_io
gc.collect() gc.collect()
time.sleep(30) await _delete_message(bot, bot_mes, sleep_time=30)
_delete_message(bot, bot_mes)
def import_accounts( async def import_accounts(
bot: telebot.TeleBot, bot: AsyncTeleBot,
engine: Engine, engine: Engine,
mes: Message, mes: Message,
) -> None: ) -> None:
_base_handler(bot, mes) await _base_handler(bot, mes)
master_password_from_db = database.get.get_master_pass( master_password_from_db = database.get.get_master_pass(
engine, engine,
mes.from_user.id, mes.from_user.id,
) )
if master_password_from_db is None: if master_password_from_db is None:
return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") return await _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
bot_mes = bot.send_message(mes.chat.id, "Отправьте json файл") bot_mes = await bot.send_message(mes.chat.id, "Отправьте json файл")
bot.register_next_step_handler( _register_state(mes, functools.partial(_import2, bot, engine, bot_mes))
mes, functools.partial(_import2, bot, engine, bot_mes)
)
def _import2( async def _import2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None: ) -> None:
_base_handler(bot, mes, prev_mes) await _base_handler(bot, mes, prev_mes)
if mes.text is not None: if mes.text is not None:
text = mes.text.strip() text = mes.text.strip()
if text == "/cancel": if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") return await _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if mes.document is None: if mes.document is None:
return _send_tmp_message( return await _send_tmp_message(
bot, bot,
mes.chat.id, mes.chat.id,
"Вы должны отправить документ", "Вы должны отправить документ",
) )
if mes.document.file_size > 102_400: # If file size is bigger than 100 MB if mes.document.file_size > 102_400: # If file size is bigger than 100 MB
return _send_tmp_message(bot, mes.chat.id, "Файл слишком большой") return await _send_tmp_message(
bot,
mes.chat.id,
"Файл слишком большой",
)
file_info = bot.get_file(mes.document.file_id) file_info = await bot.get_file(mes.document.file_id)
downloaded_file = bot.download_file(file_info.file_path) downloaded_file = await bot.download_file(file_info.file_path)
try: try:
accounts = json_to_accounts( accounts = json_to_accounts(
downloaded_file.decode("utf-8"), downloaded_file.decode("utf-8"),
mes.from_user.id, mes.from_user.id,
) )
except Exception: except Exception:
return _send_tmp_message( return await _send_tmp_message(
bot, bot,
mes.chat.id, mes.chat.id,
"Ошибка во время работы с файлом", "Ошибка во время работы с файлом",
) )
bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль") bot_mes = await bot.send_message(mes.chat.id, "Отправьте мастер пароль")
bot.register_next_step_handler( _register_state(
mes, functools.partial(_import3, bot, engine, bot_mes, accounts) mes,
functools.partial(_import3, bot, engine, bot_mes, accounts),
) )
def _import3( async def _import3(
bot: telebot.TeleBot, bot: AsyncTeleBot,
engine: Engine, engine: Engine,
prev_mes: Message, prev_mes: Message,
accounts: list[DecryptedAccount], accounts: list[DecryptedAccount],
mes: Message, mes: Message,
) -> None: ) -> None:
_base_handler(bot, mes, prev_mes) await _base_handler(bot, mes, prev_mes)
text = mes.text.strip() text = mes.text.strip()
if text == "/cancel": if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") return await _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_password = database.get.get_master_pass( master_password = database.get.get_master_pass(
engine, engine,
mes.from_user.id, mes.from_user.id,
) )
if not encryption.master_pass.check_master_pass(text, master_password): if not encryption.master_pass.check_master_pass(text, master_password):
return _send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль") return await _send_tmp_message(
bot,
mes.chat.id,
"Не подходит мастер пароль",
)
# List of names of accounts, which failed to be added to the database # List of names of accounts, which failed to be added to the database
# or failed the tests # or failed the tests
@ -609,13 +675,13 @@ def _import3(
else: else:
mes_text = "Успех" mes_text = "Успех"
_send_tmp_message(bot, mes.chat.id, mes_text, 10) await _send_tmp_message(bot, mes.chat.id, mes_text, 10)
del text, mes, accounts del text, mes, accounts
gc.collect() gc.collect()
def gen_password(bot: telebot.TeleBot, mes: Message) -> None: async def gen_password(bot: AsyncTeleBot, mes: Message) -> None:
_base_handler(bot, mes) await _base_handler(bot, mes)
# Generate 10 passwords and put 'em in the backticks # Generate 10 passwords and put 'em in the backticks
passwords = (f"`{generate_password.gen_password()}`" for _ in range(10)) passwords = (f"`{generate_password.gen_password()}`" for _ in range(10))
text = ( text = (
@ -623,4 +689,17 @@ def gen_password(bot: telebot.TeleBot, mes: Message) -> None:
+ "\n".join(passwords) + "\n".join(passwords)
+ "\nНажмите на пароль, чтобы его скопировать" + "\nНажмите на пароль, чтобы его скопировать"
) )
_send_tmp_message(bot, mes.chat.id, text, 15) await _send_tmp_message(bot, mes.chat.id, text, 15)
async def message_handler(bot: AsyncTeleBot, mes: Message) -> None:
handler = states.get((mes.chat.id, mes.from_user.id))
if handler is None:
await _delete_message(bot, mes)
await _send_tmp_message(
bot,
mes.chat.id,
"Вы отправили не корректное сообщение",
)
return
await handler(mes)

View File

@ -7,8 +7,8 @@ from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from ..decrypted_account import DecryptedAccount
from ..database.models import Account from ..database.models import Account
from ..decrypted_account import DecryptedAccount
def _generate_key(salt: bytes, master_pass: bytes) -> bytes: def _generate_key(salt: bytes, master_pass: bytes) -> bytes: