62 Commits

Author SHA1 Message Date
18abdecb74 Increased time untill deletion for /gen_password to 15 secs 2022-11-13 18:53:11 +03:00
46a1fe59b2 Documented /gen_password command 2022-11-13 18:51:23 +03:00
9ee51dc19f Added /gen_password command 2022-11-13 18:49:57 +03:00
2c07b3bed2 Added function for password generation is utils.py 2022-11-13 18:49:14 +03:00
b8113dc47b Sorted imports in utils.py 2022-11-13 18:16:03 +03:00
21bd01c3ed Changed and added comments in the database files 2022-11-10 18:45:10 +03:00
c2280e8bc2 Changed functions for encryption and decryption of accounts to use str of master_pass instead of bytes 2022-11-09 19:02:58 +03:00
f7f954ecd3 Sorted imports 2022-11-09 18:52:59 +03:00
559ee8f6d8 Renamed _memory_use in cryptography into MEMORY_USAGE 2022-11-07 21:02:13 +03:00
a1bed9014d Changed the way the master password hashing works
Switched from Bcrypt to Scrypt for master password hashing
Changed models to use new sizes for hashes and salts, doubled the size of enc_login and enc_passwd for accounts
Created new function to check master password validity
Increased salt sizes for accounts and master passwords
Removed bcrypt from requirements
2022-11-07 16:30:45 +03:00
66ab13b45d Changed handlers.get_accounts to use a guard clause, added comment to clarify, why we are putting account in backticks 2022-11-07 16:29:37 +03:00
da42d7ad1d Changed comments in cryptograpy.other_accounts to be more precise 2022-11-07 16:29:31 +03:00
570f15001e switched mariadb connector to pymysql 2022-11-04 02:17:17 +03:00
b4bf9fbf41 Changed mariadb.Connection to Engine in __init__ of bot 2022-11-04 02:16:47 +03:00
042ca9312e Cleaned up code in handlers
Renamed variables in _set_master_pass2 for consistency
Added a few missing returns in guard clauses
Added file size limit for importing account to 100 MB
2022-11-04 01:20:25 +03:00
b10b1fe2b3 changed to the newer version of mariadb 2022-11-03 16:27:52 +00:00
03646b6156 Added checks to /import 2022-11-03 10:49:00 +03:00
b56ebd0b61 Added account name and account credential checking 2022-11-02 23:11:29 +03:00
adf9865fbe Added ability to copy account name from /get_accounts
That also fixed an issue of having wrong behaviour, if using any special chars in account name
2022-11-02 23:02:20 +03:00
4122e15308 Added data directory to .dockerignore 2022-11-02 18:29:54 +00:00
37a81ed22d Changed change_master_pass to be more readable 2022-11-02 18:34:02 +03:00
fb850e3737 Added ability to import accounts 2022-11-02 14:48:15 +03:00
dbf27d401e Added ability to export accounts to json 2022-11-02 14:48:15 +03:00
373623b0b4 Removed type_ from sqlmodel.Column, because it isn't needed 2022-11-02 14:42:26 +03:00
afc03a6c1f Removed comment which broke code, when uncommented 2022-11-02 14:42:26 +03:00
4ee7f0a609 Added installation of g++ to Dockerfile 2022-11-01 09:26:13 +00:00
ae2b214904 Fixed an error of getting wrong account
I'm now quite shure how it worked before
2022-10-31 21:09:12 +03:00
01ab461d28 Added message of failure to add account 2022-10-31 20:40:53 +03:00
9c095779a5 Fixed a typo in README and changed one of the phrase to be more correct 2022-10-30 20:39:00 +00:00
9f1790b58d Changed message for reseting master password 2022-10-30 23:22:01 +03:00
3c5d309b9a Removed mension of missing conformation for deleting all 2022-10-30 23:20:38 +03:00
3e570dbaff Changed order of operations in the Dockerfile 2022-10-30 23:08:24 +03:00
b7120f2627 Fixed an error of deleting all passwords, instead of 1 with command delete_account 2022-10-30 23:03:26 +03:00
99c24d9917 Added commit statement 2022-10-30 22:57:26 +03:00
2101c302db Added note about ability to copt 2022-10-30 16:53:56 +03:00
ac9d89fb3d made login and password easily copyable 2022-10-30 16:37:24 +03:00
f6b58df6c4 Properly documented help command 2022-10-30 16:30:00 +03:00
69cddd1cbb Remved unnessasary import 2022-10-30 16:28:23 +03:00
88d51785ed Added conformation for deleting all, added cacel command for the case, when there is no active action 2022-10-30 16:24:32 +03:00
731893ad33 Added cancel command 2022-10-30 16:16:04 +03:00
689de06782 Tweaks in handlers to optimize database usage 2022-10-30 16:06:38 +03:00
d65468134a fixed compose.yaml 2022-10-29 22:16:19 +00:00
394990c3ac changed dockerfile to use python3.11 2022-10-29 22:09:16 +00:00
796aa79db0 changed commands in README 2022-10-30 01:07:47 +03:00
dec7a9b7c9 Now commands ask for params in separate messages 2022-10-29 22:04:04 +00:00
fae04547c8 added commits to delete funcs 2022-10-29 22:04:04 +00:00
fab8e18723 Changes to the positon of the telegram token 2022-10-29 22:03:20 +00:00
baef759929 merged functions for master password hashing 2022-10-29 19:35:54 +03:00
a5d93f0d5c Fixed a typo 2022-10-21 19:14:32 +03:00
98b621b39d Изменению README.md, добавил документацию 2022-10-21 19:03:27 +03:00
8128a059df Vadim's preferences 2022-10-14 21:26:20 +03:00
cfd2a24e25 Added help command 2022-10-14 20:16:46 +03:00
b5aebdb101 Added message after account deletion 2022-10-14 20:02:27 +03:00
267e54bb29 Added ability to delete account of a user 2022-10-14 20:01:05 +03:00
b1017082a9 Fixed reset of master password 2022-10-14 19:53:05 +03:00
39e86793a6 Removed print 2022-10-14 19:52:48 +03:00
822061ae49 Removed prints 2022-10-14 19:35:16 +03:00
4702e048f3 Docker files 2022-10-14 18:54:29 +03:00
865c98ad46 Created basic version of the bot 2022-10-14 17:48:54 +03:00
7fe7b23a63 Filled delete.py and created change.py 2022-10-14 17:48:32 +03:00
85129d8b7a Changes to db: fixed error in __init__, looking for more precise errors 2022-10-14 17:12:24 +03:00
b60579ecd6 Added sqlmodel to requirements 2022-10-14 15:42:32 +03:00
19 changed files with 897 additions and 58 deletions

