""" Thread-Klassen für asynchrone Operationen in DocuMentor. Dieses Modul enthält alle QThread-Klassen, die für Hintergrundoperationen verwendet werden: - XmlHashCalculatorThread: Berechnung von blake2b-Hashes für XML-Dateien - XmlBatchProcessingThread: Batch-Verarbeitung von XML-Dateien - TransformationThread: Ausführung von XSL-Transformationen """ import logging import shutil from pathlib import Path from PySide6.QtCore import QThread, Signal from conf import TreeNode, XslFile, XmlFile from transform import TransformationJob from utils import calculate_blake2b_hash logger = logging.getLogger(__name__) class XmlHashCalculatorThread(QThread): """ Thread für die asynchrone Berechnung von blake2b-Hash-Werten für XML-Dateien. """ # Signale für die Kommunikation mit dem Haupt-Thread hash_calculated = Signal(object, str) # xml_file_object, hash_value calculation_finished = Signal(int, int) # processed_count, total_count error_occurred = Signal(str, str) # xml_file_path, error_message def __init__(self, project_dir: Path, xml_files: list[XmlFile]): """ Initialisiert den Hash-Berechnungs-Thread. Args: project_dir: Pfad zum Projekt-Verzeichnis xml_files: Liste der XmlFile-Objekte für die Hash-Berechnung """ super().__init__() self.project_dir = project_dir self.xml_files = xml_files self.processed_count = 0 def run(self): """ Führt die Hash-Berechnung für alle XML-Dateien aus. """ logger.info(f"Starte Hash-Berechnung für {len(self.xml_files)} XML-Dateien") for xml_file in self.xml_files: try: # Prüfe ob hashsum bereits vorhanden ist if xml_file.hashsum: logger.debug(f"Hash bereits vorhanden für {xml_file.xml}: {xml_file.hashsum}") self.processed_count += 1 continue # Berechne Hash für die XML-Datei xml_file_path = self.project_dir / xml_file.xml hash_value = self._calculate_blake2b_hash(xml_file_path) if hash_value: # Sende Signal mit berechnetem Hash self.hash_calculated.emit(xml_file, hash_value) logger.debug(f"Hash berechnet für {xml_file.xml}: {hash_value}") self.processed_count += 1 except Exception as e: error_msg = f"Fehler bei Hash-Berechnung für {xml_file.xml}: {str(e)}" logger.error(error_msg) self.error_occurred.emit(str(xml_file.xml), error_msg) self.processed_count += 1 # Sende Abschluss-Signal self.calculation_finished.emit(self.processed_count, len(self.xml_files)) logger.info(f"Hash-Berechnung abgeschlossen: {self.processed_count}/{len(self.xml_files)} verarbeitet") def _calculate_blake2b_hash(self, file_path: Path) -> str | None: """Berechnet den blake2b-Hash einer XML-Datei.""" return calculate_blake2b_hash(file_path) class XmlBatchProcessingThread(QThread): """ Thread für die asynchrone Batch-Verarbeitung von mehreren XML-Dateien. Verarbeitet XML-Dateien mit Hash-Berechnung, Duplikatserkennung und Dateikopieren. """ # Signale für die Kommunikation mit dem Haupt-Thread progress_update = Signal(int, int, str) # current, total, current_file_name file_processed = Signal(dict) # result dictionary processing_finished = Signal(dict) # stats dictionary error_occurred = Signal(str) # error_message def __init__(self, xml_files: list, selected_xsl_nodes: list, project_dir: Path, pdf_project): """ Initialisiert den Batch-Verarbeitungs-Thread. Args: xml_files: Liste von Pfaden zu XML-Dateien selected_xsl_nodes: Liste der ausgewählten XSL-Knoten project_dir: Pfad zum Projekt-Verzeichnis pdf_project: ProjectData-Objekt """ super().__init__() self.xml_files = xml_files self.selected_xsl_nodes = selected_xsl_nodes self.project_dir = project_dir self.pdf_project = pdf_project # Statistiken self.stats = { "total": len(xml_files), "processed": 0, "new_added": 0, "existing_added": 0, "already_assigned": 0, "cancelled": 0, "errors": 0, "error_messages": [], "renamed_files": [], } def run(self): """ Führt die Batch-Verarbeitung aller XML-Dateien aus. """ logger.info(f"Starte Batch-Verarbeitung für {len(self.xml_files)} XML-Dateien") for i, xml_file_path in enumerate(self.xml_files): try: # Sende Progress-Update self.progress_update.emit(i + 1, len(self.xml_files), xml_file_path.name) # Prüfe ob die Datei existiert if not xml_file_path.exists(): self.stats["errors"] += 1 self.stats["error_messages"].append(f"{xml_file_path.name}: Datei existiert nicht") continue # Verarbeite die XML-Datei result = self._process_xml_file(xml_file_path) # Aktualisiere Statistiken self._update_stats(result) # Sende Ergebnis self.file_processed.emit(result) except Exception as e: error_msg = f"Fehler bei {xml_file_path.name}: {str(e)}" logger.error(error_msg) self.stats["errors"] += 1 self.stats["error_messages"].append(error_msg) # Sende Abschluss-Signal mit Statistiken self.processing_finished.emit(self.stats) logger.info(f"Batch-Verarbeitung abgeschlossen: {self.stats['processed']}/{self.stats['total']} verarbeitet") def _process_xml_file(self, xml_file_path: Path) -> dict: """ Verarbeitet eine einzelne XML-Datei. Args: xml_file_path: Pfad zur XML-Datei Returns: dict: Ergebnis-Dictionary mit Status """ try: # 1. Hash berechnen file_hash = self._calculate_hash_for_file(xml_file_path) if not file_hash: logger.warning(f"Hash-Berechnung für {xml_file_path} fehlgeschlagen") # 2. Prüfe auf Hash-Duplikat existing_xml = self._find_xml_file_by_hash(file_hash) if file_hash else None if existing_xml: # Hash-Match: Ordne vorhandene Datei zu return self._assign_existing_xml_to_nodes(existing_xml) else: # Keine Duplikate: Verarbeite als neue Datei return self._process_new_xml_file(xml_file_path, file_hash) except Exception as e: return {"status": "error", "error_msg": str(e)} def _calculate_hash_for_file(self, file_path: Path) -> str | None: """Berechnet blake2b Hash für eine Datei.""" return calculate_blake2b_hash(file_path) def _find_xml_file_by_hash(self, hash_value: str) -> XmlFile | None: """Sucht eine XML-Datei anhand ihres Hash-Werts.""" if not hash_value or not self.pdf_project.nodes: return None def search_recursive(nodes): for node in nodes: if isinstance(node, XslFile) and node.xmls: for xml_file in node.xmls: if xml_file.hashsum == hash_value: return xml_file elif isinstance(node, TreeNode) and node.children: found = search_recursive(node.children) if found: return found return None return search_recursive(self.pdf_project.nodes) def _assign_existing_xml_to_nodes(self, existing_xml: XmlFile) -> dict: """Ordnet eine vorhandene XML-Datei den Knoten zu.""" try: added_count = 0 for xsl_node in self.selected_xsl_nodes: already_assigned = any(xml_file.xml == existing_xml.xml for xml_file in xsl_node.xmls) if not already_assigned: new_xml_ref = XmlFile(xml=existing_xml.xml, hashsum=existing_xml.hashsum) xsl_node.xmls.append(new_xml_ref) added_count += 1 if added_count > 0: return { "status": "existing_added", "added_count": added_count, "existing_file": existing_xml.xml.name, } else: return {"status": "already_assigned", "added_count": 0, "existing_file": existing_xml.xml.name} except Exception as e: return {"status": "error", "error_msg": str(e)} def _process_new_xml_file(self, xml_file_path: Path, file_hash: str | None) -> dict: """Verarbeitet eine neue XML-Datei.""" try: # Erstelle xml-Ordner xml_dir = self.project_dir / "xml" xml_dir.mkdir(parents=True, exist_ok=True) # Bestimme Ziel-Pfad target_xml_path = xml_dir / xml_file_path.name # Set aller bereits im Projekt verwendeten Dateinamen — einmalig aufbauen used_filenames = self._collect_project_filenames() # Prüfe auf Namenskonflikte und generiere ggf. alternativen Namen original_name = xml_file_path.name counter = 1 while target_xml_path.exists() or (Path("xml") / target_xml_path.name) in used_filenames: # Generiere alternativen Namen stem = xml_file_path.stem suffix = xml_file_path.suffix target_xml_path = xml_dir / f"{stem}_{counter}{suffix}" counter += 1 # Sicherheit: Maximal 1000 Versuche if counter > 1000: return {"status": "error", "error_msg": "Konnte keinen eindeutigen Dateinamen finden"} # Kopiere Datei shutil.copy2(xml_file_path, target_xml_path) # Erstelle relatives Path relative_xml_path = Path("xml") / target_xml_path.name # Füge zu XSL-Knoten hinzu added_count = 0 for xsl_node in self.selected_xsl_nodes: existing_xml = any(xml_file.xml == relative_xml_path for xml_file in xsl_node.xmls) if not existing_xml: new_xml_file = XmlFile(xml=relative_xml_path, hashsum=file_hash) xsl_node.xmls.append(new_xml_file) added_count += 1 if added_count > 0: return { "status": "new_added", "added_count": added_count, "new_file": target_xml_path.name, "renamed_from": original_name if target_xml_path.name != original_name else None, } else: return {"status": "already_assigned", "added_count": 0, "new_file": target_xml_path.name} except Exception as e: return {"status": "error", "error_msg": str(e)} def _collect_project_filenames(self) -> set[Path]: """Gibt ein Set aller im Projekt verwendeten XML-Dateipfade zurück.""" result: set[Path] = set() def collect(nodes): for node in nodes: if isinstance(node, XslFile) and node.xmls: for xml_file in node.xmls: result.add(xml_file.xml) elif isinstance(node, TreeNode) and node.children: collect(node.children) collect(self.pdf_project.nodes or []) return result def _update_stats(self, result: dict): """Aktualisiert die Statistiken.""" self.stats["processed"] += 1 status = result.get("status") if status == "new_added": self.stats["new_added"] += 1 if result.get("renamed_from"): self.stats["renamed_files"].append(f"{result['renamed_from']} → {result['new_file']}") elif status == "existing_added": self.stats["existing_added"] += 1 elif status == "already_assigned": self.stats["already_assigned"] += 1 elif status == "error": self.stats["errors"] += 1 self.stats["error_messages"].append(result.get("error_msg", "Unbekannter Fehler")) class TransformationThread(QThread): """ Thread für die asynchrone Ausführung von Transformations-Jobs. """ # Signale für die Kommunikation mit dem Haupt-Thread job_started = Signal(str, str) # xml_file_name, xsl_id_str job_finished = Signal(dict) # result_dict job_error = Signal(str, str, str) # xml_file_name, xsl_id_str, error_message all_jobs_finished = Signal(int, int, float) # successful_count, total_count, total_duration def __init__(self, jobs: list[TransformationJob], force: bool = False, max_workers: int = 8): """ Initialisiert den Transformations-Thread. Args: jobs: Liste der TransformationJob-Objekte force: Wenn True, werden alle Jobs ausgeführt (ignoriert Up-to-Date) max_workers: Maximale Anzahl paralleler Worker (Standard: 8) """ super().__init__() self.jobs = jobs self.force = force self.max_workers = max_workers self.successful_count = 0 def _process_single_job(self, job: TransformationJob) -> dict: """ Verarbeitet einen einzelnen Transformations-Job (Thread-safe). Args: job: Der zu verarbeitende TransformationJob Returns: dict: Ergebnis-Dictionary des Jobs """ try: # Sende Start-Signal mit XSL-ID xsl_id_str = "_".join(str(x) for x in job.xsl_id) if job.xsl_id else "" self.job_started.emit(str(job.xml_file), xsl_id_str) # Führe Transformations-Pipeline aus result = job.run_full_pipeline(force=self.force) # Sende Abschluss-Signal self.job_finished.emit(result) return result except Exception as e: error_msg = f"Unerwarteter Fehler bei Transformation: {str(e)}" logger.error(error_msg) xsl_id_str = "_".join(str(x) for x in job.xsl_id) if job.xsl_id else "" self.job_error.emit(str(job.xml_file), xsl_id_str, error_msg) return {"success": False, "error": error_msg} def run(self): """ Führt alle Transformations-Jobs parallel aus mit ThreadPoolExecutor. """ from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime import threading start_time = datetime.now() logger.info(f"Starte parallele Transformation von {len(self.jobs)} Jobs mit {self.max_workers} Workern") # Thread-sicherer Counter successful_lock = threading.Lock() # Verwende ThreadPoolExecutor für parallele Verarbeitung with ThreadPoolExecutor(max_workers=self.max_workers) as executor: # Starte alle Jobs future_to_job = {executor.submit(self._process_single_job, job): job for job in self.jobs} # Warte auf Abschluss und sammle Ergebnisse for future in as_completed(future_to_job): try: result = future.result() if result.get("success", False): with successful_lock: self.successful_count += 1 except Exception as e: logger.error(f"Fehler beim Verarbeiten des Future: {e}") # Berechne Gesamtdauer total_duration = (datetime.now() - start_time).total_seconds() # Sende Abschluss-Signal für alle Jobs mit Gesamtdauer self.all_jobs_finished.emit(self.successful_count, len(self.jobs), total_duration) logger.info( f"Transformation abgeschlossen: {self.successful_count}/{len(self.jobs)} erfolgreich ({total_duration:.2f}s) " f"[{len(self.jobs) / total_duration:.2f} Jobs/s mit {self.max_workers} Workern]" )