19 Commits

Author SHA1 Message Date
972c5577f4 Added validation to the database models 2023-01-10 21:00:03 +03:00
671286dc39 Removed backend because it wasn't used in encryption.accounts 2023-01-10 20:28:34 +03:00
5dbf93013a Added requesting a master password for deleting accounts, deleting all, reseting master password 2023-01-10 20:10:48 +03:00
c051c14f1f Switched to the ChaCha20Poly1305 encryption algorithm for better security 2023-01-10 20:00:58 +03:00
3686195396 Added a message about exceptions for a user 2023-01-06 17:32:44 +03:00
d79b57b1f0 Added return statement after sending a message about that there is no active message 2023-01-05 14:52:45 +03:00
157c2c4aa2 _import3 no longer blocks event loop during decryption
Now running encrytion of accounts in ProcessPoolExecutor
2023-01-05 14:11:29 +03:00
f4a5f51b23 Made more verbose exception handler for the bot for easier debugging 2023-01-05 13:47:31 +03:00
6bc8eb1413 db.add changes
Added _add_model helper function to reduce code duplication
Added add_accounts for future use
2023-01-05 13:19:01 +03:00
9f64305050 Moved sorting back to the get_accounts 2023-01-05 13:03:44 +03:00
4954f39a91 Added indentation into exported json files 2023-01-05 13:02:50 +03:00
3edeb86b6c Disabled autoincrement in master_passwords table 2023-01-05 13:01:28 +03:00
c7675c231f Fixed sending a final message in /import 2023-01-03 21:07:37 +03:00
ae88fccf13 Updated type hint of the handler in the register_state function 2023-01-03 12:34:54 +03:00
e29eefe40b Changes in helper_functions.delete_message
Removed checking if sleep_time is 0
Moved sleeping outside of try block
2023-01-03 12:29:08 +03:00
fdbed91512 Now account name is copyable 2023-01-03 12:12:40 +03:00
f9d163361a Renamed functions in db.get
get_accounts -> get_account_names
get_all_accounts -> get_accounts
2023-01-03 12:08:06 +03:00
9ec66a3521 Renamed src/database into src/db 2023-01-03 11:59:06 +03:00
70e9afe21d Small reformating in bot.message_handler 2023-01-03 11:48:25 +03:00
16 changed files with 265 additions and 162 deletions

View File