27
.dockerignore Normal file
View File

@ -0,0 +1,27 @@
**/__pycache__
**/.venv
**/.classpath
**/.dockerignore
**/.env
**/.git
**/.gitignore
**/.project
**/.settings
**/.toolstarget
**/.vs
**/.vscode
**/*.*proj.user
**/*.dbmdl
**/*.jfm
**/bin
**/charts
**/docker-compose*
**/compose*
**/Dockerfile*
**/node_modules
**/npm-debug.log
**/obj
**/secrets.dev.yaml
**/values.dev.yaml
README.md
data/

2
.gitignore vendored
View File

@ -160,3 +160,5 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# Database data
data/

25
Dockerfile Normal file
View File

@ -0,0 +1,25 @@
FROM python:3.11-slim
# Keeps Python from generating .pyc files in the container
ENV PYTHONDONTWRITEBYTECODE=1
# Turns off buffering for easier container logging
ENV PYTHONUNBUFFERED=1
WORKDIR /app
# Creates new user
RUN adduser -u 1000 --disabled-password --gecos "" appuser && chown -R appuser /app
# Install deps
RUN apt update && apt full-upgrade -y
# Install pip requirements
COPY requirements.txt .
RUN python -m pip install -r requirements.txt
COPY . /app
USER appuser
CMD ["python", "main.py"]

View File

@ -1,18 +1,38 @@
# PassManager
### Цель
## Описание
**Создать бота для сохранения, созания паролей в зашифрованой базе данных.**
Менеджер паролей для телеграм, где все пароли надёжно зашифрованы
### Задачи
## Документация
* Создать телеграм бота
* Научить его работать с БД (MariaDB)
* Зашифровать БД
* Сделать общедоступным
### База данных
### Инструменты
В качестве базы данных данный бот использует mariadb, к которой бот должен иметь доступ всегда
* Язык программирования Python
* PyTelegramBotAPI
* MariaDB
### Команды и их синтаксис
- /set_master_pass - установить мастер пароль
- /add_account - создать аккаунт
- /get_accounts - получить список аккаунтов
- /get_account - получить логин и пароль аккаунта
- /delete_account - удалить аккаунт
- /delete_all - удалить все аккаунты и мастер пароль
- /reset_master_pass- удалить все аккаунты и изменить мастер пароль
- /cancel - отмена текущего действия
- /help - помощь
- /export - получить пароли в json формате
- /import - импортировать пароли из json в файле в таком же формате, как из /export
- /gen_password - создать 10 надёжных паролей
### Настройка
Настройка производится через переменные среды. Их можно прописать в файле .env, если не хотите задавать их каждый раз в случае, если вы не используете Docker
#### Переменные среды
- DB_HOST - ip адрес базы данных
- DB_USER - пользователь базы данных
- DB_PASS - пароль пользователя базы данных
- DB_NAME - название базы данных
- TG_TOKEN - токен телеграма, его следует указать в файле .env в формате TG_TOKEN={токен}

33
compose.yaml Normal file
View File

