Compare commits

...

91 Commits
Beta ... main

Author SHA1 Message Date
972c5577f4 Added validation to the database models 2023-01-10 21:00:03 +03:00
671286dc39 Removed backend because it wasn't used in encryption.accounts 2023-01-10 20:28:34 +03:00
5dbf93013a Added requesting a master password for deleting accounts, deleting all, reseting master password 2023-01-10 20:10:48 +03:00
c051c14f1f Switched to the ChaCha20Poly1305 encryption algorithm for better security 2023-01-10 20:00:58 +03:00
3686195396 Added a message about exceptions for a user 2023-01-06 17:32:44 +03:00
d79b57b1f0 Added return statement after sending a message about that there is no active message 2023-01-05 14:52:45 +03:00
157c2c4aa2 _import3 no longer blocks event loop during decryption
Now running encrytion of accounts in ProcessPoolExecutor
2023-01-05 14:11:29 +03:00
f4a5f51b23 Made more verbose exception handler for the bot for easier debugging 2023-01-05 13:47:31 +03:00
6bc8eb1413 db.add changes
Added _add_model helper function to reduce code duplication
Added add_accounts for future use
2023-01-05 13:19:01 +03:00
9f64305050 Moved sorting back to the get_accounts 2023-01-05 13:03:44 +03:00
4954f39a91 Added indentation into exported json files 2023-01-05 13:02:50 +03:00
3edeb86b6c Disabled autoincrement in master_passwords table 2023-01-05 13:01:28 +03:00
c7675c231f Fixed sending a final message in /import 2023-01-03 21:07:37 +03:00
ae88fccf13 Updated type hint of the handler in the register_state function 2023-01-03 12:34:54 +03:00
e29eefe40b Changes in helper_functions.delete_message
Removed checking if sleep_time is 0
Moved sleeping outside of try block
2023-01-03 12:29:08 +03:00
fdbed91512 Now account name is copyable 2023-01-03 12:12:40 +03:00
f9d163361a Renamed functions in db.get
get_accounts -> get_account_names
get_all_accounts -> get_accounts
2023-01-03 12:08:06 +03:00
9ec66a3521 Renamed src/database into src/db 2023-01-03 11:59:06 +03:00
70e9afe21d Small reformating in bot.message_handler 2023-01-03 11:48:25 +03:00
5d59adb7d2 Renamed encryption/other_accounts into encryption/accounts 2023-01-01 00:21:53 +03:00
281c4a262b _export2 no longer blocks event loop during decryption
Removed sorting in get_all_accounts
Removed decrypt_multiple function because it is no longer used
Now running decrytion of accounts in ProcessPoolExecutor
2023-01-01 00:18:57 +03:00
74844da4ae Added backslash to the forbidden chars 2022-12-30 13:54:03 +03:00
77be64ed4b sorted imports 2022-12-29 18:22:37 +03:00
5bec51beb2 Added keyboard markup to /delete_account 2022-12-29 15:42:27 +03:00
d5d87a8f3b Quality of life changes
Changed default action of /cancel to be handled at message_handler
Added keyboard markup to get_account for easier account selection
2022-12-29 15:38:38 +03:00
d4c50432d7 Added ability to send message, which can be deleted by pressing a button 2022-12-29 15:25:20 +03:00
0026e3321a Made handlers asynchrounos
Switched from TeleBot class to AsyncTeleBot
2022-12-29 13:58:53 +03:00
8858aa09a7 Added conformation before deleting an account. No longer raising an exception if message deletion fails 2022-12-25 21:34:09 +03:00
e165020111 Added blank lines in encryption.other_accounts for better readability 2022-12-25 21:04:45 +03:00
50eb3057d5 Renamed classes.py into decrypted_accounts.py. Removed unnessesary elements from that file 2022-12-25 21:01:18 +03:00
3f744723a9 Major refactor of the code
A lot of function are now using classes instead of parameters or tuples
isort was added to the dev requirements
Comments were adjusted
2022-12-25 20:12:19 +03:00
bbc9650357 Changed database.get file. Added fetchall method calls to close session sooner 2022-12-25 17:59:26 +03:00
f299173e56 Changes to database.get_accounts
Function is only fetching account names from db and not whole accounts now
Made sorting in that function optional
2022-12-25 17:35:37 +03:00
5991041b35 Added timeout to the deletion of the message in _send_tmp_message 2022-12-16 06:25:00 +00:00
c2eca49933 Switched to building an image localy instead of using one from docker hub
Added database folder
Created Dockerfile and config for mariadb image
Moved data folder tp database/data/
2022-12-15 15:13:39 +00:00
a9417058ee Updated tables, changed primary keys
Removed id fields from tables
Set user_id to be primary key of master_passwords
Set user_id and name to be primary keys of accounts
2022-12-10 23:46:00 +03:00
9690db982e Added info about total amount of accounts for get_acccounts command 2022-12-06 14:28:29 +03:00
025ea868a6 Addedd ensure_ascii=False to exporting Accounst 2022-12-06 14:21:36 +03:00
d82d152fef Added ordering by name for accounts 2022-12-06 14:16:03 +03:00
b0599c1484 Added pool_recycle every hour 2022-12-05 20:10:20 +03:00
eab94e1c01 Added flake8 config to dockerignore 2022-11-30 20:26:32 +03:00
b42cbb57a4 Renamed cryptogra[hy folder into encryption to not overshadow cryptography module 2022-11-30 20:24:46 +03:00
138ec55ae5 Added flake8 to requirements-dev.txt, added flake8 config file 2022-11-30 20:06:04 +03:00
6cd8091fde More code cleanup with a few bug fixes 2022-11-30 19:41:57 +03:00
0d3965d5d2 Renamed columns in tables
MasterPass passwd -> password_hash
Account enc_pass -> enc_password
2022-11-30 17:05:04 +03:00
04bb306751 Added _base_handler call to the cancel function in handlers 2022-11-30 16:53:59 +03:00
2a5b594f3f Renamed gen_password.py into generate_password.py, fixed gen_password command 2022-11-30 16:50:42 +03:00
2d2ed017f1 Removed utils.py, added decrypt_multiple function in other_accounts.py 2022-11-30 16:43:02 +03:00
e9eaa085a2 Moved utils into src directory, moved most of the functions from it into separate files 2022-11-30 16:28:37 +03:00
0463388829 Massive code cleanup 2022-11-30 16:05:33 +03:00
944f23a146 Added new lines in requirements 2022-11-26 19:45:29 +03:00
b4c6e17ce2 Decreased size of enc_login and enc_pass to 256 bytes 2022-11-25 21:21:18 +03:00
2ea3096fb0 encoding and decoding result of fernet to store data better 2022-11-25 21:02:21 +03:00
63de9010de Switched .venv to venv in .dockerignore 2022-11-22 08:08:12 +00:00
e49f2e00eb Made pip upgrade before copying requirements.txt in Dockerfile 2022-11-18 16:38:25 +00:00
fd002e3718 Added command to update pip setuptools and install wheel in Dockerfile 2022-11-18 11:34:04 +00:00
d68a7bb6e8 Moved check of generated password into separate function 2022-11-15 15:39:38 +03:00
d5f3708c50 Refactored utils.py
Created constants FORBIDDEN_CHARS, PUNCTUATION, PASSWORD_CHARS
Removed pipe from forbidden chars
Moved Message type alias to the top of the file
Optimized _base_check and made its parameter positional only
Changed gen_password to use said constants
2022-11-15 15:17:36 +03:00
cc13b35282 Made class Account private in utils.py 2022-11-14 16:58:34 +03:00
3802943225 Changed telebot.types.Message to Message in handlers for old functions 2022-11-14 16:58:34 +03:00
309fb2108b Removed unnessesary import from src.__init__ 2022-11-14 16:58:34 +03:00
9d5c52bebe optimized password generation function 2022-11-14 16:58:26 +03:00
a61deca6fa Removed unnecessary comments and type hints in src.__init__ 2022-11-13 19:49:21 +03:00
18abdecb74 Increased time untill deletion for /gen_password to 15 secs 2022-11-13 18:53:11 +03:00
46a1fe59b2 Documented /gen_password command 2022-11-13 18:51:23 +03:00
9ee51dc19f Added /gen_password command 2022-11-13 18:49:57 +03:00
2c07b3bed2 Added function for password generation is utils.py 2022-11-13 18:49:14 +03:00
b8113dc47b Sorted imports in utils.py 2022-11-13 18:16:03 +03:00
21bd01c3ed Changed and added comments in the database files 2022-11-10 18:45:10 +03:00
c2280e8bc2 Changed functions for encryption and decryption of accounts to use str of master_pass instead of bytes 2022-11-09 19:02:58 +03:00
f7f954ecd3 Sorted imports 2022-11-09 18:52:59 +03:00
559ee8f6d8 Renamed _memory_use in cryptography into MEMORY_USAGE 2022-11-07 21:02:13 +03:00
a1bed9014d Changed the way the master password hashing works
Switched from Bcrypt to Scrypt for master password hashing
Changed models to use new sizes for hashes and salts, doubled the size of enc_login and enc_passwd for accounts
Created new function to check master password validity
Increased salt sizes for accounts and master passwords
Removed bcrypt from requirements
2022-11-07 16:30:45 +03:00
66ab13b45d Changed handlers.get_accounts to use a guard clause, added comment to clarify, why we are putting account in backticks 2022-11-07 16:29:37 +03:00
da42d7ad1d Changed comments in cryptograpy.other_accounts to be more precise 2022-11-07 16:29:31 +03:00
570f15001e switched mariadb connector to pymysql 2022-11-04 02:17:17 +03:00
b4bf9fbf41 Changed mariadb.Connection to Engine in __init__ of bot 2022-11-04 02:16:47 +03:00
042ca9312e Cleaned up code in handlers
Renamed variables in _set_master_pass2 for consistency
Added a few missing returns in guard clauses
Added file size limit for importing account to 100 MB
2022-11-04 01:20:25 +03:00
b10b1fe2b3 changed to the newer version of mariadb 2022-11-03 16:27:52 +00:00
03646b6156 Added checks to /import 2022-11-03 10:49:00 +03:00
b56ebd0b61 Added account name and account credential checking 2022-11-02 23:11:29 +03:00
adf9865fbe Added ability to copy account name from /get_accounts
That also fixed an issue of having wrong behaviour, if using any special chars in account name
2022-11-02 23:02:20 +03:00
4122e15308 Added data directory to .dockerignore 2022-11-02 18:29:54 +00:00
37a81ed22d Changed change_master_pass to be more readable 2022-11-02 18:34:02 +03:00
fb850e3737 Added ability to import accounts 2022-11-02 14:48:15 +03:00
dbf27d401e Added ability to export accounts to json 2022-11-02 14:48:15 +03:00
373623b0b4 Removed type_ from sqlmodel.Column, because it isn't needed 2022-11-02 14:42:26 +03:00
afc03a6c1f Removed comment which broke code, when uncommented 2022-11-02 14:42:26 +03:00
4ee7f0a609 Added installation of g++ to Dockerfile 2022-11-01 09:26:13 +00:00
ae2b214904 Fixed an error of getting wrong account
I'm now quite shure how it worked before
2022-10-31 21:09:12 +03:00
01ab461d28 Added message of failure to add account 2022-10-31 20:40:53 +03:00
40 changed files with 1468 additions and 587 deletions

View File

@ -1,5 +1,5 @@
**/__pycache__
**/.venv
**/venv
**/.classpath
**/.dockerignore
**/.env
@ -24,3 +24,5 @@
**/secrets.dev.yaml
**/values.dev.yaml
README.md
database/
.flake8

2
.flake8 Normal file
View File

@ -0,0 +1,2 @@
[flake8]
exclude=.git,__pycache__,venv

2
.gitignore vendored
View File

@ -161,4 +161,4 @@ cython_debug/
#.idea/
# Database data
data/
database/data/

View File

@ -13,13 +13,11 @@ RUN adduser -u 1000 --disabled-password --gecos "" appuser && chown -R appuser /
# Install deps
RUN apt update && apt full-upgrade -y
RUN apt install curl gcc -y
RUN curl -sS https://downloads.mariadb.com/MariaDB/mariadb_repo_setup | bash
RUN apt install libmariadb3 libmariadb-dev -y
# Install pip requirements
RUN pip install -U pip setuptools wheel
COPY requirements.txt .
RUN python -m pip install -r requirements.txt
RUN pip install -r requirements.txt
COPY . /app

View File

@ -21,6 +21,9 @@
- /reset_master_pass- удалить все аккаунты и изменить мастер пароль
- /cancel - отмена текущего действия
- /help - помощь
- /export - получить пароли в json формате
- /import - импортировать пароли из json в файле в таком же формате, как из /export
- /gen_password - создать 10 надёжных паролей
### Настройка

View File

@ -20,7 +20,9 @@ services:
- password_manager
db:
image: jc21/mariadb-aria
build:
context: ./database/
dockerfile: ./Dockerfile
restart: always
environment:
MYSQL_ROOT_PASSWORD: example123!
@ -28,6 +30,6 @@ services:
MYSQL_USER: manager
MYSQL_PASSWORD: passwd123!
volumes:
- ./data:/var/lib/mysql
- ./database/data:/var/lib/mysql
networks:
- password_manager

3
database/Dockerfile Normal file
View File

@ -0,0 +1,3 @@
FROM mariadb
COPY mariadb.cnf /etc/mysql/mariadb.cnf

38
database/mariadb.cnf Normal file
View File

@ -0,0 +1,38 @@
# The MariaDB configuration file
#
# The MariaDB/MySQL tools read configuration files in the following order:
# 0. "/etc/mysql/my.cnf" symlinks to this file, reason why all the rest is read.
# 1. "/etc/mysql/mariadb.cnf" (this file) to set global defaults,
# 2. "/etc/mysql/conf.d/*.cnf" to set global options.
# 3. "/etc/mysql/mariadb.conf.d/*.cnf" to set MariaDB-only options.
# 4. "~/.my.cnf" to set user-specific options.
#
# If the same option is defined multiple times, the last one will apply.
#
# One can use all long options that the program supports.
# Run program with --help to get a list of available options and with
# --print-defaults to see which it would actually understand and use.
#
# If you are new to MariaDB, check out https://mariadb.com/kb/en/basic-mariadb-articles/
#
# This group is read both by the client and the server
# use it for options that affect everything
#
[client-server]
# Port or socket location where to connect
# port = 3306
socket = /run/mysqld/mysqld.sock
# Import all .cnf files from configuration directory
[mariadbd]
skip-host-cache
skip-name-resolve
[mysqld]
skip-innodb
default-storage-engine=Aria
default-tmp-storage-engine=Aria
!includedir /etc/mysql/mariadb.conf.d/
!includedir /etc/mysql/conf.d/

View File

@ -1 +1,3 @@
black
black
flake8
isort

View File

@ -1,6 +1,7 @@
bcrypt
cryptography
mariadb
pymysql
python-dotenv
pyTelegramBotAPI
sqlmodel
sqlmodel
pydantic
aiohttp

View File

@ -1,22 +1,37 @@
import asyncio
import os
from dotenv import load_dotenv
from sqlalchemy.future import Engine
from . import bot, cryptography, database
from . import (
account_checks,
account_parsing,
bot,
db,
decrypted_account,
encryption,
generate_password,
)
__all__ = ["bot", "cryptography", "database"]
engine: Engine
__all__ = [
"account_checks",
"account_parsing",
"bot",
"decrypted_account",
"encryption",
"db",
"generate_password",
]
def main() -> None:
load_dotenv("./.env")
engine = database.prepare.get_engine(
engine = db.prepare.get_engine(
host=os.getenv("DB_HOST"),
user=os.getenv("DB_USER"),
passwd=os.getenv("DB_PASS"),
db=os.getenv("DB_NAME"),
) # type: ignore
database.prepare.prepare(engine)
bot_ = bot.create_bot(os.getenv("TG_TOKEN"), engine) # type: ignore
bot_.infinity_polling()
)
db.prepare.prepare(engine)
bot_ = bot.create_bot(os.getenv("TG_TOKEN"), engine)
asyncio.run(bot_.infinity_polling())

50
src/account_checks.py Normal file
View File

@ -0,0 +1,50 @@
import string
from .decrypted_account import DecryptedAccount
FORBIDDEN_CHARS = frozenset("`\n\\")
PUNCTUATION = frozenset(string.punctuation).difference(FORBIDDEN_CHARS)
def _base_check(val: str, /) -> bool:
"Returns false if finds new lines or backtick (`)"
return not any(i in FORBIDDEN_CHARS for i in val)
def check_account_name(name: str) -> bool:
"Returns true if account name is valid"
return _base_check(name)
def check_login(login: str) -> bool:
"Returns true if login is valid"
return _base_check(login)
def check_password(passwd: str) -> bool:
"Returns true if password is valid"
return _base_check(passwd)
def check_account(account: DecryptedAccount) -> bool:
"""Runs checks for account name, login and password"""
return all(
(
check_account_name(account.name),
check_login(account.login),
check_password(account.password),
)
)
def check_gened_password(passwd: str, /) -> bool:
"""Retuns true if generated password is valid,
false otherwise.
Password is valid if there is at least one lowercase character,
uppercase character and one punctuation character"""
return (
any(c.islower() for c in passwd)
and any(c.isupper() for c in passwd)
and any(c.isdigit() for c in passwd)
and any(c in PUNCTUATION for c in passwd)
)

50
src/account_parsing.py Normal file
View File

@ -0,0 +1,50 @@
import io
from typing import Iterable, Self
import pydantic
from .decrypted_account import DecryptedAccount
class _Account(pydantic.BaseModel):
name: str
login: str
password: str
def to_usual_account(self, user_id: int) -> DecryptedAccount:
return DecryptedAccount(
user_id=user_id,
name=self.name,
login=self.login,
password=self.password,
)
@classmethod
def from_usual_account(cls, account: DecryptedAccount) -> Self:
return cls(
name=account.name,
login=account.login,
password=account.password,
)
class _Accounts(pydantic.BaseModel):
accounts: list[_Account] = pydantic.Field(default_factory=list)
def _accounts_list_to_json(accounts: Iterable[DecryptedAccount]) -> str:
result = _Accounts(
accounts=[_Account.from_usual_account(i) for i in accounts],
).json(ensure_ascii=False, indent=2)
return result
def json_to_accounts(json_: str, user_id: int) -> list[DecryptedAccount]:
accounts = _Accounts.parse_raw(json_)
return [account.to_usual_account(user_id) for account in accounts.accounts]
def accounts_to_json(accounts: Iterable[DecryptedAccount]) -> io.StringIO:
file = io.StringIO(_accounts_list_to_json(accounts))
file.name = "passwords.json"
return file

View File

@ -1,43 +1,77 @@
import functools
import mariadb
import telebot
from sqlalchemy.future import Engine
from telebot.async_telebot import AsyncTeleBot
from . import handlers
from . import callback_handlers, exception_handler, message_handlers
__all__ = ["handlers"]
__all__ = ["callback_handlers", "exception_handler", "message_handlers"]
def create_bot(token: str, engine: mariadb.Connection) -> telebot.TeleBot:
bot = telebot.TeleBot(token)
def create_bot(token: str, engine: Engine) -> AsyncTeleBot:
bot = AsyncTeleBot(token, exception_handler=exception_handler.Handler)
bot.register_message_handler(
functools.partial(handlers.set_master_password, bot, engine),
functools.partial(message_handlers.set_master_password, bot, engine),
commands=["set_master_pass"],
)
bot.register_message_handler(
functools.partial(handlers.get_account, bot, engine), commands=["get_account"]
functools.partial(message_handlers.get_account, bot, engine),
commands=["get_account"],
)
bot.register_message_handler(
functools.partial(handlers.get_accounts, bot, engine), commands=["get_accounts"]
functools.partial(message_handlers.get_accounts, bot, engine),
commands=["get_accounts"],
)
bot.register_message_handler(
functools.partial(handlers.add_account, bot, engine), commands=["add_account"]
functools.partial(message_handlers.add_account, bot, engine),
commands=["add_account"],
)
bot.register_message_handler(
functools.partial(handlers.delete_all, bot, engine), commands=["delete_all"]
functools.partial(message_handlers.delete_all, bot, engine),
commands=["delete_all"],
)
bot.register_message_handler(
functools.partial(handlers.reset_master_pass, bot, engine),
functools.partial(message_handlers.reset_master_pass, bot, engine),
commands=["reset_master_pass"],
)
bot.register_message_handler(
functools.partial(handlers.delete_account, bot, engine),
functools.partial(message_handlers.delete_account, bot, engine),
commands=["delete_account"],
)
bot.register_message_handler(
functools.partial(handlers.help, bot), commands=["help", "start"]
functools.partial(message_handlers.help_command, bot),
commands=["help", "start"],
)
bot.register_message_handler(
functools.partial(handlers.cancel, bot), commands=["cancel"]
functools.partial(message_handlers.export, bot, engine),
commands=["export"],
)
bot.register_message_handler(
functools.partial(message_handlers.import_accounts, bot, engine),
commands=["import"],
)
bot.register_message_handler(
functools.partial(message_handlers.gen_password, bot),
commands=["gen_password"],
)
bot.register_message_handler(
functools.partial(message_handlers.message_handler, bot),
content_types=[
"text",
"audio",
"document",
"photo",
"sticker",
"video",
"video_note",
"voice",
"location",
"contact",
],
)
bot.register_callback_query_handler(
functools.partial(callback_handlers.delete_message, bot),
lambda call: call.data == "DELETE",
)
return bot

View File

@ -0,0 +1,8 @@
from telebot.async_telebot import AsyncTeleBot
from telebot.types import CallbackQuery as Callback
from . import helper_functions
async def delete_message(bot: AsyncTeleBot, call: Callback) -> None:
await helper_functions.delete_message(bot, call.message)

View File

@ -0,0 +1,8 @@
import traceback
from typing import Type
class Handler:
@staticmethod
def handle(exc: Type[BaseException]) -> None:
traceback.print_exception(exc)

View File

@ -1,331 +0,0 @@
import functools
import gc
import time
import telebot
from sqlalchemy.future import Engine
from .. import cryptography, database
Message = telebot.types.Message
def _send_tmp_message(
bot: telebot.TeleBot, chat_id: telebot.types.Message, text: str, timeout: int = 5
) -> None:
bot_mes = bot.send_message(chat_id, text, parse_mode="MarkdownV2")
time.sleep(timeout)
bot.delete_message(chat_id, bot_mes.id)
def get_accounts(
bot: telebot.TeleBot, engine: Engine, mes: telebot.types.Message
) -> None:
accounts = database.get.get_accounts(engine, mes.from_user.id)
bot.delete_message(mes.chat.id, mes.id)
return _send_tmp_message(
bot,
mes.chat.id,
"Ваши аккаунты:\n" + "\n".join(accounts) if accounts else "У вас нет аккаунтов",
timeout=30,
)
def _base(bot: telebot.TeleBot, mes: Message, prev_mes: Message | None = None) -> None:
bot.delete_message(mes.chat.id, mes.id)
if prev_mes is not None:
bot.delete_message(prev_mes.chat.id, prev_mes.id)
def delete_all(
bot: telebot.TeleBot, engine: Engine, mes: telebot.types.Message
) -> None:
_base(bot, mes)
bot_mes = bot.send_message(
mes.chat.id,
"Вы действительно хотите удалить все ваши аккаунты? Это действие нельзя отменить. Отправьте YES для подтверждения",
)
bot.register_next_step_handler(
mes, functools.partial(_delete_all, bot, engine, bot_mes)
)
def _delete_all(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
_base(bot, mes, prev_mes)
text = mes.text.strip()
if text == "YES":
database.delete.purge_accounts(engine, mes.from_user.id)
database.delete.delete_master_pass(engine, mes.from_user.id)
_send_tmp_message(bot, mes.chat.id, "Всё успешно удалено", timeout=10)
else:
_send_tmp_message(bot, mes.chat.id, "Вы отправили не YES, ничего не удалено")
def set_master_password(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
_base(bot, mes, None)
if database.get.get_master_pass(engine, mes.from_user.id) is not None:
return _send_tmp_message(bot, mes.chat.id, "Мастер пароль уже существует")
bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль")
bot.register_next_step_handler(
mes, functools.partial(_set_master_pass2, bot, engine, bot_mes)
)
def _set_master_pass2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
_base(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
hash_, salt = cryptography.master_pass.encrypt_master_pass(text)
database.add.add_master_pass(engine, mes.from_user.id, salt, hash_)
_send_tmp_message(bot, mes.chat.id, "Успех")
del mes, text
gc.collect()
def reset_master_pass(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
_base(bot, mes)
if database.get.get_master_pass(engine, mes.from_user.id) is None:
return _send_tmp_message(bot, mes.chat.id, "Мастер пароль не задан")
bot_mes = bot.send_message(
mes.chat.id,
"Отправьте новый мастер пароль, осторожно, все текущие аккаунты будут удалены навсегда",
)
bot.register_next_step_handler(
mes, functools.partial(_reset_master_pass2, bot, engine, bot_mes)
)
def _reset_master_pass2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
_base(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
hash_, salt = cryptography.master_pass.encrypt_master_pass(text)
database.delete.purge_accounts(engine, mes.from_user.id)
database.change.change_master_pass(engine, mes.from_user.id, salt, hash_)
_send_tmp_message(
bot, mes.chat.id, "Все ваши аккаунты удалены, а мастер пароль изменён"
)
del mes, text
gc.collect()
def add_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
_base(bot, mes)
master_password_from_db = database.get.get_master_pass(engine, mes.from_user.id)
if master_password_from_db is None:
return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
bot_mes = bot.send_message(mes.chat.id, "Отправьте название аккаунта")
bot.register_next_step_handler(
mes, functools.partial(_add_account2, bot, engine, bot_mes)
)
def _add_account2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
_base(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if text in database.get.get_accounts(engine, mes.from_user.id):
return _send_tmp_message(
bot, mes.chat.id, "Аккаунт с таким именем уже существует"
)
bot_mes = bot.send_message(mes.chat.id, "Отправьте логин")
data = {"name": text}
bot.register_next_step_handler(
mes, functools.partial(_add_account3, bot, engine, bot_mes, data)
)
def _add_account3(
bot: telebot.TeleBot,
engine: Engine,
prev_mes: Message,
data: dict[str, str],
mes: Message,
) -> None:
_base(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
data["login"] = text
bot_mes = bot.send_message(mes.chat.id, "Отправьте пароль от аккаунта")
bot.register_next_step_handler(
mes, functools.partial(_add_account4, bot, engine, bot_mes, data)
)
def _add_account4(
bot: telebot.TeleBot,
engine: Engine,
prev_mes: Message,
data: dict[str, str],
mes: Message,
) -> None:
_base(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
data["passwd"] = text
bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль")
bot.register_next_step_handler(
mes, functools.partial(_add_account5, bot, engine, bot_mes, data)
)
def _add_account5(
bot: telebot.TeleBot,
engine: Engine,
prev_mes: Message,
data: dict[str, str],
mes: Message,
) -> None:
_base(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
salt, hash_ = database.get.get_master_pass(engine, mes.from_user.id)
if cryptography.master_pass.encrypt_master_pass(text, salt) != hash_:
return _send_tmp_message(bot, mes.chat.id, "Не подходит главный пароль")
name, login, passwd = data["name"], data["login"], data["passwd"]
enc_login, enc_pass, salt = cryptography.other_accounts.encrypt_account_info(
login, passwd, text.encode("utf-8")
)
database.add.add_account(engine, mes.from_user.id, name, salt, enc_login, enc_pass)
_send_tmp_message(bot, mes.chat.id, "Успех")
del data, name, login, passwd, enc_login
gc.collect()
def get_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
_base(bot, mes)
bot_mes = bot.send_message(mes.chat.id, "Отправьте название аккаунта")
bot.register_next_step_handler(
mes, functools.partial(_get_account2, bot, engine, bot_mes)
)
def _get_account2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
_base(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if text not in database.get.get_accounts(engine, mes.from_user.id):
return _send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта")
bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль")
bot.register_next_step_handler(
mes, functools.partial(_get_account3, bot, engine, bot_mes, text)
)
def _get_account3(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, name: str, mes: Message
) -> None:
_base(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_pass = database.get.get_master_pass(engine, mes.from_user.id)
if master_pass is None:
return _send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
master_salt, hash_pass = master_pass
if cryptography.master_pass.encrypt_master_pass(text, master_salt) != hash_pass:
return _send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль")
salt, enc_login, enc_pass = database.get.get_account_info(
engine, mes.from_user.id, name
)
login, passwd = cryptography.other_accounts.decrypt_account_info(
enc_login, enc_pass, text.encode("utf-8"), salt
)
_send_tmp_message(
bot,
mes.chat.id,
f"Логин:\n`{login}`\nПароль:\n`{passwd}`\nНажмите на логин или пароль, чтобы скопировать",
30,
)
del text, mes, passwd, login
gc.collect()
def delete_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
_base(bot, mes)
bot_mes = bot.send_message(
mes.chat.id, "Отправьте название аккаунта, который вы хотите удалить"
)
bot.register_next_step_handler(
mes, functools.partial(_delete_account2, bot, engine, bot_mes)
)
def _delete_account2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
_base(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return _send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if text not in database.get.get_accounts(engine, mes.from_user.id):
return _send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта")
database.delete.delete_account(engine, mes.from_user.id, text)
_send_tmp_message(bot, mes.chat.id, "Аккаунт удалён")
def help(bot: telebot.TeleBot, mes: telebot.types.Message) -> None:
message = """Команды:
/set_master_pass - установить мастер пароль
/add_account - создать аккаунт
/get_accounts - получить список аккаунтов
/get_account - получить логин и пароль аккаунта
/delete_account - удалить аккаунт
/delete_all - удалить все аккаунты и мастер пароль
/reset_master_pass - удалить все аккаунты и изменить мастер пароль
/cancel - отмена текущего действия
/help - помощь"""
bot.send_message(mes.chat.id, message)
def cancel(bot: telebot.TeleBot, mes: Message) -> None:
_send_tmp_message(bot, mes.chat.id, "Нет активного действия")

View File

@ -0,0 +1,87 @@
import asyncio
from typing import Any, Awaitable, Callable
import telebot
from telebot.async_telebot import AsyncTeleBot
from . import markups
Message = telebot.types.Message
Handler = Callable[[Message], Awaitable[Any]]
Markups = (
telebot.types.ReplyKeyboardMarkup
| telebot.types.InlineKeyboardMarkup
| telebot.types.ReplyKeyboardRemove
)
states: dict[tuple[int, int], Handler] = {}
def register_state(
message: Message,
handler: Handler,
) -> None:
states[(message.chat.id, message.from_user.id)] = handler
def reset_state(message: Message) -> None:
try:
del states[(message.chat.id, message.from_user.id)]
except KeyError:
pass
def get_state(message: Message) -> Handler | None:
return states.get((message.chat.id, message.from_user.id))
async def delete_message(
bot: AsyncTeleBot,
mes: Message,
*,
sleep_time: int = 0,
) -> bool:
await asyncio.sleep(sleep_time)
try:
await bot.delete_message(mes.chat.id, mes.id)
except telebot.apihelper.ApiException:
return False
else:
return True
async def send_tmp_message(
bot: AsyncTeleBot,
chat_id: telebot.types.Message,
text: str,
*,
sleep_time: int = 5,
markup: Markups | None = None,
) -> None:
bot_mes = await bot.send_message(
chat_id, text, parse_mode="MarkdownV2", reply_markup=markup
)
await delete_message(bot, bot_mes, sleep_time=sleep_time)
async def base_handler(
bot: AsyncTeleBot, mes: Message, prev_mes: Message | None = None
) -> None:
reset_state(mes)
await delete_message(bot, mes)
if prev_mes is not None:
await delete_message(bot, prev_mes)
async def send_deleteable_message(
bot: AsyncTeleBot,
chat_id: int,
text: str,
) -> None:
"""Sends a message with a delete button"""
markup = markups.deletion_markup()
await bot.send_message(
chat_id,
text,
parse_mode="MarkdownV2",
reply_markup=markup,
)

18
src/bot/markups.py Normal file
View File

@ -0,0 +1,18 @@
from telebot.types import (
InlineKeyboardButton,
InlineKeyboardMarkup,
ReplyKeyboardMarkup,
)
def deletion_markup() -> InlineKeyboardMarkup:
markup = InlineKeyboardMarkup(row_width=5)
button = InlineKeyboardButton("Удалить сообщение", callback_data="DELETE")
markup.add(button)
return markup
def account_markup(account_names: list[str]) -> ReplyKeyboardMarkup:
markup = ReplyKeyboardMarkup(resize_keyboard=True, row_width=2)
markup.add(*account_names)
return markup

761
src/bot/message_handlers.py Normal file
View File

@ -0,0 +1,761 @@
import asyncio
import functools
import gc
import itertools
from concurrent.futures import ProcessPoolExecutor
import telebot
from sqlalchemy.future import Engine
from telebot.async_telebot import AsyncTeleBot
from .. import db, encryption, generate_password
from ..account_checks import (
check_account,
check_account_name,
check_login,
check_password,
)
from ..account_parsing import accounts_to_json, json_to_accounts
from ..decrypted_account import DecryptedAccount
from . import markups
from .helper_functions import (
base_handler,
delete_message,
get_state,
register_state,
send_deleteable_message,
send_tmp_message,
)
Message = telebot.types.Message
async def get_accounts(
bot: AsyncTeleBot,
engine: Engine,
mes: Message,
) -> None:
await base_handler(bot, mes)
accounts = db.get.get_account_names(
engine,
mes.from_user.id,
to_sort=True,
)
if not accounts:
return await send_tmp_message(bot, mes.chat.id, "У вас нет аккаунтов")
# Make accounts copyable and escape special chars
accounts = [f"`{account}`" for account in accounts]
await send_deleteable_message(
bot,
mes.chat.id,
"Ваши аккаунты:\n"
+ "\n".join(accounts)
+ "\nНажмите на название, чтобы скопировать\n"
f"Всего {len(accounts)}",
)
async def delete_all(bot: AsyncTeleBot, engine: Engine, mes: Message) -> None:
await base_handler(bot, mes)
master_pass = db.get.get_master_pass(engine, mes.from_user.id)
if master_pass is None:
await send_tmp_message(bot, mes.chat.id, "У вас нет мастер пароля")
return
bot_mes = await bot.send_message(
mes.chat.id,
"Вы действительно хотите удалить все ваши аккаунты? Это действие "
"нельзя отменить. "
"Отправьте мастер пароль для подтверждения",
)
register_state(
mes, functools.partial(_delete_all2, bot, engine, master_pass, bot_mes)
)
async def _delete_all2(
bot: AsyncTeleBot,
engine: Engine,
master_pass: db.models.MasterPass,
prev_mes: Message,
mes: Message,
) -> None:
await base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if encryption.master_pass.check_master_pass(text, master_pass):
db.delete.purge_accounts(engine, mes.from_user.id)
db.delete.delete_master_pass(engine, mes.from_user.id)
await send_tmp_message(
bot,
mes.chat.id,
"Всё успешно удалено",
sleep_time=10,
)
else:
await send_tmp_message(
bot,
mes.chat.id,
"Вы отправили не верный мастер пароль, ничего не удалено",
)
async def set_master_password(
bot: AsyncTeleBot,
engine: Engine,
mes: Message,
) -> None:
await base_handler(bot, mes, None)
if db.get.get_master_pass(engine, mes.from_user.id) is not None:
return await send_tmp_message(
bot,
mes.chat.id,
"Мастер пароль уже существует",
)
bot_mes = await bot.send_message(mes.chat.id, "Отправьте мастер пароль")
register_state(
mes,
functools.partial(_set_master_pass2, bot, engine, bot_mes),
)
async def _set_master_pass2(
bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
await base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_password = encryption.master_pass.encrypt_master_pass(
mes.from_user.id,
text,
)
db.add.add_master_pass(engine, master_password)
await send_tmp_message(bot, mes.chat.id, "Успех")
del mes, text
gc.collect()
async def reset_master_pass(
bot: AsyncTeleBot,
engine: Engine,
mes: Message,
) -> None:
await base_handler(bot, mes)
master_pass = db.get.get_master_pass(engine, mes.from_user.id)
if master_pass is None:
return await send_tmp_message(
bot,
mes.chat.id,
"Мастер пароль не задан",
)
bot_mes = await bot.send_message(
mes.chat.id,
"Отправьте текущий мастер пароль",
)
register_state(
mes,
functools.partial(
_reset_master_pass2,
bot,
engine,
master_pass,
bot_mes,
),
)
async def _reset_master_pass2(
bot: AsyncTeleBot,
engine: Engine,
master_pass: db.models.MasterPass,
prev_mes: Message,
mes: Message,
) -> None:
await base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if not encryption.master_pass.check_master_pass(text, master_pass):
await send_tmp_message(bot, mes.chat.id, "Неверный мастер пароль")
return
bot_mes = await bot.send_message(
mes.chat.id,
"Отправьте новый мастер пароль. Осторожно, все аккаунты будут удалены",
)
register_state(
mes,
functools.partial(_reset_master_pass3, bot, engine, bot_mes),
)
async def _reset_master_pass3(
bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
await base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_password = encryption.master_pass.encrypt_master_pass(
mes.from_user.id,
text,
)
db.delete.purge_accounts(engine, mes.from_user.id)
db.change.change_master_pass(engine, master_password)
await send_tmp_message(
bot, mes.chat.id, "Все ваши аккаунты удалены, а мастер пароль изменён"
)
del mes, text
gc.collect()
async def add_account(bot: AsyncTeleBot, engine: Engine, mes: Message) -> None:
await base_handler(bot, mes)
master_password_from_db = db.get.get_master_pass(
engine,
mes.from_user.id,
)
if master_password_from_db is None:
return await send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
bot_mes = await bot.send_message(
mes.chat.id,
"Отправьте название аккаунта",
)
register_state(
mes,
functools.partial(_add_account2, bot, engine, bot_mes),
)
async def _add_account2(
bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
await base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if not check_account_name(text):
return await send_tmp_message(
bot,
mes.chat.id,
"Не корректное название аккаунта",
)
if text in db.get.get_account_names(engine, mes.from_user.id):
return await send_tmp_message(
bot, mes.chat.id, "Аккаунт с таким именем уже существует"
)
bot_mes = await bot.send_message(mes.chat.id, "Отправьте логин")
data = {"name": text}
register_state(
mes,
functools.partial(_add_account3, bot, engine, bot_mes, data),
)
async def _add_account3(
bot: AsyncTeleBot,
engine: Engine,
prev_mes: Message,
data: dict[str, str],
mes: Message,
) -> None:
await base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if not check_login(text):
return await send_tmp_message(bot, mes.chat.id, "Не корректный логин")
data["login"] = text
bot_mes = await bot.send_message(
mes.chat.id,
"Отправьте пароль от аккаунта",
)
register_state(
mes,
functools.partial(_add_account4, bot, engine, bot_mes, data),
)
async def _add_account4(
bot: AsyncTeleBot,
engine: Engine,
prev_mes: Message,
data: dict[str, str],
mes: Message,
) -> None:
await base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if not check_password(text):
return await send_tmp_message(
bot,
mes.chat.id,
"Не корректный пароль",
)
data["passwd"] = text
bot_mes = await bot.send_message(mes.chat.id, "Отправьте мастер пароль")
register_state(
mes,
functools.partial(_add_account5, bot, engine, bot_mes, data),
)
async def _add_account5(
bot: AsyncTeleBot,
engine: Engine,
prev_mes: Message,
data: dict[str, str],
mes: Message,
) -> None:
await base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_password = db.get.get_master_pass(engine, mes.from_user.id)
if not encryption.master_pass.check_master_pass(text, master_password):
return await send_tmp_message(
bot,
mes.chat.id,
"Не подходит главный пароль",
)
# name, login, passwd = data["name"], data["login"], data["passwd"]
account = DecryptedAccount(
user_id=mes.from_user.id,
name=data["name"],
login=data["login"],
password=data["passwd"],
)
encrypted_account = encryption.accounts.encrypt(account, text)
result = db.add.add_account(
engine,
encrypted_account,
)
await send_tmp_message(
bot,
mes.chat.id,
"Успех" if result else "Произошла не предвиденная ошибка",
)
del data, account
gc.collect()
async def get_account(bot: AsyncTeleBot, engine: Engine, mes: Message) -> None:
await base_handler(bot, mes)
master_pass = db.get.get_master_pass(engine, mes.from_user.id)
if master_pass is None:
return await send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
accounts = db.get.get_account_names(
engine,
mes.from_user.id,
to_sort=True,
)
markup = markups.account_markup(accounts)
bot_mes = await bot.send_message(
mes.chat.id, "Отправьте название аккаунта", reply_markup=markup
)
register_state(
mes,
functools.partial(_get_account2, bot, engine, bot_mes),
)
async def _get_account2(
bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
await base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if text not in db.get.get_account_names(engine, mes.from_user.id):
return await send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта")
bot_mes = await bot.send_message(mes.chat.id, "Отправьте мастер пароль")
register_state(
mes,
functools.partial(_get_account3, bot, engine, bot_mes, text),
)
async def _get_account3(
bot: AsyncTeleBot,
engine: Engine,
prev_mes: Message,
name: str,
mes: Message,
) -> None:
await base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_password = db.get.get_master_pass(
engine,
mes.from_user.id,
)
if not encryption.master_pass.check_master_pass(text, master_password):
return await send_tmp_message(
bot,
mes.chat.id,
"Не подходит мастер пароль",
)
account = db.get.get_account_info(engine, mes.from_user.id, name)
account = encryption.accounts.decrypt(account, text)
await send_deleteable_message(
bot,
mes.chat.id,
f"Название:\n`{account.name}`\n"
f"Логин:\n`{account.login}`\nПароль:\n`{account.password}`\nНажмите "
"на название, логин или пароль, чтобы скопировать",
)
del text, mes
gc.collect()
async def delete_account(
bot: AsyncTeleBot,
engine: Engine,
mes: Message,
) -> None:
await base_handler(bot, mes)
master_pass = db.get.get_master_pass(engine, mes.from_user.id)
if master_pass is None:
return await send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
accounts = db.get.get_account_names(
engine,
mes.from_user.id,
to_sort=True,
)
markup = markups.account_markup(accounts)
bot_mes = await bot.send_message(
mes.chat.id,
"Отправьте название аккаунта, который вы хотите удалить",
reply_markup=markup,
)
register_state(
mes,
functools.partial(_delete_account2, bot, engine, master_pass, bot_mes),
)
async def _delete_account2(
bot: AsyncTeleBot,
engine: Engine,
master_pass: db.models.MasterPass,
prev_mes: Message,
mes: Message,
):
await base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if text not in db.get.get_account_names(engine, mes.from_user.id):
return await send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта")
bot_mes = await bot.send_message(
mes.from_user.id,
f'Вы уверены, что хотите удалить аккаунт "{text}"?\nОтправьте мастер '
"пароль для подтверждения",
)
register_state(
mes,
functools.partial(
_delete_account3,
bot,
engine,
master_pass,
bot_mes,
text,
),
)
async def _delete_account3(
bot: AsyncTeleBot,
engine: Engine,
master_pass: db.models.MasterPass,
prev_mes: Message,
account_name: str,
mes: Message,
) -> None:
await base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if not encryption.master_pass.check_master_pass(text, master_pass):
await send_tmp_message(bot, mes.chat.id, "Неверный пароль")
return
db.delete.delete_account(engine, mes.from_user.id, account_name)
await send_tmp_message(bot, mes.chat.id, "Аккаунт удалён")
async def help_command(bot: AsyncTeleBot, mes: Message) -> None:
message = """Команды:
/set_master_pass - установить мастер пароль
/add_account - создать аккаунт
/get_accounts - получить список аккаунтов
/get_account - получить логин и пароль аккаунта
/delete_account - удалить аккаунт
/delete_all - удалить все аккаунты и мастер пароль
/reset_master_pass - удалить все аккаунты и изменить мастер пароль
/cancel - отмена текущего действия
/help - помощь
/export - получить пароли в json формате
/import - импортировать пароли из json в файле в таком же формате, \
как из /export
/gen_password - создать 10 надёжных паролей"""
await bot.send_message(mes.chat.id, message)
async def export(bot: AsyncTeleBot, engine: Engine, mes: Message) -> None:
await base_handler(bot, mes)
master_password_from_db = db.get.get_master_pass(
engine,
mes.from_user.id,
)
if master_password_from_db is None:
return await send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
if not db.get.get_account_names(engine, mes.from_user.id):
return await send_tmp_message(bot, mes.chat.id, "Нет аккаунтов")
bot_mes = await bot.send_message(mes.chat.id, "Отправьте мастер пароль")
register_state(mes, functools.partial(_export2, bot, engine, bot_mes))
async def _export2(
bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
await base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_password = db.get.get_master_pass(
engine,
mes.from_user.id,
)
if not encryption.master_pass.check_master_pass(text, master_password):
return await send_tmp_message(
bot,
mes.chat.id,
"Не подходит мастер пароль",
)
accounts = db.get.get_accounts(engine, mes.from_user.id)
with ProcessPoolExecutor() as pool:
loop = asyncio.get_running_loop()
tasks = []
for account in accounts:
function = functools.partial(
encryption.accounts.decrypt,
account,
text,
)
tasks.append(loop.run_in_executor(pool, function))
accounts = await asyncio.gather(*tasks)
json_io = accounts_to_json(accounts)
await bot.send_document(
mes.chat.id,
json_io,
reply_markup=markups.deletion_markup(),
)
del text, accounts, json_io
gc.collect()
async def import_accounts(
bot: AsyncTeleBot,
engine: Engine,
mes: Message,
) -> None:
await base_handler(bot, mes)
master_password_from_db = db.get.get_master_pass(
engine,
mes.from_user.id,
)
if master_password_from_db is None:
return await send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
bot_mes = await bot.send_message(mes.chat.id, "Отправьте json файл")
register_state(mes, functools.partial(_import2, bot, engine, bot_mes))
async def _import2(
bot: AsyncTeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
await base_handler(bot, mes, prev_mes)
if mes.text is not None:
text = mes.text.strip()
if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if mes.document is None:
return await send_tmp_message(
bot,
mes.chat.id,
"Вы должны отправить документ",
)
if mes.document.file_size > 102_400: # If file size is bigger than 100 MB
return await send_tmp_message(
bot,
mes.chat.id,
"Файл слишком большой",
)
file_info = await bot.get_file(mes.document.file_id)
downloaded_file = await bot.download_file(file_info.file_path)
try:
accounts = json_to_accounts(
downloaded_file.decode("utf-8"),
mes.from_user.id,
)
except Exception:
return await send_tmp_message(
bot,
mes.chat.id,
"Ошибка во время работы с файлом",
)
bot_mes = await bot.send_message(mes.chat.id, "Отправьте мастер пароль")
register_state(
mes,
functools.partial(_import3, bot, engine, bot_mes, accounts),
)
async def _import3(
bot: AsyncTeleBot,
engine: Engine,
prev_mes: Message,
accounts: list[DecryptedAccount],
mes: Message,
) -> None:
await base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return await send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_password = db.get.get_master_pass(
engine,
mes.from_user.id,
)
if not encryption.master_pass.check_master_pass(text, master_password):
return await send_tmp_message(
bot,
mes.chat.id,
"Не подходит мастер пароль",
)
# List of names of accounts, which failed to be added to the database
# or failed the tests
failed: list[str] = []
tasks: list[asyncio.Future[db.models.Account]] = []
loop = asyncio.get_running_loop()
with ProcessPoolExecutor() as pool:
for account in accounts:
if not check_account(account):
failed.append(account.name)
continue
function = functools.partial(
encryption.accounts.encrypt,
account,
text,
)
tasks.append(loop.run_in_executor(pool, function))
enc_accounts: list[db.models.Account] = await asyncio.gather(*tasks)
results = db.add.add_accounts(engine, enc_accounts)
failed_accounts = itertools.compress(
enc_accounts, (not result for result in results)
)
failed.extend((account.name for account in failed_accounts))
if failed:
await send_deleteable_message(
bot, mes.chat.id, "Не удалось добавить:\n" + "\n".join(failed)
)
else:
await send_tmp_message(bot, mes.chat.id, "Успех")
del text, mes, accounts, function, tasks, failed_accounts
gc.collect()
async def gen_password(bot: AsyncTeleBot, mes: Message) -> None:
await base_handler(bot, mes)
# Generate 10 passwords and put them in the backticks
passwords = (f"`{generate_password.gen_password()}`" for _ in range(10))
text = (
"Пароли:\n"
+ "\n".join(passwords)
+ "\nНажмите на пароль, чтобы его скопировать"
)
await send_deleteable_message(bot, mes.chat.id, text)
async def message_handler(bot: AsyncTeleBot, mes: Message) -> None:
handler = get_state(mes)
if handler is None:
await delete_message(bot, mes)
if mes.text.strip() == "/cancel":
await send_tmp_message(bot, mes.chat.id, "Нет активного действия")
return
await send_tmp_message(
bot,
mes.chat.id,
"Вы отправили не корректное сообщение",
)
return
try:
await handler(mes)
except Exception:
await send_tmp_message(
bot,
mes.chat.id,
"Произошла непредвиденная ошибка",
)
raise

View File

@ -1,3 +0,0 @@
from . import master_pass, other_accounts
__all__ = ["master_pass", "other_accounts"]

View File

@ -1,26 +0,0 @@
from typing import overload
import bcrypt
@overload
def encrypt_master_pass(passwd: str, salt: bytes) -> bytes:
...
@overload
def encrypt_master_pass(passwd: str) -> tuple[bytes, bytes]:
...
def encrypt_master_pass(
passwd: str, salt: bytes | None = None
) -> tuple[bytes, bytes] | bytes:
"""Hashes master password and return tuple of hashed password and salt"""
if salt is None:
salt = bcrypt.gensalt()
gened_salt = True
else:
gened_salt = False
hashed = bcrypt.hashpw(passwd.encode("utf-8"), salt)
return (hashed, salt) if gened_salt else hashed

View File

@ -1,43 +0,0 @@
import base64
import bcrypt
from cryptography.fernet import Fernet
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
def _generate_key(salt: bytes, master_pass: bytes) -> bytes:
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000,
backend=default_backend(),
)
key = base64.urlsafe_b64encode(kdf.derive(master_pass))
return key
def encrypt_account_info(
login: str, passwd: str, master_pass: bytes
) -> tuple[bytes, bytes, bytes]:
"""Encrypts login and password of a user using hash of their master password as a key.
Returns a tuple of encrypted login password and salt"""
salt = bcrypt.gensalt()
key = _generate_key(salt, master_pass)
f = Fernet(key)
enc_login = f.encrypt(login.encode("utf-8"))
enc_passwd = f.encrypt(passwd.encode("utf-8"))
return (enc_login, enc_passwd, salt)
def decrypt_account_info(
enc_login: bytes, enc_pass: bytes, master_pass: bytes, salt: bytes
) -> tuple[str, str]:
key = _generate_key(salt, master_pass)
f = Fernet(key)
login_bytes = f.decrypt(enc_login)
pass_bytes = f.decrypt(enc_pass)
return (login_bytes.decode("utf-8"), pass_bytes.decode("utf-8"))

View File

@ -1,40 +0,0 @@
import sqlmodel
from sqlalchemy.exc import IntegrityError
from sqlalchemy.future import Engine
from . import models
def add_account(
engine: Engine,
user_id: int,
name: str,
salt: bytes,
enc_login: bytes,
enc_pass: bytes,
) -> bool:
"""Adds account to db. Returns true, if on success"""
account = models.Account(
user_id=user_id, name=name, salt=salt, enc_login=enc_login, enc_pass=enc_pass
)
try:
with sqlmodel.Session(engine) as session:
session.add(account)
session.commit()
except IntegrityError:
return False
else:
return True
def add_master_pass(engine: Engine, user_id: int, salt: bytes, passwd: bytes) -> bool:
"""Adds master password to db. Returns true, if on success"""
master_pass = models.MasterPass(user_id=user_id, salt=salt, passwd=passwd)
try:
with sqlmodel.Session(engine) as session:
session.add(master_pass)
session.commit()
except IntegrityError:
return False
else:
return True

View File

@ -1,17 +0,0 @@
import sqlmodel
from sqlalchemy.future import Engine
from . import models
def change_master_pass(
engine: Engine, user_id: int, salt: bytes, passwd: bytes
) -> None:
statement = sqlmodel.update(
models.MasterPass,
models.MasterPass.user_id == user_id,
{"salt": salt, "passwd": passwd},
)
with sqlmodel.Session(engine) as session:
session.exec(statement)
session.commit()

View File

@ -1,38 +0,0 @@
import sqlmodel
from sqlalchemy.future import Engine
from . import models
def get_master_pass(engine: Engine, user_id: int) -> tuple[bytes, bytes] | None:
"""Gets master pass. Returns tuple of salt and password"""
statement = sqlmodel.select(models.MasterPass).where(
models.MasterPass.user_id == user_id
)
with sqlmodel.Session(engine) as session:
result = session.exec(statement).first()
if result is None:
return
return (result.salt, result.passwd)
def get_accounts(engine: Engine, user_id: int) -> list[str]:
"""Gets list of account names"""
statement = sqlmodel.select(models.Account).where(models.Account.user_id == user_id)
with sqlmodel.Session(engine) as session:
result = session.exec(statement)
return [account.name for account in result]
def get_account_info(
engine: Engine, user_id: int, name: str
) -> tuple[bytes, bytes, bytes]:
"""Gets account info. Returns tuple of salt, login and password"""
statement = sqlmodel.select(models.Account).where(
models.Account.user_id == user_id and models.Account.name == name
)
with sqlmodel.Session(engine) as session:
result = session.exec(statement).first()
if result is None:
return
return (result.salt, result.enc_login, result.enc_pass)

View File

@ -1,32 +0,0 @@
from typing import Optional
import sqlmodel
class MasterPass(sqlmodel.SQLModel, table=True):
__tablename__ = "master_passwords"
id: Optional[int] = sqlmodel.Field(primary_key=True)
user_id: int = sqlmodel.Field(nullable=False, index=True, unique=True)
salt: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(type_=sqlmodel.VARBINARY(255), nullable=False)
)
passwd: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(type_=sqlmodel.VARBINARY(255), nullable=False)
)
class Account(sqlmodel.SQLModel, table=True):
__tablename__ = "accounts"
__table_args__ = (sqlmodel.UniqueConstraint("user_id", "name"),)
id: Optional[int] = sqlmodel.Field(primary_key=True)
user_id: int = sqlmodel.Field(nullable=False, index=True)
name: str = sqlmodel.Field(nullable=False, index=True, max_length=255)
salt: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(type_=sqlmodel.VARBINARY(255), nullable=False)
)
enc_login: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(type_=sqlmodel.VARBINARY(255), nullable=False)
)
enc_pass: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(type_=sqlmodel.VARBINARY(255), nullable=False)
)

View File

@ -1,18 +0,0 @@
import sqlmodel
from sqlalchemy.future import Engine
from . import models
def get_engine(host: str, user: str, passwd: str, db: str) -> Engine:
engine = sqlmodel.create_engine(
f"mariadb+mariadbconnector://{user}:{passwd}@{host}/{db}"
)
return engine
def prepare(engine: Engine) -> None:
sqlmodel.SQLModel.metadata.create_all(
engine,
# [models.Account, models.MasterPass]
)

View File

@ -1,3 +1,3 @@
from . import add, delete, get, models, prepare, change
from . import add, change, delete, get, models, prepare
__all__ = ["add", "delete", "get", "models", "prepare", "change"]

46
src/db/add.py Normal file
View File

@ -0,0 +1,46 @@
import sqlmodel
from sqlalchemy.exc import IntegrityError
from sqlalchemy.future import Engine
from . import models
def _add_model(
session: sqlmodel.Session, model: models.Account | models.MasterPass
) -> bool:
"""Adds model to the session. Returns true on success,
false otherwise"""
try:
session.add(model)
except IntegrityError:
return False
else:
return True
def add_account(engine: Engine, account: models.Account) -> bool:
"""Adds account to the database. Returns true on success,
false otherwise"""
with sqlmodel.Session(engine) as session:
result = _add_model(session, account)
session.commit()
return result
def add_master_pass(engine: Engine, master_pass: models.MasterPass) -> bool:
"""Adds master password the database. Returns true on success,
false otherwise"""
with sqlmodel.Session(engine) as session:
result = _add_model(session, master_pass)
session.commit()
return result
def add_accounts(
engine: Engine,
accounts: list[models.Account],
) -> list[bool]:
with sqlmodel.Session(engine) as session:
result = [_add_model(session, account) for account in accounts]
session.commit()
return result

22
src/db/change.py Normal file
View File

@ -0,0 +1,22 @@
import sqlmodel
from sqlalchemy.future import Engine
from . import models
def change_master_pass(
engine: Engine,
master_password: models.MasterPass,
) -> None:
"""Changes master password and salt in the database"""
statement = (
sqlmodel.update(models.MasterPass)
.where(models.MasterPass.user_id == master_password.user_id)
.values(
salt=master_password.salt,
password_hash=master_password.password_hash,
)
)
with sqlmodel.Session(engine) as session:
session.exec(statement)
session.commit()

View File

@ -5,13 +5,17 @@ from . import models
def purge_accounts(engine: Engine, user_id: int) -> None:
statement = sqlmodel.delete(models.Account).where(models.Account.user_id == user_id)
"""Deletes all user's accounts"""
statement = sqlmodel.delete(models.Account).where(
models.Account.user_id == user_id,
)
with sqlmodel.Session(engine) as session:
session.exec(statement)
session.commit()
def delete_master_pass(engine: Engine, user_id: int) -> None:
"""Delets master password of the user"""
statement = sqlmodel.delete(models.MasterPass).where(
models.MasterPass.user_id == user_id
)
@ -21,8 +25,10 @@ def delete_master_pass(engine: Engine, user_id: int) -> None:
def delete_account(engine: Engine, user_id: int, name: str) -> None:
"""Deletes specific user account"""
statement = sqlmodel.delete(models.Account).where(
models.Account.user_id == user_id, models.Account.name == name
models.Account.user_id == user_id,
models.Account.name == name,
)
with sqlmodel.Session(engine) as session:
session.exec(statement)

63
src/db/get.py Normal file
View File

@ -0,0 +1,63 @@
import sqlmodel
from sqlalchemy.future import Engine
from . import models
def get_master_pass(
engine: Engine,
user_id: int,
) -> models.MasterPass | None:
"""Gets master password of a user"""
statement = sqlmodel.select(models.MasterPass).where(
models.MasterPass.user_id == user_id,
)
with sqlmodel.Session(engine) as session:
result = session.exec(statement).first()
return result
def get_account_names(
engine: Engine,
user_id: int,
*,
to_sort: bool = False,
) -> list[str]:
"""Gets a list of account names of a user"""
statement = sqlmodel.select(models.Account.name).where(
models.Account.user_id == user_id,
)
if to_sort:
statement = statement.order_by(models.Account.name)
with sqlmodel.Session(engine) as session:
result = session.exec(statement).fetchall()
return result
def get_accounts(engine: Engine, user_id: int) -> list[models.Account]:
"""Returns a list of accounts of a user"""
statement = (
sqlmodel.select(models.Account)
.where(
models.Account.user_id == user_id,
)
.order_by(models.Account.name)
)
with sqlmodel.Session(engine) as session:
result = session.exec(statement).fetchall()
return result
def get_account_info(
engine: Engine,
user_id: int,
name: str,
) -> models.Account:
"""Gets account info"""
statement = sqlmodel.select(models.Account).where(
models.Account.user_id == user_id,
models.Account.name == name,
)
with sqlmodel.Session(engine) as session:
result = session.exec(statement).first()
return result

42
src/db/models.py Normal file
View File

@ -0,0 +1,42 @@
import sqlmodel
class MasterPass(sqlmodel.SQLModel, table=True):
__tablename__ = "master_passwords"
user_id: int = sqlmodel.Field(
sa_column=sqlmodel.Column(
sqlmodel.INT(),
primary_key=True,
autoincrement=False,
)
)
salt: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(sqlmodel.BINARY(64), nullable=False),
max_length=64,
min_length=64,
)
password_hash: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(sqlmodel.BINARY(128), nullable=False),
max_length=128,
min_length=128,
)
class Account(sqlmodel.SQLModel, table=True):
__tablename__ = "accounts"
__table_args__ = (sqlmodel.PrimaryKeyConstraint("user_id", "name"),)
user_id: int = sqlmodel.Field()
name: str = sqlmodel.Field(max_length=256)
salt: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(sqlmodel.BINARY(64), nullable=False),
max_length=64,
min_length=64,
)
enc_login: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(sqlmodel.VARBINARY(256), nullable=False),
max_length=256,
)
enc_password: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(sqlmodel.VARBINARY(256), nullable=False),
max_length=256,
)

18
src/db/prepare.py Normal file
View File

@ -0,0 +1,18 @@
import sqlmodel
from sqlalchemy.future import Engine
from . import models # noqa
HOUR_IN_SECONDS = 3600
def get_engine(host: str, user: str, passwd: str, db: str) -> Engine:
"""Creates an engine for mariadb with pymysql as connector"""
uri = f"mariadb+pymysql://{user}:{passwd}@{host}/{db}"
engine = sqlmodel.create_engine(uri, pool_recycle=HOUR_IN_SECONDS)
return engine
def prepare(engine: Engine) -> None:
"""Creates all tables, indexes and constrains in the database"""
sqlmodel.SQLModel.metadata.create_all(engine)

8
src/decrypted_account.py Normal file
View File

@ -0,0 +1,8 @@
import pydantic
class DecryptedAccount(pydantic.BaseModel):
user_id: int
name: str
login: str
password: str

View File

@ -0,0 +1,3 @@
from . import accounts, master_pass
__all__ = ["master_pass", "accounts"]

View File

@ -0,0 +1,79 @@
import os
from typing import Self
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from ..db.models import Account
from ..decrypted_account import DecryptedAccount
class Cipher:
def __init__(self, key: bytes) -> None:
self._chacha = ChaCha20Poly1305(key)
@classmethod
def generate_cipher(cls, salt: bytes, password: bytes) -> Self:
"""Generates cipher which uses key derived from a given password"""
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000,
)
return cls(kdf.derive(password))
def encrypt(self, data: bytes) -> bytes:
nonce = os.urandom(12)
return nonce + self._chacha.encrypt(
nonce,
data,
associated_data=None,
)
def decrypt(self, data: bytes) -> bytes:
return self._chacha.decrypt(
nonce=data[:12],
data=data[12:],
associated_data=None,
)
def encrypt(
account: DecryptedAccount,
master_pass: str,
) -> Account:
"""Encrypts account using master password and returns Account object"""
salt = os.urandom(64)
cipher = Cipher.generate_cipher(salt, master_pass.encode("utf-8"))
enc_login = cipher.encrypt(account.login.encode("utf-8"))
enc_password = cipher.encrypt(account.password.encode("utf-8"))
return Account(
user_id=account.user_id,
name=account.name,
salt=salt,
enc_login=enc_login,
enc_password=enc_password,
)
def decrypt(
account: Account,
master_pass: str,
) -> DecryptedAccount:
"""Decrypts account using master password and returns
DecryptedAccount object"""
cipher = Cipher.generate_cipher(account.salt, master_pass.encode("utf-8"))
login = cipher.decrypt(account.enc_login).decode("utf-8")
password = cipher.decrypt(account.enc_password).decode("utf-8")
return DecryptedAccount(
user_id=account.user_id,
name=account.name,
login=login,
password=password,
)

View File

@ -0,0 +1,42 @@
import os
from cryptography.exceptions import InvalidKey
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
from ..db.models import MasterPass
MEMORY_USAGE = 2**14
def _get_kdf(salt: bytes) -> Scrypt:
kdf = Scrypt(
salt=salt,
length=128,
n=MEMORY_USAGE,
r=8,
p=1,
)
return kdf
def encrypt_master_pass(user_id: int, password: str) -> MasterPass:
"""Hashes master password and returns MasterPass object"""
salt = os.urandom(64)
kdf = _get_kdf(salt)
password_hash = kdf.derive(password.encode("utf-8"))
return MasterPass(
user_id=user_id,
password_hash=password_hash,
salt=salt,
)
def check_master_pass(password: str, master_password: MasterPass) -> bool:
"""Checks if the master password is correct"""
kdf = _get_kdf(master_password.salt)
try:
kdf.verify(password.encode("utf-8"), master_password.password_hash)
except InvalidKey:
return False
else:
return True

18
src/generate_password.py Normal file
View File

@ -0,0 +1,18 @@
import string
from random import SystemRandom
from .account_checks import FORBIDDEN_CHARS, PUNCTUATION, check_gened_password
PASSWORD_CHARS = tuple(
frozenset(string.ascii_letters + string.digits).difference(FORBIDDEN_CHARS)
| PUNCTUATION
)
def gen_password() -> str:
"""Generates password of length 32"""
choices = SystemRandom().choices
while True:
passwd = "".join(choices(PASSWORD_CHARS, k=32))
if check_gened_password(passwd):
return passwd