420 lines
14 KiB
Python
Executable File
420 lines
14 KiB
Python
Executable File
from __future__ import annotations
|
|
|
|
import os
|
|
import sys
|
|
from typing import Optional
|
|
|
|
import psycopg2
|
|
from psycopg2 import extensions
|
|
from PyQt6.QtWidgets import (
|
|
QApplication,
|
|
QHBoxLayout,
|
|
QLabel,
|
|
QLineEdit,
|
|
QMessageBox,
|
|
QPushButton,
|
|
QTableWidget,
|
|
QTableWidgetItem,
|
|
QVBoxLayout,
|
|
QWidget,
|
|
)
|
|
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
|
|
from cryptography.exceptions import InvalidKey
|
|
|
|
|
|
def get_kdf(salt: bytes) -> Scrypt:
    """Create a new Scrypt key-derivation object for ``salt``.

    Every call builds a fresh object configured with ``length=128``,
    ``n=2**14``, ``r=8`` and ``p=1``.
    """
    settings = {"length": 128, "n": 1 << 14, "r": 8, "p": 1}
    return Scrypt(salt, **settings)
|
|
|
|
|
|
def encrypt(password: str) -> tuple[bytes, bytes]:
    """Derive a salted key from ``password`` for storage.

    Despite the name nothing is encrypted: the UTF-8 encoded password is fed
    through the KDF returned by ``get_kdf`` together with a fresh random salt.

    Args:
        password: Plain-text password.

    Returns:
        ``(salt, derived_key)`` -- the 64 random bytes used as salt and the
        bytes produced by ``derive``. Both are needed later by
        ``check_password``.
    """
    salt = os.urandom(64)
    derived_key = get_kdf(salt).derive(password.encode("UTF-8"))
    return salt, derived_key
|
|
|
|
|
|
def check_password(password: str, salt: bytes, expected: bytes) -> bool:
    """Tell whether ``password`` derives to ``expected`` under ``salt``.

    Returns ``True`` when the KDF's ``verify`` accepts the UTF-8 encoded
    password and ``False`` when it raises ``InvalidKey``. Any other exception
    propagates to the caller.
    """
    kdf = get_kdf(salt)
    try:
        kdf.verify(password.encode("UTF-8"), expected)
    except InvalidKey:
        return False
    return True
|
|
|
|
|
|
# One row of the ``students`` table: (id, surname, name, group_).
Student = tuple[int, str, str, str]

# PostgreSQL connection settings used by ``Connection.get``.
# NOTE(review): credentials are hard-coded in source -- fine for a local test
# database, but confirm this file is never pointed at a real server.
HOST, PORT = "127.0.0.1", 5432
DATABASE = "testing"
USER, PASSWORD = "tester", "example123!"
|
|
|
|
|
|
class Connection:
    """Holder for one shared psycopg2 connection plus a few query helpers.

    Everything lives at class level; this file never instantiates the class.
    """

    # The cached connection, or None before the first ``get`` / after ``close``.
    connection: Optional[extensions.connection] = None

    @classmethod
    def get(cls) -> extensions.connection:
        """Return the shared connection, opening it when necessary.

        A new connection is opened when none is cached or when the cached one
        reports itself as closed. New connections are switched to autocommit,
        so every statement executed through them is committed immediately.

        Raises:
            Exception: Whatever ``psycopg2.connect`` raised; its type is
                printed before it is re-raised.
        """
        cached = cls.connection
        # psycopg2 exposes ``closed`` as 0 while the connection is usable and
        # non-zero once it has been closed or has broken; reconnect then.
        if cached is not None and not cached.closed:
            return cached
        try:
            fresh = psycopg2.connect(
                host=HOST,
                port=PORT,
                user=USER,
                password=PASSWORD,
                database=DATABASE,
            )
            fresh.autocommit = True
        except Exception as e:
            print(type(e))
            raise
        # Cache only after the connection is fully configured.
        cls.connection = fresh
        return fresh

    @classmethod
    def close(cls) -> None:
        """Close the shared connection, if any, and forget it."""
        # Drop the reference first so a failing close() cannot leave a stale
        # connection cached.
        conn, cls.connection = cls.connection, None
        if conn is not None:
            conn.close()

    @classmethod
    def get_all_data(cls) -> list[Student]:
        """Return every row of ``students`` ordered by id."""
        with cls.get().cursor() as cursor:
            cursor.execute("SELECT * FROM students ORDER BY id")
            return cursor.fetchall()

    @classmethod
    def get_user(cls, login: str) -> Optional[tuple[bytes, bytes, bool]]:
        """Look up the stored credentials for ``login``.

        Returns:
            ``(salt, password, admin)`` for the first matching row, or ``None``
            when no user has that login. ``admin`` is coerced to ``bool`` so a
            NULL column value is reported as ``False``.
        """
        with cls.get().cursor() as cursor:
            cursor.execute(
                "SELECT salt, password, admin FROM users WHERE login = %s",
                (login,),
            )
            row = cursor.fetchone()
        if row is None:
            return None
        salt, password, admin = row
        return bytes(salt), bytes(password), bool(admin)
