Compare commits

..

No commits in common. "b0599c14841fd881379cfd0a5d10f87a241bf4b7" and "0d3965d5d209a21c40e7b626e5c4dd31608f754e" have entirely different histories.

15 changed files with 60 additions and 181 deletions

View File

@ -25,4 +25,3 @@
**/values.dev.yaml **/values.dev.yaml
README.md README.md
data/ data/
.flake8

View File

@ -1,2 +0,0 @@
[flake8]
exclude=.git,__pycache__,venv

View File

@ -1,2 +1 @@
black black
flake8

View File

@ -6,7 +6,7 @@ from . import (
account_checks, account_checks,
account_parsing, account_parsing,
bot, bot,
encryption, cryptography,
database, database,
generate_password, generate_password,
) )
@ -15,7 +15,7 @@ __all__ = [
"account_checks", "account_checks",
"account_parsing", "account_parsing",
"bot", "bot",
"encryption", "cryptography",
"database", "database",
"generate_password", "generate_password",
] ]

View File

@ -26,13 +26,7 @@ def check_password(passwd: str) -> bool:
def check_account(name: str, login: str, passwd: str) -> bool: def check_account(name: str, login: str, passwd: str) -> bool:
"""Runs checks for account name, login and password""" """Runs checks for account name, login and password"""
return all( return check_account_name(name) and check_login(login) and check_password(passwd)
(
check_account_name(name),
check_login(login),
check_password(passwd),
)
)
def check_gened_password(passwd: str, /) -> bool: def check_gened_password(passwd: str, /) -> bool:

View File

@ -11,7 +11,7 @@ class _Account(pydantic.BaseModel):
@classmethod @classmethod
def from_tuple(cls: Type[Self], tuple_: tuple[str, str, str]) -> Self: def from_tuple(cls: Type[Self], tuple_: tuple[str, str, str]) -> Self:
return cls(name=tuple_[0], login=tuple_[1], password=tuple_[2]) return cls(name=tuple_[0], login=tuple_[1], passwd=tuple_[2])
def as_tuple(self: Self) -> tuple[str, str, str]: def as_tuple(self: Self) -> tuple[str, str, str]:
return (self.name, self.login, self.password) return (self.name, self.login, self.password)

View File

@ -15,20 +15,16 @@ def create_bot(token: str, engine: Engine) -> telebot.TeleBot:
commands=["set_master_pass"], commands=["set_master_pass"],
) )
bot.register_message_handler( bot.register_message_handler(
functools.partial(handlers.get_account, bot, engine), functools.partial(handlers.get_account, bot, engine), commands=["get_account"]
commands=["get_account"],
) )
bot.register_message_handler( bot.register_message_handler(
functools.partial(handlers.get_accounts, bot, engine), functools.partial(handlers.get_accounts, bot, engine), commands=["get_accounts"]
commands=["get_accounts"],
) )
bot.register_message_handler( bot.register_message_handler(
functools.partial(handlers.add_account, bot, engine), functools.partial(handlers.add_account, bot, engine), commands=["add_account"]
commands=["add_account"],
) )
bot.register_message_handler( bot.register_message_handler(
functools.partial(handlers.delete_all, bot, engine), functools.partial(handlers.delete_all, bot, engine), commands=["delete_all"]
commands=["delete_all"],
) )
bot.register_message_handler( bot.register_message_handler(
functools.partial(handlers.reset_master_pass, bot, engine), functools.partial(handlers.reset_master_pass, bot, engine),
@ -39,8 +35,7 @@ def create_bot(token: str, engine: Engine) -> telebot.TeleBot:
commands=["delete_account"], commands=["delete_account"],
) )
bot.register_message_handler( bot.register_message_handler(
functools.partial(handlers.help_command, bot), functools.partial(handlers.help, bot), commands=["help", "start"]
commands=["help", "start"],
) )
bot.register_message_handler( bot.register_message_handler(
functools.partial(handlers.cancel, bot), commands=["cancel"] functools.partial(handlers.cancel, bot), commands=["cancel"]
@ -49,11 +44,9 @@ def create_bot(token: str, engine: Engine) -> telebot.TeleBot:
functools.partial(handlers.export, bot, engine), commands=["export"] functools.partial(handlers.export, bot, engine), commands=["export"]
) )
bot.register_message_handler( bot.register_message_handler(
functools.partial(handlers.import_accounts, bot, engine), functools.partial(handlers.import_accounts, bot, engine), commands=["import"]
commands=["import"],
) )
bot.register_message_handler( bot.register_message_handler(
functools.partial(handlers.gen_password, bot), functools.partial(handlers.gen_password, bot), commands=["gen_password"]
commands=["gen_password"],
) )
return bot return bot

