Compare commits
	
		
			8 Commits
		
	
	
		
			1.0
			...
			46a1fe59b2
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 46a1fe59b2 | |||
| 9ee51dc19f | |||
| 2c07b3bed2 | |||
| b8113dc47b | |||
| 21bd01c3ed | |||
| c2280e8bc2 | |||
| f7f954ecd3 | |||
| 559ee8f6d8 | 
@@ -23,6 +23,7 @@
 | 
			
		||||
- /help - помощь
 | 
			
		||||
- /export - получить пароли в json формате
 | 
			
		||||
- /import - импортировать пароли из json в файле в таком же формате, как из /export
 | 
			
		||||
- /gen_password - создать 10 надёжных паролей
 | 
			
		||||
 | 
			
		||||
### Настройка
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -46,4 +46,7 @@ def create_bot(token: str, engine: Engine) -> telebot.TeleBot:
 | 
			
		||||
    bot.register_message_handler(
 | 
			
		||||
        functools.partial(handlers.import_accounts, bot, engine), commands=["import"]
 | 
			
		||||
    )
 | 
			
		||||
    bot.register_message_handler(
 | 
			
		||||
        functools.partial(handlers.gen_password, bot), commands=["gen_password"]
 | 
			
		||||
    )
 | 
			
		||||
    return bot
 | 
			
		||||
 
 | 
			
		||||
@@ -9,10 +9,11 @@ from .. import cryptography, database
 | 
			
		||||