|
|
|
|
|
|
def prepare_database() -> None:
    """Drop and recreate the ``students`` and ``users`` tables.

    Any existing rows in either table are lost. The statements run, in order,
    on a cursor of the shared connection.
    """
    create_students = (
        "CREATE TABLE students("
        "id INTEGER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,"
        "surname VARCHAR(255),"
        "name VARCHAR(255),"
        "group_ VARCHAR(255))"
    )
    create_users = (
        "CREATE TABLE users("
        "id INTEGER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,"
        "login VARCHAR(255),"
        "salt BYTEA,"
        "password BYTEA,"
        "admin BOOLEAN)"
    )
    statements = (
        "DROP TABLE IF EXISTS students",
        create_students,
        "DROP TABLE IF EXISTS users",
        create_users,
    )
    with Connection.get().cursor() as cursor:
        for statement in statements:
            cursor.execute(statement)
|
|
|
|
|
|
def add_test_data() -> None:
    """Insert the sample students and the ``admin`` / ``user`` accounts.

    Two student rows are added, then two users whose salt and password columns
    come from ``encrypt``: ``admin`` (admin flag true) and ``user`` (admin flag
    false).
    """
    insert_students = (
        "INSERT INTO STUDENTS(name, surname, group_) VALUES "
        "('Nick', 'Stepanov', 'programmer'),"
        "('Maxim', 'Yes', 'programmer')"
    )
    insert_users = (
        "INSERT INTO users(login, salt, password, admin) VALUES "
        "('admin', %s, %s, true),"
        "('user', %s, %s, false)"
    )
    with Connection.get().cursor() as cursor:
        cursor.execute(insert_students)
        admin_salt, admin_key = encrypt("admin")
        user_salt, user_key = encrypt("example123!")
        cursor.execute(
            insert_users,
            (admin_salt, admin_key, user_salt, user_key),
        )
