462 lines
18 KiB
Python
462 lines
18 KiB
Python
|
|
"""
|
||
|
|
Thread-Klassen für asynchrone Operationen in DocuMentor.
|
||
|
|
|
||
|
|
Dieses Modul enthält alle QThread-Klassen, die für Hintergrundoperationen verwendet werden:
|
||
|
|
- XmlHashCalculatorThread: Berechnung von blake2b-Hashes für XML-Dateien
|
||
|
|
- XmlBatchProcessingThread: Batch-Verarbeitung von XML-Dateien
|
||
|
|
- TransformationThread: Ausführung von XSL-Transformationen
|
||
|
|
"""
|
||
|
|
|
||
|
|
import hashlib
|
||
|
|
import logging
|
||
|
|
import shutil
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import List
|
||
|
|
|
||
|
|
from PySide6.QtCore import QThread, Signal
|
||
|
|
|
||
|
|
from conf import TreeNode, XslFile, XmlFile
|
||
|
|
from transform import TransformationJob
|
||
|
|
|
||
|
|
|
||
|
|
logger = logging.getLogger(__name__)
|
||
|
|
|
||
|
|
|
||
|
|
class XmlHashCalculatorThread(QThread):
    """
    Thread that computes blake2b hash values for XML files in the background.

    Entries that already carry a ``hashsum`` are skipped. For every newly
    computed hash ``hash_calculated`` is emitted, and ``calculation_finished``
    is emitted once after all entries have been visited.
    """

    # Signals for communication with the main thread
    hash_calculated = Signal(object, str)  # xml_file_object, hash_value
    calculation_finished = Signal(int, int)  # processed_count, total_count
    error_occurred = Signal(str, str)  # xml_file_path, error_message

    # Number of bytes read per step while hashing, so that a large file is
    # never held in memory as a whole.
    _HASH_CHUNK_SIZE = 1024 * 1024

    def __init__(self, project_dir: Path, xml_files: List[XmlFile]):
        """
        Initialize the hash calculation thread.

        Args:
            project_dir: Path to the project directory; the ``xml`` attribute
                of each entry is joined onto it.
            xml_files: List of XmlFile objects to compute hashes for.
        """
        super().__init__()
        self.project_dir = project_dir
        self.xml_files = xml_files
        self.processed_count = 0

    def run(self):
        """
        Compute the hash for every XML file and report the results via signals.

        Every visited entry increments ``processed_count``, regardless of
        whether it was skipped, hashed, not hashable, or raised an error.
        """
        # Start from zero so a re-started thread reports only this run.
        self.processed_count = 0
        total_count = len(self.xml_files)
        logger.info(f"Starte Hash-Berechnung für {len(self.xml_files)} XML-Dateien")

        for xml_file in self.xml_files:
            try:
                # Skip entries whose hash is already known
                if xml_file.hashsum:
                    logger.debug(f"Hash bereits vorhanden für {xml_file.xml}: {xml_file.hashsum}")
                    self.processed_count += 1
                    continue

                # Compute the hash of the file below the project directory
                xml_file_path = self.project_dir / xml_file.xml
                hash_value = self._calculate_blake2b_hash(xml_file_path)

                # The helper returns None for a missing/unreadable file (it
                # logs the reason itself); only a real hash is announced.
                if hash_value:
                    self.hash_calculated.emit(xml_file, hash_value)
                    logger.debug(f"Hash berechnet für {xml_file.xml}: {hash_value}")

                self.processed_count += 1

            except Exception as e:
                # Best-effort boundary: one bad entry must not stop the rest.
                error_msg = f"Fehler bei Hash-Berechnung für {xml_file.xml}: {str(e)}"
                logger.exception(error_msg)
                self.error_occurred.emit(str(xml_file.xml), error_msg)
                self.processed_count += 1

        # Announce completion
        self.calculation_finished.emit(self.processed_count, total_count)
        logger.info(f"Hash-Berechnung abgeschlossen: {self.processed_count}/{len(self.xml_files)} verarbeitet")

    def _calculate_blake2b_hash(self, file_path: Path) -> str | None:
        """
        Compute the blake2b hash of a file.

        The file is read in binary mode in chunks of ``_HASH_CHUNK_SIZE`` bytes
        and fed incrementally into the hash object, which yields the same
        digest as hashing the complete content at once.

        Args:
            file_path: Path to the XML file.

        Returns:
            The hex digest prefixed with ``"blake2b:"``, or None if the file
            does not exist or any error occurred while reading it.
        """
        try:
            if not file_path.exists():
                logger.warning(f"XML-Datei nicht gefunden: {file_path}")
                return None

            hash_obj = hashlib.blake2b()
            with open(file_path, "rb") as f:
                while chunk := f.read(self._HASH_CHUNK_SIZE):
                    hash_obj.update(chunk)

            # Add the algorithm prefix
            return f"blake2b:{hash_obj.hexdigest()}"

        except Exception as e:
            # Deliberately swallowed: callers treat None as "no hash available".
            logger.error(f"Fehler beim Berechnen des Hash für {file_path}: {e}")
            return None
