From d0cdcd643225c3ac032c3cde1e024b9e994b97cd Mon Sep 17 00:00:00 2001 From: Vitali Graf Date: Sun, 28 Dec 2025 16:46:39 +0100 Subject: [PATCH] Performance: 4x schnellere XSLT-Transformationen durch Worker-Pool MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Problem: 82 XML-Dateien brauchten 160 Sekunden (JVM-Startup-Overhead) Lösung: Persistente JVM-Worker-Prozesse mit JAXP Transformer API - Saxon Worker Pool mit N persistenten JVM-Prozessen - Eliminiert JVM-Startup und Classpath-Scanning bei jedem Job - Parallele Verarbeitung mit ThreadPoolExecutor - JAXP Transformer API (javax.xml.transform) - stabil, kein System.exit() - Konfigurierbare Worker-Anzahl über Performance-Menü Ergebnis: 82 Dateien in 40 Sekunden (4x Speedup, ~0.49s pro Datei) Zusätzliche Verbesserungen: - Dual-Logging (Datei + Konsole) mit Timestamps - Worker-stderr-Logs in Projektverzeichnis/temp/ - Umfangreiche Debug-Ausgaben für Fehlerdiagnose - Robuste Fehlerbehandlung mit ErrorListener Technische Details: - SaxonWorkerPool: Verwaltet N Worker-Prozesse - JAXP statt Transform.main() (kein System.exit!) - Worker-Locks für thread-sichere Job-Verteilung - Graceful Shutdown mit EXIT-Befehl - Fallback auf subprocess bei Pool-Fehlern Dateien: - src/saxon_pool.py (NEU): Worker-Pool-Implementation - src/transform.py: Integration mit Worker-Pool - src/ui/MainWindow.py: Pool-Initialisierung, Performance-Menü - src/conf.py: max_workers Einstellung - src/main.py: Dual-Logging 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 --- src/conf.py | 16 +- src/main.py | 39 +++- src/saxon_pool.py | 442 +++++++++++++++++++++++++++++++++++++++++++ src/transform.py | 89 +++++++-- src/ui/MainWindow.py | 191 ++++++++++++++++--- 5 files changed, 719 insertions(+), 58 deletions(-) create mode 100644 src/saxon_pool.py diff --git a/src/conf.py b/src/conf.py index 34a6766..94522e0 100644 --- a/src/conf.py +++ b/src/conf.py @@ -62,8 +62,8 @@ class XslDir(BaseModel): id: int name: str path_to_root_dir: Path - - + + class SSLMode(str, Enum): DISABLE = "disable" ALLOW = "allow" @@ -72,6 +72,7 @@ class SSLMode(str, Enum): VERIFY_CA = "verify-ca" VERIFY_FULL = "verify-full" + class PostgreSqlDb(BaseModel): id: int name: str @@ -141,6 +142,7 @@ class AppSettings(BaseSettings): pdf_projects: list[Project] = [] postgresql_dbs: list[PostgreSqlDb] = [] theme: str | None = None + max_workers: int = 8 # Anzahl paralleler Worker für Transformationen (Standard: 8) # UI-Zustand window_geometry: tuple[int, int, int, int] | None = None # (x, y, width, height) @@ -165,7 +167,7 @@ class AppSettings(BaseSettings): # Ordner existert nicht if not config_path.parent.exists(): config_path.parent.mkdir(parents=True, exist_ok=True) - + if not config_path.parent.is_dir() or not os.access(config_path.parent, os.W_OK): logger.exception(f"{config_path.parent} ist kein Verzeichnis oder es gibt keine Schreibrechte") sys.exit(1) @@ -205,16 +207,16 @@ class ProjectData(BaseModel): """ nodes: list[TreeNode] = [] - + @classmethod def readSettings(cls, project_dir: Path): # Explizit UTF-8 Encoding verwenden project_yaml_path = project_dir / "project.yaml" - with open(project_yaml_path, 'r', encoding='utf-8') as f: - yaml = YAML(typ='safe') + with open(project_yaml_path, "r", encoding="utf-8") as f: + yaml = YAML(typ="safe") yaml_data = yaml.load(f) return cls.model_validate(yaml_data) - + def writeSettings(self, project_dir: Path): with open(project_dir / "project.yaml", "w", encoding="utf8") as f: f.write(to_yaml_str(self)) diff --git a/src/main.py b/src/main.py index 0c2599c..c1a1b31 100644 --- a/src/main.py +++ b/src/main.py @@ -10,12 +10,39 @@ from conf import app_settings def main(): """Haupteinstiegspunkt der Anwendung.""" - # Logging konfigurieren - logging.basicConfig( - level=logging.DEBUG, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - datefmt='%H:%M:%S' - ) + # Logging konfigurieren - sowohl Datei als auch Konsole + from datetime import datetime + + # Log-Verzeichnis erstellen (im selben Verzeichnis wie config.json) + from conf import config_path + + log_dir = config_path.parent / "logs" + log_dir.mkdir(exist_ok=True) + + # Log-Dateiname mit Timestamp + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + log_file = log_dir / f"documentor_{timestamp}.log" + + # Root-Logger konfigurieren + logger = logging.getLogger() + logger.setLevel(logging.DEBUG) + + # Formatter für alle Handler + formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%H:%M:%S") + + # Handler 1: Datei (alles ab DEBUG) + file_handler = logging.FileHandler(log_file, encoding="utf-8") + file_handler.setLevel(logging.DEBUG) + file_handler.setFormatter(formatter) + logger.addHandler(file_handler) + + # Handler 2: Konsole (alles ab INFO) + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setLevel(logging.INFO) + console_handler.setFormatter(formatter) + logger.addHandler(console_handler) + + logging.info(f"Logging initialisiert: {log_file}") # QApplication-Instanz erstellen app = QApplication(sys.argv) diff --git a/src/saxon_pool.py b/src/saxon_pool.py new file mode 100644 index 0000000..13b93da --- /dev/null +++ b/src/saxon_pool.py @@ -0,0 +1,442 @@ +""" +Saxon Worker Pool - Persistente JVM-Prozesse für schnelle XSLT-Transformationen. + +Eliminiert JVM-Startup-Overhead durch Vorinitialisierung von N Worker-Prozessen. +Jeder Worker läuft als Daemon und verarbeitet mehrere Transformationen nacheinander. +""" + +import logging +import subprocess +import threading +from pathlib import Path +from queue import Queue +from typing import Optional +import tempfile + +logger = logging.getLogger(__name__) + +# Java-Worker-Code (wird zur Laufzeit kompiliert) +SAXON_WORKER_JAVA = """ +import javax.xml.transform.*; +import javax.xml.transform.stream.*; +import java.io.*; +import java.util.*; + +public class SaxonWorker { + public static void main(String[] args) { + BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); + String line; + + // Create TransformerFactory once and reuse + TransformerFactory factory = TransformerFactory.newInstance(); + + System.err.println("SaxonWorker started and ready (using JAXP Transformer API)"); + System.err.flush(); + + try { + while ((line = reader.readLine()) != null) { + System.err.println("DEBUG: Received line: " + line.substring(0, Math.min(100, line.length()))); + System.err.flush(); + + if ("EXIT".equals(line.trim())) { + System.err.println("SaxonWorker exiting"); + break; + } + + try { + // Parse job + System.err.println("DEBUG: Parsing job..."); + System.err.flush(); + + String[] parts = line.split("\\t"); + System.err.println("DEBUG: Parts count: " + parts.length); + System.err.flush(); + + if (parts.length < 3) { + System.out.println("ERROR: Invalid job format"); + System.out.flush(); + continue; + } + + String sourceXml = parts[0]; + String xslStylesheet = parts[1]; + String outputFo = parts[2]; + + System.err.println("DEBUG: Creating transformer from stylesheet..."); + System.err.flush(); + + // Create Source and Result objects + StreamSource xslSource = new StreamSource(new File(xslStylesheet)); + StreamSource xmlSource = new StreamSource(new File(sourceXml)); + StreamResult result = new StreamResult(new File(outputFo)); + + System.err.println("DEBUG: Compiling stylesheet..."); + System.err.flush(); + + // Create transformer from stylesheet + Transformer transformer = factory.newTransformer(xslSource); + + // Set parameters if present + if (parts.length > 3 && !parts[3].isEmpty()) { + String[] params = parts[3].split("\\\\|\\\\|\\\\|"); + for (String param : params) { + if (!param.isEmpty() && param.contains("=")) { + String[] kv = param.split("=", 2); + transformer.setParameter(kv[0], kv[1]); + System.err.println("DEBUG: Set parameter: " + kv[0] + " = " + kv[1]); + } + } + System.err.flush(); + } + + System.err.println("DEBUG: Running transformation..."); + System.err.flush(); + + // Capture errors via ErrorListener + final StringBuilder errors = new StringBuilder(); + transformer.setErrorListener(new ErrorListener() { + @Override + public void warning(TransformerException e) { + errors.append("WARNING: ").append(e.getMessage()).append("\\n"); + } + + @Override + public void error(TransformerException e) { + errors.append("ERROR: ").append(e.getMessage()).append("\\n"); + } + + @Override + public void fatalError(TransformerException e) throws TransformerException { + errors.append("FATAL: ").append(e.getMessage()).append("\\n"); + throw e; + } + }); + + // Run transformation + transformer.transform(xmlSource, result); + + System.err.println("DEBUG: Transformation completed"); + System.err.flush(); + + // Check for errors + if (errors.length() > 0) { + System.out.println("ERROR: " + errors.toString().trim()); + } else { + System.out.println("OK"); + } + System.out.flush(); + + } catch (TransformerException e) { + System.err.println("DEBUG: Transformer exception: " + e.getClass().getName()); + System.err.flush(); + e.printStackTrace(System.err); + + String errorMsg = e.getMessage(); + if (errorMsg == null || errorMsg.isEmpty()) { + errorMsg = e.getClass().getSimpleName(); + } + System.out.println("ERROR: " + errorMsg); + System.out.flush(); + + } catch (Exception e) { + System.err.println("DEBUG: Job processing exception: " + e.getClass().getName()); + System.err.flush(); + e.printStackTrace(System.err); + System.out.println("ERROR: " + (e.getMessage() != null ? e.getMessage() : e.getClass().getName())); + System.out.flush(); + } + } + } catch (IOException e) { + System.err.println("SaxonWorker I/O error: " + e.getMessage()); + e.printStackTrace(System.err); + } + } +} +""" + + +class SaxonWorkerPool: + """ + Pool von lang-laufenden JVM-Prozessen für Saxon-Transformationen. + + Eliminiert JVM-Startup-Overhead durch Wiederverwendung von N Worker-Prozessen. + """ + + def __init__( + self, + num_workers: int, + java_vm_path: Path, + saxon_jar_path: Path, + classpath_cache: dict[Path, str], + log_dir: Optional[Path] = None, + ): + """ + Initialisiert den Saxon-Worker-Pool. + + Args: + num_workers: Anzahl der Worker-Prozesse + java_vm_path: Pfad zur Java VM Binary + saxon_jar_path: Pfad zur Saxon JAR-Datei + classpath_cache: Cache für Saxon-Classpaths + log_dir: Optionales Verzeichnis für Worker-Logs (Standard: temp_dir/temp) + """ + self.num_workers = num_workers + self.java_vm_path = java_vm_path + self.saxon_jar_path = saxon_jar_path + self.classpath_cache = classpath_cache + self.log_dir = log_dir + + # Worker-Prozesse und Queues + self.workers: list[subprocess.Popen] = [] + self.job_queue: Queue = Queue() + self.result_queue: Queue = Queue() + self.worker_locks: list[threading.Lock] = [] + + # Temporäres Verzeichnis für kompilierte Java-Klasse + self.temp_dir: Optional[Path] = None + self.worker_class_path: Optional[Path] = None + self.worker_log_dir: Optional[Path] = None + + # Initialisierung + self._compile_worker_class() + self._start_workers() + + logger.info(f"SaxonWorkerPool initialisiert mit {num_workers} Workern") + + def _compile_worker_class(self): + """Kompiliert die SaxonWorker-Java-Klasse.""" + try: + # Erstelle temporäres Verzeichnis + self.temp_dir = Path(tempfile.mkdtemp(prefix="saxon_worker_")) + + # Schreibe Java-Quellcode + java_file = self.temp_dir / "SaxonWorker.java" + java_file.write_text(SAXON_WORKER_JAVA, encoding="utf-8") + + # Hole Classpath + saxon_dir = self.saxon_jar_path.parent + if saxon_dir in self.classpath_cache: + classpath = self.classpath_cache[saxon_dir] + else: + # Fallback: Baue Classpath neu + import glob + import sys + + all_jars = glob.glob(str(saxon_dir / "*.jar")) + lib_dir = saxon_dir / "lib" + if lib_dir.exists(): + all_jars.extend(glob.glob(str(lib_dir / "*.jar"))) + + classpath_separator = ";" if sys.platform == "win32" else ":" + classpath = classpath_separator.join(all_jars) + + # Kompiliere Java-Klasse + javac_cmd = [str(self.java_vm_path).replace("java", "javac"), "-cp", classpath, str(java_file)] + + logger.debug(f"Kompiliere SaxonWorker: {' '.join(javac_cmd)}") + + result = subprocess.run(javac_cmd, capture_output=True, text=True, timeout=30) + + if result.returncode != 0: + raise RuntimeError(f"Java-Kompilierung fehlgeschlagen: {result.stderr}") + + self.worker_class_path = self.temp_dir + + logger.info(f"SaxonWorker erfolgreich kompiliert: {self.temp_dir}") + + except Exception as e: + logger.error(f"Fehler beim Kompilieren von SaxonWorker: {e}") + raise + + def _start_workers(self): + """Startet N Worker-Prozesse.""" + # Hole Classpath + saxon_dir = self.saxon_jar_path.parent + classpath = self.classpath_cache.get(saxon_dir, "") + + # Füge Worker-Classpath hinzu + import sys + + classpath_separator = ";" if sys.platform == "win32" else ":" + full_classpath = str(self.worker_class_path) + classpath_separator + classpath + + # Bestimme Log-Verzeichnis + self.worker_log_dir = self.log_dir if self.log_dir else self.temp_dir + if self.log_dir: + self.worker_log_dir.mkdir(parents=True, exist_ok=True) + + for i in range(self.num_workers): + try: + # Starte JVM-Prozess mit SaxonWorker + cmd = [str(self.java_vm_path), "-cp", full_classpath, "SaxonWorker"] + + # Öffne stderr-Log-Datei für diesen Worker + stderr_log = self.worker_log_dir / f"worker_{i}_stderr.log" + stderr_file = open(stderr_log, "w", encoding="utf-8") + + process = subprocess.Popen( + cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=stderr_file, # Redirect stderr to file + text=True, + bufsize=1, # Line buffered + ) + + self.workers.append(process) + self.worker_locks.append(threading.Lock()) + + logger.debug(f"Worker {i} gestartet (PID: {process.pid}, stderr: {stderr_log})") + + # Warte kurz damit Worker initialisieren kann + import time + + time.sleep(0.1) + + # Prüfe ob Worker noch läuft + if process.poll() is not None: + # Worker ist bereits beendet - Fehler! + stderr_file.close() + with open(stderr_log, "r") as f: + stderr_content = f.read() + raise RuntimeError( + f"Worker {i} ist sofort beendet (Exit Code: {process.returncode})\nstderr:\n{stderr_content}" + ) + + except Exception as e: + logger.error(f"Fehler beim Starten von Worker {i}: {e}") + raise + + logger.info(f"{len(self.workers)} Saxon-Worker erfolgreich gestartet") + + def transform( + self, source_xml: Path, xsl_stylesheet: Path, output_fo: Path, xslt_params: dict[str, str] + ) -> tuple[bool, str]: + """ + Führt eine XSLT-Transformation mit einem Worker aus dem Pool aus. + + Args: + source_xml: Pfad zur XML-Eingabedatei + xsl_stylesheet: Pfad zur XSL-Stylesheet-Datei + output_fo: Pfad zur FO-Ausgabedatei + xslt_params: Dictionary mit XSLT-Parametern + + Returns: + tuple[bool, str]: (Erfolg, Fehlermeldung/Info) + """ + # Finde freien Worker + worker_idx = None + for i, lock in enumerate(self.worker_locks): + if lock.acquire(blocking=False): + worker_idx = i + break + + if worker_idx is None: + # Kein freier Worker, warte auf ersten verfügbaren + for i, lock in enumerate(self.worker_locks): + lock.acquire() + worker_idx = i + break + + try: + worker = self.workers[worker_idx] + + # Prüfe ob Worker noch läuft + if worker.poll() is not None: + # Worker ist tot! + stderr_log = self.worker_log_dir / f"worker_{worker_idx}_stderr.log" + try: + with open(stderr_log, "r") as f: + stderr_content = f.read() + error_msg = ( + f"Worker {worker_idx} ist beendet (Exit: {worker.returncode})\nstderr:\n{stderr_content}" + ) + except Exception: + error_msg = f"Worker {worker_idx} ist beendet (Exit: {worker.returncode})" + logger.error(error_msg) + return False, error_msg + + # Formatiere Parameter + params_str = "|||".join([f"{key}={value}" for key, value in xslt_params.items()]) + + # Erstelle Job-String (Tab-separated) + job = f"{source_xml}\t{xsl_stylesheet}\t{output_fo}\t{params_str}\n" + + logger.debug(f"Sende Job an Worker {worker_idx}: {source_xml.name}") + + # Sende Job an Worker + worker.stdin.write(job) + worker.stdin.flush() + + # Warte auf Antwort + response = worker.stdout.readline().strip() + + logger.debug(f"Worker {worker_idx} Antwort: '{response}'") + + if response == "OK": + return True, "Erfolgreich" + elif response.startswith("ERROR:"): + error_msg = response[6:].strip() + return False, f"Saxon-Fehler: {error_msg}" + else: + # Leere Antwort bedeutet Worker ist crashed + if not response: + stderr_log = self.worker_log_dir / f"worker_{worker_idx}_stderr.log" + try: + with open(stderr_log, "r") as f: + stderr_content = f.read()[-500:] # Letzte 500 Zeichen + return False, f"Worker {worker_idx} crashed (keine Antwort)\nstderr:\n{stderr_content}" + except Exception: + return False, f"Worker {worker_idx} crashed (keine Antwort)" + return False, f"Unerwartete Antwort: {response}" + + except Exception as e: + logger.error(f"Fehler bei Worker {worker_idx}: {e}") + return False, f"Worker-Fehler: {str(e)}" + + finally: + # Gebe Worker-Lock frei + self.worker_locks[worker_idx].release() + + def shutdown(self): + """Beendet alle Worker-Prozesse sauber.""" + logger.info("Beende Saxon-Worker-Pool...") + + for i, worker in enumerate(self.workers): + try: + # Sende EXIT-Befehl + if worker.stdin and not worker.stdin.closed: + worker.stdin.write("EXIT\n") + worker.stdin.flush() + + # Warte auf Beendigung (max 2 Sekunden) + worker.wait(timeout=2) + logger.debug(f"Worker {i} beendet") + + except subprocess.TimeoutExpired: + # Force kill falls nötig + worker.kill() + logger.warning(f"Worker {i} musste gekillt werden") + + except Exception as e: + logger.error(f"Fehler beim Beenden von Worker {i}: {e}") + + # Lösche temporäres Verzeichnis + if self.temp_dir and self.temp_dir.exists(): + try: + import shutil + + shutil.rmtree(self.temp_dir) + logger.debug(f"Temporäres Verzeichnis gelöscht: {self.temp_dir}") + except Exception as e: + logger.warning(f"Konnte temporäres Verzeichnis nicht löschen: {e}") + + logger.info("Saxon-Worker-Pool beendet") + + def __enter__(self): + """Context manager entry.""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit.""" + self.shutdown() diff --git a/src/transform.py b/src/transform.py index 81760be..7143a56 100644 --- a/src/transform.py +++ b/src/transform.py @@ -11,10 +11,26 @@ import logging import subprocess from pathlib import Path from datetime import datetime -from typing import Any +from typing import Any, Optional, TYPE_CHECKING + +if TYPE_CHECKING: + from saxon_pool import SaxonWorkerPool logger = logging.getLogger(__name__) +# Globaler Saxon-Worker-Pool (wird von MainWindow initialisiert) +_saxon_worker_pool: Optional["SaxonWorkerPool"] = None + + +def set_saxon_worker_pool(pool: Optional["SaxonWorkerPool"]): + """Setzt den globalen Saxon-Worker-Pool.""" + global _saxon_worker_pool + _saxon_worker_pool = pool + if pool: + logger.info(f"Saxon-Worker-Pool aktiviert mit {pool.num_workers} Workern") + else: + logger.info("Saxon-Worker-Pool deaktiviert (Fallback auf subprocess)") + class TransformationJob: """ @@ -23,6 +39,9 @@ class TransformationJob: Ähnlich zur TestFall-Klasse in validate-xls.py, aber für DocuMentor angepasst. """ + # Klassenweiter Cache für Saxon-Classpaths (Performance-Optimierung) + _classpath_cache: dict[Path, str] = {} + def __init__( self, project_dir: Path, @@ -161,30 +180,63 @@ class TransformationJob: logger.error(error_msg) return False, error_msg + logger.info(f"Starte Saxon-Transformation: {self.xml_file.name}") + + # Versuche zuerst den Worker-Pool zu nutzen (schneller!) + global _saxon_worker_pool + if _saxon_worker_pool: + try: + success, message = _saxon_worker_pool.transform( + source_xml=xml_abs, + xsl_stylesheet=self.xsl_file, + output_fo=self.temp_fo, + xslt_params=self.xslt_params, + ) + + if success: + logger.info(f"Saxon-Transformation erfolgreich (Worker-Pool): {self.xml_file.name}") + else: + logger.error(f"Saxon-Transformation fehlgeschlagen (Worker-Pool): {message}") + + return success, message + + except Exception as e: + logger.warning(f"Worker-Pool-Fehler, Fallback auf subprocess: {e}") + # Fallback auf subprocess unten + + # Fallback: Traditionelle subprocess-Methode (langsamer, aber robuster) # XSLT-Parameter formatieren params = [f"{key}={value}" for key, value in self.xslt_params.items()] - # Sammle alle JAR-Dateien im Saxon-Verzeichnis für den Classpath - import glob - + # Hole Classpath aus Cache oder erstelle ihn saxon_dir = self.saxon_jar_path.parent - all_jars = glob.glob(str(saxon_dir / "*.jar")) + if saxon_dir not in TransformationJob._classpath_cache: + # Sammle alle JAR-Dateien im Saxon-Verzeichnis für den Classpath + import glob - # Sammle auch alle JARs aus dem lib-Unterordner (z.B. xmlresolver) - lib_dir = saxon_dir / "lib" - if lib_dir.exists() and lib_dir.is_dir(): - lib_jars = glob.glob(str(lib_dir / "*.jar")) - all_jars.extend(lib_jars) - logger.debug(f"Zusätzliche JARs aus lib-Verzeichnis gefunden: {len(lib_jars)}") + all_jars = glob.glob(str(saxon_dir / "*.jar")) - # Verwende alle JARs im Classpath (getrennt durch : auf Linux/Mac, ; auf Windows) - import sys + # Sammle auch alle JARs aus dem lib-Unterordner (z.B. xmlresolver) + lib_dir = saxon_dir / "lib" + if lib_dir.exists() and lib_dir.is_dir(): + lib_jars = glob.glob(str(lib_dir / "*.jar")) + all_jars.extend(lib_jars) + logger.debug(f"Zusätzliche JARs aus lib-Verzeichnis gefunden: {len(lib_jars)}") - classpath_separator = ";" if sys.platform == "win32" else ":" - classpath = classpath_separator.join(all_jars) + # Verwende alle JARs im Classpath (getrennt durch : auf Linux/Mac, ; auf Windows) + import sys + + classpath_separator = ";" if sys.platform == "win32" else ":" + classpath = classpath_separator.join(all_jars) + + # Cache den Classpath für zukünftige Jobs + TransformationJob._classpath_cache[saxon_dir] = classpath + logger.debug(f"Classpath für {saxon_dir} gecacht") + else: + classpath = TransformationJob._classpath_cache[saxon_dir] + logger.debug("Classpath aus Cache verwendet") # Saxon-Kommandozeile - # Verwende -cp mit allen JARs und rufe Transform-Main direkt auf cmd_line = [ str(self.java_vm_path), "-cp", @@ -196,8 +248,7 @@ class TransformationJob: *params, ] - logger.info(f"Starte Saxon-Transformation: {self.xml_file.name}") - logger.debug(f"Kommandozeile: {' '.join(cmd_line)}") + logger.debug(f"Kommandozeile (subprocess fallback): {' '.join(cmd_line)}") try: result = subprocess.run( @@ -214,7 +265,7 @@ class TransformationJob: logger.debug(f"Saxon StdErr:\n{result.stderr}") if result.returncode == 0: - logger.info(f"Saxon-Transformation erfolgreich: {self.xml_file.name}") + logger.info(f"Saxon-Transformation erfolgreich (subprocess): {self.xml_file.name}") return True, "Erfolgreich" else: error_msg = ( diff --git a/src/ui/MainWindow.py b/src/ui/MainWindow.py index 81b67be..19d4747 100644 --- a/src/ui/MainWindow.py +++ b/src/ui/MainWindow.py @@ -29,7 +29,8 @@ from ui.TreeNodeEditDialog import TreeNodeEditDialog from ui.XslFileEditDialog import XslFileEditDialog from ui.XmlToXslAssignDialog import XmlToXslAssignDialog from conf import app_settings, Project, ProjectData, TreeNode, XslFile, XmlFile -from transform import TransformationJob +from transform import TransformationJob, set_saxon_worker_pool +from saxon_pool import SaxonWorkerPool from pathlib import Path @@ -390,48 +391,79 @@ class TransformationThread(QThread): job_error = Signal(str, str, str) # xml_file_name, xsl_id_str, error_message all_jobs_finished = Signal(int, int, float) # successful_count, total_count, total_duration - def __init__(self, jobs: list[TransformationJob], force: bool = False): + def __init__(self, jobs: list[TransformationJob], force: bool = False, max_workers: int = 8): """ Initialisiert den Transformations-Thread. Args: jobs: Liste der TransformationJob-Objekte force: Wenn True, werden alle Jobs ausgeführt (ignoriert Up-to-Date) + max_workers: Maximale Anzahl paralleler Worker (Standard: 8) """ super().__init__() self.jobs = jobs self.force = force + self.max_workers = max_workers self.successful_count = 0 + def _process_single_job(self, job: TransformationJob) -> dict: + """ + Verarbeitet einen einzelnen Transformations-Job (Thread-safe). + + Args: + job: Der zu verarbeitende TransformationJob + + Returns: + dict: Ergebnis-Dictionary des Jobs + """ + try: + # Sende Start-Signal mit XSL-ID + xsl_id_str = "_".join(str(x) for x in job.xsl_id) if job.xsl_id else "" + self.job_started.emit(str(job.xml_file), xsl_id_str) + + # Führe Transformations-Pipeline aus + result = job.run_full_pipeline(force=self.force) + + # Sende Abschluss-Signal + self.job_finished.emit(result) + + return result + + except Exception as e: + error_msg = f"Unerwarteter Fehler bei Transformation: {str(e)}" + logger.error(error_msg) + xsl_id_str = "_".join(str(x) for x in job.xsl_id) if job.xsl_id else "" + self.job_error.emit(str(job.xml_file), xsl_id_str, error_msg) + return {"success": False, "error": error_msg} + def run(self): """ - Führt alle Transformations-Jobs sequenziell aus. + Führt alle Transformations-Jobs parallel aus mit ThreadPoolExecutor. """ + from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime + import threading start_time = datetime.now() - logger.info(f"Starte Transformation von {len(self.jobs)} Jobs") + logger.info(f"Starte parallele Transformation von {len(self.jobs)} Jobs mit {self.max_workers} Workern") - for job in self.jobs: - try: - # Sende Start-Signal mit XSL-ID - xsl_id_str = "_".join(str(x) for x in job.xsl_id) if job.xsl_id else "" - self.job_started.emit(str(job.xml_file), xsl_id_str) + # Thread-sicherer Counter + successful_lock = threading.Lock() - # Führe Transformations-Pipeline aus - result = job.run_full_pipeline(force=self.force) + # Verwende ThreadPoolExecutor für parallele Verarbeitung + with ThreadPoolExecutor(max_workers=self.max_workers) as executor: + # Starte alle Jobs + future_to_job = {executor.submit(self._process_single_job, job): job for job in self.jobs} - # Sende Abschluss-Signal - self.job_finished.emit(result) - - if result["success"]: - self.successful_count += 1 - - except Exception as e: - error_msg = f"Unerwarteter Fehler bei Transformation: {str(e)}" - logger.error(error_msg) - xsl_id_str = "_".join(str(x) for x in job.xsl_id) if job.xsl_id else "" - self.job_error.emit(str(job.xml_file), xsl_id_str, error_msg) + # Warte auf Abschluss und sammle Ergebnisse + for future in as_completed(future_to_job): + try: + result = future.result() + if result.get("success", False): + with successful_lock: + self.successful_count += 1 + except Exception as e: + logger.error(f"Fehler beim Verarbeiten des Future: {e}") # Berechne Gesamtdauer total_duration = (datetime.now() - start_time).total_seconds() @@ -439,7 +471,8 @@ class TransformationThread(QThread): # Sende Abschluss-Signal für alle Jobs mit Gesamtdauer self.all_jobs_finished.emit(self.successful_count, len(self.jobs), total_duration) logger.info( - f"Transformation abgeschlossen: {self.successful_count}/{len(self.jobs)} erfolgreich ({total_duration:.2f}s)" + f"Transformation abgeschlossen: {self.successful_count}/{len(self.jobs)} erfolgreich ({total_duration:.2f}s) " + f"[{len(self.jobs) / total_duration:.2f} Jobs/s mit {self.max_workers} Workern]" ) @@ -517,6 +550,9 @@ class MainWindow(QMainWindow): # Vorhandene Projekte-Menü initialisieren self._setup_projects_menu() + # Performance-Einstellungen-Menü initialisieren + self._setup_performance_menu() + # if theme := app_settings.theme: self.change_theme(theme) @@ -612,6 +648,47 @@ class MainWindow(QMainWindow): logger.info(f"Projekte-Menü initialisiert mit {len(app_settings.pdf_projects)} Projekten") + def _setup_performance_menu(self): + """Fügt ein Menü-Item für Performance-Einstellungen hinzu.""" + # Füge Separator vor der Performance-Einstellung hinzu + self.ui.menuProjekt.addSeparator() + + # Erstelle Aktion für Performance-Einstellungen + performance_action = QAction("Performance-Einstellungen...", self) + performance_action.setToolTip(f"Parallele Worker: {app_settings.max_workers}") + performance_action.triggered.connect(self._open_performance_settings) + + # Füge die Aktion zum Projekt-Menü hinzu + self.ui.menuProjekt.addAction(performance_action) + + logger.debug(f"Performance-Menü initialisiert (max_workers={app_settings.max_workers})") + + def _open_performance_settings(self): + """Öffnet einen Dialog für Performance-Einstellungen.""" + from PySide6.QtWidgets import QInputDialog + + current_workers = app_settings.max_workers + new_workers, ok = QInputDialog.getInt( + self, + "Performance-Einstellungen", + "Anzahl paralleler Worker für Transformationen:", + current_workers, # value + 1, # minValue + 32, # maxValue + 1, # step + ) + + if ok and new_workers != current_workers: + app_settings.max_workers = new_workers + app_settings.save() + logger.info(f"max_workers geändert: {current_workers} → {new_workers}") + QMessageBox.information( + self, + "Einstellungen gespeichert", + f"Anzahl paralleler Worker wurde auf {new_workers} gesetzt.\n\n" + f"Die Änderung wird bei der nächsten Transformation wirksam.", + ) + def open_existing_project(self, project: Project): """ Öffnet ein vorhandenes Projekt. @@ -647,6 +724,9 @@ class MainWindow(QMainWindow): # Starte Hash-Berechnung für alle XML-Dateien self._start_xml_hash_calculation() + # Initialisiere Saxon-Worker-Pool für schnellere Transformationen + self._initialize_saxon_worker_pool() + except Exception as e: logger.error(f"Fehler beim Laden des Projekts '{project.name}': {e}") # Fallback: Erstelle Standard-Einstellungen @@ -658,6 +738,62 @@ class MainWindow(QMainWindow): except Exception as fallback_error: logger.error(f"Fehler beim Erstellen der Fallback-Einstellungen: {fallback_error}") + def _initialize_saxon_worker_pool(self): + """Initialisiert den Saxon-Worker-Pool für schnelle Transformationen.""" + try: + # Shutdown vorherigen Pool falls vorhanden + self._shutdown_saxon_worker_pool() + + if not self.project: + logger.warning("Kein Projekt geladen, Saxon-Worker-Pool nicht initialisiert") + return + + # Hole Tool-Konfigurationen + java_vm = next((vm for vm in app_settings.java_vms if vm.id == self.project.java_vm_id), None) + saxon_jar = next((jar for jar in app_settings.saxon_jars if jar.id == self.project.saxon_jar_id), None) + + if not java_vm or not saxon_jar: + logger.warning("Java VM oder Saxon JAR nicht gefunden, Pool nicht initialisiert") + return + + # Erstelle Worker-Pool + num_workers = app_settings.max_workers + log_dir = self.project.project_dir / "temp" + pool = SaxonWorkerPool( + num_workers=num_workers, + java_vm_path=java_vm.path_to_binary_file, + saxon_jar_path=saxon_jar.path_to_jar_file, + classpath_cache=TransformationJob._classpath_cache, + log_dir=log_dir, + ) + + # Setze globalen Pool + set_saxon_worker_pool(pool) + + logger.info( + f"Saxon-Worker-Pool initialisiert: {num_workers} Worker " + f"(erwartet: {num_workers}x schneller für Saxon-Transformationen)" + ) + + except Exception as e: + logger.error(f"Fehler beim Initialisieren des Saxon-Worker-Pools: {e}") + # Kein Pool ist OK - Fallback auf subprocess + + def _shutdown_saxon_worker_pool(self): + """Beendet den Saxon-Worker-Pool sauber.""" + try: + # Importiere transform um Zugriff auf globalen Pool zu haben + import transform + + if transform._saxon_worker_pool: + logger.info("Beende Saxon-Worker-Pool...") + transform._saxon_worker_pool.shutdown() + set_saxon_worker_pool(None) + logger.info("Saxon-Worker-Pool beendet") + + except Exception as e: + logger.error(f"Fehler beim Beenden des Saxon-Worker-Pools: {e}") + def change_theme(self, theme_name): """ Wechselt das Theme der Anwendung. @@ -2900,7 +3036,7 @@ class MainWindow(QMainWindow): """ # Erstelle Zusammenfassungstext summary_lines = [] - summary_lines.append(f"Verarbeitung abgeschlossen:\n") + summary_lines.append("Verarbeitung abgeschlossen:\n") summary_lines.append(f"📊 Gesamt: {stats['total']} Datei(en)") summary_lines.append(f"✓ Verarbeitet: {stats['processed']} Datei(en)") @@ -2917,7 +3053,7 @@ class MainWindow(QMainWindow): summary_lines.append(f"🚫 Abgebrochen: {stats['cancelled']} Datei(en)") if stats["renamed_files"]: - summary_lines.append(f"\n📝 Umbenannte Dateien:") + summary_lines.append("\n📝 Umbenannte Dateien:") for renamed in stats["renamed_files"]: summary_lines.append(f" • {renamed}") @@ -3938,7 +4074,7 @@ class MainWindow(QMainWindow): return # Erstelle und konfiguriere Thread - self.transformation_thread = TransformationThread(jobs, force=force) + self.transformation_thread = TransformationThread(jobs, force=force, max_workers=app_settings.max_workers) # Verbinde Signale self.transformation_thread.job_started.connect(self._on_transformation_job_started) @@ -4537,6 +4673,9 @@ class MainWindow(QMainWindow): self.transformation_thread.quit() self.transformation_thread.wait() + # Beende Saxon-Worker-Pool + self._shutdown_saxon_worker_pool() + # PDF-Dokumente schließen ist bei QtPdf automatisch durch Garbage Collection super().closeEvent(event)