8 Commits

5 changed files with 179 additions and 65 deletions

View File

@ -3,7 +3,8 @@ import string
from .decrypted_account import DecryptedAccount from .decrypted_account import DecryptedAccount
FORBIDDEN_CHARS = frozenset("`\n\\") FORBIDDEN_CHARS = frozenset("`\n\\")
PUNCTUATION = frozenset(string.punctuation).difference(FORBIDDEN_CHARS) FULL_PUNCTUATION = frozenset(string.punctuation)
PUNCTUATION = FULL_PUNCTUATION.difference(FORBIDDEN_CHARS)
def _base_check(val: str, /) -> bool: def _base_check(val: str, /) -> bool:
@ -21,9 +22,9 @@ def check_login(login: str) -> bool:
return _base_check(login) return _base_check(login)
def check_password(passwd: str) -> bool: def check_password(password: str) -> bool:
"Returns true if password is valid" "Returns true if password is valid"
return _base_check(passwd) return _base_check(password)
def check_account(account: DecryptedAccount) -> bool: def check_account(account: DecryptedAccount) -> bool:
@ -37,14 +38,28 @@ def check_account(account: DecryptedAccount) -> bool:
) )
def check_gened_password(passwd: str, /) -> bool: def check_gened_password(password: str, /) -> bool:
"""Retuns true if generated password is valid, """Retuns true if generated password is valid,
false otherwise. false otherwise.
Password is valid if there is at least one lowercase character, Password is valid if there is at least one lowercase character,
uppercase character and one punctuation character""" uppercase character and one punctuation character"""
return ( return (
any(c.islower() for c in passwd) any(c.islower() for c in password)
and any(c.isupper() for c in passwd) and any(c.isupper() for c in password)
and any(c.isdigit() for c in passwd) and any(c.isdigit() for c in password)
and any(c in PUNCTUATION for c in passwd) and any(c in PUNCTUATION for c in password)
)
def check_master_password(password: str) -> bool:
"""Returns True if master password is valid.
Master password has to have at least one lowercase letter,
one uppercase letter, one digit, one punctuation character
and length must be at least 8"""
return (
len(password) >= 8
and any(c.islower() for c in password)
and any(c.isupper() for c in password)
and any(c.isdigit() for c in password)
and any(c in FULL_PUNCTUATION for c in password)
) )

View File

