1
1

Compare commits

...

5 Commits

15 changed files with 181 additions and 60 deletions

@ -25,3 +25,4 @@
**/values.dev.yaml
README.md
data/
.flake8

2
.flake8 Normal file

@ -0,0 +1,2 @@
[flake8]
exclude=.git,__pycache__,venv

@ -1 +1,2 @@
black
flake8

@ -6,7 +6,7 @@ from . import (
account_checks,
account_parsing,
bot,
cryptography,
encryption,
database,
generate_password,
)
@ -15,7 +15,7 @@ __all__ = [
"account_checks",
"account_parsing",
"bot",
"cryptography",
"encryption",
"database",
"generate_password",
]

@ -26,7 +26,13 @@ def check_password(passwd: str) -> bool:
def check_account(name: str, login: str, passwd: str) -> bool:
"""Runs checks for account name, login and password"""
return check_account_name(name) and check_login(login) and check_password(passwd)
return all(
(
check_account_name(name),
check_login(login),
check_password(passwd),
)
)
def check_gened_password(passwd: str, /) -> bool:

@ -11,7 +11,7 @@ class _Account(pydantic.BaseModel):
@classmethod
def from_tuple(cls: Type[Self], tuple_: tuple[str, str, str]) -> Self:
return cls(name=tuple_[0], login=tuple_[1], passwd=tuple_[2])
return cls(name=tuple_[0], login=tuple_[1], password=tuple_[2])
def as_tuple(self: Self) -> tuple[str, str, str]:
return (self.name, self.login, self.password)

@ -15,16 +15,20 @@ def create_bot(token: str, engine: Engine) -> telebot.TeleBot:
commands=["set_master_pass"],
)
bot.register_message_handler(
functools.partial(handlers.get_account, bot, engine), commands=["get_account"]
functools.partial(handlers.get_account, bot, engine),
commands=["get_account"],
)
bot.register_message_handler(
functools.partial(handlers.get_accounts, bot, engine), commands=["get_accounts"]
functools.partial(handlers.get_accounts, bot, engine),
commands=["get_accounts"],
)
bot.register_message_handler(
functools.partial(handlers.add_account, bot, engine), commands=["add_account"]
functools.partial(handlers.add_account, bot, engine),
commands=["add_account"],
)
bot.register_message_handler(
functools.partial(handlers.delete_all, bot, engine), commands=["delete_all"]
functools.partial(handlers.delete_all, bot, engine),
commands=["delete_all"],
)
bot.register_message_handler(
functools.partial(handlers.reset_master_pass, bot, engine),
@ -35,7 +39,8 @@ def create_bot(token: str, engine: Engine) -> telebot.TeleBot:
commands=["delete_account"],
)
bot.register_message_handler(
functools.partial(handlers.help, bot), commands=["help", "start"]
functools.partial(handlers.help_command, bot),
commands=["help", "start"],
)
bot.register_message_handler(
functools.partial(handlers.cancel, bot), commands=["cancel"]
@ -44,9 +49,11 @@ def create_bot(token: str, engine: Engine) -> telebot.TeleBot:
functools.partial(handlers.export, bot, engine), commands=["export"]
)
bot.register_message_handler(
functools.partial(handlers.import_accounts, bot, engine), commands=["import"]
functools.partial(handlers.import_accounts, bot, engine),
commands=["import"],
)
bot.register_message_handler(
functools.partial(handlers.gen_password, bot), commands=["gen_password"]
functools.partial(handlers.gen_password, bot),
commands=["gen_password"],
)
return bot

