Files
xsl-validator/src/ui/mixins/database.py
T

289 lines
11 KiB
Python
Raw Normal View History

"""
DatabaseMixin - Mixin für Datenbank-Operationen.
Dieses Mixin enthält alle Methoden zur PostgreSQL-Datenbankanbindung
und Datenverarbeitung für das MainWindow.
"""
import time
import logging
from pathlib import Path
import polars as pl
from PySide6.QtWidgets import QMessageBox
from conf import app_settings, TreeNode, XslFile
logger = logging.getLogger(__name__)
class DatabaseMixin:
"""
Mixin für Datenbank-Operationen.
Dieses Mixin wird von MainWindow verwendet und erwartet folgende Attribute:
- self.project: Das aktuelle Projekt
- self.pdf_project: Die Projekt-Daten (ProjectData)
- self._load_nodes_to_tree(): Methode zum Laden der Nodes in den Tree
- self._save_project_settings(): Methode zum Speichern der Projekt-Einstellungen
"""
def on_load_from_fn2_clicked(self):
"""
Wird ausgeführt, wenn der Button "lade aus FN2" geklickt wird.
Führt SQL-Abfrage aus und aktualisiert die Projekt-Nodes.
"""
logger.debug("Button 'lade aus FN2' wurde geklickt!")
try:
# Prüfe ob ein Projekt geladen ist
if not hasattr(self, "project") or not self.project:
QMessageBox.warning(self, "Warnung", "Kein Projekt geladen. Bitte öffnen Sie zuerst ein Projekt.")
return
# Hole das aktuelle Projekt aus app_settings
if not self.project:
QMessageBox.warning(self, "Warnung", "Aktuelles Projekt nicht in den Einstellungen gefunden.")
return
# Hole die PostgreSQL-Datenbank-Konfiguration
db_config = self._get_database_config(self.project.postgre_sql_db_id)
if not db_config:
QMessageBox.warning(self, "Warnung", "PostgreSQL-Datenbank-Konfiguration nicht gefunden.")
return
# Führe SQL-Abfrage aus
df = self._execute_sql_query(db_config)
if df is None:
return # Fehler bereits angezeigt
# Verarbeite die Daten wie in readCsv.py
new_nodes = self._process_sql_data(df)
# Merge mit vorhandenen Nodes
self._merge_nodes_with_existing(new_nodes)
# Speichere die aktualisierten Projekt-Einstellungen
self._save_project_settings()
# Lade das Projekt neu
self._load_nodes_to_tree()
# QMessageBox.information(self, "Erfolg", "Daten erfolgreich aus FN2 geladen und Projekt aktualisiert!")
except Exception as e:
logger.error(f"Fehler beim Laden aus FN2: {e}")
QMessageBox.critical(self, "Fehler", f"Fehler beim Laden aus FN2:\n{str(e)}")
def _get_database_config(self, db_id):
"""
Holt die Datenbank-Konfiguration anhand der ID.
Args:
db_id: ID der PostgreSQL-Datenbank
Returns:
PostgreSqlDb|None: Die Datenbank-Konfiguration oder None
"""
for db in app_settings.postgresql_dbs:
if db.id == db_id:
return db
return None
def _execute_sql_query(self, db_config):
"""
Führt die SQL-Abfrage aus der data.sql Datei aus.
Args:
db_config: PostgreSQL-Datenbank-Konfiguration
Returns:
pl.DataFrame|None: Die Abfrageergebnisse oder None bei Fehler
"""
try:
# Lade SQL-Abfrage aus Datei
sql_file_path = Path("src/res/data.sql")
if not sql_file_path.exists():
QMessageBox.critical(self, "Fehler", f"SQL-Datei nicht gefunden: {sql_file_path}")
return None
with open(sql_file_path, "r", encoding="utf-8") as f:
sql_query = f.read()
logger.debug(f"SQL-Abfrage geladen: {len(sql_query)} Zeichen")
# Verbindung zur PostgreSQL-Datenbank herstellen
connection_string = (
"postgresql://"
f"{db_config.username}:"
f"{db_config.password}@"
f"{db_config.host}:"
f"{db_config.port}/"
f"{db_config.database}?"
f"sslmode={db_config.ssl_mode.value}"
)
logger.info(f"Verbinde zu PostgreSQL: {db_config.host}:{db_config.port}/{db_config.database}")
df = pl.read_database_uri(sql_query, connection_string, engine="connectorx").sort(
["reporttyp_bez", "report_bez", "repfile_bez"]
)
return df
except Exception as e:
error_msg = f"Fehler beim Ausführen der SQL-Abfrage: {str(e)}"
logger.error(error_msg)
QMessageBox.critical(self, "Fehler", error_msg)
return None
def _process_sql_data(self, df):
"""
Verarbeitet die SQL-Daten wie in readCsv.py und erstellt Node-Struktur.
Args:
df: Polars DataFrame mit den SQL-Ergebnissen
Returns:
list[TreeNode]: Liste der erstellten Root-Nodes
"""
try:
start_time = time.time()
# Gruppiere die Daten wie in readCsv.py
ebene_1 = df.group_by(["reporttyp", "reporttyp_bez"]).len()
ebene_2 = df.group_by(["reporttyp", "report", "report_bez"]).len()
ebene_3 = df.group_by(["reporttyp", "report", "repfile", "repfile_bez", "xsl_datei"]).len()
group_time = time.time() - start_time
logger.debug(f"Performance: Gruppierung in {group_time:.3f}s")
new_nodes = []
start_time = time.time()
# Erstelle Node-Struktur wie in readCsv.py
for r1 in ebene_1.rows(named=True):
tn_1 = TreeNode(id=(r1["reporttyp"],), bez=r1["reporttyp_bez"], children=[])
r1_children = ebene_2.filter(pl.col("reporttyp") == r1["reporttyp"])
for r2 in r1_children.rows(named=True):
tn_2 = TreeNode(id=(r2["reporttyp"], r2["report"]), bez=r2["report_bez"], children=[])
r2_children = ebene_3.filter(
(pl.col("reporttyp") == r1["reporttyp"]) & (pl.col("report") == r2["report"])
)
for r3 in r2_children.rows(named=True):
x = XslFile(
id=(r3["reporttyp"], r3["report"], r3["repfile"]),
bez=r3["repfile_bez"],
xsl_file=Path(r3["xsl_datei"]),
xmls=[],
)
tn_2.children.append(x)
tn_1.children.append(tn_2)
new_nodes.append(tn_1)
nodes_time = time.time() - start_time
logger.debug(f"Performance: Node-Erstellung in {nodes_time:.3f}s")
logger.info(f"Erstellt: {len(new_nodes)} Root-Nodes")
return new_nodes
except Exception as e:
logger.error(f"Fehler beim Verarbeiten der SQL-Daten: {e}")
raise
def _merge_nodes_with_existing(self, new_nodes):
"""
Merged neue Nodes mit vorhandenen Nodes basierend auf IDs.
Überschreibt nur einzelne Eigenschaften, nicht ganze Nodes.
Args:
new_nodes: Liste der neuen Nodes
"""
try:
logger.info("Merge neue Nodes mit vorhandenen...")
# Erstelle ein Dictionary der neuen Nodes für schnellen Zugriff
new_nodes_dict = {}
self._build_nodes_dict(new_nodes, new_nodes_dict)
logger.debug(f"Neue Nodes Dictionary erstellt: {len(new_nodes_dict)} Einträge")
# Merge mit vorhandenen Nodes
if self.pdf_project and self.pdf_project.nodes:
self._merge_nodes_recursive(self.pdf_project.nodes, new_nodes_dict)
# Füge komplett neue Root-Nodes hinzu
if self.pdf_project and self.pdf_project.nodes:
existing_root_ids = {node.id for node in self.pdf_project.nodes}
for new_node in new_nodes:
if new_node.id not in existing_root_ids:
self.pdf_project.nodes.append(new_node)
logger.info(f"Neue Root-Node hinzugefügt: {new_node.bez}")
elif self.pdf_project:
# Wenn keine Nodes vorhanden sind, füge alle neuen Nodes hinzu
self.pdf_project.nodes = new_nodes
logger.info(f"Alle {len(new_nodes)} Root-Nodes hinzugefügt (keine vorhandenen Nodes)")
logger.info("Merge abgeschlossen")
except Exception as e:
logger.error(f"Fehler beim Mergen der Nodes: {e}")
raise
def _build_nodes_dict(self, nodes, nodes_dict):
"""
Erstellt rekursiv ein Dictionary aller Nodes für schnellen ID-basierten Zugriff.
Args:
nodes: Liste der Nodes
nodes_dict: Dictionary zum Füllen
"""
for node in nodes:
nodes_dict[node.id] = node
if isinstance(node, TreeNode) and node.children:
self._build_nodes_dict(node.children, nodes_dict)
def _merge_nodes_recursive(self, existing_nodes, new_nodes_dict):
"""
Merged rekursiv vorhandene Nodes mit neuen Nodes.
Args:
existing_nodes: Liste der vorhandenen Nodes
new_nodes_dict: Dictionary der neuen Nodes
"""
for existing_node in existing_nodes:
if existing_node.id in new_nodes_dict:
new_node = new_nodes_dict[existing_node.id]
# Aktualisiere nur die Bezeichnung, falls sie sich geändert hat
if existing_node.bez != new_node.bez:
logger.info(
f"Aktualisiere Bezeichnung für Node {existing_node.id}: '{existing_node.bez}' -> '{new_node.bez}'"
)
existing_node.bez = new_node.bez
# Für XslFile: Aktualisiere xsl_file Pfad
if isinstance(existing_node, XslFile) and isinstance(new_node, XslFile):
if existing_node.xsl_file != new_node.xsl_file:
logger.info(
f"Aktualisiere XSL-Datei für Node {existing_node.id}: '{existing_node.xsl_file}' -> '{new_node.xsl_file}'"
)
existing_node.xsl_file = new_node.xsl_file
# Rekursiv für Knoten (nur bei TreeNode)
if isinstance(existing_node, TreeNode) and existing_node.children:
self._merge_nodes_recursive(existing_node.children, new_nodes_dict)
# Füge neue Knoten hinzu, die noch nicht existieren
if existing_node.id in new_nodes_dict:
new_node = new_nodes_dict[existing_node.id]
if isinstance(new_node, TreeNode) and new_node.children:
existing_child_ids = {child.id for child in existing_node.children}
for new_child in new_node.children:
if new_child.id not in existing_child_ids:
existing_node.children.append(new_child)
logger.info(f"Neues Kind hinzugefügt zu Node {existing_node.id}: {new_child.bez}")