@ -14,6 +14,7 @@ from ..account_checks import (
check_account_name, check_account_name,
check_login, check_login,
check_password, check_password,
check_master_password,
) )
from ..account_parsing import accounts_to_json, json_to_accounts from ..account_parsing import accounts_to_json, json_to_accounts
from ..decrypted_account import DecryptedAccount from ..decrypted_account import DecryptedAccount
@ -58,21 +59,31 @@ async def get_accounts(
async def delete_all(bot: AsyncTeleBot, engine: Engine, mes: Message) -> None: async def delete_all(bot: AsyncTeleBot, engine: Engine, mes: Message) -> None:
await base_handler(bot, mes) await base_handler(bot, mes)
master_pass = db.get.get_master_pass(engine, mes.from_user.id)
if master_pass is None:
await send_tmp_message(bot, mes.chat.id, "У вас нет мастер пароля")
return
bot_mes = await bot.send_message( bot_mes = await bot.send_message(
mes.chat.id, mes.chat.id,
"Вы действительно хотите удалить все ваши аккаунты? Это действие " "Вы действительно хотите удалить все ваши аккаунты? Это действие "
"нельзя отменить. " "нельзя отменить. "
"Отправьте YES для подтверждения", "Отправьте мастер пароль для подтверждения",
)
register_state(
mes, functools.partial(_delete_all2, bot, engine, master_pass, bot_mes)
) )
register_state(mes, functools.partial(_delete_all2, bot, engine, bot_mes))
async def _delete_all2( async def _delete_all2(
bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message bot: AsyncTeleBot,
engine: Engine,
master_pass: db.models.MasterPass,
prev_mes: Message,
mes: Message,
) -> None: ) -> None:
await base_handler(bot, mes, prev_mes) await base_handler(bot, mes, prev_mes)
text = mes.text.strip() text = mes.text.strip()
if text == "YES": if encryption.master_pass.verify_master_pass(text, master_pass):
db.delete.purge_accounts(engine, mes.from_user.id) db.delete.purge_accounts(engine, mes.from_user.id)
db.delete.delete_master_pass(engine, mes.from_user.id) db.delete.delete_master_pass(engine, mes.from_user.id)
await send_tmp_message( await send_tmp_message(
@ -85,7 +96,7 @@ async def _delete_all2(
await send_tmp_message( await send_tmp_message(
bot, bot,
mes.chat.id, mes.chat.id,
"Вы отправили не YES, ничего не удалено", "Вы отправили не верный мастер пароль, ничего не удалено",
) )
@ -116,6 +127,17 @@ async def _set_master_pass2(
if text == "/cancel": if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена") return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if not check_master_password(text):
await send_tmp_message(
bot,
mes.chat.id,
"Не подходящий мастер пароль\\. Он должен быть не меньше "
"8 символов, иметь хотя бы один символ в нижнем и "
"верхнем регистре, хотя бы один специальный символ",
sleep_time=10,
)
return
master_password = encryption.master_pass.encrypt_master_pass( master_password = encryption.master_pass.encrypt_master_pass(
mes.from_user.id, mes.from_user.id,
text, text,
@ -134,7 +156,8 @@ async def reset_master_pass(
) -> None: ) -> None:
await base_handler(bot, mes) await base_handler(bot, mes)
if db.get.get_master_pass(engine, mes.from_user.id) is None: master_pass = db.get.get_master_pass(engine, mes.from_user.id)
if master_pass is None:
return await send_tmp_message( return await send_tmp_message(
bot, bot,
mes.chat.id, mes.chat.id,
@ -143,17 +166,48 @@ async def reset_master_pass(
bot_mes = await bot.send_message( bot_mes = await bot.send_message(
mes.chat.id, mes.chat.id,
"Отправьте новый мастер пароль, осторожно, все текущие аккаунты " "Отправьте текущий мастер пароль",
"будут удалены навсегда",
) )
register_state( register_state(
mes, mes,
functools.partial(_reset_master_pass2, bot, engine, bot_mes), functools.partial(
_reset_master_pass2,
bot,
engine,
master_pass,
bot_mes,
),
) )
async def _reset_master_pass2( async def _reset_master_pass2(
bot: AsyncTeleBot,
engine: Engine,
master_pass: db.models.MasterPass,
prev_mes: Message,
mes: Message,
) -> None:
await base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if not encryption.master_pass.verify_master_pass(text, master_pass):
await send_tmp_message(bot, mes.chat.id, "Неверный мастер пароль")
return
bot_mes = await bot.send_message(
mes.chat.id,
"Отправьте новый мастер пароль. Осторожно, все аккаунты будут удалены",
)
register_state(
mes,
functools.partial(_reset_master_pass3, bot, engine, bot_mes),
)
async def _reset_master_pass3(
bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None: ) -> None:
await base_handler(bot, mes, prev_mes) await base_handler(bot, mes, prev_mes)
@ -161,6 +215,17 @@ async def _reset_master_pass2(
if text == "/cancel": if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена") return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if not check_master_password(text):
await send_tmp_message(
bot,
mes.chat.id,
"Не подходящий мастер пароль\\. Он должен быть не меньше "
"8 символов, иметь хотя бы один символ в нижнем и "
"верхнем регистре, хотя бы один специальный символ",
sleep_time=10,
)
return
master_password = encryption.master_pass.encrypt_master_pass( master_password = encryption.master_pass.encrypt_master_pass(
mes.from_user.id, mes.from_user.id,
text, text,
@ -292,7 +357,7 @@ async def _add_account5(
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена") return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_password = db.get.get_master_pass(engine, mes.from_user.id) master_password = db.get.get_master_pass(engine, mes.from_user.id)
if not encryption.master_pass.check_master_pass(text, master_password): if not encryption.master_pass.verify_master_pass(text, master_password):
return await send_tmp_message( return await send_tmp_message(
bot, bot,
mes.chat.id, mes.chat.id,
@ -382,7 +447,7 @@ async def _get_account3(
mes.from_user.id, mes.from_user.id,
) )
if not encryption.master_pass.check_master_pass(text, master_password): if not encryption.master_pass.verify_master_pass(text, master_password):
return await send_tmp_message( return await send_tmp_message(
bot, bot,
mes.chat.id, mes.chat.id,
@ -428,13 +493,14 @@ async def delete_account(
register_state( register_state(
mes, mes,
functools.partial(_delete_account2, bot, engine, bot_mes), functools.partial(_delete_account2, bot, engine, master_pass, bot_mes),
) )
async def _delete_account2( async def _delete_account2(
bot: AsyncTeleBot, bot: AsyncTeleBot,
engine: Engine, engine: Engine,
master_pass: db.models.MasterPass,
prev_mes: Message, prev_mes: Message,
mes: Message, mes: Message,
): ):
@ -448,27 +514,36 @@ async def _delete_account2(
bot_mes = await bot.send_message( bot_mes = await bot.send_message(
mes.from_user.id, mes.from_user.id,
f'Вы уверены, что хотите удалить аккаунт "{text}"?\nОтправьте YES для ' f'Вы уверены, что хотите удалить аккаунт "{text}"?\nОтправьте мастер '
"подтверждения", "пароль для подтверждения",
) )
register_state( register_state(
mes, mes,
functools.partial(_delete_account3, bot, engine, bot_mes, text), functools.partial(
_delete_account3,
bot,
engine,
master_pass,
bot_mes,
text,
),
) )
async def _delete_account3( async def _delete_account3(
bot: AsyncTeleBot, bot: AsyncTeleBot,
engine: Engine, engine: Engine,
master_pass: db.models.MasterPass,
prev_mes: Message, prev_mes: Message,
account_name: str, account_name: str,
mes: Message, mes: Message,
) -> None: ) -> None:
await base_handler(bot, mes, prev_mes) await base_handler(bot, mes, prev_mes)
text = mes.text.strip() text = mes.text.strip()
if text != "YES": if not encryption.master_pass.verify_master_pass(text, master_pass):
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена") await send_tmp_message(bot, mes.chat.id, "Неверный пароль")
return
db.delete.delete_account(engine, mes.from_user.id, account_name) db.delete.delete_account(engine, mes.from_user.id, account_name)
await send_tmp_message(bot, mes.chat.id, "Аккаунт удалён") await send_tmp_message(bot, mes.chat.id, "Аккаунт удалён")
@ -522,7 +597,7 @@ async def _export2(
engine, engine,
mes.from_user.id, mes.from_user.id,
) )
if not encryption.master_pass.check_master_pass(text, master_password): if not encryption.master_pass.verify_master_pass(text, master_password):
return await send_tmp_message( return await send_tmp_message(
bot, bot,
mes.chat.id, mes.chat.id,
@ -630,7 +705,7 @@ async def _import3(
engine, engine,
mes.from_user.id, mes.from_user.id,
) )
if not encryption.master_pass.check_master_pass(text, master_password): if not encryption.master_pass.verify_master_pass(text, master_password):
return await send_tmp_message( return await send_tmp_message(
bot, bot,
mes.chat.id, mes.chat.id,
@ -697,4 +772,13 @@ async def message_handler(bot: AsyncTeleBot, mes: Message) -> None:
"Вы отправили не корректное сообщение", "Вы отправили не корректное сообщение",
) )
return return
try:
await handler(mes) await handler(mes)
except Exception:
await send_tmp_message(
bot,
mes.chat.id,
"Произошла непредвиденная ошибка",
)
raise

View File

@ -11,10 +11,14 @@ class MasterPass(sqlmodel.SQLModel, table=True):
) )
) )
salt: bytes = sqlmodel.Field( salt: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(sqlmodel.BINARY(64), nullable=False) sa_column=sqlmodel.Column(sqlmodel.BINARY(64), nullable=False),
max_length=64,
min_length=64,
) )
password_hash: bytes = sqlmodel.Field( password_hash: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(sqlmodel.BINARY(128), nullable=False) sa_column=sqlmodel.Column(sqlmodel.BINARY(128), nullable=False),
max_length=128,
min_length=128,
) )
@ -22,13 +26,17 @@ class Account(sqlmodel.SQLModel, table=True):
__tablename__ = "accounts" __tablename__ = "accounts"
__table_args__ = (sqlmodel.PrimaryKeyConstraint("user_id", "name"),) __table_args__ = (sqlmodel.PrimaryKeyConstraint("user_id", "name"),)
user_id: int = sqlmodel.Field() user_id: int = sqlmodel.Field()
name: str = sqlmodel.Field(max_length=255) name: str = sqlmodel.Field(max_length=256)
salt: bytes = sqlmodel.Field( salt: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(sqlmodel.BINARY(64), nullable=False) sa_column=sqlmodel.Column(sqlmodel.BINARY(64), nullable=False),
max_length=64,
min_length=64,
) )
enc_login: bytes = sqlmodel.Field( enc_login: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(sqlmodel.VARBINARY(256), nullable=False) sa_column=sqlmodel.Column(sqlmodel.VARBINARY(256), nullable=False),
max_length=256,
) )
enc_password: bytes = sqlmodel.Field( enc_password: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(sqlmodel.VARBINARY(256), nullable=False) sa_column=sqlmodel.Column(sqlmodel.VARBINARY(256), nullable=False),
max_length=256,
) )

View File

@ -1,26 +1,43 @@
import base64
import os import os
from typing import Self
from cryptography.fernet import Fernet
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from ..db.models import Account from ..db.models import Account
from ..decrypted_account import DecryptedAccount from ..decrypted_account import DecryptedAccount
def _generate_key(salt: bytes, master_pass: bytes) -> bytes: class Cipher:
"""Generates key for fernet encryption""" def __init__(self, key: bytes) -> None:
self._chacha = ChaCha20Poly1305(key)
@classmethod
def generate_cipher(cls, salt: bytes, password: bytes) -> Self:
"""Generates cipher which uses key derived from a given password"""
kdf = PBKDF2HMAC( kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(), algorithm=hashes.SHA256(),
length=32, length=32,
salt=salt, salt=salt,
iterations=100000, iterations=480000,
backend=default_backend(), )
return cls(kdf.derive(password))
def encrypt(self, data: bytes) -> bytes:
nonce = os.urandom(12)
return nonce + self._chacha.encrypt(
nonce,
data,
associated_data=None,
)
def decrypt(self, data: bytes) -> bytes:
return self._chacha.decrypt(
nonce=data[:12],
data=data[12:],
associated_data=None,
) )
key = base64.urlsafe_b64encode(kdf.derive(master_pass))
return key
def encrypt( def encrypt(
@ -29,15 +46,10 @@ def encrypt(
) -> Account: ) -> Account:
"""Encrypts account using master password and returns Account object""" """Encrypts account using master password and returns Account object"""
salt = os.urandom(64) salt = os.urandom(64)
key = _generate_key(salt, master_pass.encode("utf-8")) cipher = Cipher.generate_cipher(salt, master_pass.encode("utf-8"))
f = Fernet(key)
enc_login = base64.urlsafe_b64decode( enc_login = cipher.encrypt(account.login.encode("utf-8"))
f.encrypt(account.login.encode("utf-8")), enc_password = cipher.encrypt(account.password.encode("utf-8"))
)
enc_password = base64.urlsafe_b64decode(
f.encrypt(account.password.encode("utf-8")),
)
return Account( return Account(
user_id=account.user_id, user_id=account.user_id,
@ -54,15 +66,10 @@ def decrypt(
) -> DecryptedAccount: ) -> DecryptedAccount:
"""Decrypts account using master password and returns """Decrypts account using master password and returns
DecryptedAccount object""" DecryptedAccount object"""
key = _generate_key(account.salt, master_pass.encode("utf-8")) cipher = Cipher.generate_cipher(account.salt, master_pass.encode("utf-8"))
f = Fernet(key)
login = f.decrypt( login = cipher.decrypt(account.enc_login).decode("utf-8")
base64.urlsafe_b64encode(account.enc_login), password = cipher.decrypt(account.enc_password).decode("utf-8")
).decode("utf-8")
password = f.decrypt(
base64.urlsafe_b64encode(account.enc_password),
).decode("utf-8")
return DecryptedAccount( return DecryptedAccount(
user_id=account.user_id, user_id=account.user_id,

View File

@ -31,7 +31,7 @@ def encrypt_master_pass(user_id: int, password: str) -> MasterPass:
) )
def check_master_pass(password: str, master_password: MasterPass) -> bool: def verify_master_pass(password: str, master_password: MasterPass) -> bool:
"""Checks if the master password is correct""" """Checks if the master password is correct"""
kdf = _get_kdf(master_password.salt) kdf = _get_kdf(master_password.salt)
try: try: