This repository has been archived on 2024-08-23. You can view files and clone it, but cannot push or open issues or pull requests.
lessons/Python/sql/main.py

420 lines
14 KiB
Python
Executable File

from __future__ import annotations
import os
import sys
from typing import Optional
import psycopg2
from psycopg2 import extensions
from PyQt6.QtWidgets import (
QApplication,
QHBoxLayout,
QLabel,
QLineEdit,
QMessageBox,
QPushButton,
QTableWidget,
QTableWidgetItem,
QVBoxLayout,
QWidget,
)
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
from cryptography.exceptions import InvalidKey
def get_kdf(salt: bytes) -> Scrypt:
    """Create a fresh Scrypt key-derivation object bound to *salt*.

    A new object is built on every call; callers in this file use each one
    for a single ``derive`` or ``verify`` operation.

    Args:
        salt: Random salt that is mixed into the derivation.

    Returns:
        A ``Scrypt`` instance producing 128-byte keys (n=2**14, r=8, p=1).
    """
    scrypt_settings = {"length": 128, "n": 2**14, "r": 8, "p": 1}
    return Scrypt(salt, **scrypt_settings)
def encrypt(password: str) -> tuple[bytes, bytes]:
    """Derive a storable key from *password* using a newly generated salt.

    Despite its name this is one-way key derivation, not reversible
    encryption: the result can only be checked, never decrypted.

    Args:
        password: Plain-text password; it is encoded as UTF-8 before hashing.

    Returns:
        A ``(salt, derived_key)`` pair.  Both values must be stored, because
        the same salt is needed to verify the password later.
    """
    salt = os.urandom(64)
    derived_key = get_kdf(salt).derive(password.encode("UTF-8"))
    return salt, derived_key
def check_password(password: str, salt: bytes, expected: bytes) -> bool:
    """Tell whether *password* derives to the *expected* key under *salt*.

    Args:
        password: Plain-text candidate password (encoded as UTF-8).
        salt: Salt that was used when the expected key was derived.
        expected: Previously derived key to compare against.

    Returns:
        ``True`` when verification succeeds, ``False`` when the KDF reports
        an ``InvalidKey`` mismatch.
    """
    candidate = password.encode("UTF-8")
    try:
        get_kdf(salt).verify(candidate, expected)
    except InvalidKey:
        # A mismatch is an expected outcome, not an error condition.
        return False
    return True
# One row of the ``students`` table as returned by ``SELECT *``:
# (id, surname, name, group_), in the column order of the CREATE TABLE statement.
Student = tuple[int, str, str, str]
# Settings for the PostgreSQL server this script connects to.
# NOTE(review): the credentials are hard-coded in the source; presumably this
# is acceptable only because the script targets a local test database.
HOST = "127.0.0.1"
USER = "tester"
PASSWORD = "example123!"
PORT = 5432
DATABASE = "testing"
class Connection:
    """Holder for one lazily opened, class-wide PostgreSQL connection.

    All methods are classmethods; the class is never instantiated.  The
    connection is opened on first use with the module-level HOST, PORT,
    USER, PASSWORD and DATABASE settings and switched to autocommit mode.
    """

    # The shared connection; ``None`` until ``get`` opens it or after ``close``.
    connection: Optional[extensions.connection] = None

    @classmethod
    def get(cls) -> extensions.connection:
        """Return the shared connection, opening it on first use.

        Returns:
            The open psycopg2 connection in autocommit mode.

        Raises:
            Exception: Whatever ``psycopg2.connect`` raises is re-raised after
                the exception type has been printed.
        """
        if cls.connection is not None:
            return cls.connection
        try:
            connection = psycopg2.connect(
                host=HOST,
                port=PORT,
                user=USER,
                password=PASSWORD,
                database=DATABASE,
            )
            connection.autocommit = True
        except Exception as e:
            print(type(e))
            raise
        # Publish the connection only once it is fully configured, so a
        # failure above never leaves a half-initialised connection cached.
        cls.connection = connection
        return connection

    @classmethod
    def close(cls) -> None:
        """Close the shared connection if it is open and forget it."""
        if cls.connection is not None:
            cls.connection.close()
            cls.connection = None

    @classmethod
    def get_all_data(cls) -> list[Student]:
        """Return every row of ``students`` ordered by id."""
        with cls.get().cursor() as cursor:
            cursor.execute("SELECT * FROM students ORDER BY id")
            return cursor.fetchall()

    @classmethod
    def get_user(cls, login: str) -> Optional[tuple[bytes, bytes, bool]]:
        """Look up the stored credentials for *login*.

        Args:
            login: Login name to search for.

        Returns:
            ``(salt, password_hash, is_admin)`` for the first matching row,
            or ``None`` when no such user exists.  A NULL ``admin`` column is
            reported as ``False``.
        """
        with cls.get().cursor() as cursor:
            cursor.execute(
                "SELECT salt, password, admin FROM users WHERE login = %s",
                (login,),
            )
            row = cursor.fetchone()
        if row is None:
            return None
        salt, password_hash, admin = row
        # BYTEA columns are converted with bytes() so callers get plain bytes.
        return (bytes(salt), bytes(password_hash), bool(admin))
