Compare commits

..

No commits in common. "74844da4aeb4d4cdc214ac6fe7a20eb416e337ac" and "a9417058ee53105d7e3f14e445d2116744eaddf9" have entirely different histories.

23 changed files with 752 additions and 1035 deletions

View File

@ -24,5 +24,5 @@
**/secrets.dev.yaml **/secrets.dev.yaml
**/values.dev.yaml **/values.dev.yaml
README.md README.md
database/ data/
.flake8 .flake8

2
.gitignore vendored
View File

@ -161,4 +161,4 @@ cython_debug/
#.idea/ #.idea/
# Database data # Database data
database/data/ data/

View File

@ -20,9 +20,7 @@ services:
- password_manager - password_manager
db: db:
build: image: stnicolay/mariadb-aria
context: ./database/
dockerfile: ./Dockerfile
restart: always restart: always
environment: environment:
MYSQL_ROOT_PASSWORD: example123! MYSQL_ROOT_PASSWORD: example123!
@ -30,6 +28,6 @@ services:
MYSQL_USER: manager MYSQL_USER: manager
MYSQL_PASSWORD: passwd123! MYSQL_PASSWORD: passwd123!
volumes: volumes:
- ./database/data:/var/lib/mysql - ./data:/var/lib/mysql
networks: networks:
- password_manager - password_manager

View File

@ -1,3 +0,0 @@
FROM mariadb
COPY mariadb.cnf /etc/mysql/mariadb.cnf

View File

@ -1,38 +0,0 @@
# The MariaDB configuration file
#
# The MariaDB/MySQL tools read configuration files in the following order:
# 0. "/etc/mysql/my.cnf" symlinks to this file, reason why all the rest is read.
# 1. "/etc/mysql/mariadb.cnf" (this file) to set global defaults,
# 2. "/etc/mysql/conf.d/*.cnf" to set global options.
# 3. "/etc/mysql/mariadb.conf.d/*.cnf" to set MariaDB-only options.
# 4. "~/.my.cnf" to set user-specific options.
#
# If the same option is defined multiple times, the last one will apply.
#
# One can use all long options that the program supports.
# Run program with --help to get a list of available options and with
# --print-defaults to see which it would actually understand and use.
#
# If you are new to MariaDB, check out https://mariadb.com/kb/en/basic-mariadb-articles/
#
# This group is read both by the client and the server
# use it for options that affect everything
#
[client-server]
# Port or socket location where to connect
# port = 3306
socket = /run/mysqld/mysqld.sock
# Import all .cnf files from configuration directory
[mariadbd]
skip-host-cache
skip-name-resolve
[mysqld]
skip-innodb
default-storage-engine=Aria
default-tmp-storage-engine=Aria
!includedir /etc/mysql/mariadb.conf.d/
!includedir /etc/mysql/conf.d/

View File

@ -1,3 +1,2 @@
black black
flake8 flake8
isort

View File

@ -4,4 +4,3 @@ python-dotenv
pyTelegramBotAPI pyTelegramBotAPI
sqlmodel sqlmodel
pydantic pydantic
aiohttp

View File

@ -1,4 +1,3 @@
import asyncio
import os import os
from dotenv import load_dotenv from dotenv import load_dotenv
@ -7,9 +6,8 @@ from . import (
account_checks, account_checks,
account_parsing, account_parsing,
bot, bot,
database,
decrypted_account,
encryption, encryption,
database,
generate_password, generate_password,
) )
@ -17,7 +15,6 @@ __all__ = [
"account_checks", "account_checks",
"account_parsing", "account_parsing",
"bot", "bot",
"decrypted_account",
"encryption", "encryption",
"database", "database",
"generate_password", "generate_password",
@ -34,4 +31,4 @@ def main() -> None:
) )
database.prepare.prepare(engine) database.prepare.prepare(engine)
bot_ = bot.create_bot(os.getenv("TG_TOKEN"), engine) bot_ = bot.create_bot(os.getenv("TG_TOKEN"), engine)
asyncio.run(bot_.infinity_polling()) bot_.infinity_polling()

View File

@ -1,8 +1,6 @@
import string import string
from .decrypted_account import DecryptedAccount FORBIDDEN_CHARS = frozenset("`\n")
FORBIDDEN_CHARS = frozenset("`\n\\")
PUNCTUATION = frozenset(string.punctuation).difference(FORBIDDEN_CHARS) PUNCTUATION = frozenset(string.punctuation).difference(FORBIDDEN_CHARS)
@ -26,13 +24,13 @@ def check_password(passwd: str) -> bool:
return _base_check(passwd) return _base_check(passwd)
def check_account(account: DecryptedAccount) -> bool: def check_account(name: str, login: str, passwd: str) -> bool:
"""Runs checks for account name, login and password""" """Runs checks for account name, login and password"""
return all( return all(
( (
check_account_name(account.name), check_account_name(name),
check_login(account.login), check_login(login),
check_password(account.password), check_password(passwd),
) )
) )

View File

@ -1,50 +1,37 @@
import io import io
from typing import Iterable, Self from typing import Iterator, Self, Type
import pydantic import pydantic
from .decrypted_account import DecryptedAccount
class _Account(pydantic.BaseModel): class _Account(pydantic.BaseModel):
name: str name: str
login: str login: str
password: str password: str
def to_usual_account(self, user_id: int) -> DecryptedAccount:
return DecryptedAccount(
user_id=user_id,
name=self.name,
login=self.login,
password=self.password,
)
@classmethod @classmethod
def from_usual_account(cls, account: DecryptedAccount) -> Self: def from_tuple(cls: Type[Self], tuple_: tuple[str, str, str]) -> Self:
return cls( return cls(name=tuple_[0], login=tuple_[1], password=tuple_[2])
name=account.name,
login=account.login, def as_tuple(self: Self) -> tuple[str, str, str]:
password=account.password, return (self.name, self.login, self.password)
)
class _Accounts(pydantic.BaseModel): class _Accounts(pydantic.BaseModel):
accounts: list[_Account] = pydantic.Field(default_factory=list) accounts: list[_Account] = pydantic.Field(default_factory=list)
def _accounts_list_to_json(accounts: Iterable[DecryptedAccount]) -> str: def _accounts_list_to_json(accounts: Iterator[tuple[str, str, str]]) -> str:
result = _Accounts( accounts = _Accounts(accounts=[_Account.from_tuple(i) for i in accounts])
accounts=[_Account.from_usual_account(i) for i in accounts], return accounts.json(ensure_ascii=False)
).json(ensure_ascii=False)
return result
def json_to_accounts(json_: str, user_id: int) -> list[DecryptedAccount]: def json_to_accounts(json_: str) -> list[tuple[str, str, str]]:
accounts = _Accounts.parse_raw(json_) accounts = _Accounts.parse_raw(json_)
return [account.to_usual_account(user_id) for account in accounts.accounts] return [i.as_tuple() for i in accounts.accounts]
def accounts_to_json(accounts: Iterable[DecryptedAccount]) -> io.StringIO: def accounts_to_json(accounts: Iterator[tuple[str, str, str]]) -> io.StringIO:
file = io.StringIO(_accounts_list_to_json(accounts)) file = io.StringIO(_accounts_list_to_json(accounts))
file.name = "passwords.json" file.name = "passwords.json"
return file return file

