This repository has been archived on 2023-08-08. You can view files and clone it, but cannot push or open issues or pull requests.
PassManager/src/bot/handlers.py

459 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import functools
import gc
import time
import telebot
from sqlalchemy.future import Engine
from .. import cryptography, database
from .utils import (
accounts_to_json,
base_handler,
check_account_name,
check_login,
check_passwd,
get_all_accounts,
json_to_accounts,
send_tmp_message,
)
Message = telebot.types.Message
def get_accounts(
bot: telebot.TeleBot, engine: Engine, mes: telebot.types.Message
) -> None:
base_handler(bot, mes)
accounts = database.get.get_accounts(engine, mes.from_user.id)
if accounts:
accounts = [f"`{account}`" for account in accounts]
return send_tmp_message(
bot,
mes.chat.id,
"Ваши аккаунты:\n"
+ "\n".join(accounts)
+ "\nНажмите на название, чтобы скопировать",
30,
)
send_tmp_message(bot, mes.chat.id, "У вас нет аккаунтов")
def delete_all(
bot: telebot.TeleBot, engine: Engine, mes: telebot.types.Message
) -> None:
base_handler(bot, mes)
bot_mes = bot.send_message(
mes.chat.id,
"Вы действительно хотите удалить все ваши аккаунты? Это действие нельзя отменить. Отправьте YES для подтверждения",
)
bot.register_next_step_handler(
mes, functools.partial(_delete_all, bot, engine, bot_mes)
)
def _delete_all(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "YES":
database.delete.purge_accounts(engine, mes.from_user.id)
database.delete.delete_master_pass(engine, mes.from_user.id)
send_tmp_message(bot, mes.chat.id, "Всё успешно удалено", timeout=10)
else:
send_tmp_message(bot, mes.chat.id, "Вы отправили не YES, ничего не удалено")
def set_master_password(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
base_handler(bot, mes, None)
if database.get.get_master_pass(engine, mes.from_user.id) is not None:
return send_tmp_message(bot, mes.chat.id, "Мастер пароль уже существует")
bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль")
bot.register_next_step_handler(
mes, functools.partial(_set_master_pass2, bot, engine, bot_mes)
)
def _set_master_pass2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена")
hash_, salt = cryptography.master_pass.encrypt_master_pass(text)
database.add.add_master_pass(engine, mes.from_user.id, salt, hash_)
send_tmp_message(bot, mes.chat.id, "Успех")
del mes, text
gc.collect()
def reset_master_pass(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
base_handler(bot, mes)
if database.get.get_master_pass(engine, mes.from_user.id) is None:
return send_tmp_message(bot, mes.chat.id, "Мастер пароль не задан")
bot_mes = bot.send_message(
mes.chat.id,
"Отправьте новый мастер пароль, осторожно, все текущие аккаунты будут удалены навсегда",
)
bot.register_next_step_handler(
mes, functools.partial(_reset_master_pass2, bot, engine, bot_mes)
)
def _reset_master_pass2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена")
hash_, salt = cryptography.master_pass.encrypt_master_pass(text)
database.delete.purge_accounts(engine, mes.from_user.id)
database.change.change_master_pass(engine, mes.from_user.id, salt, hash_)
send_tmp_message(
bot, mes.chat.id, "Все ваши аккаунты удалены, а мастер пароль изменён"
)
del mes, text
gc.collect()
def add_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
base_handler(bot, mes)
master_password_from_db = database.get.get_master_pass(engine, mes.from_user.id)
if master_password_from_db is None:
return send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
bot_mes = bot.send_message(mes.chat.id, "Отправьте название аккаунта")
bot.register_next_step_handler(
mes, functools.partial(_add_account2, bot, engine, bot_mes)
)
def _add_account2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if not check_account_name(text):
return send_tmp_message(bot, mes.chat.id, "Не корректное название аккаунта")
if text in database.get.get_accounts(engine, mes.from_user.id):
return send_tmp_message(
bot, mes.chat.id, "Аккаунт с таким именем уже существует"
)
bot_mes = bot.send_message(mes.chat.id, "Отправьте логин")
data = {"name": text}
bot.register_next_step_handler(
mes, functools.partial(_add_account3, bot, engine, bot_mes, data)
)
def _add_account3(
bot: telebot.TeleBot,
engine: Engine,
prev_mes: Message,
data: dict[str, str],
mes: Message,
) -> None:
base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if not check_login(text):
return send_tmp_message(bot, mes.chat.id, "Не корректный логин")
data["login"] = text
bot_mes = bot.send_message(mes.chat.id, "Отправьте пароль от аккаунта")
bot.register_next_step_handler(
mes, functools.partial(_add_account4, bot, engine, bot_mes, data)
)
def _add_account4(
bot: telebot.TeleBot,
engine: Engine,
prev_mes: Message,
data: dict[str, str],
mes: Message,
) -> None:
base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if not check_passwd(text):
return send_tmp_message(bot, mes.chat.id, "Не корректный пароль")
data["passwd"] = text
bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль")
bot.register_next_step_handler(
mes, functools.partial(_add_account5, bot, engine, bot_mes, data)
)
def _add_account5(
bot: telebot.TeleBot,
engine: Engine,
prev_mes: Message,
data: dict[str, str],
mes: Message,
) -> None:
base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена")
salt, hash_ = database.get.get_master_pass(engine, mes.from_user.id)
if cryptography.master_pass.encrypt_master_pass(text, salt) != hash_:
return send_tmp_message(bot, mes.chat.id, "Не подходит главный пароль")
name, login, passwd = data["name"], data["login"], data["passwd"]
enc_login, enc_pass, salt = cryptography.other_accounts.encrypt_account_info(
login, passwd, text.encode("utf-8")
)
result = database.add.add_account(
engine, mes.from_user.id, name, salt, enc_login, enc_pass
)
send_tmp_message(
bot, mes.chat.id, "Успех" if result else "Произошла не предвиденная ошибка"
)
del data, name, login, passwd, enc_login
gc.collect()
def get_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
base_handler(bot, mes)
bot_mes = bot.send_message(mes.chat.id, "Отправьте название аккаунта")
bot.register_next_step_handler(
mes, functools.partial(_get_account2, bot, engine, bot_mes)
)
def _get_account2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if text not in database.get.get_accounts(engine, mes.from_user.id):
return send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта")
bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль")
bot.register_next_step_handler(
mes, functools.partial(_get_account3, bot, engine, bot_mes, text)
)
def _get_account3(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, name: str, mes: Message
) -> None:
base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_pass = database.get.get_master_pass(engine, mes.from_user.id)
if master_pass is None:
return send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
master_salt, hash_pass = master_pass
if cryptography.master_pass.encrypt_master_pass(text, master_salt) != hash_pass:
return send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль")
salt, enc_login, enc_pass = database.get.get_account_info(
engine, mes.from_user.id, name
)
login, passwd = cryptography.other_accounts.decrypt_account_info(
enc_login, enc_pass, text.encode("utf-8"), salt
)
send_tmp_message(
bot,
mes.chat.id,
f"Логин:\n`{login}`\nПароль:\n`{passwd}`\nНажмите на логин или пароль, чтобы скопировать",
30,
)
del text, mes, passwd, login
gc.collect()
def delete_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
base_handler(bot, mes)
bot_mes = bot.send_message(
mes.chat.id, "Отправьте название аккаунта, который вы хотите удалить"
)
bot.register_next_step_handler(
mes, functools.partial(_delete_account2, bot, engine, bot_mes)
)
def _delete_account2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if text not in database.get.get_accounts(engine, mes.from_user.id):
return send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта")
database.delete.delete_account(engine, mes.from_user.id, text)
send_tmp_message(bot, mes.chat.id, "Аккаунт удалён")
def help(bot: telebot.TeleBot, mes: telebot.types.Message) -> None:
message = """Команды:
/set_master_pass - установить мастер пароль
/add_account - создать аккаунт
/get_accounts - получить список аккаунтов
/get_account - получить логин и пароль аккаунта
/delete_account - удалить аккаунт
/delete_all - удалить все аккаунты и мастер пароль
/reset_master_pass - удалить все аккаунты и изменить мастер пароль
/cancel - отмена текущего действия
/help - помощь
/export - получить пароли в json формате
/import - импортировать пароли из json в файле в таком же формате, как из /export"""
bot.send_message(mes.chat.id, message)
def cancel(bot: telebot.TeleBot, mes: Message) -> None:
send_tmp_message(bot, mes.chat.id, "Нет активного действия")
def export(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
base_handler(bot, mes)
master_password_from_db = database.get.get_master_pass(engine, mes.from_user.id)
if master_password_from_db is None:
return send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
if not database.get.get_accounts(engine, mes.from_user.id):
return send_tmp_message(bot, mes.chat.id, "Нет аккаунтов")
bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль")
bot.register_next_step_handler(
mes, functools.partial(_export2, bot, engine, bot_mes)
)
def _export2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_salt, hash_pass = database.get.get_master_pass(engine, mes.from_user.id)
if cryptography.master_pass.encrypt_master_pass(text, master_salt) != hash_pass:
return send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль")
accounts = get_all_accounts(engine, mes.from_user.id, text)
json_io = accounts_to_json(accounts)
bot_mes = bot.send_document(mes.chat.id, json_io)
del text, accounts, json_io
gc.collect()
time.sleep(30)
bot.delete_message(bot_mes.chat.id, bot_mes.id)
def import_accounts(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
base_handler(bot, mes)
master_password_from_db = database.get.get_master_pass(engine, mes.from_user.id)
if master_password_from_db is None:
return send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
bot_mes = bot.send_message(mes.chat.id, "Отправьте json файл")
bot.register_next_step_handler(
mes, functools.partial(_import2, bot, engine, bot_mes)
)
def _import2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
base_handler(bot, mes, prev_mes)
if mes.text is not None:
text = mes.text.strip()
if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if mes.document is None:
send_tmp_message(bot, mes.chat.id, "Вы должны отправить документ")
file_info = bot.get_file(mes.document.file_id)
downloaded_file = bot.download_file(file_info.file_path)
try:
accounts = json_to_accounts(downloaded_file.decode("utf-8"))
except Exception:
send_tmp_message(bot, mes.chat.id, "Ошибка во время работы с файлом")
bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль")
bot.register_next_step_handler(
mes, functools.partial(_import3, bot, engine, bot_mes, accounts)
)
def _import3(
bot: telebot.TeleBot,
engine: Engine,
prev_mes: Message,
accounts: list[tuple[str, str, str]],
mes: Message,
) -> None:
base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_salt, hash_pass = database.get.get_master_pass(engine, mes.from_user.id)
if cryptography.master_pass.encrypt_master_pass(text, master_salt) != hash_pass:
return send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль")
failed: list[str] = []
for account in accounts:
name, login, passwd = account
enc_login, enc_passwd, salt = cryptography.other_accounts.encrypt_account_info(
login, passwd, text.encode("utf-8")
)
result = database.add.add_account(
engine, mes.from_user.id, name, salt, enc_login, enc_passwd
)
if not result:
failed.append(name)
if failed:
mes_text = "Не удалось добавить:\n" + "\n".join(failed)
else:
mes_text = "Успех"
send_tmp_message(bot, mes.chat.id, mes_text, 10)
del text, mes, accounts
gc.collect()