Sync widget

This commit is contained in:
StNicolay 2024-08-11 10:03:43 +03:00
parent c101aa8aa6
commit 409f13b584
Signed by: StNicolay
GPG Key ID: 9693D04DCD962B0D
6 changed files with 280 additions and 182 deletions

124
desktop_client/file.py Normal file
View File

@ -0,0 +1,124 @@
from __future__ import annotations
import base64
import datetime
import hashlib
import os
import uuid
import file_widgets
import pydantic
from PyQt6.QtGui import QIcon
from PyQt6.QtWidgets import QFileDialog, QLabel, QMessageBox, QWidget
from request_client import RequestClient
class File(pydantic.BaseModel):
file_id: uuid.UUID
file_name: str
file_size: int
sha512: str
created_at: datetime.datetime
updated_at: datetime.datetime
def name(self) -> str:
return self.file_name
def delete(self) -> None:
RequestClient().client.delete(
"/files", params={"file_id": self.file_id}
)
def details(self, list: file_widgets.FileListWidget) -> QWidget:
del list
file_size = self._format_bytes(self.file_size)
file_size_text = f"{file_size[0]:.2f} {file_size[1]}"
details = (
f"file id: {self.file_id}\nfile_name: {self.file_name}\n"
+ f"file_size: {file_size_text}\n"
+ f"created at: {self.created_at}\nupdated at: {self.updated_at}"
)
label = QLabel()
label.setWindowTitle("File info")
label.setText(details)
return label
@staticmethod
def _format_bytes(size: int):
power = 2**10
n = 0
power_labels = {0: "", 1: "kibi", 2: "mebi", 3: "gibi", 4: "tebi"}
while size > power and n < 4:
size /= power
n += 1
return size, power_labels[n] + "bytes"
def icon(self) -> QIcon:
return QIcon("assets/file.png")
def double_click(self, list: file_widgets.FileListWidget) -> None:
location = QFileDialog.getExistingDirectory(
list, caption="Select save location"
)
if not location:
return
with open(
os.path.join(location, self.file_name), "wb"
) as f, RequestClient().client.stream(
"GET", "/files", params={"file_id": self.file_id}
) as stream:
if not stream.is_success:
QMessageBox.warning(list, "Error downloading the file")
return
for data in stream.iter_bytes():
f.write(data)
def create(path: str, parent_id: uuid.UUID):
"""Upload the file"""
file_name = os.path.basename(path)
try:
with open(path, "rb") as f:
files = {"file": (file_name, f)}
response = RequestClient().client.post(
"/files",
files=files,
params={"parent_folder": parent_id},
)
if not response.is_success:
QMessageBox.warning(
None, "Error", f"Upload failed: {response.text}"
)
print(response.text)
except Exception as e:
QMessageBox.critical(None, "HTTP Error", str(e))
def download(self, path: str):
with open(path, "wb") as f, RequestClient().client.stream(
"GET", "/files", params={"file_id": self.file_id}
) as stream:
if not stream.is_success:
QMessageBox.warning(None, "Error downloading the file")
return
for data in stream.iter_bytes():
f.write(data)
def modify(self, path: str):
"""Upload the file"""
file_name = os.path.basename(path)
hash = hashlib.file_digest(open(path, "rb"), "sha512").digest()
if base64.b64decode(self.sha512) == hash:
return
try:
with open(path, "rb") as f:
files = {"file": (file_name, f)}
response = RequestClient().client.patch(
"/files",
files=files,
params={"file_id": self.file_id},
)
if not response.is_success:
QMessageBox.warning(
None, "Error", f"Upload failed: {response.text}"
)
except Exception as e:
QMessageBox.critical(None, "HTTP Error", str(e))

View File

