Compare commits
	
		
			17 Commits
		
	
	
		
			1.1.1
			...
			025ea868a6
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 025ea868a6 | |||
| d82d152fef | |||
| b0599c1484 | |||
| eab94e1c01 | |||
| b42cbb57a4 | |||
| 138ec55ae5 | |||
| 6cd8091fde | |||
| 0d3965d5d2 | |||
| 04bb306751 | |||
| 2a5b594f3f | |||
| 2d2ed017f1 | |||
| e9eaa085a2 | |||
| 0463388829 | |||
| 944f23a146 | |||
| b4c6e17ce2 | |||
| 2ea3096fb0 | |||
| 63de9010de | 
| @@ -1,5 +1,5 @@ | ||||
| **/__pycache__ | ||||
| **/.venv | ||||
| **/venv | ||||
| **/.classpath | ||||
| **/.dockerignore | ||||
| **/.env | ||||
| @@ -25,3 +25,4 @@ | ||||
| **/values.dev.yaml | ||||
| README.md | ||||
| data/ | ||||
| .flake8 | ||||
|   | ||||
| @@ -1 +1,2 @@ | ||||
| black | ||||
| black | ||||
| flake8 | ||||
|   | ||||
| @@ -3,4 +3,4 @@ pymysql | ||||
| python-dotenv | ||||
| pyTelegramBotAPI | ||||
| sqlmodel | ||||
| pydantic | ||||
| pydantic | ||||
|   | ||||
| @@ -2,9 +2,23 @@ import os | ||||
|  | ||||
| from dotenv import load_dotenv | ||||
|  | ||||
| from . import bot, cryptography, database | ||||
| from . import ( | ||||
|     account_checks, | ||||
|     account_parsing, | ||||
|     bot, | ||||
|     encryption, | ||||
|     database, | ||||
|     generate_password, | ||||
| ) | ||||
|  | ||||
| __all__ = ["bot", "cryptography", "database"] | ||||
| __all__ = [ | ||||
|     "account_checks", | ||||
|     "account_parsing", | ||||
|     "bot", | ||||
|     "encryption", | ||||
|     "database", | ||||
|     "generate_password", | ||||
| ] | ||||
|  | ||||
|  | ||||
| def main() -> None: | ||||
|   | ||||
							
								
								
									
										48
									
								
								src/account_checks.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										48
									
								
								src/account_checks.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,48 @@ | ||||
| import string | ||||
|  | ||||
| FORBIDDEN_CHARS = frozenset("`\n") | ||||
| PUNCTUATION = frozenset(string.punctuation).difference(FORBIDDEN_CHARS) | ||||
|  | ||||
|  | ||||
| def _base_check(val: str, /) -> bool: | ||||
|     "Returns false if finds new lines or backtick (`)" | ||||
|     return not any(i in FORBIDDEN_CHARS for i in val) | ||||
|  | ||||
|  | ||||
| def check_account_name(name: str) -> bool: | ||||
|     "Returns true if account name is valid" | ||||
|     return _base_check(name) | ||||
|  | ||||
|  | ||||
| def check_login(login: str) -> bool: | ||||
|     "Returns true if login is valid" | ||||
|     return _base_check(login) | ||||
|  | ||||
|  | ||||
| def check_password(passwd: str) -> bool: | ||||
|     "Returns true if password is valid" | ||||
|     return _base_check(passwd) | ||||
|  | ||||
|  | ||||
| def check_account(name: str, login: str, passwd: str) -> bool: | ||||
|     """Runs checks for account name, login and password""" | ||||
|     return all( | ||||
|         ( | ||||
|             check_account_name(name), | ||||
|             check_login(login), | ||||
|             check_password(passwd), | ||||
|         ) | ||||
|     ) | ||||
|  | ||||
|  | ||||
| def check_gened_password(passwd: str, /) -> bool: | ||||
|     """Retuns true if generated password is valid, | ||||
|     false otherwise. | ||||
|     Password is valid if there is at least one lowercase character, | ||||
|     uppercase character and one punctuation character""" | ||||
|     return ( | ||||
|         any(c.islower() for c in passwd) | ||||
|         and any(c.isupper() for c in passwd) | ||||
|         and any(c.isdigit() for c in passwd) | ||||
|         and any(c in PUNCTUATION for c in passwd) | ||||
|     ) | ||||
							
								
								
									
										37
									
								
								src/account_parsing.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										37
									
								
								src/account_parsing.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,37 @@ | ||||
