Compare commits
8 Commits
d79b57b1f0
...
unstable
Author | SHA1 | Date | |
---|---|---|---|
4ef018ccfc | |||
aa36f2eb82 | |||
931e93fbde | |||
972c5577f4 | |||
671286dc39 | |||
5dbf93013a | |||
c051c14f1f | |||
3686195396 |
@ -3,7 +3,8 @@ import string
|
||||
from .decrypted_account import DecryptedAccount
|
||||
|
||||
FORBIDDEN_CHARS = frozenset("`\n\\")
|
||||
PUNCTUATION = frozenset(string.punctuation).difference(FORBIDDEN_CHARS)
|
||||
FULL_PUNCTUATION = frozenset(string.punctuation)
|
||||
PUNCTUATION = FULL_PUNCTUATION.difference(FORBIDDEN_CHARS)
|
||||
|
||||
|
||||
def _base_check(val: str, /) -> bool:
|
||||
@ -21,9 +22,9 @@ def check_login(login: str) -> bool:
|
||||
return _base_check(login)
|
||||
|
||||
|
||||
def check_password(passwd: str) -> bool:
|
||||
def check_password(password: str) -> bool:
|
||||
"Returns true if password is valid"
|
||||
return _base_check(passwd)
|
||||
return _base_check(password)
|
||||
|
||||
|
||||
def check_account(account: DecryptedAccount) -> bool:
|
||||
@ -37,14 +38,28 @@ def check_account(account: DecryptedAccount) -> bool:
|
||||
)
|
||||
|
||||
|
||||
def check_gened_password(passwd: str, /) -> bool:
|
||||
def check_gened_password(password: str, /) -> bool:
|
||||
"""Retuns true if generated password is valid,
|
||||
false otherwise.
|
||||
Password is valid if there is at least one lowercase character,
|
||||
uppercase character and one punctuation character"""
|
||||
return (
|
||||
any(c.islower() for c in passwd)
|
||||
and any(c.isupper() for c in passwd)
|
||||
and any(c.isdigit() for c in passwd)
|
||||
and any(c in PUNCTUATION for c in passwd)
|
||||
any(c.islower() for c in password)
|
||||
and any(c.isupper() for c in password)
|
||||
and any(c.isdigit() for c in password)
|
||||
and any(c in PUNCTUATION for c in password)
|
||||
)
|
||||
|
||||
|
||||
def check_master_password(password: str) -> bool:
|
||||
"""Returns True if master password is valid.
|
||||
Master password has to have at least one lowercase letter,
|
||||
one uppercase letter, one digit, one punctuation character
|
||||
and length must be at least 8"""
|
||||
return (
|
||||
len(password) >= 8
|
||||
and any(c.islower() for c in password)
|
||||
and any(c.isupper() for c in password)
|
||||
and any(c.isdigit() for c in password)
|
||||
and any(c in FULL_PUNCTUATION for c in password)
|
||||
)
|
||||
|
@ -14,6 +14,7 @@ from ..account_checks import (
|
||||
check_account_name,
|
||||
check_login,
|
||||
check_password,
|
||||
check_master_password,
|
||||
)
|
||||
from ..account_parsing import accounts_to_json, json_to_accounts
|
||||
from ..decrypted_account import DecryptedAccount
|
||||
@ -58,21 +59,31 @@ async def get_accounts(
|
||||
|
||||
async def delete_all(bot: AsyncTeleBot, engine: Engine, mes: Message) -> None:
|
||||
await base_handler(bot, mes)
|
||||
master_pass = db.get.get_master_pass(engine, mes.from_user.id)
|
||||
if master_pass is None:
|
||||
await send_tmp_message(bot, mes.chat.id, "У вас нет мастер пароля")
|
||||
return
|
||||
bot_mes = await bot.send_message(
|
||||
mes.chat.id,
|
||||
"Вы действительно хотите удалить все ваши аккаунты? Это действие "
|
||||
"нельзя отменить. "
|
||||
"Отправьте YES для подтверждения",
|
||||
"Отправьте мастер пароль для подтверждения",
|
||||
)
|
||||
register_state(
|
||||
mes, functools.partial(_delete_all2, bot, engine, master_pass, bot_mes)
|
||||
)
|
||||
register_state(mes, functools.partial(_delete_all2, bot, engine, bot_mes))
|
||||
|
||||
|
||||
async def _delete_all2(
|
||||
bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message
|
||||
bot: AsyncTeleBot,
|
||||
engine: Engine,
|
||||
master_pass: db.models.MasterPass,
|
||||
prev_mes: Message,
|
||||
mes: Message,
|
||||
) -> None:
|
||||
await base_handler(bot, mes, prev_mes)
|
||||
text = mes.text.strip()
|
||||
if text == "YES":
|
||||
if encryption.master_pass.verify_master_pass(text, master_pass):
|
||||
db.delete.purge_accounts(engine, mes.from_user.id)
|
||||
db.delete.delete_master_pass(engine, mes.from_user.id)
|
||||
await send_tmp_message(
|
||||
@ -85,7 +96,7 @@ async def _delete_all2(
|
||||
await send_tmp_message(
|
||||
bot,
|
||||
mes.chat.id,
|
||||
"Вы отправили не YES, ничего не удалено",
|
||||
"Вы отправили не верный мастер пароль, ничего не удалено",
|
||||
)
|
||||
|
||||
|
||||
@ -116,6 +127,17 @@ async def _set_master_pass2(
|
||||
if text == "/cancel":
|
||||
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
|
||||
|
||||
if not check_master_password(text):
|
||||
await send_tmp_message(
|
||||
bot,
|
||||
mes.chat.id,
|
||||
"Не подходящий мастер пароль\\. Он должен быть не меньше "
|
||||
"8 символов, иметь хотя бы один символ в нижнем и "
|
||||
"верхнем регистре, хотя бы один специальный символ",
|
||||
sleep_time=10,
|
||||
)
|
||||
return
|
||||
|
||||
master_password = encryption.master_pass.encrypt_master_pass(
|
||||
mes.from_user.id,
|
||||
text,
|
||||
@ -134,7 +156,8 @@ async def reset_master_pass(
|
||||
) -> None:
|
||||
await base_handler(bot, mes)
|
||||
|
||||
if db.get.get_master_pass(engine, mes.from_user.id) is None:
|
||||
master_pass = db.get.get_master_pass(engine, mes.from_user.id)
|
||||
if master_pass is None:
|
||||
return await send_tmp_message(
|
||||
bot,
|
||||
mes.chat.id,
|
||||
@ -143,17 +166,48 @@ async def reset_master_pass(
|
||||
|
||||
bot_mes = await bot.send_message(
|
||||
mes.chat.id,
|
||||
"Отправьте новый мастер пароль, осторожно, все текущие аккаунты "
|
||||
"будут удалены навсегда",
|
||||
"Отправьте текущий мастер пароль",
|
||||
)
|
||||
|
||||
register_state(
|
||||
mes,
|
||||
functools.partial(_reset_master_pass2, bot, engine, bot_mes),
|
||||
functools.partial(
|
||||
_reset_master_pass2,
|
||||
bot,
|
||||
engine,
|
||||
master_pass,
|
||||
bot_mes,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
async def _reset_master_pass2(
|
||||
bot: AsyncTeleBot,
|
||||
engine: Engine,
|
||||
master_pass: db.models.MasterPass,
|
||||
prev_mes: Message,
|
||||
mes: Message,
|
||||
) -> None:
|
||||
await base_handler(bot, mes, prev_mes)
|
||||
text = mes.text.strip()
|
||||
if text == "/cancel":
|
||||
await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
|
||||
|
||||
if not encryption.master_pass.verify_master_pass(text, master_pass):
|
||||
await send_tmp_message(bot, mes.chat.id, "Неверный мастер пароль")
|
||||
return
|
||||
|
||||
bot_mes = await bot.send_message(
|
||||
mes.chat.id,
|
||||
"Отправьте новый мастер пароль. Осторожно, все аккаунты будут удалены",
|
||||
)
|
||||
register_state(
|
||||
mes,
|
||||
functools.partial(_reset_master_pass3, bot, engine, bot_mes),
|
||||
)
|
||||
|
||||
|
||||
async def _reset_master_pass3(
|
||||
bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message
|
||||
) -> None:
|
||||
await base_handler(bot, mes, prev_mes)
|
||||
@ -161,6 +215,17 @@ async def _reset_master_pass2(
|
||||
if text == "/cancel":
|
||||
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
|
||||
|
||||
if not check_master_password(text):
|
||||
await send_tmp_message(
|
||||
bot,
|
||||
mes.chat.id,
|
||||
"Не подходящий мастер пароль\\. Он должен быть не меньше "
|
||||
"8 символов, иметь хотя бы один символ в нижнем и "
|
||||
"верхнем регистре, хотя бы один специальный символ",
|
||||
sleep_time=10,
|
||||
)
|
||||
return
|
||||
|
||||
master_password = encryption.master_pass.encrypt_master_pass(
|
||||
mes.from_user.id,
|
||||
text,
|
||||
@ -292,7 +357,7 @@ async def _add_account5(
|
||||
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
|
||||
|
||||
master_password = db.get.get_master_pass(engine, mes.from_user.id)
|
||||
if not encryption.master_pass.check_master_pass(text, master_password):
|
||||
if not encryption.master_pass.verify_master_pass(text, master_password):
|
||||
return await send_tmp_message(
|
||||
bot,
|
||||
mes.chat.id,
|
||||
@ -382,7 +447,7 @@ async def _get_account3(
|
||||
mes.from_user.id,
|
||||
)
|
||||
|
||||
if not encryption.master_pass.check_master_pass(text, master_password):
|
||||
if not encryption.master_pass.verify_master_pass(text, master_password):
|
||||
return await send_tmp_message(
|
||||
bot,
|
||||
mes.chat.id,
|
||||
@ -428,13 +493,14 @@ async def delete_account(
|
||||
|
||||
register_state(
|
||||
mes,
|
||||
functools.partial(_delete_account2, bot, engine, bot_mes),
|
||||
functools.partial(_delete_account2, bot, engine, master_pass, bot_mes),
|
||||
)
|
||||
|
||||
|
||||
async def _delete_account2(
|
||||
bot: AsyncTeleBot,
|
||||
engine: Engine,
|
||||
master_pass: db.models.MasterPass,
|
||||
prev_mes: Message,
|
||||
mes: Message,
|
||||
):
|
||||
@ -448,27 +514,36 @@ async def _delete_account2(
|
||||
|
||||
bot_mes = await bot.send_message(
|
||||
mes.from_user.id,
|
||||
f'Вы уверены, что хотите удалить аккаунт "{text}"?\nОтправьте YES для '
|
||||
"подтверждения",
|
||||
f'Вы уверены, что хотите удалить аккаунт "{text}"?\nОтправьте мастер '
|
||||
"пароль для подтверждения",
|
||||
)
|
||||
|
||||
register_state(
|
||||
mes,
|
||||
functools.partial(_delete_account3, bot, engine, bot_mes, text),
|
||||
functools.partial(
|
||||
_delete_account3,
|
||||
bot,
|
||||
engine,
|
||||
master_pass,
|
||||
bot_mes,
|
||||
text,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
async def _delete_account3(
|
||||
bot: AsyncTeleBot,
|
||||
engine: Engine,
|
||||
master_pass: db.models.MasterPass,
|
||||
prev_mes: Message,
|
||||
account_name: str,
|
||||
mes: Message,
|
||||
) -> None:
|
||||
await base_handler(bot, mes, prev_mes)
|
||||
text = mes.text.strip()
|
||||
if text != "YES":
|
||||
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
|
||||
if not encryption.master_pass.verify_master_pass(text, master_pass):
|
||||
await send_tmp_message(bot, mes.chat.id, "Неверный пароль")
|
||||
return
|
||||
|
||||
db.delete.delete_account(engine, mes.from_user.id, account_name)
|
||||
await send_tmp_message(bot, mes.chat.id, "Аккаунт удалён")
|
||||
@ -522,7 +597,7 @@ async def _export2(
|
||||
engine,
|
||||
mes.from_user.id,
|
||||
)
|
||||
if not encryption.master_pass.check_master_pass(text, master_password):
|
||||
if not encryption.master_pass.verify_master_pass(text, master_password):
|
||||
return await send_tmp_message(
|
||||
bot,
|
||||
mes.chat.id,
|
||||
@ -630,7 +705,7 @@ async def _import3(
|
||||
engine,
|
||||
mes.from_user.id,
|
||||
)
|
||||
if not encryption.master_pass.check_master_pass(text, master_password):
|
||||
if not encryption.master_pass.verify_master_pass(text, master_password):
|
||||
return await send_tmp_message(
|
||||
bot,
|
||||
mes.chat.id,
|
||||
@ -697,4 +772,13 @@ async def message_handler(bot: AsyncTeleBot, mes: Message) -> None:
|
||||
"Вы отправили не корректное сообщение",
|
||||
)
|
||||
return
|
||||
await handler(mes)
|
||||
|
||||
try:
|
||||
await handler(mes)
|
||||
except Exception:
|
||||
await send_tmp_message(
|
||||
bot,
|
||||
mes.chat.id,
|
||||
"Произошла непредвиденная ошибка",
|
||||
)
|
||||
raise
|
||||
|
@ -11,10 +11,14 @@ class MasterPass(sqlmodel.SQLModel, table=True):
|
||||
)
|
||||
)
|
||||
salt: bytes = sqlmodel.Field(
|
||||
sa_column=sqlmodel.Column(sqlmodel.BINARY(64), nullable=False)
|
||||
sa_column=sqlmodel.Column(sqlmodel.BINARY(64), nullable=False),
|
||||
max_length=64,
|
||||
min_length=64,
|
||||
)
|
||||
password_hash: bytes = sqlmodel.Field(
|
||||
sa_column=sqlmodel.Column(sqlmodel.BINARY(128), nullable=False)
|
||||
sa_column=sqlmodel.Column(sqlmodel.BINARY(128), nullable=False),
|
||||
max_length=128,
|
||||
min_length=128,
|
||||
)
|
||||
|
||||
|
||||
@ -22,13 +26,17 @@ class Account(sqlmodel.SQLModel, table=True):
|
||||
__tablename__ = "accounts"
|
||||
__table_args__ = (sqlmodel.PrimaryKeyConstraint("user_id", "name"),)
|
||||
user_id: int = sqlmodel.Field()
|
||||
name: str = sqlmodel.Field(max_length=255)
|
||||
name: str = sqlmodel.Field(max_length=256)
|
||||
salt: bytes = sqlmodel.Field(
|
||||
sa_column=sqlmodel.Column(sqlmodel.BINARY(64), nullable=False)
|
||||
sa_column=sqlmodel.Column(sqlmodel.BINARY(64), nullable=False),
|
||||
max_length=64,
|
||||
min_length=64,
|
||||
)
|
||||
enc_login: bytes = sqlmodel.Field(
|
||||
sa_column=sqlmodel.Column(sqlmodel.VARBINARY(256), nullable=False)
|
||||
sa_column=sqlmodel.Column(sqlmodel.VARBINARY(256), nullable=False),
|
||||
max_length=256,
|
||||
)
|
||||
enc_password: bytes = sqlmodel.Field(
|
||||
sa_column=sqlmodel.Column(sqlmodel.VARBINARY(256), nullable=False)
|
||||
sa_column=sqlmodel.Column(sqlmodel.VARBINARY(256), nullable=False),
|
||||
max_length=256,
|
||||
)
|
||||
|
@ -1,26 +1,43 @@
|
||||
import base64
|
||||
import os
|
||||
from typing import Self
|
||||
|
||||
from cryptography.fernet import Fernet
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
|
||||
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
||||
|
||||
from ..db.models import Account
|
||||
from ..decrypted_account import DecryptedAccount
|
||||
|
||||
|
||||
def _generate_key(salt: bytes, master_pass: bytes) -> bytes:
|
||||
"""Generates key for fernet encryption"""
|
||||
kdf = PBKDF2HMAC(
|
||||
algorithm=hashes.SHA256(),
|
||||
length=32,
|
||||
salt=salt,
|
||||
iterations=100000,
|
||||
backend=default_backend(),
|
||||
)
|
||||
key = base64.urlsafe_b64encode(kdf.derive(master_pass))
|
||||
return key
|
||||
class Cipher:
|
||||
def __init__(self, key: bytes) -> None:
|
||||
self._chacha = ChaCha20Poly1305(key)
|
||||
|
||||
@classmethod
|
||||
def generate_cipher(cls, salt: bytes, password: bytes) -> Self:
|
||||
"""Generates cipher which uses key derived from a given password"""
|
||||
kdf = PBKDF2HMAC(
|
||||
algorithm=hashes.SHA256(),
|
||||
length=32,
|
||||
salt=salt,
|
||||
iterations=480000,
|
||||
)
|
||||
return cls(kdf.derive(password))
|
||||
|
||||
def encrypt(self, data: bytes) -> bytes:
|
||||
nonce = os.urandom(12)
|
||||
return nonce + self._chacha.encrypt(
|
||||
nonce,
|
||||
data,
|
||||
associated_data=None,
|
||||
)
|
||||
|
||||
def decrypt(self, data: bytes) -> bytes:
|
||||
return self._chacha.decrypt(
|
||||
nonce=data[:12],
|
||||
data=data[12:],
|
||||
associated_data=None,
|
||||
)
|
||||
|
||||
|
||||
def encrypt(
|
||||
@ -29,15 +46,10 @@ def encrypt(
|
||||
) -> Account:
|
||||
"""Encrypts account using master password and returns Account object"""
|
||||
salt = os.urandom(64)
|
||||
key = _generate_key(salt, master_pass.encode("utf-8"))
|
||||
f = Fernet(key)
|
||||
cipher = Cipher.generate_cipher(salt, master_pass.encode("utf-8"))
|
||||
|
||||
enc_login = base64.urlsafe_b64decode(
|
||||
f.encrypt(account.login.encode("utf-8")),
|
||||
)
|
||||
enc_password = base64.urlsafe_b64decode(
|
||||
f.encrypt(account.password.encode("utf-8")),
|
||||
)
|
||||
enc_login = cipher.encrypt(account.login.encode("utf-8"))
|
||||
enc_password = cipher.encrypt(account.password.encode("utf-8"))
|
||||
|
||||
return Account(
|
||||
user_id=account.user_id,
|
||||
@ -54,15 +66,10 @@ def decrypt(
|
||||
) -> DecryptedAccount:
|
||||
"""Decrypts account using master password and returns
|
||||
DecryptedAccount object"""
|
||||
key = _generate_key(account.salt, master_pass.encode("utf-8"))
|
||||
f = Fernet(key)
|
||||
cipher = Cipher.generate_cipher(account.salt, master_pass.encode("utf-8"))
|
||||
|
||||
login = f.decrypt(
|
||||
base64.urlsafe_b64encode(account.enc_login),
|
||||
).decode("utf-8")
|
||||
password = f.decrypt(
|
||||
base64.urlsafe_b64encode(account.enc_password),
|
||||
).decode("utf-8")
|
||||
login = cipher.decrypt(account.enc_login).decode("utf-8")
|
||||
password = cipher.decrypt(account.enc_password).decode("utf-8")
|
||||
|
||||
return DecryptedAccount(
|
||||
user_id=account.user_id,
|
||||
|
@ -31,7 +31,7 @@ def encrypt_master_pass(user_id: int, password: str) -> MasterPass:
|
||||
)
|
||||
|
||||
|
||||
def check_master_pass(password: str, master_password: MasterPass) -> bool:
|
||||
def verify_master_pass(password: str, master_password: MasterPass) -> bool:
|
||||
"""Checks if the master password is correct"""
|
||||
kdf = _get_kdf(master_password.salt)
|
||||
try:
|
||||
|
Reference in New Issue
Block a user