Compare commits

...

7 Commits

17 changed files with 314 additions and 251 deletions

View File

@ -1 +1 @@
black black

View File

@ -3,4 +3,4 @@ pymysql
python-dotenv python-dotenv
pyTelegramBotAPI pyTelegramBotAPI
sqlmodel sqlmodel
pydantic pydantic

View File

@ -2,9 +2,23 @@ import os
from dotenv import load_dotenv from dotenv import load_dotenv
from . import bot, cryptography, database from . import (
account_checks,
account_parsing,
bot,
cryptography,
database,
generate_password,
)
__all__ = ["bot", "cryptography", "database"] __all__ = [
"account_checks",
"account_parsing",
"bot",
"cryptography",
"database",
"generate_password",
]
def main() -> None: def main() -> None:

42
src/account_checks.py Normal file
View File

@ -0,0 +1,42 @@
import string
FORBIDDEN_CHARS = frozenset("`\n")
PUNCTUATION = frozenset(string.punctuation).difference(FORBIDDEN_CHARS)
def _base_check(val: str, /) -> bool:
"Returns false if finds new lines or backtick (`)"
return not any(i in FORBIDDEN_CHARS for i in val)
def check_account_name(name: str) -> bool:
"Returns true if account name is valid"
return _base_check(name)
def check_login(login: str) -> bool:
"Returns true if login is valid"
return _base_check(login)
def check_password(passwd: str) -> bool:
"Returns true if password is valid"
return _base_check(passwd)
def check_account(name: str, login: str, passwd: str) -> bool:
"""Runs checks for account name, login and password"""
return check_account_name(name) and check_login(login) and check_password(passwd)
def check_gened_password(passwd: str, /) -> bool:
"""Retuns true if generated password is valid,
false otherwise.
Password is valid if there is at least one lowercase character,
uppercase character and one punctuation character"""
return (
any(c.islower() for c in passwd)
and any(c.isupper() for c in passwd)
and any(c.isdigit() for c in passwd)
and any(c in PUNCTUATION for c in passwd)
)

37
src/account_parsing.py Normal file
View File

@ -0,0 +1,37 @@
import io
from typing import Iterator, Self, Type
import pydantic
class _Account(pydantic.BaseModel):
name: str
login: str
password: str
@classmethod
def from_tuple(cls: Type[Self], tuple_: tuple[str, str, str]) -> Self:
return cls(name=tuple_[0], login=tuple_[1], passwd=tuple_[2])
def as_tuple(self: Self) -> tuple[str, str, str]:
return (self.name, self.login, self.password)
class _Accounts(pydantic.BaseModel):
accounts: list[_Account] = pydantic.Field(default_factory=list)
def _accounts_list_to_json(accounts: Iterator[tuple[str, str, str]]) -> str:
accounts = _Accounts(accounts=[_Account.from_tuple(i) for i in accounts])
return accounts.json()
def json_to_accounts(json_: str) -> list[tuple[str, str, str]]:
accounts = _Accounts.parse_raw(json_)
return [i.as_tuple() for i in accounts.accounts]
def accounts_to_json(accounts: Iterator[tuple[str, str, str]]) -> io.StringIO:
file = io.StringIO(_accounts_list_to_json(accounts))
file.name = "passwords.json"
return file

View File

@ -1,11 +1,11 @@
import functools import functools
from sqlalchemy.future import Engine
import telebot import telebot
from sqlalchemy.future import Engine
from . import handlers, utils from . import handlers
__all__ = ["handlers", "utils"] __all__ = ["handlers"]
def create_bot(token: str, engine: Engine) -> telebot.TeleBot: def create_bot(token: str, engine: Engine) -> telebot.TeleBot:

View File

@ -5,32 +5,46 @@ import time
import telebot import telebot
from sqlalchemy.future import Engine from sqlalchemy.future import Engine
from .. import cryptography, database from .. import cryptography, database, generate_password
from .utils import ( from ..account_checks import (
accounts_to_json,
base_handler,
check_account, check_account,
check_account_name, check_account_name,
check_login, check_login,
check_passwd, check_password,
gen_passwd,
get_all_accounts,
json_to_accounts,
send_tmp_message,
) )
from ..account_parsing import accounts_to_json, json_to_accounts
Message = telebot.types.Message Message = telebot.types.Message
def _send_tmp_message(
bot: telebot.TeleBot,
chat_id: telebot.types.Message,
text: str,
timeout: int = 5,
) -> None:
bot_mes = bot.send_message(chat_id, text, parse_mode="MarkdownV2")
time.sleep(timeout)
bot.delete_message(chat_id, bot_mes.id)
def _base_handler(
bot: telebot.TeleBot, mes: Message, prev_mes: Message | None = None
) -> None:
bot.delete_message(mes.chat.id, mes.id)
if prev_mes is not None:
bot.delete_message(prev_mes.chat.id, prev_mes.id)
def get_accounts(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: def get_accounts(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
base_handler(bot, mes) _base_handler(bot, mes)
accounts = database.get.get_accounts(engine, mes.from_user.id) accounts = database.get.get_accounts(engine, mes.from_user.id)
if not accounts: if not accounts:
return send_tmp_message(bot, mes.chat.id, "У вас нет аккаунтов") return _send_tmp_message(bot, mes.chat.id, "У вас нет аккаунтов")
# Make accounts copyable and escape special chars # Make accounts copyable and escape special chars
accounts = [f"`{account}`" for account in accounts] accounts = [f"`{account}`" for account in accounts]
send_tmp_message( _send_tmp_message(
bot, bot,
mes.chat.id, mes.chat.id,
"Ваши аккаунты:\n" "Ваши аккаунты:\n"
@ -41,10 +55,11 @@ def get_accounts(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
def delete_all(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: def delete_all(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
base_handler(bot, mes) _base_handler(bot, mes)
bot_mes = bot.send_message( bot_mes = bot.send_message(
mes.chat.id, mes.chat.id,
"Вы действительно хотите удалить все ваши аккаунты? Это действие нельзя отменить. Отправьте YES для подтверждения", "Вы действительно хотите удалить все ваши аккаунты? Это действие нельзя отменить. "
"Отправьте YES для подтверждения",
) )
bot.register_next_step_handler( bot.register_next_step_handler(
mes, functools.partial(_delete_all, bot, engine, bot_mes) mes, functools.partial(_delete_all, bot, engine, bot_mes)
@ -54,20 +69,20 @@ def delete_all(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
def _delete_all( def _delete_all(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None: ) -> None:
base_handler(bot, mes, prev_mes) _base_handler(bot, mes, prev_mes)
text = mes.text.strip() text = mes.text.strip()
if text == "YES": if text == "YES":
database.delete.purge_accounts(engine, mes.from_user.id) database.delete.purge_accounts(engine, mes.from_user.id)
database.delete.delete_master_pass(engine, mes.from_user.id) database.delete.delete_master_pass(engine, mes.from_user.id)
send_tmp_message(bot, mes.chat.id, "Всё успешно удалено", timeout=10) _send_tmp_message(bot, mes.chat.id, "Всё успешно удалено", timeout=10)
else: else:
send_tmp_message(bot, mes.chat.id, "Вы отправили не YES, ничего не удалено") _send_tmp_message(bot, mes.chat.id, "Вы отправили не YES, ничего не удалено")
def set_master_password(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: def set_master_password(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
base_handler(bot, mes, None) _base_handler(bot, mes, None)
if database.get.get_master_pass(engine, mes.from_user.id) is not None: if database.get.get_master_pass(engine, mes.from_user.id) is not None:
return send_tmp_message(bot, mes.chat.id, "Мастер пароль уже существует") return _send_tmp_message(bot, mes.chat.id, "Мастер пароль уже существует")
bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль") bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль")
bot.register_next_step_handler( bot.register_next_step_handler(
mes, functools.partial(_set_master_pass2, bot, engine, bot_mes) mes, functools.partial(_set_master_pass2, bot, engine, bot_mes)
@ -77,25 +92,26 @@ def set_master_password(bot: telebot.TeleBot, engine: Engine, mes: Message) -> N
def _set_master_pass2( def _set_master_pass2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None: ) -> None:
base_handler(bot, mes, prev_mes) _base_handler(bot, mes, prev_mes)
text = mes.text.strip() text = mes.text.strip()
if text == "/cancel": if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена") return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
hash_pass, master_salt = cryptography.master_pass.encrypt_master_pass(text) hash_pass, master_salt = cryptography.master_pass.encrypt_master_pass(text)
database.add.add_master_pass(engine, mes.from_user.id, master_salt, hash_pass) database.add.add_master_pass(engine, mes.from_user.id, master_salt, hash_pass)
send_tmp_message(bot, mes.chat.id, "Успех") _send_tmp_message(bot, mes.chat.id, "Успех")
del mes, text del mes, text
gc.collect() gc.collect()
def reset_master_pass(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: def reset_master_pass(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
base_handler(bot, mes) _base_handler(bot, mes)
if database.get.get_master_pass(engine, mes.from_user.id) is None: if database.get.get_master_pass(engine, mes.from_user.id) is None:
return send_tmp_message(bot, mes.chat.id, "Мастер пароль не задан") return _send_tmp_message(bot, mes.chat.id, "Мастер пароль не задан")
bot_mes = bot.send_message( bot_mes = bot.send_message(
mes.chat.id, mes.chat.id,
"Отправьте новый мастер пароль, осторожно, все текущие аккаунты будут удалены навсегда", "Отправьте новый мастер пароль, осторожно, все текущие аккаунты "
"будут удалены навсегда",
) )
bot.register_next_step_handler( bot.register_next_step_handler(
mes, functools.partial(_reset_master_pass2, bot, engine, bot_mes) mes, functools.partial(_reset_master_pass2, bot, engine, bot_mes)
@ -105,15 +121,15 @@ def reset_master_pass(bot: telebot.TeleBot, engine: Engine, mes: Message) -> Non
def _reset_master_pass2( def _reset_master_pass2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None: ) -> None:
base_handler(bot, mes, prev_mes) _base_handler(bot, mes, prev_mes)
text = mes.text.strip() text = mes.text.strip()
if text == "/cancel": if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена") return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
hash_, salt = cryptography.master_pass.encrypt_master_pass(text) hash_, salt = cryptography.master_pass.encrypt_master_pass(text)
database.delete.purge_accounts(engine, mes.from_user.id) database.delete.purge_accounts(engine, mes.from_user.id)
database.change.change_master_pass(engine, mes.from_user.id, salt, hash_) database.change.change_master_pass(engine, mes.from_user.id, salt, hash_)
send_tmp_message( _send_tmp_message(
bot, mes.chat.id, "Все ваши аккаунты удалены, а мастер пароль изменён" bot, mes.chat.id, "Все ваши аккаунты удалены, а мастер пароль изменён"
) )
del mes, text del mes, text
@ -121,11 +137,11 @@ def _reset_master_pass2(
def add_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: def add_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
base_handler(bot, mes) _base_handler(bot, mes)
master_password_from_db = database.get.get_master_pass(engine, mes.from_user.id) master_password_from_db = database.get.get_master_pass(engine, mes.from_user.id)
if master_password_from_db is None: if master_password_from_db is None:
return send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
bot_mes = bot.send_message(mes.chat.id, "Отправьте название аккаунта") bot_mes = bot.send_message(mes.chat.id, "Отправьте название аккаунта")
@ -137,15 +153,15 @@ def add_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
def _add_account2( def _add_account2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None: ) -> None:
base_handler(bot, mes, prev_mes) _base_handler(bot, mes, prev_mes)
text = mes.text.strip() text = mes.text.strip()
if text == "/cancel": if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена") return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if not check_account_name(text): if not check_account_name(text):
return send_tmp_message(bot, mes.chat.id, "Не корректное название аккаунта") return _send_tmp_message(bot, mes.chat.id, "Не корректное название аккаунта")
if text in database.get.get_accounts(engine, mes.from_user.id): if text in database.get.get_accounts(engine, mes.from_user.id):
return send_tmp_message( return _send_tmp_message(
bot, mes.chat.id, "Аккаунт с таким именем уже существует" bot, mes.chat.id, "Аккаунт с таким именем уже существует"
) )
@ -164,12 +180,12 @@ def _add_account3(
data: dict[str, str], data: dict[str, str],
mes: Message, mes: Message,
) -> None: ) -> None:
base_handler(bot, mes, prev_mes) _base_handler(bot, mes, prev_mes)
text = mes.text.strip() text = mes.text.strip()
if text == "/cancel": if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена") return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if not check_login(text): if not check_login(text):
return send_tmp_message(bot, mes.chat.id, "Не корректный логин") return _send_tmp_message(bot, mes.chat.id, "Не корректный логин")
data["login"] = text data["login"] = text
@ -187,12 +203,12 @@ def _add_account4(
data: dict[str, str], data: dict[str, str],
mes: Message, mes: Message,
) -> None: ) -> None:
base_handler(bot, mes, prev_mes) _base_handler(bot, mes, prev_mes)
text = mes.text.strip() text = mes.text.strip()
if text == "/cancel": if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена") return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if not check_passwd(text): if not check_password(text):
return send_tmp_message(bot, mes.chat.id, "Не корректный пароль") return _send_tmp_message(bot, mes.chat.id, "Не корректный пароль")
data["passwd"] = text data["passwd"] = text
@ -210,14 +226,14 @@ def _add_account5(
data: dict[str, str], data: dict[str, str],
mes: Message, mes: Message,
) -> None: ) -> None:
base_handler(bot, mes, prev_mes) _base_handler(bot, mes, prev_mes)
text = mes.text.strip() text = mes.text.strip()
if text == "/cancel": if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена") return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
salt, hash_ = database.get.get_master_pass(engine, mes.from_user.id) salt, hash_ = database.get.get_master_pass(engine, mes.from_user.id)
if not cryptography.master_pass.check_master_pass(text, hash_, salt): if not cryptography.master_pass.check_master_pass(text, hash_, salt):
return send_tmp_message(bot, mes.chat.id, "Не подходит главный пароль") return _send_tmp_message(bot, mes.chat.id, "Не подходит главный пароль")
name, login, passwd = data["name"], data["login"], data["passwd"] name, login, passwd = data["name"], data["login"], data["passwd"]
@ -229,7 +245,7 @@ def _add_account5(
engine, mes.from_user.id, name, salt, enc_login, enc_pass engine, mes.from_user.id, name, salt, enc_login, enc_pass
) )
send_tmp_message( _send_tmp_message(
bot, mes.chat.id, "Успех" if result else "Произошла не предвиденная ошибка" bot, mes.chat.id, "Успех" if result else "Произошла не предвиденная ошибка"
) )
@ -239,12 +255,12 @@ def _add_account5(
def get_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: def get_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
base_handler(bot, mes) _base_handler(bot, mes)
bot_mes = bot.send_message(mes.chat.id, "Отправьте название аккаунта") bot_mes = bot.send_message(mes.chat.id, "Отправьте название аккаунта")
master_pass = database.get.get_master_pass(engine, mes.from_user.id) master_pass = database.get.get_master_pass(engine, mes.from_user.id)
if master_pass is None: if master_pass is None:
return send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
bot.register_next_step_handler( bot.register_next_step_handler(
mes, functools.partial(_get_account2, bot, engine, bot_mes) mes, functools.partial(_get_account2, bot, engine, bot_mes)
@ -254,13 +270,13 @@ def get_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
def _get_account2( def _get_account2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None: ) -> None:
base_handler(bot, mes, prev_mes) _base_handler(bot, mes, prev_mes)
text = mes.text.strip() text = mes.text.strip()
if text == "/cancel": if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена") return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if text not in database.get.get_accounts(engine, mes.from_user.id): if text not in database.get.get_accounts(engine, mes.from_user.id):
return send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта") return _send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта")
bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль") bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль")
bot.register_next_step_handler( bot.register_next_step_handler(
@ -271,15 +287,15 @@ def _get_account2(
def _get_account3( def _get_account3(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, name: str, mes: Message bot: telebot.TeleBot, engine: Engine, prev_mes: Message, name: str, mes: Message
) -> None: ) -> None:
base_handler(bot, mes, prev_mes) _base_handler(bot, mes, prev_mes)
text = mes.text.strip() text = mes.text.strip()
if text == "/cancel": if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена") return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_salt, hash_pass = database.get.get_master_pass(engine, mes.from_user.id) master_salt, hash_pass = database.get.get_master_pass(engine, mes.from_user.id)
if not cryptography.master_pass.check_master_pass(text, hash_pass, master_salt): if not cryptography.master_pass.check_master_pass(text, hash_pass, master_salt):
return send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль") return _send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль")
salt, enc_login, enc_pass = database.get.get_account_info( salt, enc_login, enc_pass = database.get.get_account_info(
engine, mes.from_user.id, name engine, mes.from_user.id, name
@ -287,10 +303,11 @@ def _get_account3(
login, passwd = cryptography.other_accounts.decrypt_account_info( login, passwd = cryptography.other_accounts.decrypt_account_info(
enc_login, enc_pass, text, salt enc_login, enc_pass, text, salt
) )
send_tmp_message( _send_tmp_message(
bot, bot,
mes.chat.id, mes.chat.id,
f"Логин:\n`{login}`\nПароль:\n`{passwd}`\nНажмите на логин или пароль, чтобы скопировать", f"Логин:\n`{login}`\nПароль:\n`{passwd}`\nНажмите на логин или пароль, "
"чтобы скопировать",
30, 30,
) )
@ -299,11 +316,11 @@ def _get_account3(
def delete_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: def delete_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
base_handler(bot, mes) _base_handler(bot, mes)
master_pass = database.get.get_master_pass(engine, mes.from_user.id) master_pass = database.get.get_master_pass(engine, mes.from_user.id)
if master_pass is None: if master_pass is None:
return send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
bot_mes = bot.send_message( bot_mes = bot.send_message(
mes.chat.id, "Отправьте название аккаунта, который вы хотите удалить" mes.chat.id, "Отправьте название аккаунта, который вы хотите удалить"
@ -317,16 +334,16 @@ def delete_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
def _delete_account2( def _delete_account2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None: ) -> None:
base_handler(bot, mes, prev_mes) _base_handler(bot, mes, prev_mes)
text = mes.text.strip() text = mes.text.strip()
if text == "/cancel": if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена") return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if text not in database.get.get_accounts(engine, mes.from_user.id): if text not in database.get.get_accounts(engine, mes.from_user.id):
return send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта") return _send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта")
database.delete.delete_account(engine, mes.from_user.id, text) database.delete.delete_account(engine, mes.from_user.id, text)
send_tmp_message(bot, mes.chat.id, "Аккаунт удалён") _send_tmp_message(bot, mes.chat.id, "Аккаунт удалён")
def help(bot: telebot.TeleBot, mes: Message) -> None: def help(bot: telebot.TeleBot, mes: Message) -> None:
@ -341,24 +358,26 @@ def help(bot: telebot.TeleBot, mes: Message) -> None:
/cancel - отмена текущего действия /cancel - отмена текущего действия
/help - помощь /help - помощь
/export - получить пароли в json формате /export - получить пароли в json формате
/import - импортировать пароли из json в файле в таком же формате, как из /export /import - импортировать пароли из json в файле в таком же формате, \
как из /export
/gen_password - создать 10 надёжных паролей""" /gen_password - создать 10 надёжных паролей"""
bot.send_message(mes.chat.id, message) bot.send_message(mes.chat.id, message)
def cancel(bot: telebot.TeleBot, mes: Message) -> None: def cancel(bot: telebot.TeleBot, mes: Message) -> None:
send_tmp_message(bot, mes.chat.id, "Нет активного действия") _base_handler(bot, mes)
_send_tmp_message(bot, mes.chat.id, "Нет активного действия")
def export(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: def export(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
base_handler(bot, mes) _base_handler(bot, mes)
master_password_from_db = database.get.get_master_pass(engine, mes.from_user.id) master_password_from_db = database.get.get_master_pass(engine, mes.from_user.id)
if master_password_from_db is None: if master_password_from_db is None:
return send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
if not database.get.get_accounts(engine, mes.from_user.id): if not database.get.get_accounts(engine, mes.from_user.id):
return send_tmp_message(bot, mes.chat.id, "Нет аккаунтов") return _send_tmp_message(bot, mes.chat.id, "Нет аккаунтов")
bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль") bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль")
@ -370,16 +389,17 @@ def export(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
def _export2( def _export2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None: ) -> None:
base_handler(bot, mes, prev_mes) _base_handler(bot, mes, prev_mes)
text = mes.text.strip() text = mes.text.strip()
if text == "/cancel": if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена") return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_salt, hash_pass = database.get.get_master_pass(engine, mes.from_user.id) master_salt, hash_pass = database.get.get_master_pass(engine, mes.from_user.id)
if not cryptography.master_pass.check_master_pass(text, hash_pass, master_salt): if not cryptography.master_pass.check_master_pass(text, hash_pass, master_salt):
return send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль") return _send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль")
accounts = get_all_accounts(engine, mes.from_user.id, text) accounts = database.get.get_all_accounts(engine, mes.from_user.id)
accounts = cryptography.other_accounts.decrypt_multiple(accounts, text)
json_io = accounts_to_json(accounts) json_io = accounts_to_json(accounts)
bot_mes = bot.send_document(mes.chat.id, json_io) bot_mes = bot.send_document(mes.chat.id, json_io)
@ -390,11 +410,11 @@ def _export2(
def import_accounts(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: def import_accounts(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
base_handler(bot, mes) _base_handler(bot, mes)
master_password_from_db = database.get.get_master_pass(engine, mes.from_user.id) master_password_from_db = database.get.get_master_pass(engine, mes.from_user.id)
if master_password_from_db is None: if master_password_from_db is None:
return send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
bot_mes = bot.send_message(mes.chat.id, "Отправьте json файл") bot_mes = bot.send_message(mes.chat.id, "Отправьте json файл")
@ -406,23 +426,23 @@ def import_accounts(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
def _import2( def _import2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None: ) -> None:
base_handler(bot, mes, prev_mes) _base_handler(bot, mes, prev_mes)
if mes.text is not None: if mes.text is not None:
text = mes.text.strip() text = mes.text.strip()
if text == "/cancel": if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена") return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if mes.document is None: if mes.document is None:
return send_tmp_message(bot, mes.chat.id, "Вы должны отправить документ") return _send_tmp_message(bot, mes.chat.id, "Вы должны отправить документ")
if mes.document.file_size > 102_400: # If file size is bigger that 100 MB if mes.document.file_size > 102_400: # If file size is bigger that 100 MB
return send_tmp_message(bot, mes.chat.id, "Файл слишком большой") return _send_tmp_message(bot, mes.chat.id, "Файл слишком большой")
file_info = bot.get_file(mes.document.file_id) file_info = bot.get_file(mes.document.file_id)
downloaded_file = bot.download_file(file_info.file_path) downloaded_file = bot.download_file(file_info.file_path)
try: try:
accounts = json_to_accounts(downloaded_file.decode("utf-8")) accounts = json_to_accounts(downloaded_file.decode("utf-8"))
except Exception: except Exception:
return send_tmp_message(bot, mes.chat.id, "Ошибка во время работы с файлом") return _send_tmp_message(bot, mes.chat.id, "Ошибка во время работы с файлом")
bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль") bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль")
bot.register_next_step_handler( bot.register_next_step_handler(
@ -437,14 +457,14 @@ def _import3(
accounts: list[tuple[str, str, str]], accounts: list[tuple[str, str, str]],
mes: Message, mes: Message,
) -> None: ) -> None:
base_handler(bot, mes, prev_mes) _base_handler(bot, mes, prev_mes)
text = mes.text.strip() text = mes.text.strip()
if text == "/cancel": if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена") return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_salt, hash_pass = database.get.get_master_pass(engine, mes.from_user.id) master_salt, hash_pass = database.get.get_master_pass(engine, mes.from_user.id)
if not cryptography.master_pass.check_master_pass(text, hash_pass, master_salt): if not cryptography.master_pass.check_master_pass(text, hash_pass, master_salt):
return send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль") return _send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль")
# List of names of accounts, which failed to be added to the database or failed tests # List of names of accounts, which failed to be added to the database or failed tests
failed: list[str] = [] failed: list[str] = []
@ -466,18 +486,18 @@ def _import3(
mes_text = "Не удалось добавить:\n" + "\n".join(failed) mes_text = "Не удалось добавить:\n" + "\n".join(failed)
else: else:
mes_text = "Успех" mes_text = "Успех"
send_tmp_message(bot, mes.chat.id, mes_text, 10) _send_tmp_message(bot, mes.chat.id, mes_text, 10)
del text, mes, accounts del text, mes, accounts
gc.collect() gc.collect()
def gen_password(bot: telebot.TeleBot, mes: Message) -> None: def gen_password(bot: telebot.TeleBot, mes: Message) -> None:
_base_handler(bot, mes)
# Generate 10 passwords and put 'em in the backticks # Generate 10 passwords and put 'em in the backticks
base_handler(bot, mes) passwords = (f"`{generate_password.gen_password()}`" for _ in range(10))
passwords = (f"`{gen_passwd()}`" for _ in range(10))
text = ( text = (
"Пароли:\n" "Пароли:\n"
+ "\n".join(passwords) + "\n".join(passwords)
+ "\nНажмите на пароль, чтобы его скопировать" + "\nНажмите на пароль, чтобы его скопировать"
) )
send_tmp_message(bot, mes.chat.id, text, 15) _send_tmp_message(bot, mes.chat.id, text, 15)

View File

@ -1,131 +0,0 @@
import io
import string
import time
from random import SystemRandom
from typing import Self, Type
import pydantic
import telebot
from sqlalchemy.future import Engine
from .. import cryptography, database
FORBIDDEN_CHARS = frozenset("`\n")
PUNCTUATION = frozenset(string.punctuation).difference(FORBIDDEN_CHARS)
PASSWORD_CHARS = tuple(
frozenset(string.ascii_letters + string.digits).difference(FORBIDDEN_CHARS)
| PUNCTUATION
)
Message = telebot.types.Message
class _Account(pydantic.BaseModel):
name: str
login: str
passwd: str
@classmethod
def from_tuple(cls: Type[Self], tuple_: tuple[str, str, str]) -> Self:
return cls(name=tuple_[0], login=tuple_[1], passwd=tuple_[2])
def as_tuple(self: Self) -> tuple[str, str, str]:
return (self.name, self.login, self.passwd)
class _Accounts(pydantic.BaseModel):
accounts: list[_Account] = pydantic.Field(default_factory=list)
def _accounts_list_to_json(accounts: list[tuple[str, str, str]]) -> str:
accounts = _Accounts(accounts=[_Account.from_tuple(i) for i in accounts])
return accounts.json()
def json_to_accounts(json_: str) -> list[tuple[str, str, str]]:
accounts = _Accounts.parse_raw(json_)
return [i.as_tuple() for i in accounts.accounts]
def send_tmp_message(
bot: telebot.TeleBot, chat_id: telebot.types.Message, text: str, timeout: int = 5
) -> None:
bot_mes = bot.send_message(chat_id, text, parse_mode="MarkdownV2")
time.sleep(timeout)
bot.delete_message(chat_id, bot_mes.id)
def base_handler(
bot: telebot.TeleBot, mes: Message, prev_mes: Message | None = None
) -> None:
bot.delete_message(mes.chat.id, mes.id)
if prev_mes is not None:
bot.delete_message(prev_mes.chat.id, prev_mes.id)
def get_all_accounts(
engine: Engine, user_id: int, master_pass: str
) -> list[tuple[str, str, str]]:
accounts: list[tuple[str, str, str]] = []
for account_name in database.get.get_accounts(engine, user_id):
salt, enc_login, enc_passwd = database.get.get_account_info(
engine, user_id, account_name
)
login, passwd = cryptography.other_accounts.decrypt_account_info(
enc_login, enc_passwd, master_pass, salt
)
accounts.append((account_name, login, passwd))
return accounts
def accounts_to_json(accounts: list[tuple[str, str, str]]) -> io.StringIO:
file = io.StringIO(_accounts_list_to_json(accounts))
file.name = "passwords.json"
return file
def _base_check(val: str, /) -> bool:
"Returns false if finds new lines or backtick (`)"
return not any(i in FORBIDDEN_CHARS for i in val)
def check_account_name(name: str) -> bool:
"Returns true if account name is valid"
return _base_check(name)
def check_login(login: str) -> bool:
"Returns true if login is valid"
return _base_check(login)
def check_passwd(passwd: str) -> bool:
"Returns true if password is valid"
return _base_check(passwd)
def check_account(name: str, login: str, passwd: str) -> bool:
"""Runs checks for account name, login and password"""
return check_account_name(name) and check_login(login) and check_passwd(passwd)
def _check_gened_password(passwd: str, /) -> bool:
"""Retuns true if generated password is valid,
false otherwise.
Password is valid if there is at least one lowercase character,
uppercase character and one punctuation character"""
return (
any(c.islower() for c in passwd)
and any(c.isupper() for c in passwd)
and any(c.isdigit() for c in passwd)
and any(c in PUNCTUATION for c in passwd)
)
def gen_passwd() -> str:
"""Generates password of length 32"""
choices = SystemRandom().choices
while True:
passwd = "".join(choices(PASSWORD_CHARS, k=32))
if _check_gened_password(passwd):
return passwd

View File

@ -17,18 +17,22 @@ def _get_kdf(salt: bytes) -> Scrypt:
return kdf return kdf
def encrypt_master_pass(passwd: str) -> tuple[bytes, bytes]: def encrypt_master_pass(password: str) -> tuple[bytes, bytes]:
"""Hashes master password and return tuple of hashed password and salt""" """Hashes master password and return tuple of hashed password and salt"""
salt = os.urandom(64) salt = os.urandom(64)
kdf = _get_kdf(salt) kdf = _get_kdf(salt)
return kdf.derive(passwd.encode("utf-8")), salt return kdf.derive(password.encode("utf-8")), salt
def check_master_pass(passwd: str, enc_pass: bytes, salt: bytes) -> bool: def check_master_pass(
password: str,
enc_password: bytes,
salt: bytes,
) -> bool:
"""Checks if the master password is correct""" """Checks if the master password is correct"""
kdf = _get_kdf(salt) kdf = _get_kdf(salt)
try: try:
kdf.verify(passwd.encode("utf-8"), enc_pass) kdf.verify(password.encode("utf-8"), enc_password)
except InvalidKey: except InvalidKey:
return False return False
else: else:

View File

@ -1,5 +1,6 @@
import base64 import base64
import os import os
from typing import Iterator
from cryptography.fernet import Fernet from cryptography.fernet import Fernet
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
@ -22,23 +23,40 @@ def _generate_key(salt: bytes, master_pass: bytes) -> bytes:
def encrypt_account_info( def encrypt_account_info(
login: str, passwd: str, master_pass: str login: str, passwd: str, master_pass: str
) -> tuple[bytes, bytes, bytes]: ) -> tuple[bytes, bytes, bytes]:
"""Encrypts login and password of a user using their master password as a key. """Encrypts login and password of a user using their master
password as a key.
Returns a tuple of encrypted login, password and salt""" Returns a tuple of encrypted login, password and salt"""
salt = os.urandom(64) salt = os.urandom(64)
key = _generate_key(salt, master_pass.encode("utf-8")) key = _generate_key(salt, master_pass.encode("utf-8"))
f = Fernet(key) f = Fernet(key)
enc_login = base64.urlsafe_b64decode(f.encrypt(login.encode("utf-8"))) enc_login = base64.urlsafe_b64decode(f.encrypt(login.encode("utf-8")))
enc_passwd = base64.urlsafe_b64decode(f.encrypt(passwd.encode("utf-8"))) enc_password = base64.urlsafe_b64decode(f.encrypt(passwd.encode("utf-8")))
return (enc_login, enc_passwd, salt) return (enc_login, enc_password, salt)
def decrypt_account_info( def decrypt_account_info(
enc_login: bytes, enc_pass: bytes, master_pass: str, salt: bytes enc_login: bytes,
enc_pass: bytes,
master_pass: str,
salt: bytes,
) -> tuple[str, str]: ) -> tuple[str, str]:
"""Decrypts login and password using their master password as a key. """Decrypts login and password using their
master password as a key.
Returns a tuple of decrypted login and password""" Returns a tuple of decrypted login and password"""
key = _generate_key(salt, master_pass.encode("utf-8")) key = _generate_key(salt, master_pass.encode("utf-8"))
f = Fernet(key) f = Fernet(key)
login_bytes = f.decrypt(base64.urlsafe_b64encode(enc_login)).decode("utf-8") login = f.decrypt(base64.urlsafe_b64encode(enc_login)).decode("utf-8")
pass_bytes = f.decrypt(base64.urlsafe_b64encode(enc_pass)).decode("utf-8") password = f.decrypt(base64.urlsafe_b64encode(enc_pass)).decode("utf-8")
return (login_bytes, pass_bytes) return (login, password)
def decrypt_multiple(
accounts: Iterator[tuple[str, bytes, bytes, bytes]], master_pass: str
) -> Iterator[tuple[str, str, str]]:
"""Gets an iterator of tuples, where values represent account's name, salt,
encrypted login and encrypted password.
Return an iterator of names, logins and passwords as a tuple"""
for account in accounts:
name, salt, enc_login, enc_passwd = account
login, passwd = decrypt_account_info(enc_login, enc_passwd, master_pass, salt)
yield (name, login, passwd)

View File

@ -11,11 +11,15 @@ def add_account(
name: str, name: str,
salt: bytes, salt: bytes,
enc_login: bytes, enc_login: bytes,
enc_pass: bytes, enc_password: bytes,
) -> bool: ) -> bool:
"""Adds account to the database. Returns true on success, false otherwise""" """Adds account to the database. Returns true on success, false otherwise"""
account = models.Account( account = models.Account(
user_id=user_id, name=name, salt=salt, enc_login=enc_login, enc_pass=enc_pass user_id=user_id,
name=name,
salt=salt,
enc_login=enc_login,
enc_password=enc_password,
) )
try: try:
with sqlmodel.Session(engine) as session: with sqlmodel.Session(engine) as session:
@ -27,9 +31,18 @@ def add_account(
return True return True
def add_master_pass(engine: Engine, user_id: int, salt: bytes, passwd: bytes) -> bool: def add_master_pass(
engine: Engine,
user_id: int,
salt: bytes,
password_hash: bytes,
) -> bool:
"""Adds master password the database. Returns true on success, false otherwise""" """Adds master password the database. Returns true on success, false otherwise"""
master_pass = models.MasterPass(user_id=user_id, salt=salt, passwd=passwd) master_pass = models.MasterPass(
user_id=user_id,
salt=salt,
password_hash=password_hash,
)
try: try:
with sqlmodel.Session(engine) as session: with sqlmodel.Session(engine) as session:
session.add(master_pass) session.add(master_pass)

View File

@ -5,13 +5,13 @@ from . import models
def change_master_pass( def change_master_pass(
engine: Engine, user_id: int, salt: bytes, passwd: bytes engine: Engine, user_id: int, salt: bytes, password: bytes
) -> None: ) -> None:
"""Changes master password and salt in the database""" """Changes master password and salt in the database"""
statement = ( statement = (
sqlmodel.update(models.MasterPass) sqlmodel.update(models.MasterPass)
.where(models.MasterPass.user_id == user_id) .where(models.MasterPass.user_id == user_id)
.values(salt=salt, passwd=passwd) .values(salt=salt, passwd=password)
) )
with sqlmodel.Session(engine) as session: with sqlmodel.Session(engine) as session:
session.exec(statement) session.exec(statement)

View File

@ -6,7 +6,9 @@ from . import models
def purge_accounts(engine: Engine, user_id: int) -> None: def purge_accounts(engine: Engine, user_id: int) -> None:
"""Deletes all user's accounts""" """Deletes all user's accounts"""
statement = sqlmodel.delete(models.Account).where(models.Account.user_id == user_id) statement = sqlmodel.delete(models.Account).where(
models.Account.user_id == user_id,
)
with sqlmodel.Session(engine) as session: with sqlmodel.Session(engine) as session:
session.exec(statement) session.exec(statement)
session.commit() session.commit()
@ -25,7 +27,8 @@ def delete_master_pass(engine: Engine, user_id: int) -> None:
def delete_account(engine: Engine, user_id: int, name: str) -> None: def delete_account(engine: Engine, user_id: int, name: str) -> None:
"""Deletes specific user account""" """Deletes specific user account"""
statement = sqlmodel.delete(models.Account).where( statement = sqlmodel.delete(models.Account).where(
models.Account.user_id == user_id, models.Account.name == name models.Account.user_id == user_id,
models.Account.name == name,
) )
with sqlmodel.Session(engine) as session: with sqlmodel.Session(engine) as session:
session.exec(statement) session.exec(statement)

View File

@ -1,40 +1,64 @@
from typing import Iterator
import sqlmodel import sqlmodel
from sqlalchemy.future import Engine from sqlalchemy.future import Engine
from . import models from . import models
def get_master_pass(engine: Engine, user_id: int) -> tuple[bytes, bytes] | None: def get_master_pass(
engine: Engine,
user_id: int,
) -> tuple[bytes, bytes] | None:
"""Gets master pass. Returns tuple of salt and password """Gets master pass. Returns tuple of salt and password
or None if it wasn't found""" or None if it wasn't found"""
statement = sqlmodel.select(models.MasterPass).where( statement = sqlmodel.select(models.MasterPass).where(
models.MasterPass.user_id == user_id models.MasterPass.user_id == user_id,
) )
with sqlmodel.Session(engine) as session: with sqlmodel.Session(engine) as session:
result = session.exec(statement).first() result = session.exec(statement).first()
if result is None: if result is None:
return return
return (result.salt, result.passwd) return (result.salt, result.password_hash)
def get_accounts(engine: Engine, user_id: int) -> list[str]: def get_accounts(engine: Engine, user_id: int) -> list[str]:
"""Gets list of account names""" """Gets list of account names"""
statement = sqlmodel.select(models.Account).where(models.Account.user_id == user_id) statement = sqlmodel.select(models.Account).where(
models.Account.user_id == user_id,
)
with sqlmodel.Session(engine) as session: with sqlmodel.Session(engine) as session:
result = session.exec(statement) result = session.exec(statement)
return [account.name for account in result] return [account.name for account in result]
def get_all_accounts(
engine: Engine, user_id: int
) -> Iterator[tuple[str, bytes, bytes, bytes]]:
"""Returns an iterator of tuples, where values represent account's name, salt,
encrypted login and encrypted password"""
statement = sqlmodel.select(models.Account).where(
models.Account.user_id == user_id,
)
with sqlmodel.Session(engine) as session:
result = session.exec(statement)
yield from (
(account.name, account.salt, account.enc_login, account.enc_password)
for account in result
)
def get_account_info( def get_account_info(
engine: Engine, user_id: int, name: str engine: Engine, user_id: int, name: str
) -> tuple[bytes, bytes, bytes]: ) -> tuple[bytes, bytes, bytes]:
"""Gets account info. Returns tuple of salt, login and password """Gets account info. Returns tuple of salt, login and password
or None if it wasn't found""" or None if it wasn't found"""
statement = sqlmodel.select(models.Account).where( statement = sqlmodel.select(models.Account).where(
models.Account.user_id == user_id, models.Account.name == name models.Account.user_id == user_id,
models.Account.name == name,
) )
with sqlmodel.Session(engine) as session: with sqlmodel.Session(engine) as session:
result = session.exec(statement).first() result = session.exec(statement).first()
if result is None: if result is None:
return return
return (result.salt, result.enc_login, result.enc_pass) return (result.salt, result.enc_login, result.enc_password)

View File

@ -10,7 +10,7 @@ class MasterPass(sqlmodel.SQLModel, table=True):
salt: bytes = sqlmodel.Field( salt: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(sqlmodel.BINARY(64), nullable=False) sa_column=sqlmodel.Column(sqlmodel.BINARY(64), nullable=False)
) )
passwd: bytes = sqlmodel.Field( password_hash: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(sqlmodel.BINARY(128), nullable=False) sa_column=sqlmodel.Column(sqlmodel.BINARY(128), nullable=False)
) )
@ -27,6 +27,6 @@ class Account(sqlmodel.SQLModel, table=True):
enc_login: bytes = sqlmodel.Field( enc_login: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(sqlmodel.VARBINARY(256), nullable=False) sa_column=sqlmodel.Column(sqlmodel.VARBINARY(256), nullable=False)
) )
enc_pass: bytes = sqlmodel.Field( enc_password: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(sqlmodel.VARBINARY(256), nullable=False) sa_column=sqlmodel.Column(sqlmodel.VARBINARY(256), nullable=False)
) )

View File

@ -6,7 +6,8 @@ from . import models
def get_engine(host: str, user: str, passwd: str, db: str) -> Engine: def get_engine(host: str, user: str, passwd: str, db: str) -> Engine:
"""Creates an engine for mariadb with pymysql as connector""" """Creates an engine for mariadb with pymysql as connector"""
engine = sqlmodel.create_engine(f"mariadb+pymysql://{user}:{passwd}@{host}/{db}") uri = f"mariadb+pymysql://{user}:{passwd}@{host}/{db}"
engine = sqlmodel.create_engine(uri)
return engine return engine

18
src/generate_password.py Normal file
View File

@ -0,0 +1,18 @@
import string
from random import SystemRandom
from .account_checks import FORBIDDEN_CHARS, PUNCTUATION, check_gened_password
PASSWORD_CHARS = tuple(
frozenset(string.ascii_letters + string.digits).difference(FORBIDDEN_CHARS)
| PUNCTUATION
)
def gen_password() -> str:
"""Generates password of length 32"""
choices = SystemRandom().choices
while True:
passwd = "".join(choices(PASSWORD_CHARS, k=32))
if check_gened_password(passwd):
return passwd