@ -5,7 +5,7 @@ import time
import telebot
from sqlalchemy.future import Engine
from .. import cryptography, database, generate_password
from .. import encryption, database, generate_password
from ..account_checks import (
check_account,
check_account_name,
@ -58,7 +58,8 @@ def delete_all(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
_base_handler(bot, mes)
bot_mes = bot.send_message(
mes.chat.id,
"Вы действительно хотите удалить все ваши аккаунты? Это действие нельзя отменить. "
"Вы действительно хотите удалить все ваши аккаунты? Это действие "
"нельзя отменить. "
"Отправьте YES для подтверждения",
)
bot.register_next_step_handler(
@ -76,13 +77,25 @@ def _delete_all(
database.delete.delete_master_pass(engine, mes.from_user.id)
_send_tmp_message(bot, mes.chat.id, "Всё успешно удалено", timeout=10)
else:
_send_tmp_message(bot, mes.chat.id, "Вы отправили не YES, ничего не удалено")
_send_tmp_message(
bot,
mes.chat.id,
"Вы отправили не YES, ничего не удалено",
)
def set_master_password(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
def set_master_password(
bot: telebot.TeleBot,
engine: Engine,
mes: Message,
) -> None:
_base_handler(bot, mes, None)
if database.get.get_master_pass(engine, mes.from_user.id) is not None:
return _send_tmp_message(bot, mes.chat.id, "Мастер пароль уже существует")
return _send_tmp_message(
bot,
mes.chat.id,
"Мастер пароль уже существует",
)
bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль")
bot.register_next_step_handler(
mes, functools.partial(_set_master_pass2, bot, engine, bot_mes)
@ -97,14 +110,25 @@ def _set_master_pass2(
if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
hash_pass, master_salt = cryptography.master_pass.encrypt_master_pass(text)
database.add.add_master_pass(engine, mes.from_user.id, master_salt, hash_pass)
password_hash, master_salt = encryption.master_pass.encrypt_master_pass(
text,
)
database.add.add_master_pass(
engine,
mes.from_user.id,
master_salt,
password_hash,
)
_send_tmp_message(bot, mes.chat.id, "Успех")
del mes, text
gc.collect()
def reset_master_pass(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
def reset_master_pass(
bot: telebot.TeleBot,
engine: Engine,
mes: Message,
) -> None:
_base_handler(bot, mes)
if database.get.get_master_pass(engine, mes.from_user.id) is None:
return _send_tmp_message(bot, mes.chat.id, "Мастер пароль не задан")
@ -126,7 +150,7 @@ def _reset_master_pass2(
if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
hash_, salt = cryptography.master_pass.encrypt_master_pass(text)
hash_, salt = encryption.master_pass.encrypt_master_pass(text)
database.delete.purge_accounts(engine, mes.from_user.id)
database.change.change_master_pass(engine, mes.from_user.id, salt, hash_)
_send_tmp_message(
@ -139,7 +163,10 @@ def _reset_master_pass2(
def add_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
_base_handler(bot, mes)
master_password_from_db = database.get.get_master_pass(engine, mes.from_user.id)
master_password_from_db = database.get.get_master_pass(
engine,
mes.from_user.id,
)
if master_password_from_db is None:
return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
@ -159,7 +186,11 @@ def _add_account2(
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if not check_account_name(text):
return _send_tmp_message(bot, mes.chat.id, "Не корректное название аккаунта")
return _send_tmp_message(
bot,
mes.chat.id,
"Не корректное название аккаунта",
)
if text in database.get.get_accounts(engine, mes.from_user.id):
return _send_tmp_message(
bot, mes.chat.id, "Аккаунт с таким именем уже существует"
@ -232,13 +263,19 @@ def _add_account5(
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
salt, hash_ = database.get.get_master_pass(engine, mes.from_user.id)
if not cryptography.master_pass.check_master_pass(text, hash_, salt):
return _send_tmp_message(bot, mes.chat.id, "Не подходит главный пароль")
if not encryption.master_pass.check_master_pass(text, hash_, salt):
return _send_tmp_message(
bot,
mes.chat.id,
"Не подходит главный пароль",
)
name, login, passwd = data["name"], data["login"], data["passwd"]
enc_login, enc_pass, salt = cryptography.other_accounts.encrypt_account_info(
login, passwd, text
enc_login, enc_pass, salt = encryption.other_accounts.encrypt(
login,
passwd,
text,
)
result = database.add.add_account(
@ -246,7 +283,9 @@ def _add_account5(
)
_send_tmp_message(
bot, mes.chat.id, "Успех" if result else "Произошла не предвиденная ошибка"
bot,
mes.chat.id,
"Успех" if result else "Произошла не предвиденная ошибка",
)
del data, name, login, passwd, enc_login
@ -285,29 +324,43 @@ def _get_account2(
def _get_account3(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, name: str, mes: Message
bot: telebot.TeleBot,
engine: Engine,
prev_mes: Message,
name: str,
mes: Message,
) -> None:
_base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_salt, hash_pass = database.get.get_master_pass(engine, mes.from_user.id)
master_salt, hash_pass = database.get.get_master_pass(
engine,
mes.from_user.id,
)
if not cryptography.master_pass.check_master_pass(text, hash_pass, master_salt):
if not encryption.master_pass.check_master_pass(
text,
hash_pass,
master_salt,
):
return _send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль")
salt, enc_login, enc_pass = database.get.get_account_info(
engine, mes.from_user.id, name
)
login, passwd = cryptography.other_accounts.decrypt_account_info(
enc_login, enc_pass, text, salt
login, passwd = encryption.other_accounts.decrypt(
enc_login,
enc_pass,
text,
salt,
)
_send_tmp_message(
bot,
mes.chat.id,
f"Логин:\n`{login}`\nПароль:\n`{passwd}`\nНажмите на логин или пароль, "
"чтобы скопировать",
f"Логин:\n`{login}`\nПароль:\n`{passwd}`\nНажмите на логин "
"или пароль, чтобы скопировать",
30,
)
@ -346,7 +399,7 @@ def _delete_account2(
_send_tmp_message(bot, mes.chat.id, "Аккаунт удалён")
def help(bot: telebot.TeleBot, mes: Message) -> None:
def help_command(bot: telebot.TeleBot, mes: Message) -> None:
message = """Команды:
/set_master_pass - установить мастер пароль
/add_account - создать аккаунт
@ -371,7 +424,10 @@ def cancel(bot: telebot.TeleBot, mes: Message) -> None:
def export(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
_base_handler(bot, mes)
master_password_from_db = database.get.get_master_pass(engine, mes.from_user.id)
master_password_from_db = database.get.get_master_pass(
engine,
mes.from_user.id,
)
if master_password_from_db is None:
return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
@ -394,12 +450,19 @@ def _export2(
if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_salt, hash_pass = database.get.get_master_pass(engine, mes.from_user.id)
if not cryptography.master_pass.check_master_pass(text, hash_pass, master_salt):
master_salt, hash_pass = database.get.get_master_pass(
engine,
mes.from_user.id,
)
if not encryption.master_pass.check_master_pass(
text,
hash_pass,
master_salt,
):
return _send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль")
accounts = database.get.get_all_accounts(engine, mes.from_user.id)
accounts = cryptography.other_accounts.decrypt_multiple(accounts, text)
accounts = encryption.other_accounts.decrypt_multiple(accounts, text)
json_io = accounts_to_json(accounts)
bot_mes = bot.send_document(mes.chat.id, json_io)
@ -409,9 +472,16 @@ def _export2(
bot.delete_message(bot_mes.chat.id, bot_mes.id)
def import_accounts(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
def import_accounts(
bot: telebot.TeleBot,
engine: Engine,
mes: Message,
) -> None:
_base_handler(bot, mes)
master_password_from_db = database.get.get_master_pass(engine, mes.from_user.id)
master_password_from_db = database.get.get_master_pass(
engine,
mes.from_user.id,
)
if master_password_from_db is None:
return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
@ -433,7 +503,11 @@ def _import2(
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if mes.document is None:
return _send_tmp_message(bot, mes.chat.id, "Вы должны отправить документ")
return _send_tmp_message(
bot,
mes.chat.id,
"Вы должны отправить документ",
)
if mes.document.file_size > 102_400: # If file size is bigger that 100 MB
return _send_tmp_message(bot, mes.chat.id, "Файл слишком большой")
@ -442,7 +516,11 @@ def _import2(
try:
accounts = json_to_accounts(downloaded_file.decode("utf-8"))
except Exception:
return _send_tmp_message(bot, mes.chat.id, "Ошибка во время работы с файлом")
return _send_tmp_message(
bot,
mes.chat.id,
"Ошибка во время работы с файлом",
)
bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль")
bot.register_next_step_handler(
@ -462,19 +540,29 @@ def _import3(
if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_salt, hash_pass = database.get.get_master_pass(engine, mes.from_user.id)
if not cryptography.master_pass.check_master_pass(text, hash_pass, master_salt):
master_salt, hash_pass = database.get.get_master_pass(
engine,
mes.from_user.id,
)
if not encryption.master_pass.check_master_pass(
text,
hash_pass,
master_salt,
):
return _send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль")
# List of names of accounts, which failed to be added to the database or failed tests
# List of names of accounts, which failed to be added to the database
# or failed the tests
failed: list[str] = []
for account in accounts:
name, login, passwd = account
if not check_account(name, login, passwd):
failed.append(name)
continue
enc_login, enc_passwd, salt = cryptography.other_accounts.encrypt_account_info(
login, passwd, text
enc_login, enc_passwd, salt = encryption.other_accounts.encrypt(
login,
passwd,
text,
)
result = database.add.add_account(
engine, mes.from_user.id, name, salt, enc_login, enc_passwd

@ -13,7 +13,8 @@ def add_account(
enc_login: bytes,
enc_password: bytes,
) -> bool:
"""Adds account to the database. Returns true on success, false otherwise"""
"""Adds account to the database. Returns true on success,
false otherwise"""
account = models.Account(
user_id=user_id,
name=name,
@ -37,7 +38,8 @@ def add_master_pass(
salt: bytes,
password_hash: bytes,
) -> bool:
"""Adds master password the database. Returns true on success, false otherwise"""
"""Adds master password the database. Returns true on success,
false otherwise"""
master_pass = models.MasterPass(
user_id=user_id,
salt=salt,

@ -11,7 +11,7 @@ def change_master_pass(
statement = (
sqlmodel.update(models.MasterPass)
.where(models.MasterPass.user_id == user_id)
.values(salt=salt, passwd=password)
.values(salt=salt, password_hash=password)
)
with sqlmodel.Session(engine) as session:
session.exec(statement)

@ -35,15 +35,20 @@ def get_accounts(engine: Engine, user_id: int) -> list[str]:
def get_all_accounts(
engine: Engine, user_id: int
) -> Iterator[tuple[str, bytes, bytes, bytes]]:
"""Returns an iterator of tuples, where values represent account's name, salt,
encrypted login and encrypted password"""
"""Returns an iterator of tuples, where values represent account's
name, salt, encrypted login and encrypted password"""
statement = sqlmodel.select(models.Account).where(
models.Account.user_id == user_id,
)
with sqlmodel.Session(engine) as session:
result = session.exec(statement)
yield from (
(account.name, account.salt, account.enc_login, account.enc_password)
(
account.name,
account.salt,
account.enc_login,
account.enc_password,
)
for account in result
)

@ -1,13 +1,15 @@
import sqlmodel
from sqlalchemy.future import Engine
from . import models
from . import models # noqa
HOUR_IN_SECONDS = 3600
def get_engine(host: str, user: str, passwd: str, db: str) -> Engine:
"""Creates an engine for mariadb with pymysql as connector"""
uri = f"mariadb+pymysql://{user}:{passwd}@{host}/{db}"
engine = sqlmodel.create_engine(uri)
engine = sqlmodel.create_engine(uri, pool_recycle=HOUR_IN_SECONDS)
return engine

@ -20,8 +20,10 @@ def _generate_key(salt: bytes, master_pass: bytes) -> bytes:
return key
def encrypt_account_info(
login: str, passwd: str, master_pass: str
def encrypt(
login: str,
passwd: str,
master_pass: str,
) -> tuple[bytes, bytes, bytes]:
"""Encrypts login and password of a user using their master
password as a key.
@ -34,7 +36,7 @@ def encrypt_account_info(
return (enc_login, enc_password, salt)
def decrypt_account_info(
def decrypt(
enc_login: bytes,
enc_pass: bytes,
master_pass: str,
@ -58,5 +60,10 @@ def decrypt_multiple(
Return an iterator of names, logins and passwords as a tuple"""
for account in accounts:
name, salt, enc_login, enc_passwd = account
login, passwd = decrypt_account_info(enc_login, enc_passwd, master_pass, salt)
login, passwd = decrypt(
enc_login,
enc_passwd,
master_pass,
salt,
)
yield (name, login, passwd)