View File

@ -5,7 +5,7 @@ import time
import telebot import telebot
from sqlalchemy.future import Engine from sqlalchemy.future import Engine
from .. import encryption, database, generate_password from .. import cryptography, database, generate_password
from ..account_checks import ( from ..account_checks import (
check_account, check_account,
check_account_name, check_account_name,
@ -58,8 +58,7 @@ def delete_all(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
_base_handler(bot, mes) _base_handler(bot, mes)
bot_mes = bot.send_message( bot_mes = bot.send_message(
mes.chat.id, mes.chat.id,
"Вы действительно хотите удалить все ваши аккаунты? Это действие " "Вы действительно хотите удалить все ваши аккаунты? Это действие нельзя отменить. "
"нельзя отменить. "
"Отправьте YES для подтверждения", "Отправьте YES для подтверждения",
) )
bot.register_next_step_handler( bot.register_next_step_handler(
@ -77,25 +76,13 @@ def _delete_all(
database.delete.delete_master_pass(engine, mes.from_user.id) database.delete.delete_master_pass(engine, mes.from_user.id)
_send_tmp_message(bot, mes.chat.id, "Всё успешно удалено", timeout=10) _send_tmp_message(bot, mes.chat.id, "Всё успешно удалено", timeout=10)
else: else:
_send_tmp_message( _send_tmp_message(bot, mes.chat.id, "Вы отправили не YES, ничего не удалено")
bot,
mes.chat.id,
"Вы отправили не YES, ничего не удалено",
)
def set_master_password( def set_master_password(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
bot: telebot.TeleBot,
engine: Engine,
mes: Message,
) -> None:
_base_handler(bot, mes, None) _base_handler(bot, mes, None)
if database.get.get_master_pass(engine, mes.from_user.id) is not None: if database.get.get_master_pass(engine, mes.from_user.id) is not None:
return _send_tmp_message( return _send_tmp_message(bot, mes.chat.id, "Мастер пароль уже существует")
bot,
mes.chat.id,
"Мастер пароль уже существует",
)
bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль") bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль")
bot.register_next_step_handler( bot.register_next_step_handler(
mes, functools.partial(_set_master_pass2, bot, engine, bot_mes) mes, functools.partial(_set_master_pass2, bot, engine, bot_mes)
@ -110,25 +97,14 @@ def _set_master_pass2(
if text == "/cancel": if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
password_hash, master_salt = encryption.master_pass.encrypt_master_pass( hash_pass, master_salt = cryptography.master_pass.encrypt_master_pass(text)
text, database.add.add_master_pass(engine, mes.from_user.id, master_salt, hash_pass)
)
database.add.add_master_pass(
engine,
mes.from_user.id,
master_salt,
password_hash,
)
_send_tmp_message(bot, mes.chat.id, "Успех") _send_tmp_message(bot, mes.chat.id, "Успех")
del mes, text del mes, text
gc.collect() gc.collect()
def reset_master_pass( def reset_master_pass(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
bot: telebot.TeleBot,
engine: Engine,
mes: Message,
) -> None:
_base_handler(bot, mes) _base_handler(bot, mes)
if database.get.get_master_pass(engine, mes.from_user.id) is None: if database.get.get_master_pass(engine, mes.from_user.id) is None:
return _send_tmp_message(bot, mes.chat.id, "Мастер пароль не задан") return _send_tmp_message(bot, mes.chat.id, "Мастер пароль не задан")
@ -150,7 +126,7 @@ def _reset_master_pass2(
if text == "/cancel": if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
hash_, salt = encryption.master_pass.encrypt_master_pass(text) hash_, salt = cryptography.master_pass.encrypt_master_pass(text)
database.delete.purge_accounts(engine, mes.from_user.id) database.delete.purge_accounts(engine, mes.from_user.id)
database.change.change_master_pass(engine, mes.from_user.id, salt, hash_) database.change.change_master_pass(engine, mes.from_user.id, salt, hash_)
_send_tmp_message( _send_tmp_message(
@ -163,10 +139,7 @@ def _reset_master_pass2(
def add_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: def add_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
_base_handler(bot, mes) _base_handler(bot, mes)
master_password_from_db = database.get.get_master_pass( master_password_from_db = database.get.get_master_pass(engine, mes.from_user.id)
engine,
mes.from_user.id,
)
if master_password_from_db is None: if master_password_from_db is None:
return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
@ -186,11 +159,7 @@ def _add_account2(
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if not check_account_name(text): if not check_account_name(text):
return _send_tmp_message( return _send_tmp_message(bot, mes.chat.id, "Не корректное название аккаунта")
bot,
mes.chat.id,
"Не корректное название аккаунта",
)
if text in database.get.get_accounts(engine, mes.from_user.id): if text in database.get.get_accounts(engine, mes.from_user.id):
return _send_tmp_message( return _send_tmp_message(
bot, mes.chat.id, "Аккаунт с таким именем уже существует" bot, mes.chat.id, "Аккаунт с таким именем уже существует"
@ -263,19 +232,13 @@ def _add_account5(
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
salt, hash_ = database.get.get_master_pass(engine, mes.from_user.id) salt, hash_ = database.get.get_master_pass(engine, mes.from_user.id)
if not encryption.master_pass.check_master_pass(text, hash_, salt): if not cryptography.master_pass.check_master_pass(text, hash_, salt):
return _send_tmp_message( return _send_tmp_message(bot, mes.chat.id, "Не подходит главный пароль")
bot,
mes.chat.id,
"Не подходит главный пароль",
)
name, login, passwd = data["name"], data["login"], data["passwd"] name, login, passwd = data["name"], data["login"], data["passwd"]
enc_login, enc_pass, salt = encryption.other_accounts.encrypt( enc_login, enc_pass, salt = cryptography.other_accounts.encrypt_account_info(
login, login, passwd, text
passwd,
text,
) )
result = database.add.add_account( result = database.add.add_account(
@ -283,9 +246,7 @@ def _add_account5(
) )
_send_tmp_message( _send_tmp_message(
bot, bot, mes.chat.id, "Успех" if result else "Произошла не предвиденная ошибка"
mes.chat.id,
"Успех" if result else "Произошла не предвиденная ошибка",
) )
del data, name, login, passwd, enc_login del data, name, login, passwd, enc_login
@ -324,43 +285,29 @@ def _get_account2(
def _get_account3( def _get_account3(
bot: telebot.TeleBot, bot: telebot.TeleBot, engine: Engine, prev_mes: Message, name: str, mes: Message
engine: Engine,
prev_mes: Message,
name: str,
mes: Message,
) -> None: ) -> None:
_base_handler(bot, mes, prev_mes) _base_handler(bot, mes, prev_mes)
text = mes.text.strip() text = mes.text.strip()
if text == "/cancel": if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_salt, hash_pass = database.get.get_master_pass( master_salt, hash_pass = database.get.get_master_pass(engine, mes.from_user.id)
engine,
mes.from_user.id,
)
if not encryption.master_pass.check_master_pass( if not cryptography.master_pass.check_master_pass(text, hash_pass, master_salt):
text,
hash_pass,
master_salt,
):
return _send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль") return _send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль")
salt, enc_login, enc_pass = database.get.get_account_info( salt, enc_login, enc_pass = database.get.get_account_info(
engine, mes.from_user.id, name engine, mes.from_user.id, name
) )
login, passwd = encryption.other_accounts.decrypt( login, passwd = cryptography.other_accounts.decrypt_account_info(
enc_login, enc_login, enc_pass, text, salt
enc_pass,
text,
salt,
) )
_send_tmp_message( _send_tmp_message(
bot, bot,
mes.chat.id, mes.chat.id,
f"Логин:\n`{login}`\nПароль:\n`{passwd}`\nНажмите на логин " f"Логин:\n`{login}`\nПароль:\n`{passwd}`\nНажмите на логин или пароль, "
"или пароль, чтобы скопировать", "чтобы скопировать",
30, 30,
) )
@ -399,7 +346,7 @@ def _delete_account2(
_send_tmp_message(bot, mes.chat.id, "Аккаунт удалён") _send_tmp_message(bot, mes.chat.id, "Аккаунт удалён")
def help_command(bot: telebot.TeleBot, mes: Message) -> None: def help(bot: telebot.TeleBot, mes: Message) -> None:
message = """Команды: message = """Команды:
/set_master_pass - установить мастер пароль /set_master_pass - установить мастер пароль
/add_account - создать аккаунт /add_account - создать аккаунт
@ -424,10 +371,7 @@ def cancel(bot: telebot.TeleBot, mes: Message) -> None:
def export(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: def export(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
_base_handler(bot, mes) _base_handler(bot, mes)
master_password_from_db = database.get.get_master_pass( master_password_from_db = database.get.get_master_pass(engine, mes.from_user.id)
engine,
mes.from_user.id,
)
if master_password_from_db is None: if master_password_from_db is None:
return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
@ -450,19 +394,12 @@ def _export2(
if text == "/cancel": if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_salt, hash_pass = database.get.get_master_pass( master_salt, hash_pass = database.get.get_master_pass(engine, mes.from_user.id)
engine, if not cryptography.master_pass.check_master_pass(text, hash_pass, master_salt):
mes.from_user.id,
)
if not encryption.master_pass.check_master_pass(
text,
hash_pass,
master_salt,
):
return _send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль") return _send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль")
accounts = database.get.get_all_accounts(engine, mes.from_user.id) accounts = database.get.get_all_accounts(engine, mes.from_user.id)
accounts = encryption.other_accounts.decrypt_multiple(accounts, text) accounts = cryptography.other_accounts.decrypt_multiple(accounts, text)
json_io = accounts_to_json(accounts) json_io = accounts_to_json(accounts)
bot_mes = bot.send_document(mes.chat.id, json_io) bot_mes = bot.send_document(mes.chat.id, json_io)
@ -472,16 +409,9 @@ def _export2(
bot.delete_message(bot_mes.chat.id, bot_mes.id) bot.delete_message(bot_mes.chat.id, bot_mes.id)
def import_accounts( def import_accounts(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
bot: telebot.TeleBot,
engine: Engine,
mes: Message,
) -> None:
_base_handler(bot, mes) _base_handler(bot, mes)
master_password_from_db = database.get.get_master_pass( master_password_from_db = database.get.get_master_pass(engine, mes.from_user.id)
engine,
mes.from_user.id,
)
if master_password_from_db is None: if master_password_from_db is None:
return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
@ -503,11 +433,7 @@ def _import2(
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if mes.document is None: if mes.document is None:
return _send_tmp_message( return _send_tmp_message(bot, mes.chat.id, "Вы должны отправить документ")
bot,
mes.chat.id,
"Вы должны отправить документ",
)
if mes.document.file_size > 102_400: # If file size is bigger that 100 MB if mes.document.file_size > 102_400: # If file size is bigger that 100 MB
return _send_tmp_message(bot, mes.chat.id, "Файл слишком большой") return _send_tmp_message(bot, mes.chat.id, "Файл слишком большой")
@ -516,11 +442,7 @@ def _import2(
try: try:
accounts = json_to_accounts(downloaded_file.decode("utf-8")) accounts = json_to_accounts(downloaded_file.decode("utf-8"))
except Exception: except Exception:
return _send_tmp_message( return _send_tmp_message(bot, mes.chat.id, "Ошибка во время работы с файлом")
bot,
mes.chat.id,
"Ошибка во время работы с файлом",
)
bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль") bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль")
bot.register_next_step_handler( bot.register_next_step_handler(
@ -540,29 +462,19 @@ def _import3(
if text == "/cancel": if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_salt, hash_pass = database.get.get_master_pass( master_salt, hash_pass = database.get.get_master_pass(engine, mes.from_user.id)
engine, if not cryptography.master_pass.check_master_pass(text, hash_pass, master_salt):
mes.from_user.id,
)
if not encryption.master_pass.check_master_pass(
text,
hash_pass,
master_salt,
):
return _send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль") return _send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль")
# List of names of accounts, which failed to be added to the database # List of names of accounts, which failed to be added to the database or failed tests
# or failed the tests
failed: list[str] = [] failed: list[str] = []
for account in accounts: for account in accounts:
name, login, passwd = account name, login, passwd = account
if not check_account(name, login, passwd): if not check_account(name, login, passwd):
failed.append(name) failed.append(name)
continue continue
enc_login, enc_passwd, salt = encryption.other_accounts.encrypt( enc_login, enc_passwd, salt = cryptography.other_accounts.encrypt_account_info(
login, login, passwd, text
passwd,
text,
) )
result = database.add.add_account( result = database.add.add_account(
engine, mes.from_user.id, name, salt, enc_login, enc_passwd engine, mes.from_user.id, name, salt, enc_login, enc_passwd

View File

@ -20,10 +20,8 @@ def _generate_key(salt: bytes, master_pass: bytes) -> bytes:
return key return key
def encrypt( def encrypt_account_info(
login: str, login: str, passwd: str, master_pass: str
passwd: str,
master_pass: str,
) -> tuple[bytes, bytes, bytes]: ) -> tuple[bytes, bytes, bytes]:
"""Encrypts login and password of a user using their master """Encrypts login and password of a user using their master
password as a key. password as a key.
@ -36,7 +34,7 @@ def encrypt(
return (enc_login, enc_password, salt) return (enc_login, enc_password, salt)
def decrypt( def decrypt_account_info(
enc_login: bytes, enc_login: bytes,
enc_pass: bytes, enc_pass: bytes,
master_pass: str, master_pass: str,
@ -60,10 +58,5 @@ def decrypt_multiple(
Return an iterator of names, logins and passwords as a tuple""" Return an iterator of names, logins and passwords as a tuple"""
for account in accounts: for account in accounts:
name, salt, enc_login, enc_passwd = account name, salt, enc_login, enc_passwd = account
login, passwd = decrypt( login, passwd = decrypt_account_info(enc_login, enc_passwd, master_pass, salt)
enc_login,
enc_passwd,
master_pass,
salt,
)
yield (name, login, passwd) yield (name, login, passwd)

View File

@ -13,8 +13,7 @@ def add_account(
enc_login: bytes, enc_login: bytes,
enc_password: bytes, enc_password: bytes,
) -> bool: ) -> bool:
"""Adds account to the database. Returns true on success, """Adds account to the database. Returns true on success, false otherwise"""
false otherwise"""
account = models.Account( account = models.Account(
user_id=user_id, user_id=user_id,
name=name, name=name,
@ -38,8 +37,7 @@ def add_master_pass(
salt: bytes, salt: bytes,
password_hash: bytes, password_hash: bytes,
) -> bool: ) -> bool:
"""Adds master password the database. Returns true on success, """Adds master password the database. Returns true on success, false otherwise"""
false otherwise"""
master_pass = models.MasterPass( master_pass = models.MasterPass(
user_id=user_id, user_id=user_id,
salt=salt, salt=salt,

View File

@ -11,7 +11,7 @@ def change_master_pass(
statement = ( statement = (
sqlmodel.update(models.MasterPass) sqlmodel.update(models.MasterPass)
.where(models.MasterPass.user_id == user_id) .where(models.MasterPass.user_id == user_id)
.values(salt=salt, password_hash=password) .values(salt=salt, passwd=password)
) )
with sqlmodel.Session(engine) as session: with sqlmodel.Session(engine) as session:
session.exec(statement) session.exec(statement)

View File

@ -35,20 +35,15 @@ def get_accounts(engine: Engine, user_id: int) -> list[str]:
def get_all_accounts( def get_all_accounts(
engine: Engine, user_id: int engine: Engine, user_id: int
) -> Iterator[tuple[str, bytes, bytes, bytes]]: ) -> Iterator[tuple[str, bytes, bytes, bytes]]:
"""Returns an iterator of tuples, where values represent account's """Returns an iterator of tuples, where values represent account's name, salt,
name, salt, encrypted login and encrypted password""" encrypted login and encrypted password"""
statement = sqlmodel.select(models.Account).where( statement = sqlmodel.select(models.Account).where(
models.Account.user_id == user_id, models.Account.user_id == user_id,
) )
with sqlmodel.Session(engine) as session: with sqlmodel.Session(engine) as session:
result = session.exec(statement) result = session.exec(statement)
yield from ( yield from (
( (account.name, account.salt, account.enc_login, account.enc_password)
account.name,
account.salt,
account.enc_login,
account.enc_password,
)
for account in result for account in result
) )

View File

@ -1,15 +1,13 @@
import sqlmodel import sqlmodel
from sqlalchemy.future import Engine from sqlalchemy.future import Engine
from . import models # noqa from . import models
HOUR_IN_SECONDS = 3600
def get_engine(host: str, user: str, passwd: str, db: str) -> Engine: def get_engine(host: str, user: str, passwd: str, db: str) -> Engine:
"""Creates an engine for mariadb with pymysql as connector""" """Creates an engine for mariadb with pymysql as connector"""
uri = f"mariadb+pymysql://{user}:{passwd}@{host}/{db}" uri = f"mariadb+pymysql://{user}:{passwd}@{host}/{db}"
engine = sqlmodel.create_engine(uri, pool_recycle=HOUR_IN_SECONDS) engine = sqlmodel.create_engine(uri)
return engine return engine