Split up into multiple files

This commit is contained in:
StNicolay 2024-08-09 18:09:43 +03:00
parent 16475a27e8
commit ff629b8903
Signed by: StNicolay
GPG Key ID: 9693D04DCD962B0D
5 changed files with 416 additions and 412 deletions

View File

@ -1,424 +1,17 @@
from __future__ import annotations
import os import os
from typing import Protocol, Self
import datetime
import uuid
import sys import sys
import dotenv import dotenv
import httpx from PyQt6.QtWidgets import QApplication
import keyring from state import State
import pydantic
from PyQt6.QtCore import Qt, QPoint
from PyQt6.QtGui import (
QAction,
QDragEnterEvent,
QDragMoveEvent,
QDropEvent,
QIcon,
)
from PyQt6.QtWidgets import (
QApplication,
QHBoxLayout,
QLabel,
QLineEdit,
QListWidget,
QListWidgetItem,
QMainWindow,
QMenu,
QMessageBox,
QPushButton,
QStackedWidget,
QVBoxLayout,
QWidget,
)
class RequestClient:
_client: httpx.Client = None
def __new__(cls) -> Self:
if cls._client is None:
url = os.environ.get("DRIVE_HOST_URL").strip()
if not url:
url = "localhost:3000"
cls._client = httpx.Client(base_url=url)
return super().__new__(cls)
@classmethod
def set_token(cls, token: str):
cls._client.headers = {"Authorization": f"Bearer {token}"}
@property
def client(self) -> httpx.Client:
return self._client
class RegisterWidget(QWidget):
def __init__(self, switcher: State):
super().__init__()
self.switcher = switcher
self.username_label = QLabel("Username:")
self.username_input = QLineEdit()
self.email_label = QLabel("Email:")
self.email_input = QLineEdit()
self.password_label = QLabel("Password:")
self.password_input = QLineEdit()
self.password_input.setEchoMode(QLineEdit.EchoMode.Password)
self.register_button = QPushButton("Register")
self.register_button.clicked.connect(self.register)
self.login_button = QPushButton("Switch to Login")
self.login_button.clicked.connect(self.switcher.switch_to_login)
layout = QVBoxLayout()
layout.addWidget(self.username_label)
layout.addWidget(self.username_input)
layout.addWidget(self.email_label)
layout.addWidget(self.email_input)
layout.addWidget(self.password_label)
layout.addWidget(self.password_input)
layout.addWidget(self.register_button)
layout.addWidget(self.login_button)
self.setLayout(layout)
def register(self):
username = self.username_input.text()
email = self.email_input.text()
password = self.password_input.text()
if not username or not email or not password:
QMessageBox.warning(self, "Input Error", "All fields are required")
return
try:
response = RequestClient().client.post(
"http://localhost:3000/users/register",
data={
"username": username,
"email": email,
"password": password,
},
)
if response.is_success:
token = response.json().get("access_token")
if not token:
QMessageBox.critical(
self,
"Error getting the token",
"Error getting the token",
)
self.switcher.login(token)
else:
QMessageBox.warning(
self, "Error", f"Registration failed: {response.text}"
)
except httpx.HTTPError as e:
QMessageBox.critical(self, "HTTP Error", str(e))
class LoginWidget(QWidget):
def __init__(self, switcher: State):
super().__init__()
self.switcher = switcher
self.username_label = QLabel("Username:")
self.username_input = QLineEdit()
self.password_label = QLabel("Password:")
self.password_input = QLineEdit()
self.password_input.setEchoMode(QLineEdit.EchoMode.Password)
self.login_button = QPushButton("Login")
self.login_button.clicked.connect(self.login)
self.register_button = QPushButton("Switch to Register")
self.register_button.clicked.connect(self.switcher.switch_to_register)
layout = QVBoxLayout()
layout.addWidget(self.username_label)
layout.addWidget(self.username_input)
layout.addWidget(self.password_label)
layout.addWidget(self.password_input)
layout.addWidget(self.login_button)
layout.addWidget(self.register_button)
self.setLayout(layout)
def login(self):
username = self.username_input.text()
password = self.password_input.text()
if not username or not password:
QMessageBox.warning(
self, "Input Error", "Email and Password are required"
)
return
try:
response = RequestClient().client.post(
"http://localhost:3000/users/authorize",
data={"username": username, "password": password},
)
if response.is_success:
access_token = response.json().get("access_token")
if access_token:
self.switcher.login(access_token)
else:
QMessageBox.warning(
self, "Error", "No access token received"
)
else:
QMessageBox.warning(
self, "Error", f"Login failed: {response.text}"
)
except httpx.HTTPError as e:
QMessageBox.critical(self, "HTTP Error", str(e))
class FileListWidget(QListWidget):
def __init__(self, state: State):
super().__init__()
self.state = state
self.setAcceptDrops(True)
self.setContextMenuPolicy(Qt.ContextMenuPolicy.CustomContextMenu)
self.customContextMenuRequested.connect(self.show_context_menu)
self.responses: list[ListResponse] = []
self.update_response()
def current_response(self) -> ListResponse:
if not self.responses:
self.update_response()
return self.responses[-1]
def update_response(self):
if not self.responses:
response = ListResponse.model_validate_json(
RequestClient().client.get("/folders").text
)
self.responses.append(response)
print(response.files)
self.responses[-1] = ListResponse.model_validate_json(
RequestClient()
.client.get(
"/folders", params={"folder_id": self.responses[-1].folder_id}
)
.text
)
self.update()
def dragEnterEvent(self, event: QDragEnterEvent):
if event.mimeData().hasUrls():
event.accept()
else:
event.ignore()
def dragMoveEvent(self, e: QDragMoveEvent | None) -> None:
return self.dragEnterEvent(e)
def dropEvent(self, event: QDropEvent):
event.accept()
print("hi")
for url in event.mimeData().urls():
file_path = url.toLocalFile()
self.upload_file(file_path)
def upload_file(self, file_path):
file_name = os.path.basename(file_path)
try:
with open(file_path, "rb") as f:
files = {"file": (file_name, f)}
response = RequestClient().client.post(
"http://localhost:3000/files",
files=files,
params={
"parent_folder": self.current_response().folder_id
},
)
if response.is_success:
QMessageBox.information(
self, "Success", "File uploaded successfully"
)
self.update_response()
else:
QMessageBox.warning(
self, "Error", f"Upload failed: {response.text}"
)
except httpx.HTTPError as e:
QMessageBox.critical(self, "HTTP Error", str(e))
def add_file_item(self, file_name):
item = QListWidgetItem(file_name)
item.setIcon(QIcon.fromTheme("text-x-generic")) # File icon
self.addItem(item)
def show_context_menu(self, pos: QPoint):
item = self.itemAt(pos)
if item:
menu = QMenu(self)
details_action = QAction("Details", self)
details_action.triggered.connect(lambda: self.show_details(item))
delete_action = QAction("Delete", self)
delete_action.triggered.connect(lambda: self.delete_item(item))
menu.addAction(details_action)
menu.addAction(delete_action)
menu.exec(self.mapToGlobal(pos))
def show_details(self, item):
file_name = item.text()
QMessageBox.information(self, "Details", f"Details for {file_name}")
def delete_item(self, item):
row = self.row(item)
item = self.current_response().items()[row]
item.delete()
self.update_response()
QMessageBox.information(self, "Delete", f"{item.name()} deleted")
def update(self) -> None:
self.clear()
last = self.responses[-1]
for item in last.items():
self.add_file_item(item.name())
class Sidebar(QWidget):
def __init__(self, state: State):
super().__init__()
self.state = state
layout = QVBoxLayout()
self.setLayout(layout)
# Add your sidebar buttons here
for i in range(5): # Example buttons
btn = QPushButton(f"Button {i+1}")
layout.addWidget(btn)
layout.addStretch()
class MainFileWidget(QWidget):
def __init__(self, state: State):
super().__init__()
self.state = state
layout = QHBoxLayout()
self.sidebar = Sidebar(state)
self.file_list = FileListWidget(state)
layout.addWidget(self.sidebar)
layout.addWidget(self.file_list)
self.setLayout(layout)
class DisplayProtocol(Protocol):
def name(self) -> str:
raise NotImplementedError
def delete(self) -> None:
raise NotImplementedError
def details(self) -> QWidget:
raise NotImplementedError
class File(pydantic.BaseModel):
file_id: uuid.UUID
file_name: str
file_size: int
sha512: str
created_at: datetime.datetime
updated_at: datetime.datetime
def name(self) -> str:
return self.file_name
def delete(self) -> None:
RequestClient().client.delete(
"/files", params={"file_id": self.file_id}
)
def details_button(self) -> QPushButton:
# TODO
raise NotImplementedError
class Folder(pydantic.BaseModel):
folder_id: uuid.UUID
owner_id: int
folder_name: str
created_at: datetime.datetime
def name(self) -> str:
return self.folder_name
def delete(self) -> None:
RequestClient().client.delete(
"/folders", params={"folder_id": self.folder_id}
)
def details_button(self) -> QPushButton:
# TODO
raise NotImplementedError
class ListResponse(pydantic.BaseModel):
folder_id: uuid.UUID
files: list[File]
folders: list[Folder]
def items(self) -> list[DisplayProtocol]:
return self.files + self.folders
class State(QMainWindow):
def __init__(self, url: str):
super().__init__()
self.setWindowTitle("Auth App")
self.stack = QStackedWidget()
self.client = httpx.Client(base_url=url)
self.register_widget = RegisterWidget(self)
self.login_widget = LoginWidget(self)
self.stack.addWidget(
self.login_widget
) # Login widget is the first in the stack
self.stack.addWidget(self.register_widget)
self.setCentralWidget(self.stack)
password = keyring.get_credential("auth_app", "access_token")
if password is None:
self.switch_to_login() # Start with the login widget
self.login(password.password)
def switch_to_login(self):
self.stack.setCurrentWidget(self.login_widget)
def switch_to_register(self):
self.stack.setCurrentWidget(self.register_widget)
def login(self, token: str):
keyring.set_password("auth_app", "access_token", token)
RequestClient().set_token(token)
self.file_widget = MainFileWidget(self)
self.stack.addWidget(self.file_widget)
self.stack.setCurrentWidget(self.file_widget)
if __name__ == "__main__": if __name__ == "__main__":
dotenv.load_dotenv() dotenv.load_dotenv()
url = os.environ.get("DRIVE_HOST_URL").strip() url = os.environ.get("DRIVE_HOST_URL")
if not url: if not url:
url = "localhost:3000" url = "localhost:3000"
else:
url = url.strip()
app = QApplication(sys.argv) app = QApplication(sys.argv)
window = State(url) window = State(url)
window.show() window.show()

