19 Commits

Author SHA1 Message Date
972c5577f4 Added validation to the database models 2023-01-10 21:00:03 +03:00
671286dc39 Removed backend because it wasn't used in encryption.accounts 2023-01-10 20:28:34 +03:00
5dbf93013a Added requesting a master password for deleting accounts, deleting all, reseting master password 2023-01-10 20:10:48 +03:00
c051c14f1f Switched to the ChaCha20Poly1305 encryption algorithm for better security 2023-01-10 20:00:58 +03:00
3686195396 Added a message about exceptions for a user 2023-01-06 17:32:44 +03:00
d79b57b1f0 Added return statement after sending a message about that there is no active message 2023-01-05 14:52:45 +03:00
157c2c4aa2 _import3 no longer blocks event loop during decryption
Now running encrytion of accounts in ProcessPoolExecutor
2023-01-05 14:11:29 +03:00
f4a5f51b23 Made more verbose exception handler for the bot for easier debugging 2023-01-05 13:47:31 +03:00
6bc8eb1413 db.add changes
Added _add_model helper function to reduce code duplication
Added add_accounts for future use
2023-01-05 13:19:01 +03:00
9f64305050 Moved sorting back to the get_accounts 2023-01-05 13:03:44 +03:00
4954f39a91 Added indentation into exported json files 2023-01-05 13:02:50 +03:00
3edeb86b6c Disabled autoincrement in master_passwords table 2023-01-05 13:01:28 +03:00
c7675c231f Fixed sending a final message in /import 2023-01-03 21:07:37 +03:00
ae88fccf13 Updated type hint of the handler in the register_state function 2023-01-03 12:34:54 +03:00
e29eefe40b Changes in helper_functions.delete_message
Removed checking if sleep_time is 0
Moved sleeping outside of try block
2023-01-03 12:29:08 +03:00
fdbed91512 Now account name is copyable 2023-01-03 12:12:40 +03:00
f9d163361a Renamed functions in db.get
get_accounts -> get_account_names
get_all_accounts -> get_accounts
2023-01-03 12:08:06 +03:00
9ec66a3521 Renamed src/database into src/db 2023-01-03 11:59:06 +03:00
70e9afe21d Small reformating in bot.message_handler 2023-01-03 11:48:25 +03:00
16 changed files with 265 additions and 162 deletions

View File

@ -7,7 +7,7 @@ from . import (
account_checks, account_checks,
account_parsing, account_parsing,
bot, bot,
database, db,
decrypted_account, decrypted_account,
encryption, encryption,
generate_password, generate_password,
@ -19,19 +19,19 @@ __all__ = [
"bot", "bot",
"decrypted_account", "decrypted_account",
"encryption", "encryption",
"database", "db",
"generate_password", "generate_password",
] ]
def main() -> None: def main() -> None:
load_dotenv("./.env") load_dotenv("./.env")
engine = database.prepare.get_engine( engine = db.prepare.get_engine(
host=os.getenv("DB_HOST"), host=os.getenv("DB_HOST"),
user=os.getenv("DB_USER"), user=os.getenv("DB_USER"),
passwd=os.getenv("DB_PASS"), passwd=os.getenv("DB_PASS"),
db=os.getenv("DB_NAME"), db=os.getenv("DB_NAME"),
) )
database.prepare.prepare(engine) db.prepare.prepare(engine)
bot_ = bot.create_bot(os.getenv("TG_TOKEN"), engine) bot_ = bot.create_bot(os.getenv("TG_TOKEN"), engine)
asyncio.run(bot_.infinity_polling()) asyncio.run(bot_.infinity_polling())

View File

@ -35,7 +35,7 @@ class _Accounts(pydantic.BaseModel):
def _accounts_list_to_json(accounts: Iterable[DecryptedAccount]) -> str: def _accounts_list_to_json(accounts: Iterable[DecryptedAccount]) -> str:
result = _Accounts( result = _Accounts(
accounts=[_Account.from_usual_account(i) for i in accounts], accounts=[_Account.from_usual_account(i) for i in accounts],
).json(ensure_ascii=False) ).json(ensure_ascii=False, indent=2)
return result return result

View File

