Compare commits

..

No commits in common. "b0599c14841fd881379cfd0a5d10f87a241bf4b7" and "0d3965d5d209a21c40e7b626e5c4dd31608f754e" have entirely different histories.

15 changed files with 60 additions and 181 deletions

View File

@ -25,4 +25,3 @@
**/values.dev.yaml
README.md
data/
.flake8

View File

@ -1,2 +0,0 @@
[flake8]
exclude=.git,__pycache__,venv

View File

@ -1,2 +1 @@
black
flake8

View File

@ -6,7 +6,7 @@ from . import (
account_checks,
account_parsing,
bot,
encryption,
cryptography,
database,
generate_password,
)
@ -15,7 +15,7 @@ __all__ = [
"account_checks",
"account_parsing",
"bot",
"encryption",
"cryptography",
"database",
"generate_password",
]

View File

@ -26,13 +26,7 @@ def check_password(passwd: str) -> bool:
def check_account(name: str, login: str, passwd: str) -> bool:
"""Runs checks for account name, login and password"""
return all(
(
check_account_name(name),
check_login(login),
check_password(passwd),
)
)
return check_account_name(name) and check_login(login) and check_password(passwd)
def check_gened_password(passwd: str, /) -> bool:

View File

@ -11,7 +11,7 @@ class _Account(pydantic.BaseModel):
@classmethod
def from_tuple(cls: Type[Self], tuple_: tuple[str, str, str]) -> Self:
return cls(name=tuple_[0], login=tuple_[1], password=tuple_[2])
return cls(name=tuple_[0], login=tuple_[1], passwd=tuple_[2])
def as_tuple(self: Self) -> tuple[str, str, str]:
return (self.name, self.login, self.password)

View File

@ -15,20 +15,16 @@ def create_bot(token: str, engine: Engine) -> telebot.TeleBot:
commands=["set_master_pass"],
)
bot.register_message_handler(
functools.partial(handlers.get_account, bot, engine),
commands=["get_account"],
functools.partial(handlers.get_account, bot, engine), commands=["get_account"]
)
bot.register_message_handler(
functools.partial(handlers.get_accounts, bot, engine),
commands=["get_accounts"],
functools.partial(handlers.get_accounts, bot, engine), commands=["get_accounts"]
)
bot.register_message_handler(
functools.partial(handlers.add_account, bot, engine),
commands=["add_account"],
functools.partial(handlers.add_account, bot, engine), commands=["add_account"]
)
bot.register_message_handler(
functools.partial(handlers.delete_all, bot, engine),
commands=["delete_all"],
functools.partial(handlers.delete_all, bot, engine), commands=["delete_all"]
)
bot.register_message_handler(
functools.partial(handlers.reset_master_pass, bot, engine),
@ -39,8 +35,7 @@ def create_bot(token: str, engine: Engine) -> telebot.TeleBot:
commands=["delete_account"],
)
bot.register_message_handler(
functools.partial(handlers.help_command, bot),
commands=["help", "start"],
functools.partial(handlers.help, bot), commands=["help", "start"]
)
bot.register_message_handler(
functools.partial(handlers.cancel, bot), commands=["cancel"]
@ -49,11 +44,9 @@ def create_bot(token: str, engine: Engine) -> telebot.TeleBot:
functools.partial(handlers.export, bot, engine), commands=["export"]
)
bot.register_message_handler(
functools.partial(handlers.import_accounts, bot, engine),
commands=["import"],
functools.partial(handlers.import_accounts, bot, engine), commands=["import"]
)
bot.register_message_handler(
functools.partial(handlers.gen_password, bot),
commands=["gen_password"],
functools.partial(handlers.gen_password, bot), commands=["gen_password"]
)
return bot

View File