def prepare_database() -> None:
    """Drop and recreate the ``students`` and ``users`` tables.

    Any existing data in either table is discarded.
    """
    create_students = (
        "CREATE TABLE students("
        "id INTEGER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,"
        "surname VARCHAR(255),"
        "name VARCHAR(255),"
        "group_ VARCHAR(255))"
    )
    create_users = (
        "CREATE TABLE users("
        "id INTEGER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,"
        "login VARCHAR(255),"
        "salt BYTEA,"
        "password BYTEA,"
        "admin BOOLEAN)"
    )
    # Executed strictly in this order: each table is dropped before it is
    # created again.
    statements = (
        "DROP TABLE IF EXISTS students",
        create_students,
        "DROP TABLE IF EXISTS users",
        create_users,
    )
    with Connection.get().cursor() as cursor:
        for statement in statements:
            cursor.execute(statement)
def add_test_data() -> None:
    """Insert two sample students and two sample users.

    The users are ``admin`` (password ``admin``, admin rights) and ``user``
    (password ``example123!``, no admin rights); their passwords are stored
    as salt plus derived key produced by ``encrypt``.
    """
    insert_students = (
        "INSERT INTO STUDENTS(name, surname, group_) VALUES "
        "('Nick', 'Stepanov', 'programmer'),"
        "('Maxim', 'Yes', 'programmer')"
    )
    insert_users = (
        "INSERT INTO users(login, salt, password, admin) VALUES "
        "('admin', %s, %s, true),"
        "('user', %s, %s, false)"
    )
    with Connection.get().cursor() as cursor:
        cursor.execute(insert_students)
        admin_salt_and_key = encrypt("admin")
        user_salt_and_key = encrypt("example123!")
        # Placeholders are filled in order: admin salt, admin key,
        # user salt, user key.
        cursor.execute(insert_users, admin_salt_and_key + user_salt_and_key)