@ -1,25 +1,28 @@
from __future__ import annotations from __future__ import annotations
import base64
import dataclasses import dataclasses
import datetime
import hashlib
import os import os
import uuid import uuid
from typing import Protocol, Self from typing import Protocol, Self
import create_folder_widget import create_folder_widget
import folder_info import sync
import httpx import httpx
import pydantic import pydantic
import state import state
import user import user
from file import File
from folder import Folder
from PyQt6.QtCore import QPoint, Qt from PyQt6.QtCore import QPoint, Qt
from PyQt6.QtGui import QAction, QDragEnterEvent, QDragMoveEvent, QDropEvent, QIcon from PyQt6.QtGui import (
QAction,
QDragEnterEvent,
QDragMoveEvent,
QDropEvent,
QIcon,
)
from PyQt6.QtWidgets import ( from PyQt6.QtWidgets import (
QFileDialog,
QHBoxLayout, QHBoxLayout,
QLabel,
QListWidget, QListWidget,
QListWidgetItem, QListWidgetItem,
QMenu, QMenu,
@ -48,155 +51,6 @@ class DisplayProtocol(Protocol):
raise NotImplementedError raise NotImplementedError
class File(pydantic.BaseModel):
file_id: uuid.UUID
file_name: str
file_size: int
sha512: str
created_at: datetime.datetime
updated_at: datetime.datetime
def name(self) -> str:
return self.file_name
def delete(self) -> None:
RequestClient().client.delete("/files", params={"file_id": self.file_id})
def details(self, list: FileListWidget) -> QWidget:
del list
file_size = self._format_bytes(self.file_size)
file_size_text = f"{file_size[0]:.2f} {file_size[1]}"
details = (
f"file id: {self.file_id}\nfile_name: {self.file_name}\n"
+ f"file_size: {file_size_text}\n"
+ f"created at: {self.created_at}\nupdated at: {self.updated_at}"
)
label = QLabel()
label.setWindowTitle("File info")
label.setText(details)
return label
@staticmethod
def _format_bytes(size: int):
power = 2**10
n = 0
power_labels = {0: "", 1: "kibi", 2: "mebi", 3: "gibi", 4: "tebi"}
while size > power and n < 4:
size /= power
n += 1
return size, power_labels[n] + "bytes"
def icon(self) -> QIcon:
return QIcon("assets/file.png")
def double_click(self, list: FileListWidget) -> None:
location = QFileDialog.getExistingDirectory(
list, caption="Select save location"
)
if not location:
return
with open(
os.path.join(location, self.file_name), "wb"
) as f, RequestClient().client.stream(
"GET", "/files", params={"file_id": self.file_id}
) as stream:
if not stream.is_success:
QMessageBox.warning(list, "Error downloading the file")
return
for data in stream.iter_bytes():
f.write(data)
def create(path: str, parent_id: uuid.UUID):
"""Upload the file"""
file_name = os.path.basename(path)
try:
with open(path, "rb") as f:
files = {"file": (file_name, f)}
response = RequestClient().client.post(
"/files",
files=files,
params={"parent_folder": parent_id},
)
if not response.is_success:
QMessageBox.warning(
None, "Error", f"Upload failed: {response.text}"
)
print(response.text)
except httpx.HTTPError as e:
QMessageBox.critical(None, "HTTP Error", str(e))
def download(self, path: str):
with open(path, "wb") as f, RequestClient().client.stream(
"GET", "/files", params={"file_id": self.file_id}
) as stream:
if not stream.is_success:
QMessageBox.warning(None, "Error downloading the file")
return
for data in stream.iter_bytes():
f.write(data)
def modify(self, path: str):
"""Upload the file"""
file_name = os.path.basename(path)
hash = hashlib.file_digest(open(path, "rb"), "sha512").digest()
if base64.b64decode(self.sha512) == hash:
return
try:
with open(path, "rb") as f:
files = {"file": (file_name, f)}
response = RequestClient().client.patch(
"/files",
files=files,
params={"file_id": self.file_id},
)
if not response.is_success:
QMessageBox.warning(
None, "Error", f"Upload failed: {response.text}"
)
except httpx.HTTPError as e:
QMessageBox.critical(None, "HTTP Error", str(e))
class Folder(pydantic.BaseModel):
folder_id: uuid.UUID
owner_id: int
folder_name: str
created_at: datetime.datetime
def name(self) -> str:
return self.folder_name
def delete(self) -> None:
response = RequestClient().client.delete(
"/folders", params={"folder_id": self.folder_id}
)
if not response:
QMessageBox.warning(None, "Error deleting folder", response.text)
def details(self, list: FileListWidget) -> QWidget:
return folder_info.FolderInfoWidget(self)
def icon(self) -> QIcon:
return QIcon("assets/folder.png")
def double_click(self, list: FileListWidget) -> None:
list.responses.append(ListResponse.get(self.folder_id))
list.update()
@staticmethod
def create(name: str, parent_id: uuid.UUID) -> uuid.UUID:
response = RequestClient().client.post(
"/folders",
json={
"folder_name": name,
"parent_folder_id": str(parent_id),
},
)
if not response.is_success:
QMessageBox.warning(None, "Error creating folder", response.text)
return uuid.UUID(response.text.strip('"'))
class ResponseProtocol(Protocol): class ResponseProtocol(Protocol):
def items(self) -> list[DisplayProtocol]: def items(self) -> list[DisplayProtocol]:
raise NotImplementedError raise NotImplementedError
@ -229,7 +83,9 @@ class ListResponse(pydantic.BaseModel):
return self.get(self.folder_id) return self.get(self.folder_id)
def create_folder(self, file_list: FileListWidget): def create_folder(self, file_list: FileListWidget):
return create_folder_widget.CreateFolderWidget(self.folder_id, file_list) return create_folder_widget.CreateFolderWidget(
self.folder_id, file_list
)
@dataclasses.dataclass(slots=True) @dataclasses.dataclass(slots=True)
@ -305,7 +161,9 @@ class FileListWidget(QListWidget):
response = RequestClient().client.post( response = RequestClient().client.post(
"http://localhost:3000/files", "http://localhost:3000/files",
files=files, files=files,
params={"parent_folder": self.current_response().folder_id}, params={
"parent_folder": self.current_response().folder_id,
},
) )
if response.is_success: if response.is_success:
QMessageBox.information( QMessageBox.information(
@ -370,8 +228,9 @@ class Sidebar(QWidget):
("Go back", self.go_back), ("Go back", self.go_back),
("Go root", self.go_root), ("Go root", self.go_root),
("Get permitted", self.get_tlp), ("Get permitted", self.get_tlp),
("Sync", self.sync), ("Refresh", self.refresh),
("Create folder", self.create_folder), ("Create folder", self.create_folder),
("Sync all", self.sync_all),
] ]
for text, func in buttons: for text, func in buttons:
button = QPushButton(text) button = QPushButton(text)
@ -400,9 +259,8 @@ class Sidebar(QWidget):
self.file_list.responses.append(TlpResponse.get()) self.file_list.responses.append(TlpResponse.get())
self.file_list.update() self.file_list.update()
def sync(self): def refresh(self):
# TODO self.file_list.update_response()
...
def create_folder(self): def create_folder(self):
self.folder_widget = self.file_list.current_response().create_folder( self.folder_widget = self.file_list.current_response().create_folder(
@ -411,6 +269,9 @@ class Sidebar(QWidget):
if self.folder_widget is not None: if self.folder_widget is not None:
self.folder_widget.show() self.folder_widget.show()
def sync_all(self):
sync.SyncData.sync_all()
class MainFileWidget(QWidget): class MainFileWidget(QWidget):
def __init__(self, state: state.State): def __init__(self, state: state.State):

60
desktop_client/folder.py Normal file
View File

@ -0,0 +1,60 @@
from __future__ import annotations
import datetime
import uuid
import typing
if typing.TYPE_CHECKING:
import file_widgets
import pydantic
from PyQt6.QtGui import QIcon
from PyQt6.QtWidgets import QMessageBox, QWidget
from request_client import RequestClient
class Folder(pydantic.BaseModel):
folder_id: uuid.UUID
owner_id: int
folder_name: str
created_at: datetime.datetime
def name(self) -> str:
return self.folder_name
def delete(self) -> None:
import sync
sync.SyncData.delete(self.folder_id)
response = RequestClient().client.delete(
"/folders", params={"folder_id": self.folder_id}
)
if not response:
QMessageBox.warning(None, "Error deleting folder", response.text)
def details(self, list: file_widgets.FileListWidget) -> QWidget:
import folder_info
return folder_info.FolderInfoWidget(self)
def icon(self) -> QIcon:
return QIcon("assets/folder.png")
def double_click(self, list: file_widgets.FileListWidget) -> None:
import file_widgets
list.responses.append(file_widgets.ListResponse.get(self.folder_id))
list.update()
@staticmethod
def create(name: str, parent_id: uuid.UUID) -> uuid.UUID:
response = RequestClient().client.post(
"/folders",
json={
"folder_name": name,
"parent_folder_id": str(parent_id),
},
)
if not response.is_success:
QMessageBox.warning(None, "Error creating folder", response.text)
return uuid.UUID(response.text.strip('"'))

View File

@ -7,14 +7,17 @@ from functools import cache, partial
import file_widgets import file_widgets
import pydantic import pydantic
import sync
import user import user
from PyQt6.QtWidgets import ( from PyQt6.QtWidgets import (
QComboBox, QComboBox,
QFileDialog,
QHBoxLayout, QHBoxLayout,
QLabel, QLabel,
QLineEdit, QLineEdit,
QMessageBox, QMessageBox,
QPushButton, QPushButton,
QSpacerItem,
QVBoxLayout, QVBoxLayout,
QWidget, QWidget,
) )
@ -36,7 +39,9 @@ class Permission(enum.StrEnum):
}, },
) )
if not response.is_success: if not response.is_success:
QMessageBox.warning(None, "Error setting permissions", response.text) QMessageBox.warning(
None, "Error setting permissions", response.text
)
@staticmethod @staticmethod
def delete(folder_id: uuid.UUID, user_id: int, widget: FolderInfoWidget): def delete(folder_id: uuid.UUID, user_id: int, widget: FolderInfoWidget):
@ -48,7 +53,9 @@ class Permission(enum.StrEnum):
}, },
) )
if not response.is_success: if not response.is_success:
QMessageBox.warning(None, "Error deleting permissions", response.text) QMessageBox.warning(
None, "Error deleting permissions", response.text
)
else: else:
widget.redraw() widget.redraw()
@ -94,10 +101,14 @@ class FolderInfoWidget(QWidget):
combo = QComboBox() combo = QComboBox()
combo.addItems(map(str, Permission)) combo.addItems(map(str, Permission))
combo.setCurrentText(str(permissions)) combo.setCurrentText(str(permissions))
combo.currentTextChanged.connect(partial(self.change, user_id=user_id)) combo.currentTextChanged.connect(
partial(self.change, user_id=user_id)
)
delete = QPushButton("Delete") delete = QPushButton("Delete")
delete.clicked.connect( delete.clicked.connect(
partial(Permission.delete, self.folder.folder_id, user_id, self) partial(
Permission.delete, self.folder.folder_id, user_id, self
)
) )
layout.addWidget(name) layout.addWidget(name)
layout.addWidget(combo) layout.addWidget(combo)
@ -123,6 +134,25 @@ class FolderInfoWidget(QWidget):
layout.addWidget(button) layout.addWidget(button)
main_layout.addLayout(layout) main_layout.addLayout(layout)
main_layout.addSpacerItem(QSpacerItem(0, 50))
synced_to = sync.SyncData.get_for_folder(self.folder.folder_id)
if synced_to is not None:
synced_to_text = f"Synched to {synced_to.path}"
else:
synced_to_text = "Not synced"
layout = QHBoxLayout()
label = QLabel(synced_to_text)
layout.addWidget(label)
if synced_to is not None:
button = QPushButton("Delete")
button.clicked.connect(self.remove_sync)
else:
button = QPushButton("Add sync")
button.clicked.connect(self.add_sync)
layout.addWidget(button)
main_layout.addLayout(layout)
if self.layout(): if self.layout():
QWidget().setLayout(self.layout()) QWidget().setLayout(self.layout())
self.setLayout(main_layout) self.setLayout(main_layout)
@ -140,6 +170,19 @@ class FolderInfoWidget(QWidget):
def search_save(self): def search_save(self):
permission = Permission(self.perm_combo.currentText()) permission = Permission(self.perm_combo.currentText())
search = self.search_combo.currentText()
if not search:
return
user_id = self.user_mapping[self.search_combo.currentText()] user_id = self.user_mapping[self.search_combo.currentText()]
permission.set(self.folder.folder_id, user_id) permission.set(self.folder.folder_id, user_id)
self.redraw() self.redraw()
def add_sync(self):
path = QFileDialog.getExistingDirectory()
if path:
sync.SyncData.new_sync(path, self.folder.folder_id)
self.redraw()
def remove_sync(self):
sync.SyncData.delete(self.folder.folder_id)
self.redraw()