View File

@ -1,77 +1,59 @@
import functools import functools
import telebot
from sqlalchemy.future import Engine from sqlalchemy.future import Engine
from telebot.async_telebot import AsyncTeleBot
from . import callback_handlers, message_handlers from . import handlers
__all__ = ["callback_handlers", "message_handlers"] __all__ = ["handlers"]
def create_bot(token: str, engine: Engine) -> AsyncTeleBot: def create_bot(token: str, engine: Engine) -> telebot.TeleBot:
bot = AsyncTeleBot(token) bot = telebot.TeleBot(token)
bot.register_message_handler( bot.register_message_handler(
functools.partial(message_handlers.set_master_password, bot, engine), functools.partial(handlers.set_master_password, bot, engine),
commands=["set_master_pass"], commands=["set_master_pass"],
) )
bot.register_message_handler( bot.register_message_handler(
functools.partial(message_handlers.get_account, bot, engine), functools.partial(handlers.get_account, bot, engine),
commands=["get_account"], commands=["get_account"],
) )
bot.register_message_handler( bot.register_message_handler(
functools.partial(message_handlers.get_accounts, bot, engine), functools.partial(handlers.get_accounts, bot, engine),
commands=["get_accounts"], commands=["get_accounts"],
) )
bot.register_message_handler( bot.register_message_handler(
functools.partial(message_handlers.add_account, bot, engine), functools.partial(handlers.add_account, bot, engine),
commands=["add_account"], commands=["add_account"],
) )
bot.register_message_handler( bot.register_message_handler(
functools.partial(message_handlers.delete_all, bot, engine), functools.partial(handlers.delete_all, bot, engine),
commands=["delete_all"], commands=["delete_all"],
) )
bot.register_message_handler( bot.register_message_handler(
functools.partial(message_handlers.reset_master_pass, bot, engine), functools.partial(handlers.reset_master_pass, bot, engine),
commands=["reset_master_pass"], commands=["reset_master_pass"],
) )
bot.register_message_handler( bot.register_message_handler(
functools.partial(message_handlers.delete_account, bot, engine), functools.partial(handlers.delete_account, bot, engine),
commands=["delete_account"], commands=["delete_account"],
) )
bot.register_message_handler( bot.register_message_handler(
functools.partial(message_handlers.help_command, bot), functools.partial(handlers.help_command, bot),
commands=["help", "start"], commands=["help", "start"],
) )
bot.register_message_handler( bot.register_message_handler(
functools.partial(message_handlers.export, bot, engine), functools.partial(handlers.cancel, bot), commands=["cancel"]
commands=["export"],
) )
bot.register_message_handler( bot.register_message_handler(
functools.partial(message_handlers.import_accounts, bot, engine), functools.partial(handlers.export, bot, engine), commands=["export"]
)
bot.register_message_handler(
functools.partial(handlers.import_accounts, bot, engine),
commands=["import"], commands=["import"],
) )
bot.register_message_handler( bot.register_message_handler(
functools.partial(message_handlers.gen_password, bot), functools.partial(handlers.gen_password, bot),
commands=["gen_password"], commands=["gen_password"],
) )
bot.register_message_handler(
functools.partial(message_handlers.message_handler, bot),
content_types=[
"text",
"audio",
"document",
"photo",
"sticker",
"video",
"video_note",
"voice",
"location",
"contact",
],
)
bot.register_callback_query_handler(
functools.partial(callback_handlers.delete_message, bot),
lambda call: call.data == "DELETE",
)
return bot return bot

View File

@ -1,8 +0,0 @@
from telebot.async_telebot import AsyncTeleBot
from telebot.types import CallbackQuery as Callback
from . import helper_functions
async def delete_message(bot: AsyncTeleBot, call: Callback) -> None:
await helper_functions.delete_message(bot, call.message)

592
src/bot/handlers.py Normal file
View File