136
desktop_client/auth.py Normal file
View File

@ -0,0 +1,136 @@
from __future__ import annotations
import httpx
import state
from PyQt6.QtWidgets import (
QLabel,
QLineEdit,
QMessageBox,
QPushButton,
QVBoxLayout,
QWidget,
)
from request_client import RequestClient
class RegisterWidget(QWidget):
def __init__(self, switcher: state.State):
super().__init__()
self.switcher = switcher
self.username_label = QLabel("Username:")
self.username_input = QLineEdit()
self.email_label = QLabel("Email:")
self.email_input = QLineEdit()
self.password_label = QLabel("Password:")
self.password_input = QLineEdit()
self.password_input.setEchoMode(QLineEdit.EchoMode.Password)
self.register_button = QPushButton("Register")
self.register_button.clicked.connect(self.register)
self.login_button = QPushButton("Switch to Login")
self.login_button.clicked.connect(self.switcher.switch_to_login)
layout = QVBoxLayout()
layout.addWidget(self.username_label)
layout.addWidget(self.username_input)
layout.addWidget(self.email_label)
layout.addWidget(self.email_input)
layout.addWidget(self.password_label)
layout.addWidget(self.password_input)
layout.addWidget(self.register_button)
layout.addWidget(self.login_button)
self.setLayout(layout)
def register(self):
username = self.username_input.text()
email = self.email_input.text()
password = self.password_input.text()
if not username or not email or not password:
QMessageBox.warning(self, "Input Error", "All fields are required")
return
try:
response = RequestClient().client.post(
"http://localhost:3000/users/register",
data={
"username": username,
"email": email,
"password": password,
},
)
if response.is_success:
token = response.json().get("access_token")
if not token:
QMessageBox.critical(
self,
"Error getting the token",
"Error getting the token",
)
self.switcher.login(token)
else:
QMessageBox.warning(
self, "Error", f"Registration failed: {response.text}"
)
except httpx.HTTPError as e:
QMessageBox.critical(self, "HTTP Error", str(e))
class LoginWidget(QWidget):
def __init__(self, switcher: State):
super().__init__()
self.switcher = switcher
self.username_label = QLabel("Username:")
self.username_input = QLineEdit()
self.password_label = QLabel("Password:")
self.password_input = QLineEdit()
self.password_input.setEchoMode(QLineEdit.EchoMode.Password)
self.login_button = QPushButton("Login")
self.login_button.clicked.connect(self.login)
self.register_button = QPushButton("Switch to Register")
self.register_button.clicked.connect(self.switcher.switch_to_register)
layout = QVBoxLayout()
layout.addWidget(self.username_label)
layout.addWidget(self.username_input)
layout.addWidget(self.password_label)
layout.addWidget(self.password_input)
layout.addWidget(self.login_button)
layout.addWidget(self.register_button)
self.setLayout(layout)
def login(self):
username = self.username_input.text()
password = self.password_input.text()
if not username or not password:
QMessageBox.warning(self, "Input Error", "Email and Password are required")
return
try:
response = RequestClient().client.post(
"http://localhost:3000/users/authorize",
data={"username": username, "password": password},
)
if response.is_success:
access_token = response.json().get("access_token")
if access_token:
self.switcher.login(access_token)
else:
QMessageBox.warning(self, "Error", "No access token received")
else:
QMessageBox.warning(self, "Error", f"Login failed: {response.text}")
except httpx.HTTPError as e:
QMessageBox.critical(self, "HTTP Error", str(e))

