From 04633888292d2cec574890204bba8fa47eb7d1ac Mon Sep 17 00:00:00 2001 From: StNicolay Date: Wed, 30 Nov 2022 15:56:25 +0300 Subject: [PATCH] Massive code cleanup --- src/bot/handlers.py | 14 ++++++++----- src/bot/utils.py | 23 ++++++++++----------- src/cryptography/master_pass.py | 12 +++++++---- src/cryptography/other_accounts.py | 21 ++++++++++++-------- src/database/add.py | 21 ++++++++++++++++---- src/database/change.py | 4 ++-- src/database/delete.py | 7 +++++-- src/database/get.py | 32 ++++++++++++++++++++++++++---- src/database/prepare.py | 3 ++- 9 files changed, 95 insertions(+), 42 deletions(-) diff --git a/src/bot/handlers.py b/src/bot/handlers.py index 8d0035b..5c17310 100644 --- a/src/bot/handlers.py +++ b/src/bot/handlers.py @@ -44,7 +44,8 @@ def delete_all(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: base_handler(bot, mes) bot_mes = bot.send_message( mes.chat.id, - "Вы действительно хотите удалить все ваши аккаунты? Это действие нельзя отменить. Отправьте YES для подтверждения", + "Вы действительно хотите удалить все ваши аккаунты? Это действие нельзя отменить. " + "Отправьте YES для подтверждения", ) bot.register_next_step_handler( mes, functools.partial(_delete_all, bot, engine, bot_mes) @@ -95,7 +96,8 @@ def reset_master_pass(bot: telebot.TeleBot, engine: Engine, mes: Message) -> Non return send_tmp_message(bot, mes.chat.id, "Мастер пароль не задан") bot_mes = bot.send_message( mes.chat.id, - "Отправьте новый мастер пароль, осторожно, все текущие аккаунты будут удалены навсегда", + "Отправьте новый мастер пароль, осторожно, все текущие аккаунты " + "будут удалены навсегда", ) bot.register_next_step_handler( mes, functools.partial(_reset_master_pass2, bot, engine, bot_mes) @@ -290,7 +292,8 @@ def _get_account3( send_tmp_message( bot, mes.chat.id, - f"Логин:\n`{login}`\nПароль:\n`{passwd}`\nНажмите на логин или пароль, чтобы скопировать", + f"Логин:\n`{login}`\nПароль:\n`{passwd}`\nНажмите на логин или пароль, " + "чтобы скопировать", 30, ) @@ -341,7 +344,8 @@ def help(bot: telebot.TeleBot, mes: Message) -> None: /cancel - отмена текущего действия /help - помощь /export - получить пароли в json формате -/import - импортировать пароли из json в файле в таком же формате, как из /export +/import - импортировать пароли из json в файле в таком же формате, \ +как из /export /gen_password - создать 10 надёжных паролей""" bot.send_message(mes.chat.id, message) @@ -472,8 +476,8 @@ def _import3( def gen_password(bot: telebot.TeleBot, mes: Message) -> None: - # Generate 10 passwords and put 'em in the backticks base_handler(bot, mes) + # Generate 10 passwords and put 'em in the backticks passwords = (f"`{gen_passwd()}`" for _ in range(10)) text = ( "Пароли:\n" diff --git a/src/bot/utils.py b/src/bot/utils.py index b958a10..a7608e8 100644 --- a/src/bot/utils.py +++ b/src/bot/utils.py @@ -2,7 +2,7 @@ import io import string import time from random import SystemRandom -from typing import Self, Type +from typing import Self, Type, Iterator import pydantic import telebot @@ -37,7 +37,7 @@ class _Accounts(pydantic.BaseModel): accounts: list[_Account] = pydantic.Field(default_factory=list) -def _accounts_list_to_json(accounts: list[tuple[str, str, str]]) -> str: +def _accounts_list_to_json(accounts: Iterator[tuple[str, str, str]]) -> str: accounts = _Accounts(accounts=[_Account.from_tuple(i) for i in accounts]) return accounts.json() @@ -48,7 +48,10 @@ def json_to_accounts(json_: str) -> list[tuple[str, str, str]]: def send_tmp_message( - bot: telebot.TeleBot, chat_id: telebot.types.Message, text: str, timeout: int = 5 + bot: telebot.TeleBot, + chat_id: telebot.types.Message, + text: str, + timeout: int = 5, ) -> None: bot_mes = bot.send_message(chat_id, text, parse_mode="MarkdownV2") time.sleep(timeout) @@ -65,20 +68,16 @@ def base_handler( def get_all_accounts( engine: Engine, user_id: int, master_pass: str -) -> list[tuple[str, str, str]]: - accounts: list[tuple[str, str, str]] = [] - for account_name in database.get.get_accounts(engine, user_id): - salt, enc_login, enc_passwd = database.get.get_account_info( - engine, user_id, account_name - ) +) -> Iterator[tuple[str, str, str]]: + for account in database.get.get_all_accounts(engine, user_id): + name, salt, enc_login, enc_passwd = account login, passwd = cryptography.other_accounts.decrypt_account_info( enc_login, enc_passwd, master_pass, salt ) - accounts.append((account_name, login, passwd)) - return accounts + yield (name, login, passwd) -def accounts_to_json(accounts: list[tuple[str, str, str]]) -> io.StringIO: +def accounts_to_json(accounts: Iterator[tuple[str, str, str]]) -> io.StringIO: file = io.StringIO(_accounts_list_to_json(accounts)) file.name = "passwords.json" return file diff --git a/src/cryptography/master_pass.py b/src/cryptography/master_pass.py index ff0f079..f840cfb 100644 --- a/src/cryptography/master_pass.py +++ b/src/cryptography/master_pass.py @@ -17,18 +17,22 @@ def _get_kdf(salt: bytes) -> Scrypt: return kdf -def encrypt_master_pass(passwd: str) -> tuple[bytes, bytes]: +def encrypt_master_pass(password: str) -> tuple[bytes, bytes]: """Hashes master password and return tuple of hashed password and salt""" salt = os.urandom(64) kdf = _get_kdf(salt) - return kdf.derive(passwd.encode("utf-8")), salt + return kdf.derive(password.encode("utf-8")), salt -def check_master_pass(passwd: str, enc_pass: bytes, salt: bytes) -> bool: +def check_master_pass( + password: str, + enc_password: bytes, + salt: bytes, +) -> bool: """Checks if the master password is correct""" kdf = _get_kdf(salt) try: - kdf.verify(passwd.encode("utf-8"), enc_pass) + kdf.verify(password.encode("utf-8"), enc_password) except InvalidKey: return False else: diff --git a/src/cryptography/other_accounts.py b/src/cryptography/other_accounts.py index d38bc2b..26a97e8 100644 --- a/src/cryptography/other_accounts.py +++ b/src/cryptography/other_accounts.py @@ -22,23 +22,28 @@ def _generate_key(salt: bytes, master_pass: bytes) -> bytes: def encrypt_account_info( login: str, passwd: str, master_pass: str ) -> tuple[bytes, bytes, bytes]: - """Encrypts login and password of a user using their master password as a key. + """Encrypts login and password of a user using their master + password as a key. Returns a tuple of encrypted login, password and salt""" salt = os.urandom(64) key = _generate_key(salt, master_pass.encode("utf-8")) f = Fernet(key) enc_login = base64.urlsafe_b64decode(f.encrypt(login.encode("utf-8"))) - enc_passwd = base64.urlsafe_b64decode(f.encrypt(passwd.encode("utf-8"))) - return (enc_login, enc_passwd, salt) + enc_password = base64.urlsafe_b64decode(f.encrypt(passwd.encode("utf-8"))) + return (enc_login, enc_password, salt) def decrypt_account_info( - enc_login: bytes, enc_pass: bytes, master_pass: str, salt: bytes + enc_login: bytes, + enc_pass: bytes, + master_pass: str, + salt: bytes, ) -> tuple[str, str]: - """Decrypts login and password using their master password as a key. + """Decrypts login and password using their + master password as a key. Returns a tuple of decrypted login and password""" key = _generate_key(salt, master_pass.encode("utf-8")) f = Fernet(key) - login_bytes = f.decrypt(base64.urlsafe_b64encode(enc_login)).decode("utf-8") - pass_bytes = f.decrypt(base64.urlsafe_b64encode(enc_pass)).decode("utf-8") - return (login_bytes, pass_bytes) + login = f.decrypt(base64.urlsafe_b64encode(enc_login)).decode("utf-8") + password = f.decrypt(base64.urlsafe_b64encode(enc_pass)).decode("utf-8") + return (login, password) diff --git a/src/database/add.py b/src/database/add.py index 542953e..1875dea 100644 --- a/src/database/add.py +++ b/src/database/add.py @@ -11,11 +11,15 @@ def add_account( name: str, salt: bytes, enc_login: bytes, - enc_pass: bytes, + enc_password: bytes, ) -> bool: """Adds account to the database. Returns true on success, false otherwise""" account = models.Account( - user_id=user_id, name=name, salt=salt, enc_login=enc_login, enc_pass=enc_pass + user_id=user_id, + name=name, + salt=salt, + enc_login=enc_login, + enc_pass=enc_password, ) try: with sqlmodel.Session(engine) as session: @@ -27,9 +31,18 @@ def add_account( return True -def add_master_pass(engine: Engine, user_id: int, salt: bytes, passwd: bytes) -> bool: +def add_master_pass( + engine: Engine, + user_id: int, + salt: bytes, + password_hash: bytes, +) -> bool: """Adds master password the database. Returns true on success, false otherwise""" - master_pass = models.MasterPass(user_id=user_id, salt=salt, passwd=passwd) + master_pass = models.MasterPass( + user_id=user_id, + salt=salt, + passwd=password_hash, + ) try: with sqlmodel.Session(engine) as session: session.add(master_pass) diff --git a/src/database/change.py b/src/database/change.py index 2022f42..4e210a6 100644 --- a/src/database/change.py +++ b/src/database/change.py @@ -5,13 +5,13 @@ from . import models def change_master_pass( - engine: Engine, user_id: int, salt: bytes, passwd: bytes + engine: Engine, user_id: int, salt: bytes, password: bytes ) -> None: """Changes master password and salt in the database""" statement = ( sqlmodel.update(models.MasterPass) .where(models.MasterPass.user_id == user_id) - .values(salt=salt, passwd=passwd) + .values(salt=salt, passwd=password) ) with sqlmodel.Session(engine) as session: session.exec(statement) diff --git a/src/database/delete.py b/src/database/delete.py index 93d2d65..3235e10 100644 --- a/src/database/delete.py +++ b/src/database/delete.py @@ -6,7 +6,9 @@ from . import models def purge_accounts(engine: Engine, user_id: int) -> None: """Deletes all user's accounts""" - statement = sqlmodel.delete(models.Account).where(models.Account.user_id == user_id) + statement = sqlmodel.delete(models.Account).where( + models.Account.user_id == user_id, + ) with sqlmodel.Session(engine) as session: session.exec(statement) session.commit() @@ -25,7 +27,8 @@ def delete_master_pass(engine: Engine, user_id: int) -> None: def delete_account(engine: Engine, user_id: int, name: str) -> None: """Deletes specific user account""" statement = sqlmodel.delete(models.Account).where( - models.Account.user_id == user_id, models.Account.name == name + models.Account.user_id == user_id, + models.Account.name == name, ) with sqlmodel.Session(engine) as session: session.exec(statement) diff --git a/src/database/get.py b/src/database/get.py index 64f1e34..b664b13 100644 --- a/src/database/get.py +++ b/src/database/get.py @@ -1,14 +1,19 @@ +from typing import Iterator + import sqlmodel from sqlalchemy.future import Engine from . import models -def get_master_pass(engine: Engine, user_id: int) -> tuple[bytes, bytes] | None: +def get_master_pass( + engine: Engine, + user_id: int, +) -> tuple[bytes, bytes] | None: """Gets master pass. Returns tuple of salt and password or None if it wasn't found""" statement = sqlmodel.select(models.MasterPass).where( - models.MasterPass.user_id == user_id + models.MasterPass.user_id == user_id, ) with sqlmodel.Session(engine) as session: result = session.exec(statement).first() @@ -19,19 +24,38 @@ def get_master_pass(engine: Engine, user_id: int) -> tuple[bytes, bytes] | None: def get_accounts(engine: Engine, user_id: int) -> list[str]: """Gets list of account names""" - statement = sqlmodel.select(models.Account).where(models.Account.user_id == user_id) + statement = sqlmodel.select(models.Account).where( + models.Account.user_id == user_id, + ) with sqlmodel.Session(engine) as session: result = session.exec(statement) return [account.name for account in result] +def get_all_accounts( + engine: Engine, user_id: int +) -> Iterator[tuple[str, bytes, bytes, bytes]]: + """Returns an iterator of tuples, where values represent account's name, salt, + encrypted login and encrypted password""" + statement = sqlmodel.select(models.Account).where( + models.Account.user_id == user_id, + ) + with sqlmodel.Session(engine) as session: + result = session.exec(statement) + yield from ( + (account.name, account.salt, account.enc_login, account.enc_pass) + for account in result + ) + + def get_account_info( engine: Engine, user_id: int, name: str ) -> tuple[bytes, bytes, bytes]: """Gets account info. Returns tuple of salt, login and password or None if it wasn't found""" statement = sqlmodel.select(models.Account).where( - models.Account.user_id == user_id, models.Account.name == name + models.Account.user_id == user_id, + models.Account.name == name, ) with sqlmodel.Session(engine) as session: result = session.exec(statement).first() diff --git a/src/database/prepare.py b/src/database/prepare.py index 14680f0..ec5438d 100644 --- a/src/database/prepare.py +++ b/src/database/prepare.py @@ -6,7 +6,8 @@ from . import models def get_engine(host: str, user: str, passwd: str, db: str) -> Engine: """Creates an engine for mariadb with pymysql as connector""" - engine = sqlmodel.create_engine(f"mariadb+pymysql://{user}:{passwd}@{host}/{db}") + uri = f"mariadb+pymysql://{user}:{passwd}@{host}/{db}" + engine = sqlmodel.create_engine(uri) return engine