@ -5,7 +5,7 @@ import time
import telebot
from sqlalchemy.future import Engine
from .. import encryption, database, generate_password
from .. import cryptography, database, generate_password
from ..account_checks import (
check_account,
check_account_name,
@ -58,8 +58,7 @@ def delete_all(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
_base_handler(bot, mes)
bot_mes = bot.send_message(
mes.chat.id,
"Вы действительно хотите удалить все ваши аккаунты? Это действие "
"нельзя отменить. "
"Вы действительно хотите удалить все ваши аккаунты? Это действие нельзя отменить. "
"Отправьте YES для подтверждения",
)
bot.register_next_step_handler(
@ -77,25 +76,13 @@ def _delete_all(
database.delete.delete_master_pass(engine, mes.from_user.id)
_send_tmp_message(bot, mes.chat.id, "Всё успешно удалено", timeout=10)
else:
_send_tmp_message(
bot,
mes.chat.id,
"Вы отправили не YES, ничего не удалено",
)
_send_tmp_message(bot, mes.chat.id, "Вы отправили не YES, ничего не удалено")
def set_master_password(
bot: telebot.TeleBot,
engine: Engine,
mes: Message,
) -> None:
def set_master_password(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
_base_handler(bot, mes, None)
if database.get.get_master_pass(engine, mes.from_user.id) is not None:
return _send_tmp_message(
bot,
mes.chat.id,
"Мастер пароль уже существует",
)
return _send_tmp_message(bot, mes.chat.id, "Мастер пароль уже существует")
bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль")
bot.register_next_step_handler(
mes, functools.partial(_set_master_pass2, bot, engine, bot_mes)
@ -110,25 +97,14 @@ def _set_master_pass2(
if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
password_hash, master_salt = encryption.master_pass.encrypt_master_pass(
text,
)
database.add.add_master_pass(
engine,
mes.from_user.id,
master_salt,
password_hash,
)
hash_pass, master_salt = cryptography.master_pass.encrypt_master_pass(text)
database.add.add_master_pass(engine, mes.from_user.id, master_salt, hash_pass)
_send_tmp_message(bot, mes.chat.id, "Успех")
del mes, text
gc.collect()
def reset_master_pass(
bot: telebot.TeleBot,
engine: Engine,
mes: Message,
) -> None:
def reset_master_pass(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
_base_handler(bot, mes)
if database.get.get_master_pass(engine, mes.from_user.id) is None:
return _send_tmp_message(bot, mes.chat.id, "Мастер пароль не задан")
@ -150,7 +126,7 @@ def _reset_master_pass2(
if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
hash_, salt = encryption.master_pass.encrypt_master_pass(text)
hash_, salt = cryptography.master_pass.encrypt_master_pass(text)
database.delete.purge_accounts(engine, mes.from_user.id)
database.change.change_master_pass(engine, mes.from_user.id, salt, hash_)
_send_tmp_message(
@ -163,10 +139,7 @@ def _reset_master_pass2(
def add_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
_base_handler(bot, mes)
master_password_from_db = database.get.get_master_pass(
engine,
mes.from_user.id,
)
master_password_from_db = database.get.get_master_pass(engine, mes.from_user.id)
if master_password_from_db is None:
return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
@ -186,11 +159,7 @@ def _add_account2(
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if not check_account_name(text):
return _send_tmp_message(
bot,
mes.chat.id,
"Не корректное название аккаунта",
)
return _send_tmp_message(bot, mes.chat.id, "Не корректное название аккаунта")
if text in database.get.get_accounts(engine, mes.from_user.id):
return _send_tmp_message(
bot, mes.chat.id, "Аккаунт с таким именем уже существует"
@ -263,19 +232,13 @@ def _add_account5(
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
salt, hash_ = database.get.get_master_pass(engine, mes.from_user.id)
if not encryption.master_pass.check_master_pass(text, hash_, salt):
return _send_tmp_message(
bot,
mes.chat.id,
"Не подходит главный пароль",
)
if not cryptography.master_pass.check_master_pass(text, hash_, salt):
return _send_tmp_message(bot, mes.chat.id, "Не подходит главный пароль")
name, login, passwd = data["name"], data["login"], data["passwd"]
enc_login, enc_pass, salt = encryption.other_accounts.encrypt(
login,
passwd,
text,
enc_login, enc_pass, salt = cryptography.other_accounts.encrypt_account_info(
login, passwd, text
)
result = database.add.add_account(
@ -283,9 +246,7 @@ def _add_account5(
)
_send_tmp_message(
bot,
mes.chat.id,
"Успех" if result else "Произошла не предвиденная ошибка",
bot, mes.chat.id, "Успех" if result else "Произошла не предвиденная ошибка"
)
del data, name, login, passwd, enc_login
@ -324,43 +285,29 @@ def _get_account2(
def _get_account3(
bot: telebot.TeleBot,
engine: Engine,
prev_mes: Message,
name: str,
mes: Message,
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, name: str, mes: Message
) -> None:
_base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_salt, hash_pass = database.get.get_master_pass(
engine,
mes.from_user.id,
)
master_salt, hash_pass = database.get.get_master_pass(engine, mes.from_user.id)
if not encryption.master_pass.check_master_pass(
text,
hash_pass,
master_salt,
):
if not cryptography.master_pass.check_master_pass(text, hash_pass, master_salt):
return _send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль")
salt, enc_login, enc_pass = database.get.get_account_info(
engine, mes.from_user.id, name
)
login, passwd = encryption.other_accounts.decrypt(
enc_login,
enc_pass,
text,
salt,
login, passwd = cryptography.other_accounts.decrypt_account_info(
enc_login, enc_pass, text, salt
)
_send_tmp_message(
bot,
mes.chat.id,
f"Логин:\n`{login}`\nПароль:\n`{passwd}`\nНажмите на логин "
"или пароль, чтобы скопировать",
f"Логин:\n`{login}`\nПароль:\n`{passwd}`\nНажмите на логин или пароль, "
"чтобы скопировать",
30,
)
@ -399,7 +346,7 @@ def _delete_account2(
_send_tmp_message(bot, mes.chat.id, "Аккаунт удалён")
def help_command(bot: telebot.TeleBot, mes: Message) -> None:
def help(bot: telebot.TeleBot, mes: Message) -> None:
message = """Команды:
/set_master_pass - установить мастер пароль
/add_account - создать аккаунт
@ -424,10 +371,7 @@ def cancel(bot: telebot.TeleBot, mes: Message) -> None:
def export(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
_base_handler(bot, mes)
master_password_from_db = database.get.get_master_pass(
engine,
mes.from_user.id,
)
master_password_from_db = database.get.get_master_pass(engine, mes.from_user.id)
if master_password_from_db is None:
return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
@ -450,19 +394,12 @@ def _export2(
if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_salt, hash_pass = database.get.get_master_pass(
engine,
mes.from_user.id,
)
if not encryption.master_pass.check_master_pass(
text,
hash_pass,
master_salt,
):
master_salt, hash_pass = database.get.get_master_pass(engine, mes.from_user.id)
if not cryptography.master_pass.check_master_pass(text, hash_pass, master_salt):
return _send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль")
accounts = database.get.get_all_accounts(engine, mes.from_user.id)
accounts = encryption.other_accounts.decrypt_multiple(accounts, text)
accounts = cryptography.other_accounts.decrypt_multiple(accounts, text)
json_io = accounts_to_json(accounts)
bot_mes = bot.send_document(mes.chat.id, json_io)
@ -472,16 +409,9 @@ def _export2(
bot.delete_message(bot_mes.chat.id, bot_mes.id)
def import_accounts(
bot: telebot.TeleBot,
engine: Engine,
mes: Message,
) -> None:
def import_accounts(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
_base_handler(bot, mes)
master_password_from_db = database.get.get_master_pass(
engine,
mes.from_user.id,
)
master_password_from_db = database.get.get_master_pass(engine, mes.from_user.id)
if master_password_from_db is None:
return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
@ -503,11 +433,7 @@ def _import2(
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if mes.document is None:
return _send_tmp_message(
bot,
mes.chat.id,
"Вы должны отправить документ",
)
return _send_tmp_message(bot, mes.chat.id, "Вы должны отправить документ")
if mes.document.file_size > 102_400: # If file size is bigger that 100 MB
return _send_tmp_message(bot, mes.chat.id, "Файл слишком большой")
@ -516,11 +442,7 @@ def _import2(
try:
accounts = json_to_accounts(downloaded_file.decode("utf-8"))
except Exception:
return _send_tmp_message(
bot,
mes.chat.id,
"Ошибка во время работы с файлом",
)
return _send_tmp_message(bot, mes.chat.id, "Ошибка во время работы с файлом")
bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль")
bot.register_next_step_handler(
@ -540,29 +462,19 @@ def _import3(
if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_salt, hash_pass = database.get.get_master_pass(
engine,
mes.from_user.id,
)
if not encryption.master_pass.check_master_pass(
text,
hash_pass,
master_salt,
):
master_salt, hash_pass = database.get.get_master_pass(engine, mes.from_user.id)
if not cryptography.master_pass.check_master_pass(text, hash_pass, master_salt):
return _send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль")
# List of names of accounts, which failed to be added to the database
# or failed the tests
# List of names of accounts, which failed to be added to the database or failed tests
failed: list[str] = []
for account in accounts:
name, login, passwd = account
if not check_account(name, login, passwd):
failed.append(name)
continue
enc_login, enc_passwd, salt = encryption.other_accounts.encrypt(
login,
passwd,
text,
enc_login, enc_passwd, salt = cryptography.other_accounts.encrypt_account_info(
login, passwd, text
)
result = database.add.add_account(
engine, mes.from_user.id, name, salt, enc_login, enc_passwd

View File

@ -20,10 +20,8 @@ def _generate_key(salt: bytes, master_pass: bytes) -> bytes:
return key
def encrypt(
login: str,
passwd: str,
master_pass: str,
def encrypt_account_info(
login: str, passwd: str, master_pass: str
) -> tuple[bytes, bytes, bytes]:
"""Encrypts login and password of a user using their master
password as a key.
@ -36,7 +34,7 @@ def encrypt(
return (enc_login, enc_password, salt)
def decrypt(
def decrypt_account_info(
enc_login: bytes,
enc_pass: bytes,
master_pass: str,
@ -60,10 +58,5 @@ def decrypt_multiple(
Return an iterator of names, logins and passwords as a tuple"""
for account in accounts:
name, salt, enc_login, enc_passwd = account
login, passwd = decrypt(
enc_login,
enc_passwd,
master_pass,
salt,
)
login, passwd = decrypt_account_info(enc_login, enc_passwd, master_pass, salt)
yield (name, login, passwd)

View File

@ -13,8 +13,7 @@ def add_account(
enc_login: bytes,
enc_password: bytes,
) -> bool:
"""Adds account to the database. Returns true on success,
false otherwise"""
"""Adds account to the database. Returns true on success, false otherwise"""
account = models.Account(
user_id=user_id,
name=name,
@ -38,8 +37,7 @@ def add_master_pass(
salt: bytes,
password_hash: bytes,
) -> bool:
"""Adds master password the database. Returns true on success,
false otherwise"""
"""Adds master password the database. Returns true on success, false otherwise"""
master_pass = models.MasterPass(
user_id=user_id,
salt=salt,

View File

@ -11,7 +11,7 @@ def change_master_pass(
statement = (
sqlmodel.update(models.MasterPass)
.where(models.MasterPass.user_id == user_id)
.values(salt=salt, password_hash=password)
.values(salt=salt, passwd=password)
)
with sqlmodel.Session(engine) as session:
session.exec(statement)

View File

@ -35,20 +35,15 @@ def get_accounts(engine: Engine, user_id: int) -> list[str]:
def get_all_accounts(
engine: Engine, user_id: int
) -> Iterator[tuple[str, bytes, bytes, bytes]]:
"""Returns an iterator of tuples, where values represent account's
name, salt, encrypted login and encrypted password"""
"""Returns an iterator of tuples, where values represent account's name, salt,
encrypted login and encrypted password"""
statement = sqlmodel.select(models.Account).where(
models.Account.user_id == user_id,
)
with sqlmodel.Session(engine) as session:
result = session.exec(statement)
yield from (
(
account.name,
account.salt,
account.enc_login,
account.enc_password,
)
(account.name, account.salt, account.enc_login, account.enc_password)
for account in result
)

View File

@ -1,15 +1,13 @@
import sqlmodel
from sqlalchemy.future import Engine
from . import models # noqa
HOUR_IN_SECONDS = 3600
from . import models
def get_engine(host: str, user: str, passwd: str, db: str) -> Engine:
"""Creates an engine for mariadb with pymysql as connector"""
uri = f"mariadb+pymysql://{user}:{passwd}@{host}/{db}"
engine = sqlmodel.create_engine(uri, pool_recycle=HOUR_IN_SECONDS)
engine = sqlmodel.create_engine(uri)
return engine