|
||
|
|
|
||
|
|
|
||
|
|
class XmlBatchProcessingThread(QThread):
    """
    Thread that batch-processes several XML files in the background.

    For each file a hash is computed, the project tree is searched for an
    entry with the same hash, and the file is either assigned as an existing
    entry or copied into the project's ``xml`` directory and assigned as a new
    entry. The ``xmls`` lists of the selected XSL nodes are modified in place.
    """

    # Signals for communication with the main thread
    progress_update = Signal(int, int, str)  # current, total, current_file_name
    file_processed = Signal(dict)  # result dictionary
    processing_finished = Signal(dict)  # stats dictionary
    error_occurred = Signal(str)  # error_message

    # Number of bytes read per step while hashing, so that a large file is
    # never held in memory as a whole.
    _HASH_CHUNK_SIZE = 1024 * 1024

    # Upper bound for the rename counter when resolving file name conflicts.
    _MAX_RENAME_ATTEMPTS = 1000

    def __init__(self, xml_files: list, selected_xsl_nodes: list, project_dir: Path, pdf_project):
        """
        Initialize the batch processing thread.

        Args:
            xml_files: List of paths to XML files.
            selected_xsl_nodes: List of the selected XSL nodes; new entries
                are appended to their ``xmls`` lists.
            project_dir: Path to the project directory.
            pdf_project: Project object; its ``nodes`` attribute is searched
                for existing hashes and file names.
        """
        super().__init__()
        self.xml_files = xml_files
        self.selected_xsl_nodes = selected_xsl_nodes
        self.project_dir = project_dir
        self.pdf_project = pdf_project

        # Statistics, emitted with processing_finished
        self.stats = {
            "total": len(xml_files),
            "processed": 0,
            "new_added": 0,
            "existing_added": 0,
            "already_assigned": 0,
            "cancelled": 0,
            "errors": 0,
            "error_messages": [],
            "renamed_files": [],
        }

    def run(self):
        """
        Process all XML files and emit progress, per-file results and final stats.

        A failure for one file is recorded in the statistics and does not stop
        the remaining files; ``processing_finished`` is emitted at the end.
        """
        total = len(self.xml_files)
        logger.info(f"Starte Batch-Verarbeitung für {len(self.xml_files)} XML-Dateien")

        for position, raw_path in enumerate(self.xml_files, start=1):
            # Fallback label that is always available, so the error handler
            # below can never fail itself when the entry is not a Path.
            display_name = str(raw_path)
            try:
                # Accept plain strings as well as Path objects
                xml_file_path = raw_path if isinstance(raw_path, Path) else Path(raw_path)
                display_name = xml_file_path.name

                # Report progress
                self.progress_update.emit(position, total, display_name)

                # A missing file counts as an error and is not processed further
                if not xml_file_path.exists():
                    self.stats["errors"] += 1
                    self.stats["error_messages"].append(f"{display_name}: Datei existiert nicht")
                    continue

                # Process the file, update the statistics, publish the result
                result = self._process_xml_file(xml_file_path)
                self._update_stats(result)
                self.file_processed.emit(result)

            except Exception as e:
                # Best-effort boundary: one bad entry must not stop the batch.
                error_msg = f"Fehler bei {display_name}: {str(e)}"
                logger.exception(error_msg)
                self.stats["errors"] += 1
                self.stats["error_messages"].append(error_msg)

        # Announce completion together with the statistics
        self.processing_finished.emit(self.stats)
        logger.info(f"Batch-Verarbeitung abgeschlossen: {self.stats['processed']}/{self.stats['total']} verarbeitet")

    def _process_xml_file(self, xml_file_path: Path) -> dict:
        """
        Process a single XML file.

        Args:
            xml_file_path: Path to the XML file.

        Returns:
            Result dictionary with a ``"status"`` key; on an exception the
            status is ``"error"`` and ``"error_msg"`` holds the message.
        """
        try:
            # 1. Compute the hash; a failure is only logged and the file is
            #    then handled as a new file without a hash.
            file_hash = self._calculate_hash_for_file(xml_file_path)
            if not file_hash:
                logger.warning(f"Hash-Berechnung für {xml_file_path} fehlgeschlagen")

            # 2. Look for an entry with the same hash
            existing_xml = self._find_xml_file_by_hash(file_hash) if file_hash else None

            if existing_xml:
                # Hash match: assign the already known file
                return self._assign_existing_xml_to_nodes(existing_xml)

            # No duplicate: handle as a new file
            return self._process_new_xml_file(xml_file_path, file_hash)

        except Exception as e:
            return {"status": "error", "error_msg": str(e)}

    def _calculate_hash_for_file(self, file_path: Path) -> str | None:
        """
        Compute the blake2b hash of a file.

        The file is read in chunks of ``_HASH_CHUNK_SIZE`` bytes, which yields
        the same digest as hashing the complete content at once.

        Returns:
            The hex digest prefixed with ``"blake2b:"``, or None if the file
            does not exist or any error occurred while reading it.
        """
        try:
            if not file_path.exists():
                return None

            hash_obj = hashlib.blake2b()
            with open(file_path, "rb") as f:
                while chunk := f.read(self._HASH_CHUNK_SIZE):
                    hash_obj.update(chunk)

            return f"blake2b:{hash_obj.hexdigest()}"

        except Exception as e:
            # Deliberately swallowed: callers treat None as "no hash available".
            logger.error(f"Fehler beim Berechnen des Hash für {file_path}: {e}")
            return None

    def _find_xml_file_by_hash(self, hash_value: str) -> XmlFile | None:
        """
        Search the project tree for an XML entry with the given hash.

        Returns:
            The first matching XmlFile found in depth-first order, or None.
        """
        if not hash_value or not self.pdf_project.nodes:
            return None

        def search_recursive(nodes):
            for node in nodes:
                if isinstance(node, XslFile) and node.xmls:
                    for xml_file in node.xmls:
                        if xml_file.hashsum == hash_value:
                            return xml_file
                elif isinstance(node, TreeNode) and node.children:
                    found = search_recursive(node.children)
                    if found:
                        return found
            return None

        return search_recursive(self.pdf_project.nodes)

    def _assign_existing_xml_to_nodes(self, existing_xml: XmlFile) -> dict:
        """
        Assign an already known XML file to the selected XSL nodes.

        A new XmlFile reference is appended to every selected node that does
        not yet contain an entry with the same ``xml`` path.

        Returns:
            Result dictionary with status ``"existing_added"``,
            ``"already_assigned"`` or ``"error"``.
        """
        try:
            added_count = 0

            for xsl_node in self.selected_xsl_nodes:
                if any(entry.xml == existing_xml.xml for entry in xsl_node.xmls):
                    continue
                xsl_node.xmls.append(XmlFile(xml=existing_xml.xml, hashsum=existing_xml.hashsum))
                added_count += 1

            if added_count > 0:
                return {
                    "status": "existing_added",
                    "added_count": added_count,
                    "existing_file": existing_xml.xml.name,
                }
            return {"status": "already_assigned", "added_count": 0, "existing_file": existing_xml.xml.name}

        except Exception as e:
            return {"status": "error", "error_msg": str(e)}

    def _process_new_xml_file(self, xml_file_path: Path, file_hash: str | None) -> dict:
        """
        Copy a new XML file into the project and assign it to the selected nodes.

        The file is copied to ``<project_dir>/xml``. If the target name already
        exists on disk or is referenced in the project, ``_<counter>`` is
        appended to the stem until a free name is found.

        Args:
            xml_file_path: Path to the source XML file.
            file_hash: Hash of the file, or None if it could not be computed.

        Returns:
            Result dictionary with status ``"new_added"``,
            ``"already_assigned"`` or ``"error"``.
        """
        try:
            # Make sure the xml directory exists
            xml_dir = self.project_dir / "xml"
            xml_dir.mkdir(parents=True, exist_ok=True)

            original_name = xml_file_path.name
            stem = xml_file_path.stem
            suffix = xml_file_path.suffix

            # Start with the original name and switch to numbered
            # alternatives while there is a naming conflict
            target_xml_path = xml_dir / original_name
            counter = 1
            while target_xml_path.exists() or self._is_filename_used_in_project(Path("xml") / target_xml_path.name):
                target_xml_path = xml_dir / f"{stem}_{counter}{suffix}"
                counter += 1

                # Safety net against an endless loop
                if counter > self._MAX_RENAME_ATTEMPTS:
                    return {"status": "error", "error_msg": "Konnte keinen eindeutigen Dateinamen finden"}

            # Copy the file
            shutil.copy2(xml_file_path, target_xml_path)

            # Entries store the path relative to the project directory
            relative_xml_path = Path("xml") / target_xml_path.name

            # Add the new entry to every selected node that lacks it
            added_count = 0
            for xsl_node in self.selected_xsl_nodes:
                if any(entry.xml == relative_xml_path for entry in xsl_node.xmls):
                    continue
                xsl_node.xmls.append(XmlFile(xml=relative_xml_path, hashsum=file_hash))
                added_count += 1

            if added_count > 0:
                return {
                    "status": "new_added",
                    "added_count": added_count,
                    "new_file": target_xml_path.name,
                    "renamed_from": original_name if target_xml_path.name != original_name else None,
                }
            return {"status": "already_assigned", "added_count": 0, "new_file": target_xml_path.name}

        except Exception as e:
            return {"status": "error", "error_msg": str(e)}

    def _is_filename_used_in_project(self, filename: Path) -> bool:
        """
        Check whether a file name is already referenced in the project tree.

        Args:
            filename: Path that is compared against the ``xml`` attribute of
                every XML entry in the tree.
        """
        if not self.pdf_project.nodes:
            return False

        def search_recursive(nodes):
            for node in nodes:
                if isinstance(node, XslFile) and node.xmls:
                    for xml_file in node.xmls:
                        if xml_file.xml == filename:
                            return True
                elif isinstance(node, TreeNode) and node.children:
                    if search_recursive(node.children):
                        return True
            return False

        return search_recursive(self.pdf_project.nodes)

    def _update_stats(self, result: dict):
        """
        Update the statistics from the result dictionary of one processed file.

        Every call increments ``"processed"``; the counter matching the
        result's ``"status"`` is incremented in addition.
        """
        self.stats["processed"] += 1

        status = result.get("status")
        if status == "new_added":
            self.stats["new_added"] += 1
            if result.get("renamed_from"):
                self.stats["renamed_files"].append(f"{result['renamed_from']} → {result['new_file']}")
        elif status == "existing_added":
            self.stats["existing_added"] += 1
        elif status == "already_assigned":
            self.stats["already_assigned"] += 1
        elif status == "error":
            self.stats["errors"] += 1
            self.stats["error_messages"].append(result.get("error_msg", "Unbekannter Fehler"))
