Files
xsl-validator/src/ui/threads.py
T

423 lines
17 KiB
Python
Raw Normal View History

"""
Thread-Klassen für asynchrone Operationen in DocuMentor.
Dieses Modul enthält alle QThread-Klassen, die für Hintergrundoperationen verwendet werden:
- XmlHashCalculatorThread: Berechnung von blake2b-Hashes für XML-Dateien
- XmlBatchProcessingThread: Batch-Verarbeitung von XML-Dateien
- TransformationThread: Ausführung von XSL-Transformationen
"""
import logging
import shutil
from pathlib import Path
from PySide6.QtCore import QThread, Signal
from conf import TreeNode, XslFile, XmlFile
from transform import TransformationJob
from utils import calculate_blake2b_hash
logger = logging.getLogger(__name__)
class XmlHashCalculatorThread(QThread):
"""
Thread für die asynchrone Berechnung von blake2b-Hash-Werten für XML-Dateien.
"""
# Signale für die Kommunikation mit dem Haupt-Thread
hash_calculated = Signal(object, str) # xml_file_object, hash_value
calculation_finished = Signal(int, int) # processed_count, total_count
error_occurred = Signal(str, str) # xml_file_path, error_message
def __init__(self, project_dir: Path, xml_files: list[XmlFile]):
"""
Initialisiert den Hash-Berechnungs-Thread.
Args:
project_dir: Pfad zum Projekt-Verzeichnis
xml_files: Liste der XmlFile-Objekte für die Hash-Berechnung
"""
super().__init__()
self.project_dir = project_dir
self.xml_files = xml_files
self.processed_count = 0
def run(self):
"""
Führt die Hash-Berechnung für alle XML-Dateien aus.
"""
logger.info(f"Starte Hash-Berechnung für {len(self.xml_files)} XML-Dateien")
for xml_file in self.xml_files:
try:
# Prüfe ob hashsum bereits vorhanden ist
if xml_file.hashsum:
logger.debug(f"Hash bereits vorhanden für {xml_file.xml}: {xml_file.hashsum}")
self.processed_count += 1
continue
# Berechne Hash für die XML-Datei
xml_file_path = self.project_dir / xml_file.xml
hash_value = self._calculate_blake2b_hash(xml_file_path)
if hash_value:
# Sende Signal mit berechnetem Hash
self.hash_calculated.emit(xml_file, hash_value)
logger.debug(f"Hash berechnet für {xml_file.xml}: {hash_value}")
self.processed_count += 1
except Exception as e:
error_msg = f"Fehler bei Hash-Berechnung für {xml_file.xml}: {str(e)}"
logger.error(error_msg)
self.error_occurred.emit(str(xml_file.xml), error_msg)
self.processed_count += 1
# Sende Abschluss-Signal
self.calculation_finished.emit(self.processed_count, len(self.xml_files))
logger.info(f"Hash-Berechnung abgeschlossen: {self.processed_count}/{len(self.xml_files)} verarbeitet")
def _calculate_blake2b_hash(self, file_path: Path) -> str | None:
"""Berechnet den blake2b-Hash einer XML-Datei."""
return calculate_blake2b_hash(file_path)
class XmlBatchProcessingThread(QThread):
"""
Thread für die asynchrone Batch-Verarbeitung von mehreren XML-Dateien.
Verarbeitet XML-Dateien mit Hash-Berechnung, Duplikatserkennung und Dateikopieren.
"""
# Signale für die Kommunikation mit dem Haupt-Thread
progress_update = Signal(int, int, str) # current, total, current_file_name
file_processed = Signal(dict) # result dictionary
processing_finished = Signal(dict) # stats dictionary
error_occurred = Signal(str) # error_message
def __init__(self, xml_files: list, selected_xsl_nodes: list, project_dir: Path, pdf_project):
"""
Initialisiert den Batch-Verarbeitungs-Thread.
Args:
xml_files: Liste von Pfaden zu XML-Dateien
selected_xsl_nodes: Liste der ausgewählten XSL-Knoten
project_dir: Pfad zum Projekt-Verzeichnis
pdf_project: ProjectData-Objekt
"""
super().__init__()
self.xml_files = xml_files
self.selected_xsl_nodes = selected_xsl_nodes
self.project_dir = project_dir
self.pdf_project = pdf_project
# Statistiken
self.stats = {
"total": len(xml_files),
"processed": 0,
"new_added": 0,
"existing_added": 0,
"already_assigned": 0,
"cancelled": 0,
"errors": 0,
"error_messages": [],
"renamed_files": [],
}
def run(self):
"""
Führt die Batch-Verarbeitung aller XML-Dateien aus.
"""
logger.info(f"Starte Batch-Verarbeitung für {len(self.xml_files)} XML-Dateien")
for i, xml_file_path in enumerate(self.xml_files):
try:
# Sende Progress-Update
self.progress_update.emit(i + 1, len(self.xml_files), xml_file_path.name)
# Prüfe ob die Datei existiert
if not xml_file_path.exists():
self.stats["errors"] += 1
self.stats["error_messages"].append(f"{xml_file_path.name}: Datei existiert nicht")
continue
# Verarbeite die XML-Datei
result = self._process_xml_file(xml_file_path)
# Aktualisiere Statistiken
self._update_stats(result)
# Sende Ergebnis
self.file_processed.emit(result)
except Exception as e:
error_msg = f"Fehler bei {xml_file_path.name}: {str(e)}"
logger.error(error_msg)
self.stats["errors"] += 1
self.stats["error_messages"].append(error_msg)
# Sende Abschluss-Signal mit Statistiken
self.processing_finished.emit(self.stats)
logger.info(f"Batch-Verarbeitung abgeschlossen: {self.stats['processed']}/{self.stats['total']} verarbeitet")
def _process_xml_file(self, xml_file_path: Path) -> dict:
"""
Verarbeitet eine einzelne XML-Datei.
Args:
xml_file_path: Pfad zur XML-Datei
Returns:
dict: Ergebnis-Dictionary mit Status
"""
try:
# 1. Hash berechnen
file_hash = self._calculate_hash_for_file(xml_file_path)
if not file_hash:
logger.warning(f"Hash-Berechnung für {xml_file_path} fehlgeschlagen")
# 2. Prüfe auf Hash-Duplikat
existing_xml = self._find_xml_file_by_hash(file_hash) if file_hash else None
if existing_xml:
# Hash-Match: Ordne vorhandene Datei zu
return self._assign_existing_xml_to_nodes(existing_xml)
else:
# Keine Duplikate: Verarbeite als neue Datei
return self._process_new_xml_file(xml_file_path, file_hash)
except Exception as e:
return {"status": "error", "error_msg": str(e)}
def _calculate_hash_for_file(self, file_path: Path) -> str | None:
"""Berechnet blake2b Hash für eine Datei."""
return calculate_blake2b_hash(file_path)
def _find_xml_file_by_hash(self, hash_value: str) -> XmlFile | None:
"""Sucht eine XML-Datei anhand ihres Hash-Werts."""
if not hash_value or not self.pdf_project.nodes:
return None
def search_recursive(nodes):
for node in nodes:
if isinstance(node, XslFile) and node.xmls:
for xml_file in node.xmls:
if xml_file.hashsum == hash_value:
return xml_file
elif isinstance(node, TreeNode) and node.children:
found = search_recursive(node.children)
if found:
return found
return None
return search_recursive(self.pdf_project.nodes)
def _assign_existing_xml_to_nodes(self, existing_xml: XmlFile) -> dict:
"""Ordnet eine vorhandene XML-Datei den Knoten zu."""
try:
added_count = 0
for xsl_node in self.selected_xsl_nodes:
already_assigned = any(xml_file.xml == existing_xml.xml for xml_file in xsl_node.xmls)
if not already_assigned:
new_xml_ref = XmlFile(xml=existing_xml.xml, hashsum=existing_xml.hashsum)
xsl_node.xmls.append(new_xml_ref)
added_count += 1
if added_count > 0:
return {
"status": "existing_added",
"added_count": added_count,
"existing_file": existing_xml.xml.name,
}
else:
return {"status": "already_assigned", "added_count": 0, "existing_file": existing_xml.xml.name}
except Exception as e:
return {"status": "error", "error_msg": str(e)}
def _process_new_xml_file(self, xml_file_path: Path, file_hash: str | None) -> dict:
"""Verarbeitet eine neue XML-Datei."""
try:
# Erstelle xml-Ordner
xml_dir = self.project_dir / "xml"
xml_dir.mkdir(parents=True, exist_ok=True)
# Bestimme Ziel-Pfad
target_xml_path = xml_dir / xml_file_path.name
# Prüfe auf Namenskonflikte und generiere ggf. alternativen Namen
original_name = xml_file_path.name
counter = 1
while target_xml_path.exists() or self._is_filename_used_in_project(Path("xml") / target_xml_path.name):
# Generiere alternativen Namen
stem = xml_file_path.stem
suffix = xml_file_path.suffix
target_xml_path = xml_dir / f"{stem}_{counter}{suffix}"
counter += 1
# Sicherheit: Maximal 1000 Versuche
if counter > 1000:
return {"status": "error", "error_msg": "Konnte keinen eindeutigen Dateinamen finden"}
# Kopiere Datei
shutil.copy2(xml_file_path, target_xml_path)
# Erstelle relatives Path
relative_xml_path = Path("xml") / target_xml_path.name
# Füge zu XSL-Knoten hinzu
added_count = 0
for xsl_node in self.selected_xsl_nodes:
existing_xml = any(xml_file.xml == relative_xml_path for xml_file in xsl_node.xmls)
if not existing_xml:
new_xml_file = XmlFile(xml=relative_xml_path, hashsum=file_hash)
xsl_node.xmls.append(new_xml_file)
added_count += 1
if added_count > 0:
return {
"status": "new_added",
"added_count": added_count,
"new_file": target_xml_path.name,
"renamed_from": original_name if target_xml_path.name != original_name else None,
}
else:
return {"status": "already_assigned", "added_count": 0, "new_file": target_xml_path.name}
except Exception as e:
return {"status": "error", "error_msg": str(e)}
def _is_filename_used_in_project(self, filename: Path) -> bool:
"""Prüft ob ein Dateiname bereits im Projekt verwendet wird."""
if not self.pdf_project.nodes:
return False
def search_recursive(nodes):
for node in nodes:
if isinstance(node, XslFile) and node.xmls:
for xml_file in node.xmls:
if xml_file.xml == filename:
return True
elif isinstance(node, TreeNode) and node.children:
if search_recursive(node.children):
return True
return False
return search_recursive(self.pdf_project.nodes)
def _update_stats(self, result: dict):
"""Aktualisiert die Statistiken."""
self.stats["processed"] += 1
status = result.get("status")
if status == "new_added":
self.stats["new_added"] += 1
if result.get("renamed_from"):
self.stats["renamed_files"].append(f"{result['renamed_from']}{result['new_file']}")
elif status == "existing_added":
self.stats["existing_added"] += 1
elif status == "already_assigned":
self.stats["already_assigned"] += 1
elif status == "error":
self.stats["errors"] += 1
self.stats["error_messages"].append(result.get("error_msg", "Unbekannter Fehler"))
class TransformationThread(QThread):
"""
Thread für die asynchrone Ausführung von Transformations-Jobs.
"""
# Signale für die Kommunikation mit dem Haupt-Thread
job_started = Signal(str, str) # xml_file_name, xsl_id_str
job_finished = Signal(dict) # result_dict
job_error = Signal(str, str, str) # xml_file_name, xsl_id_str, error_message
all_jobs_finished = Signal(int, int, float) # successful_count, total_count, total_duration
def __init__(self, jobs: list[TransformationJob], force: bool = False, max_workers: int = 8):
"""
Initialisiert den Transformations-Thread.
Args:
jobs: Liste der TransformationJob-Objekte
force: Wenn True, werden alle Jobs ausgeführt (ignoriert Up-to-Date)
max_workers: Maximale Anzahl paralleler Worker (Standard: 8)
"""
super().__init__()
self.jobs = jobs
self.force = force
self.max_workers = max_workers
self.successful_count = 0
def _process_single_job(self, job: TransformationJob) -> dict:
"""
Verarbeitet einen einzelnen Transformations-Job (Thread-safe).
Args:
job: Der zu verarbeitende TransformationJob
Returns:
dict: Ergebnis-Dictionary des Jobs
"""
try:
# Sende Start-Signal mit XSL-ID
xsl_id_str = "_".join(str(x) for x in job.xsl_id) if job.xsl_id else ""
self.job_started.emit(str(job.xml_file), xsl_id_str)
# Führe Transformations-Pipeline aus
result = job.run_full_pipeline(force=self.force)
# Sende Abschluss-Signal
self.job_finished.emit(result)
return result
except Exception as e:
error_msg = f"Unerwarteter Fehler bei Transformation: {str(e)}"
logger.error(error_msg)
xsl_id_str = "_".join(str(x) for x in job.xsl_id) if job.xsl_id else ""
self.job_error.emit(str(job.xml_file), xsl_id_str, error_msg)
return {"success": False, "error": error_msg}
def run(self):
"""
Führt alle Transformations-Jobs parallel aus mit ThreadPoolExecutor.
"""
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
import threading
start_time = datetime.now()
logger.info(f"Starte parallele Transformation von {len(self.jobs)} Jobs mit {self.max_workers} Workern")
# Thread-sicherer Counter
successful_lock = threading.Lock()
# Verwende ThreadPoolExecutor für parallele Verarbeitung
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# Starte alle Jobs
future_to_job = {executor.submit(self._process_single_job, job): job for job in self.jobs}
# Warte auf Abschluss und sammle Ergebnisse
for future in as_completed(future_to_job):
try:
result = future.result()
if result.get("success", False):
with successful_lock:
self.successful_count += 1
except Exception as e:
logger.error(f"Fehler beim Verarbeiten des Future: {e}")
# Berechne Gesamtdauer
total_duration = (datetime.now() - start_time).total_seconds()
# Sende Abschluss-Signal für alle Jobs mit Gesamtdauer
self.all_jobs_finished.emit(self.successful_count, len(self.jobs), total_duration)
logger.info(
f"Transformation abgeschlossen: {self.successful_count}/{len(self.jobs)} erfolgreich ({total_duration:.2f}s) "
f"[{len(self.jobs) / total_duration:.2f} Jobs/s mit {self.max_workers} Workern]"
)