diff --git a/src/fop_pool.py b/src/fop_pool.py index c2dcb7d..cc3e487 100644 --- a/src/fop_pool.py +++ b/src/fop_pool.py @@ -5,16 +5,12 @@ Eliminiert JVM-Startup-Overhead durch Vorinitialisierung von N Worker-Prozessen. Jeder Worker läuft als Daemon und verarbeitet mehrere FO→PDF Transformationen nacheinander. """ +import glob import logging -import subprocess -import threading -import time -import psutil from pathlib import Path from typing import Optional -import tempfile -from worker_metrics import WorkerPoolMetrics +from worker_pool_base import BaseWorkerPool, _CLASSPATH_SEP logger = logging.getLogger(__name__) @@ -164,7 +160,7 @@ public class FopWorker { """ -class FopWorkerPool: +class FopWorkerPool(BaseWorkerPool): """ Pool von lang-laufenden JVM-Prozessen für Apache FOP PDF-Generierung. @@ -179,53 +175,19 @@ class FopWorkerPool: fop_config_file: Optional[Path] = None, log_dir: Optional[Path] = None, ): - """ - Initialisiert den FOP-Worker-Pool. - - Args: - num_workers: Anzahl der Worker-Prozesse - java_vm_path: Pfad zur Java VM Binary - apache_fop_dir: Pfad zum Apache FOP-Verzeichnis - fop_config_file: Optionaler Pfad zur fop.xconf Konfigurationsdatei - log_dir: Optionales Verzeichnis für Worker-Logs (Standard: temp_dir/tmp) - """ - self.num_workers = num_workers - self.java_vm_path = java_vm_path + super().__init__(num_workers, java_vm_path, log_dir) self.apache_fop_dir = apache_fop_dir self.fop_config_file = fop_config_file - self.log_dir = log_dir - - # Worker-Prozesse - self.workers: list[subprocess.Popen] = [] - self.worker_locks: list[threading.Lock] = [] - - # Temporäres Verzeichnis für kompilierte Java-Klasse - self.temp_dir: Optional[Path] = None - self.worker_class_path: Optional[Path] = None - self.worker_log_dir: Optional[Path] = None - - # Classpath für FOP self.fop_classpath: Optional[str] = None - # Performance-Metriken - self.metrics = WorkerPoolMetrics() - - # Initialisierung self._build_fop_classpath() self._compile_worker_class() 
self._start_workers() - logger.info(f"FopWorkerPool initialisiert mit {num_workers} Workern") def _build_fop_classpath(self): """Erstellt den Classpath für Apache FOP.""" - import glob - import sys - - # Sammle alle JAR-Dateien im FOP-Verzeichnis all_jars = glob.glob(str(self.apache_fop_dir / "build" / "*.jar")) - - # FOP lib-Verzeichnis lib_dir = self.apache_fop_dir / "lib" if lib_dir.exists() and lib_dir.is_dir(): all_jars.extend(glob.glob(str(lib_dir / "*.jar"))) @@ -233,120 +195,46 @@ class FopWorkerPool: if not all_jars: raise RuntimeError(f"Keine FOP JAR-Dateien gefunden in {self.apache_fop_dir}") - classpath_separator = ";" if sys.platform == "win32" else ":" - self.fop_classpath = classpath_separator.join(all_jars) - + self.fop_classpath = _CLASSPATH_SEP.join(all_jars) logger.debug(f"FOP Classpath: {len(all_jars)} JARs") - def _compile_worker_class(self): - """Kompiliert die FopWorker-Java-Klasse.""" - start_time = time.time() - try: - # Erstelle temporäres Verzeichnis - self.temp_dir = Path(tempfile.mkdtemp(prefix="fop_worker_")) + # --- Abstrakte Properties --- - # Schreibe Java-Quellcode - java_file = self.temp_dir / "FopWorker.java" - java_file.write_text(FOP_WORKER_JAVA, encoding="utf-8") + @property + def _pool_name(self) -> str: + return "FOP" - # Kompiliere Java-Klasse - javac_cmd = [ - str(self.java_vm_path).replace("java", "javac"), - "-cp", - self.fop_classpath, - str(java_file), - ] + @property + def _java_source_code(self) -> str: + return FOP_WORKER_JAVA - logger.debug(f"Kompiliere FopWorker: {' '.join(javac_cmd[:3])}...") + @property + def _java_class_name(self) -> str: + return "FopWorker" - result = subprocess.run(javac_cmd, capture_output=True, text=True, timeout=30) + @property + def _temp_dir_prefix(self) -> str: + return "fop_worker_" - if result.returncode != 0: - raise RuntimeError(f"Java-Kompilierung fehlgeschlagen: {result.stderr}") + @property + def _worker_init_sleep(self) -> float: + return 0.2 # FOP braucht etwas länger zum 
Initialisieren - self.worker_class_path = self.temp_dir + # --- Abstrakte Methoden --- - # Speichere Kompilierungszeit - self.metrics.compilation_time_seconds = time.time() - start_time + def _get_classpath(self) -> str: + return self.fop_classpath - logger.info( - f"FopWorker erfolgreich kompiliert: {self.temp_dir} " f"({self.metrics.compilation_time_seconds:.3f}s)" - ) + def _build_worker_cmd(self, full_classpath: str) -> list[str]: + cmd = [str(self.java_vm_path), "-cp", full_classpath, "FopWorker"] + if self.fop_config_file and self.fop_config_file.exists(): + cmd.append(str(self.fop_config_file)) + return cmd - except Exception as e: - logger.error(f"Fehler beim Kompilieren von FopWorker: {e}") - raise + def _stderr_log_name(self, i: int) -> str: + return f"fop_worker_{i}_stderr.log" - def _start_workers(self): - """Startet N Worker-Prozesse.""" - import sys - - # Füge Worker-Classpath zum FOP-Classpath hinzu - classpath_separator = ";" if sys.platform == "win32" else ":" - full_classpath = str(self.worker_class_path) + classpath_separator + self.fop_classpath - - # Bestimme Log-Verzeichnis - self.worker_log_dir = self.log_dir if self.log_dir else self.temp_dir - if self.log_dir: - self.worker_log_dir.mkdir(parents=True, exist_ok=True) - - for i in range(self.num_workers): - worker_start_time = time.time() - try: - # Starte JVM-Prozess mit FopWorker - # Übergebe fop.xconf als Argument falls vorhanden - cmd = [str(self.java_vm_path), "-cp", full_classpath, "FopWorker"] - - if self.fop_config_file and self.fop_config_file.exists(): - cmd.append(str(self.fop_config_file)) - - # Öffne stderr-Log-Datei für diesen Worker - stderr_log = self.worker_log_dir / f"fop_worker_{i}_stderr.log" - stderr_file = open(stderr_log, "w", encoding="utf-8") - - process = subprocess.Popen( - cmd, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=stderr_file, # Redirect stderr to file - text=True, - bufsize=1, # Line buffered - ) - - self.workers.append(process) - 
self.worker_locks.append(threading.Lock()) - - logger.debug(f"FOP Worker {i} gestartet (PID: {process.pid}, stderr: {stderr_log})") - - # Warte kurz damit Worker initialisieren kann - time.sleep(0.2) # FOP braucht etwas länger zum Initialisieren - - # Prüfe ob Worker noch läuft - if process.poll() is not None: - # Worker ist bereits beendet - Fehler! - stderr_file.close() - with open(stderr_log, "r") as f: - stderr_content = f.read() - raise RuntimeError( - f"FOP Worker {i} ist sofort beendet (Exit Code: {process.returncode})\nstderr:\n{stderr_content}" - ) - - # Speichere Worker-Startzeit - worker_elapsed = time.time() - worker_start_time - self.metrics.worker_start_times.append(worker_elapsed) - - except Exception as e: - logger.error(f"Fehler beim Starten von FOP Worker {i}: {e}") - raise - - # Berechne Aggregat-Werte für Worker-Startzeiten - self.metrics.calculate_aggregates() - - logger.info( - f"{len(self.workers)} FOP-Worker erfolgreich gestartet " - f"(Summe: {self.metrics.total_worker_start_time_seconds:.3f}s, " - f"Durchschnitt: {self.metrics.average_worker_start_time_seconds:.3f}s)" - ) + # --- FOP-spezifische Job-Methode --- def build_pdf(self, input_fo: Path, output_pdf: Path) -> tuple[bool, str]: """ @@ -359,167 +247,36 @@ class FopWorkerPool: Returns: tuple[bool, str]: (Erfolg, Fehlermeldung/Info) """ - # Finde freien Worker - worker_idx = None - for i, lock in enumerate(self.worker_locks): - if lock.acquire(blocking=False): - worker_idx = i - break - - if worker_idx is None: - # Kein freier Worker, warte auf ersten verfügbaren - for i, lock in enumerate(self.worker_locks): - lock.acquire() - worker_idx = i - break - + worker_idx = self._acquire_worker() try: worker = self.workers[worker_idx] - # Prüfe ob Worker noch läuft if worker.poll() is not None: - # Worker ist tot! 
- stderr_log = self.worker_log_dir / f"fop_worker_{worker_idx}_stderr.log" - try: - with open(stderr_log, "r") as f: - stderr_content = f.read() - error_msg = ( - f"FOP Worker {worker_idx} ist beendet (Exit: {worker.returncode})\nstderr:\n{stderr_content}" - ) - except Exception: - error_msg = f"FOP Worker {worker_idx} ist beendet (Exit: {worker.returncode})" + stderr_content = self._read_stderr_log(worker_idx) + error_msg = f"FOP Worker {worker_idx} ist beendet (Exit: {worker.returncode})\nstderr:\n{stderr_content}" logger.error(error_msg) return False, error_msg - # Erstelle Job-String (Tab-separated) job = f"{input_fo}\t{output_pdf}\n" - logger.debug(f"Sende FOP-Job an Worker {worker_idx}: {input_fo.name} → {output_pdf.name}") - - # Sende Job an Worker worker.stdin.write(job) worker.stdin.flush() - # Warte auf Antwort response = worker.stdout.readline().strip() - logger.debug(f"FOP Worker {worker_idx} Antwort: '{response}'") if response == "OK": return True, "Erfolgreich" elif response.startswith("ERROR:"): - error_msg = response[6:].strip() - return False, f"FOP-Fehler: {error_msg}" + return False, f"FOP-Fehler: {response[6:].strip()}" + elif not response: + stderr_content = self._read_stderr_log(worker_idx, tail=500) + return False, f"FOP Worker {worker_idx} crashed (keine Antwort)\nstderr:\n{stderr_content}" else: - # Leere Antwort bedeutet Worker ist crashed - if not response: - stderr_log = self.worker_log_dir / f"fop_worker_{worker_idx}_stderr.log" - try: - with open(stderr_log, "r") as f: - stderr_content = f.read()[-500:] # Letzte 500 Zeichen - return False, f"FOP Worker {worker_idx} crashed (keine Antwort)\nstderr:\n{stderr_content}" - except Exception: - return False, f"FOP Worker {worker_idx} crashed (keine Antwort)" return False, f"Unerwartete Antwort: {response}" except Exception as e: logger.error(f"Fehler bei FOP Worker {worker_idx}: {e}") return False, f"Worker-Fehler: {str(e)}" - finally: - # Gebe Worker-Lock frei 
self.worker_locks[worker_idx].release() - - def measure_ram_usage(self) -> tuple[float, float, list[float]]: - """ - Misst den aktuellen RAM-Verbrauch aller Worker-Prozesse. - - Returns: - tuple: (total_mb, average_mb, per_worker_mb_list) - """ - ram_per_worker = [] - - for i, worker in enumerate(self.workers): - try: - if worker.poll() is None: # Worker läuft noch - process = psutil.Process(worker.pid) - # Hole Speicherinfo (RSS = Resident Set Size in Bytes) - mem_info = process.memory_info() - ram_mb = mem_info.rss / (1024 * 1024) # Konvertiere zu MB - ram_per_worker.append(ram_mb) - else: - logger.warning(f"Worker {i} ist nicht mehr aktiv (kann RAM nicht messen)") - except (psutil.NoSuchProcess, psutil.AccessDenied) as e: - logger.warning(f"Konnte RAM für Worker {i} nicht messen: {e}") - - total_ram = sum(ram_per_worker) - average_ram = total_ram / len(ram_per_worker) if ram_per_worker else 0.0 - - return total_ram, average_ram, ram_per_worker - - def capture_ram_before_transform(self): - """Erfasst RAM-Verbrauch vor der ersten Transformation.""" - total, average, per_worker = self.measure_ram_usage() - self.metrics.ram_before_transform_mb_per_worker = per_worker - self.metrics.total_ram_before_mb = total - self.metrics.average_ram_before_mb = average - - logger.info( - f"RAM vor Transformation: {self.metrics.total_ram_before_mb:.1f} MB " - f"(Durchschnitt: {self.metrics.average_ram_before_mb:.1f} MB/Worker)" - ) - - def capture_ram_after_transform(self): - """Erfasst RAM-Verbrauch nach allen Transformationen.""" - total, average, per_worker = self.measure_ram_usage() - self.metrics.ram_after_transform_mb_per_worker = per_worker - self.metrics.total_ram_after_mb = total - self.metrics.average_ram_after_mb = average - - logger.info( - f"RAM nach Transformation: {self.metrics.total_ram_after_mb:.1f} MB " - f"(Durchschnitt: {self.metrics.average_ram_after_mb:.1f} MB/Worker)" - ) - - def shutdown(self): - """Beendet alle Worker-Prozesse sauber.""" - 
logger.info("Beende FOP-Worker-Pool...") - - for i, worker in enumerate(self.workers): - try: - # Sende EXIT-Befehl - if worker.stdin and not worker.stdin.closed: - worker.stdin.write("EXIT\n") - worker.stdin.flush() - - # Warte auf Beendigung (max 2 Sekunden) - worker.wait(timeout=2) - logger.debug(f"FOP Worker {i} beendet") - - except subprocess.TimeoutExpired: - # Force kill falls nötig - worker.kill() - logger.warning(f"FOP Worker {i} musste gekillt werden") - - except Exception as e: - logger.error(f"Fehler beim Beenden von FOP Worker {i}: {e}") - - # Lösche temporäres Verzeichnis - if self.temp_dir and self.temp_dir.exists(): - try: - import shutil - - shutil.rmtree(self.temp_dir) - logger.debug(f"Temporäres Verzeichnis gelöscht: {self.temp_dir}") - except Exception as e: - logger.warning(f"Konnte temporäres Verzeichnis nicht löschen: {e}") - - logger.info("FOP-Worker-Pool beendet") - - def __enter__(self): - """Context manager entry.""" - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - """Context manager exit.""" - self.shutdown() diff --git a/src/saxon_pool.py b/src/saxon_pool.py index 0222348..2a6def7 100644 --- a/src/saxon_pool.py +++ b/src/saxon_pool.py @@ -6,15 +6,10 @@ Jeder Worker läuft als Daemon und verarbeitet mehrere Transformationen nacheina """ import logging -import subprocess -import threading -import time -import psutil from pathlib import Path from typing import Optional -import tempfile -from worker_metrics import WorkerPoolMetrics +from worker_pool_base import BaseWorkerPool, build_jar_classpath logger = logging.getLogger(__name__) @@ -175,9 +170,9 @@ public class SaxonWorker { """ -class SaxonWorkerPool: +class SaxonWorkerPool(BaseWorkerPool): """ - Pool von lang-laufenden JVM-Prozessen für Saxon-Transformationen. + Pool von lang-laufenden JVM-Prozessen für Saxon-Transformationen (JAXP/XSLT 1.0). Eliminiert JVM-Startup-Overhead durch Wiederverwendung von N Worker-Prozessen. 
""" @@ -190,161 +185,51 @@ class SaxonWorkerPool: classpath_cache: dict[Path, str], log_dir: Optional[Path] = None, ): - """ - Initialisiert den Saxon-Worker-Pool. - - Args: - num_workers: Anzahl der Worker-Prozesse - java_vm_path: Pfad zur Java VM Binary - saxon_jar_path: Pfad zur Saxon JAR-Datei - classpath_cache: Cache für Saxon-Classpaths - log_dir: Optionales Verzeichnis für Worker-Logs (Standard: temp_dir/tmp) - """ - self.num_workers = num_workers - self.java_vm_path = java_vm_path + super().__init__(num_workers, java_vm_path, log_dir) self.saxon_jar_path = saxon_jar_path self.classpath_cache = classpath_cache - self.log_dir = log_dir - # Worker-Prozesse - self.workers: list[subprocess.Popen] = [] - self.worker_locks: list[threading.Lock] = [] - - # Temporäres Verzeichnis für kompilierte Java-Klasse - self.temp_dir: Optional[Path] = None - self.worker_class_path: Optional[Path] = None - self.worker_log_dir: Optional[Path] = None - - # Performance-Metriken - self.metrics = WorkerPoolMetrics() - - # Initialisierung self._compile_worker_class() self._start_workers() - logger.info(f"SaxonWorkerPool initialisiert mit {num_workers} Workern") - def _compile_worker_class(self): - """Kompiliert die SaxonWorker-Java-Klasse.""" - start_time = time.time() - try: - # Erstelle temporäres Verzeichnis - self.temp_dir = Path(tempfile.mkdtemp(prefix="saxon_worker_")) + # --- Abstrakte Properties --- - # Schreibe Java-Quellcode - java_file = self.temp_dir / "SaxonWorker.java" - java_file.write_text(SAXON_WORKER_JAVA, encoding="utf-8") + @property + def _pool_name(self) -> str: + return "Saxon" - # Hole Classpath - saxon_dir = self.saxon_jar_path.parent - if saxon_dir in self.classpath_cache: - classpath = self.classpath_cache[saxon_dir] - else: - # Fallback: Baue Classpath neu - import glob - import sys + @property + def _java_source_code(self) -> str: + return SAXON_WORKER_JAVA - all_jars = glob.glob(str(saxon_dir / "*.jar")) - lib_dir = saxon_dir / "lib" - if 
lib_dir.exists(): - all_jars.extend(glob.glob(str(lib_dir / "*.jar"))) + @property + def _java_class_name(self) -> str: + return "SaxonWorker" - classpath_separator = ";" if sys.platform == "win32" else ":" - classpath = classpath_separator.join(all_jars) + @property + def _temp_dir_prefix(self) -> str: + return "saxon_worker_" - # Kompiliere Java-Klasse - javac_cmd = [str(self.java_vm_path).replace("java", "javac"), "-cp", classpath, str(java_file)] + @property + def _worker_init_sleep(self) -> float: + return 0.1 - logger.debug(f"Kompiliere SaxonWorker: {' '.join(javac_cmd)}") + # --- Abstrakte Methoden --- - result = subprocess.run(javac_cmd, capture_output=True, text=True, timeout=30) - - if result.returncode != 0: - raise RuntimeError(f"Java-Kompilierung fehlgeschlagen: {result.stderr}") - - self.worker_class_path = self.temp_dir - - # Speichere Kompilierungszeit - self.metrics.compilation_time_seconds = time.time() - start_time - - logger.info( - f"SaxonWorker erfolgreich kompiliert: {self.temp_dir} " f"({self.metrics.compilation_time_seconds:.3f}s)" - ) - - except Exception as e: - logger.error(f"Fehler beim Kompilieren von SaxonWorker: {e}") - raise - - def _start_workers(self): - """Startet N Worker-Prozesse.""" - # Hole Classpath + def _get_classpath(self) -> str: saxon_dir = self.saxon_jar_path.parent - classpath = self.classpath_cache.get(saxon_dir, "") + if saxon_dir not in self.classpath_cache: + self.classpath_cache[saxon_dir] = build_jar_classpath(saxon_dir) + return self.classpath_cache[saxon_dir] - # Füge Worker-Classpath hinzu - import sys + def _build_worker_cmd(self, full_classpath: str) -> list[str]: + return [str(self.java_vm_path), "-cp", full_classpath, "SaxonWorker"] - classpath_separator = ";" if sys.platform == "win32" else ":" - full_classpath = str(self.worker_class_path) + classpath_separator + classpath + def _stderr_log_name(self, i: int) -> str: + return f"worker_{i}_stderr.log" - # Bestimme Log-Verzeichnis - self.worker_log_dir = 
self.log_dir if self.log_dir else self.temp_dir - if self.log_dir: - self.worker_log_dir.mkdir(parents=True, exist_ok=True) - - for i in range(self.num_workers): - worker_start_time = time.time() - try: - # Starte JVM-Prozess mit SaxonWorker - cmd = [str(self.java_vm_path), "-cp", full_classpath, "SaxonWorker"] - - # Öffne stderr-Log-Datei für diesen Worker - stderr_log = self.worker_log_dir / f"worker_{i}_stderr.log" - stderr_file = open(stderr_log, "w", encoding="utf-8") - - process = subprocess.Popen( - cmd, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=stderr_file, # Redirect stderr to file - text=True, - bufsize=1, # Line buffered - ) - - self.workers.append(process) - self.worker_locks.append(threading.Lock()) - - logger.debug(f"Worker {i} gestartet (PID: {process.pid}, stderr: {stderr_log})") - - # Warte kurz damit Worker initialisieren kann - time.sleep(0.1) - - # Prüfe ob Worker noch läuft - if process.poll() is not None: - # Worker ist bereits beendet - Fehler! - stderr_file.close() - with open(stderr_log, "r") as f: - stderr_content = f.read() - raise RuntimeError( - f"Worker {i} ist sofort beendet (Exit Code: {process.returncode})\nstderr:\n{stderr_content}" - ) - - # Speichere Worker-Startzeit - worker_elapsed = time.time() - worker_start_time - self.metrics.worker_start_times.append(worker_elapsed) - - except Exception as e: - logger.error(f"Fehler beim Starten von Worker {i}: {e}") - raise - - # Berechne Aggregat-Werte für Worker-Startzeiten - self.metrics.calculate_aggregates() - - logger.info( - f"{len(self.workers)} Saxon-Worker erfolgreich gestartet " - f"(Summe: {self.metrics.total_worker_start_time_seconds:.3f}s, " - f"Durchschnitt: {self.metrics.average_worker_start_time_seconds:.3f}s)" - ) + # --- Saxon-spezifische Job-Methode --- def transform( self, source_xml: Path, xsl_stylesheet: Path, output_fo: Path, xslt_params: dict[str, str] @@ -361,170 +246,38 @@ class SaxonWorkerPool: Returns: tuple[bool, str]: (Erfolg, 
Fehlermeldung/Info) """ - # Finde freien Worker - worker_idx = None - for i, lock in enumerate(self.worker_locks): - if lock.acquire(blocking=False): - worker_idx = i - break - - if worker_idx is None: - # Kein freier Worker, warte auf ersten verfügbaren - for i, lock in enumerate(self.worker_locks): - lock.acquire() - worker_idx = i - break - + worker_idx = self._acquire_worker() try: worker = self.workers[worker_idx] - # Prüfe ob Worker noch läuft if worker.poll() is not None: - # Worker ist tot! - stderr_log = self.worker_log_dir / f"worker_{worker_idx}_stderr.log" - try: - with open(stderr_log, "r") as f: - stderr_content = f.read() - error_msg = ( - f"Worker {worker_idx} ist beendet (Exit: {worker.returncode})\nstderr:\n{stderr_content}" - ) - except Exception: - error_msg = f"Worker {worker_idx} ist beendet (Exit: {worker.returncode})" + stderr_content = self._read_stderr_log(worker_idx) + error_msg = f"Worker {worker_idx} ist beendet (Exit: {worker.returncode})\nstderr:\n{stderr_content}" logger.error(error_msg) return False, error_msg - # Formatiere Parameter - params_str = "|||".join([f"{key}={value}" for key, value in xslt_params.items()]) - - # Erstelle Job-String (Tab-separated) + params_str = "|||".join([f"{k}={v}" for k, v in xslt_params.items()]) job = f"{source_xml}\t{xsl_stylesheet}\t{output_fo}\t{params_str}\n" logger.debug(f"Sende Job an Worker {worker_idx}: {source_xml.name}") - - # Sende Job an Worker worker.stdin.write(job) worker.stdin.flush() - # Warte auf Antwort response = worker.stdout.readline().strip() - logger.debug(f"Worker {worker_idx} Antwort: '{response}'") if response == "OK": return True, "Erfolgreich" elif response.startswith("ERROR:"): - error_msg = response[6:].strip() - return False, f"Saxon-Fehler: {error_msg}" + return False, f"Saxon-Fehler: {response[6:].strip()}" + elif not response: + stderr_content = self._read_stderr_log(worker_idx, tail=500) + return False, f"Worker {worker_idx} crashed (keine 
Antwort)\nstderr:\n{stderr_content}" else: - # Leere Antwort bedeutet Worker ist crashed - if not response: - stderr_log = self.worker_log_dir / f"worker_{worker_idx}_stderr.log" - try: - with open(stderr_log, "r") as f: - stderr_content = f.read()[-500:] # Letzte 500 Zeichen - return False, f"Worker {worker_idx} crashed (keine Antwort)\nstderr:\n{stderr_content}" - except Exception: - return False, f"Worker {worker_idx} crashed (keine Antwort)" return False, f"Unerwartete Antwort: {response}" except Exception as e: logger.error(f"Fehler bei Worker {worker_idx}: {e}") return False, f"Worker-Fehler: {str(e)}" - finally: - # Gebe Worker-Lock frei self.worker_locks[worker_idx].release() - - def measure_ram_usage(self) -> tuple[float, float, list[float]]: - """ - Misst den aktuellen RAM-Verbrauch aller Worker-Prozesse. - - Returns: - tuple: (total_mb, average_mb, per_worker_mb_list) - """ - ram_per_worker = [] - - for i, worker in enumerate(self.workers): - try: - if worker.poll() is None: # Worker läuft noch - process = psutil.Process(worker.pid) - # Hole Speicherinfo (RSS = Resident Set Size in Bytes) - mem_info = process.memory_info() - ram_mb = mem_info.rss / (1024 * 1024) # Konvertiere zu MB - ram_per_worker.append(ram_mb) - else: - logger.warning(f"Worker {i} ist nicht mehr aktiv (kann RAM nicht messen)") - except (psutil.NoSuchProcess, psutil.AccessDenied) as e: - logger.warning(f"Konnte RAM für Worker {i} nicht messen: {e}") - - total_ram = sum(ram_per_worker) - average_ram = total_ram / len(ram_per_worker) if ram_per_worker else 0.0 - - return total_ram, average_ram, ram_per_worker - - def capture_ram_before_transform(self): - """Erfasst RAM-Verbrauch vor der ersten Transformation.""" - total, average, per_worker = self.measure_ram_usage() - self.metrics.ram_before_transform_mb_per_worker = per_worker - self.metrics.total_ram_before_mb = total - self.metrics.average_ram_before_mb = average - - logger.info( - f"RAM vor Transformation: 
{self.metrics.total_ram_before_mb:.1f} MB " - f"(Durchschnitt: {self.metrics.average_ram_before_mb:.1f} MB/Worker)" - ) - - def capture_ram_after_transform(self): - """Erfasst RAM-Verbrauch nach allen Transformationen.""" - total, average, per_worker = self.measure_ram_usage() - self.metrics.ram_after_transform_mb_per_worker = per_worker - self.metrics.total_ram_after_mb = total - self.metrics.average_ram_after_mb = average - - logger.info( - f"RAM nach Transformation: {self.metrics.total_ram_after_mb:.1f} MB " - f"(Durchschnitt: {self.metrics.average_ram_after_mb:.1f} MB/Worker)" - ) - - def shutdown(self): - """Beendet alle Worker-Prozesse sauber.""" - logger.info("Beende Saxon-Worker-Pool...") - - for i, worker in enumerate(self.workers): - try: - # Sende EXIT-Befehl - if worker.stdin and not worker.stdin.closed: - worker.stdin.write("EXIT\n") - worker.stdin.flush() - - # Warte auf Beendigung (max 2 Sekunden) - worker.wait(timeout=2) - logger.debug(f"Worker {i} beendet") - - except subprocess.TimeoutExpired: - # Force kill falls nötig - worker.kill() - logger.warning(f"Worker {i} musste gekillt werden") - - except Exception as e: - logger.error(f"Fehler beim Beenden von Worker {i}: {e}") - - # Lösche temporäres Verzeichnis - if self.temp_dir and self.temp_dir.exists(): - try: - import shutil - - shutil.rmtree(self.temp_dir) - logger.debug(f"Temporäres Verzeichnis gelöscht: {self.temp_dir}") - except Exception as e: - logger.warning(f"Konnte temporäres Verzeichnis nicht löschen: {e}") - - logger.info("Saxon-Worker-Pool beendet") - - def __enter__(self): - """Context manager entry.""" - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - """Context manager exit.""" - self.shutdown() diff --git a/src/saxon_pool_s9api.py b/src/saxon_pool_s9api.py index 2da19b9..e182d46 100644 --- a/src/saxon_pool_s9api.py +++ b/src/saxon_pool_s9api.py @@ -7,15 +7,10 @@ Jeder Worker läuft als Daemon und verarbeitet mehrere Transformationen nacheina """ import logging 
-import subprocess -import threading -import time -import psutil from pathlib import Path from typing import Optional -import tempfile -from worker_metrics import WorkerPoolMetrics +from worker_pool_base import BaseWorkerPool, build_jar_classpath logger = logging.getLogger(__name__) @@ -155,7 +150,7 @@ public class SaxonS9ApiWorker { """ -class SaxonWorkerPoolS9Api: +class SaxonWorkerPoolS9Api(BaseWorkerPool): """ Pool von lang-laufenden JVM-Prozessen für Saxon-Transformationen mit s9api. @@ -171,181 +166,52 @@ class SaxonWorkerPoolS9Api: classpath_cache: dict[Path, str], log_dir: Optional[Path] = None, ): - """ - Initialisiert den Saxon-Worker-Pool mit s9api. - - Args: - num_workers: Anzahl der Worker-Prozesse - java_vm_path: Pfad zur Java VM Binary - saxon_jar_path: Pfad zur Saxon JAR-Datei - classpath_cache: Cache für Saxon-Classpaths - log_dir: Optionales Verzeichnis für Worker-Logs (Standard: temp_dir/tmp) - """ - self.num_workers = num_workers - self.java_vm_path = java_vm_path + super().__init__(num_workers, java_vm_path, log_dir) self.saxon_jar_path = saxon_jar_path self.classpath_cache = classpath_cache - self.log_dir = log_dir - # Worker-Prozesse - self.workers: list[subprocess.Popen] = [] - self.worker_locks: list[threading.Lock] = [] - - # Temporäres Verzeichnis für kompilierte Java-Klasse - self.temp_dir: Optional[Path] = None - self.worker_class_path: Optional[Path] = None - self.worker_log_dir: Optional[Path] = None - - # Performance-Metriken - self.metrics = WorkerPoolMetrics() - - # Initialisierung self._compile_worker_class() self._start_workers() - logger.info(f"SaxonWorkerPoolS9Api initialisiert mit {num_workers} Workern (XSLT 2.0/3.0)") - def _compile_worker_class(self): - """Kompiliert die SaxonS9ApiWorker-Java-Klasse.""" - start_time = time.time() - try: - # Erstelle temporäres Verzeichnis - self.temp_dir = Path(tempfile.mkdtemp(prefix="saxon_s9api_worker_")) + # --- Abstrakte Properties --- - # Schreibe Java-Quellcode - java_file = 
self.temp_dir / "SaxonS9ApiWorker.java" - java_file.write_text(SAXON_S9API_WORKER_JAVA, encoding="utf-8") + @property + def _pool_name(self) -> str: + return "Saxon-S9Api" - # Hole Classpath - saxon_dir = self.saxon_jar_path.parent - if saxon_dir in self.classpath_cache: - classpath = self.classpath_cache[saxon_dir] - else: - # Fallback: Baue Classpath neu - import glob - import sys + @property + def _java_source_code(self) -> str: + return SAXON_S9API_WORKER_JAVA - all_jars = glob.glob(str(saxon_dir / "*.jar")) - lib_dir = saxon_dir / "lib" - if lib_dir.exists(): - all_jars.extend(glob.glob(str(lib_dir / "*.jar"))) + @property + def _java_class_name(self) -> str: + return "SaxonS9ApiWorker" - classpath_separator = ";" if sys.platform == "win32" else ":" - classpath = classpath_separator.join(all_jars) + @property + def _temp_dir_prefix(self) -> str: + return "saxon_s9api_worker_" - # Kompiliere Java-Klasse - javac_cmd = [str(self.java_vm_path).replace("java", "javac"), "-cp", classpath, str(java_file)] + @property + def _worker_init_sleep(self) -> float: + return 0.1 - logger.debug(f"Kompiliere SaxonS9ApiWorker: {' '.join(javac_cmd)}") + # --- Abstrakte Methoden --- - result = subprocess.run(javac_cmd, capture_output=True, text=True, timeout=30) - - if result.returncode != 0: - raise RuntimeError(f"Java-Kompilierung fehlgeschlagen: {result.stderr}") - - self.worker_class_path = self.temp_dir - - # Speichere Kompilierungszeit - self.metrics.compilation_time_seconds = time.time() - start_time - - logger.info( - f"SaxonS9ApiWorker erfolgreich kompiliert: {self.temp_dir} " - f"({self.metrics.compilation_time_seconds:.3f}s)" - ) - - except Exception as e: - logger.error(f"Fehler beim Kompilieren von SaxonS9ApiWorker: {e}") - raise - - def _start_workers(self): - """Startet N Worker-Prozesse.""" - # Hole Classpath + def _get_classpath(self) -> str: saxon_dir = self.saxon_jar_path.parent - if saxon_dir in self.classpath_cache: - classpath = 
self.classpath_cache[saxon_dir] - else: - # Fallback: Baue Classpath neu (sollte nicht nötig sein, aber zur Sicherheit) - import glob - import sys - - all_jars = glob.glob(str(saxon_dir / "*.jar")) - lib_dir = saxon_dir / "lib" - if lib_dir.exists(): - all_jars.extend(glob.glob(str(lib_dir / "*.jar"))) - - classpath_separator = ";" if sys.platform == "win32" else ":" - classpath = classpath_separator.join(all_jars) - - # Cache für zukünftige Verwendung - self.classpath_cache[saxon_dir] = classpath + if saxon_dir not in self.classpath_cache: + self.classpath_cache[saxon_dir] = build_jar_classpath(saxon_dir) logger.debug(f"Classpath für {saxon_dir} neu erstellt und gecacht") + return self.classpath_cache[saxon_dir] - # Füge Worker-Classpath hinzu - import sys + def _build_worker_cmd(self, full_classpath: str) -> list[str]: + return [str(self.java_vm_path), "-cp", full_classpath, "SaxonS9ApiWorker"] - classpath_separator = ";" if sys.platform == "win32" else ":" - full_classpath = str(self.worker_class_path) + classpath_separator + classpath + def _stderr_log_name(self, i: int) -> str: + return f"s9api_worker_{i}_stderr.log" - logger.debug(f"S9Api Worker Classpath: {full_classpath[:200]}...") - - # Bestimme Log-Verzeichnis - self.worker_log_dir = self.log_dir if self.log_dir else self.temp_dir - if self.log_dir: - self.worker_log_dir.mkdir(parents=True, exist_ok=True) - - for i in range(self.num_workers): - worker_start_time = time.time() - try: - # Starte JVM-Prozess mit SaxonS9ApiWorker - cmd = [str(self.java_vm_path), "-cp", full_classpath, "SaxonS9ApiWorker"] - - # Öffne stderr-Log-Datei für diesen Worker - stderr_log = self.worker_log_dir / f"s9api_worker_{i}_stderr.log" - stderr_file = open(stderr_log, "w", encoding="utf-8") - - process = subprocess.Popen( - cmd, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=stderr_file, # Redirect stderr to file - text=True, - bufsize=1, # Line buffered - ) - - self.workers.append(process) - 
self.worker_locks.append(threading.Lock()) - - logger.debug(f"S9Api Worker {i} gestartet (PID: {process.pid}, stderr: {stderr_log})") - - # Warte kurz damit Worker initialisieren kann - time.sleep(0.1) - - # Prüfe ob Worker noch läuft - if process.poll() is not None: - # Worker ist bereits beendet - Fehler! - stderr_file.close() - with open(stderr_log, "r") as f: - stderr_content = f.read() - raise RuntimeError( - f"S9Api Worker {i} ist sofort beendet (Exit Code: {process.returncode})\nstderr:\n{stderr_content}" - ) - - # Speichere Worker-Startzeit - worker_elapsed = time.time() - worker_start_time - self.metrics.worker_start_times.append(worker_elapsed) - - except Exception as e: - logger.error(f"Fehler beim Starten von S9Api Worker {i}: {e}") - raise - - # Berechne Aggregat-Werte für Worker-Startzeiten - self.metrics.calculate_aggregates() - - logger.info( - f"{len(self.workers)} Saxon-S9Api-Worker erfolgreich gestartet " - f"(Summe: {self.metrics.total_worker_start_time_seconds:.3f}s, " - f"Durchschnitt: {self.metrics.average_worker_start_time_seconds:.3f}s)" - ) + # --- Saxon-s9api-spezifische Job-Methode --- def transform( self, source_xml: Path, xsl_stylesheet: Path, output_fo: Path, xslt_params: dict[str, str] @@ -362,170 +228,40 @@ class SaxonWorkerPoolS9Api: Returns: tuple[bool, str]: (Erfolg, Fehlermeldung/Info) """ - # Finde freien Worker - worker_idx = None - for i, lock in enumerate(self.worker_locks): - if lock.acquire(blocking=False): - worker_idx = i - break - - if worker_idx is None: - # Kein freier Worker, warte auf ersten verfügbaren - for i, lock in enumerate(self.worker_locks): - lock.acquire() - worker_idx = i - break - + worker_idx = self._acquire_worker() try: worker = self.workers[worker_idx] - # Prüfe ob Worker noch läuft if worker.poll() is not None: - # Worker ist tot! 
- stderr_log = self.worker_log_dir / f"s9api_worker_{worker_idx}_stderr.log" - try: - with open(stderr_log, "r") as f: - stderr_content = f.read() - error_msg = ( - f"S9Api Worker {worker_idx} ist beendet (Exit: {worker.returncode})\nstderr:\n{stderr_content}" - ) - except Exception: - error_msg = f"S9Api Worker {worker_idx} ist beendet (Exit: {worker.returncode})" + stderr_content = self._read_stderr_log(worker_idx) + error_msg = ( + f"S9Api Worker {worker_idx} ist beendet (Exit: {worker.returncode})\nstderr:\n{stderr_content}" + ) logger.error(error_msg) return False, error_msg - # Formatiere Parameter - params_str = "|||".join([f"{key}={value}" for key, value in xslt_params.items()]) - - # Erstelle Job-String (Tab-separated) + params_str = "|||".join([f"{k}={v}" for k, v in xslt_params.items()]) job = f"{source_xml}\t{xsl_stylesheet}\t{output_fo}\t{params_str}\n" logger.debug(f"Sende Job an S9Api Worker {worker_idx}: {source_xml.name}") - - # Sende Job an Worker worker.stdin.write(job) worker.stdin.flush() - # Warte auf Antwort response = worker.stdout.readline().strip() - logger.debug(f"S9Api Worker {worker_idx} Antwort: '{response}'") if response == "OK": return True, "Erfolgreich" elif response.startswith("ERROR:"): - error_msg = response[6:].strip() - return False, f"Saxon-Fehler (s9api): {error_msg}" + return False, f"Saxon-Fehler (s9api): {response[6:].strip()}" + elif not response: + stderr_content = self._read_stderr_log(worker_idx, tail=500) + return False, f"S9Api Worker {worker_idx} crashed (keine Antwort)\nstderr:\n{stderr_content}" else: - # Leere Antwort bedeutet Worker ist crashed - if not response: - stderr_log = self.worker_log_dir / f"s9api_worker_{worker_idx}_stderr.log" - try: - with open(stderr_log, "r") as f: - stderr_content = f.read()[-500:] # Letzte 500 Zeichen - return False, f"S9Api Worker {worker_idx} crashed (keine Antwort)\nstderr:\n{stderr_content}" - except Exception: - return False, f"S9Api Worker {worker_idx} crashed (keine 
Antwort)" return False, f"Unerwartete Antwort: {response}" except Exception as e: logger.error(f"Fehler bei S9Api Worker {worker_idx}: {e}") return False, f"Worker-Fehler: {str(e)}" - finally: - # Gebe Worker-Lock frei self.worker_locks[worker_idx].release() - - def measure_ram_usage(self) -> tuple[float, float, list[float]]: - """ - Misst den aktuellen RAM-Verbrauch aller Worker-Prozesse. - - Returns: - tuple: (total_mb, average_mb, per_worker_mb_list) - """ - ram_per_worker = [] - - for i, worker in enumerate(self.workers): - try: - if worker.poll() is None: # Worker läuft noch - process = psutil.Process(worker.pid) - # Hole Speicherinfo (RSS = Resident Set Size in Bytes) - mem_info = process.memory_info() - ram_mb = mem_info.rss / (1024 * 1024) # Konvertiere zu MB - ram_per_worker.append(ram_mb) - else: - logger.warning(f"Worker {i} ist nicht mehr aktiv (kann RAM nicht messen)") - except (psutil.NoSuchProcess, psutil.AccessDenied) as e: - logger.warning(f"Konnte RAM für Worker {i} nicht messen: {e}") - - total_ram = sum(ram_per_worker) - average_ram = total_ram / len(ram_per_worker) if ram_per_worker else 0.0 - - return total_ram, average_ram, ram_per_worker - - def capture_ram_before_transform(self): - """Erfasst RAM-Verbrauch vor der ersten Transformation.""" - total, average, per_worker = self.measure_ram_usage() - self.metrics.ram_before_transform_mb_per_worker = per_worker - self.metrics.total_ram_before_mb = total - self.metrics.average_ram_before_mb = average - - logger.info( - f"RAM vor Transformation: {self.metrics.total_ram_before_mb:.1f} MB " - f"(Durchschnitt: {self.metrics.average_ram_before_mb:.1f} MB/Worker)" - ) - - def capture_ram_after_transform(self): - """Erfasst RAM-Verbrauch nach allen Transformationen.""" - total, average, per_worker = self.measure_ram_usage() - self.metrics.ram_after_transform_mb_per_worker = per_worker - self.metrics.total_ram_after_mb = total - self.metrics.average_ram_after_mb = average - - logger.info( - f"RAM nach 
from ui.TreeNodeEditDialog_ui import Ui_TreeNodeEditDialog
from ui.XslFileEditDialog_ui import Ui_XslFileEditDialog
from ui.XsltParamsEditDialog import XsltParamsEditDialog


class TreeNodeEditDialog(XsltParamsEditDialog):
    """Dialog for editing TreeNode objects.

    All behaviour lives in XsltParamsEditDialog; this subclass only
    supplies the generated UI class that belongs to the dialog.
    """

    def _create_ui(self):
        """Return a fresh instance of the generated TreeNode dialog UI."""
        ui = Ui_TreeNodeEditDialog()
        return ui


class XslFileEditDialog(XsltParamsEditDialog):
    """Dialog for editing XslFile objects.

    All behaviour lives in XsltParamsEditDialog; this subclass only
    supplies the generated UI class that belongs to the dialog.
    """

    def _create_ui(self):
        """Return a fresh instance of the generated XslFile dialog UI."""
        ui = Ui_XslFileEditDialog()
        return ui
from PySide6.QtCore import Qt
from PySide6.QtWidgets import QDialog, QMessageBox, QTableWidgetItem


class XsltParamsEditDialog(QDialog):
    """Shared base class for dialogs that edit XSLT parameters.

    Subclasses provide the generated UI object through ``_create_ui``.
    The UI object is used through these widgets: ``bezEdit``,
    ``xsltParamsTable``, ``parentParamsTable``, ``addParamButton``,
    ``removeParamButton`` and ``alle_xml_transformieren``.
    """

    def __init__(self, parent=None, node=None, parent_params=None):
        """Build the UI, wire the buttons and show the data of ``node``.

        Args:
            parent: Parent widget handed to ``QDialog``.
            node: Object being edited; its ``bez`` and ``xslt_params``
                attributes are read. Nothing is loaded when it is falsy.
            parent_params: Mapping of inherited parameters, shown read-only.
        """
        super().__init__(parent)

        # The UI object must exist before setupUi() populates the dialog.
        self.ui = ui = self._create_ui()
        ui.setupUi(self)

        self.node = node
        self.parent_params = parent_params if parent_params else {}

        ui.addParamButton.clicked.connect(self.add_parameter)
        ui.removeParamButton.clicked.connect(self.remove_parameter)

        self._setup_tables()

        if self.node:
            self._load_data()

    def _create_ui(self):
        """Return an instance of the concrete UI class. Must be overridden."""
        raise NotImplementedError

    def _setup_tables(self):
        """Apply the same column layout to both parameter tables."""
        for table in (self.ui.xsltParamsTable, self.ui.parentParamsTable):
            table.setColumnWidth(0, 200)
            table.setColumnWidth(1, 300)
            table.horizontalHeader().setStretchLastSection(True)

    def _load_data(self):
        """Copy the node's label and both parameter sets into the widgets."""
        if not self.node:
            return

        label = self.node.bez
        self.ui.bezEdit.setText(str(label) if label else "")
        self._load_xslt_params()
        self._load_parent_params()

    def _load_xslt_params(self):
        """Fill the editable table with the node's own XSLT parameters."""
        if not self.node or not self.node.xslt_params:
            return

        own_params = self.node.xslt_params
        table = self.ui.xsltParamsTable
        table.setRowCount(len(own_params))

        for row, (name, content) in enumerate(own_params.items()):
            table.setItem(row, 0, QTableWidgetItem(str(name)))
            table.setItem(row, 1, QTableWidgetItem(str(content)))

    @staticmethod
    def _read_only_item(text):
        """Create a table item for ``text`` with the editable flag cleared."""
        item = QTableWidgetItem(str(text))
        item.setFlags(item.flags() & ~Qt.ItemFlag.ItemIsEditable)
        return item

    def _load_parent_params(self):
        """Fill the display-only table with the inherited parameters."""
        if not self.parent_params:
            return

        table = self.ui.parentParamsTable
        table.setRowCount(len(self.parent_params))

        for row, (name, content) in enumerate(self.parent_params.items()):
            table.setItem(row, 0, self._read_only_item(name))
            table.setItem(row, 1, self._read_only_item(content))

    def add_parameter(self):
        """Append an empty parameter row and move the cursor to its name cell."""
        table = self.ui.xsltParamsTable
        new_row = table.rowCount()
        table.insertRow(new_row)
        for column in (0, 1):
            table.setItem(new_row, column, QTableWidgetItem(""))
        table.setCurrentCell(new_row, 0)

    def remove_parameter(self):
        """Delete the currently selected parameter row, if there is one."""
        table = self.ui.xsltParamsTable
        selected = table.currentRow()
        if selected < 0:
            return
        table.removeRow(selected)

    def get_data(self):
        """Collect the edited values from the widgets.

        Shows a warning box and returns ``None`` when the label is empty.
        Rows with an empty parameter name are skipped; names and values
        are stripped of surrounding whitespace.

        Returns:
            dict with the keys 'bez', 'xslt_params' and 'force_transform',
            or None when validation fails.
        """
        label = self.ui.bezEdit.text().strip()
        if not label:
            QMessageBox.warning(self, "Warnung", "Bitte geben Sie eine Bezeichnung ein.")
            return None

        table = self.ui.xsltParamsTable
        collected = {}
        for row in range(table.rowCount()):
            name_cell = table.item(row, 0)
            value_cell = table.item(row, 1)
            if not (name_cell and value_cell):
                continue
            name = name_cell.text().strip()
            if not name:
                continue
            collected[name] = value_cell.text().strip()

        result = {
            "bez": label,
            "xslt_params": collected,
            "force_transform": self.ui.alle_xml_transformieren.isChecked(),
        }
        return result

    def accept(self):
        """Close the dialog with 'accepted' only when the data validates."""
        if self.get_data() is None:
            return
        super().accept()
