diff --git a/requirements-dev.txt b/requirements-dev.txt index 4e92b9d..5e5647a 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,2 +1,3 @@ black flake8 +isort diff --git a/src/__init__.py b/src/__init__.py index 50e3dae..7b33bca 100644 --- a/src/__init__.py +++ b/src/__init__.py @@ -6,8 +6,9 @@ from . import ( account_checks, account_parsing, bot, - encryption, + classes, database, + encryption, generate_password, ) @@ -15,6 +16,7 @@ __all__ = [ "account_checks", "account_parsing", "bot", + "classes", "encryption", "database", "generate_password", diff --git a/src/account_checks.py b/src/account_checks.py index 9fb05ed..e967d4e 100644 --- a/src/account_checks.py +++ b/src/account_checks.py @@ -1,5 +1,7 @@ import string +from .classes import DecryptedAccount + FORBIDDEN_CHARS = frozenset("`\n") PUNCTUATION = frozenset(string.punctuation).difference(FORBIDDEN_CHARS) @@ -24,13 +26,13 @@ def check_password(passwd: str) -> bool: return _base_check(passwd) -def check_account(name: str, login: str, passwd: str) -> bool: +def check_account(account: DecryptedAccount) -> bool: """Runs checks for account name, login and password""" return all( ( - check_account_name(name), - check_login(login), - check_password(passwd), + check_account_name(account.name), + check_login(account.login), + check_password(account.password), ) ) diff --git a/src/account_parsing.py b/src/account_parsing.py index 932aea2..06ae655 100644 --- a/src/account_parsing.py +++ b/src/account_parsing.py @@ -1,37 +1,50 @@ import io -from typing import Iterator, Self, Type +from typing import Iterable, Self import pydantic +from .classes import DecryptedAccount + class _Account(pydantic.BaseModel): name: str login: str password: str - @classmethod - def from_tuple(cls: Type[Self], tuple_: tuple[str, str, str]) -> Self: - return cls(name=tuple_[0], login=tuple_[1], password=tuple_[2]) + def to_usual_account(self, user_id: int) -> DecryptedAccount: + return DecryptedAccount( + user_id=user_id, + name=self.name, + login=self.login, + password=self.password, + ) - def as_tuple(self: Self) -> tuple[str, str, str]: - return (self.name, self.login, self.password) + @classmethod + def from_usual_account(cls, account: DecryptedAccount) -> Self: + return cls( + name=account.name, + login=account.login, + password=account.password, + ) class _Accounts(pydantic.BaseModel): accounts: list[_Account] = pydantic.Field(default_factory=list) -def _accounts_list_to_json(accounts: Iterator[tuple[str, str, str]]) -> str: - accounts = _Accounts(accounts=[_Account.from_tuple(i) for i in accounts]) - return accounts.json(ensure_ascii=False) +def _accounts_list_to_json(accounts: Iterable[DecryptedAccount]) -> str: + result = _Accounts( + accounts=[_Account.from_usual_account(i) for i in accounts], + ).json(ensure_ascii=False) + return result -def json_to_accounts(json_: str) -> list[tuple[str, str, str]]: +def json_to_accounts(json_: str, user_id: int) -> list[DecryptedAccount]: accounts = _Accounts.parse_raw(json_) - return [i.as_tuple() for i in accounts.accounts] + return [account.to_usual_account(user_id) for account in accounts.accounts] -def accounts_to_json(accounts: Iterator[tuple[str, str, str]]) -> io.StringIO: +def accounts_to_json(accounts: Iterable[DecryptedAccount]) -> io.StringIO: file = io.StringIO(_accounts_list_to_json(accounts)) file.name = "passwords.json" return file diff --git a/src/bot/handlers.py b/src/bot/handlers.py index 04749a4..c10e474 100644 --- a/src/bot/handlers.py +++ b/src/bot/handlers.py @@ -5,7 +5,7 @@ import time import telebot from sqlalchemy.future import Engine -from .. import encryption, database, generate_password +from .. import database, encryption, generate_password from ..account_checks import ( check_account, check_account_name, @@ -13,6 +13,7 @@ from ..account_checks import ( check_password, ) from ..account_parsing import accounts_to_json, json_to_accounts +from ..classes import DecryptedAccount Message = telebot.types.Message @@ -31,9 +32,9 @@ def _send_tmp_message( def _base_handler( bot: telebot.TeleBot, mes: Message, prev_mes: Message | None = None ) -> None: - bot.delete_message(mes.chat.id, mes.id) + bot.delete_message(mes.chat.id, mes.id, timeout=5) if prev_mes is not None: - bot.delete_message(prev_mes.chat.id, prev_mes.id) + bot.delete_message(prev_mes.chat.id, prev_mes.id, timeout=5) def get_accounts(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: @@ -68,11 +69,11 @@ def delete_all(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: "Отправьте YES для подтверждения", ) bot.register_next_step_handler( - mes, functools.partial(_delete_all, bot, engine, bot_mes) + mes, functools.partial(_delete_all2, bot, engine, bot_mes) ) -def _delete_all( +def _delete_all2( bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message ) -> None: _base_handler(bot, mes, prev_mes) @@ -115,15 +116,12 @@ def _set_master_pass2( if text == "/cancel": return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") - password_hash, master_salt = encryption.master_pass.encrypt_master_pass( + master_password = encryption.master_pass.encrypt_master_pass( + mes.from_user.id, text, ) - database.add.add_master_pass( - engine, - mes.from_user.id, - master_salt, - password_hash, - ) + database.add.add_master_pass(engine, master_password) + _send_tmp_message(bot, mes.chat.id, "Успех") del mes, text gc.collect() @@ -135,13 +133,16 @@ def reset_master_pass( mes: Message, ) -> None: _base_handler(bot, mes) + if database.get.get_master_pass(engine, mes.from_user.id) is None: return _send_tmp_message(bot, mes.chat.id, "Мастер пароль не задан") + bot_mes = bot.send_message( mes.chat.id, "Отправьте новый мастер пароль, осторожно, все текущие аккаунты " "будут удалены навсегда", ) + bot.register_next_step_handler( mes, functools.partial(_reset_master_pass2, bot, engine, bot_mes) ) @@ -155,9 +156,13 @@ def _reset_master_pass2( if text == "/cancel": return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") - hash_, salt = encryption.master_pass.encrypt_master_pass(text) + master_password = encryption.master_pass.encrypt_master_pass( + mes.from_user.id, + text, + ) database.delete.purge_accounts(engine, mes.from_user.id) - database.change.change_master_pass(engine, mes.from_user.id, salt, hash_) + database.change.change_master_pass(engine, master_password) + _send_tmp_message( bot, mes.chat.id, "Все ваши аккаунты удалены, а мастер пароль изменён" ) @@ -267,24 +272,30 @@ def _add_account5( if text == "/cancel": return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") - salt, hash_ = database.get.get_master_pass(engine, mes.from_user.id) - if not encryption.master_pass.check_master_pass(text, hash_, salt): + master_password = database.get.get_master_pass(engine, mes.from_user.id) + if not encryption.master_pass.check_master_pass(text, master_password): return _send_tmp_message( bot, mes.chat.id, "Не подходит главный пароль", ) - name, login, passwd = data["name"], data["login"], data["passwd"] + # name, login, passwd = data["name"], data["login"], data["passwd"] + account = DecryptedAccount( + user_id=mes.from_user.id, + name=data["name"], + login=data["login"], + password=data["passwd"], + ) - enc_login, enc_pass, salt = encryption.other_accounts.encrypt( - login, - passwd, + encrypted_account = encryption.other_accounts.encrypt( + account, text, ) result = database.add.add_account( - engine, mes.from_user.id, name, salt, enc_login, enc_pass + engine, + encrypted_account, ) _send_tmp_message( @@ -293,19 +304,19 @@ def _add_account5( "Успех" if result else "Произошла не предвиденная ошибка", ) - del data, name, login, passwd, enc_login + del data, account gc.collect() def get_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: _base_handler(bot, mes) - bot_mes = bot.send_message(mes.chat.id, "Отправьте название аккаунта") master_pass = database.get.get_master_pass(engine, mes.from_user.id) if master_pass is None: return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") + bot_mes = bot.send_message(mes.chat.id, "Отправьте название аккаунта") bot.register_next_step_handler( mes, functools.partial(_get_account2, bot, engine, bot_mes) ) @@ -340,36 +351,28 @@ def _get_account3( if text == "/cancel": return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") - master_salt, hash_pass = database.get.get_master_pass( + master_password = database.get.get_master_pass( engine, mes.from_user.id, ) - if not encryption.master_pass.check_master_pass( - text, - hash_pass, - master_salt, - ): + if not encryption.master_pass.check_master_pass(text, master_password): return _send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль") - salt, enc_login, enc_pass = database.get.get_account_info( - engine, mes.from_user.id, name - ) - login, passwd = encryption.other_accounts.decrypt( - enc_login, - enc_pass, + account = database.get.get_account_info(engine, mes.from_user.id, name) + account = encryption.other_accounts.decrypt( + account, text, - salt, ) _send_tmp_message( bot, mes.chat.id, - f"Логин:\n`{login}`\nПароль:\n`{passwd}`\nНажмите на логин " - "или пароль, чтобы скопировать", + f"Логин:\n`{account.login}`\nПароль:\n`{account.password}`\nНажмите " + "на логин или пароль, чтобы скопировать", 30, ) - del text, mes, passwd, login + del text, mes gc.collect() @@ -455,15 +458,11 @@ def _export2( if text == "/cancel": return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") - master_salt, hash_pass = database.get.get_master_pass( + master_password = database.get.get_master_pass( engine, mes.from_user.id, ) - if not encryption.master_pass.check_master_pass( - text, - hash_pass, - master_salt, - ): + if not encryption.master_pass.check_master_pass(text, master_password): return _send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль") accounts = database.get.get_all_accounts(engine, mes.from_user.id) @@ -474,7 +473,7 @@ def _export2( del text, accounts, json_io gc.collect() time.sleep(30) - bot.delete_message(bot_mes.chat.id, bot_mes.id) + bot.delete_message(bot_mes.chat.id, bot_mes.id, timeout=5) def import_accounts( @@ -513,13 +512,16 @@ def _import2( mes.chat.id, "Вы должны отправить документ", ) - if mes.document.file_size > 102_400: # If file size is bigger that 100 MB + if mes.document.file_size > 102_400: # If file size is bigger than 100 MB return _send_tmp_message(bot, mes.chat.id, "Файл слишком большой") file_info = bot.get_file(mes.document.file_id) downloaded_file = bot.download_file(file_info.file_path) try: - accounts = json_to_accounts(downloaded_file.decode("utf-8")) + accounts = json_to_accounts( + downloaded_file.decode("utf-8"), + mes.from_user.id, + ) except Exception: return _send_tmp_message( bot, @@ -537,7 +539,7 @@ def _import3( bot: telebot.TeleBot, engine: Engine, prev_mes: Message, - accounts: list[tuple[str, str, str]], + accounts: list[DecryptedAccount], mes: Message, ) -> None: _base_handler(bot, mes, prev_mes) @@ -545,40 +547,33 @@ def _import3( if text == "/cancel": return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") - master_salt, hash_pass = database.get.get_master_pass( + master_password = database.get.get_master_pass( engine, mes.from_user.id, ) - if not encryption.master_pass.check_master_pass( - text, - hash_pass, - master_salt, - ): + if not encryption.master_pass.check_master_pass(text, master_password): return _send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль") # List of names of accounts, which failed to be added to the database # or failed the tests failed: list[str] = [] for account in accounts: - name, login, passwd = account - if not check_account(name, login, passwd): - failed.append(name) + if not check_account(account): + failed.append(account.name) continue - enc_login, enc_passwd, salt = encryption.other_accounts.encrypt( - login, - passwd, + account = encryption.other_accounts.encrypt( + account, text, ) - result = database.add.add_account( - engine, mes.from_user.id, name, salt, enc_login, enc_passwd - ) + result = database.add.add_account(engine, account) if not result: - failed.append(name) + failed.append(account.name) if failed: mes_text = "Не удалось добавить:\n" + "\n".join(failed) else: mes_text = "Успех" + _send_tmp_message(bot, mes.chat.id, mes_text, 10) del text, mes, accounts gc.collect() diff --git a/src/classes.py b/src/classes.py new file mode 100644 index 0000000..9efc42a --- /dev/null +++ b/src/classes.py @@ -0,0 +1,21 @@ +from typing import Self, TypeAlias + +import pydantic + +from .database import models + +Account: TypeAlias = models.Account + + +class DecryptedAccount(pydantic.BaseModel): + user_id: int + name: str + login: str + password: str + + @classmethod + def from_tuple(cls, tuple_: tuple[str, str, str]) -> Self: + return cls(name=tuple_[0], login=tuple_[1], password=tuple_[2]) + + def as_tuple(self) -> tuple[str, str, str]: + return (self.name, self.login, self.password) diff --git a/src/database/__init__.py b/src/database/__init__.py index e4acac3..dae9edf 100644 --- a/src/database/__init__.py +++ b/src/database/__init__.py @@ -1,3 +1,3 @@ -from . import add, delete, get, models, prepare, change +from . import add, change, delete, get, models, prepare __all__ = ["add", "delete", "get", "models", "prepare", "change"] diff --git a/src/database/add.py b/src/database/add.py index a462080..1480515 100644 --- a/src/database/add.py +++ b/src/database/add.py @@ -5,23 +5,9 @@ from sqlalchemy.future import Engine from . import models -def add_account( - engine: Engine, - user_id: int, - name: str, - salt: bytes, - enc_login: bytes, - enc_password: bytes, -) -> bool: +def add_account(engine: Engine, account: models.Account) -> bool: """Adds account to the database. Returns true on success, false otherwise""" - account = models.Account( - user_id=user_id, - name=name, - salt=salt, - enc_login=enc_login, - enc_password=enc_password, - ) try: with sqlmodel.Session(engine) as session: session.add(account) @@ -32,19 +18,9 @@ def add_account( return True -def add_master_pass( - engine: Engine, - user_id: int, - salt: bytes, - password_hash: bytes, -) -> bool: +def add_master_pass(engine: Engine, master_pass: models.MasterPass) -> bool: """Adds master password the database. Returns true on success, false otherwise""" - master_pass = models.MasterPass( - user_id=user_id, - salt=salt, - password_hash=password_hash, - ) try: with sqlmodel.Session(engine) as session: session.add(master_pass) diff --git a/src/database/change.py b/src/database/change.py index 6b6b8bd..3dc99ec 100644 --- a/src/database/change.py +++ b/src/database/change.py @@ -5,13 +5,17 @@ from . import models def change_master_pass( - engine: Engine, user_id: int, salt: bytes, password: bytes + engine: Engine, + master_password: models.MasterPass, ) -> None: """Changes master password and salt in the database""" statement = ( sqlmodel.update(models.MasterPass) - .where(models.MasterPass.user_id == user_id) - .values(salt=salt, password_hash=password) + .where(models.MasterPass.user_id == master_password.user_id) + .values( + salt=master_password.salt, + password_hash=master_password.password_hash, + ) ) with sqlmodel.Session(engine) as session: session.exec(statement) diff --git a/src/database/get.py b/src/database/get.py index 0e1c714..7eb8a43 100644 --- a/src/database/get.py +++ b/src/database/get.py @@ -1,5 +1,3 @@ -from typing import Iterator - import sqlmodel from sqlalchemy.future import Engine @@ -9,17 +7,14 @@ from . import models def get_master_pass( engine: Engine, user_id: int, -) -> tuple[bytes, bytes] | None: - """Gets master pass. Returns tuple of salt and password - or None if it wasn't found""" +) -> models.MasterPass | None: + """Gets master password of a user""" statement = sqlmodel.select(models.MasterPass).where( models.MasterPass.user_id == user_id, ) with sqlmodel.Session(engine) as session: result = session.exec(statement).first() - if result is None: - return - return (result.salt, result.password_hash) + return result def get_accounts( @@ -28,7 +23,7 @@ def get_accounts( *, to_sort: bool = False, ) -> list[str]: - """Gets list of account names""" + """Gets a list of account names of a user""" statement = sqlmodel.select(models.Account.name).where( models.Account.user_id == user_id, ) @@ -39,11 +34,8 @@ def get_accounts( return result -def get_all_accounts( - engine: Engine, user_id: int -) -> Iterator[tuple[str, bytes, bytes, bytes]]: - """Returns an iterator of tuples, where values represent account's - name, salt, encrypted login and encrypted password""" +def get_all_accounts(engine: Engine, user_id: int) -> list[models.Account]: + """Returns a list of accounts of a user""" statement = ( sqlmodel.select(models.Account) .where( @@ -53,28 +45,19 @@ def get_all_accounts( ) with sqlmodel.Session(engine) as session: result = session.exec(statement).fetchall() - yield from ( - ( - account.name, - account.salt, - account.enc_login, - account.enc_password, - ) - for account in result - ) + return result def get_account_info( - engine: Engine, user_id: int, name: str -) -> tuple[bytes, bytes, bytes]: - """Gets account info. Returns tuple of salt, login and password - or None if it wasn't found""" + engine: Engine, + user_id: int, + name: str, +) -> models.Account: + """Gets account info""" statement = sqlmodel.select(models.Account).where( models.Account.user_id == user_id, models.Account.name == name, ) with sqlmodel.Session(engine) as session: result = session.exec(statement).first() - if result is None: - return - return (result.salt, result.enc_login, result.enc_password) + return result diff --git a/src/encryption/master_pass.py b/src/encryption/master_pass.py index f840cfb..228106e 100644 --- a/src/encryption/master_pass.py +++ b/src/encryption/master_pass.py @@ -3,6 +3,8 @@ import os from cryptography.exceptions import InvalidKey from cryptography.hazmat.primitives.kdf.scrypt import Scrypt +from ..database.models import MasterPass + MEMORY_USAGE = 2**14 @@ -17,22 +19,23 @@ def _get_kdf(salt: bytes) -> Scrypt: return kdf -def encrypt_master_pass(password: str) -> tuple[bytes, bytes]: - """Hashes master password and return tuple of hashed password and salt""" +def encrypt_master_pass(user_id: int, password: str) -> MasterPass: + """Hashes master password and returns MasterPass object""" salt = os.urandom(64) kdf = _get_kdf(salt) - return kdf.derive(password.encode("utf-8")), salt + password_hash = kdf.derive(password.encode("utf-8")) + return MasterPass( + user_id=user_id, + password_hash=password_hash, + salt=salt, + ) -def check_master_pass( - password: str, - enc_password: bytes, - salt: bytes, -) -> bool: +def check_master_pass(password: str, master_password: MasterPass) -> bool: """Checks if the master password is correct""" - kdf = _get_kdf(salt) + kdf = _get_kdf(master_password.salt) try: - kdf.verify(password.encode("utf-8"), enc_password) + kdf.verify(password.encode("utf-8"), master_password.password_hash) except InvalidKey: return False else: diff --git a/src/encryption/other_accounts.py b/src/encryption/other_accounts.py index a86bee7..b2c1306 100644 --- a/src/encryption/other_accounts.py +++ b/src/encryption/other_accounts.py @@ -1,14 +1,18 @@ import base64 import os -from typing import Iterator +from typing import Iterable, Iterator from cryptography.fernet import Fernet from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC +from ..classes import DecryptedAccount +from ..database.models import Account + def _generate_key(salt: bytes, master_pass: bytes) -> bytes: + """Generates key for fernet encryption""" kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, @@ -21,49 +25,54 @@ def _generate_key(salt: bytes, master_pass: bytes) -> bytes: def encrypt( - login: str, - passwd: str, + account: DecryptedAccount, master_pass: str, -) -> tuple[bytes, bytes, bytes]: - """Encrypts login and password of a user using their master - password as a key. - Returns a tuple of encrypted login, password and salt""" +) -> Account: + """Encrypts account using master password and returns Account object""" salt = os.urandom(64) key = _generate_key(salt, master_pass.encode("utf-8")) f = Fernet(key) - enc_login = base64.urlsafe_b64decode(f.encrypt(login.encode("utf-8"))) - enc_password = base64.urlsafe_b64decode(f.encrypt(passwd.encode("utf-8"))) - return (enc_login, enc_password, salt) + enc_login = base64.urlsafe_b64decode( + f.encrypt(account.login.encode("utf-8")), + ) + enc_password = base64.urlsafe_b64decode( + f.encrypt(account.password.encode("utf-8")), + ) + return Account( + user_id=account.user_id, + name=account.name, + salt=salt, + enc_login=enc_login, + enc_password=enc_password, + ) def decrypt( - enc_login: bytes, - enc_pass: bytes, + account: Account, master_pass: str, - salt: bytes, -) -> tuple[str, str]: - """Decrypts login and password using their - master password as a key. - Returns a tuple of decrypted login and password""" - key = _generate_key(salt, master_pass.encode("utf-8")) +) -> DecryptedAccount: + """Decrypts account using master password and returns + DecryptedAccount object""" + key = _generate_key(account.salt, master_pass.encode("utf-8")) f = Fernet(key) - login = f.decrypt(base64.urlsafe_b64encode(enc_login)).decode("utf-8") - password = f.decrypt(base64.urlsafe_b64encode(enc_pass)).decode("utf-8") - return (login, password) + login = f.decrypt( + base64.urlsafe_b64encode(account.enc_login), + ).decode("utf-8") + password = f.decrypt( + base64.urlsafe_b64encode(account.enc_password), + ).decode("utf-8") + return DecryptedAccount( + user_id=account.user_id, + name=account.name, + login=login, + password=password, + ) def decrypt_multiple( - accounts: Iterator[tuple[str, bytes, bytes, bytes]], master_pass: str -) -> Iterator[tuple[str, str, str]]: - """Gets an iterator of tuples, where values represent account's name, salt, - encrypted login and encrypted password. - Return an iterator of names, logins and passwords as a tuple""" + accounts: Iterable[Account], master_pass: str +) -> Iterator[DecryptedAccount]: + """Decrypts an iterable of accounts using master_pass and + returns an Iterator of decrypted accounts""" for account in accounts: - name, salt, enc_login, enc_passwd = account - login, passwd = decrypt( - enc_login, - enc_passwd, - master_pass, - salt, - ) - yield (name, login, passwd) + yield decrypt(account, master_pass)