class Window(QWidget):
    """Main window: the ``students`` table plus create/edit/remove/find controls.

    The four text fields (ID, surname, name, group) are the input for every
    button.  Create, Edit and Remove require admin rights; Find is available
    to everyone.  After each action the whole widget tree is rebuilt by
    ``update``.
    """

    def __init__(self, is_admin: bool):
        """Set up the window and render the current table contents.

        Args:
            is_admin: Whether the logged-in user may modify data.
        """
        super().__init__()
        self.is_admin = is_admin
        self.setGeometry(200, 200, 700, 400)
        self.setWindowTitle("SQL Test")
        self.vbox = QVBoxLayout()
        self.setLayout(self.vbox)
        # Widgets and layouts created by ``update`` that must be destroyed
        # before the next rebuild.
        self.need_clean = []
        self.update()

    def update(self, data: Optional[list[Student]] = None):
        """Destroy the current widgets and rebuild the whole window.

        NOTE(review): this method shadows ``QWidget.update()`` with a
        different meaning; the name is kept because the handlers rely on it.

        Args:
            data: Rows to show in the table.  When ``None`` all students are
                loaded through ``Connection.get_all_data()``.
        """
        self.clean_widgets()
        if data is None:
            data = Connection.get_all_data()

        self.text_id = self._add_input_row("ID")
        self.text_surname = self._add_input_row("Surname")
        self.text_name = self._add_input_row("Name")
        self.text_group = self._add_input_row("Group")

        hbox_buttons = QHBoxLayout()
        button_specs = (
            ("Create", self.button_create_handler),
            ("Edit", self.button_edit_handler),
            ("Remove", self.button_remove_handler),
            ("Find", self.button_find_handler),
        )
        for caption, handler in button_specs:
            button = QPushButton(caption)
            button.clicked.connect(handler)
            hbox_buttons.addWidget(button)
            self.add_widget_to_need_clean(button)

        hbox_table = QHBoxLayout()
        table = self.form_table(data)
        hbox_table.addWidget(table)

        # The button and table layouts are scheduled for deletion as well;
        # otherwise one empty layout of each kind would pile up in
        # ``self.vbox`` on every rebuild.
        self.add_widget_to_need_clean(hbox_buttons, hbox_table, table)
        self.vbox.addLayout(hbox_buttons)
        self.vbox.addLayout(hbox_table)

    def _add_input_row(self, caption: str) -> QLineEdit:
        """Append a "label + line edit" row to the window and return the edit."""
        row = QHBoxLayout()
        label = QLabel(caption)
        field = QLineEdit()
        row.addWidget(label)
        row.addWidget(field)
        self.add_widget_to_need_clean(row, label, field)
        self.vbox.addLayout(row)
        return field

    def add_widget_to_need_clean(self, *args):
        """Register widgets/layouts to be destroyed by ``clean_widgets``."""
        self.need_clean.extend(args)

    def clean_widgets(self):
        """Schedule every registered widget/layout for deletion and forget it."""
        for widget in self.need_clean:
            widget.deleteLater()
        self.need_clean.clear()

    @staticmethod
    def form_table(data: list[Student]) -> QTableWidget:
        """Build a four-column table widget filled with *data*.

        Args:
            data: Student rows; every value is shown via ``str()``.

        Returns:
            A table with at least 10 rows, grown as needed so that no row of
            *data* is dropped.
        """
        table = QTableWidget()
        table.setRowCount(max(10, len(data)))
        table.setColumnCount(4)
        table.setHorizontalHeaderLabels(("ID", "Surname", "Name", "Group"))
        for i, row in enumerate(data):
            for j, column in enumerate(map(str, row)):
                table.setItem(i, j, QTableWidgetItem(column))
        return table

    @staticmethod
    def _parse_id(text: str) -> Optional[int]:
        """Return *text* as a non-negative integer id, or ``None`` if invalid."""
        text = text.strip()
        # isascii() excludes characters such as superscript digits, which
        # str.isdigit()/isnumeric() accept but int() and the database do not.
        if text.isascii() and text.isdigit():
            return int(text)
        return None

    def _require_admin(self) -> bool:
        """Return ``True`` for admins; otherwise warn the user and return ``False``."""
        if self.is_admin:
            return True
        QMessageBox.warning(self, "Rights", "You have to be an admin to do that")
        return False

    def button_edit_handler(self):
        """Update the non-empty fields of the student whose id is in the ID box.

        Shows "Data changed" only when a row was actually updated; an invalid
        or unknown id, or no field to change, yields "Data not changed".
        """
        if not self._require_admin():
            return
        student_id = self._parse_id(self.text_id.text())
        # Column names come from this fixed mapping, never from user input,
        # so they are safe to place into the statement text.
        candidates = {
            "surname": self.text_surname.text(),
            "name": self.text_name.text(),
            "group_": self.text_group.text(),
        }
        changes = {column: value for column, value in candidates.items() if value != ""}

        updated = False
        if student_id is not None and changes:
            assignments = ", ".join(f"{column} = %s" for column in changes)
            with Connection.get().cursor() as cursor:
                # User-supplied values are passed as parameters, not
                # concatenated into the SQL string.
                cursor.execute(
                    f"UPDATE students SET {assignments} WHERE id = %s",
                    (*changes.values(), student_id),
                )
                updated = cursor.rowcount > 0

        if updated:
            QMessageBox.information(self, "Info", "Data changed")
        else:
            QMessageBox.warning(self, "Error", "Data not changed")
        self.update()

    def button_create_handler(self):
        """Insert a new student built from the surname, name and group fields."""
        if not self._require_admin():
            return
        name = self.text_name.text()
        surname = self.text_surname.text()
        group = self.text_group.text()
        if not all((surname, name, group)):
            QMessageBox.warning(
                self, "Invalid data", "Surname, name and group are all required"
            )
            return
        with Connection.get().cursor() as cursor:
            cursor.execute(
                "INSERT INTO students(name, surname, group_) VALUES (%s, %s, %s)",  # noqa
                (name, surname, group),
            )
        self.update()

    def button_remove_handler(self) -> None:
        """Delete the student whose id is entered in the ID field."""
        if not self._require_admin():
            return
        student_id = self._parse_id(self.text_id.text())
        if student_id is None:
            QMessageBox.warning(self, "Invalid ID", "Invalid ID")
            return
        with Connection.get().cursor() as cursor:
            cursor.execute("DELETE FROM students WHERE id = %s", (student_id,))
        self.update()

    def button_find_handler(self) -> None:
        """Show only the students matching every non-empty input field.

        The id is matched exactly; name, surname and group are matched
        case-insensitively with the PostgreSQL ``~`` regex operator.
        """
        filters: list[str] = []
        params: list[object] = []

        id_text = self.text_id.text()
        if id_text:
            student_id = self._parse_id(id_text)
            if student_id is None:
                QMessageBox.warning(self, "Invalid id", "Invalid id")
                return
            filters.append("id = %s")
            params.append(student_id)

        text_filters = (
            ("name", self.text_name.text()),
            ("surname", self.text_surname.text()),
            ("group_", self.text_group.text()),
        )
        for column, pattern in text_filters:
            if pattern:
                filters.append(f"LOWER({column}) ~ %s")
                params.append(pattern.lower())

        where = f" WHERE {' AND '.join(filters)}" if filters else ""
        query = f"SELECT * FROM students{where} ORDER BY id"
        with Connection.get().cursor() as cursor:
            try:
                cursor.execute(query, params)
            except psycopg2.Error as error:
                # The user text is a regular expression; a malformed pattern
                # is reported instead of raising inside the Qt slot.
                QMessageBox.warning(self, "Invalid filter", str(error))
                return
            data = cursor.fetchall()
        self.update(data)