@ -3,13 +3,13 @@ import functools
from sqlalchemy.future import Engine from sqlalchemy.future import Engine
from telebot.async_telebot import AsyncTeleBot from telebot.async_telebot import AsyncTeleBot
from . import callback_handlers, message_handlers from . import callback_handlers, exception_handler, message_handlers
__all__ = ["callback_handlers", "message_handlers"] __all__ = ["callback_handlers", "exception_handler", "message_handlers"]
def create_bot(token: str, engine: Engine) -> AsyncTeleBot: def create_bot(token: str, engine: Engine) -> AsyncTeleBot:
bot = AsyncTeleBot(token) bot = AsyncTeleBot(token, exception_handler=exception_handler.Handler)
bot.register_message_handler( bot.register_message_handler(
functools.partial(message_handlers.set_master_password, bot, engine), functools.partial(message_handlers.set_master_password, bot, engine),
commands=["set_master_pass"], commands=["set_master_pass"],

View File

@ -0,0 +1,8 @@
import traceback
from typing import Type
class Handler:
@staticmethod
def handle(exc: Type[BaseException]) -> None:
traceback.print_exception(exc)

View File

@ -18,7 +18,7 @@ states: dict[tuple[int, int], Handler] = {}
def register_state( def register_state(
message: Message, message: Message,
handler: Callable[[Message], Any], handler: Handler,
) -> None: ) -> None:
states[(message.chat.id, message.from_user.id)] = handler states[(message.chat.id, message.from_user.id)] = handler
@ -40,9 +40,8 @@ async def delete_message(
*, *,
sleep_time: int = 0, sleep_time: int = 0,
) -> bool: ) -> bool:
await asyncio.sleep(sleep_time)
try: try:
if sleep_time != 0:
await asyncio.sleep(sleep_time)
await bot.delete_message(mes.chat.id, mes.id) await bot.delete_message(mes.chat.id, mes.id)
except telebot.apihelper.ApiException: except telebot.apihelper.ApiException:
return False return False
@ -86,10 +85,3 @@ async def send_deleteable_message(
parse_mode="MarkdownV2", parse_mode="MarkdownV2",
reply_markup=markup, reply_markup=markup,
) )
def escape(text: str) -> str:
escaped_chars = "*_~|`[("
for char in escaped_chars:
text = text.replace(char, rf"\\{char}")
return text

View File

