Massive code cleanup

This commit is contained in:
StNicolay 2022-11-30 15:56:25 +03:00
parent 944f23a146
commit 0463388829
9 changed files with 95 additions and 42 deletions

View File

@ -44,7 +44,8 @@ def delete_all(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
base_handler(bot, mes) base_handler(bot, mes)
bot_mes = bot.send_message( bot_mes = bot.send_message(
mes.chat.id, mes.chat.id,
"Вы действительно хотите удалить все ваши аккаунты? Это действие нельзя отменить. Отправьте YES для подтверждения", "Вы действительно хотите удалить все ваши аккаунты? Это действие нельзя отменить. "
"Отправьте YES для подтверждения",
) )
bot.register_next_step_handler( bot.register_next_step_handler(
mes, functools.partial(_delete_all, bot, engine, bot_mes) mes, functools.partial(_delete_all, bot, engine, bot_mes)
@ -95,7 +96,8 @@ def reset_master_pass(bot: telebot.TeleBot, engine: Engine, mes: Message) -> Non
return send_tmp_message(bot, mes.chat.id, "Мастер пароль не задан") return send_tmp_message(bot, mes.chat.id, "Мастер пароль не задан")
bot_mes = bot.send_message( bot_mes = bot.send_message(
mes.chat.id, mes.chat.id,
"Отправьте новый мастер пароль, осторожно, все текущие аккаунты будут удалены навсегда", "Отправьте новый мастер пароль, осторожно, все текущие аккаунты "
"будут удалены навсегда",
) )
bot.register_next_step_handler( bot.register_next_step_handler(
mes, functools.partial(_reset_master_pass2, bot, engine, bot_mes) mes, functools.partial(_reset_master_pass2, bot, engine, bot_mes)
@ -290,7 +292,8 @@ def _get_account3(
send_tmp_message( send_tmp_message(
bot, bot,
mes.chat.id, mes.chat.id,
f"Логин:\n`{login}`\nПароль:\n`{passwd}`\nНажмите на логин или пароль, чтобы скопировать", f"Логин:\n`{login}`\nПароль:\n`{passwd}`\nНажмите на логин или пароль, "
"чтобы скопировать",
30, 30,
) )
@ -341,7 +344,8 @@ def help(bot: telebot.TeleBot, mes: Message) -> None:
/cancel - отмена текущего действия /cancel - отмена текущего действия
/help - помощь /help - помощь
/export - получить пароли в json формате /export - получить пароли в json формате
/import - импортировать пароли из json в файле в таком же формате, как из /export /import - импортировать пароли из json в файле в таком же формате, \
как из /export
/gen_password - создать 10 надёжных паролей""" /gen_password - создать 10 надёжных паролей"""
bot.send_message(mes.chat.id, message) bot.send_message(mes.chat.id, message)
@ -472,8 +476,8 @@ def _import3(
def gen_password(bot: telebot.TeleBot, mes: Message) -> None: def gen_password(bot: telebot.TeleBot, mes: Message) -> None:
# Generate 10 passwords and put 'em in the backticks
base_handler(bot, mes) base_handler(bot, mes)
# Generate 10 passwords and put 'em in the backticks
passwords = (f"`{gen_passwd()}`" for _ in range(10)) passwords = (f"`{gen_passwd()}`" for _ in range(10))
text = ( text = (
"Пароли:\n" "Пароли:\n"

View File

@ -2,7 +2,7 @@ import io
import string import string
import time import time
from random import SystemRandom from random import SystemRandom
from typing import Self, Type from typing import Self, Type, Iterator
import pydantic import pydantic
import telebot import telebot
@ -37,7 +37,7 @@ class _Accounts(pydantic.BaseModel):
accounts: list[_Account] = pydantic.Field(default_factory=list) accounts: list[_Account] = pydantic.Field(default_factory=list)
def _accounts_list_to_json(accounts: list[tuple[str, str, str]]) -> str: def _accounts_list_to_json(accounts: Iterator[tuple[str, str, str]]) -> str:
accounts = _Accounts(accounts=[_Account.from_tuple(i) for i in accounts]) accounts = _Accounts(accounts=[_Account.from_tuple(i) for i in accounts])
return accounts.json() return accounts.json()
@ -48,7 +48,10 @@ def json_to_accounts(json_: str) -> list[tuple[str, str, str]]:
def send_tmp_message( def send_tmp_message(
bot: telebot.TeleBot, chat_id: telebot.types.Message, text: str, timeout: int = 5 bot: telebot.TeleBot,
chat_id: telebot.types.Message,
text: str,
timeout: int = 5,
) -> None: ) -> None:
bot_mes = bot.send_message(chat_id, text, parse_mode="MarkdownV2") bot_mes = bot.send_message(chat_id, text, parse_mode="MarkdownV2")
time.sleep(timeout) time.sleep(timeout)
@ -65,20 +68,16 @@ def base_handler(
def get_all_accounts( def get_all_accounts(
engine: Engine, user_id: int, master_pass: str engine: Engine, user_id: int, master_pass: str
) -> list[tuple[str, str, str]]: ) -> Iterator[tuple[str, str, str]]:
accounts: list[tuple[str, str, str]] = [] for account in database.get.get_all_accounts(engine, user_id):
for account_name in database.get.get_accounts(engine, user_id): name, salt, enc_login, enc_passwd = account
salt, enc_login, enc_passwd = database.get.get_account_info(
engine, user_id, account_name
)
login, passwd = cryptography.other_accounts.decrypt_account_info( login, passwd = cryptography.other_accounts.decrypt_account_info(
enc_login, enc_passwd, master_pass, salt enc_login, enc_passwd, master_pass, salt
) )
accounts.append((account_name, login, passwd)) yield (name, login, passwd)
return accounts
def accounts_to_json(accounts: list[tuple[str, str, str]]) -> io.StringIO: def accounts_to_json(accounts: Iterator[tuple[str, str, str]]) -> io.StringIO:
file = io.StringIO(_accounts_list_to_json(accounts)) file = io.StringIO(_accounts_list_to_json(accounts))
file.name = "passwords.json" file.name = "passwords.json"
return file return file

View File

@ -17,18 +17,22 @@ def _get_kdf(salt: bytes) -> Scrypt:
return kdf return kdf
def encrypt_master_pass(passwd: str) -> tuple[bytes, bytes]: def encrypt_master_pass(password: str) -> tuple[bytes, bytes]:
"""Hashes master password and return tuple of hashed password and salt""" """Hashes master password and return tuple of hashed password and salt"""
salt = os.urandom(64) salt = os.urandom(64)
kdf = _get_kdf(salt) kdf = _get_kdf(salt)
return kdf.derive(passwd.encode("utf-8")), salt return kdf.derive(password.encode("utf-8")), salt
def check_master_pass(passwd: str, enc_pass: bytes, salt: bytes) -> bool: def check_master_pass(
password: str,
enc_password: bytes,
salt: bytes,
) -> bool:
"""Checks if the master password is correct""" """Checks if the master password is correct"""
kdf = _get_kdf(salt) kdf = _get_kdf(salt)
try: try:
kdf.verify(passwd.encode("utf-8"), enc_pass) kdf.verify(password.encode("utf-8"), enc_password)
except InvalidKey: except InvalidKey:
return False return False
else: else:

View File

@ -22,23 +22,28 @@ def _generate_key(salt: bytes, master_pass: bytes) -> bytes:
def encrypt_account_info( def encrypt_account_info(
login: str, passwd: str, master_pass: str login: str, passwd: str, master_pass: str
) -> tuple[bytes, bytes, bytes]: ) -> tuple[bytes, bytes, bytes]:
"""Encrypts login and password of a user using their master password as a key. """Encrypts login and password of a user using their master
password as a key.
Returns a tuple of encrypted login, password and salt""" Returns a tuple of encrypted login, password and salt"""
salt = os.urandom(64) salt = os.urandom(64)
key = _generate_key(salt, master_pass.encode("utf-8")) key = _generate_key(salt, master_pass.encode("utf-8"))
f = Fernet(key) f = Fernet(key)
enc_login = base64.urlsafe_b64decode(f.encrypt(login.encode("utf-8"))) enc_login = base64.urlsafe_b64decode(f.encrypt(login.encode("utf-8")))
enc_passwd = base64.urlsafe_b64decode(f.encrypt(passwd.encode("utf-8"))) enc_password = base64.urlsafe_b64decode(f.encrypt(passwd.encode("utf-8")))
return (enc_login, enc_passwd, salt) return (enc_login, enc_password, salt)
def decrypt_account_info( def decrypt_account_info(
enc_login: bytes, enc_pass: bytes, master_pass: str, salt: bytes enc_login: bytes,
enc_pass: bytes,
master_pass: str,
salt: bytes,
) -> tuple[str, str]: ) -> tuple[str, str]:
"""Decrypts login and password using their master password as a key. """Decrypts login and password using their
master password as a key.
Returns a tuple of decrypted login and password""" Returns a tuple of decrypted login and password"""
key = _generate_key(salt, master_pass.encode("utf-8")) key = _generate_key(salt, master_pass.encode("utf-8"))
f = Fernet(key) f = Fernet(key)
login_bytes = f.decrypt(base64.urlsafe_b64encode(enc_login)).decode("utf-8") login = f.decrypt(base64.urlsafe_b64encode(enc_login)).decode("utf-8")
pass_bytes = f.decrypt(base64.urlsafe_b64encode(enc_pass)).decode("utf-8") password = f.decrypt(base64.urlsafe_b64encode(enc_pass)).decode("utf-8")
return (login_bytes, pass_bytes) return (login, password)

View File

@ -11,11 +11,15 @@ def add_account(
name: str, name: str,
salt: bytes, salt: bytes,
enc_login: bytes, enc_login: bytes,
enc_pass: bytes, enc_password: bytes,
) -> bool: ) -> bool:
"""Adds account to the database. Returns true on success, false otherwise""" """Adds account to the database. Returns true on success, false otherwise"""
account = models.Account( account = models.Account(
user_id=user_id, name=name, salt=salt, enc_login=enc_login, enc_pass=enc_pass user_id=user_id,
name=name,
salt=salt,
enc_login=enc_login,
enc_pass=enc_password,
) )
try: try:
with sqlmodel.Session(engine) as session: with sqlmodel.Session(engine) as session:
@ -27,9 +31,18 @@ def add_account(
return True return True
def add_master_pass(engine: Engine, user_id: int, salt: bytes, passwd: bytes) -> bool: def add_master_pass(
engine: Engine,
user_id: int,
salt: bytes,
password_hash: bytes,
) -> bool:
"""Adds master password the database. Returns true on success, false otherwise""" """Adds master password the database. Returns true on success, false otherwise"""
master_pass = models.MasterPass(user_id=user_id, salt=salt, passwd=passwd) master_pass = models.MasterPass(
user_id=user_id,
salt=salt,
passwd=password_hash,
)
try: try:
with sqlmodel.Session(engine) as session: with sqlmodel.Session(engine) as session:
session.add(master_pass) session.add(master_pass)

View File

@ -5,13 +5,13 @@ from . import models
def change_master_pass( def change_master_pass(
engine: Engine, user_id: int, salt: bytes, passwd: bytes engine: Engine, user_id: int, salt: bytes, password: bytes
) -> None: ) -> None:
"""Changes master password and salt in the database""" """Changes master password and salt in the database"""
statement = ( statement = (
sqlmodel.update(models.MasterPass) sqlmodel.update(models.MasterPass)
.where(models.MasterPass.user_id == user_id) .where(models.MasterPass.user_id == user_id)
.values(salt=salt, passwd=passwd) .values(salt=salt, passwd=password)
) )
with sqlmodel.Session(engine) as session: with sqlmodel.Session(engine) as session:
session.exec(statement) session.exec(statement)

View File

@ -6,7 +6,9 @@ from . import models
def purge_accounts(engine: Engine, user_id: int) -> None: def purge_accounts(engine: Engine, user_id: int) -> None:
"""Deletes all user's accounts""" """Deletes all user's accounts"""
statement = sqlmodel.delete(models.Account).where(models.Account.user_id == user_id) statement = sqlmodel.delete(models.Account).where(
models.Account.user_id == user_id,
)
with sqlmodel.Session(engine) as session: with sqlmodel.Session(engine) as session:
session.exec(statement) session.exec(statement)
session.commit() session.commit()
@ -25,7 +27,8 @@ def delete_master_pass(engine: Engine, user_id: int) -> None:
def delete_account(engine: Engine, user_id: int, name: str) -> None: def delete_account(engine: Engine, user_id: int, name: str) -> None:
"""Deletes specific user account""" """Deletes specific user account"""
statement = sqlmodel.delete(models.Account).where( statement = sqlmodel.delete(models.Account).where(
models.Account.user_id == user_id, models.Account.name == name models.Account.user_id == user_id,
models.Account.name == name,
) )
with sqlmodel.Session(engine) as session: with sqlmodel.Session(engine) as session:
session.exec(statement) session.exec(statement)

View File

@ -1,14 +1,19 @@
from typing import Iterator
import sqlmodel import sqlmodel
from sqlalchemy.future import Engine from sqlalchemy.future import Engine
from . import models from . import models
def get_master_pass(engine: Engine, user_id: int) -> tuple[bytes, bytes] | None: def get_master_pass(
engine: Engine,
user_id: int,
) -> tuple[bytes, bytes] | None:
"""Gets master pass. Returns tuple of salt and password """Gets master pass. Returns tuple of salt and password
or None if it wasn't found""" or None if it wasn't found"""
statement = sqlmodel.select(models.MasterPass).where( statement = sqlmodel.select(models.MasterPass).where(
models.MasterPass.user_id == user_id models.MasterPass.user_id == user_id,
) )
with sqlmodel.Session(engine) as session: with sqlmodel.Session(engine) as session:
result = session.exec(statement).first() result = session.exec(statement).first()
@ -19,19 +24,38 @@ def get_master_pass(engine: Engine, user_id: int) -> tuple[bytes, bytes] | None:
def get_accounts(engine: Engine, user_id: int) -> list[str]: def get_accounts(engine: Engine, user_id: int) -> list[str]:
"""Gets list of account names""" """Gets list of account names"""
statement = sqlmodel.select(models.Account).where(models.Account.user_id == user_id) statement = sqlmodel.select(models.Account).where(
models.Account.user_id == user_id,
)
with sqlmodel.Session(engine) as session: with sqlmodel.Session(engine) as session:
result = session.exec(statement) result = session.exec(statement)
return [account.name for account in result] return [account.name for account in result]
def get_all_accounts(
engine: Engine, user_id: int
) -> Iterator[tuple[str, bytes, bytes, bytes]]:
"""Returns an iterator of tuples, where values represent account's name, salt,
encrypted login and encrypted password"""
statement = sqlmodel.select(models.Account).where(
models.Account.user_id == user_id,
)
with sqlmodel.Session(engine) as session:
result = session.exec(statement)
yield from (
(account.name, account.salt, account.enc_login, account.enc_pass)
for account in result
)
def get_account_info( def get_account_info(
engine: Engine, user_id: int, name: str engine: Engine, user_id: int, name: str
) -> tuple[bytes, bytes, bytes]: ) -> tuple[bytes, bytes, bytes]:
"""Gets account info. Returns tuple of salt, login and password """Gets account info. Returns tuple of salt, login and password
or None if it wasn't found""" or None if it wasn't found"""
statement = sqlmodel.select(models.Account).where( statement = sqlmodel.select(models.Account).where(
models.Account.user_id == user_id, models.Account.name == name models.Account.user_id == user_id,
models.Account.name == name,
) )
with sqlmodel.Session(engine) as session: with sqlmodel.Session(engine) as session:
result = session.exec(statement).first() result = session.exec(statement).first()

View File

@ -6,7 +6,8 @@ from . import models
def get_engine(host: str, user: str, passwd: str, db: str) -> Engine: def get_engine(host: str, user: str, passwd: str, db: str) -> Engine:
"""Creates an engine for mariadb with pymysql as connector""" """Creates an engine for mariadb with pymysql as connector"""
engine = sqlmodel.create_engine(f"mariadb+pymysql://{user}:{passwd}@{host}/{db}") uri = f"mariadb+pymysql://{user}:{passwd}@{host}/{db}"
engine = sqlmodel.create_engine(uri)
return engine return engine