from __future__ import annotations import os import sys from typing import Optional import psycopg2 from psycopg2 import extensions from PyQt6.QtWidgets import ( QApplication, QHBoxLayout, QLabel, QLineEdit, QMessageBox, QPushButton, QTableWidget, QTableWidgetItem, QVBoxLayout, QWidget, ) from cryptography.hazmat.primitives.kdf.scrypt import Scrypt from cryptography.exceptions import InvalidKey def get_kdf(salt: bytes) -> Scrypt: return Scrypt(salt, length=128, n=2**14, r=8, p=1) def encrypt(password: str) -> bytes: salt = os.urandom(64) return (salt, get_kdf(salt).derive(password.encode("UTF-8"))) def check_password(password: str, salt: bytes, expected: bytes) -> bool: try: get_kdf(salt).verify(password.encode("UTF-8"), expected) except InvalidKey: return False else: return True Student = tuple[int, str, str, str] HOST = "127.0.0.1" USER = "tester" PASSWORD = "example123!" PORT = 5432 DATABASE = "testing" class Connection: connection: Optional[extensions.connection] = None @classmethod def get(cls) -> extensions.connection: if cls.connection is not None: return cls.connection try: cls.connection = psycopg2.connect( host=HOST, user=USER, password=PASSWORD, database=DATABASE ) cls.connection.autocommit = True except Exception as e: print(type(e)) raise return cls.connection @classmethod def close(cls) -> None: if cls.connection is not None: cls.connection.close() cls.connection = None @classmethod def get_all_data(cls) -> list[Student]: with cls.get().cursor() as cursor: cursor.execute("SELECT * FROM students ORDER BY id") return cursor.fetchall() @classmethod def get_user(cls, login: str) -> Optional[tuple[bytes, bytes]]: with cls.get().cursor() as cursor: cursor.execute( "SELECT salt, password, admin FROM users WHERE login = %s", (login,), ) result = cursor.fetchone() if result is None: return None return (bytes(result[0]), bytes(result[1]), result[2]) def prepare_database() -> None: with Connection.get().cursor() as cursor: cursor.execute("DROP TABLE IF EXISTS students") cursor.execute( "CREATE TABLE students(" "id INTEGER GENERATED ALWAYS AS IDENTITY PRIMARY KEY," "surname VARCHAR(255)," "name VARCHAR(255)," "group_ VARCHAR(255))" ) cursor.execute("DROP TABLE IF EXISTS users") cursor.execute( "CREATE TABLE users(" "id INTEGER GENERATED ALWAYS AS IDENTITY PRIMARY KEY," "login VARCHAR(255)," "salt BYTEA," "password BYTEA," "admin BOOLEAN)" ) def add_test_data() -> None: with Connection.get().cursor() as cursor: cursor.execute( "INSERT INTO STUDENTS(name, surname, group_) VALUES " "('Nick', 'Stepanov', 'programmer')," "('Maxim', 'Yes', 'programmer')" ) admin_creds = encrypt("admin") user_creds = encrypt("example123!") cursor.execute( "INSERT INTO users(login, salt, password, admin) VALUES " "('admin', %s, %s, true)," "('user', %s, %s, false)", (*admin_creds, *user_creds), ) class Window(QWidget): def __init__(self, is_admin: bool): super().__init__() self.is_admin = is_admin self.setGeometry(200, 200, 700, 400) self.setWindowTitle("SQL Test") self.vbox = QVBoxLayout() self.setLayout(self.vbox) self.need_clean = [] self.update() def update(self, data: Optional[list[Student]] = None): self.clean_widgets() if data is None: data = Connection.get_all_data() hbox_buttons = QHBoxLayout() but1 = QPushButton("Create") but1.clicked.connect(self.button_create_handler) but2 = QPushButton("Edit") but2.clicked.connect(self.button_edit_handler) but3 = QPushButton("Remove") but3.clicked.connect(self.button_remove_handler) but4 = QPushButton("Find") but4.clicked.connect(self.button_find_handler) hbox_buttons.addWidget(but1) hbox_buttons.addWidget(but2) hbox_buttons.addWidget(but3) hbox_buttons.addWidget(but4) self.add_widget_to_need_clean(but1, but2, but3, but4) hbox_id = QHBoxLayout() label_id = QLabel("ID") self.text_id = QLineEdit() hbox_id.addWidget(label_id) hbox_id.addWidget(self.text_id) self.add_widget_to_need_clean(hbox_id, label_id, self.text_id) hbox_surname = QHBoxLayout() label_surname = QLabel("Surname") self.text_surname = QLineEdit() hbox_surname.addWidget(label_surname) hbox_surname.addWidget(self.text_surname) self.add_widget_to_need_clean( hbox_surname, label_surname, self.text_surname, ) hbox_name = QHBoxLayout() label_name = QLabel("Name") self.text_name = QLineEdit() hbox_name.addWidget(label_name) hbox_name.addWidget(self.text_name) self.add_widget_to_need_clean(hbox_name, label_name, self.text_name) hbox_group = QHBoxLayout() label_group = QLabel("Group") self.text_group = QLineEdit() hbox_group.addWidget(label_group) hbox_group.addWidget(self.text_group) self.add_widget_to_need_clean(hbox_group, label_group, self.text_group) hbox_table = QHBoxLayout() table = self.form_table(data) hbox_table.addWidget(table) self.add_widget_to_need_clean(table) self.vbox.addLayout(hbox_id) self.vbox.addLayout(hbox_surname) self.vbox.addLayout(hbox_name) self.vbox.addLayout(hbox_group) self.vbox.addLayout(hbox_buttons) self.vbox.addLayout(hbox_table) def add_widget_to_need_clean(self, *args): for el in args: self.need_clean.append(el) def clean_widgets(self): if self.need_clean: for widget in self.need_clean: widget.deleteLater() self.need_clean.clear() @staticmethod def form_table(data: list[Student]) -> QTableWidget: table = QTableWidget() table.setRowCount(10) table.setColumnCount(4) table.setHorizontalHeaderLabels(("ID", "Surname", "Name", "Group")) for i, row in enumerate(data): for j, column in enumerate(map(str, row)): table.setItem(i, j, QTableWidgetItem(column)) return table def button_edit_handler(self): if not self.is_admin: return QMessageBox().warning( self, "Rights", "You have to be an admin to do that" ) with Connection.get().cursor() as cursor: cursor.execute("SELECT * FROM students;") students = cursor.fetchall() indexes = [] for student in students: indexes.append(str(student[0])) if self.text_id.text() in indexes: commands = [] if self.text_surname.text() != "": commands.append( "UPDATE students SET surname = '" + self.text_surname.text() + "' " + "WHERE id = " + self.text_id.text() + ";" ) if self.text_name.text() != "": commands.append( "UPDATE students SET name = '" + self.text_name.text() + "' " + "WHERE id = " + self.text_id.text() + ";" ) if self.text_group.text() != "": commands.append( "UPDATE students SET gr = '" + self.text_group.text() + "' " + "WHERE id = " + self.text_id.text() + ";" ) with Connection.get().cursor() as cursor: for command in commands: cursor.execute(command) QMessageBox.information(self, "Info", "Data changed") else: QMessageBox.warning(self, "Error", "Data not changed") self.update() def button_create_handler(self): if not self.is_admin: return QMessageBox().warning( self, "Rights", "You have to be an admin to do that" ) name = self.text_name.text() surname = self.text_surname.text() group = self.text_group.text() if not all((surname, name, group)): QMessageBox.warning(self, "Invalid id", "Invalid id") return with Connection.get().cursor() as cursor: cursor.execute( "INSERT INTO students(name, surname, group_) VALUES (%s, %s, %s)", # noqa (name, surname, group), ) self.update() def button_remove_handler(self) -> None: if not self.is_admin: return QMessageBox().warning( self, "Rights", "You have to be an admin to do that" ) try: id = int(self.text_id.text()) except ValueError: QMessageBox.warning(self, "Invalid ID", "Invalid ID") return with Connection.get().cursor() as cursor: cursor.execute("DELETE FROM students WHERE id = %s", (id,)) self.update() def button_find_handler(self) -> None: id = self.text_id.text() name = self.text_name.text() surname = self.text_surname.text() group = self.text_group.text() filters = [] params = [] if id: if not id.isnumeric(): QMessageBox.warning(self, "Invalid id", "Invalid id") return filters.append("id = %s") params.append(id) if name: filters.append("LOWER(name) ~ %s") params.append(name.lower()) if surname: filters.append("LOWER(surname) ~ %s") params.append(surname.lower()) if group: filters.append("LOWER(group_) ~ %s") params.append(group.lower()) with Connection.get().cursor() as cursor: if not params: filters = "" else: filters = f" WHERE {' AND '.join(filters)}" query = f"SELECT * FROM students{filters} ORDER BY id" cursor.execute(query, params) data = cursor.fetchall() self.update(data) class WindowPW(QWidget): def __init__(self) -> None: super().__init__() self.loged_in = False self.is_admin = False self.setGeometry(200, 200, 250, 150) self.setWindowTitle("Login") label = QLabel("Name", self) label.move(10, 10) self.line_login = QLineEdit(self) self.line_login.move(100, 10) label_pw = QLabel("password", self) label_pw.move(10, 50) self.line_pw = QLineEdit(self) self.line_pw.move(100, 50) register_but = QPushButton("Register", self) register_but.move(25, 100) register_but.clicked.connect(self.register_handler) but = QPushButton("OK", self) but.clicked.connect(self.but_handler) but.move(100, 100) def but_handler(self) -> None: login = self.line_login.text().strip() password = self.line_pw.text().strip() user = Connection.get_user(login) if user is None: self.close() return QMessageBox.warning(self, "Bye!", "Wrong creds") salt, expected, admin = Connection.get_user(login) if check_password(password, salt, expected): self.loged_in = True self.is_admin = admin else: QMessageBox.warning(self, "Bye!", "Wrong creds") self.close() def register_handler(self) -> None: login = self.line_login.text().strip() password = self.line_pw.text().strip() if not (login and password): return QMessageBox().warning( self, "Wrong creds", "You must input both login and password" ) with Connection.get().cursor() as cursor: cursor.execute( "SELECT true FROM users WHERE login = %s LIMIT 1", (login,), ) if cursor.fetchone() is not None: return QMessageBox().warning( self, "Wrong creds", "Login already exists" ) cursor.execute( "INSERT INTO users(login, salt, password) VALUES (%s, %s, %s)", (login, *encrypt(password)), ) self.loged_in = True self.close() def main() -> None: prepare_database() add_test_data() app = QApplication(sys.argv) window = WindowPW() window.show() result = app.exec() if not window.loged_in: sys.exit(result) window = Window(window.is_admin) window.show() sys.exit(app.exec()) if __name__ == "__main__": main()