@ -0,0 +1,592 @@
import functools
import gc
import time
import telebot
from sqlalchemy.future import Engine
from .. import encryption, database, generate_password
from ..account_checks import (
check_account,
check_account_name,
check_login,
check_password,
)
from ..account_parsing import accounts_to_json, json_to_accounts
Message = telebot.types.Message
def _send_tmp_message(
bot: telebot.TeleBot,
chat_id: telebot.types.Message,
text: str,
timeout: int = 5,
) -> None:
bot_mes = bot.send_message(chat_id, text, parse_mode="MarkdownV2")
time.sleep(timeout)
bot.delete_message(chat_id, bot_mes.id)
def _base_handler(
bot: telebot.TeleBot, mes: Message, prev_mes: Message | None = None
) -> None:
bot.delete_message(mes.chat.id, mes.id)
if prev_mes is not None:
bot.delete_message(prev_mes.chat.id, prev_mes.id)
def get_accounts(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
_base_handler(bot, mes)
accounts = database.get.get_accounts(engine, mes.from_user.id)
if not accounts:
return _send_tmp_message(bot, mes.chat.id, "У вас нет аккаунтов")
# Make accounts copyable and escape special chars
accounts = [f"`{account}`" for account in accounts]
_send_tmp_message(
bot,
mes.chat.id,
"Ваши аккаунты:\n"
+ "\n".join(accounts)
+ "\nНажмите на название, чтобы скопировать\n"
f"Всего {len(accounts)}",
30,
)
def delete_all(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
_base_handler(bot, mes)
bot_mes = bot.send_message(
mes.chat.id,
"Вы действительно хотите удалить все ваши аккаунты? Это действие "
"нельзя отменить. "
"Отправьте YES для подтверждения",
)
bot.register_next_step_handler(
mes, functools.partial(_delete_all, bot, engine, bot_mes)
)
def _delete_all(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
_base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "YES":
database.delete.purge_accounts(engine, mes.from_user.id)
database.delete.delete_master_pass(engine, mes.from_user.id)
_send_tmp_message(bot, mes.chat.id, "Всё успешно удалено", timeout=10)
else:
_send_tmp_message(
bot,
mes.chat.id,
"Вы отправили не YES, ничего не удалено",
)
def set_master_password(
bot: telebot.TeleBot,
engine: Engine,
mes: Message,
) -> None:
_base_handler(bot, mes, None)
if database.get.get_master_pass(engine, mes.from_user.id) is not None:
return _send_tmp_message(
bot,
mes.chat.id,
"Мастер пароль уже существует",
)
bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль")
bot.register_next_step_handler(
mes, functools.partial(_set_master_pass2, bot, engine, bot_mes)
)
def _set_master_pass2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
_base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
password_hash, master_salt = encryption.master_pass.encrypt_master_pass(
text,
)
database.add.add_master_pass(
engine,
mes.from_user.id,
master_salt,
password_hash,
)
_send_tmp_message(bot, mes.chat.id, "Успех")
del mes, text
gc.collect()
def reset_master_pass(
bot: telebot.TeleBot,
engine: Engine,
mes: Message,
) -> None:
_base_handler(bot, mes)
if database.get.get_master_pass(engine, mes.from_user.id) is None:
return _send_tmp_message(bot, mes.chat.id, "Мастер пароль не задан")
bot_mes = bot.send_message(
mes.chat.id,
"Отправьте новый мастер пароль, осторожно, все текущие аккаунты "
"будут удалены навсегда",
)
bot.register_next_step_handler(
mes, functools.partial(_reset_master_pass2, bot, engine, bot_mes)
)
def _reset_master_pass2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
_base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
hash_, salt = encryption.master_pass.encrypt_master_pass(text)
database.delete.purge_accounts(engine, mes.from_user.id)
database.change.change_master_pass(engine, mes.from_user.id, salt, hash_)
_send_tmp_message(
bot, mes.chat.id, "Все ваши аккаунты удалены, а мастер пароль изменён"
)
del mes, text
gc.collect()
def add_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
_base_handler(bot, mes)
master_password_from_db = database.get.get_master_pass(
engine,
mes.from_user.id,
)
if master_password_from_db is None:
return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
bot_mes = bot.send_message(mes.chat.id, "Отправьте название аккаунта")
bot.register_next_step_handler(
mes, functools.partial(_add_account2, bot, engine, bot_mes)
)
def _add_account2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
_base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if not check_account_name(text):
return _send_tmp_message(
bot,
mes.chat.id,
"Не корректное название аккаунта",
)
if text in database.get.get_accounts(engine, mes.from_user.id):
return _send_tmp_message(
bot, mes.chat.id, "Аккаунт с таким именем уже существует"
)
bot_mes = bot.send_message(mes.chat.id, "Отправьте логин")
data = {"name": text}
bot.register_next_step_handler(
mes, functools.partial(_add_account3, bot, engine, bot_mes, data)
)
def _add_account3(
bot: telebot.TeleBot,
engine: Engine,
prev_mes: Message,
data: dict[str, str],
mes: Message,
) -> None:
_base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if not check_login(text):
return _send_tmp_message(bot, mes.chat.id, "Не корректный логин")
data["login"] = text
bot_mes = bot.send_message(mes.chat.id, "Отправьте пароль от аккаунта")
bot.register_next_step_handler(
mes, functools.partial(_add_account4, bot, engine, bot_mes, data)
)
def _add_account4(
bot: telebot.TeleBot,
engine: Engine,
prev_mes: Message,
data: dict[str, str],
mes: Message,
) -> None:
_base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if not check_password(text):
return _send_tmp_message(bot, mes.chat.id, "Не корректный пароль")
data["passwd"] = text
bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль")
bot.register_next_step_handler(
mes, functools.partial(_add_account5, bot, engine, bot_mes, data)
)
def _add_account5(
bot: telebot.TeleBot,
engine: Engine,
prev_mes: Message,
data: dict[str, str],
mes: Message,
) -> None:
_base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
salt, hash_ = database.get.get_master_pass(engine, mes.from_user.id)
if not encryption.master_pass.check_master_pass(text, hash_, salt):
return _send_tmp_message(
bot,
mes.chat.id,
"Не подходит главный пароль",
)
name, login, passwd = data["name"], data["login"], data["passwd"]
enc_login, enc_pass, salt = encryption.other_accounts.encrypt(
login,
passwd,
text,
)
result = database.add.add_account(
engine, mes.from_user.id, name, salt, enc_login, enc_pass
)
_send_tmp_message(
bot,
mes.chat.id,
"Успех" if result else "Произошла не предвиденная ошибка",
)
del data, name, login, passwd, enc_login
gc.collect()
def get_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
_base_handler(bot, mes)
bot_mes = bot.send_message(mes.chat.id, "Отправьте название аккаунта")
master_pass = database.get.get_master_pass(engine, mes.from_user.id)
if master_pass is None:
return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
bot.register_next_step_handler(
mes, functools.partial(_get_account2, bot, engine, bot_mes)
)
def _get_account2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
_base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if text not in database.get.get_accounts(engine, mes.from_user.id):
return _send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта")
bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль")
bot.register_next_step_handler(
mes, functools.partial(_get_account3, bot, engine, bot_mes, text)
)
def _get_account3(
bot: telebot.TeleBot,
engine: Engine,
prev_mes: Message,
name: str,
mes: Message,
) -> None:
_base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_salt, hash_pass = database.get.get_master_pass(
engine,
mes.from_user.id,
)
if not encryption.master_pass.check_master_pass(
text,
hash_pass,
master_salt,
):
return _send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль")
salt, enc_login, enc_pass = database.get.get_account_info(
engine, mes.from_user.id, name
)
login, passwd = encryption.other_accounts.decrypt(
enc_login,
enc_pass,
text,
salt,
)
_send_tmp_message(
bot,
mes.chat.id,
f"Логин:\n`{login}`\nПароль:\n`{passwd}`\nНажмите на логин "
"или пароль, чтобы скопировать",
30,
)
del text, mes, passwd, login
gc.collect()
def delete_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
_base_handler(bot, mes)
master_pass = database.get.get_master_pass(engine, mes.from_user.id)
if master_pass is None:
return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
bot_mes = bot.send_message(
mes.chat.id, "Отправьте название аккаунта, который вы хотите удалить"
)
bot.register_next_step_handler(
mes, functools.partial(_delete_account2, bot, engine, bot_mes)
)
def _delete_account2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
_base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if text not in database.get.get_accounts(engine, mes.from_user.id):
return _send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта")
database.delete.delete_account(engine, mes.from_user.id, text)
_send_tmp_message(bot, mes.chat.id, "Аккаунт удалён")
def help_command(bot: telebot.TeleBot, mes: Message) -> None:
message = """Команды:
/set_master_pass - установить мастер пароль
/add_account - создать аккаунт
/get_accounts - получить список аккаунтов
/get_account - получить логин и пароль аккаунта
/delete_account - удалить аккаунт
/delete_all - удалить все аккаунты и мастер пароль
/reset_master_pass - удалить все аккаунты и изменить мастер пароль
/cancel - отмена текущего действия
/help - помощь
/export - получить пароли в json формате
/import - импортировать пароли из json в файле в таком же формате, \
как из /export
/gen_password - создать 10 надёжных паролей"""
bot.send_message(mes.chat.id, message)
def cancel(bot: telebot.TeleBot, mes: Message) -> None:
_base_handler(bot, mes)
_send_tmp_message(bot, mes.chat.id, "Нет активного действия")
def export(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
_base_handler(bot, mes)
master_password_from_db = database.get.get_master_pass(
engine,
mes.from_user.id,
)
if master_password_from_db is None:
return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
if not database.get.get_accounts(engine, mes.from_user.id):
return _send_tmp_message(bot, mes.chat.id, "Нет аккаунтов")
bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль")
bot.register_next_step_handler(
mes, functools.partial(_export2, bot, engine, bot_mes)
)
def _export2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
_base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_salt, hash_pass = database.get.get_master_pass(
engine,
mes.from_user.id,
)
if not encryption.master_pass.check_master_pass(
text,
hash_pass,
master_salt,
):
return _send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль")
accounts = database.get.get_all_accounts(engine, mes.from_user.id)
accounts = encryption.other_accounts.decrypt_multiple(accounts, text)
json_io = accounts_to_json(accounts)
bot_mes = bot.send_document(mes.chat.id, json_io)
del text, accounts, json_io
gc.collect()
time.sleep(30)
bot.delete_message(bot_mes.chat.id, bot_mes.id)
def import_accounts(
bot: telebot.TeleBot,
engine: Engine,
mes: Message,
) -> None:
_base_handler(bot, mes)
master_password_from_db = database.get.get_master_pass(
engine,
mes.from_user.id,
)
if master_password_from_db is None:
return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
bot_mes = bot.send_message(mes.chat.id, "Отправьте json файл")
bot.register_next_step_handler(
mes, functools.partial(_import2, bot, engine, bot_mes)
)
def _import2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
_base_handler(bot, mes, prev_mes)
if mes.text is not None:
text = mes.text.strip()
if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if mes.document is None:
return _send_tmp_message(
bot,
mes.chat.id,
"Вы должны отправить документ",
)
if mes.document.file_size > 102_400: # If file size is bigger that 100 MB
return _send_tmp_message(bot, mes.chat.id, "Файл слишком большой")
file_info = bot.get_file(mes.document.file_id)
downloaded_file = bot.download_file(file_info.file_path)
try:
accounts = json_to_accounts(downloaded_file.decode("utf-8"))
except Exception:
return _send_tmp_message(
bot,
mes.chat.id,
"Ошибка во время работы с файлом",
)
bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль")
bot.register_next_step_handler(
mes, functools.partial(_import3, bot, engine, bot_mes, accounts)
)
def _import3(
bot: telebot.TeleBot,
engine: Engine,
prev_mes: Message,
accounts: list[tuple[str, str, str]],
mes: Message,
) -> None:
_base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_salt, hash_pass = database.get.get_master_pass(
engine,
mes.from_user.id,
)
if not encryption.master_pass.check_master_pass(
text,
hash_pass,
master_salt,
):
return _send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль")
# List of names of accounts, which failed to be added to the database
# or failed the tests
failed: list[str] = []
for account in accounts:
name, login, passwd = account
if not check_account(name, login, passwd):
failed.append(name)
continue
enc_login, enc_passwd, salt = encryption.other_accounts.encrypt(
login,
passwd,
text,
)
result = database.add.add_account(
engine, mes.from_user.id, name, salt, enc_login, enc_passwd
)
if not result:
failed.append(name)
if failed:
mes_text = "Не удалось добавить:\n" + "\n".join(failed)
else:
mes_text = "Успех"
_send_tmp_message(bot, mes.chat.id, mes_text, 10)
del text, mes, accounts
gc.collect()
def gen_password(bot: telebot.TeleBot, mes: Message) -> None:
_base_handler(bot, mes)
# Generate 10 passwords and put 'em in the backticks
passwords = (f"`{generate_password.gen_password()}`" for _ in range(10))
text = (
"Пароли:\n"
+ "\n".join(passwords)
+ "\nНажмите на пароль, чтобы его скопировать"
)
_send_tmp_message(bot, mes.chat.id, text, 15)

View File

@ -1,95 +0,0 @@
import asyncio
from typing import Any, Awaitable, Callable
import telebot
from telebot.async_telebot import AsyncTeleBot
from . import markups
Message = telebot.types.Message
Handler = Callable[[Message], Awaitable[Any]]
Markups = (
telebot.types.ReplyKeyboardMarkup
| telebot.types.InlineKeyboardMarkup
| telebot.types.ReplyKeyboardRemove
)
states: dict[tuple[int, int], Handler] = {}
def register_state(
message: Message,
handler: Callable[[Message], Any],
) -> None:
states[(message.chat.id, message.from_user.id)] = handler
def reset_state(message: Message) -> None:
try:
del states[(message.chat.id, message.from_user.id)]
except KeyError:
pass
def get_state(message: Message) -> Handler | None:
return states.get((message.chat.id, message.from_user.id))
async def delete_message(
bot: AsyncTeleBot,
mes: Message,
*,
sleep_time: int = 0,
) -> bool:
try:
if sleep_time != 0:
await asyncio.sleep(sleep_time)
await bot.delete_message(mes.chat.id, mes.id)
except telebot.apihelper.ApiException:
return False
else:
return True
async def send_tmp_message(
bot: AsyncTeleBot,
chat_id: telebot.types.Message,
text: str,
*,
sleep_time: int = 5,
markup: Markups | None = None,
) -> None:
bot_mes = await bot.send_message(
chat_id, text, parse_mode="MarkdownV2", reply_markup=markup
)
await delete_message(bot, bot_mes, sleep_time=sleep_time)
async def base_handler(
bot: AsyncTeleBot, mes: Message, prev_mes: Message | None = None
) -> None:
reset_state(mes)
await delete_message(bot, mes)
if prev_mes is not None:
await delete_message(bot, prev_mes)
async def send_deleteable_message(
bot: AsyncTeleBot,
chat_id: int,
text: str,
) -> None:
"""Sends a message with a delete button"""
markup = markups.deletion_markup()
await bot.send_message(
chat_id,
text,
parse_mode="MarkdownV2",
reply_markup=markup,
)
def escape(text: str) -> str:
escaped_chars = "*_~|`[("
for char in escaped_chars:
text = text.replace(char, rf"\\{char}")
return text

View File

@ -1,18 +0,0 @@
from telebot.types import (
InlineKeyboardButton,
InlineKeyboardMarkup,
ReplyKeyboardMarkup,
)
def deletion_markup() -> InlineKeyboardMarkup:
markup = InlineKeyboardMarkup(row_width=5)
button = InlineKeyboardButton("Удалить сообщение", callback_data="DELETE")
markup.add(button)
return markup
def account_markup(account_names: list[str]) -> ReplyKeyboardMarkup:
markup = ReplyKeyboardMarkup(resize_keyboard=True, row_width=2)
markup.add(*account_names)
return markup

View File

@ -1,683 +0,0 @@
import functools
import gc
import telebot
from sqlalchemy.future import Engine
from telebot.async_telebot import AsyncTeleBot
from .. import database, encryption, generate_password
from ..account_checks import (
check_account,
check_account_name,
check_login,
check_password,
)
from ..account_parsing import accounts_to_json, json_to_accounts
from ..decrypted_account import DecryptedAccount
from . import markups
from .helper_functions import (
base_handler,
delete_message,
escape,
get_state,
register_state,
send_deleteable_message,
send_tmp_message,
)
Message = telebot.types.Message
async def get_accounts(
bot: AsyncTeleBot,
engine: Engine,
mes: Message,
) -> None:
await base_handler(bot, mes)
accounts = database.get.get_accounts(
engine,
mes.from_user.id,
to_sort=True,
)
if not accounts:
return await send_tmp_message(bot, mes.chat.id, "У вас нет аккаунтов")
# Make accounts copyable and escape special chars
accounts = [f"`{account}`" for account in accounts]
await send_deleteable_message(
bot,
mes.chat.id,
"Ваши аккаунты:\n"
+ "\n".join(accounts)
+ "\nНажмите на название, чтобы скопировать\n"
f"Всего {len(accounts)}",
)
async def delete_all(bot: AsyncTeleBot, engine: Engine, mes: Message) -> None:
await base_handler(bot, mes)
bot_mes = await bot.send_message(
mes.chat.id,
"Вы действительно хотите удалить все ваши аккаунты? Это действие "
"нельзя отменить. "
"Отправьте YES для подтверждения",
)
register_state(mes, functools.partial(_delete_all2, bot, engine, bot_mes))
async def _delete_all2(
bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
await base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "YES":
database.delete.purge_accounts(engine, mes.from_user.id)
database.delete.delete_master_pass(engine, mes.from_user.id)
await send_tmp_message(
bot,
mes.chat.id,
"Всё успешно удалено",
sleep_time=10,
)
else:
await send_tmp_message(
bot,
mes.chat.id,
"Вы отправили не YES, ничего не удалено",
)
async def set_master_password(
bot: AsyncTeleBot,
engine: Engine,
mes: Message,
) -> None:
await base_handler(bot, mes, None)
if database.get.get_master_pass(engine, mes.from_user.id) is not None:
return await send_tmp_message(
bot,
mes.chat.id,
"Мастер пароль уже существует",
)
bot_mes = await bot.send_message(mes.chat.id, "Отправьте мастер пароль")
register_state(
mes,
functools.partial(_set_master_pass2, bot, engine, bot_mes),
)
async def _set_master_pass2(
bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
await base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_password = encryption.master_pass.encrypt_master_pass(
mes.from_user.id,
text,
)
database.add.add_master_pass(engine, master_password)
await send_tmp_message(bot, mes.chat.id, "Успех")
del mes, text
gc.collect()
async def reset_master_pass(
bot: AsyncTeleBot,
engine: Engine,
mes: Message,
) -> None:
await base_handler(bot, mes)
if database.get.get_master_pass(engine, mes.from_user.id) is None:
return await send_tmp_message(
bot,
mes.chat.id,
"Мастер пароль не задан",
)
bot_mes = await bot.send_message(
mes.chat.id,
"Отправьте новый мастер пароль, осторожно, все текущие аккаунты "
"будут удалены навсегда",
)
register_state(
mes,
functools.partial(_reset_master_pass2, bot, engine, bot_mes),
)
async def _reset_master_pass2(
bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
await base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_password = encryption.master_pass.encrypt_master_pass(
mes.from_user.id,
text,
)
database.delete.purge_accounts(engine, mes.from_user.id)
database.change.change_master_pass(engine, master_password)
await send_tmp_message(
bot, mes.chat.id, "Все ваши аккаунты удалены, а мастер пароль изменён"
)
del mes, text
gc.collect()
async def add_account(bot: AsyncTeleBot, engine: Engine, mes: Message) -> None:
await base_handler(bot, mes)
master_password_from_db = database.get.get_master_pass(
engine,
mes.from_user.id,
)
if master_password_from_db is None:
return await send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
bot_mes = await bot.send_message(
mes.chat.id,
"Отправьте название аккаунта",
)
register_state(
mes,
functools.partial(_add_account2, bot, engine, bot_mes),
)
async def _add_account2(
bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
await base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if not check_account_name(text):
return await send_tmp_message(
bot,
mes.chat.id,
"Не корректное название аккаунта",
)
if text in database.get.get_accounts(engine, mes.from_user.id):
return await send_tmp_message(
bot, mes.chat.id, "Аккаунт с таким именем уже существует"
)
bot_mes = await bot.send_message(mes.chat.id, "Отправьте логин")
data = {"name": text}
register_state(
mes,
functools.partial(_add_account3, bot, engine, bot_mes, data),
)
async def _add_account3(
bot: AsyncTeleBot,
engine: Engine,
prev_mes: Message,
data: dict[str, str],
mes: Message,
) -> None:
await base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if not check_login(text):
return await send_tmp_message(bot, mes.chat.id, "Не корректный логин")
data["login"] = text
bot_mes = await bot.send_message(
mes.chat.id,
"Отправьте пароль от аккаунта",
)
register_state(
mes,
functools.partial(_add_account4, bot, engine, bot_mes, data),
)
async def _add_account4(
bot: AsyncTeleBot,
engine: Engine,
prev_mes: Message,
data: dict[str, str],
mes: Message,
) -> None:
await base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if not check_password(text):
return await send_tmp_message(
bot,
mes.chat.id,
"Не корректный пароль",
)
data["passwd"] = text
bot_mes = await bot.send_message(mes.chat.id, "Отправьте мастер пароль")
register_state(
mes,
functools.partial(_add_account5, bot, engine, bot_mes, data),
)
async def _add_account5(
bot: AsyncTeleBot,
engine: Engine,
prev_mes: Message,
data: dict[str, str],
mes: Message,
) -> None:
await base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_password = database.get.get_master_pass(engine, mes.from_user.id)
if not encryption.master_pass.check_master_pass(text, master_password):
return await send_tmp_message(
bot,
mes.chat.id,
"Не подходит главный пароль",
)
# name, login, passwd = data["name"], data["login"], data["passwd"]
account = DecryptedAccount(
user_id=mes.from_user.id,
name=data["name"],
login=data["login"],
password=data["passwd"],
)
encrypted_account = encryption.other_accounts.encrypt(
account,
text,
)
result = database.add.add_account(
engine,
encrypted_account,
)
await send_tmp_message(
bot,
mes.chat.id,
"Успех" if result else "Произошла не предвиденная ошибка",
)
del data, account
gc.collect()
async def get_account(bot: AsyncTeleBot, engine: Engine, mes: Message) -> None:
await base_handler(bot, mes)
master_pass = database.get.get_master_pass(engine, mes.from_user.id)
if master_pass is None:
return await send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
accounts = database.get.get_accounts(
engine,
mes.from_user.id,
to_sort=True,
)
markup = markups.account_markup(accounts)
bot_mes = await bot.send_message(
mes.chat.id, "Отправьте название аккаунта", reply_markup=markup
)
register_state(
mes,
functools.partial(_get_account2, bot, engine, bot_mes),
)
async def _get_account2(
bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
await base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if text not in database.get.get_accounts(engine, mes.from_user.id):
return await send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта")
bot_mes = await bot.send_message(mes.chat.id, "Отправьте мастер пароль")
register_state(
mes,
functools.partial(_get_account3, bot, engine, bot_mes, text),
)
async def _get_account3(
bot: AsyncTeleBot,
engine: Engine,
prev_mes: Message,
name: str,
mes: Message,
) -> None:
await base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_password = database.get.get_master_pass(
engine,
mes.from_user.id,
)
if not encryption.master_pass.check_master_pass(text, master_password):
return await send_tmp_message(
bot,
mes.chat.id,
"Не подходит мастер пароль",
)
account = database.get.get_account_info(engine, mes.from_user.id, name)
account = encryption.other_accounts.decrypt(
account,
text,
)
await send_deleteable_message(
bot,
mes.chat.id,
f"Название:\n{escape(account.name)}\n"
f"Логин:\n`{account.login}`\nПароль:\n`{account.password}`\nНажмите "
"на логин или пароль, чтобы скопировать",
)
del text, mes
gc.collect()
async def delete_account(
bot: AsyncTeleBot,
engine: Engine,
mes: Message,
) -> None:
await base_handler(bot, mes)
master_pass = database.get.get_master_pass(engine, mes.from_user.id)
if master_pass is None:
return await send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
accounts = database.get.get_accounts(
engine,
mes.from_user.id,
to_sort=True,
)
markup = markups.account_markup(accounts)
bot_mes = await bot.send_message(
mes.chat.id,
"Отправьте название аккаунта, который вы хотите удалить",
reply_markup=markup,
)
register_state(
mes,
functools.partial(_delete_account2, bot, engine, bot_mes),
)
async def _delete_account2(
bot: AsyncTeleBot,
engine: Engine,
prev_mes: Message,
mes: Message,
):
await base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if text not in database.get.get_accounts(engine, mes.from_user.id):
return await send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта")
bot_mes = await bot.send_message(
mes.from_user.id,
f'Вы уверены, что хотите удалить аккаунт "{text}"?\nОтправьте YES для '
"подтверждения",
)
register_state(
mes,
functools.partial(_delete_account3, bot, engine, bot_mes, text),
)
async def _delete_account3(
bot: AsyncTeleBot,
engine: Engine,
prev_mes: Message,
account_name: str,
mes: Message,
) -> None:
await base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text != "YES":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
database.delete.delete_account(engine, mes.from_user.id, account_name)
await send_tmp_message(bot, mes.chat.id, "Аккаунт удалён")
async def help_command(bot: AsyncTeleBot, mes: Message) -> None:
message = """Команды:
/set_master_pass - установить мастер пароль
/add_account - создать аккаунт
/get_accounts - получить список аккаунтов
/get_account - получить логин и пароль аккаунта
/delete_account - удалить аккаунт
/delete_all - удалить все аккаунты и мастер пароль
/reset_master_pass - удалить все аккаунты и изменить мастер пароль
/cancel - отмена текущего действия
/help - помощь
/export - получить пароли в json формате
/import - импортировать пароли из json в файле в таком же формате, \
как из /export
/gen_password - создать 10 надёжных паролей"""
await bot.send_message(mes.chat.id, message)
async def export(bot: AsyncTeleBot, engine: Engine, mes: Message) -> None:
await base_handler(bot, mes)
master_password_from_db = database.get.get_master_pass(
engine,
mes.from_user.id,
)
if master_password_from_db is None:
return await send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
if not database.get.get_accounts(engine, mes.from_user.id):
return await send_tmp_message(bot, mes.chat.id, "Нет аккаунтов")
bot_mes = await bot.send_message(mes.chat.id, "Отправьте мастер пароль")
register_state(mes, functools.partial(_export2, bot, engine, bot_mes))
async def _export2(
bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
await base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_password = database.get.get_master_pass(
engine,
mes.from_user.id,
)
if not encryption.master_pass.check_master_pass(text, master_password):
return await send_tmp_message(
bot,
mes.chat.id,
"Не подходит мастер пароль",
)
accounts = database.get.get_all_accounts(engine, mes.from_user.id)
accounts = encryption.other_accounts.decrypt_multiple(accounts, text)
json_io = accounts_to_json(accounts)
await bot.send_document(
mes.chat.id,
json_io,
reply_markup=markups.deletion_markup(),
)
del text, accounts, json_io
gc.collect()
async def import_accounts(
bot: AsyncTeleBot,
engine: Engine,
mes: Message,
) -> None:
await base_handler(bot, mes)
master_password_from_db = database.get.get_master_pass(
engine,
mes.from_user.id,
)
if master_password_from_db is None:
return await send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
bot_mes = await bot.send_message(mes.chat.id, "Отправьте json файл")
register_state(mes, functools.partial(_import2, bot, engine, bot_mes))
async def _import2(
bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
await base_handler(bot, mes, prev_mes)
if mes.text is not None:
text = mes.text.strip()
if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if mes.document is None:
return await send_tmp_message(
bot,
mes.chat.id,
"Вы должны отправить документ",
)
if mes.document.file_size > 102_400: # If file size is bigger than 100 MB
return await send_tmp_message(
bot,
mes.chat.id,
"Файл слишком большой",
)
file_info = await bot.get_file(mes.document.file_id)
downloaded_file = await bot.download_file(file_info.file_path)
try:
accounts = json_to_accounts(
downloaded_file.decode("utf-8"),
mes.from_user.id,
)
except Exception:
return await send_tmp_message(
bot,
mes.chat.id,
"Ошибка во время работы с файлом",
)
bot_mes = await bot.send_message(mes.chat.id, "Отправьте мастер пароль")
register_state(
mes,
functools.partial(_import3, bot, engine, bot_mes, accounts),
)
async def _import3(
bot: AsyncTeleBot,
engine: Engine,
prev_mes: Message,
accounts: list[DecryptedAccount],
mes: Message,
) -> None:
await base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_password = database.get.get_master_pass(
engine,
mes.from_user.id,
)
if not encryption.master_pass.check_master_pass(text, master_password):
return await send_tmp_message(
bot,
mes.chat.id,
"Не подходит мастер пароль",
)
# List of names of accounts, which failed to be added to the database
# or failed the tests
failed: list[str] = []
for account in accounts:
if not check_account(account):
failed.append(account.name)
continue
account = encryption.other_accounts.encrypt(
account,
text,
)
result = database.add.add_account(engine, account)
if not result:
failed.append(account.name)
if failed:
mes_text = "Не удалось добавить:\n" + "\n".join(failed)
else:
mes_text = "Успех"
await send_tmp_message(bot, mes.chat.id, mes_text, 10)
del text, mes, accounts
gc.collect()
async def gen_password(bot: AsyncTeleBot, mes: Message) -> None:
await base_handler(bot, mes)
# Generate 10 passwords and put them in the backticks
passwords = (f"`{generate_password.gen_password()}`" for _ in range(10))
text = (
"Пароли:\n"
+ "\n".join(passwords)
+ "\nНажмите на пароль, чтобы его скопировать"
)
await send_deleteable_message(bot, mes.chat.id, text)
async def message_handler(bot: AsyncTeleBot, mes: Message) -> None:
handler = get_state(mes)
if handler is None:
await delete_message(bot, mes)
if mes.text.strip() == "/cancel":
await send_tmp_message(bot, mes.chat.id, "Нет активного действия")
await send_tmp_message(
bot,
mes.chat.id,
"Вы отправили не корректное сообщение",
)
return
await handler(mes)

View File

@ -1,3 +1,3 @@
from . import add, change, delete, get, models, prepare from . import add, delete, get, models, prepare, change
__all__ = ["add", "delete", "get", "models", "prepare", "change"] __all__ = ["add", "delete", "get", "models", "prepare", "change"]

View File

@ -5,9 +5,23 @@ from sqlalchemy.future import Engine
from . import models from . import models
def add_account(engine: Engine, account: models.Account) -> bool: def add_account(
engine: Engine,
user_id: int,
name: str,
salt: bytes,
enc_login: bytes,
enc_password: bytes,
) -> bool:
"""Adds account to the database. Returns true on success, """Adds account to the database. Returns true on success,
false otherwise""" false otherwise"""
account = models.Account(
user_id=user_id,
name=name,
salt=salt,
enc_login=enc_login,
enc_password=enc_password,
)
try: try:
with sqlmodel.Session(engine) as session: with sqlmodel.Session(engine) as session:
session.add(account) session.add(account)
@ -18,9 +32,19 @@ def add_account(engine: Engine, account: models.Account) -> bool:
return True return True
def add_master_pass(engine: Engine, master_pass: models.MasterPass) -> bool: def add_master_pass(
engine: Engine,
user_id: int,
salt: bytes,
password_hash: bytes,
) -> bool:
"""Adds master password the database. Returns true on success, """Adds master password the database. Returns true on success,
false otherwise""" false otherwise"""
master_pass = models.MasterPass(
user_id=user_id,
salt=salt,
password_hash=password_hash,
)
try: try:
with sqlmodel.Session(engine) as session: with sqlmodel.Session(engine) as session:
session.add(master_pass) session.add(master_pass)

View File

@ -5,17 +5,13 @@ from . import models
def change_master_pass( def change_master_pass(
engine: Engine, engine: Engine, user_id: int, salt: bytes, password: bytes
master_password: models.MasterPass,
) -> None: ) -> None:
"""Changes master password and salt in the database""" """Changes master password and salt in the database"""
statement = ( statement = (
sqlmodel.update(models.MasterPass) sqlmodel.update(models.MasterPass)
.where(models.MasterPass.user_id == master_password.user_id) .where(models.MasterPass.user_id == user_id)
.values( .values(salt=salt, password_hash=password)
salt=master_password.salt,
password_hash=master_password.password_hash,
)
) )
with sqlmodel.Session(engine) as session: with sqlmodel.Session(engine) as session:
session.exec(statement) session.exec(statement)

View File

@ -1,3 +1,5 @@
from typing import Iterator
import sqlmodel import sqlmodel
from sqlalchemy.future import Engine from sqlalchemy.future import Engine
@ -7,35 +9,21 @@ from . import models
def get_master_pass( def get_master_pass(
engine: Engine, engine: Engine,
user_id: int, user_id: int,
) -> models.MasterPass | None: ) -> tuple[bytes, bytes] | None:
"""Gets master password of a user""" """Gets master pass. Returns tuple of salt and password
or None if it wasn't found"""
statement = sqlmodel.select(models.MasterPass).where( statement = sqlmodel.select(models.MasterPass).where(
models.MasterPass.user_id == user_id, models.MasterPass.user_id == user_id,
) )
with sqlmodel.Session(engine) as session: with sqlmodel.Session(engine) as session:
result = session.exec(statement).first() result = session.exec(statement).first()
return result if result is None:
return
return (result.salt, result.password_hash)
def get_accounts( def get_accounts(engine: Engine, user_id: int) -> list[str]:
engine: Engine, """Gets list of account names"""
user_id: int,
*,
to_sort: bool = False,
) -> list[str]:
"""Gets a list of account names of a user"""
statement = sqlmodel.select(models.Account.name).where(
models.Account.user_id == user_id,
)
if to_sort:
statement = statement.order_by(models.Account.name)
with sqlmodel.Session(engine) as session:
result = session.exec(statement).fetchall()
return result
def get_all_accounts(engine: Engine, user_id: int) -> list[models.Account]:
"""Returns a list of accounts of a user"""
statement = ( statement = (
sqlmodel.select(models.Account) sqlmodel.select(models.Account)
.where( .where(
@ -44,20 +32,46 @@ def get_all_accounts(engine: Engine, user_id: int) -> list[models.Account]:
.order_by(models.Account.name) .order_by(models.Account.name)
) )
with sqlmodel.Session(engine) as session: with sqlmodel.Session(engine) as session:
result = session.exec(statement).fetchall() result = session.exec(statement)
return result return [account.name for account in result]
def get_all_accounts(
engine: Engine, user_id: int
) -> Iterator[tuple[str, bytes, bytes, bytes]]:
"""Returns an iterator of tuples, where values represent account's
name, salt, encrypted login and encrypted password"""
statement = (
sqlmodel.select(models.Account)
.where(
models.Account.user_id == user_id,
)
.order_by(models.Account.name)
)
with sqlmodel.Session(engine) as session:
result = session.exec(statement)
yield from (
(
account.name,
account.salt,
account.enc_login,
account.enc_password,
)
for account in result
)
def get_account_info( def get_account_info(
engine: Engine, engine: Engine, user_id: int, name: str
user_id: int, ) -> tuple[bytes, bytes, bytes]:
name: str, """Gets account info. Returns tuple of salt, login and password
) -> models.Account: or None if it wasn't found"""
"""Gets account info"""
statement = sqlmodel.select(models.Account).where( statement = sqlmodel.select(models.Account).where(
models.Account.user_id == user_id, models.Account.user_id == user_id,
models.Account.name == name, models.Account.name == name,
) )
with sqlmodel.Session(engine) as session: with sqlmodel.Session(engine) as session:
result = session.exec(statement).first() result = session.exec(statement).first()
return result if result is None:
return
return (result.salt, result.enc_login, result.enc_password)

View File

@ -1,8 +0,0 @@
import pydantic
class DecryptedAccount(pydantic.BaseModel):
user_id: int
name: str
login: str
password: str

View File

@ -3,8 +3,6 @@ import os
from cryptography.exceptions import InvalidKey from cryptography.exceptions import InvalidKey
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
from ..database.models import MasterPass
MEMORY_USAGE = 2**14 MEMORY_USAGE = 2**14
@ -19,23 +17,22 @@ def _get_kdf(salt: bytes) -> Scrypt:
return kdf return kdf
def encrypt_master_pass(user_id: int, password: str) -> MasterPass: def encrypt_master_pass(password: str) -> tuple[bytes, bytes]:
"""Hashes master password and returns MasterPass object""" """Hashes master password and return tuple of hashed password and salt"""
salt = os.urandom(64) salt = os.urandom(64)
kdf = _get_kdf(salt) kdf = _get_kdf(salt)
password_hash = kdf.derive(password.encode("utf-8")) return kdf.derive(password.encode("utf-8")), salt
return MasterPass(
user_id=user_id,
password_hash=password_hash,
salt=salt,
)
def check_master_pass(password: str, master_password: MasterPass) -> bool: def check_master_pass(
password: str,
enc_password: bytes,
salt: bytes,
) -> bool:
"""Checks if the master password is correct""" """Checks if the master password is correct"""
kdf = _get_kdf(master_password.salt) kdf = _get_kdf(salt)
try: try:
kdf.verify(password.encode("utf-8"), master_password.password_hash) kdf.verify(password.encode("utf-8"), enc_password)
except InvalidKey: except InvalidKey:
return False return False
else: else:

View File

@ -1,18 +1,14 @@
import base64 import base64
import os import os
from typing import Iterable, Iterator from typing import Iterator
from cryptography.fernet import Fernet from cryptography.fernet import Fernet
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from ..database.models import Account
from ..decrypted_account import DecryptedAccount
def _generate_key(salt: bytes, master_pass: bytes) -> bytes: def _generate_key(salt: bytes, master_pass: bytes) -> bytes:
"""Generates key for fernet encryption"""
kdf = PBKDF2HMAC( kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(), algorithm=hashes.SHA256(),
length=32, length=32,
@ -25,58 +21,49 @@ def _generate_key(salt: bytes, master_pass: bytes) -> bytes:
def encrypt( def encrypt(
account: DecryptedAccount, login: str,
passwd: str,
master_pass: str, master_pass: str,
) -> Account: ) -> tuple[bytes, bytes, bytes]:
"""Encrypts account using master password and returns Account object""" """Encrypts login and password of a user using their master
password as a key.
Returns a tuple of encrypted login, password and salt"""
salt = os.urandom(64) salt = os.urandom(64)
key = _generate_key(salt, master_pass.encode("utf-8")) key = _generate_key(salt, master_pass.encode("utf-8"))
f = Fernet(key) f = Fernet(key)
enc_login = base64.urlsafe_b64decode(f.encrypt(login.encode("utf-8")))
enc_login = base64.urlsafe_b64decode( enc_password = base64.urlsafe_b64decode(f.encrypt(passwd.encode("utf-8")))
f.encrypt(account.login.encode("utf-8")), return (enc_login, enc_password, salt)
)
enc_password = base64.urlsafe_b64decode(
f.encrypt(account.password.encode("utf-8")),
)
return Account(
user_id=account.user_id,
name=account.name,
salt=salt,
enc_login=enc_login,
enc_password=enc_password,
)
def decrypt( def decrypt(
account: Account, enc_login: bytes,
enc_pass: bytes,
master_pass: str, master_pass: str,
) -> DecryptedAccount: salt: bytes,
"""Decrypts account using master password and returns ) -> tuple[str, str]:
DecryptedAccount object""" """Decrypts login and password using their
key = _generate_key(account.salt, master_pass.encode("utf-8")) master password as a key.
Returns a tuple of decrypted login and password"""
key = _generate_key(salt, master_pass.encode("utf-8"))
f = Fernet(key) f = Fernet(key)
login = f.decrypt(base64.urlsafe_b64encode(enc_login)).decode("utf-8")
login = f.decrypt( password = f.decrypt(base64.urlsafe_b64encode(enc_pass)).decode("utf-8")
base64.urlsafe_b64encode(account.enc_login), return (login, password)
).decode("utf-8")
password = f.decrypt(
base64.urlsafe_b64encode(account.enc_password),
).decode("utf-8")
return DecryptedAccount(
user_id=account.user_id,
name=account.name,
login=login,
password=password,
)
def decrypt_multiple( def decrypt_multiple(
accounts: Iterable[Account], master_pass: str accounts: Iterator[tuple[str, bytes, bytes, bytes]], master_pass: str
) -> Iterator[DecryptedAccount]: ) -> Iterator[tuple[str, str, str]]:
"""Decrypts an iterable of accounts using master_pass and """Gets an iterator of tuples, where values represent account's name, salt,
returns an Iterator of decrypted accounts""" encrypted login and encrypted password.
Return an iterator of names, logins and passwords as a tuple"""
for account in accounts: for account in accounts:
yield decrypt(account, master_pass) name, salt, enc_login, enc_passwd = account
login, passwd = decrypt(
enc_login,
enc_passwd,
master_pass,
salt,
)
yield (name, login, passwd)