View File

@ -1,5 +1,3 @@
import uuid
import auth import auth
import file_widgets import file_widgets
import httpx import httpx

View File

@ -6,21 +6,22 @@ import uuid
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from time import ctime from time import ctime
import file_widgets from file import File
from folder import Folder
from request_client import RequestClient from request_client import RequestClient
from sqlmodel import Field, Session, SQLModel, create_engine, select from sqlmodel import Field, Session, SQLModel, create_engine, delete, select
class FolderStructure(file_widgets.Folder): class FolderStructure(Folder):
files: list[file_widgets.File] files: list[File]
folders: list[FolderStructure] folders: list[FolderStructure]
def find_folder(self, name: str) -> file_widgets.File | None: def find_folder(self, name: str) -> File | None:
for file in self.folders: for file in self.folders:
if file.folder_name == name: if file.folder_name == name:
return file return file
def find_file(self, name: str) -> file_widgets.File | None: def find_file(self, name: str) -> File | None:
for file in self.files: for file in self.files:
if file.file_name == name: if file.file_name == name:
return file return file
@ -36,9 +37,9 @@ class FolderStructure(file_widgets.Folder):
class SyncData(SQLModel, table=True): class SyncData(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True) id: int | None = Field(default=None, primary_key=True)
folder_id: uuid.UUID = Field(unique=True, index=True)
path: str path: str
last_updated: datetime.datetime last_updated: datetime.datetime
folder_id: uuid.UUID
@staticmethod @staticmethod
def sync_all(): def sync_all():
@ -56,9 +57,7 @@ class SyncData(SQLModel, table=True):
tuple(map_) tuple(map_)
@staticmethod @staticmethod
def new_sync(path: str, parent_id): def new_sync(path: str, folder_id: uuid.UUID):
name = os.path.basename(path)
folder_id = file_widgets.Folder.create(name, parent_id)
with Session(engine) as s: with Session(engine) as s:
sync = SyncData( sync = SyncData(
path=path, path=path,
@ -69,6 +68,19 @@ class SyncData(SQLModel, table=True):
s.commit() s.commit()
upload_folder(path, folder_id) upload_folder(path, folder_id)
@staticmethod
def get_for_folder(folder_id: uuid.UUID) -> SyncData | None:
with Session(engine) as s:
return s.exec(
select(SyncData).where(SyncData.folder_id == folder_id)
).one_or_none()
@staticmethod
def delete(folder_id: uuid.UUID):
with Session(engine) as s:
s.exec(delete(SyncData).where(SyncData.folder_id == folder_id))
s.commit()
sqlite_file_name = "database.db" sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}" sqlite_url = f"sqlite:///{sqlite_file_name}"
@ -82,9 +94,9 @@ def upload_folder(path: str, folder_id: uuid.UUID):
for item in items: for item in items:
item_path = os.path.join(path, item) item_path = os.path.join(path, item)
if os.path.isfile(item_path): if os.path.isfile(item_path):
file_widgets.File.create(item_path, folder_id) File.create(item_path, folder_id)
elif os.path.isdir(item_path): elif os.path.isdir(item_path):
id = file_widgets.Folder.create(item, folder_id) id = Folder.create(item, folder_id)
upload_folder(item_path, id) upload_folder(item_path, id)
@ -98,7 +110,7 @@ def merge_folder(
if os.path.isfile(item_path): if os.path.isfile(item_path):
file = structure.find_file(item) file = structure.find_file(item)
if file is None: if file is None:
file_widgets.File.create(item_path, structure.folder_id) File.create(item_path, structure.folder_id)
continue continue
mtime = datetime.datetime.strptime( mtime = datetime.datetime.strptime(
ctime(os.path.getmtime(item_path)), "%a %b %d %H:%M:%S %Y" ctime(os.path.getmtime(item_path)), "%a %b %d %H:%M:%S %Y"
@ -111,7 +123,7 @@ def merge_folder(
elif os.path.isdir(item_path): elif os.path.isdir(item_path):
folder = structure.find_folder(item) folder = structure.find_folder(item)
if folder is None: if folder is None:
folder_id = file_widgets.Folder.create(item, structure.folder_id) folder_id = Folder.create(item, structure.folder_id)
upload_folder(item_path, folder_id) upload_folder(item_path, folder_id)
else: else:
merge_folder(item_path, folder, time) merge_folder(item_path, folder, time)