import glob
import logging
import shutil
import subprocess
import sys
import tempfile
import threading
import time
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Optional

logger = logging.getLogger(__name__)

# Separator between classpath entries: ";" on Windows, ":" everywhere else.
_CLASSPATH_SEP = ";" if sys.platform == "win32" else ":"

# Pause between two scans of the worker locks while every worker is busy.
_ACQUIRE_POLL_INTERVAL = 0.01


def _list_jars(directory: Path) -> list[str]:
    """Return the ``*.jar`` files directly inside ``directory``, sorted by path."""
    # glob.escape keeps characters such as "[" in the directory name literal.
    pattern = str(Path(glob.escape(str(directory))) / "*.jar")
    return sorted(glob.glob(pattern))


def build_jar_classpath(base_dir: Path, include_lib: bool = True) -> str:
    """
    Build a classpath from all JAR files in a directory.

    The JARs of ``base_dir`` come first, followed by those of its ``lib``
    subdirectory. Each group is sorted so the result does not depend on
    the order in which the file system lists the entries.

    Args:
        base_dir: Directory containing the JAR files.
        include_lib: Whether the ``lib/`` subdirectory is included as well.

    Returns:
        Classpath string joined with the platform-specific separator;
        empty when no JAR was found.
    """
    all_jars = _list_jars(base_dir)
    if include_lib:
        lib_dir = Path(base_dir) / "lib"
        if lib_dir.is_dir():
            all_jars.extend(_list_jars(lib_dir))
    return _CLASSPATH_SEP.join(all_jars)