from .utils import (
 | 
			
		||||
    accounts_to_json,
 | 
			
		||||
    base_handler,
 | 
			
		||||
    check_account,
 | 
			
		||||
    check_account_name,
 | 
			
		||||
    check_login,
 | 
			
		||||
    check_passwd,
 | 
			
		||||
    check_account,
 | 
			
		||||
    gen_passwd,
 | 
			
		||||
    get_all_accounts,
 | 
			
		||||
    json_to_accounts,
 | 
			
		||||
    send_tmp_message,
 | 
			
		||||
@@ -225,7 +226,7 @@ def _add_account5(
 | 
			
		||||
    name, login, passwd = data["name"], data["login"], data["passwd"]
 | 
			
		||||
 | 
			
		||||
    enc_login, enc_pass, salt = cryptography.other_accounts.encrypt_account_info(
 | 
			
		||||
        login, passwd, text.encode("utf-8")
 | 
			
		||||
        login, passwd, text
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    result = database.add.add_account(
 | 
			
		||||
@@ -288,7 +289,7 @@ def _get_account3(
 | 
			
		||||
        engine, mes.from_user.id, name
 | 
			
		||||
    )
 | 
			
		||||
    login, passwd = cryptography.other_accounts.decrypt_account_info(
 | 
			
		||||
        enc_login, enc_pass, text.encode("utf-8"), salt
 | 
			
		||||
        enc_login, enc_pass, text, salt
 | 
			
		||||
    )
 | 
			
		||||
    send_tmp_message(
 | 
			
		||||
        bot,
 | 
			
		||||
@@ -344,7 +345,8 @@ def help(bot: telebot.TeleBot, mes: telebot.types.Message) -> None:
 | 
			
		||||
/cancel - отмена текущего действия
 | 
			
		||||
/help - помощь
 | 
			
		||||
/export - получить пароли в json формате
 | 
			
		||||
/import - импортировать пароли из json в файле в таком же формате, как из /export"""
 | 
			
		||||
/import - импортировать пароли из json в файле в таком же формате, как из /export
 | 
			
		||||
/gen_password - создать 10 надёжных паролей"""
 | 
			
		||||
    bot.send_message(mes.chat.id, message)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@@ -456,7 +458,7 @@ def _import3(
 | 
			
		||||
            failed.append(name)
 | 
			
		||||
            continue
 | 
			
		||||
        enc_login, enc_passwd, salt = cryptography.other_accounts.encrypt_account_info(
 | 
			
		||||
            login, passwd, text.encode("utf-8")
 | 
			
		||||
            login, passwd, text
 | 
			
		||||
        )
 | 
			
		||||
        result = database.add.add_account(
 | 
			
		||||
            engine, mes.from_user.id, name, salt, enc_login, enc_passwd
 | 
			
		||||
@@ -471,3 +473,15 @@ def _import3(
 | 
			
		||||
    send_tmp_message(bot, mes.chat.id, mes_text, 10)
 | 
			
		||||
    del text, mes, accounts
 | 
			
		||||
    gc.collect()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def gen_password(bot: telebot.TeleBot, mes: Message) -> None:
 | 
			
		||||
    # Generate 10 passwords and put 'em in the backticks
 | 
			
		||||
    base_handler(bot, mes)
 | 
			
		||||
    passwords = (f"`{gen_passwd()}`" for _ in range(10))
 | 
			
		||||
    text = (
 | 
			
		||||
        "Пароли:\n"
 | 
			
		||||
        + "\n".join(passwords)
 | 
			
		||||
        + "\nНажмите на пароль, чтобы его скопировать"
 | 
			
		||||
    )
 | 
			
		||||
    send_tmp_message(bot, mes.chat.id, text)
 | 
			
		||||
 
 | 
			
		||||
@@ -1,12 +1,14 @@
 | 
			
		||||
import io
 | 
			
		||||
import string
 | 
			
		||||
import time
 | 
			
		||||
from random import SystemRandom
 | 
			
		||||
from typing import Self, Type
 | 
			
		||||
 | 
			
		||||
import pydantic
 | 
			
		||||
import telebot
 | 
			
		||||
from sqlalchemy.future import Engine
 | 
			
		||||
 | 
			
		||||
from .. import database, cryptography
 | 
			
		||||
from .. import cryptography, database
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Account(pydantic.BaseModel):
 | 
			
		||||
@@ -59,7 +61,6 @@ def get_all_accounts(
 | 
			
		||||
    engine: Engine, user_id: int, master_pass: str
 | 
			
		||||
) -> list[tuple[str, str, str]]:
 | 
			
		||||
    accounts: list[tuple[str, str, str]] = []
 | 
			
		||||
    master_pass = master_pass.encode("utf-8")
 | 
			
		||||
    for account_name in database.get.get_accounts(engine, user_id):
 | 
			
		||||
        salt, enc_login, enc_passwd = database.get.get_account_info(
 | 
			
		||||
            engine, user_id, account_name
 | 
			
		||||
@@ -100,3 +101,22 @@ def check_passwd(passwd: str) -> bool:
 | 
			
		||||
def check_account(name: str, login: str, passwd: str) -> bool:
 | 
			
		||||
    """Runs checks for account name, login and password"""
 | 
			
		||||
    return check_account_name(name) and check_login(login) and check_passwd(passwd)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def gen_passwd() -> str:
 | 
			
		||||
    """Generates password of length 32"""
 | 
			
		||||
    choices = SystemRandom().choices
 | 
			
		||||
    chars = frozenset(string.ascii_letters + string.digits + string.punctuation)
 | 
			
		||||
    # Remove backtick and pipe characters and convert into tuple
 | 
			
		||||
    chars = tuple(chars.difference("`|"))
 | 
			
		||||
    while True:
 | 
			
		||||
        passwd = "".join(choices(chars, k=32))
 | 
			
		||||
        passwd_chars = frozenset(passwd)
 | 
			
		||||
        # If there is at least one lowercase character, uppercase character
 | 
			
		||||
        # and one punctuation character
 | 
			
		||||
        if (
 | 
			
		||||
            passwd_chars.intersection(string.ascii_lowercase)
 | 
			
		||||
            and passwd_chars.intersection(string.ascii_uppercase)
 | 
			
		||||
            and passwd_chars.intersection(string.punctuation)
 | 
			
		||||
        ):
 | 
			
		||||
            return passwd
 | 
			
		||||
 
 | 
			
		||||
@@ -3,14 +3,14 @@ import os
 | 
			
		||||
from cryptography.exceptions import InvalidKey
 | 
			
		||||
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
 | 
			
		||||
 | 
			
		||||
_memory_use = 2**14
 | 
			
		||||
MEMORY_USAGE = 2**14
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _get_kdf(salt: bytes) -> Scrypt:
 | 
			
		||||
    kdf = Scrypt(
 | 
			
		||||
        salt=salt,
 | 
			
		||||
        length=128,
 | 
			
		||||
        n=_memory_use,
 | 
			
		||||
        n=MEMORY_USAGE,
 | 
			
		||||
        r=8,
 | 
			
		||||
        p=1,
 | 
			
		||||
    )
 | 
			
		||||
 
 | 
			
		||||
@@ -20,12 +20,12 @@ def _generate_key(salt: bytes, master_pass: bytes) -> bytes:
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def encrypt_account_info(
 | 
			
		||||
    login: str, passwd: str, master_pass: bytes
 | 
			
		||||
    login: str, passwd: str, master_pass: str
 | 
			
		||||
) -> tuple[bytes, bytes, bytes]:
 | 
			
		||||
    """Encrypts login and password of a user using their master password as a key.
 | 
			
		||||
    Returns a tuple of encrypted login, password and salt"""
 | 
			
		||||
    salt = os.urandom(64)
 | 
			
		||||
    key = _generate_key(salt, master_pass)
 | 
			
		||||
    key = _generate_key(salt, master_pass.encode("utf-8"))
 | 
			
		||||
    f = Fernet(key)
 | 
			
		||||
    enc_login = f.encrypt(login.encode("utf-8"))
 | 
			
		||||
    enc_passwd = f.encrypt(passwd.encode("utf-8"))
 | 
			
		||||
@@ -33,11 +33,11 @@ def encrypt_account_info(
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def decrypt_account_info(
 | 
			
		||||
    enc_login: bytes, enc_pass: bytes, master_pass: bytes, salt: bytes
 | 
			
		||||
    enc_login: bytes, enc_pass: bytes, master_pass: str, salt: bytes
 | 
			
		||||
) -> tuple[str, str]:
 | 
			
		||||
    """Decrypts login and password using their master password as a key.
 | 
			
		||||
    Returns a tuple of decrypted login and password"""
 | 
			
		||||
    key = _generate_key(salt, master_pass)
 | 
			
		||||
    key = _generate_key(salt, master_pass.encode("utf-8"))
 | 
			
		||||
    f = Fernet(key)
 | 
			
		||||
    login_bytes = f.decrypt(enc_login)
 | 
			
		||||
    pass_bytes = f.decrypt(enc_pass)
 | 
			
		||||
 
 | 
			
		||||
@@ -13,7 +13,7 @@ def add_account(
 | 
			
		||||
    enc_login: bytes,
 | 
			
		||||
    enc_pass: bytes,
 | 
			
		||||
) -> bool:
 | 
			
		||||
    """Adds account to db. Returns true, if on success"""
 | 
			
		||||
    """Adds account to the database. Returns true on success, false otherwise"""
 | 
			
		||||
    account = models.Account(
 | 
			
		||||
        user_id=user_id, name=name, salt=salt, enc_login=enc_login, enc_pass=enc_pass
 | 
			
		||||
    )
 | 
			
		||||
@@ -28,7 +28,7 @@ def add_account(
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def add_master_pass(engine: Engine, user_id: int, salt: bytes, passwd: bytes) -> bool:
 | 
			
		||||
    """Adds master password to db. Returns true, if on success"""
 | 
			
		||||
    """Adds master password the database. Returns true on success, false otherwise"""
 | 
			
		||||
    master_pass = models.MasterPass(user_id=user_id, salt=salt, passwd=passwd)
 | 
			
		||||
    try:
 | 
			
		||||
        with sqlmodel.Session(engine) as session:
 | 
			
		||||
 
 | 
			
		||||
@@ -7,6 +7,7 @@ from . import models
 | 
			
		||||
def change_master_pass(
 | 
			
		||||
    engine: Engine, user_id: int, salt: bytes, passwd: bytes
 | 
			
		||||
) -> None:
 | 
			
		||||
    """Changes master password and salt in the database"""
 | 
			
		||||
    statement = (
 | 
			
		||||
        sqlmodel.update(models.MasterPass)
 | 
			
		||||
        .where(models.MasterPass.user_id == user_id)
 | 
			
		||||
 
 | 
			
		||||
@@ -5,6 +5,7 @@ from . import models
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def purge_accounts(engine: Engine, user_id: int) -> None:
 | 
			
		||||
    """Deletes all user's accounts"""
 | 
			
		||||
    statement = sqlmodel.delete(models.Account).where(models.Account.user_id == user_id)
 | 
			
		||||
    with sqlmodel.Session(engine) as session:
 | 
			
		||||
        session.exec(statement)
 | 
			
		||||
@@ -12,6 +13,7 @@ def purge_accounts(engine: Engine, user_id: int) -> None:
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def delete_master_pass(engine: Engine, user_id: int) -> None:
 | 
			
		||||
    """Delets master password of the user"""
 | 
			
		||||
    statement = sqlmodel.delete(models.MasterPass).where(
 | 
			
		||||
        models.MasterPass.user_id == user_id
 | 
			
		||||
    )
 | 
			
		||||
@@ -21,6 +23,7 @@ def delete_master_pass(engine: Engine, user_id: int) -> None:
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def delete_account(engine: Engine, user_id: int, name: str) -> None:
 | 
			
		||||
    """Deletes specific user account"""
 | 
			
		||||
    statement = sqlmodel.delete(models.Account).where(
 | 
			
		||||
        models.Account.user_id == user_id, models.Account.name == name
 | 
			
		||||
    )
 | 
			
		||||
 
 | 
			
		||||
@@ -5,7 +5,8 @@ from . import models
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_master_pass(engine: Engine, user_id: int) -> tuple[bytes, bytes] | None:
 | 
			
		||||
    """Gets master pass. Returns tuple of salt and password"""
 | 
			
		||||
    """Gets master pass. Returns tuple of salt and password
 | 
			
		||||
    or None if it wasn't found"""
 | 
			
		||||
    statement = sqlmodel.select(models.MasterPass).where(
 | 
			
		||||
        models.MasterPass.user_id == user_id
 | 
			
		||||
    )
 | 
			
		||||
@@ -27,7 +28,8 @@ def get_accounts(engine: Engine, user_id: int) -> list[str]:
 | 
			
		||||
def get_account_info(
 | 
			
		||||
    engine: Engine, user_id: int, name: str
 | 
			
		||||
) -> tuple[bytes, bytes, bytes]:
 | 
			
		||||
    """Gets account info. Returns tuple of salt, login and password"""
 | 
			
		||||
    """Gets account info. Returns tuple of salt, login and password
 | 
			
		||||
    or None if it wasn't found"""
 | 
			
		||||
    statement = sqlmodel.select(models.Account).where(
 | 
			
		||||
        models.Account.user_id == user_id, models.Account.name == name
 | 
			
		||||
    )
 | 
			
		||||
 
 | 
			
		||||
@@ -5,9 +5,11 @@ from . import models
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_engine(host: str, user: str, passwd: str, db: str) -> Engine:
 | 
			
		||||
    """Creates an engine for mariadb with pymysql as connector"""
 | 
			
		||||
    engine = sqlmodel.create_engine(f"mariadb+pymysql://{user}:{passwd}@{host}/{db}")
 | 
			
		||||
    return engine
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def prepare(engine: Engine) -> None:
 | 
			
		||||
    """Creates all tables, indexes and constrains in the database"""
 | 
			
		||||
    sqlmodel.SQLModel.metadata.create_all(engine)
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user