|
||
|
|
|
||
|
|
|
||
|
|
class TransformationThread(QThread):
    """
    Thread that runs transformation jobs in the background.

    The jobs are distributed over a ThreadPoolExecutor; per-job signals are
    emitted from the pool worker that handles the job, and
    ``all_jobs_finished`` is emitted once after all jobs have completed.
    """

    # Signals for communication with the main thread
    job_started = Signal(str, str)  # xml_file_name, xsl_id_str
    job_finished = Signal(dict)  # result_dict
    job_error = Signal(str, str, str)  # xml_file_name, xsl_id_str, error_message
    all_jobs_finished = Signal(int, int, float)  # successful_count, total_count, total_duration

    def __init__(self, jobs: list[TransformationJob], force: bool = False, max_workers: int = 8):
        """
        Initialize the transformation thread.

        Args:
            jobs: List of TransformationJob objects.
            force: Passed to ``run_full_pipeline(force=...)`` of every job.
            max_workers: Maximum number of parallel workers (default: 8).
        """
        super().__init__()
        self.jobs = jobs
        self.force = force
        self.max_workers = max_workers
        self.successful_count = 0

    @staticmethod
    def _format_xsl_id(job: TransformationJob) -> str:
        """Join the parts of ``job.xsl_id`` with underscores; empty string if it is falsy."""
        return "_".join(str(x) for x in job.xsl_id) if job.xsl_id else ""

    def _process_single_job(self, job: TransformationJob) -> dict:
        """
        Process a single transformation job; called on a pool worker thread.

        Args:
            job: The TransformationJob to process.

        Returns:
            The result dictionary of the job's pipeline, or
            ``{"success": False, "error": ...}`` if an exception was raised.
        """
        try:
            # Announce the start together with the XSL id
            xsl_id_str = self._format_xsl_id(job)
            self.job_started.emit(str(job.xml_file), xsl_id_str)

            # Run the transformation pipeline
            result = job.run_full_pipeline(force=self.force)

            # Announce the result
            self.job_finished.emit(result)

            return result

        except Exception as e:
            error_msg = f"Unerwarteter Fehler bei Transformation: {str(e)}"
            logger.exception(error_msg)
            # Recomputed here because the failure may have happened before
            # the id string was built in the try block.
            xsl_id_str = self._format_xsl_id(job)
            self.job_error.emit(str(job.xml_file), xsl_id_str, error_msg)
            return {"success": False, "error": error_msg}

    def run(self):
        """
        Run all transformation jobs in parallel using a ThreadPoolExecutor.

        Emits ``all_jobs_finished`` with the number of successful jobs, the
        total number of jobs and the elapsed time in seconds.
        """
        from concurrent.futures import ThreadPoolExecutor, as_completed
        import time

        # Monotonic clock: the measured duration is not affected by wall-clock
        # adjustments while the jobs are running.
        start_time = time.perf_counter()
        total_jobs = len(self.jobs)
        logger.info(f"Starte parallele Transformation von {len(self.jobs)} Jobs mit {self.max_workers} Workern")

        # Start from zero so a re-started thread reports only this run.
        self.successful_count = 0

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all jobs
            futures = [executor.submit(self._process_single_job, job) for job in self.jobs]

            # Collect results as they complete. This loop runs only in this
            # thread, so the counter needs no lock.
            for future in as_completed(futures):
                try:
                    result = future.result()
                    if result.get("success", False):
                        self.successful_count += 1
                except Exception as e:
                    logger.error(f"Fehler beim Verarbeiten des Future: {e}")

        # Total elapsed time in seconds
        total_duration = time.perf_counter() - start_time

        # Announce completion of all jobs together with the total duration
        self.all_jobs_finished.emit(self.successful_count, total_jobs, total_duration)

        # Guard the throughput figure: the duration can be 0.0 (e.g. empty job
        # list on a coarse clock), which previously raised ZeroDivisionError.
        jobs_per_second = total_jobs / total_duration if total_duration > 0 else 0.0
        logger.info(
            f"Transformation abgeschlossen: {self.successful_count}/{len(self.jobs)} erfolgreich ({total_duration:.2f}s) "
            f"[{jobs_per_second:.2f} Jobs/s mit {self.max_workers} Workern]"
        )
|