Files
xsl-validator/src/ui/mixins/database.py
T

352 lines
14 KiB
Python
Raw Normal View History

"""
DatabaseMixin - Mixin für Datenbank-Operationen.
Dieses Mixin enthält alle Methoden zur PostgreSQL-Datenbankanbindung
und Datenverarbeitung für das MainWindow.
"""
import sys
import time
import logging
from pathlib import Path
import polars as pl
from PySide6.QtCore import QThread, Signal, Qt
from PySide6.QtWidgets import QMessageBox, QProgressDialog
from conf import app_settings, TreeNode, XslFile
logger = logging.getLogger(__name__)
class DatabaseQueryThread(QThread):
"""Thread für asynchrone Datenbankabfragen."""
query_completed = Signal(object) # pl.DataFrame
query_failed = Signal(str) # Fehlermeldung
def __init__(self, sql_query, connection_string):
super().__init__()
self.sql_query = sql_query
self.connection_string = connection_string
def run(self):
try:
df = pl.read_database_uri(self.sql_query, self.connection_string, engine="connectorx").sort(
["reporttyp_bez", "report_bez", "repfile_bez"]
)
self.query_completed.emit(df)
except Exception as e:
self.query_failed.emit(str(e))
class DatabaseMixin:
"""
Mixin für Datenbank-Operationen.
Dieses Mixin wird von MainWindow verwendet und erwartet folgende Attribute:
- self.project: Das aktuelle Projekt
- self.pdf_project: Die Projekt-Daten (ProjectData)
- self._load_nodes_to_tree(): Methode zum Laden der Nodes in den Tree
- self._save_project_settings(): Methode zum Speichern der Projekt-Einstellungen
"""
def on_load_from_fn2_clicked(self):
"""
Wird ausgeführt, wenn der Button "lade aus FN2" geklickt wird.
Startet SQL-Abfrage asynchron mit Fortschrittsdialog.
"""
logger.debug("Button 'lade aus FN2' wurde geklickt!")
try:
# Prüfe ob ein Projekt geladen ist
if not hasattr(self, "project") or not self.project:
QMessageBox.warning(self, "Warnung", "Kein Projekt geladen. Bitte öffnen Sie zuerst ein Projekt.")
return
# Hole das aktuelle Projekt aus app_settings
if not self.project:
QMessageBox.warning(self, "Warnung", "Aktuelles Projekt nicht in den Einstellungen gefunden.")
return
# Hole die PostgreSQL-Datenbank-Konfiguration
db_config = self._get_database_config(self.project.postgre_sql_db_id)
if not db_config:
QMessageBox.warning(self, "Warnung", "PostgreSQL-Datenbank-Konfiguration nicht gefunden.")
return
# SQL-Abfrage und Connection-String vorbereiten
sql_query, connection_string = self._prepare_sql_query(db_config)
if sql_query is None:
return # Fehler bereits angezeigt
# Fortschrittsdialog erstellen
self._db_progress = QProgressDialog("Verbinde mit Datenbank...", "Abbrechen", 0, 0, self)
self._db_progress.setWindowTitle("Datenbank-Abfrage")
self._db_progress.setWindowModality(Qt.WindowModal)
self._db_progress.setMinimumDuration(0)
# Query-Thread erstellen und starten
self._db_query_thread = DatabaseQueryThread(sql_query, connection_string)
self._db_query_thread.query_completed.connect(self._on_db_query_completed)
self._db_query_thread.query_failed.connect(self._on_db_query_failed)
self._db_query_thread.finished.connect(self._cleanup_db_query)
self._db_progress.canceled.connect(self._on_db_query_canceled)
self._db_query_thread.start()
except Exception as e:
logger.error(f"Fehler beim Laden aus FN2: {e}")
QMessageBox.critical(self, "Fehler", f"Fehler beim Laden aus FN2:\n{str(e)}")
def _on_db_query_completed(self, df):
"""Wird aufgerufen, wenn die Datenbankabfrage erfolgreich war."""
# Ignoriere Ergebnis, falls der Benutzer abgebrochen hat
if hasattr(self, "_db_progress") and self._db_progress.wasCanceled():
return
try:
new_nodes = self._process_sql_data(df)
self._merge_nodes_with_existing(new_nodes)
self._save_project_settings()
self._load_nodes_to_tree()
except Exception as e:
logger.error(f"Fehler beim Verarbeiten der DB-Daten: {e}")
QMessageBox.critical(self, "Fehler", f"Fehler beim Verarbeiten der Daten:\n{str(e)}")
def _on_db_query_failed(self, error_msg):
"""Wird aufgerufen, wenn die Datenbankabfrage fehlgeschlagen ist."""
if hasattr(self, "_db_progress") and self._db_progress.wasCanceled():
return
error = f"Fehler beim Ausführen der SQL-Abfrage: {error_msg}"
logger.error(error)
QMessageBox.critical(self, "Fehler", error)
def _on_db_query_canceled(self):
"""Wird aufgerufen, wenn der Benutzer die Abfrage abbricht."""
logger.info("Datenbankabfrage vom Benutzer abgebrochen")
def _cleanup_db_query(self):
"""Räumt nach der Datenbankabfrage auf."""
if hasattr(self, "_db_progress"):
self._db_progress.canceled.disconnect(self._on_db_query_canceled)
self._db_progress.close()
self._db_progress.deleteLater()
del self._db_progress
if hasattr(self, "_db_query_thread"):
self._db_query_thread.deleteLater()
del self._db_query_thread
def _get_database_config(self, db_id):
"""
Holt die Datenbank-Konfiguration anhand der ID.
Args:
db_id: ID der PostgreSQL-Datenbank
Returns:
PostgreSqlDb|None: Die Datenbank-Konfiguration oder None
"""
for db in app_settings.postgresql_dbs:
if db.id == db_id:
return db
return None
def _prepare_sql_query(self, db_config):
"""
Bereitet SQL-Abfrage und Connection-String vor.
Args:
db_config: PostgreSQL-Datenbank-Konfiguration
Returns:
tuple[str, str]|tuple[None, None]: (sql_query, connection_string) oder (None, None) bei Fehler
"""
try:
# PyInstaller entpackt Ressourcen nach sys._MEIPASS;
# im Entwicklungsmodus liegt die Datei relativ zum Repo-Root
if hasattr(sys, "_MEIPASS"):
sql_file_path = Path(sys._MEIPASS) / "res" / "data.sql" # type: ignore[attr-defined]
else:
sql_file_path = Path(__file__).parents[3] / "src" / "res" / "data.sql"
if not sql_file_path.exists():
QMessageBox.critical(self, "Fehler", f"SQL-Datei nicht gefunden: {sql_file_path}")
return None, None
with open(sql_file_path, "r", encoding="utf-8") as f:
sql_query = f.read()
logger.debug(f"SQL-Abfrage geladen: {len(sql_query)} Zeichen")
connection_string = (
"postgresql://"
f"{db_config.username}:"
f"{db_config.password}@"
f"{db_config.host}:"
f"{db_config.port}/"
f"{db_config.database}?"
f"sslmode={db_config.ssl_mode.value}"
f"&connect_timeout={db_config.timeout}"
)
logger.info(f"Verbinde zu PostgreSQL: {db_config.host}:{db_config.port}/{db_config.database}")
return sql_query, connection_string
except Exception as e:
error_msg = f"Fehler beim Vorbereiten der SQL-Abfrage: {str(e)}"
logger.error(error_msg)
QMessageBox.critical(self, "Fehler", error_msg)
return None, None
def _process_sql_data(self, df):
"""
Verarbeitet die SQL-Daten wie in readCsv.py und erstellt Node-Struktur.
Args:
df: Polars DataFrame mit den SQL-Ergebnissen
Returns:
list[TreeNode]: Liste der erstellten Root-Nodes
"""
try:
start_time = time.time()
# Gruppiere die Daten wie in readCsv.py
ebene_1 = df.group_by(["reporttyp", "reporttyp_bez"]).len()
ebene_2 = df.group_by(["reporttyp", "report", "report_bez"]).len()
ebene_3 = df.group_by(["reporttyp", "report", "repfile", "repfile_bez", "xsl_datei"]).len()
group_time = time.time() - start_time
logger.debug(f"Performance: Gruppierung in {group_time:.3f}s")
new_nodes = []
start_time = time.time()
# Erstelle Node-Struktur wie in readCsv.py
for r1 in ebene_1.rows(named=True):
tn_1 = TreeNode(id=(r1["reporttyp"],), bez=r1["reporttyp_bez"], children=[])
r1_children = ebene_2.filter(pl.col("reporttyp") == r1["reporttyp"])
for r2 in r1_children.rows(named=True):
tn_2 = TreeNode(id=(r2["reporttyp"], r2["report"]), bez=r2["report_bez"], children=[])
r2_children = ebene_3.filter(
(pl.col("reporttyp") == r1["reporttyp"]) & (pl.col("report") == r2["report"])
)
for r3 in r2_children.rows(named=True):
x = XslFile(
id=(r3["reporttyp"], r3["report"], r3["repfile"]),
bez=r3["repfile_bez"],
xsl_file=Path(r3["xsl_datei"]),
xmls=[],
)
tn_2.children.append(x)
tn_1.children.append(tn_2)
new_nodes.append(tn_1)
nodes_time = time.time() - start_time
logger.debug(f"Performance: Node-Erstellung in {nodes_time:.3f}s")
logger.info(f"Erstellt: {len(new_nodes)} Root-Nodes")
return new_nodes
except Exception as e:
logger.error(f"Fehler beim Verarbeiten der SQL-Daten: {e}")
raise
def _merge_nodes_with_existing(self, new_nodes):
"""
Merged neue Nodes mit vorhandenen Nodes basierend auf IDs.
Überschreibt nur einzelne Eigenschaften, nicht ganze Nodes.
Args:
new_nodes: Liste der neuen Nodes
"""
try:
logger.info("Merge neue Nodes mit vorhandenen...")
# Erstelle ein Dictionary der neuen Nodes für schnellen Zugriff
new_nodes_dict = {}
self._build_nodes_dict(new_nodes, new_nodes_dict)
logger.debug(f"Neue Nodes Dictionary erstellt: {len(new_nodes_dict)} Einträge")
# Merge mit vorhandenen Nodes
if self.pdf_project and self.pdf_project.nodes:
self._merge_nodes_recursive(self.pdf_project.nodes, new_nodes_dict)
# Füge komplett neue Root-Nodes hinzu
if self.pdf_project and self.pdf_project.nodes:
existing_root_ids = {node.id for node in self.pdf_project.nodes}
for new_node in new_nodes:
if new_node.id not in existing_root_ids:
self.pdf_project.nodes.append(new_node)
logger.info(f"Neue Root-Node hinzugefügt: {new_node.bez}")
elif self.pdf_project:
# Wenn keine Nodes vorhanden sind, füge alle neuen Nodes hinzu
self.pdf_project.nodes = new_nodes
logger.info(f"Alle {len(new_nodes)} Root-Nodes hinzugefügt (keine vorhandenen Nodes)")
logger.info("Merge abgeschlossen")
except Exception as e:
logger.error(f"Fehler beim Mergen der Nodes: {e}")
raise
def _build_nodes_dict(self, nodes, nodes_dict):
"""
Erstellt rekursiv ein Dictionary aller Nodes für schnellen ID-basierten Zugriff.
Args:
nodes: Liste der Nodes
nodes_dict: Dictionary zum Füllen
"""
for node in nodes:
nodes_dict[node.id] = node
if isinstance(node, TreeNode) and node.children:
self._build_nodes_dict(node.children, nodes_dict)
def _merge_nodes_recursive(self, existing_nodes, new_nodes_dict):
"""
Merged rekursiv vorhandene Nodes mit neuen Nodes.
Args:
existing_nodes: Liste der vorhandenen Nodes
new_nodes_dict: Dictionary der neuen Nodes
"""
for existing_node in existing_nodes:
if existing_node.id in new_nodes_dict:
new_node = new_nodes_dict[existing_node.id]
# Aktualisiere nur die Bezeichnung, falls sie sich geändert hat
if existing_node.bez != new_node.bez:
logger.info(
f"Aktualisiere Bezeichnung für Node {existing_node.id}: '{existing_node.bez}' -> '{new_node.bez}'"
)
existing_node.bez = new_node.bez
# Für XslFile: Aktualisiere xsl_file Pfad
if isinstance(existing_node, XslFile) and isinstance(new_node, XslFile):
if existing_node.xsl_file != new_node.xsl_file:
logger.info(
f"Aktualisiere XSL-Datei für Node {existing_node.id}: '{existing_node.xsl_file}' -> '{new_node.xsl_file}'"
)
existing_node.xsl_file = new_node.xsl_file
# Rekursiv für Knoten (nur bei TreeNode)
if isinstance(existing_node, TreeNode) and existing_node.children:
self._merge_nodes_recursive(existing_node.children, new_nodes_dict)
# Füge neue Knoten hinzu, die noch nicht existieren
if existing_node.id in new_nodes_dict:
new_node = new_nodes_dict[existing_node.id]
if isinstance(new_node, TreeNode) and new_node.children:
existing_child_ids = {child.id for child in existing_node.children}
for new_child in new_node.children:
if new_child.id not in existing_child_ids:
existing_node.children.append(new_child)
logger.info(f"Neues Kind hinzugefügt zu Node {existing_node.id}: {new_child.bez}")