@ -1,13 +1,14 @@
import asyncio import asyncio
import functools import functools
import gc import gc
import itertools
from concurrent.futures import ProcessPoolExecutor from concurrent.futures import ProcessPoolExecutor
import telebot import telebot
from sqlalchemy.future import Engine from sqlalchemy.future import Engine
from telebot.async_telebot import AsyncTeleBot from telebot.async_telebot import AsyncTeleBot
from .. import database, encryption, generate_password from .. import db, encryption, generate_password
from ..account_checks import ( from ..account_checks import (
check_account, check_account,
check_account_name, check_account_name,
@ -20,7 +21,6 @@ from . import markups
from .helper_functions import ( from .helper_functions import (
base_handler, base_handler,
delete_message, delete_message,
escape,
get_state, get_state,
register_state, register_state,
send_deleteable_message, send_deleteable_message,
@ -36,7 +36,7 @@ async def get_accounts(
mes: Message, mes: Message,
) -> None: ) -> None:
await base_handler(bot, mes) await base_handler(bot, mes)
accounts = database.get.get_accounts( accounts = db.get.get_account_names(
engine, engine,
mes.from_user.id, mes.from_user.id,
to_sort=True, to_sort=True,
@ -58,23 +58,33 @@ async def get_accounts(
async def delete_all(bot: AsyncTeleBot, engine: Engine, mes: Message) -> None: async def delete_all(bot: AsyncTeleBot, engine: Engine, mes: Message) -> None:
await base_handler(bot, mes) await base_handler(bot, mes)
master_pass = db.get.get_master_pass(engine, mes.from_user.id)
if master_pass is None:
await send_tmp_message(bot, mes.chat.id, "У вас нет мастер пароля")
return
bot_mes = await bot.send_message( bot_mes = await bot.send_message(
mes.chat.id, mes.chat.id,
"Вы действительно хотите удалить все ваши аккаунты? Это действие " "Вы действительно хотите удалить все ваши аккаунты? Это действие "
"нельзя отменить. " "нельзя отменить. "
"Отправьте YES для подтверждения", "Отправьте мастер пароль для подтверждения",
)
register_state(
mes, functools.partial(_delete_all2, bot, engine, master_pass, bot_mes)
) )
register_state(mes, functools.partial(_delete_all2, bot, engine, bot_mes))
async def _delete_all2( async def _delete_all2(
bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message bot: AsyncTeleBot,
engine: Engine,
master_pass: db.models.MasterPass,
prev_mes: Message,
mes: Message,
) -> None: ) -> None:
await base_handler(bot, mes, prev_mes) await base_handler(bot, mes, prev_mes)
text = mes.text.strip() text = mes.text.strip()
if text == "YES": if encryption.master_pass.check_master_pass(text, master_pass):
database.delete.purge_accounts(engine, mes.from_user.id) db.delete.purge_accounts(engine, mes.from_user.id)
database.delete.delete_master_pass(engine, mes.from_user.id) db.delete.delete_master_pass(engine, mes.from_user.id)
await send_tmp_message( await send_tmp_message(
bot, bot,
mes.chat.id, mes.chat.id,
@ -85,7 +95,7 @@ async def _delete_all2(
await send_tmp_message( await send_tmp_message(
bot, bot,
mes.chat.id, mes.chat.id,
"Вы отправили не YES, ничего не удалено", "Вы отправили не верный мастер пароль, ничего не удалено",
) )
@ -95,7 +105,7 @@ async def set_master_password(
mes: Message, mes: Message,
) -> None: ) -> None:
await base_handler(bot, mes, None) await base_handler(bot, mes, None)
if database.get.get_master_pass(engine, mes.from_user.id) is not None: if db.get.get_master_pass(engine, mes.from_user.id) is not None:
return await send_tmp_message( return await send_tmp_message(
bot, bot,
mes.chat.id, mes.chat.id,
@ -120,7 +130,7 @@ async def _set_master_pass2(
mes.from_user.id, mes.from_user.id,
text, text,
) )
database.add.add_master_pass(engine, master_password) db.add.add_master_pass(engine, master_password)
await send_tmp_message(bot, mes.chat.id, "Успех") await send_tmp_message(bot, mes.chat.id, "Успех")
del mes, text del mes, text
@ -134,7 +144,8 @@ async def reset_master_pass(
) -> None: ) -> None:
await base_handler(bot, mes) await base_handler(bot, mes)
if database.get.get_master_pass(engine, mes.from_user.id) is None: master_pass = db.get.get_master_pass(engine, mes.from_user.id)
if master_pass is None:
return await send_tmp_message( return await send_tmp_message(
bot, bot,
mes.chat.id, mes.chat.id,
@ -143,17 +154,48 @@ async def reset_master_pass(
bot_mes = await bot.send_message( bot_mes = await bot.send_message(
mes.chat.id, mes.chat.id,
"Отправьте новый мастер пароль, осторожно, все текущие аккаунты " "Отправьте текущий мастер пароль",
"будут удалены навсегда",
) )
register_state( register_state(
mes, mes,
functools.partial(_reset_master_pass2, bot, engine, bot_mes), functools.partial(
_reset_master_pass2,
bot,
engine,
master_pass,
bot_mes,
),
) )
async def _reset_master_pass2( async def _reset_master_pass2(
bot: AsyncTeleBot,
engine: Engine,
master_pass: db.models.MasterPass,
prev_mes: Message,
mes: Message,
) -> None:
await base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if not encryption.master_pass.check_master_pass(text, master_pass):
await send_tmp_message(bot, mes.chat.id, "Неверный мастер пароль")
return
bot_mes = await bot.send_message(
mes.chat.id,
"Отправьте новый мастер пароль. Осторожно, все аккаунты будут удалены",
)
register_state(
mes,
functools.partial(_reset_master_pass3, bot, engine, bot_mes),
)
async def _reset_master_pass3(
bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None: ) -> None:
await base_handler(bot, mes, prev_mes) await base_handler(bot, mes, prev_mes)
@ -165,8 +207,8 @@ async def _reset_master_pass2(
mes.from_user.id, mes.from_user.id,
text, text,
) )
database.delete.purge_accounts(engine, mes.from_user.id) db.delete.purge_accounts(engine, mes.from_user.id)
database.change.change_master_pass(engine, master_password) db.change.change_master_pass(engine, master_password)
await send_tmp_message( await send_tmp_message(
bot, mes.chat.id, "Все ваши аккаунты удалены, а мастер пароль изменён" bot, mes.chat.id, "Все ваши аккаунты удалены, а мастер пароль изменён"
@ -178,7 +220,7 @@ async def _reset_master_pass2(
async def add_account(bot: AsyncTeleBot, engine: Engine, mes: Message) -> None: async def add_account(bot: AsyncTeleBot, engine: Engine, mes: Message) -> None:
await base_handler(bot, mes) await base_handler(bot, mes)
master_password_from_db = database.get.get_master_pass( master_password_from_db = db.get.get_master_pass(
engine, engine,
mes.from_user.id, mes.from_user.id,
) )
@ -210,7 +252,7 @@ async def _add_account2(
mes.chat.id, mes.chat.id,
"Не корректное название аккаунта", "Не корректное название аккаунта",
) )
if text in database.get.get_accounts(engine, mes.from_user.id): if text in db.get.get_account_names(engine, mes.from_user.id):
return await send_tmp_message( return await send_tmp_message(
bot, mes.chat.id, "Аккаунт с таким именем уже существует" bot, mes.chat.id, "Аккаунт с таким именем уже существует"
) )
@ -291,7 +333,7 @@ async def _add_account5(
if text == "/cancel": if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена") return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_password = database.get.get_master_pass(engine, mes.from_user.id) master_password = db.get.get_master_pass(engine, mes.from_user.id)
if not encryption.master_pass.check_master_pass(text, master_password): if not encryption.master_pass.check_master_pass(text, master_password):
return await send_tmp_message( return await send_tmp_message(
bot, bot,
@ -307,12 +349,9 @@ async def _add_account5(
password=data["passwd"], password=data["passwd"],
) )
encrypted_account = encryption.accounts.encrypt( encrypted_account = encryption.accounts.encrypt(account, text)
account,
text,
)
result = database.add.add_account( result = db.add.add_account(
engine, engine,
encrypted_account, encrypted_account,
) )
@ -331,11 +370,11 @@ async def _add_account5(
async def get_account(bot: AsyncTeleBot, engine: Engine, mes: Message) -> None: async def get_account(bot: AsyncTeleBot, engine: Engine, mes: Message) -> None:
await base_handler(bot, mes) await base_handler(bot, mes)
master_pass = database.get.get_master_pass(engine, mes.from_user.id) master_pass = db.get.get_master_pass(engine, mes.from_user.id)
if master_pass is None: if master_pass is None:
return await send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") return await send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
accounts = database.get.get_accounts( accounts = db.get.get_account_names(
engine, engine,
mes.from_user.id, mes.from_user.id,
to_sort=True, to_sort=True,
@ -358,7 +397,7 @@ async def _get_account2(
if text == "/cancel": if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена") return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if text not in database.get.get_accounts(engine, mes.from_user.id): if text not in db.get.get_account_names(engine, mes.from_user.id):
return await send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта") return await send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта")
bot_mes = await bot.send_message(mes.chat.id, "Отправьте мастер пароль") bot_mes = await bot.send_message(mes.chat.id, "Отправьте мастер пароль")
@ -380,7 +419,7 @@ async def _get_account3(
if text == "/cancel": if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена") return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_password = database.get.get_master_pass( master_password = db.get.get_master_pass(
engine, engine,
mes.from_user.id, mes.from_user.id,
) )
@ -392,17 +431,14 @@ async def _get_account3(
"Не подходит мастер пароль", "Не подходит мастер пароль",
) )
account = database.get.get_account_info(engine, mes.from_user.id, name) account = db.get.get_account_info(engine, mes.from_user.id, name)
account = encryption.accounts.decrypt( account = encryption.accounts.decrypt(account, text)
account,
text,
)
await send_deleteable_message( await send_deleteable_message(
bot, bot,
mes.chat.id, mes.chat.id,
f"Название:\n{escape(account.name)}\n" f"Название:\n`{account.name}`\n"
f"Логин:\n`{account.login}`\nПароль:\n`{account.password}`\nНажмите " f"Логин:\n`{account.login}`\nПароль:\n`{account.password}`\nНажмите "
"на логин или пароль, чтобы скопировать", "на название, логин или пароль, чтобы скопировать",
) )
del text, mes del text, mes
@ -416,11 +452,11 @@ async def delete_account(
) -> None: ) -> None:
await base_handler(bot, mes) await base_handler(bot, mes)
master_pass = database.get.get_master_pass(engine, mes.from_user.id) master_pass = db.get.get_master_pass(engine, mes.from_user.id)
if master_pass is None: if master_pass is None:
return await send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") return await send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
accounts = database.get.get_accounts( accounts = db.get.get_account_names(
engine, engine,
mes.from_user.id, mes.from_user.id,
to_sort=True, to_sort=True,
@ -434,13 +470,14 @@ async def delete_account(
register_state( register_state(
mes, mes,
functools.partial(_delete_account2, bot, engine, bot_mes), functools.partial(_delete_account2, bot, engine, master_pass, bot_mes),
) )
async def _delete_account2( async def _delete_account2(
bot: AsyncTeleBot, bot: AsyncTeleBot,
engine: Engine, engine: Engine,
master_pass: db.models.MasterPass,
prev_mes: Message, prev_mes: Message,
mes: Message, mes: Message,
): ):
@ -449,34 +486,43 @@ async def _delete_account2(
if text == "/cancel": if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена") return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if text not in database.get.get_accounts(engine, mes.from_user.id): if text not in db.get.get_account_names(engine, mes.from_user.id):
return await send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта") return await send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта")
bot_mes = await bot.send_message( bot_mes = await bot.send_message(
mes.from_user.id, mes.from_user.id,
f'Вы уверены, что хотите удалить аккаунт "{text}"?\nОтправьте YES для ' f'Вы уверены, что хотите удалить аккаунт "{text}"?\nОтправьте мастер '
"подтверждения", "пароль для подтверждения",
) )
register_state( register_state(
mes, mes,
functools.partial(_delete_account3, bot, engine, bot_mes, text), functools.partial(
_delete_account3,
bot,
engine,
master_pass,
bot_mes,
text,
),
) )
async def _delete_account3( async def _delete_account3(
bot: AsyncTeleBot, bot: AsyncTeleBot,
engine: Engine, engine: Engine,
master_pass: db.models.MasterPass,
prev_mes: Message, prev_mes: Message,
account_name: str, account_name: str,
mes: Message, mes: Message,
) -> None: ) -> None:
await base_handler(bot, mes, prev_mes) await base_handler(bot, mes, prev_mes)
text = mes.text.strip() text = mes.text.strip()
if text != "YES": if not encryption.master_pass.check_master_pass(text, master_pass):
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена") await send_tmp_message(bot, mes.chat.id, "Неверный пароль")
return
database.delete.delete_account(engine, mes.from_user.id, account_name) db.delete.delete_account(engine, mes.from_user.id, account_name)
await send_tmp_message(bot, mes.chat.id, "Аккаунт удалён") await send_tmp_message(bot, mes.chat.id, "Аккаунт удалён")
@ -500,7 +546,7 @@ async def help_command(bot: AsyncTeleBot, mes: Message) -> None:
async def export(bot: AsyncTeleBot, engine: Engine, mes: Message) -> None: async def export(bot: AsyncTeleBot, engine: Engine, mes: Message) -> None:
await base_handler(bot, mes) await base_handler(bot, mes)
master_password_from_db = database.get.get_master_pass( master_password_from_db = db.get.get_master_pass(
engine, engine,
mes.from_user.id, mes.from_user.id,
) )
@ -508,7 +554,7 @@ async def export(bot: AsyncTeleBot, engine: Engine, mes: Message) -> None:
if master_password_from_db is None: if master_password_from_db is None:
return await send_tmp_message(bot, mes.chat.id, "Нет мастер пароля") return await send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
if not database.get.get_accounts(engine, mes.from_user.id): if not db.get.get_account_names(engine, mes.from_user.id):
return await send_tmp_message(bot, mes.chat.id, "Нет аккаунтов") return await send_tmp_message(bot, mes.chat.id, "Нет аккаунтов")
bot_mes = await bot.send_message(mes.chat.id, "Отправьте мастер пароль") bot_mes = await bot.send_message(mes.chat.id, "Отправьте мастер пароль")
@ -524,7 +570,7 @@ async def _export2(
if text == "/cancel": if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена") return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_password = database.get.get_master_pass( master_password = db.get.get_master_pass(
engine, engine,
mes.from_user.id, mes.from_user.id,
) )
@ -535,7 +581,7 @@ async def _export2(
"Не подходит мастер пароль", "Не подходит мастер пароль",
) )
accounts = database.get.get_all_accounts(engine, mes.from_user.id) accounts = db.get.get_accounts(engine, mes.from_user.id)
with ProcessPoolExecutor() as pool: with ProcessPoolExecutor() as pool:
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
tasks = [] tasks = []
@ -547,7 +593,6 @@ async def _export2(
) )
tasks.append(loop.run_in_executor(pool, function)) tasks.append(loop.run_in_executor(pool, function))
accounts = await asyncio.gather(*tasks) accounts = await asyncio.gather(*tasks)
accounts.sort(key=lambda account: account.name)
json_io = accounts_to_json(accounts) json_io = accounts_to_json(accounts)
await bot.send_document( await bot.send_document(
mes.chat.id, mes.chat.id,
@ -565,7 +610,7 @@ async def import_accounts(
mes: Message, mes: Message,
) -> None: ) -> None:
await base_handler(bot, mes) await base_handler(bot, mes)
master_password_from_db = database.get.get_master_pass( master_password_from_db = db.get.get_master_pass(
engine, engine,
mes.from_user.id, mes.from_user.id,
) )
@ -633,7 +678,7 @@ async def _import3(
if text == "/cancel": if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена") return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_password = database.get.get_master_pass( master_password = db.get.get_master_pass(
engine, engine,
mes.from_user.id, mes.from_user.id,
) )
@ -647,25 +692,35 @@ async def _import3(
# List of names of accounts, which failed to be added to the database # List of names of accounts, which failed to be added to the database
# or failed the tests # or failed the tests
failed: list[str] = [] failed: list[str] = []
for account in accounts: tasks: list[asyncio.Future[db.models.Account]] = []
if not check_account(account): loop = asyncio.get_running_loop()
failed.append(account.name) with ProcessPoolExecutor() as pool:
continue for account in accounts:
account = encryption.accounts.encrypt( if not check_account(account):
account, failed.append(account.name)
text, continue
) function = functools.partial(
result = database.add.add_account(engine, account) encryption.accounts.encrypt,
if not result: account,
failed.append(account.name) text,
)
tasks.append(loop.run_in_executor(pool, function))
enc_accounts: list[db.models.Account] = await asyncio.gather(*tasks)
results = db.add.add_accounts(engine, enc_accounts)
failed_accounts = itertools.compress(
enc_accounts, (not result for result in results)
)
failed.extend((account.name for account in failed_accounts))
if failed: if failed:
mes_text = "Не удалось добавить:\n" + "\n".join(failed) await send_deleteable_message(
bot, mes.chat.id, "Не удалось добавить:\n" + "\n".join(failed)
)
else: else:
mes_text = "Успех" await send_tmp_message(bot, mes.chat.id, "Успех")
await send_tmp_message(bot, mes.chat.id, mes_text, 10) del text, mes, accounts, function, tasks, failed_accounts
del text, mes, accounts
gc.collect() gc.collect()
@ -687,10 +742,20 @@ async def message_handler(bot: AsyncTeleBot, mes: Message) -> None:
await delete_message(bot, mes) await delete_message(bot, mes)
if mes.text.strip() == "/cancel": if mes.text.strip() == "/cancel":
await send_tmp_message(bot, mes.chat.id, "Нет активного действия") await send_tmp_message(bot, mes.chat.id, "Нет активного действия")
return
await send_tmp_message( await send_tmp_message(
bot, bot,
mes.chat.id, mes.chat.id,
"Вы отправили не корректное сообщение", "Вы отправили не корректное сообщение",
) )
return return
await handler(mes)
try:
await handler(mes)
except Exception:
await send_tmp_message(
bot,
mes.chat.id,
"Произошла непредвиденная ошибка",
)
raise

View File

@ -1,31 +0,0 @@
import sqlmodel
from sqlalchemy.exc import IntegrityError
from sqlalchemy.future import Engine
from . import models
def add_account(engine: Engine, account: models.Account) -> bool:
"""Adds account to the database. Returns true on success,
false otherwise"""
try:
with sqlmodel.Session(engine) as session:
session.add(account)
session.commit()
except IntegrityError:
return False
else:
return True
def add_master_pass(engine: Engine, master_pass: models.MasterPass) -> bool:
"""Adds master password the database. Returns true on success,
false otherwise"""
try:
with sqlmodel.Session(engine) as session:
session.add(master_pass)
session.commit()
except IntegrityError:
return False
else:
return True

46
src/db/add.py Normal file
View File

@ -0,0 +1,46 @@
import sqlmodel
from sqlalchemy.exc import IntegrityError
from sqlalchemy.future import Engine
from . import models
def _add_model(
session: sqlmodel.Session, model: models.Account | models.MasterPass
) -> bool:
"""Adds model to the session. Returns true on success,
false otherwise"""
try:
session.add(model)
except IntegrityError:
return False
else:
return True
def add_account(engine: Engine, account: models.Account) -> bool:
"""Adds account to the database. Returns true on success,
false otherwise"""
with sqlmodel.Session(engine) as session:
result = _add_model(session, account)
session.commit()
return result
def add_master_pass(engine: Engine, master_pass: models.MasterPass) -> bool:
"""Adds master password the database. Returns true on success,
false otherwise"""
with sqlmodel.Session(engine) as session:
result = _add_model(session, master_pass)
session.commit()
return result
def add_accounts(
engine: Engine,
accounts: list[models.Account],
) -> list[bool]:
with sqlmodel.Session(engine) as session:
result = [_add_model(session, account) for account in accounts]
session.commit()
return result

View File

@ -17,7 +17,7 @@ def get_master_pass(
return result return result
def get_accounts( def get_account_names(
engine: Engine, engine: Engine,
user_id: int, user_id: int,
*, *,
@ -34,10 +34,14 @@ def get_accounts(
return result return result
def get_all_accounts(engine: Engine, user_id: int) -> list[models.Account]: def get_accounts(engine: Engine, user_id: int) -> list[models.Account]:
"""Returns a list of accounts of a user""" """Returns a list of accounts of a user"""
statement = sqlmodel.select(models.Account).where( statement = (
models.Account.user_id == user_id, sqlmodel.select(models.Account)
.where(
models.Account.user_id == user_id,
)
.order_by(models.Account.name)
) )
with sqlmodel.Session(engine) as session: with sqlmodel.Session(engine) as session:
result = session.exec(statement).fetchall() result = session.exec(statement).fetchall()

View File

@ -4,13 +4,21 @@ import sqlmodel
class MasterPass(sqlmodel.SQLModel, table=True): class MasterPass(sqlmodel.SQLModel, table=True):
__tablename__ = "master_passwords" __tablename__ = "master_passwords"
user_id: int = sqlmodel.Field( user_id: int = sqlmodel.Field(
sa_column=sqlmodel.Column(sqlmodel.INT(), primary_key=True) sa_column=sqlmodel.Column(
sqlmodel.INT(),
primary_key=True,
autoincrement=False,
)
) )
salt: bytes = sqlmodel.Field( salt: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(sqlmodel.BINARY(64), nullable=False) sa_column=sqlmodel.Column(sqlmodel.BINARY(64), nullable=False),
max_length=64,
min_length=64,
) )
password_hash: bytes = sqlmodel.Field( password_hash: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(sqlmodel.BINARY(128), nullable=False) sa_column=sqlmodel.Column(sqlmodel.BINARY(128), nullable=False),
max_length=128,
min_length=128,
) )
@ -18,13 +26,17 @@ class Account(sqlmodel.SQLModel, table=True):
__tablename__ = "accounts" __tablename__ = "accounts"
__table_args__ = (sqlmodel.PrimaryKeyConstraint("user_id", "name"),) __table_args__ = (sqlmodel.PrimaryKeyConstraint("user_id", "name"),)
user_id: int = sqlmodel.Field() user_id: int = sqlmodel.Field()
name: str = sqlmodel.Field(max_length=255) name: str = sqlmodel.Field(max_length=256)
salt: bytes = sqlmodel.Field( salt: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(sqlmodel.BINARY(64), nullable=False) sa_column=sqlmodel.Column(sqlmodel.BINARY(64), nullable=False),
max_length=64,
min_length=64,
) )
enc_login: bytes = sqlmodel.Field( enc_login: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(sqlmodel.VARBINARY(256), nullable=False) sa_column=sqlmodel.Column(sqlmodel.VARBINARY(256), nullable=False),
max_length=256,
) )
enc_password: bytes = sqlmodel.Field( enc_password: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(sqlmodel.VARBINARY(256), nullable=False) sa_column=sqlmodel.Column(sqlmodel.VARBINARY(256), nullable=False),
max_length=256,
) )

View File

@ -1,26 +1,43 @@
import base64
import os import os
from typing import Self
from cryptography.fernet import Fernet
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from ..database.models import Account from ..db.models import Account
from ..decrypted_account import DecryptedAccount from ..decrypted_account import DecryptedAccount
def _generate_key(salt: bytes, master_pass: bytes) -> bytes: class Cipher:
"""Generates key for fernet encryption""" def __init__(self, key: bytes) -> None:
kdf = PBKDF2HMAC( self._chacha = ChaCha20Poly1305(key)
algorithm=hashes.SHA256(),
length=32, @classmethod
salt=salt, def generate_cipher(cls, salt: bytes, password: bytes) -> Self:
iterations=100000, """Generates cipher which uses key derived from a given password"""
backend=default_backend(), kdf = PBKDF2HMAC(
) algorithm=hashes.SHA256(),
key = base64.urlsafe_b64encode(kdf.derive(master_pass)) length=32,
return key salt=salt,
iterations=100000,
)
return cls(kdf.derive(password))
def encrypt(self, data: bytes) -> bytes:
nonce = os.urandom(12)
return nonce + self._chacha.encrypt(
nonce,
data,
associated_data=None,
)
def decrypt(self, data: bytes) -> bytes:
return self._chacha.decrypt(
nonce=data[:12],
data=data[12:],
associated_data=None,
)
def encrypt( def encrypt(
@ -29,15 +46,10 @@ def encrypt(
) -> Account: ) -> Account:
"""Encrypts account using master password and returns Account object""" """Encrypts account using master password and returns Account object"""
salt = os.urandom(64) salt = os.urandom(64)
key = _generate_key(salt, master_pass.encode("utf-8")) cipher = Cipher.generate_cipher(salt, master_pass.encode("utf-8"))
f = Fernet(key)
enc_login = base64.urlsafe_b64decode( enc_login = cipher.encrypt(account.login.encode("utf-8"))
f.encrypt(account.login.encode("utf-8")), enc_password = cipher.encrypt(account.password.encode("utf-8"))
)
enc_password = base64.urlsafe_b64decode(
f.encrypt(account.password.encode("utf-8")),
)
return Account( return Account(
user_id=account.user_id, user_id=account.user_id,
@ -54,15 +66,10 @@ def decrypt(
) -> DecryptedAccount: ) -> DecryptedAccount:
"""Decrypts account using master password and returns """Decrypts account using master password and returns
DecryptedAccount object""" DecryptedAccount object"""
key = _generate_key(account.salt, master_pass.encode("utf-8")) cipher = Cipher.generate_cipher(account.salt, master_pass.encode("utf-8"))
f = Fernet(key)
login = f.decrypt( login = cipher.decrypt(account.enc_login).decode("utf-8")
base64.urlsafe_b64encode(account.enc_login), password = cipher.decrypt(account.enc_password).decode("utf-8")
).decode("utf-8")
password = f.decrypt(
base64.urlsafe_b64encode(account.enc_password),
).decode("utf-8")
return DecryptedAccount( return DecryptedAccount(
user_id=account.user_id, user_id=account.user_id,

View File

@ -3,7 +3,7 @@ import os
from cryptography.exceptions import InvalidKey from cryptography.exceptions import InvalidKey
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
from ..database.models import MasterPass from ..db.models import MasterPass
MEMORY_USAGE = 2**14 MEMORY_USAGE = 2**14