View File

@ -0,0 +1,207 @@
from __future__ import annotations
import datetime
import os
import uuid
from typing import Protocol
import httpx
import pydantic
import state
from PyQt6.QtCore import QPoint, Qt
from PyQt6.QtGui import QAction, QDragEnterEvent, QDragMoveEvent, QDropEvent, QIcon
from PyQt6.QtWidgets import (
QHBoxLayout,
QListWidget,
QListWidgetItem,
QMenu,
QMessageBox,
QPushButton,
QVBoxLayout,
QWidget,
)
from request_client import RequestClient
class DisplayProtocol(Protocol):
def name(self) -> str:
raise NotImplementedError
def delete(self) -> None:
raise NotImplementedError
def details(self) -> QWidget:
raise NotImplementedError
class File(pydantic.BaseModel):
file_id: uuid.UUID
file_name: str
file_size: int
sha512: str
created_at: datetime.datetime
updated_at: datetime.datetime
def name(self) -> str:
return self.file_name
def delete(self) -> None:
RequestClient().client.delete("/files", params={"file_id": self.file_id})
def details_button(self) -> QPushButton:
# TODO
raise NotImplementedError
class Folder(pydantic.BaseModel):
folder_id: uuid.UUID
owner_id: int
folder_name: str
created_at: datetime.datetime
def name(self) -> str:
return self.folder_name
def delete(self) -> None:
RequestClient().client.delete("/folders", params={"folder_id": self.folder_id})
def details_button(self) -> QPushButton:
# TODO
raise NotImplementedError
class ListResponse(pydantic.BaseModel):
folder_id: uuid.UUID
files: list[File]
folders: list[Folder]
def items(self) -> list[DisplayProtocol]:
return self.files + self.folders
class FileListWidget(QListWidget):
def __init__(self, state: state.State):
super().__init__()
self.state = state
self.setAcceptDrops(True)
self.setContextMenuPolicy(Qt.ContextMenuPolicy.CustomContextMenu)
self.customContextMenuRequested.connect(self.show_context_menu)
self.responses: list[ListResponse] = []
self.update_response()
def current_response(self) -> ListResponse:
if not self.responses:
self.update_response()
return self.responses[-1]
def update_response(self):
if not self.responses:
response = ListResponse.model_validate_json(
RequestClient().client.get("/folders").text
)
self.responses.append(response)
print(response.files)
self.responses[-1] = ListResponse.model_validate_json(
RequestClient()
.client.get("/folders", params={"folder_id": self.responses[-1].folder_id})
.text
)
self.update()
def dragEnterEvent(self, event: QDragEnterEvent):
if event.mimeData().hasUrls():
event.accept()
else:
event.ignore()
def dragMoveEvent(self, e: QDragMoveEvent | None) -> None:
return self.dragEnterEvent(e)
def dropEvent(self, event: QDropEvent):
event.accept()
print("hi")
for url in event.mimeData().urls():
file_path = url.toLocalFile()
self.upload_file(file_path)
def upload_file(self, file_path):
file_name = os.path.basename(file_path)
try:
with open(file_path, "rb") as f:
files = {"file": (file_name, f)}
response = RequestClient().client.post(
"http://localhost:3000/files",
files=files,
params={"parent_folder": self.current_response().folder_id},
)
if response.is_success:
QMessageBox.information(
self, "Success", "File uploaded successfully"
)
self.update_response()
else:
QMessageBox.warning(
self, "Error", f"Upload failed: {response.text}"
)
except httpx.HTTPError as e:
QMessageBox.critical(self, "HTTP Error", str(e))
def add_file_item(self, file_name):
item = QListWidgetItem(file_name)
item.setIcon(QIcon.fromTheme("text-x-generic")) # File icon
self.addItem(item)
def show_context_menu(self, pos: QPoint):
item = self.itemAt(pos)
if item:
menu = QMenu(self)
details_action = QAction("Details", self)
details_action.triggered.connect(lambda: self.show_details(item))
delete_action = QAction("Delete", self)
delete_action.triggered.connect(lambda: self.delete_item(item))
menu.addAction(details_action)
menu.addAction(delete_action)
menu.exec(self.mapToGlobal(pos))
def show_details(self, item):
file_name = item.text()
QMessageBox.information(self, "Details", f"Details for {file_name}")
def delete_item(self, item):
row = self.row(item)
item = self.current_response().items()[row]
item.delete()
self.update_response()
QMessageBox.information(self, "Delete", f"{item.name()} deleted")
def update(self) -> None:
self.clear()
last = self.responses[-1]
for item in last.items():
self.add_file_item(item.name())
class Sidebar(QWidget):
def __init__(self, state: state.State):
super().__init__()
self.state = state
layout = QVBoxLayout()
self.setLayout(layout)
# Add your sidebar buttons here
for i in range(5): # Example buttons
btn = QPushButton(f"Button {i+1}")
layout.addWidget(btn)
layout.addStretch()
class MainFileWidget(QWidget):
def __init__(self, state: state.State):
super().__init__()
self.state = state
layout = QHBoxLayout()
self.sidebar = Sidebar(state)
self.file_list = FileListWidget(state)
layout.addWidget(self.sidebar)
layout.addWidget(self.file_list)
self.setLayout(layout)