class BaseWorkerPool(ABC):
    """
    Abstract base class for JVM-based worker pools.

    Provides compilation of the Java worker class, worker start-up,
    worker acquisition, RAM measurement and shutdown.

    Concrete subclasses must implement:
    - _pool_name: display name used in log messages
    - _java_source_code: Java source of the worker class
    - _java_class_name: name of the Java class (without ``.java``)
    - _temp_dir_prefix: prefix of the temporary directory
    - _worker_init_sleep: seconds to wait after starting a worker
    - _get_classpath(): classpath for compilation and execution
    - _build_worker_cmd(full_classpath): command list that starts a worker
    - _stderr_log_name(i): file name of the stderr log of worker ``i``
    """

    def __init__(self, num_workers: int, java_vm_path: Path, log_dir: Optional[Path] = None):
        """
        Store the configuration and create the empty bookkeeping structures.

        No process is started here; subclasses call
        ``_compile_worker_class`` and ``_start_workers`` themselves.

        Args:
            num_workers: Number of worker processes to start.
            java_vm_path: Path to the ``java`` executable.
            log_dir: Directory for the worker stderr logs; the temporary
                directory is used when it is not given.
        """
        # Project-local module, imported here so that this module can be
        # loaded on its own.
        from worker_metrics import WorkerPoolMetrics

        self.num_workers = num_workers
        self.java_vm_path = java_vm_path
        self.log_dir = log_dir

        # Worker processes, one lock per worker, and the open stderr log handles.
        self.workers: list[subprocess.Popen] = []
        self.worker_locks: list[threading.Lock] = []
        self._worker_stderr_files: list = []

        # Temporary directory holding the compiled Java class.
        self.temp_dir: Optional[Path] = None
        self.worker_class_path: Optional[Path] = None
        self.worker_log_dir: Optional[Path] = None

        # Performance metrics.
        self.metrics = WorkerPoolMetrics()

    # --- Abstract interface ---

    @property
    @abstractmethod
    def _pool_name(self) -> str:
        """Display name used in log messages (e.g. 'Saxon', 'FOP')."""

    @property
    @abstractmethod
    def _java_source_code(self) -> str:
        """Java source code of the worker class."""

    @property
    @abstractmethod
    def _java_class_name(self) -> str:
        """Name of the Java class without suffix (e.g. 'SaxonWorker')."""

    @property
    @abstractmethod
    def _temp_dir_prefix(self) -> str:
        """Prefix of the temporary directory (e.g. 'saxon_worker_')."""

    @property
    @abstractmethod
    def _worker_init_sleep(self) -> float:
        """Seconds to wait after starting a worker."""

    @abstractmethod
    def _get_classpath(self) -> str:
        """Return the classpath string for compilation and worker start."""

    @abstractmethod
    def _build_worker_cmd(self, full_classpath: str) -> list[str]:
        """Return the command list that starts one worker."""

    @abstractmethod
    def _stderr_log_name(self, i: int) -> str:
        """Return the file name of the stderr log of worker ``i``."""

    # --- Shared implementation ---

    def _javac_path(self) -> Path:
        """Return the ``javac`` executable that sits next to ``java_vm_path``.

        Only the file name is rewritten ("java" -> "javac", "java.exe" ->
        "javac.exe"); directories such as ".../java-17/bin" stay untouched.
        """
        vm_path = Path(self.java_vm_path)
        return vm_path.with_name(vm_path.name.replace("java", "javac", 1))

    def _compile_worker_class(self):
        """Compile the Java worker class into a fresh temporary directory.

        Sets ``temp_dir`` and ``worker_class_path`` and records the
        compilation time in the metrics.

        Raises:
            RuntimeError: If ``javac`` exits with a non-zero status.
            Exception: Any other error is logged and re-raised. In every
                failure case the temporary directory is removed again.
        """
        start_time = time.time()
        try:
            self.temp_dir = Path(tempfile.mkdtemp(prefix=self._temp_dir_prefix))

            java_file = self.temp_dir / f"{self._java_class_name}.java"
            java_file.write_text(self._java_source_code, encoding="utf-8")

            classpath = self._get_classpath()
            javac_cmd = [str(self._javac_path()), "-cp", classpath, str(java_file)]

            logger.debug(f"Kompiliere {self._java_class_name}: {' '.join(javac_cmd[:3])}...")

            result = subprocess.run(javac_cmd, capture_output=True, text=True, timeout=30)
            if result.returncode != 0:
                raise RuntimeError(f"Java-Kompilierung fehlgeschlagen: {result.stderr}")

            self.worker_class_path = self.temp_dir
            self.metrics.compilation_time_seconds = time.time() - start_time

            logger.info(
                f"{self._java_class_name} erfolgreich kompiliert: {self.temp_dir} "
                f"({self.metrics.compilation_time_seconds:.3f}s)"
            )

        except Exception as e:
            logger.error(f"Fehler beim Kompilieren von {self._java_class_name}: {e}")
            # The pool is unusable after a failed compile; do not leave the
            # temporary directory behind.
            if self.temp_dir is not None:
                shutil.rmtree(self.temp_dir, ignore_errors=True)
                self.temp_dir = None
            raise

    def _start_single_worker(self, i: int, cmd: list[str]):
        """Start worker ``i`` with ``cmd`` and register it in the pool.

        Raises:
            RuntimeError: If the process has already exited after the
                initialisation wait; the message carries its stderr output.
        """
        worker_start_time = time.time()

        stderr_log = self.worker_log_dir / self._stderr_log_name(i)
        stderr_file = open(stderr_log, "w", encoding="utf-8")
        # Kept open for the lifetime of the worker; closed in shutdown().
        self._worker_stderr_files.append(stderr_file)

        process = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=stderr_file,
            text=True,
            bufsize=1,  # line buffered
        )

        self.workers.append(process)
        self.worker_locks.append(threading.Lock())

        logger.debug(f"{self._pool_name} Worker {i} gestartet (PID: {process.pid}, stderr: {stderr_log})")

        # Give the worker a moment to initialise, then check it survived.
        time.sleep(self._worker_init_sleep)

        if process.poll() is not None:
            stderr_file.close()
            stderr_content = self._read_stderr_log(i)
            raise RuntimeError(
                f"{self._pool_name} Worker {i} ist sofort beendet "
                f"(Exit Code: {process.returncode})\nstderr:\n{stderr_content}"
            )

        self.metrics.worker_start_times.append(time.time() - worker_start_time)

    def _start_workers(self):
        """Start ``num_workers`` worker processes.

        The stderr of every worker is redirected to its own log file in
        ``log_dir`` (created if needed) or, without ``log_dir``, in the
        temporary directory.

        Raises:
            Exception: If a worker cannot be started. Workers that were
                already running are shut down before the error is re-raised.
        """
        classpath = self._get_classpath()
        full_classpath = str(self.worker_class_path) + _CLASSPATH_SEP + classpath

        self.worker_log_dir = self.log_dir if self.log_dir else self.temp_dir
        if self.log_dir:
            self.worker_log_dir.mkdir(parents=True, exist_ok=True)

        cmd = self._build_worker_cmd(full_classpath)

        for i in range(self.num_workers):
            try:
                self._start_single_worker(i, cmd)
            except Exception as e:
                logger.error(f"Fehler beim Starten von {self._pool_name} Worker {i}: {e}")
                # The constructor is about to fail, so nobody else can stop
                # the workers that did start or close their log files.
                self.shutdown()
                raise

        self.metrics.calculate_aggregates()
        logger.info(
            f"{len(self.workers)} {self._pool_name}-Worker erfolgreich gestartet "
            f"(Summe: {self.metrics.total_worker_start_time_seconds:.3f}s, "
            f"Durchschnitt: {self.metrics.average_worker_start_time_seconds:.3f}s)"
        )

    def _acquire_worker(self) -> int:
        """Lock a free worker and return its index.

        Blocks until any worker becomes free. The caller must release
        ``worker_locks[index]`` when the job is done.

        Raises:
            RuntimeError: If the pool has no workers at all.
        """
        if not self.worker_locks:
            raise RuntimeError(f"{self._pool_name}-Worker-Pool hat keine Worker")

        while True:
            for i, lock in enumerate(self.worker_locks):
                if lock.acquire(blocking=False):
                    return i
            # Every worker is busy. Re-scan all locks after a short pause
            # instead of blocking on one fixed lock, so that whichever
            # worker finishes first is picked up.
            time.sleep(_ACQUIRE_POLL_INTERVAL)

    def _read_stderr_log(self, worker_idx: int, tail: int = 0) -> str:
        """Read the stderr log of a worker.

        Args:
            worker_idx: Index of the worker.
            tail: When positive, only the last ``tail`` characters are
                returned; otherwise the whole file.

        Returns:
            The log text, or an empty string when the log cannot be read.
        """
        if self.worker_log_dir is None:
            return ""
        stderr_log = self.worker_log_dir / self._stderr_log_name(worker_idx)
        try:
            # The logs are opened for writing as UTF-8; undecodable bytes
            # emitted by the JVM must not hide the rest of the output.
            content = stderr_log.read_text(encoding="utf-8", errors="replace")
        except OSError:
            return ""
        return content[-tail:] if tail > 0 else content

    def measure_ram_usage(self) -> tuple[float, float, list[float]]:
        """
        Measure the current RAM usage (RSS) of all running workers.

        Workers that have exited or cannot be inspected are skipped with
        a warning.

        Returns:
            tuple: (total_mb, average_mb, per_worker_mb_list)
        """
        # Third-party dependency that is only needed for the RAM metrics.
        import psutil

        ram_per_worker = []
        for i, worker in enumerate(self.workers):
            try:
                if worker.poll() is None:
                    process = psutil.Process(worker.pid)
                    ram_mb = process.memory_info().rss / (1024 * 1024)
                    ram_per_worker.append(ram_mb)
                else:
                    logger.warning(f"{self._pool_name} Worker {i} ist nicht mehr aktiv (kann RAM nicht messen)")
            except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
                logger.warning(f"Konnte RAM für {self._pool_name} Worker {i} nicht messen: {e}")

        total_ram = sum(ram_per_worker)
        average_ram = total_ram / len(ram_per_worker) if ram_per_worker else 0.0
        return total_ram, average_ram, ram_per_worker

    def capture_ram_before_transform(self):
        """Record the RAM usage before the first transformation in the metrics."""
        total, average, per_worker = self.measure_ram_usage()
        self.metrics.ram_before_transform_mb_per_worker = per_worker
        self.metrics.total_ram_before_mb = total
        self.metrics.average_ram_before_mb = average
        logger.info(
            f"RAM vor Transformation ({self._pool_name}): {total:.1f} MB (Ø {average:.1f} MB/Worker)"
        )

    def capture_ram_after_transform(self):
        """Record the RAM usage after all transformations in the metrics."""
        total, average, per_worker = self.measure_ram_usage()
        self.metrics.ram_after_transform_mb_per_worker = per_worker
        self.metrics.total_ram_after_mb = total
        self.metrics.average_ram_after_mb = average
        logger.info(
            f"RAM nach Transformation ({self._pool_name}): {total:.1f} MB (Ø {average:.1f} MB/Worker)"
        )

    def shutdown(self):
        """Stop all workers and release every resource held by the pool.

        Each running worker is asked to exit with an ``EXIT`` line and is
        killed when it has not terminated after two seconds. Afterwards
        the pipes and stderr log files are closed and the temporary
        directory is deleted. Calling the method again is harmless.
        """
        logger.info(f"Beende {self._pool_name}-Worker-Pool...")

        for i, worker in enumerate(self.workers):
            try:
                if worker.poll() is None and worker.stdin and not worker.stdin.closed:
                    try:
                        worker.stdin.write("EXIT\n")
                        worker.stdin.flush()
                    except OSError:
                        # The worker went away between poll() and write().
                        pass
                worker.wait(timeout=2)
                logger.debug(f"{self._pool_name} Worker {i} beendet")
            except subprocess.TimeoutExpired:
                worker.kill()
                # Reap the killed process so it does not linger as a zombie.
                worker.wait()
                logger.warning(f"{self._pool_name} Worker {i} musste gekillt werden")
            except Exception as e:
                logger.error(f"Fehler beim Beenden von {self._pool_name} Worker {i}: {e}")
            finally:
                for stream in (worker.stdin, worker.stdout):
                    if stream is None or stream.closed:
                        continue
                    try:
                        stream.close()
                    except OSError:
                        pass

        for f in self._worker_stderr_files:
            try:
                f.close()
            except OSError:
                pass
        self._worker_stderr_files.clear()

        if self.temp_dir and self.temp_dir.exists():
            try:
                shutil.rmtree(self.temp_dir)
                logger.debug(f"Temporäres Verzeichnis gelöscht: {self.temp_dir}")
            except Exception as e:
                logger.warning(f"Konnte temporäres Verzeichnis nicht löschen: {e}")

        logger.info(f"{self._pool_name}-Worker-Pool beendet")

    def __enter__(self):
        """Context manager entry: return the pool itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: shut the pool down."""
        self.shutdown()