| import io | ||||
| from typing import Iterator, Self, Type | ||||
|  | ||||
| import pydantic | ||||
|  | ||||
|  | ||||
| class _Account(pydantic.BaseModel): | ||||
|     name: str | ||||
|     login: str | ||||
|     password: str | ||||
|  | ||||
|     @classmethod | ||||
|     def from_tuple(cls: Type[Self], tuple_: tuple[str, str, str]) -> Self: | ||||
|         return cls(name=tuple_[0], login=tuple_[1], password=tuple_[2]) | ||||
|  | ||||
|     def as_tuple(self: Self) -> tuple[str, str, str]: | ||||
|         return (self.name, self.login, self.password) | ||||
|  | ||||
|  | ||||
| class _Accounts(pydantic.BaseModel): | ||||
|     accounts: list[_Account] = pydantic.Field(default_factory=list) | ||||
|  | ||||
|  | ||||
| def _accounts_list_to_json(accounts: Iterator[tuple[str, str, str]]) -> str: | ||||
|     accounts = _Accounts(accounts=[_Account.from_tuple(i) for i in accounts]) | ||||
|     return accounts.json(ensure_ascii=False) | ||||
|  | ||||
|  | ||||
| def json_to_accounts(json_: str) -> list[tuple[str, str, str]]: | ||||
|     accounts = _Accounts.parse_raw(json_) | ||||
|     return [i.as_tuple() for i in accounts.accounts] | ||||
|  | ||||
|  | ||||
| def accounts_to_json(accounts: Iterator[tuple[str, str, str]]) -> io.StringIO: | ||||
|     file = io.StringIO(_accounts_list_to_json(accounts)) | ||||
|     file.name = "passwords.json" | ||||
|     return file | ||||
| @@ -1,11 +1,11 @@ | ||||
| import functools | ||||
|  | ||||
| from sqlalchemy.future import Engine | ||||
| import telebot | ||||
| from sqlalchemy.future import Engine | ||||
|  | ||||
| from . import handlers, utils | ||||
| from . import handlers | ||||
|  | ||||
| __all__ = ["handlers", "utils"] | ||||
| __all__ = ["handlers"] | ||||
|  | ||||
|  | ||||
| def create_bot(token: str, engine: Engine) -> telebot.TeleBot: | ||||
| @@ -15,16 +15,20 @@ def create_bot(token: str, engine: Engine) -> telebot.TeleBot: | ||||
|         commands=["set_master_pass"], | ||||
|     ) | ||||
|     bot.register_message_handler( | ||||
|         functools.partial(handlers.get_account, bot, engine), commands=["get_account"] | ||||
|         functools.partial(handlers.get_account, bot, engine), | ||||
|         commands=["get_account"], | ||||
|     ) | ||||
|     bot.register_message_handler( | ||||
|         functools.partial(handlers.get_accounts, bot, engine), commands=["get_accounts"] | ||||
|         functools.partial(handlers.get_accounts, bot, engine), | ||||
|         commands=["get_accounts"], | ||||
|     ) | ||||
|     bot.register_message_handler( | ||||
|         functools.partial(handlers.add_account, bot, engine), commands=["add_account"] | ||||
|         functools.partial(handlers.add_account, bot, engine), | ||||
|         commands=["add_account"], | ||||
|     ) | ||||
|     bot.register_message_handler( | ||||
|         functools.partial(handlers.delete_all, bot, engine), commands=["delete_all"] | ||||
|         functools.partial(handlers.delete_all, bot, engine), | ||||
|         commands=["delete_all"], | ||||
|     ) | ||||
|     bot.register_message_handler( | ||||
|         functools.partial(handlers.reset_master_pass, bot, engine), | ||||
| @@ -35,7 +39,8 @@ def create_bot(token: str, engine: Engine) -> telebot.TeleBot: | ||||
|         commands=["delete_account"], | ||||
|     ) | ||||
|     bot.register_message_handler( | ||||
|         functools.partial(handlers.help, bot), commands=["help", "start"] | ||||
|         functools.partial(handlers.help_command, bot), | ||||
|         commands=["help", "start"], | ||||
|     ) | ||||
|     bot.register_message_handler( | ||||
|         functools.partial(handlers.cancel, bot), commands=["cancel"] | ||||
| @@ -44,9 +49,11 @@ def create_bot(token: str, engine: Engine) -> telebot.TeleBot: | ||||
|         functools.partial(handlers.export, bot, engine), commands=["export"] | ||||
|     ) | ||||
|     bot.register_message_handler( | ||||
|         functools.partial(handlers.import_accounts, bot, engine), commands=["import"] | ||||
|         functools.partial(handlers.import_accounts, bot, engine), | ||||
|         commands=["import"], | ||||
|     ) | ||||
|     bot.register_message_handler( | ||||
|         functools.partial(handlers.gen_password, bot), commands=["gen_password"] | ||||
|         functools.partial(handlers.gen_password, bot), | ||||
|         commands=["gen_password"], | ||||
|     ) | ||||
|     return bot | ||||
|   | ||||
| @@ -5,32 +5,46 @@ import time | ||||
| import telebot | ||||
| from sqlalchemy.future import Engine | ||||
|  | ||||
| from .. import cryptography, database | ||||
| from .utils import ( | ||||
|     accounts_to_json, | ||||
|     base_handler, | ||||
| from .. import encryption, database, generate_password | ||||
| from ..account_checks import ( | ||||
|     check_account, | ||||
|     check_account_name, | ||||
|     check_login, | ||||
|     check_passwd, | ||||
|     gen_passwd, | ||||
|     get_all_accounts, | ||||
|     json_to_accounts, | ||||
|     send_tmp_message, | ||||
|     check_password, | ||||
| ) | ||||
| from ..account_parsing import accounts_to_json, json_to_accounts | ||||
|  | ||||
| Message = telebot.types.Message | ||||
|  | ||||
|  | ||||
| def _send_tmp_message( | ||||
|     bot: telebot.TeleBot, | ||||
|     chat_id: telebot.types.Message, | ||||
|     text: str, | ||||
|     timeout: int = 5, | ||||
| ) -> None: | ||||
|     bot_mes = bot.send_message(chat_id, text, parse_mode="MarkdownV2") | ||||
|     time.sleep(timeout) | ||||
|     bot.delete_message(chat_id, bot_mes.id) | ||||
|  | ||||
|  | ||||
| def _base_handler( | ||||
|     bot: telebot.TeleBot, mes: Message, prev_mes: Message | None = None | ||||
| ) -> None: | ||||
|     bot.delete_message(mes.chat.id, mes.id) | ||||
|     if prev_mes is not None: | ||||
|         bot.delete_message(prev_mes.chat.id, prev_mes.id) | ||||
|  | ||||
|  | ||||
| def get_accounts(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: | ||||
|     base_handler(bot, mes) | ||||
|     _base_handler(bot, mes) | ||||
|     accounts = database.get.get_accounts(engine, mes.from_user.id) | ||||
|     if not accounts: | ||||
|         return send_tmp_message(bot, mes.chat.id, "У вас нет аккаунтов") | ||||
|         return _send_tmp_message(bot, mes.chat.id, "У вас нет аккаунтов") | ||||
|  | ||||
|     # Make accounts copyable and escape special chars | ||||
|     accounts = [f"`{account}`" for account in accounts] | ||||
|     send_tmp_message( | ||||
|     _send_tmp_message( | ||||
|         bot, | ||||
|         mes.chat.id, | ||||
|         "Ваши аккаунты:\n" | ||||
| @@ -41,10 +55,12 @@ def get_accounts(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: | ||||
|  | ||||
|  | ||||
| def delete_all(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: | ||||
|     base_handler(bot, mes) | ||||
|     _base_handler(bot, mes) | ||||
|     bot_mes = bot.send_message( | ||||
|         mes.chat.id, | ||||
|         "Вы действительно хотите удалить все ваши аккаунты? Это действие нельзя отменить. Отправьте YES для подтверждения", | ||||
|         "Вы действительно хотите удалить все ваши аккаунты? Это действие " | ||||
|         "нельзя отменить. " | ||||
|         "Отправьте YES для подтверждения", | ||||
|     ) | ||||
|     bot.register_next_step_handler( | ||||
|         mes, functools.partial(_delete_all, bot, engine, bot_mes) | ||||
| @@ -54,20 +70,32 @@ def delete_all(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: | ||||
| def _delete_all( | ||||
|     bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message | ||||
| ) -> None: | ||||
|     base_handler(bot, mes, prev_mes) | ||||
|     _base_handler(bot, mes, prev_mes) | ||||
|     text = mes.text.strip() | ||||
|     if text == "YES": | ||||
|         database.delete.purge_accounts(engine, mes.from_user.id) | ||||
|         database.delete.delete_master_pass(engine, mes.from_user.id) | ||||
|         send_tmp_message(bot, mes.chat.id, "Всё успешно удалено", timeout=10) | ||||
|         _send_tmp_message(bot, mes.chat.id, "Всё успешно удалено", timeout=10) | ||||
|     else: | ||||
|         send_tmp_message(bot, mes.chat.id, "Вы отправили не YES, ничего не удалено") | ||||
|         _send_tmp_message( | ||||
|             bot, | ||||
|             mes.chat.id, | ||||
|             "Вы отправили не YES, ничего не удалено", | ||||
|         ) | ||||
|  | ||||
|  | ||||
| def set_master_password(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: | ||||
|     base_handler(bot, mes, None) | ||||
| def set_master_password( | ||||
|     bot: telebot.TeleBot, | ||||
|     engine: Engine, | ||||
|     mes: Message, | ||||
| ) -> None: | ||||
|     _base_handler(bot, mes, None) | ||||
|     if database.get.get_master_pass(engine, mes.from_user.id) is not None: | ||||
|         return send_tmp_message(bot, mes.chat.id, "Мастер пароль уже существует") | ||||
|         return _send_tmp_message( | ||||
|             bot, | ||||
|             mes.chat.id, | ||||
|             "Мастер пароль уже существует", | ||||
|         ) | ||||
|     bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль") | ||||
|     bot.register_next_step_handler( | ||||
|         mes, functools.partial(_set_master_pass2, bot, engine, bot_mes) | ||||
| @@ -77,25 +105,37 @@ def set_master_password(bot: telebot.TeleBot, engine: Engine, mes: Message) -> N | ||||
| def _set_master_pass2( | ||||
|     bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message | ||||
| ) -> None: | ||||
|     base_handler(bot, mes, prev_mes) | ||||
|     _base_handler(bot, mes, prev_mes) | ||||
|     text = mes.text.strip() | ||||
|     if text == "/cancel": | ||||
|         return send_tmp_message(bot, mes.chat.id, "Успешная отмена") | ||||
|         return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") | ||||
|  | ||||
|     hash_pass, master_salt = cryptography.master_pass.encrypt_master_pass(text) | ||||
|     database.add.add_master_pass(engine, mes.from_user.id, master_salt, hash_pass) | ||||
|     send_tmp_message(bot, mes.chat.id, "Успех") | ||||
|     password_hash, master_salt = encryption.master_pass.encrypt_master_pass( | ||||
|         text, | ||||
|     ) | ||||
|     database.add.add_master_pass( | ||||
|         engine, | ||||
|         mes.from_user.id, | ||||
|         master_salt, | ||||
|         password_hash, | ||||
|     ) | ||||
|     _send_tmp_message(bot, mes.chat.id, "Успех") | ||||
|     del mes, text | ||||
|     gc.collect() | ||||
|  | ||||
|  | ||||
| def reset_master_pass(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: | ||||
|     base_handler(bot, mes) | ||||
| def reset_master_pass( | ||||
|     bot: telebot.TeleBot, | ||||
|     engine: Engine, | ||||
|     mes: Message, | ||||
| ) -> None: | ||||
|     _base_handler(bot, mes) | ||||
|     if database.get.get_master_pass(engine, mes.from_user.id) is None: | ||||
|         return send_tmp_message(bot, mes.chat.id, "Мастер пароль не задан") | ||||
|         return _send_tmp_message(bot, mes.chat.id, "Мастер пароль не задан") | ||||
|     bot_mes = bot.send_message( | ||||
|         mes.chat.id, | ||||
|         "Отправьте новый мастер пароль, осторожно, все текущие аккаунты будут удалены навсегда", | ||||
|         "Отправьте новый мастер пароль, осторожно, все текущие аккаунты " | ||||
|         "будут удалены навсегда", | ||||
|     ) | ||||
|     bot.register_next_step_handler( | ||||
|         mes, functools.partial(_reset_master_pass2, bot, engine, bot_mes) | ||||
| @@ -105,15 +145,15 @@ def reset_master_pass(bot: telebot.TeleBot, engine: Engine, mes: Message) -> Non | ||||
| def _reset_master_pass2( | ||||
|     bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message | ||||
| ) -> None: | ||||
|     base_handler(bot, mes, prev_mes) | ||||
|     _base_handler(bot, mes, prev_mes) | ||||
|     text = mes.text.strip() | ||||
|     if text == "/cancel": | ||||
|         return send_tmp_message(bot, mes.chat.id, "Успешная отмена") | ||||
|         return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") | ||||
|  | ||||
|     hash_, salt = cryptography.master_pass.encrypt_master_pass(text) | ||||
|     hash_, salt = encryption.master_pass.encrypt_master_pass(text) | ||||
|     database.delete.purge_accounts(engine, mes.from_user.id) | ||||
|     database.change.change_master_pass(engine, mes.from_user.id, salt, hash_) | ||||
|     send_tmp_message( | ||||
|     _send_tmp_message( | ||||
|         bot, mes.chat.id, "Все ваши аккаунты удалены, а мастер пароль изменён" | ||||
|     ) | ||||
|     del mes, text | ||||
| @@ -121,11 +161,14 @@ def _reset_master_pass2( | ||||
|  | ||||
|  | ||||
| def add_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: | ||||
|     base_handler(bot, mes) | ||||
|     _base_handler(bot, mes) | ||||
|  | ||||
|     master_password_from_db = database.get.get_master_pass(engine, mes.from_user.id) | ||||
|     master_password_from_db = database.get.get_master_pass( | ||||
|         engine, | ||||
|         mes.from_user.id, | ||||
|     ) | ||||
|     if master_password_from_db is None: | ||||
|         return send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") | ||||
|         return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") | ||||
|  | ||||
|     bot_mes = bot.send_message(mes.chat.id, "Отправьте название аккаунта") | ||||
|  | ||||
| @@ -137,15 +180,19 @@ def add_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: | ||||
| def _add_account2( | ||||
|     bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message | ||||
| ) -> None: | ||||
|     base_handler(bot, mes, prev_mes) | ||||
|     _base_handler(bot, mes, prev_mes) | ||||
|     text = mes.text.strip() | ||||
|     if text == "/cancel": | ||||
|         return send_tmp_message(bot, mes.chat.id, "Успешная отмена") | ||||
|         return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") | ||||
|  | ||||
|     if not check_account_name(text): | ||||
|         return send_tmp_message(bot, mes.chat.id, "Не корректное название аккаунта") | ||||
|         return _send_tmp_message( | ||||
|             bot, | ||||
|             mes.chat.id, | ||||
|             "Не корректное название аккаунта", | ||||
|         ) | ||||
|     if text in database.get.get_accounts(engine, mes.from_user.id): | ||||
|         return send_tmp_message( | ||||
|         return _send_tmp_message( | ||||
|             bot, mes.chat.id, "Аккаунт с таким именем уже существует" | ||||
|         ) | ||||
|  | ||||
| @@ -164,12 +211,12 @@ def _add_account3( | ||||
|     data: dict[str, str], | ||||
|     mes: Message, | ||||
| ) -> None: | ||||
|     base_handler(bot, mes, prev_mes) | ||||
|     _base_handler(bot, mes, prev_mes) | ||||
|     text = mes.text.strip() | ||||
|     if text == "/cancel": | ||||
|         return send_tmp_message(bot, mes.chat.id, "Успешная отмена") | ||||
|         return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") | ||||
|     if not check_login(text): | ||||
|         return send_tmp_message(bot, mes.chat.id, "Не корректный логин") | ||||
|         return _send_tmp_message(bot, mes.chat.id, "Не корректный логин") | ||||
|  | ||||
|     data["login"] = text | ||||
|  | ||||
| @@ -187,12 +234,12 @@ def _add_account4( | ||||
|     data: dict[str, str], | ||||
|     mes: Message, | ||||
| ) -> None: | ||||
|     base_handler(bot, mes, prev_mes) | ||||
|     _base_handler(bot, mes, prev_mes) | ||||
|     text = mes.text.strip() | ||||
|     if text == "/cancel": | ||||
|         return send_tmp_message(bot, mes.chat.id, "Успешная отмена") | ||||
|     if not check_passwd(text): | ||||
|         return send_tmp_message(bot, mes.chat.id, "Не корректный пароль") | ||||
|         return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") | ||||
|     if not check_password(text): | ||||
|         return _send_tmp_message(bot, mes.chat.id, "Не корректный пароль") | ||||
|  | ||||
|     data["passwd"] = text | ||||
|  | ||||
| @@ -210,27 +257,35 @@ def _add_account5( | ||||
|     data: dict[str, str], | ||||
|     mes: Message, | ||||
| ) -> None: | ||||
|     base_handler(bot, mes, prev_mes) | ||||
|     _base_handler(bot, mes, prev_mes) | ||||
|     text = mes.text.strip() | ||||
|     if text == "/cancel": | ||||
|         return send_tmp_message(bot, mes.chat.id, "Успешная отмена") | ||||
|         return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") | ||||
|  | ||||
|     salt, hash_ = database.get.get_master_pass(engine, mes.from_user.id) | ||||
|     if not cryptography.master_pass.check_master_pass(text, hash_, salt): | ||||
|         return send_tmp_message(bot, mes.chat.id, "Не подходит главный пароль") | ||||
|     if not encryption.master_pass.check_master_pass(text, hash_, salt): | ||||
|         return _send_tmp_message( | ||||
|             bot, | ||||
|             mes.chat.id, | ||||
|             "Не подходит главный пароль", | ||||
|         ) | ||||
|  | ||||
|     name, login, passwd = data["name"], data["login"], data["passwd"] | ||||
|  | ||||
|     enc_login, enc_pass, salt = cryptography.other_accounts.encrypt_account_info( | ||||
|         login, passwd, text | ||||
|     enc_login, enc_pass, salt = encryption.other_accounts.encrypt( | ||||
|         login, | ||||
|         passwd, | ||||
|         text, | ||||
|     ) | ||||
|  | ||||
|     result = database.add.add_account( | ||||
|         engine, mes.from_user.id, name, salt, enc_login, enc_pass | ||||
|     ) | ||||
|  | ||||
|     send_tmp_message( | ||||
|         bot, mes.chat.id, "Успех" if result else "Произошла не предвиденная ошибка" | ||||
|     _send_tmp_message( | ||||
|         bot, | ||||
|         mes.chat.id, | ||||
|         "Успех" if result else "Произошла не предвиденная ошибка", | ||||
|     ) | ||||
|  | ||||
|     del data, name, login, passwd, enc_login | ||||
| @@ -239,12 +294,12 @@ def _add_account5( | ||||
|  | ||||
|  | ||||
| def get_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: | ||||
|     base_handler(bot, mes) | ||||
|     _base_handler(bot, mes) | ||||
|     bot_mes = bot.send_message(mes.chat.id, "Отправьте название аккаунта") | ||||
|  | ||||
|     master_pass = database.get.get_master_pass(engine, mes.from_user.id) | ||||
|     if master_pass is None: | ||||
|         return send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") | ||||
|         return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") | ||||
|  | ||||
|     bot.register_next_step_handler( | ||||
|         mes, functools.partial(_get_account2, bot, engine, bot_mes) | ||||
| @@ -254,13 +309,13 @@ def get_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: | ||||
| def _get_account2( | ||||
|     bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message | ||||
| ) -> None: | ||||
|     base_handler(bot, mes, prev_mes) | ||||
|     _base_handler(bot, mes, prev_mes) | ||||
|     text = mes.text.strip() | ||||
|     if text == "/cancel": | ||||
|         return send_tmp_message(bot, mes.chat.id, "Успешная отмена") | ||||
|         return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") | ||||
|  | ||||
|     if text not in database.get.get_accounts(engine, mes.from_user.id): | ||||
|         return send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта") | ||||
|         return _send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта") | ||||
|  | ||||
|     bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль") | ||||
|     bot.register_next_step_handler( | ||||
| @@ -269,28 +324,43 @@ def _get_account2( | ||||
|  | ||||
|  | ||||
| def _get_account3( | ||||
|     bot: telebot.TeleBot, engine: Engine, prev_mes: Message, name: str, mes: Message | ||||
|     bot: telebot.TeleBot, | ||||
|     engine: Engine, | ||||
|     prev_mes: Message, | ||||
|     name: str, | ||||
|     mes: Message, | ||||
| ) -> None: | ||||
|     base_handler(bot, mes, prev_mes) | ||||
|     _base_handler(bot, mes, prev_mes) | ||||
|     text = mes.text.strip() | ||||
|     if text == "/cancel": | ||||
|         return send_tmp_message(bot, mes.chat.id, "Успешная отмена") | ||||
|         return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") | ||||
|  | ||||
|     master_salt, hash_pass = database.get.get_master_pass(engine, mes.from_user.id) | ||||
|     master_salt, hash_pass = database.get.get_master_pass( | ||||
|         engine, | ||||
|         mes.from_user.id, | ||||
|     ) | ||||
|  | ||||
|     if not cryptography.master_pass.check_master_pass(text, hash_pass, master_salt): | ||||
|         return send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль") | ||||
|     if not encryption.master_pass.check_master_pass( | ||||
|         text, | ||||
|         hash_pass, | ||||
|         master_salt, | ||||
|     ): | ||||
|         return _send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль") | ||||
|  | ||||
|     salt, enc_login, enc_pass = database.get.get_account_info( | ||||
|         engine, mes.from_user.id, name | ||||
|     ) | ||||
|     login, passwd = cryptography.other_accounts.decrypt_account_info( | ||||
|         enc_login, enc_pass, text, salt | ||||
|     login, passwd = encryption.other_accounts.decrypt( | ||||
|         enc_login, | ||||
|         enc_pass, | ||||
|         text, | ||||
|         salt, | ||||
|     ) | ||||
|     send_tmp_message( | ||||
|     _send_tmp_message( | ||||
|         bot, | ||||
|         mes.chat.id, | ||||
|         f"Логин:\n`{login}`\nПароль:\n`{passwd}`\nНажмите на логин или пароль, чтобы скопировать", | ||||
|         f"Логин:\n`{login}`\nПароль:\n`{passwd}`\nНажмите на логин " | ||||
|         "или пароль, чтобы скопировать", | ||||
|         30, | ||||
|     ) | ||||
|  | ||||
| @@ -299,11 +369,11 @@ def _get_account3( | ||||
|  | ||||
|  | ||||
| def delete_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: | ||||
|     base_handler(bot, mes) | ||||
|     _base_handler(bot, mes) | ||||
|  | ||||
|     master_pass = database.get.get_master_pass(engine, mes.from_user.id) | ||||
|     if master_pass is None: | ||||
|         return send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") | ||||
|         return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") | ||||
|  | ||||
|     bot_mes = bot.send_message( | ||||
|         mes.chat.id, "Отправьте название аккаунта, который вы хотите удалить" | ||||
| @@ -317,19 +387,19 @@ def delete_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: | ||||
| def _delete_account2( | ||||
|     bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message | ||||
| ) -> None: | ||||
|     base_handler(bot, mes, prev_mes) | ||||
|     _base_handler(bot, mes, prev_mes) | ||||
|     text = mes.text.strip() | ||||
|     if text == "/cancel": | ||||
|         return send_tmp_message(bot, mes.chat.id, "Успешная отмена") | ||||
|         return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") | ||||
|  | ||||
|     if text not in database.get.get_accounts(engine, mes.from_user.id): | ||||
|         return send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта") | ||||
|         return _send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта") | ||||
|  | ||||
|     database.delete.delete_account(engine, mes.from_user.id, text) | ||||
|     send_tmp_message(bot, mes.chat.id, "Аккаунт удалён") | ||||
|     _send_tmp_message(bot, mes.chat.id, "Аккаунт удалён") | ||||
|  | ||||
|  | ||||
| def help(bot: telebot.TeleBot, mes: Message) -> None: | ||||
| def help_command(bot: telebot.TeleBot, mes: Message) -> None: | ||||
|     message = """Команды: | ||||
| /set_master_pass - установить мастер пароль | ||||
| /add_account - создать аккаунт | ||||
| @@ -341,24 +411,29 @@ def help(bot: telebot.TeleBot, mes: Message) -> None: | ||||
| /cancel - отмена текущего действия | ||||
| /help - помощь | ||||
| /export - получить пароли в json формате | ||||
| /import - импортировать пароли из json в файле в таком же формате, как из /export | ||||
| /import - импортировать пароли из json в файле в таком же формате, \ | ||||
| как из /export | ||||
| /gen_password - создать 10 надёжных паролей""" | ||||
|     bot.send_message(mes.chat.id, message) | ||||
|  | ||||
|  | ||||
| def cancel(bot: telebot.TeleBot, mes: Message) -> None: | ||||
|     send_tmp_message(bot, mes.chat.id, "Нет активного действия") | ||||
|     _base_handler(bot, mes) | ||||
|     _send_tmp_message(bot, mes.chat.id, "Нет активного действия") | ||||
|  | ||||
|  | ||||
| def export(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: | ||||
|     base_handler(bot, mes) | ||||
|     master_password_from_db = database.get.get_master_pass(engine, mes.from_user.id) | ||||
|     _base_handler(bot, mes) | ||||
|     master_password_from_db = database.get.get_master_pass( | ||||
|         engine, | ||||
|         mes.from_user.id, | ||||
|     ) | ||||
|  | ||||
|     if master_password_from_db is None: | ||||
|         return send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") | ||||
|         return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") | ||||
|  | ||||
|     if not database.get.get_accounts(engine, mes.from_user.id): | ||||
|         return send_tmp_message(bot, mes.chat.id, "Нет аккаунтов") | ||||
|         return _send_tmp_message(bot, mes.chat.id, "Нет аккаунтов") | ||||
|  | ||||
|     bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль") | ||||
|  | ||||
| @@ -370,16 +445,24 @@ def export(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: | ||||
| def _export2( | ||||
|     bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message | ||||
| ) -> None: | ||||
|     base_handler(bot, mes, prev_mes) | ||||
|     _base_handler(bot, mes, prev_mes) | ||||
|     text = mes.text.strip() | ||||
|     if text == "/cancel": | ||||
|         return send_tmp_message(bot, mes.chat.id, "Успешная отмена") | ||||
|         return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") | ||||
|  | ||||
|     master_salt, hash_pass = database.get.get_master_pass(engine, mes.from_user.id) | ||||
|     if not cryptography.master_pass.check_master_pass(text, hash_pass, master_salt): | ||||
|         return send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль") | ||||
|     master_salt, hash_pass = database.get.get_master_pass( | ||||
|         engine, | ||||
|         mes.from_user.id, | ||||
|     ) | ||||
|     if not encryption.master_pass.check_master_pass( | ||||
|         text, | ||||
|         hash_pass, | ||||
|         master_salt, | ||||
|     ): | ||||
|         return _send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль") | ||||
|  | ||||
|     accounts = get_all_accounts(engine, mes.from_user.id, text) | ||||
|     accounts = database.get.get_all_accounts(engine, mes.from_user.id) | ||||
|     accounts = encryption.other_accounts.decrypt_multiple(accounts, text) | ||||
|     json_io = accounts_to_json(accounts) | ||||
|     bot_mes = bot.send_document(mes.chat.id, json_io) | ||||
|  | ||||
| @@ -389,12 +472,19 @@ def _export2( | ||||
|     bot.delete_message(bot_mes.chat.id, bot_mes.id) | ||||
|  | ||||
|  | ||||
| def import_accounts(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: | ||||
|     base_handler(bot, mes) | ||||
|     master_password_from_db = database.get.get_master_pass(engine, mes.from_user.id) | ||||
| def import_accounts( | ||||
|     bot: telebot.TeleBot, | ||||
|     engine: Engine, | ||||
|     mes: Message, | ||||
| ) -> None: | ||||
|     _base_handler(bot, mes) | ||||
|     master_password_from_db = database.get.get_master_pass( | ||||
|         engine, | ||||
|         mes.from_user.id, | ||||
|     ) | ||||
|  | ||||
|     if master_password_from_db is None: | ||||
|         return send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") | ||||
|         return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") | ||||
|  | ||||
|     bot_mes = bot.send_message(mes.chat.id, "Отправьте json файл") | ||||
|  | ||||
| @@ -406,23 +496,31 @@ def import_accounts(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None: | ||||
| def _import2( | ||||
|     bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message | ||||
| ) -> None: | ||||
|     base_handler(bot, mes, prev_mes) | ||||
|     _base_handler(bot, mes, prev_mes) | ||||
|     if mes.text is not None: | ||||
|         text = mes.text.strip() | ||||
|         if text == "/cancel": | ||||
|             return send_tmp_message(bot, mes.chat.id, "Успешная отмена") | ||||
|             return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") | ||||
|  | ||||
|     if mes.document is None: | ||||
|         return send_tmp_message(bot, mes.chat.id, "Вы должны отправить документ") | ||||
|         return _send_tmp_message( | ||||
|             bot, | ||||
|             mes.chat.id, | ||||
|             "Вы должны отправить документ", | ||||
|         ) | ||||
|     if mes.document.file_size > 102_400:  # If file size is bigger that 100 MB | ||||
|         return send_tmp_message(bot, mes.chat.id, "Файл слишком большой") | ||||
|         return _send_tmp_message(bot, mes.chat.id, "Файл слишком большой") | ||||
|  | ||||
|     file_info = bot.get_file(mes.document.file_id) | ||||
|     downloaded_file = bot.download_file(file_info.file_path) | ||||
|     try: | ||||
|         accounts = json_to_accounts(downloaded_file.decode("utf-8")) | ||||
|     except Exception: | ||||
|         return send_tmp_message(bot, mes.chat.id, "Ошибка во время работы с файлом") | ||||
|         return _send_tmp_message( | ||||
|             bot, | ||||
|             mes.chat.id, | ||||
|             "Ошибка во время работы с файлом", | ||||
|         ) | ||||
|  | ||||
|     bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль") | ||||
|     bot.register_next_step_handler( | ||||
| @@ -437,24 +535,34 @@ def _import3( | ||||
|     accounts: list[tuple[str, str, str]], | ||||
|     mes: Message, | ||||
| ) -> None: | ||||
|     base_handler(bot, mes, prev_mes) | ||||
|     _base_handler(bot, mes, prev_mes) | ||||
|     text = mes.text.strip() | ||||
|     if text == "/cancel": | ||||
|         return send_tmp_message(bot, mes.chat.id, "Успешная отмена") | ||||
|         return _send_tmp_message(bot, mes.chat.id, "Успешная отмена") | ||||
|  | ||||
|     master_salt, hash_pass = database.get.get_master_pass(engine, mes.from_user.id) | ||||
|     if not cryptography.master_pass.check_master_pass(text, hash_pass, master_salt): | ||||
|         return send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль") | ||||
|     master_salt, hash_pass = database.get.get_master_pass( | ||||
|         engine, | ||||
|         mes.from_user.id, | ||||
|     ) | ||||
|     if not encryption.master_pass.check_master_pass( | ||||
|         text, | ||||
|         hash_pass, | ||||
|         master_salt, | ||||
|     ): | ||||
|         return _send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль") | ||||
|  | ||||
|     # List of names of accounts, which failed to be added to the database or failed tests | ||||
|     # List of names of accounts, which failed to be added to the database | ||||
|     # or failed the tests | ||||
|     failed: list[str] = [] | ||||
|     for account in accounts: | ||||
|         name, login, passwd = account | ||||
|         if not check_account(name, login, passwd): | ||||
|             failed.append(name) | ||||
|             continue | ||||
|         enc_login, enc_passwd, salt = cryptography.other_accounts.encrypt_account_info( | ||||
|             login, passwd, text | ||||
|         enc_login, enc_passwd, salt = encryption.other_accounts.encrypt( | ||||
|             login, | ||||
|             passwd, | ||||
|             text, | ||||
|         ) | ||||
|         result = database.add.add_account( | ||||
|             engine, mes.from_user.id, name, salt, enc_login, enc_passwd | ||||
| @@ -466,18 +574,18 @@ def _import3( | ||||
|         mes_text = "Не удалось добавить:\n" + "\n".join(failed) | ||||
|     else: | ||||
|         mes_text = "Успех" | ||||
|     send_tmp_message(bot, mes.chat.id, mes_text, 10) | ||||
|     _send_tmp_message(bot, mes.chat.id, mes_text, 10) | ||||
|     del text, mes, accounts | ||||
|     gc.collect() | ||||
|  | ||||
|  | ||||
| def gen_password(bot: telebot.TeleBot, mes: Message) -> None: | ||||
|     _base_handler(bot, mes) | ||||
|     # Generate 10 passwords and put 'em in the backticks | ||||
|     base_handler(bot, mes) | ||||
|     passwords = (f"`{gen_passwd()}`" for _ in range(10)) | ||||
|     passwords = (f"`{generate_password.gen_password()}`" for _ in range(10)) | ||||
|     text = ( | ||||
|         "Пароли:\n" | ||||
|         + "\n".join(passwords) | ||||
|         + "\nНажмите на пароль, чтобы его скопировать" | ||||
|     ) | ||||
|     send_tmp_message(bot, mes.chat.id, text, 15) | ||||
|     _send_tmp_message(bot, mes.chat.id, text, 15) | ||||
|   | ||||
							
								
								
									
										131
									
								
								src/bot/utils.py
									
									
									
									
									
								
							
							
						
						
									
										131
									
								
								src/bot/utils.py
									
									
									
									
									
								
							| @@ -1,131 +0,0 @@ | ||||
| import io | ||||
| import string | ||||
| import time | ||||
| from random import SystemRandom | ||||
| from typing import Self, Type | ||||
|  | ||||
| import pydantic | ||||
| import telebot | ||||
| from sqlalchemy.future import Engine | ||||
|  | ||||
| from .. import cryptography, database | ||||
|  | ||||
|  | ||||
| FORBIDDEN_CHARS = frozenset("`\n") | ||||
| PUNCTUATION = frozenset(string.punctuation).difference(FORBIDDEN_CHARS) | ||||
| PASSWORD_CHARS = tuple( | ||||
|     frozenset(string.ascii_letters + string.digits).difference(FORBIDDEN_CHARS) | ||||
|     | PUNCTUATION | ||||
| ) | ||||
| Message = telebot.types.Message | ||||
|  | ||||
|  | ||||
| class _Account(pydantic.BaseModel): | ||||
|     name: str | ||||
|     login: str | ||||
|     passwd: str | ||||
|  | ||||
|     @classmethod | ||||
|     def from_tuple(cls: Type[Self], tuple_: tuple[str, str, str]) -> Self: | ||||
|         return cls(name=tuple_[0], login=tuple_[1], passwd=tuple_[2]) | ||||
|  | ||||
|     def as_tuple(self: Self) -> tuple[str, str, str]: | ||||
|         return (self.name, self.login, self.passwd) | ||||
|  | ||||
|  | ||||
| class _Accounts(pydantic.BaseModel): | ||||
|     accounts: list[_Account] = pydantic.Field(default_factory=list) | ||||
|  | ||||
|  | ||||
| def _accounts_list_to_json(accounts: list[tuple[str, str, str]]) -> str: | ||||
|     accounts = _Accounts(accounts=[_Account.from_tuple(i) for i in accounts]) | ||||
|     return accounts.json() | ||||
|  | ||||
|  | ||||
| def json_to_accounts(json_: str) -> list[tuple[str, str, str]]: | ||||
|     accounts = _Accounts.parse_raw(json_) | ||||
|     return [i.as_tuple() for i in accounts.accounts] | ||||
|  | ||||
|  | ||||
| def send_tmp_message( | ||||
|     bot: telebot.TeleBot, chat_id: telebot.types.Message, text: str, timeout: int = 5 | ||||
| ) -> None: | ||||
|     bot_mes = bot.send_message(chat_id, text, parse_mode="MarkdownV2") | ||||
|     time.sleep(timeout) | ||||
|     bot.delete_message(chat_id, bot_mes.id) | ||||
|  | ||||
|  | ||||
| def base_handler( | ||||
|     bot: telebot.TeleBot, mes: Message, prev_mes: Message | None = None | ||||
| ) -> None: | ||||
|     bot.delete_message(mes.chat.id, mes.id) | ||||
|     if prev_mes is not None: | ||||
|         bot.delete_message(prev_mes.chat.id, prev_mes.id) | ||||
|  | ||||
|  | ||||
| def get_all_accounts( | ||||
|     engine: Engine, user_id: int, master_pass: str | ||||
| ) -> list[tuple[str, str, str]]: | ||||
|     accounts: list[tuple[str, str, str]] = [] | ||||
|     for account_name in database.get.get_accounts(engine, user_id): | ||||
|         salt, enc_login, enc_passwd = database.get.get_account_info( | ||||
|             engine, user_id, account_name | ||||
|         ) | ||||
|         login, passwd = cryptography.other_accounts.decrypt_account_info( | ||||
|             enc_login, enc_passwd, master_pass, salt | ||||
|         ) | ||||
|         accounts.append((account_name, login, passwd)) | ||||
|     return accounts | ||||
|  | ||||
|  | ||||
| def accounts_to_json(accounts: list[tuple[str, str, str]]) -> io.StringIO: | ||||
|     file = io.StringIO(_accounts_list_to_json(accounts)) | ||||
|     file.name = "passwords.json" | ||||
|     return file | ||||
|  | ||||
|  | ||||
| def _base_check(val: str, /) -> bool: | ||||
|     "Returns false if finds new lines or backtick (`)" | ||||
|     return not any(i in FORBIDDEN_CHARS for i in val) | ||||
|  | ||||
|  | ||||
| def check_account_name(name: str) -> bool: | ||||
|     "Returns true if account name is valid" | ||||
|     return _base_check(name) | ||||
|  | ||||
|  | ||||
| def check_login(login: str) -> bool: | ||||
|     "Returns true if login is valid" | ||||
|     return _base_check(login) | ||||
|  | ||||
|  | ||||
| def check_passwd(passwd: str) -> bool: | ||||
|     "Returns true if password is valid" | ||||
|     return _base_check(passwd) | ||||
|  | ||||
|  | ||||
| def check_account(name: str, login: str, passwd: str) -> bool: | ||||
|     """Runs checks for account name, login and password""" | ||||
|     return check_account_name(name) and check_login(login) and check_passwd(passwd) | ||||
|  | ||||
|  | ||||
| def _check_gened_password(passwd: str, /) -> bool: | ||||
|     """Retuns true if generated password is valid, | ||||
|     false otherwise. | ||||
|     Password is valid if there is at least one lowercase character, | ||||
|     uppercase character and one punctuation character""" | ||||
|     return ( | ||||
|         any(c.islower() for c in passwd) | ||||
|         and any(c.isupper() for c in passwd) | ||||
|         and any(c.isdigit() for c in passwd) | ||||
|         and any(c in PUNCTUATION for c in passwd) | ||||
|     ) | ||||
|  | ||||
|  | ||||
| def gen_passwd() -> str: | ||||
|     """Generates password of length 32""" | ||||
|     choices = SystemRandom().choices | ||||
|     while True: | ||||
|         passwd = "".join(choices(PASSWORD_CHARS, k=32)) | ||||
|         if _check_gened_password(passwd): | ||||
|             return passwd | ||||
| @@ -1,44 +0,0 @@ | ||||
| import base64 | ||||
| import os | ||||
|  | ||||
| from cryptography.fernet import Fernet | ||||
| from cryptography.hazmat.backends import default_backend | ||||
| from cryptography.hazmat.primitives import hashes | ||||
| from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC | ||||
|  | ||||
|  | ||||
| def _generate_key(salt: bytes, master_pass: bytes) -> bytes: | ||||
|     kdf = PBKDF2HMAC( | ||||
|         algorithm=hashes.SHA256(), | ||||
|         length=32, | ||||
|         salt=salt, | ||||
|         iterations=100000, | ||||
|         backend=default_backend(), | ||||
|     ) | ||||
|     key = base64.urlsafe_b64encode(kdf.derive(master_pass)) | ||||
|     return key | ||||
|  | ||||
|  | ||||
| def encrypt_account_info( | ||||
|     login: str, passwd: str, master_pass: str | ||||
| ) -> tuple[bytes, bytes, bytes]: | ||||
|     """Encrypts login and password of a user using their master password as a key. | ||||
|     Returns a tuple of encrypted login, password and salt""" | ||||
|     salt = os.urandom(64) | ||||
|     key = _generate_key(salt, master_pass.encode("utf-8")) | ||||
|     f = Fernet(key) | ||||
|     enc_login = f.encrypt(login.encode("utf-8")) | ||||
|     enc_passwd = f.encrypt(passwd.encode("utf-8")) | ||||
|     return (enc_login, enc_passwd, salt) | ||||
|  | ||||
|  | ||||
| def decrypt_account_info( | ||||
|     enc_login: bytes, enc_pass: bytes, master_pass: str, salt: bytes | ||||
| ) -> tuple[str, str]: | ||||
|     """Decrypts login and password using their master password as a key. | ||||
|     Returns a tuple of decrypted login and password""" | ||||
|     key = _generate_key(salt, master_pass.encode("utf-8")) | ||||
|     f = Fernet(key) | ||||
|     login_bytes = f.decrypt(enc_login) | ||||
|     pass_bytes = f.decrypt(enc_pass) | ||||
|     return (login_bytes.decode("utf-8"), pass_bytes.decode("utf-8")) | ||||
| @@ -11,11 +11,16 @@ def add_account( | ||||
|     name: str, | ||||
|     salt: bytes, | ||||
|     enc_login: bytes, | ||||
|     enc_pass: bytes, | ||||
|     enc_password: bytes, | ||||
| ) -> bool: | ||||
|     """Adds account to the database. Returns true on success, false otherwise""" | ||||
|     """Adds account to the database. Returns true on success, | ||||
|     false otherwise""" | ||||
|     account = models.Account( | ||||
|         user_id=user_id, name=name, salt=salt, enc_login=enc_login, enc_pass=enc_pass | ||||
|         user_id=user_id, | ||||
|         name=name, | ||||
|         salt=salt, | ||||
|         enc_login=enc_login, | ||||
|         enc_password=enc_password, | ||||
|     ) | ||||
|     try: | ||||
|         with sqlmodel.Session(engine) as session: | ||||
| @@ -27,9 +32,19 @@ def add_account( | ||||
|         return True | ||||
|  | ||||
|  | ||||
| def add_master_pass(engine: Engine, user_id: int, salt: bytes, passwd: bytes) -> bool: | ||||
|     """Adds master password the database. Returns true on success, false otherwise""" | ||||
|     master_pass = models.MasterPass(user_id=user_id, salt=salt, passwd=passwd) | ||||
| def add_master_pass( | ||||
|     engine: Engine, | ||||
|     user_id: int, | ||||
|     salt: bytes, | ||||
|     password_hash: bytes, | ||||
| ) -> bool: | ||||
|     """Adds master password the database. Returns true on success, | ||||
|     false otherwise""" | ||||
|     master_pass = models.MasterPass( | ||||
|         user_id=user_id, | ||||
|         salt=salt, | ||||
|         password_hash=password_hash, | ||||
|     ) | ||||
|     try: | ||||
|         with sqlmodel.Session(engine) as session: | ||||
|             session.add(master_pass) | ||||
|   | ||||
| @@ -5,13 +5,13 @@ from . import models | ||||
|  | ||||
|  | ||||
| def change_master_pass( | ||||
|     engine: Engine, user_id: int, salt: bytes, passwd: bytes | ||||
|     engine: Engine, user_id: int, salt: bytes, password: bytes | ||||
| ) -> None: | ||||
|     """Changes master password and salt in the database""" | ||||
|     statement = ( | ||||
|         sqlmodel.update(models.MasterPass) | ||||
|         .where(models.MasterPass.user_id == user_id) | ||||
|         .values(salt=salt, passwd=passwd) | ||||
|         .values(salt=salt, password_hash=password) | ||||
|     ) | ||||
|     with sqlmodel.Session(engine) as session: | ||||
|         session.exec(statement) | ||||
|   | ||||
| @@ -6,7 +6,9 @@ from . import models | ||||
|  | ||||
| def purge_accounts(engine: Engine, user_id: int) -> None: | ||||
|     """Deletes all user's accounts""" | ||||
|     statement = sqlmodel.delete(models.Account).where(models.Account.user_id == user_id) | ||||
|     statement = sqlmodel.delete(models.Account).where( | ||||
|         models.Account.user_id == user_id, | ||||
|     ) | ||||
|     with sqlmodel.Session(engine) as session: | ||||
|         session.exec(statement) | ||||
|         session.commit() | ||||
| @@ -25,7 +27,8 @@ def delete_master_pass(engine: Engine, user_id: int) -> None: | ||||
| def delete_account(engine: Engine, user_id: int, name: str) -> None: | ||||
|     """Deletes specific user account""" | ||||
|     statement = sqlmodel.delete(models.Account).where( | ||||
|         models.Account.user_id == user_id, models.Account.name == name | ||||
|         models.Account.user_id == user_id, | ||||
|         models.Account.name == name, | ||||
|     ) | ||||
|     with sqlmodel.Session(engine) as session: | ||||
|         session.exec(statement) | ||||
|   | ||||
| @@ -1,40 +1,77 @@ | ||||
| from typing import Iterator | ||||
|  | ||||
| import sqlmodel | ||||
| from sqlalchemy.future import Engine | ||||
|  | ||||
| from . import models | ||||
|  | ||||
|  | ||||
| def get_master_pass(engine: Engine, user_id: int) -> tuple[bytes, bytes] | None: | ||||
| def get_master_pass( | ||||
|     engine: Engine, | ||||
|     user_id: int, | ||||
| ) -> tuple[bytes, bytes] | None: | ||||
|     """Gets master pass. Returns tuple of salt and password | ||||
|     or None if it wasn't found""" | ||||
|     statement = sqlmodel.select(models.MasterPass).where( | ||||
|         models.MasterPass.user_id == user_id | ||||
|         models.MasterPass.user_id == user_id, | ||||
|     ) | ||||
|     with sqlmodel.Session(engine) as session: | ||||
|         result = session.exec(statement).first() | ||||
|     if result is None: | ||||
|         return | ||||
|     return (result.salt, result.passwd) | ||||
|     return (result.salt, result.password_hash) | ||||
|  | ||||
|  | ||||
| def get_accounts(engine: Engine, user_id: int) -> list[str]: | ||||
|     """Gets list of account names""" | ||||
|     statement = sqlmodel.select(models.Account).where(models.Account.user_id == user_id) | ||||
|     statement = ( | ||||
|         sqlmodel.select(models.Account) | ||||
|         .where( | ||||
|             models.Account.user_id == user_id, | ||||
|         ) | ||||
|         .order_by(models.Account.name) | ||||
|     ) | ||||
|     with sqlmodel.Session(engine) as session: | ||||
|         result = session.exec(statement) | ||||
|         return [account.name for account in result] | ||||
|  | ||||
|  | ||||
| def get_all_accounts( | ||||
|     engine: Engine, user_id: int | ||||
| ) -> Iterator[tuple[str, bytes, bytes, bytes]]: | ||||
|     """Returns an iterator of tuples, where values represent account's | ||||
|     name, salt, encrypted login and encrypted password""" | ||||
|     statement = ( | ||||
|         sqlmodel.select(models.Account) | ||||
|         .where( | ||||
|             models.Account.user_id == user_id, | ||||
|         ) | ||||
|         .order_by(models.Account.name) | ||||
|     ) | ||||
|     with sqlmodel.Session(engine) as session: | ||||
|         result = session.exec(statement) | ||||
|         yield from ( | ||||
|             ( | ||||
|                 account.name, | ||||
|                 account.salt, | ||||
|                 account.enc_login, | ||||
|                 account.enc_password, | ||||
|             ) | ||||
|             for account in result | ||||
|         ) | ||||
|  | ||||
|  | ||||
| def get_account_info( | ||||
|     engine: Engine, user_id: int, name: str | ||||
| ) -> tuple[bytes, bytes, bytes]: | ||||
|     """Gets account info. Returns tuple of salt, login and password | ||||
|     or None if it wasn't found""" | ||||
|     statement = sqlmodel.select(models.Account).where( | ||||
|         models.Account.user_id == user_id, models.Account.name == name | ||||
|         models.Account.user_id == user_id, | ||||
|         models.Account.name == name, | ||||
|     ) | ||||
|     with sqlmodel.Session(engine) as session: | ||||
|         result = session.exec(statement).first() | ||||
|         if result is None: | ||||
|             return | ||||
|         return (result.salt, result.enc_login, result.enc_pass) | ||||
|         return (result.salt, result.enc_login, result.enc_password) | ||||
|   | ||||
| @@ -10,7 +10,7 @@ class MasterPass(sqlmodel.SQLModel, table=True): | ||||
|     salt: bytes = sqlmodel.Field( | ||||
|         sa_column=sqlmodel.Column(sqlmodel.BINARY(64), nullable=False) | ||||
|     ) | ||||
|     passwd: bytes = sqlmodel.Field( | ||||
|     password_hash: bytes = sqlmodel.Field( | ||||
|         sa_column=sqlmodel.Column(sqlmodel.BINARY(128), nullable=False) | ||||
|     ) | ||||
|  | ||||
| @@ -25,8 +25,8 @@ class Account(sqlmodel.SQLModel, table=True): | ||||
|         sa_column=sqlmodel.Column(sqlmodel.BINARY(64), nullable=False) | ||||
|     ) | ||||
|     enc_login: bytes = sqlmodel.Field( | ||||
|         sa_column=sqlmodel.Column(sqlmodel.VARBINARY(500), nullable=False) | ||||
|         sa_column=sqlmodel.Column(sqlmodel.VARBINARY(256), nullable=False) | ||||
|     ) | ||||
|     enc_pass: bytes = sqlmodel.Field( | ||||
|         sa_column=sqlmodel.Column(sqlmodel.VARBINARY(500), nullable=False) | ||||
|     enc_password: bytes = sqlmodel.Field( | ||||
|         sa_column=sqlmodel.Column(sqlmodel.VARBINARY(256), nullable=False) | ||||
|     ) | ||||
|   | ||||
| @@ -1,12 +1,15 @@ | ||||
| import sqlmodel | ||||
| from sqlalchemy.future import Engine | ||||
|  | ||||
| from . import models | ||||
| from . import models  # noqa | ||||
|  | ||||
| HOUR_IN_SECONDS = 3600 | ||||
|  | ||||
|  | ||||
| def get_engine(host: str, user: str, passwd: str, db: str) -> Engine: | ||||
|     """Creates an engine for mariadb with pymysql as connector""" | ||||
|     engine = sqlmodel.create_engine(f"mariadb+pymysql://{user}:{passwd}@{host}/{db}") | ||||
|     uri = f"mariadb+pymysql://{user}:{passwd}@{host}/{db}" | ||||
|     engine = sqlmodel.create_engine(uri, pool_recycle=HOUR_IN_SECONDS) | ||||
|     return engine | ||||
|  | ||||
|  | ||||
|   | ||||
| @@ -17,18 +17,22 @@ def _get_kdf(salt: bytes) -> Scrypt: | ||||
|     return kdf | ||||
| 
 | ||||
| 
 | ||||
| def encrypt_master_pass(passwd: str) -> tuple[bytes, bytes]: | ||||
| def encrypt_master_pass(password: str) -> tuple[bytes, bytes]: | ||||
|     """Hashes master password and return tuple of hashed password and salt""" | ||||
|     salt = os.urandom(64) | ||||
|     kdf = _get_kdf(salt) | ||||
|     return kdf.derive(passwd.encode("utf-8")), salt | ||||
|     return kdf.derive(password.encode("utf-8")), salt | ||||
| 
 | ||||
| 
 | ||||
| def check_master_pass(passwd: str, enc_pass: bytes, salt: bytes) -> bool: | ||||
| def check_master_pass( | ||||
|     password: str, | ||||
|     enc_password: bytes, | ||||
|     salt: bytes, | ||||
| ) -> bool: | ||||
|     """Checks if the master password is correct""" | ||||
|     kdf = _get_kdf(salt) | ||||
|     try: | ||||
|         kdf.verify(passwd.encode("utf-8"), enc_pass) | ||||
|         kdf.verify(password.encode("utf-8"), enc_password) | ||||
|     except InvalidKey: | ||||
|         return False | ||||
|     else: | ||||
							
								
								
									
										69
									
								
								src/encryption/other_accounts.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										69
									
								
								src/encryption/other_accounts.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,69 @@ | ||||
| import base64 | ||||
| import os | ||||
| from typing import Iterator | ||||
|  | ||||
| from cryptography.fernet import Fernet | ||||
| from cryptography.hazmat.backends import default_backend | ||||
| from cryptography.hazmat.primitives import hashes | ||||
| from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC | ||||
|  | ||||
|  | ||||
| def _generate_key(salt: bytes, master_pass: bytes) -> bytes: | ||||
|     kdf = PBKDF2HMAC( | ||||
|         algorithm=hashes.SHA256(), | ||||
|         length=32, | ||||
|         salt=salt, | ||||
|         iterations=100000, | ||||
|         backend=default_backend(), | ||||
|     ) | ||||
|     key = base64.urlsafe_b64encode(kdf.derive(master_pass)) | ||||
|     return key | ||||
|  | ||||
|  | ||||
| def encrypt( | ||||
|     login: str, | ||||
|     passwd: str, | ||||
|     master_pass: str, | ||||
| ) -> tuple[bytes, bytes, bytes]: | ||||
|     """Encrypts login and password of a user using their master | ||||
|     password as a key. | ||||
|     Returns a tuple of encrypted login, password and salt""" | ||||
|     salt = os.urandom(64) | ||||
|     key = _generate_key(salt, master_pass.encode("utf-8")) | ||||
|     f = Fernet(key) | ||||
|     enc_login = base64.urlsafe_b64decode(f.encrypt(login.encode("utf-8"))) | ||||
|     enc_password = base64.urlsafe_b64decode(f.encrypt(passwd.encode("utf-8"))) | ||||
|     return (enc_login, enc_password, salt) | ||||
|  | ||||
|  | ||||
| def decrypt( | ||||
|     enc_login: bytes, | ||||
|     enc_pass: bytes, | ||||
|     master_pass: str, | ||||
|     salt: bytes, | ||||
| ) -> tuple[str, str]: | ||||
|     """Decrypts login and password using their | ||||
|     master password as a key. | ||||
|     Returns a tuple of decrypted login and password""" | ||||
|     key = _generate_key(salt, master_pass.encode("utf-8")) | ||||
|     f = Fernet(key) | ||||
|     login = f.decrypt(base64.urlsafe_b64encode(enc_login)).decode("utf-8") | ||||
|     password = f.decrypt(base64.urlsafe_b64encode(enc_pass)).decode("utf-8") | ||||
|     return (login, password) | ||||
|  | ||||
|  | ||||
| def decrypt_multiple( | ||||
|     accounts: Iterator[tuple[str, bytes, bytes, bytes]], master_pass: str | ||||
| ) -> Iterator[tuple[str, str, str]]: | ||||
|     """Gets an iterator of tuples, where values represent account's name, salt, | ||||
|     encrypted login and encrypted password. | ||||
|     Return an iterator of names, logins and passwords as a tuple""" | ||||
|     for account in accounts: | ||||
|         name, salt, enc_login, enc_passwd = account | ||||
|         login, passwd = decrypt( | ||||
|             enc_login, | ||||
|             enc_passwd, | ||||
|             master_pass, | ||||
|             salt, | ||||
|         ) | ||||
|         yield (name, login, passwd) | ||||
							
								
								
									
										18
									
								
								src/generate_password.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										18
									
								
								src/generate_password.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,18 @@ | ||||
| import string | ||||
| from random import SystemRandom | ||||
|  | ||||
| from .account_checks import FORBIDDEN_CHARS, PUNCTUATION, check_gened_password | ||||
|  | ||||
| PASSWORD_CHARS = tuple( | ||||
|     frozenset(string.ascii_letters + string.digits).difference(FORBIDDEN_CHARS) | ||||
|     | PUNCTUATION | ||||
| ) | ||||
|  | ||||
|  | ||||
| def gen_password() -> str: | ||||
|     """Generates password of length 32""" | ||||
|     choices = SystemRandom().choices | ||||
|     while True: | ||||
|         passwd = "".join(choices(PASSWORD_CHARS, k=32)) | ||||
|         if check_gened_password(passwd): | ||||
|             return passwd | ||||
		Reference in New Issue
	
	Block a user