|
|
|
|
|
|
class Window(QWidget):
    """Main window: a form and a table over the ``students`` table.

    The form has one line edit per column (ID, Surname, Name, Group) and four
    buttons. Create, Edit and Remove require ``is_admin``; Find is open to
    everyone. The whole widget tree is rebuilt by ``update`` after each action.

    NOTE(review): ``update`` shares its name with ``QWidget.update``; it is
    kept because callers in this file rely on it.
    """

    def __init__(self, is_admin: bool):
        """Build the window and show the current table contents.

        Args:
            is_admin: Whether the modifying buttons are allowed to act.
        """
        super().__init__()
        self.is_admin = is_admin
        self.setGeometry(200, 200, 700, 400)
        self.setWindowTitle("SQL Test")
        self.vbox = QVBoxLayout()
        self.setLayout(self.vbox)
        # Widgets and layouts created by update(), deleted on the next rebuild.
        self.need_clean = []
        self.update()

    def update(self, data: Optional[list[Student]] = None):
        """Rebuild the form, the buttons and the table.

        Args:
            data: Rows to display. When ``None`` the full table is loaded via
                ``Connection.get_all_data``.
        """
        self.clean_widgets()

        if data is None:
            data = Connection.get_all_data()

        self.text_id = self._add_field_row("ID")
        self.text_surname = self._add_field_row("Surname")
        self.text_name = self._add_field_row("Name")
        self.text_group = self._add_field_row("Group")

        hbox_buttons = QHBoxLayout()
        # The layout itself is registered too; otherwise an empty layout would
        # be left behind in self.vbox on every rebuild.
        self.add_widget_to_need_clean(hbox_buttons)
        actions = (
            ("Create", self.button_create_handler),
            ("Edit", self.button_edit_handler),
            ("Remove", self.button_remove_handler),
            ("Find", self.button_find_handler),
        )
        for caption, handler in actions:
            button = QPushButton(caption)
            button.clicked.connect(handler)
            hbox_buttons.addWidget(button)
            self.add_widget_to_need_clean(button)
        self.vbox.addLayout(hbox_buttons)

        hbox_table = QHBoxLayout()
        table = self.form_table(data)
        hbox_table.addWidget(table)
        self.add_widget_to_need_clean(hbox_table, table)
        self.vbox.addLayout(hbox_table)

    def _add_field_row(self, caption: str) -> QLineEdit:
        """Append a "label + line edit" row to the layout; return the edit."""
        row = QHBoxLayout()
        label = QLabel(caption)
        edit = QLineEdit()
        row.addWidget(label)
        row.addWidget(edit)
        self.add_widget_to_need_clean(row, label, edit)
        self.vbox.addLayout(row)
        return edit

    def add_widget_to_need_clean(self, *args):
        """Register objects to be deleted by the next ``clean_widgets`` call."""
        self.need_clean.extend(args)

    def clean_widgets(self):
        """Schedule every registered object for deletion and reset the list."""
        for item in self.need_clean:
            item.deleteLater()
        self.need_clean.clear()

    @staticmethod
    def form_table(data: list[Student]) -> QTableWidget:
        """Create a four-column table widget filled with ``data``.

        The table has at least 10 rows and grows with ``data`` so that no row
        is dropped when more than 10 students are returned.
        """
        table = QTableWidget()
        table.setRowCount(max(10, len(data)))
        table.setColumnCount(4)
        table.setHorizontalHeaderLabels(("ID", "Surname", "Name", "Group"))
        for i, row in enumerate(data):
            for j, value in enumerate(row):
                table.setItem(i, j, QTableWidgetItem(str(value)))
        return table

    def _deny_if_not_admin(self) -> bool:
        """Warn and return ``True`` when the current user is not an admin."""
        if self.is_admin:
            return False
        QMessageBox.warning(
            self, "Rights", "You have to be an admin to do that"
        )
        return True

    def _report_db_error(self, error: psycopg2.Error) -> None:
        """Show a database error instead of letting it escape the slot."""
        QMessageBox.warning(self, "Database error", str(error))

    def button_edit_handler(self):
        """Update the student whose id is in the ID field.

        Only non-empty Surname/Name/Group fields are written. "Data changed"
        is shown when a row was actually updated; otherwise (invalid or
        unknown id, or nothing to write) "Data not changed" is shown.
        """
        if self._deny_if_not_admin():
            return
        try:
            student_id = int(self.text_id.text())
        except ValueError:
            student_id = None

        candidates = {
            "surname": self.text_surname.text(),
            "name": self.text_name.text(),
            "group_": self.text_group.text(),
        }
        assignments = {
            column: value for column, value in candidates.items() if value != ""
        }

        changed = False
        if student_id is not None and assignments:
            # Column names come from the fixed dict above; every user-supplied
            # value is passed as a bound parameter.
            set_clause = ", ".join(f"{column} = %s" for column in assignments)
            try:
                with Connection.get().cursor() as cursor:
                    cursor.execute(
                        f"UPDATE students SET {set_clause} WHERE id = %s",
                        (*assignments.values(), student_id),
                    )
                    changed = cursor.rowcount > 0
            except psycopg2.Error as error:
                self._report_db_error(error)
                return

        if changed:
            QMessageBox.information(self, "Info", "Data changed")
        else:
            QMessageBox.warning(self, "Error", "Data not changed")
        self.update()

    def button_create_handler(self):
        """Insert a new student from the Surname, Name and Group fields."""
        if self._deny_if_not_admin():
            return
        name = self.text_name.text()
        surname = self.text_surname.text()
        group = self.text_group.text()
        if not all((surname, name, group)):
            QMessageBox.warning(
                self, "Invalid data", "Surname, name and group are required"
            )
            return
        try:
            with Connection.get().cursor() as cursor:
                cursor.execute(
                    "INSERT INTO students(name, surname, group_) VALUES (%s, %s, %s)",  # noqa
                    (name, surname, group),
                )
        except psycopg2.Error as error:
            self._report_db_error(error)
            return
        self.update()

    def button_remove_handler(self) -> None:
        """Delete the student whose id is in the ID field."""
        if self._deny_if_not_admin():
            return
        try:
            student_id = int(self.text_id.text())
        except ValueError:
            QMessageBox.warning(self, "Invalid ID", "Invalid ID")
            return
        try:
            with Connection.get().cursor() as cursor:
                cursor.execute(
                    "DELETE FROM students WHERE id = %s", (student_id,)
                )
        except psycopg2.Error as error:
            self._report_db_error(error)
            return
        self.update()

    def button_find_handler(self) -> None:
        """Show the students matching every non-empty form field.

        The ID field must be an integer and is matched exactly. Surname, Name
        and Group are lower-cased and applied with the ``~`` operator against
        the lower-cased column. With all fields empty every row is shown.
        """
        id_text = self.text_id.text()
        conditions = []
        params = []
        if id_text:
            try:
                params.append(int(id_text))
            except ValueError:
                QMessageBox.warning(self, "Invalid id", "Invalid id")
                return
            conditions.append("id = %s")
        text_filters = (
            ("LOWER(name) ~ %s", self.text_name.text()),
            ("LOWER(surname) ~ %s", self.text_surname.text()),
            ("LOWER(group_) ~ %s", self.text_group.text()),
        )
        for condition, text in text_filters:
            if text:
                conditions.append(condition)
                params.append(text.lower())

        where = f" WHERE {' AND '.join(conditions)}" if conditions else ""
        query = f"SELECT * FROM students{where} ORDER BY id"
        try:
            with Connection.get().cursor() as cursor:
                cursor.execute(query, params)
                data = cursor.fetchall()
        except psycopg2.Error as error:
            # e.g. the user typed an invalid pattern or an out-of-range id.
            self._report_db_error(error)
            return
        self.update(data)