@ -0,0 +1,33 @@
version: '3.4'
networks:
password_manager: {}
services:
passmanager:
build:
context: .
dockerfile: ./Dockerfile
restart: always
environment:
DB_HOST: db
DB_USER: manager
DB_PASS: passwd123!
DB_NAME: passmanager
TG_TOKEN: ${TG_TOKEN}
depends_on:
- db
networks:
- password_manager
db:
image: stnicolay/mariadb-aria
restart: always
environment:
MYSQL_ROOT_PASSWORD: example123!
MYSQL_DATABASE: passmanager
MYSQL_USER: manager
MYSQL_PASSWORD: passwd123!
volumes:
- ./data:/var/lib/mysql
networks:
- password_manager

View File

@ -1,5 +1,6 @@
bcrypt
cryptography
mariadb
pymysql
python-dotenv
pyTelegramBotAPI
pyTelegramBotAPI
sqlmodel
pydantic

View File

@ -10,8 +10,6 @@ engine: Engine
def main() -> None:
global engine
load_dotenv("./.env")
engine = database.prepare.get_engine(
host=os.getenv("DB_HOST"),
@ -20,5 +18,5 @@ def main() -> None:
db=os.getenv("DB_NAME"),
) # type: ignore
database.prepare.prepare(engine)
bot_ = bot.create_bot(os.getenv("TG_TOKEN"), con) # type: ignore
bot_ = bot.create_bot(os.getenv("TG_TOKEN"), engine) # type: ignore
bot_.infinity_polling()

View File

@ -0,0 +1,52 @@
import functools
from sqlalchemy.future import Engine
import telebot
from . import handlers, utils
__all__ = ["handlers", "utils"]
def create_bot(token: str, engine: Engine) -> telebot.TeleBot:
bot = telebot.TeleBot(token)
bot.register_message_handler(
functools.partial(handlers.set_master_password, bot, engine),
commands=["set_master_pass"],
)
bot.register_message_handler(
functools.partial(handlers.get_account, bot, engine), commands=["get_account"]
)
bot.register_message_handler(
functools.partial(handlers.get_accounts, bot, engine), commands=["get_accounts"]
)
bot.register_message_handler(
functools.partial(handlers.add_account, bot, engine), commands=["add_account"]
)
bot.register_message_handler(
functools.partial(handlers.delete_all, bot, engine), commands=["delete_all"]
)
bot.register_message_handler(
functools.partial(handlers.reset_master_pass, bot, engine),
commands=["reset_master_pass"],
)
bot.register_message_handler(
functools.partial(handlers.delete_account, bot, engine),
commands=["delete_account"],
)
bot.register_message_handler(
functools.partial(handlers.help, bot), commands=["help", "start"]
)
bot.register_message_handler(
functools.partial(handlers.cancel, bot), commands=["cancel"]
)
bot.register_message_handler(
functools.partial(handlers.export, bot, engine), commands=["export"]
)
bot.register_message_handler(
functools.partial(handlers.import_accounts, bot, engine), commands=["import"]
)
bot.register_message_handler(
functools.partial(handlers.gen_password, bot), commands=["gen_password"]
)
return bot

487
src/bot/handlers.py Normal file
View File