@ -7,7 +7,7 @@ from . import (
account_checks,
account_parsing,
bot,
database,
db,
decrypted_account,
encryption,
generate_password,
@ -19,19 +19,19 @@ __all__ = [
"bot",
"decrypted_account",
"encryption",
"database",
"db",
"generate_password",
]
def main() -> None:
load_dotenv("./.env")
engine = database.prepare.get_engine(
engine = db.prepare.get_engine(
host=os.getenv("DB_HOST"),
user=os.getenv("DB_USER"),
passwd=os.getenv("DB_PASS"),
db=os.getenv("DB_NAME"),
)
database.prepare.prepare(engine)
db.prepare.prepare(engine)
bot_ = bot.create_bot(os.getenv("TG_TOKEN"), engine)
asyncio.run(bot_.infinity_polling())

View File

@ -35,7 +35,7 @@ class _Accounts(pydantic.BaseModel):
def _accounts_list_to_json(accounts: Iterable[DecryptedAccount]) -> str:
result = _Accounts(
accounts=[_Account.from_usual_account(i) for i in accounts],
).json(ensure_ascii=False)
).json(ensure_ascii=False, indent=2)
return result

View File

@ -3,13 +3,13 @@ import functools
from sqlalchemy.future import Engine
from telebot.async_telebot import AsyncTeleBot
from . import callback_handlers, message_handlers
from . import callback_handlers, exception_handler, message_handlers
__all__ = ["callback_handlers", "message_handlers"]
__all__ = ["callback_handlers", "exception_handler", "message_handlers"]
def create_bot(token: str, engine: Engine) -> AsyncTeleBot:
bot = AsyncTeleBot(token)
bot = AsyncTeleBot(token, exception_handler=exception_handler.Handler)
bot.register_message_handler(
functools.partial(message_handlers.set_master_password, bot, engine),
commands=["set_master_pass"],

View File

@ -0,0 +1,8 @@
import traceback
from typing import Type
class Handler:
@staticmethod
def handle(exc: Type[BaseException]) -> None:
traceback.print_exception(exc)

View File

@ -18,7 +18,7 @@ states: dict[tuple[int, int], Handler] = {}
def register_state(
message: Message,
handler: Callable[[Message], Any],
handler: Handler,
) -> None:
states[(message.chat.id, message.from_user.id)] = handler
@ -40,9 +40,8 @@ async def delete_message(
*,
sleep_time: int = 0,
) -> bool:
await asyncio.sleep(sleep_time)
try:
if sleep_time != 0:
await asyncio.sleep(sleep_time)
await bot.delete_message(mes.chat.id, mes.id)
except telebot.apihelper.ApiException:
return False
@ -86,10 +85,3 @@ async def send_deleteable_message(
parse_mode="MarkdownV2",
reply_markup=markup,
)
def escape(text: str) -> str:
escaped_chars = "*_~|`[("
for char in escaped_chars:
text = text.replace(char, rf"\\{char}")
return text

View File

@ -1,13 +1,14 @@
import asyncio
import functools
import gc
import itertools
from concurrent.futures import ProcessPoolExecutor
import telebot
from sqlalchemy.future import Engine
from telebot.async_telebot import AsyncTeleBot
from .. import database, encryption, generate_password
from .. import db, encryption, generate_password
from ..account_checks import (
check_account,
check_account_name,
@ -20,7 +21,6 @@ from . import markups
from .helper_functions import (
base_handler,
delete_message,
escape,
get_state,
register_state,
send_deleteable_message,
@ -36,7 +36,7 @@ async def get_accounts(
mes: Message,
) -> None:
await base_handler(bot, mes)
accounts = database.get.get_accounts(
accounts = db.get.get_account_names(
engine,
mes.from_user.id,
to_sort=True,
@ -58,23 +58,33 @@ async def get_accounts(
async def delete_all(bot: AsyncTeleBot, engine: Engine, mes: Message) -> None:
await base_handler(bot, mes)
master_pass = db.get.get_master_pass(engine, mes.from_user.id)
if master_pass is None:
await send_tmp_message(bot, mes.chat.id, "У вас нет мастер пароля")
return
bot_mes = await bot.send_message(
mes.chat.id,
"Вы действительно хотите удалить все ваши аккаунты? Это действие "
"нельзя отменить. "
"Отправьте YES для подтверждения",
"Отправьте мастер пароль для подтверждения",
)
register_state(
mes, functools.partial(_delete_all2, bot, engine, master_pass, bot_mes)
)
register_state(mes, functools.partial(_delete_all2, bot, engine, bot_mes))
async def _delete_all2(
bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message
bot: AsyncTeleBot,
engine: Engine,
master_pass: db.models.MasterPass,
prev_mes: Message,
mes: Message,
) -> None:
await base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "YES":
database.delete.purge_accounts(engine, mes.from_user.id)
database.delete.delete_master_pass(engine, mes.from_user.id)
if encryption.master_pass.check_master_pass(text, master_pass):
db.delete.purge_accounts(engine, mes.from_user.id)
db.delete.delete_master_pass(engine, mes.from_user.id)
await send_tmp_message(
bot,
mes.chat.id,
@ -85,7 +95,7 @@ async def _delete_all2(
await send_tmp_message(
bot,
mes.chat.id,
"Вы отправили не YES, ничего не удалено",
"Вы отправили не верный мастер пароль, ничего не удалено",
)
@ -95,7 +105,7 @@ async def set_master_password(
mes: Message,
) -> None:
await base_handler(bot, mes, None)
if database.get.get_master_pass(engine, mes.from_user.id) is not None:
if db.get.get_master_pass(engine, mes.from_user.id) is not None:
return await send_tmp_message(
bot,
mes.chat.id,
@ -120,7 +130,7 @@ async def _set_master_pass2(
mes.from_user.id,
text,
)
database.add.add_master_pass(engine, master_password)
db.add.add_master_pass(engine, master_password)
await send_tmp_message(bot, mes.chat.id, "Успех")
del mes, text
@ -134,7 +144,8 @@ async def reset_master_pass(
) -> None:
await base_handler(bot, mes)
if database.get.get_master_pass(engine, mes.from_user.id) is None:
master_pass = db.get.get_master_pass(engine, mes.from_user.id)
if master_pass is None:
return await send_tmp_message(
bot,
mes.chat.id,
@ -143,17 +154,48 @@ async def reset_master_pass(
bot_mes = await bot.send_message(
mes.chat.id,
"Отправьте новый мастер пароль, осторожно, все текущие аккаунты "
"будут удалены навсегда",
"Отправьте текущий мастер пароль",
)
register_state(
mes,
functools.partial(_reset_master_pass2, bot, engine, bot_mes),
functools.partial(
_reset_master_pass2,
bot,
engine,
master_pass,
bot_mes,
),
)
async def _reset_master_pass2(
bot: AsyncTeleBot,
engine: Engine,
master_pass: db.models.MasterPass,
prev_mes: Message,
mes: Message,
) -> None:
await base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if not encryption.master_pass.check_master_pass(text, master_pass):
await send_tmp_message(bot, mes.chat.id, "Неверный мастер пароль")
return
bot_mes = await bot.send_message(
mes.chat.id,
"Отправьте новый мастер пароль. Осторожно, все аккаунты будут удалены",
)
register_state(
mes,
functools.partial(_reset_master_pass3, bot, engine, bot_mes),
)
async def _reset_master_pass3(
bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
await base_handler(bot, mes, prev_mes)
@ -165,8 +207,8 @@ async def _reset_master_pass2(
mes.from_user.id,
text,
)
database.delete.purge_accounts(engine, mes.from_user.id)
database.change.change_master_pass(engine, master_password)
db.delete.purge_accounts(engine, mes.from_user.id)
db.change.change_master_pass(engine, master_password)
await send_tmp_message(
bot, mes.chat.id, "Все ваши аккаунты удалены, а мастер пароль изменён"
@ -178,7 +220,7 @@ async def _reset_master_pass2(
async def add_account(bot: AsyncTeleBot, engine: Engine, mes: Message) -> None:
await base_handler(bot, mes)
master_password_from_db = database.get.get_master_pass(
master_password_from_db = db.get.get_master_pass(
engine,
mes.from_user.id,
)
@ -210,7 +252,7 @@ async def _add_account2(
mes.chat.id,
"Не корректное название аккаунта",
)
if text in database.get.get_accounts(engine, mes.from_user.id):
if text in db.get.get_account_names(engine, mes.from_user.id):
return await send_tmp_message(
bot, mes.chat.id, "Аккаунт с таким именем уже существует"
)
@ -291,7 +333,7 @@ async def _add_account5(
if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_password = database.get.get_master_pass(engine, mes.from_user.id)
master_password = db.get.get_master_pass(engine, mes.from_user.id)
if not encryption.master_pass.check_master_pass(text, master_password):
return await send_tmp_message(
bot,
@ -307,12 +349,9 @@ async def _add_account5(
password=data["passwd"],
)
encrypted_account = encryption.accounts.encrypt(
account,
text,
)
encrypted_account = encryption.accounts.encrypt(account, text)
result = database.add.add_account(
result = db.add.add_account(
engine,
encrypted_account,
)
@ -331,11 +370,11 @@ async def _add_account5(
async def get_account(bot: AsyncTeleBot, engine: Engine, mes: Message) -> None:
await base_handler(bot, mes)
master_pass = database.get.get_master_pass(engine, mes.from_user.id)
master_pass = db.get.get_master_pass(engine, mes.from_user.id)
if master_pass is None:
return await send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
accounts = database.get.get_accounts(
accounts = db.get.get_account_names(
engine,
mes.from_user.id,
to_sort=True,
@ -358,7 +397,7 @@ async def _get_account2(
if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if text not in database.get.get_accounts(engine, mes.from_user.id):
if text not in db.get.get_account_names(engine, mes.from_user.id):
return await send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта")
bot_mes = await bot.send_message(mes.chat.id, "Отправьте мастер пароль")
@ -380,7 +419,7 @@ async def _get_account3(
if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_password = database.get.get_master_pass(
master_password = db.get.get_master_pass(
engine,
mes.from_user.id,
)
@ -392,17 +431,14 @@ async def _get_account3(
"Не подходит мастер пароль",
)
account = database.get.get_account_info(engine, mes.from_user.id, name)
account = encryption.accounts.decrypt(
account,
text,
)
account = db.get.get_account_info(engine, mes.from_user.id, name)
account = encryption.accounts.decrypt(account, text)
await send_deleteable_message(
bot,
mes.chat.id,
f"Название:\n{escape(account.name)}\n"
f"Название:\n`{account.name}`\n"
f"Логин:\n`{account.login}`\nПароль:\n`{account.password}`\nНажмите "
"на логин или пароль, чтобы скопировать",
"на название, логин или пароль, чтобы скопировать",
)
del text, mes
@ -416,11 +452,11 @@ async def delete_account(
) -> None:
await base_handler(bot, mes)
master_pass = database.get.get_master_pass(engine, mes.from_user.id)
master_pass = db.get.get_master_pass(engine, mes.from_user.id)
if master_pass is None:
return await send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
accounts = database.get.get_accounts(
accounts = db.get.get_account_names(
engine,
mes.from_user.id,
to_sort=True,
@ -434,13 +470,14 @@ async def delete_account(
register_state(
mes,
functools.partial(_delete_account2, bot, engine, bot_mes),
functools.partial(_delete_account2, bot, engine, master_pass, bot_mes),
)
async def _delete_account2(
bot: AsyncTeleBot,
engine: Engine,
master_pass: db.models.MasterPass,
prev_mes: Message,
mes: Message,
):
@ -449,34 +486,43 @@ async def _delete_account2(
if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if text not in database.get.get_accounts(engine, mes.from_user.id):
if text not in db.get.get_account_names(engine, mes.from_user.id):
return await send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта")
bot_mes = await bot.send_message(
mes.from_user.id,
f'Вы уверены, что хотите удалить аккаунт "{text}"?\nОтправьте YES для '
"подтверждения",
f'Вы уверены, что хотите удалить аккаунт "{text}"?\nОтправьте мастер '
"пароль для подтверждения",
)
register_state(
mes,
functools.partial(_delete_account3, bot, engine, bot_mes, text),
functools.partial(
_delete_account3,
bot,
engine,
master_pass,
bot_mes,
text,
),
)
async def _delete_account3(
bot: AsyncTeleBot,
engine: Engine,
master_pass: db.models.MasterPass,
prev_mes: Message,
account_name: str,
mes: Message,
) -> None:
await base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text != "YES":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if not encryption.master_pass.check_master_pass(text, master_pass):
await send_tmp_message(bot, mes.chat.id, "Неверный пароль")
return
database.delete.delete_account(engine, mes.from_user.id, account_name)
db.delete.delete_account(engine, mes.from_user.id, account_name)
await send_tmp_message(bot, mes.chat.id, "Аккаунт удалён")
@ -500,7 +546,7 @@ async def help_command(bot: AsyncTeleBot, mes: Message) -> None:
async def export(bot: AsyncTeleBot, engine: Engine, mes: Message) -> None:
await base_handler(bot, mes)
master_password_from_db = database.get.get_master_pass(
master_password_from_db = db.get.get_master_pass(
engine,
mes.from_user.id,
)
@ -508,7 +554,7 @@ async def export(bot: AsyncTeleBot, engine: Engine, mes: Message) -> None:
if master_password_from_db is None:
return await send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
if not database.get.get_accounts(engine, mes.from_user.id):
if not db.get.get_account_names(engine, mes.from_user.id):
return await send_tmp_message(bot, mes.chat.id, "Нет аккаунтов")
bot_mes = await bot.send_message(mes.chat.id, "Отправьте мастер пароль")
@ -524,7 +570,7 @@ async def _export2(
if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_password = database.get.get_master_pass(
master_password = db.get.get_master_pass(
engine,
mes.from_user.id,
)
@ -535,7 +581,7 @@ async def _export2(
"Не подходит мастер пароль",
)
accounts = database.get.get_all_accounts(engine, mes.from_user.id)
accounts = db.get.get_accounts(engine, mes.from_user.id)
with ProcessPoolExecutor() as pool:
loop = asyncio.get_running_loop()
tasks = []
@ -547,7 +593,6 @@ async def _export2(
)
tasks.append(loop.run_in_executor(pool, function))
accounts = await asyncio.gather(*tasks)
accounts.sort(key=lambda account: account.name)
json_io = accounts_to_json(accounts)
await bot.send_document(
mes.chat.id,
@ -565,7 +610,7 @@ async def import_accounts(
mes: Message,
) -> None:
await base_handler(bot, mes)
master_password_from_db = database.get.get_master_pass(
master_password_from_db = db.get.get_master_pass(
engine,
mes.from_user.id,
)
@ -633,7 +678,7 @@ async def _import3(
if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_password = database.get.get_master_pass(
master_password = db.get.get_master_pass(
engine,
mes.from_user.id,
)
@ -647,25 +692,35 @@ async def _import3(
# List of names of accounts, which failed to be added to the database
# or failed the tests
failed: list[str] = []
for account in accounts:
if not check_account(account):
failed.append(account.name)
continue
account = encryption.accounts.encrypt(
account,
text,
)
result = database.add.add_account(engine, account)
if not result:
failed.append(account.name)
tasks: list[asyncio.Future[db.models.Account]] = []
loop = asyncio.get_running_loop()
with ProcessPoolExecutor() as pool:
for account in accounts:
if not check_account(account):
failed.append(account.name)
continue
function = functools.partial(
encryption.accounts.encrypt,
account,
text,
)
tasks.append(loop.run_in_executor(pool, function))
enc_accounts: list[db.models.Account] = await asyncio.gather(*tasks)
results = db.add.add_accounts(engine, enc_accounts)
failed_accounts = itertools.compress(
enc_accounts, (not result for result in results)
)
failed.extend((account.name for account in failed_accounts))
if failed:
mes_text = "Не удалось добавить:\n" + "\n".join(failed)
await send_deleteable_message(
bot, mes.chat.id, "Не удалось добавить:\n" + "\n".join(failed)
)
else:
mes_text = "Успех"
await send_tmp_message(bot, mes.chat.id, "Успех")
await send_tmp_message(bot, mes.chat.id, mes_text, 10)
del text, mes, accounts
del text, mes, accounts, function, tasks, failed_accounts
gc.collect()
@ -687,10 +742,20 @@ async def message_handler(bot: AsyncTeleBot, mes: Message) -> None:
await delete_message(bot, mes)
if mes.text.strip() == "/cancel":
await send_tmp_message(bot, mes.chat.id, "Нет активного действия")
return
await send_tmp_message(
bot,
mes.chat.id,
"Вы отправили не корректное сообщение",
)
return
await handler(mes)
try:
await handler(mes)
except Exception:
await send_tmp_message(
bot,
mes.chat.id,
"Произошла непредвиденная ошибка",
)
raise

View File

@ -1,31 +0,0 @@
import sqlmodel
from sqlalchemy.exc import IntegrityError
from sqlalchemy.future import Engine
from . import models
def add_account(engine: Engine, account: models.Account) -> bool:
"""Adds account to the database. Returns true on success,
false otherwise"""
try:
with sqlmodel.Session(engine) as session:
session.add(account)
session.commit()
except IntegrityError:
return False
else:
return True
def add_master_pass(engine: Engine, master_pass: models.MasterPass) -> bool:
"""Adds master password the database. Returns true on success,
false otherwise"""
try:
with sqlmodel.Session(engine) as session:
session.add(master_pass)
session.commit()
except IntegrityError:
return False
else:
return True

46
src/db/add.py Normal file
View File

@ -0,0 +1,46 @@
import sqlmodel
from sqlalchemy.exc import IntegrityError
from sqlalchemy.future import Engine
from . import models
def _add_model(
session: sqlmodel.Session, model: models.Account | models.MasterPass
) -> bool:
"""Adds model to the session. Returns true on success,
false otherwise"""
try:
session.add(model)
except IntegrityError:
return False
else:
return True
def add_account(engine: Engine, account: models.Account) -> bool:
"""Adds account to the database. Returns true on success,
false otherwise"""
with sqlmodel.Session(engine) as session:
result = _add_model(session, account)
session.commit()
return result
def add_master_pass(engine: Engine, master_pass: models.MasterPass) -> bool:
"""Adds master password the database. Returns true on success,
false otherwise"""
with sqlmodel.Session(engine) as session:
result = _add_model(session, master_pass)
session.commit()
return result
def add_accounts(
engine: Engine,
accounts: list[models.Account],
) -> list[bool]:
with sqlmodel.Session(engine) as session:
result = [_add_model(session, account) for account in accounts]
session.commit()
return result

View File

@ -17,7 +17,7 @@ def get_master_pass(
return result
def get_accounts(
def get_account_names(
engine: Engine,
user_id: int,
*,
@ -34,10 +34,14 @@ def get_accounts(
return result
def get_all_accounts(engine: Engine, user_id: int) -> list[models.Account]:
def get_accounts(engine: Engine, user_id: int) -> list[models.Account]:
"""Returns a list of accounts of a user"""
statement = sqlmodel.select(models.Account).where(
models.Account.user_id == user_id,
statement = (
sqlmodel.select(models.Account)
.where(
models.Account.user_id == user_id,
)
.order_by(models.Account.name)
)
with sqlmodel.Session(engine) as session:
result = session.exec(statement).fetchall()

View File

@ -4,13 +4,21 @@ import sqlmodel
class MasterPass(sqlmodel.SQLModel, table=True):
__tablename__ = "master_passwords"
user_id: int = sqlmodel.Field(
sa_column=sqlmodel.Column(sqlmodel.INT(), primary_key=True)
sa_column=sqlmodel.Column(
sqlmodel.INT(),
primary_key=True,
autoincrement=False,
)
)
salt: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(sqlmodel.BINARY(64), nullable=False)
sa_column=sqlmodel.Column(sqlmodel.BINARY(64), nullable=False),
max_length=64,
min_length=64,
)
password_hash: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(sqlmodel.BINARY(128), nullable=False)
sa_column=sqlmodel.Column(sqlmodel.BINARY(128), nullable=False),
max_length=128,
min_length=128,
)
@ -18,13 +26,17 @@ class Account(sqlmodel.SQLModel, table=True):
__tablename__ = "accounts"
__table_args__ = (sqlmodel.PrimaryKeyConstraint("user_id", "name"),)
user_id: int = sqlmodel.Field()
name: str = sqlmodel.Field(max_length=255)
name: str = sqlmodel.Field(max_length=256)
salt: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(sqlmodel.BINARY(64), nullable=False)
sa_column=sqlmodel.Column(sqlmodel.BINARY(64), nullable=False),
max_length=64,
min_length=64,
)
enc_login: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(sqlmodel.VARBINARY(256), nullable=False)
sa_column=sqlmodel.Column(sqlmodel.VARBINARY(256), nullable=False),
max_length=256,
)
enc_password: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(sqlmodel.VARBINARY(256), nullable=False)
sa_column=sqlmodel.Column(sqlmodel.VARBINARY(256), nullable=False),
max_length=256,
)

View File

@ -1,26 +1,43 @@
import base64
import os
from typing import Self
from cryptography.fernet import Fernet
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from ..database.models import Account
from ..db.models import Account
from ..decrypted_account import DecryptedAccount
def _generate_key(salt: bytes, master_pass: bytes) -> bytes:
"""Generates key for fernet encryption"""
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000,
backend=default_backend(),
)
key = base64.urlsafe_b64encode(kdf.derive(master_pass))
return key
class Cipher:
def __init__(self, key: bytes) -> None:
self._chacha = ChaCha20Poly1305(key)
@classmethod
def generate_cipher(cls, salt: bytes, password: bytes) -> Self:
"""Generates cipher which uses key derived from a given password"""
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000,
)
return cls(kdf.derive(password))
def encrypt(self, data: bytes) -> bytes:
nonce = os.urandom(12)
return nonce + self._chacha.encrypt(
nonce,
data,
associated_data=None,
)
def decrypt(self, data: bytes) -> bytes:
return self._chacha.decrypt(
nonce=data[:12],
data=data[12:],
associated_data=None,
)
def encrypt(
@ -29,15 +46,10 @@ def encrypt(
) -> Account:
"""Encrypts account using master password and returns Account object"""
salt = os.urandom(64)
key = _generate_key(salt, master_pass.encode("utf-8"))
f = Fernet(key)
cipher = Cipher.generate_cipher(salt, master_pass.encode("utf-8"))
enc_login = base64.urlsafe_b64decode(
f.encrypt(account.login.encode("utf-8")),
)
enc_password = base64.urlsafe_b64decode(
f.encrypt(account.password.encode("utf-8")),
)
enc_login = cipher.encrypt(account.login.encode("utf-8"))
enc_password = cipher.encrypt(account.password.encode("utf-8"))
return Account(
user_id=account.user_id,
@ -54,15 +66,10 @@ def decrypt(
) -> DecryptedAccount:
"""Decrypts account using master password and returns
DecryptedAccount object"""
key = _generate_key(account.salt, master_pass.encode("utf-8"))
f = Fernet(key)
cipher = Cipher.generate_cipher(account.salt, master_pass.encode("utf-8"))
login = f.decrypt(
base64.urlsafe_b64encode(account.enc_login),
).decode("utf-8")
password = f.decrypt(
base64.urlsafe_b64encode(account.enc_password),
).decode("utf-8")
login = cipher.decrypt(account.enc_login).decode("utf-8")
password = cipher.decrypt(account.enc_password).decode("utf-8")
return DecryptedAccount(
user_id=account.user_id,

View File

@ -3,7 +3,7 @@ import os
from cryptography.exceptions import InvalidKey
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
from ..database.models import MasterPass
from ..db.models import MasterPass
MEMORY_USAGE = 2**14