class WindowPW(QWidget):
    """Login / registration dialog shown before the main window.

    After the window closes, ``loged_in`` tells whether authentication or
    registration succeeded and ``is_admin`` whether that user is an admin.
    """

    def __init__(self) -> None:
        """Create the login and password fields and the two buttons."""
        super().__init__()
        self.loged_in = False
        self.is_admin = False
        self.setGeometry(200, 200, 250, 150)
        self.setWindowTitle("Login")

        label = QLabel("Name", self)
        label.move(10, 10)
        self.line_login = QLineEdit(self)
        self.line_login.move(100, 10)

        label_pw = QLabel("password", self)
        label_pw.move(10, 50)
        self.line_pw = QLineEdit(self)
        # Mask the typed characters so the password is not shown on screen.
        self.line_pw.setEchoMode(QLineEdit.EchoMode.Password)
        self.line_pw.move(100, 50)

        register_but = QPushButton("Register", self)
        register_but.move(25, 100)
        register_but.clicked.connect(self.register_handler)

        but = QPushButton("OK", self)
        but.clicked.connect(self.but_handler)
        but.move(100, 100)

    def but_handler(self) -> None:
        """Check the entered credentials, then close the window.

        On success ``loged_in`` and ``is_admin`` are set; on any failure a
        "Wrong creds" warning is shown before the window closes.
        """
        login = self.line_login.text().strip()
        password = self.line_pw.text().strip()

        # Look the user up once and reuse the row.
        user = Connection.get_user(login)
        authenticated = False
        if user is not None:
            salt, expected, admin = user
            authenticated = check_password(password, salt, expected)

        if authenticated:
            self.loged_in = True
            # The admin column may be NULL for self-registered users.
            self.is_admin = bool(admin)
        else:
            # Warn while the window is still open so the message box has a
            # visible parent, in both the unknown-user and bad-password cases.
            QMessageBox.warning(self, "Bye!", "Wrong creds")
        self.close()

    def register_handler(self) -> None:
        """Create a new non-admin user from the entered login and password.

        Shows a warning and keeps the window open when a field is empty or
        the login is already taken; otherwise stores the user, marks the
        session as logged in and closes the window.
        """
        login = self.line_login.text().strip()
        password = self.line_pw.text().strip()
        if not (login and password):
            QMessageBox.warning(
                self, "Wrong creds", "You must input both login and password"
            )
            return
        with Connection.get().cursor() as cursor:
            cursor.execute(
                "SELECT true FROM users WHERE login = %s LIMIT 1",
                (login,),
            )
            if cursor.fetchone() is not None:
                QMessageBox.warning(self, "Wrong creds", "Login already exists")
                return
            # Store admin explicitly as false rather than leaving it NULL.
            cursor.execute(
                "INSERT INTO users(login, salt, password, admin) "
                "VALUES (%s, %s, %s, false)",
                (login, *encrypt(password)),
            )
        self.loged_in = True
        self.close()
def main() -> None:
    """Reset the database, run the login dialog, then the main window.

    The tables are dropped and refilled with test data on every start.  If
    the login dialog closes without a successful login the process exits
    with the event loop's return code; otherwise the main window is shown.
    The shared database connection is closed on every exit path.

    Raises:
        SystemExit: Always, carrying the Qt event loop's exit code.
    """
    try:
        prepare_database()
        add_test_data()
        app = QApplication(sys.argv)

        login_window = WindowPW()
        login_window.show()
        result = app.exec()
        if not login_window.loged_in:
            sys.exit(result)

        window = Window(login_window.is_admin)
        window.show()
        sys.exit(app.exec())
    finally:
        # Runs for SystemExit as well, so the connection is always released.
        Connection.close()
# Start the application only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()