@ -0,0 +1,487 @@
import functools
import gc
import time
import telebot
from sqlalchemy.future import Engine
from .. import cryptography, database
from .utils import (
accounts_to_json,
base_handler,
check_account,
check_account_name,
check_login,
check_passwd,
gen_passwd,
get_all_accounts,
json_to_accounts,
send_tmp_message,
)
Message = telebot.types.Message
def get_accounts(
bot: telebot.TeleBot, engine: Engine, mes: telebot.types.Message
) -> None:
base_handler(bot, mes)
accounts = database.get.get_accounts(engine, mes.from_user.id)
if not accounts:
return send_tmp_message(bot, mes.chat.id, "У вас нет аккаунтов")
# Make accounts copyable and escape special chars
accounts = [f"`{account}`" for account in accounts]
send_tmp_message(
bot,
mes.chat.id,
"Ваши аккаунты:\n"
+ "\n".join(accounts)
+ "\nНажмите на название, чтобы скопировать",
30,
)
def delete_all(
bot: telebot.TeleBot, engine: Engine, mes: telebot.types.Message
) -> None:
base_handler(bot, mes)
bot_mes = bot.send_message(
mes.chat.id,
"Вы действительно хотите удалить все ваши аккаунты? Это действие нельзя отменить. Отправьте YES для подтверждения",
)
bot.register_next_step_handler(
mes, functools.partial(_delete_all, bot, engine, bot_mes)
)
def _delete_all(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "YES":
database.delete.purge_accounts(engine, mes.from_user.id)
database.delete.delete_master_pass(engine, mes.from_user.id)
send_tmp_message(bot, mes.chat.id, "Всё успешно удалено", timeout=10)
else:
send_tmp_message(bot, mes.chat.id, "Вы отправили не YES, ничего не удалено")
def set_master_password(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
base_handler(bot, mes, None)
if database.get.get_master_pass(engine, mes.from_user.id) is not None:
return send_tmp_message(bot, mes.chat.id, "Мастер пароль уже существует")
bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль")
bot.register_next_step_handler(
mes, functools.partial(_set_master_pass2, bot, engine, bot_mes)
)
def _set_master_pass2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена")
hash_pass, master_salt = cryptography.master_pass.encrypt_master_pass(text)
database.add.add_master_pass(engine, mes.from_user.id, master_salt, hash_pass)
send_tmp_message(bot, mes.chat.id, "Успех")
del mes, text
gc.collect()
def reset_master_pass(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
base_handler(bot, mes)
if database.get.get_master_pass(engine, mes.from_user.id) is None:
return send_tmp_message(bot, mes.chat.id, "Мастер пароль не задан")
bot_mes = bot.send_message(
mes.chat.id,
"Отправьте новый мастер пароль, осторожно, все текущие аккаунты будут удалены навсегда",
)
bot.register_next_step_handler(
mes, functools.partial(_reset_master_pass2, bot, engine, bot_mes)
)
def _reset_master_pass2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена")
hash_, salt = cryptography.master_pass.encrypt_master_pass(text)
database.delete.purge_accounts(engine, mes.from_user.id)
database.change.change_master_pass(engine, mes.from_user.id, salt, hash_)
send_tmp_message(
bot, mes.chat.id, "Все ваши аккаунты удалены, а мастер пароль изменён"
)
del mes, text
gc.collect()
def add_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
base_handler(bot, mes)
master_password_from_db = database.get.get_master_pass(engine, mes.from_user.id)
if master_password_from_db is None:
return send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
bot_mes = bot.send_message(mes.chat.id, "Отправьте название аккаунта")
bot.register_next_step_handler(
mes, functools.partial(_add_account2, bot, engine, bot_mes)
)
def _add_account2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if not check_account_name(text):
return send_tmp_message(bot, mes.chat.id, "Не корректное название аккаунта")
if text in database.get.get_accounts(engine, mes.from_user.id):
return send_tmp_message(
bot, mes.chat.id, "Аккаунт с таким именем уже существует"
)
bot_mes = bot.send_message(mes.chat.id, "Отправьте логин")
data = {"name": text}
bot.register_next_step_handler(
mes, functools.partial(_add_account3, bot, engine, bot_mes, data)
)
def _add_account3(
bot: telebot.TeleBot,
engine: Engine,
prev_mes: Message,
data: dict[str, str],
mes: Message,
) -> None:
base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if not check_login(text):
return send_tmp_message(bot, mes.chat.id, "Не корректный логин")
data["login"] = text
bot_mes = bot.send_message(mes.chat.id, "Отправьте пароль от аккаунта")
bot.register_next_step_handler(
mes, functools.partial(_add_account4, bot, engine, bot_mes, data)
)
def _add_account4(
bot: telebot.TeleBot,
engine: Engine,
prev_mes: Message,
data: dict[str, str],
mes: Message,
) -> None:
base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if not check_passwd(text):
return send_tmp_message(bot, mes.chat.id, "Не корректный пароль")
data["passwd"] = text
bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль")
bot.register_next_step_handler(
mes, functools.partial(_add_account5, bot, engine, bot_mes, data)
)
def _add_account5(
bot: telebot.TeleBot,
engine: Engine,
prev_mes: Message,
data: dict[str, str],
mes: Message,
) -> None:
base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена")
salt, hash_ = database.get.get_master_pass(engine, mes.from_user.id)
if not cryptography.master_pass.check_master_pass(text, hash_, salt):
return send_tmp_message(bot, mes.chat.id, "Не подходит главный пароль")
name, login, passwd = data["name"], data["login"], data["passwd"]
enc_login, enc_pass, salt = cryptography.other_accounts.encrypt_account_info(
login, passwd, text
)
result = database.add.add_account(
engine, mes.from_user.id, name, salt, enc_login, enc_pass
)
send_tmp_message(
bot, mes.chat.id, "Успех" if result else "Произошла не предвиденная ошибка"
)
del data, name, login, passwd, enc_login
gc.collect()
def get_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
base_handler(bot, mes)
bot_mes = bot.send_message(mes.chat.id, "Отправьте название аккаунта")
master_pass = database.get.get_master_pass(engine, mes.from_user.id)
if master_pass is None:
return send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
bot.register_next_step_handler(
mes, functools.partial(_get_account2, bot, engine, bot_mes)
)
def _get_account2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if text not in database.get.get_accounts(engine, mes.from_user.id):
return send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта")
bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль")
bot.register_next_step_handler(
mes, functools.partial(_get_account3, bot, engine, bot_mes, text)
)
def _get_account3(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, name: str, mes: Message
) -> None:
base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_salt, hash_pass = database.get.get_master_pass(engine, mes.from_user.id)
if not cryptography.master_pass.check_master_pass(text, hash_pass, master_salt):
return send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль")
salt, enc_login, enc_pass = database.get.get_account_info(
engine, mes.from_user.id, name
)
login, passwd = cryptography.other_accounts.decrypt_account_info(
enc_login, enc_pass, text, salt
)
send_tmp_message(
bot,
mes.chat.id,
f"Логин:\n`{login}`\nПароль:\n`{passwd}`\nНажмите на логин или пароль, чтобы скопировать",
30,
)
del text, mes, passwd, login
gc.collect()
def delete_account(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
base_handler(bot, mes)
master_pass = database.get.get_master_pass(engine, mes.from_user.id)
if master_pass is None:
return send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
bot_mes = bot.send_message(
mes.chat.id, "Отправьте название аккаунта, который вы хотите удалить"
)
bot.register_next_step_handler(
mes, functools.partial(_delete_account2, bot, engine, bot_mes)
)
def _delete_account2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if text not in database.get.get_accounts(engine, mes.from_user.id):
return send_tmp_message(bot, mes.chat.id, "Нет такого аккаунта")
database.delete.delete_account(engine, mes.from_user.id, text)
send_tmp_message(bot, mes.chat.id, "Аккаунт удалён")
def help(bot: telebot.TeleBot, mes: telebot.types.Message) -> None:
message = """Команды:
/set_master_pass - установить мастер пароль
/add_account - создать аккаунт
/get_accounts - получить список аккаунтов
/get_account - получить логин и пароль аккаунта
/delete_account - удалить аккаунт
/delete_all - удалить все аккаунты и мастер пароль
/reset_master_pass - удалить все аккаунты и изменить мастер пароль
/cancel - отмена текущего действия
/help - помощь
/export - получить пароли в json формате
/import - импортировать пароли из json в файле в таком же формате, как из /export
/gen_password - создать 10 надёжных паролей"""
bot.send_message(mes.chat.id, message)
def cancel(bot: telebot.TeleBot, mes: Message) -> None:
send_tmp_message(bot, mes.chat.id, "Нет активного действия")
def export(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
base_handler(bot, mes)
master_password_from_db = database.get.get_master_pass(engine, mes.from_user.id)
if master_password_from_db is None:
return send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
if not database.get.get_accounts(engine, mes.from_user.id):
return send_tmp_message(bot, mes.chat.id, "Нет аккаунтов")
bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль")
bot.register_next_step_handler(
mes, functools.partial(_export2, bot, engine, bot_mes)
)
def _export2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_salt, hash_pass = database.get.get_master_pass(engine, mes.from_user.id)
if not cryptography.master_pass.check_master_pass(text, hash_pass, master_salt):
return send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль")
accounts = get_all_accounts(engine, mes.from_user.id, text)
json_io = accounts_to_json(accounts)
bot_mes = bot.send_document(mes.chat.id, json_io)
del text, accounts, json_io
gc.collect()
time.sleep(30)
bot.delete_message(bot_mes.chat.id, bot_mes.id)
def import_accounts(bot: telebot.TeleBot, engine: Engine, mes: Message) -> None:
base_handler(bot, mes)
master_password_from_db = database.get.get_master_pass(engine, mes.from_user.id)
if master_password_from_db is None:
return send_tmp_message(bot, mes.chat.id, "Нет мастер пароля")
bot_mes = bot.send_message(mes.chat.id, "Отправьте json файл")
bot.register_next_step_handler(
mes, functools.partial(_import2, bot, engine, bot_mes)
)
def _import2(
bot: telebot.TeleBot, engine: Engine, prev_mes: Message, mes: Message
) -> None:
base_handler(bot, mes, prev_mes)
if mes.text is not None:
text = mes.text.strip()
if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена")
if mes.document is None:
return send_tmp_message(bot, mes.chat.id, "Вы должны отправить документ")
if mes.document.file_size > 102_400: # If file size is bigger that 100 MB
return send_tmp_message(bot, mes.chat.id, "Файл слишком большой")
file_info = bot.get_file(mes.document.file_id)
downloaded_file = bot.download_file(file_info.file_path)
try:
accounts = json_to_accounts(downloaded_file.decode("utf-8"))
except Exception:
return send_tmp_message(bot, mes.chat.id, "Ошибка во время работы с файлом")
bot_mes = bot.send_message(mes.chat.id, "Отправьте мастер пароль")
bot.register_next_step_handler(
mes, functools.partial(_import3, bot, engine, bot_mes, accounts)
)
def _import3(
bot: telebot.TeleBot,
engine: Engine,
prev_mes: Message,
accounts: list[tuple[str, str, str]],
mes: Message,
) -> None:
base_handler(bot, mes, prev_mes)
text = mes.text.strip()
if text == "/cancel":
return send_tmp_message(bot, mes.chat.id, "Успешная отмена")
master_salt, hash_pass = database.get.get_master_pass(engine, mes.from_user.id)
if not cryptography.master_pass.check_master_pass(text, hash_pass, master_salt):
return send_tmp_message(bot, mes.chat.id, "Не подходит мастер пароль")
# List of names of accounts, which failed to be added to the database or failed tests
failed: list[str] = []
for account in accounts:
name, login, passwd = account
if not check_account(name, login, passwd):
failed.append(name)
continue
enc_login, enc_passwd, salt = cryptography.other_accounts.encrypt_account_info(
login, passwd, text
)
result = database.add.add_account(
engine, mes.from_user.id, name, salt, enc_login, enc_passwd
)
if not result:
failed.append(name)
if failed:
mes_text = "Не удалось добавить:\n" + "\n".join(failed)
else:
mes_text = "Успех"
send_tmp_message(bot, mes.chat.id, mes_text, 10)
del text, mes, accounts
gc.collect()
def gen_password(bot: telebot.TeleBot, mes: Message) -> None:
# Generate 10 passwords and put 'em in the backticks
base_handler(bot, mes)
passwords = (f"`{gen_passwd()}`" for _ in range(10))
text = (
"Пароли:\n"
+ "\n".join(passwords)
+ "\nНажмите на пароль, чтобы его скопировать"
)
send_tmp_message(bot, mes.chat.id, text, 15)

122
src/bot/utils.py Normal file
View File

@ -0,0 +1,122 @@
import io
import string
import time
from random import SystemRandom
from typing import Self, Type
import pydantic
import telebot
from sqlalchemy.future import Engine
from .. import cryptography, database
class Account(pydantic.BaseModel):
name: str
login: str
passwd: str
@classmethod
def from_tuple(cls: Type[Self], tuple_: tuple[str, str, str]) -> Self:
return cls(name=tuple_[0], login=tuple_[1], passwd=tuple_[2])
def as_tuple(self: Self) -> tuple[str, str, str]:
return (self.name, self.login, self.passwd)
class _Accounts(pydantic.BaseModel):
accounts: list[Account] = pydantic.Field(default_factory=list)
def _accounts_list_to_json(accounts: list[tuple[str, str, str]]) -> str:
accounts = _Accounts(accounts=[Account.from_tuple(i) for i in accounts])
return accounts.json()
def json_to_accounts(json_: str) -> list[tuple[str, str, str]]:
accounts = _Accounts.parse_raw(json_)
return [i.as_tuple() for i in accounts.accounts]
Message = telebot.types.Message
def send_tmp_message(
bot: telebot.TeleBot, chat_id: telebot.types.Message, text: str, timeout: int = 5
) -> None:
bot_mes = bot.send_message(chat_id, text, parse_mode="MarkdownV2")
time.sleep(timeout)
bot.delete_message(chat_id, bot_mes.id)
def base_handler(
bot: telebot.TeleBot, mes: Message, prev_mes: Message | None = None
) -> None:
bot.delete_message(mes.chat.id, mes.id)
if prev_mes is not None:
bot.delete_message(prev_mes.chat.id, prev_mes.id)
def get_all_accounts(
engine: Engine, user_id: int, master_pass: str
) -> list[tuple[str, str, str]]:
accounts: list[tuple[str, str, str]] = []
for account_name in database.get.get_accounts(engine, user_id):
salt, enc_login, enc_passwd = database.get.get_account_info(
engine, user_id, account_name
)
login, passwd = cryptography.other_accounts.decrypt_account_info(
enc_login, enc_passwd, master_pass, salt
)
accounts.append((account_name, login, passwd))
return accounts
def accounts_to_json(accounts: list[tuple[str, str, str]]) -> io.StringIO:
file = io.StringIO(_accounts_list_to_json(accounts))
file.name = "passwords.json"
return file
def _base_check(val: str) -> bool:
"Returns false if finds new lines or backtick (`)"
return not ("\n" in val or "`" in val)
def check_account_name(name: str) -> bool:
"Returns true if account name is valid"
return _base_check(name)
def check_login(login: str) -> bool:
"Returns true if login is valid"
return _base_check(login)
def check_passwd(passwd: str) -> bool:
"Returns true if password is valid"
return _base_check(passwd)
def check_account(name: str, login: str, passwd: str) -> bool:
"""Runs checks for account name, login and password"""
return check_account_name(name) and check_login(login) and check_passwd(passwd)
def gen_passwd() -> str:
"""Generates password of length 32"""
choices = SystemRandom().choices
chars = frozenset(string.ascii_letters + string.digits + string.punctuation)
# Remove backtick and pipe characters and convert into tuple
chars = tuple(chars.difference("`|"))
while True:
passwd = "".join(choices(chars, k=32))
passwd_chars = frozenset(passwd)
# If there is at least one lowercase character, uppercase character
# and one punctuation character
if (
passwd_chars.intersection(string.ascii_lowercase)
and passwd_chars.intersection(string.ascii_uppercase)
and passwd_chars.intersection(string.punctuation)
):
return passwd

View File

@ -1,14 +1,35 @@
import bcrypt
import os
print("Hi")
from cryptography.exceptions import InvalidKey
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
MEMORY_USAGE = 2**14
def _get_kdf(salt: bytes) -> Scrypt:
kdf = Scrypt(
salt=salt,
length=128,
n=MEMORY_USAGE,
r=8,
p=1,
)
return kdf
def encrypt_master_pass(passwd: str) -> tuple[bytes, bytes]:
"""Hashes master password and return tuple of hashed password and salt"""
salt = bcrypt.gensalt()
hashed = bcrypt.hashpw(passwd.encode("utf-8"), salt)
return (hashed, salt)
salt = os.urandom(64)
kdf = _get_kdf(salt)
return kdf.derive(passwd.encode("utf-8")), salt
def encrypt_master_pass_known_salt(passwd: str, salt: bytes) -> bytes:
return bcrypt.hashpw(passwd.encode("utf-8"), salt)
def check_master_pass(passwd: str, enc_pass: bytes, salt: bytes) -> bool:
"""Checks if the master password is correct"""
kdf = _get_kdf(salt)
try:
kdf.verify(passwd.encode("utf-8"), enc_pass)
except InvalidKey:
return False
else:
return True

View File

@ -1,6 +1,5 @@
import base64
import bcrypt
import os
from cryptography.fernet import Fernet
from cryptography.hazmat.backends import default_backend
@ -21,12 +20,12 @@ def _generate_key(salt: bytes, master_pass: bytes) -> bytes:
def encrypt_account_info(
login: str, passwd: str, master_pass: bytes
login: str, passwd: str, master_pass: str
) -> tuple[bytes, bytes, bytes]:
"""Encrypts login and password of a user using hash of their master password as a key.
Returns a tuple of encrypted login password and salt"""
salt = bcrypt.gensalt()
key = _generate_key(salt, master_pass)
"""Encrypts login and password of a user using their master password as a key.
Returns a tuple of encrypted login, password and salt"""
salt = os.urandom(64)
key = _generate_key(salt, master_pass.encode("utf-8"))
f = Fernet(key)
enc_login = f.encrypt(login.encode("utf-8"))
enc_passwd = f.encrypt(passwd.encode("utf-8"))
@ -34,9 +33,11 @@ def encrypt_account_info(
def decrypt_account_info(
enc_login: bytes, enc_pass: bytes, master_pass: bytes, salt: bytes
enc_login: bytes, enc_pass: bytes, master_pass: str, salt: bytes
) -> tuple[str, str]:
key = _generate_key(salt, master_pass)
"""Decrypts login and password using their master password as a key.
Returns a tuple of decrypted login and password"""
key = _generate_key(salt, master_pass.encode("utf-8"))
f = Fernet(key)
login_bytes = f.decrypt(enc_login)
pass_bytes = f.decrypt(enc_pass)

View File

@ -1,3 +1,3 @@
from . import add, delete, get, models, prepare
from . import add, delete, get, models, prepare, change
__all__ = ["add", "delete", "get", "models", "prepare"]
__all__ = ["add", "delete", "get", "models", "prepare", "change"]

View File

@ -1,5 +1,5 @@
import sqlmodel
import mariadb
from sqlalchemy.exc import IntegrityError
from sqlalchemy.future import Engine
from . import models
@ -13,7 +13,7 @@ def add_account(
enc_login: bytes,
enc_pass: bytes,
) -> bool:
"""Adds account to db. Returns true, if on success"""
"""Adds account to the database. Returns true on success, false otherwise"""
account = models.Account(
user_id=user_id, name=name, salt=salt, enc_login=enc_login, enc_pass=enc_pass
)
@ -21,20 +21,20 @@ def add_account(
with sqlmodel.Session(engine) as session:
session.add(account)
session.commit()
except Exception:
except IntegrityError:
return False
else:
return True
def add_master_pass(engine: Engine, user_id: int, salt: bytes, passwd: bytes) -> bool:
"""Adds master password to db. Returns true, if on success"""
"""Adds master password the database. Returns true on success, false otherwise"""
master_pass = models.MasterPass(user_id=user_id, salt=salt, passwd=passwd)
try:
with sqlmodel.Session(engine) as session:
session.add(master_pass)
session.commit()
except Exception:
except IntegrityError:
return False
else:
return True

18
src/database/change.py Normal file
View File

@ -0,0 +1,18 @@
import sqlmodel
from sqlalchemy.future import Engine
from . import models
def change_master_pass(
engine: Engine, user_id: int, salt: bytes, passwd: bytes
) -> None:
"""Changes master password and salt in the database"""
statement = (
sqlmodel.update(models.MasterPass)
.where(models.MasterPass.user_id == user_id)
.values(salt=salt, passwd=passwd)
)
with sqlmodel.Session(engine) as session:
session.exec(statement)
session.commit()

View File

@ -0,0 +1,32 @@
import sqlmodel
from sqlalchemy.future import Engine
from . import models
def purge_accounts(engine: Engine, user_id: int) -> None:
"""Deletes all user's accounts"""
statement = sqlmodel.delete(models.Account).where(models.Account.user_id == user_id)
with sqlmodel.Session(engine) as session:
session.exec(statement)
session.commit()
def delete_master_pass(engine: Engine, user_id: int) -> None:
"""Delets master password of the user"""
statement = sqlmodel.delete(models.MasterPass).where(
models.MasterPass.user_id == user_id
)
with sqlmodel.Session(engine) as session:
session.exec(statement)
session.commit()
def delete_account(engine: Engine, user_id: int, name: str) -> None:
"""Deletes specific user account"""
statement = sqlmodel.delete(models.Account).where(
models.Account.user_id == user_id, models.Account.name == name
)
with sqlmodel.Session(engine) as session:
session.exec(statement)
session.commit()

View File

@ -5,13 +5,13 @@ from . import models
def get_master_pass(engine: Engine, user_id: int) -> tuple[bytes, bytes] | None:
"""Gets master pass. Returns tuple of salt and password"""
"""Gets master pass. Returns tuple of salt and password
or None if it wasn't found"""
statement = sqlmodel.select(models.MasterPass).where(
models.MasterPass.user_id == user_id
)
with sqlmodel.Session(engine) as session:
result = session.exec(statement).first()
print(result)
if result is None:
return
return (result.salt, result.passwd)
@ -22,18 +22,19 @@ def get_accounts(engine: Engine, user_id: int) -> list[str]:
statement = sqlmodel.select(models.Account).where(models.Account.user_id == user_id)
with sqlmodel.Session(engine) as session:
result = session.exec(statement)
return [account.name for account in result]
return [account.name for account in result]
def get_account_info(
engine: Engine, user_id: int, name: str
) -> tuple[bytes, bytes, bytes]:
"""Gets account info. Returns tuple of salt, login and password"""
"""Gets account info. Returns tuple of salt, login and password
or None if it wasn't found"""
statement = sqlmodel.select(models.Account).where(
models.Account.user_id == user_id and models.Account.name == name
models.Account.user_id == user_id, models.Account.name == name
)
with sqlmodel.Session(engine) as session:
result = session.exec(statement).first()
if result is None:
return
return (result.salt, result.enc_login, result.enc_pass)
if result is None:
return
return (result.salt, result.enc_login, result.enc_pass)

View File

@ -8,10 +8,10 @@ class MasterPass(sqlmodel.SQLModel, table=True):
id: Optional[int] = sqlmodel.Field(primary_key=True)
user_id: int = sqlmodel.Field(nullable=False, index=True, unique=True)
salt: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(type_=sqlmodel.VARBINARY(255), nullable=False)
sa_column=sqlmodel.Column(sqlmodel.BINARY(64), nullable=False)
)
passwd: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(type_=sqlmodel.VARBINARY(255), nullable=False)
sa_column=sqlmodel.Column(sqlmodel.BINARY(128), nullable=False)
)
@ -22,11 +22,11 @@ class Account(sqlmodel.SQLModel, table=True):
user_id: int = sqlmodel.Field(nullable=False, index=True)
name: str = sqlmodel.Field(nullable=False, index=True, max_length=255)
salt: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(type_=sqlmodel.VARBINARY(255), nullable=False)
sa_column=sqlmodel.Column(sqlmodel.BINARY(64), nullable=False)
)
enc_login: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(type_=sqlmodel.VARBINARY(255), nullable=False)
sa_column=sqlmodel.Column(sqlmodel.VARBINARY(500), nullable=False)
)
enc_pass: bytes = sqlmodel.Field(
sa_column=sqlmodel.Column(type_=sqlmodel.VARBINARY(255), nullable=False)
sa_column=sqlmodel.Column(sqlmodel.VARBINARY(500), nullable=False)
)

View File

@ -5,12 +5,11 @@ from . import models
def get_engine(host: str, user: str, passwd: str, db: str) -> Engine:
engine = sqlmodel.create_engine(
f"mariadb+mariadbconnector://{user}:{passwd}@{host}/{db}"
)
print(type(engine))
"""Creates an engine for mariadb with pymysql as connector"""
engine = sqlmodel.create_engine(f"mariadb+pymysql://{user}:{passwd}@{host}/{db}")
return engine
def prepare(engine: Engine) -> None:
sqlmodel.SQLModel.metadata.create_all(engine, [models.Account, models.MasterPass])
"""Creates all tables, indexes and constrains in the database"""
sqlmodel.SQLModel.metadata.create_all(engine)