View File

@ -0,0 +1,24 @@
import os
from typing import Self
import httpx
class RequestClient:
_client: httpx.Client = None
def __new__(cls) -> Self:
if cls._client is None:
url = os.environ.get("DRIVE_HOST_URL").strip()
if not url:
url = "localhost:3000"
cls._client = httpx.Client(base_url=url)
return super().__new__(cls)
@classmethod
def set_token(cls, token: str):
cls._client.headers = {"Authorization": f"Bearer {token}"}
@property
def client(self) -> httpx.Client:
return self._client

44
desktop_client/state.py Normal file
View File

@ -0,0 +1,44 @@
import auth
import file_widgets
import httpx
import keyring
import request_client
from PyQt6.QtWidgets import QMainWindow, QStackedWidget
class State(QMainWindow):
def __init__(self, url: str):
super().__init__()
self.setWindowTitle("Auth App")
self.stack = QStackedWidget()
self.client = httpx.Client(base_url=url)
self.register_widget = auth.RegisterWidget(self)
self.login_widget = auth.LoginWidget(self)
self.stack.addWidget(
self.login_widget
) # Login widget is the first in the stack
self.stack.addWidget(self.register_widget)
self.setCentralWidget(self.stack)
password = keyring.get_credential("auth_app", "access_token")
if password is None:
self.switch_to_login() # Start with the login widget
self.login(password.password)
def switch_to_login(self):
self.stack.setCurrentWidget(self.login_widget)
def switch_to_register(self):
self.stack.setCurrentWidget(self.register_widget)
def login(self, token: str):
keyring.set_password("auth_app", "access_token", token)
request_client.RequestClient().set_token(token)
self.file_widget = file_widgets.MainFileWidget(self)
self.stack.addWidget(self.file_widget)
self.stack.setCurrentWidget(self.file_widget)