|
|
|
|
|
|
class WindowPW(QWidget):
    """Login / registration dialog shown before the main window.

    After the window closes, ``loged_in`` tells whether the user got in and
    ``is_admin`` whether the account has the admin flag.
    """

    def __init__(self) -> None:
        """Lay out the login and password fields and the two buttons."""
        super().__init__()
        self.loged_in = False
        self.is_admin = False
        self.setGeometry(200, 200, 250, 150)
        self.setWindowTitle("Login")

        label = QLabel("Name", self)
        label.move(10, 10)
        self.line_login = QLineEdit(self)
        self.line_login.move(100, 10)

        label_pw = QLabel("password", self)
        label_pw.move(10, 50)
        self.line_pw = QLineEdit(self)
        # Mask the typed password instead of showing it in clear text.
        self.line_pw.setEchoMode(QLineEdit.EchoMode.Password)
        self.line_pw.move(100, 50)

        register_but = QPushButton("Register", self)
        register_but.move(25, 100)
        register_but.clicked.connect(self.register_handler)

        but = QPushButton("OK", self)
        but.clicked.connect(self.but_handler)
        but.move(100, 100)

    def but_handler(self) -> None:
        """Check the entered credentials and close the window.

        On success ``loged_in`` is set and ``is_admin`` takes the account's
        admin flag. On an unknown login or a wrong password a "Wrong creds"
        warning is shown first. The window is closed in every case.
        """
        login = self.line_login.text().strip()
        password = self.line_pw.text().strip()
        # Query once and reuse the row for both the existence check and the
        # password check.
        user = Connection.get_user(login)
        if user is not None:
            salt, expected, admin = user
            if check_password(password, salt, expected):
                self.loged_in = True
                self.is_admin = bool(admin)
                self.close()
                return
        QMessageBox.warning(self, "Bye!", "Wrong creds")
        self.close()

    def register_handler(self) -> None:
        """Create a non-admin account from the form and log it in.

        Nothing is created when either field is empty or the login is already
        taken; a warning is shown and the window stays open.
        """
        login = self.line_login.text().strip()
        password = self.line_pw.text().strip()
        if not (login and password):
            QMessageBox.warning(
                self, "Wrong creds", "You must input both login and password"
            )
            return
        with Connection.get().cursor() as cursor:
            cursor.execute(
                "SELECT true FROM users WHERE login = %s LIMIT 1",
                (login,),
            )
            taken = cursor.fetchone() is not None
            if not taken:
                # Store an explicit false so the admin column is never NULL.
                cursor.execute(
                    "INSERT INTO users(login, salt, password, admin) "
                    "VALUES (%s, %s, %s, false)",
                    (login, *encrypt(password)),
                )
        if taken:
            QMessageBox.warning(self, "Wrong creds", "Login already exists")
            return
        self.loged_in = True
        self.close()
|
|
|
|
|
|
def main() -> None:
    """Reset the database, run the login dialog, then the main window.

    The tables are recreated and refilled with test data on every start. The
    main window is opened only when the login dialog reports a successful
    login. The process exits with the status of the last event loop that ran,
    and the shared database connection is closed on the way out, including
    when an exception is raised.
    """
    try:
        prepare_database()
        add_test_data()
        app = QApplication(sys.argv)

        login_window = WindowPW()
        login_window.show()
        exit_code = app.exec()

        if login_window.loged_in:
            main_window = Window(login_window.is_admin)
            main_window.show()
            exit_code = app.exec()
    finally:
        # The original never released the connection opened by Connection.get.
        Connection.close()
    sys.exit(exit_code)
|
|
|
|
|
|
# Script entry point: run the application only when executed directly.
if __name__ == "__main__":
    main()
|