"""
Saxon worker pool - persistent JVM processes for fast XSLT transformations.

Eliminates JVM start-up overhead by pre-initialising N worker processes.
Each worker runs as a long-lived child process and handles many
transformations one after another over a line-based stdin/stdout protocol.
"""

import contextlib
import logging
import os
import queue
import shutil
import subprocess
import tempfile
import threading
import time
from pathlib import Path
from typing import Optional

import psutil

from worker_metrics import WorkerPoolMetrics

logger = logging.getLogger(__name__)

# Java worker source (compiled at runtime).
#
# Protocol: one job per stdin line, tab-separated
# "source<TAB>stylesheet<TAB>output[<TAB>k=v|||k=v...]"; the literal line
# "EXIT" terminates the worker. The worker answers every job with exactly one
# stdout line: "OK" or "ERROR: <message>". Diagnostics go to stderr.
#
# NOTE(review): warnings collected by the ErrorListener are reported as
# "ERROR:" even when the transformation itself completed - confirm that this
# is intended.
SAXON_WORKER_JAVA = """
import javax.xml.transform.*;
import javax.xml.transform.stream.*;
import java.io.*;
import java.util.*;

public class SaxonWorker {

    // The pipe protocol allows exactly one response line per job, so embedded
    // line breaks in a message would desynchronise requests and responses.
    private static String singleLine(String message) {
        return message.trim().replace("\\r", "").replace("\\n", " | ");
    }

    public static void main(String[] args) {
        String line;

        // Create TransformerFactory once and reuse
        TransformerFactory factory = TransformerFactory.newInstance();

        // Cache für kompilierte Stylesheets (Performance-Optimierung)
        Map<String, Templates> templatesCache = new HashMap<>();

        try {
            // The parent process talks UTF-8 on both pipes; do not depend on
            // the platform default charset.
            BufferedReader reader = new BufferedReader(new InputStreamReader(System.in, "UTF-8"));
            System.setOut(new PrintStream(new FileOutputStream(FileDescriptor.out), true, "UTF-8"));

            System.err.println("SaxonWorker started and ready (using JAXP Transformer API with stylesheet caching)");
            System.err.flush();

            while ((line = reader.readLine()) != null) {
                System.err.println("DEBUG: Received line: " + line.substring(0, Math.min(100, line.length())));
                System.err.flush();

                if ("EXIT".equals(line.trim())) {
                    System.err.println("SaxonWorker exiting");
                    break;
                }

                try {
                    // Parse job
                    System.err.println("DEBUG: Parsing job...");
                    System.err.flush();

                    String[] parts = line.split("\\t");
                    System.err.println("DEBUG: Parts count: " + parts.length);
                    System.err.flush();

                    if (parts.length < 3) {
                        System.out.println("ERROR: Invalid job format");
                        System.out.flush();
                        continue;
                    }

                    String sourceXml = parts[0];
                    String xslStylesheet = parts[1];
                    String outputFo = parts[2];

                    // Prüfe ob Stylesheet bereits im Cache ist
                    Templates templates;
                    if (templatesCache.containsKey(xslStylesheet)) {
                        templates = templatesCache.get(xslStylesheet);
                        System.err.println("DEBUG: Using cached stylesheet: " + xslStylesheet);
                        System.err.flush();
                    } else {
                        System.err.println("DEBUG: Compiling and caching stylesheet: " + xslStylesheet);
                        System.err.flush();
                        StreamSource xslSource = new StreamSource(new File(xslStylesheet));
                        templates = factory.newTemplates(xslSource);
                        templatesCache.put(xslStylesheet, templates);
                        System.err.println("DEBUG: Stylesheet compiled and cached (cache size: " + templatesCache.size() + ")");
                        System.err.flush();
                    }

                    System.err.println("DEBUG: Creating transformer from cached template...");
                    System.err.flush();

                    // Create Source and Result objects
                    StreamSource xmlSource = new StreamSource(new File(sourceXml));
                    StreamResult result = new StreamResult(new File(outputFo));

                    // Create transformer from templates
                    Transformer transformer = templates.newTransformer();

                    // Set parameters if present
                    if (parts.length > 3 && !parts[3].isEmpty()) {
                        String[] params = parts[3].split("\\\\|\\\\|\\\\|");
                        for (String param : params) {
                            if (!param.isEmpty() && param.contains("=")) {
                                String[] kv = param.split("=", 2);
                                transformer.setParameter(kv[0], kv[1]);
                                System.err.println("DEBUG: Set parameter: " + kv[0] + " = " + kv[1]);
                            }
                        }
                        System.err.flush();
                    }

                    System.err.println("DEBUG: Running transformation...");
                    System.err.flush();

                    // Capture errors via ErrorListener
                    final StringBuilder errors = new StringBuilder();
                    transformer.setErrorListener(new ErrorListener() {
                        @Override
                        public void warning(TransformerException e) {
                            errors.append("WARNING: ").append(e.getMessage()).append("\\n");
                        }

                        @Override
                        public void error(TransformerException e) {
                            errors.append("ERROR: ").append(e.getMessage()).append("\\n");
                        }

                        @Override
                        public void fatalError(TransformerException e) throws TransformerException {
                            errors.append("FATAL: ").append(e.getMessage()).append("\\n");
                            throw e;
                        }
                    });

                    // Run transformation
                    transformer.transform(xmlSource, result);

                    System.err.println("DEBUG: Transformation completed");
                    System.err.flush();

                    // Check for errors
                    if (errors.length() > 0) {
                        System.out.println("ERROR: " + singleLine(errors.toString()));
                    } else {
                        System.out.println("OK");
                    }
                    System.out.flush();

                } catch (TransformerException e) {
                    System.err.println("DEBUG: Transformer exception: " + e.getClass().getName());
                    System.err.flush();
                    e.printStackTrace(System.err);
                    String errorMsg = e.getMessage();
                    if (errorMsg == null || errorMsg.isEmpty()) {
                        errorMsg = e.getClass().getSimpleName();
                    }
                    System.out.println("ERROR: " + singleLine(errorMsg));
                    System.out.flush();
                } catch (Exception e) {
                    System.err.println("DEBUG: Job processing exception: " + e.getClass().getName());
                    System.err.flush();
                    e.printStackTrace(System.err);
                    System.out.println("ERROR: " + singleLine(e.getMessage() != null ? e.getMessage() : e.getClass().getName()));
                    System.out.flush();
                }
            }
        } catch (IOException e) {
            System.err.println("SaxonWorker I/O error: " + e.getMessage());
            e.printStackTrace(System.err);
        }
    }
}
"""


class SaxonWorkerPool:
    """
    Pool of long-running JVM processes for Saxon transformations.

    Avoids the JVM start-up cost per transformation by reusing N worker
    processes. Usable as a context manager; leaving the ``with`` block calls
    :meth:`shutdown`.
    """

    def __init__(
        self,
        num_workers: int,
        java_vm_path: Path,
        saxon_jar_path: Path,
        classpath_cache: dict[Path, str],
        log_dir: Optional[Path] = None,
    ):
        """
        Compile the Java worker class and start the worker processes.

        Args:
            num_workers: Number of worker processes to start.
            java_vm_path: Path to the Java VM binary.
            saxon_jar_path: Path to the Saxon JAR file.
            classpath_cache: Maps a Saxon directory to its classpath string.
            log_dir: Optional directory for the per-worker stderr logs.
                Defaults to the pool's temporary directory, which is deleted
                on shutdown.

        Raises:
            RuntimeError: If compilation fails or a worker exits immediately.
        """
        self.num_workers = num_workers
        self.java_vm_path = java_vm_path
        self.saxon_jar_path = saxon_jar_path
        self.classpath_cache = classpath_cache
        self.log_dir = log_dir

        # Worker processes and one lock per worker (held while a job runs).
        self.workers: list[subprocess.Popen] = []
        self.worker_locks: list[threading.Lock] = []
        # Indices of idle workers; lets transform() block until *any* worker
        # becomes free instead of waiting on one fixed lock.
        self._idle_workers: "queue.Queue[int]" = queue.Queue()
        self._worker_classpath: Optional[str] = None
        self._closed = False

        # Temporary directory holding the compiled Java class.
        self.temp_dir: Optional[Path] = None
        self.worker_class_path: Optional[Path] = None
        self.worker_log_dir: Optional[Path] = None

        # Performance metrics.
        self.metrics = WorkerPoolMetrics()

        try:
            self._compile_worker_class()
            self._start_workers()
        except Exception:
            # Do not leak already started JVMs or the temp directory.
            self.shutdown()
            raise

        logger.info(f"SaxonWorkerPool initialisiert mit {num_workers} Workern")

    def _resolve_saxon_classpath(self) -> str:
        """Return the Saxon classpath from the cache, or build it from the JARs on disk."""
        saxon_dir = self.saxon_jar_path.parent
        if saxon_dir in self.classpath_cache:
            return self.classpath_cache[saxon_dir]

        # Fallback: all JARs next to the Saxon JAR plus those in ./lib.
        # Sorted so the classpath order is deterministic.
        jars = sorted(saxon_dir.glob("*.jar"))
        lib_dir = saxon_dir / "lib"
        if lib_dir.exists():
            jars.extend(sorted(lib_dir.glob("*.jar")))
        return os.pathsep.join(str(jar) for jar in jars)

    def _javac_path(self) -> Path:
        """Derive the javac binary path from the configured java binary path."""
        java = Path(self.java_vm_path)
        # Only touch the file name ("java" -> "javac", "java.exe" ->
        # "javac.exe"); directories such as ".../java-17/bin" must stay intact.
        return java.with_name(java.name.replace("java", "javac", 1))

    def _stderr_log_path(self, index: int) -> Path:
        """Return the stderr log file path of worker *index*."""
        return self.worker_log_dir / f"worker_{index}_stderr.log"

    def _read_stderr_log(self, index: int, tail_chars: Optional[int] = None) -> Optional[str]:
        """Return the stderr log of worker *index* (optionally only its tail), or None if unreadable."""
        if self.worker_log_dir is None:
            return None
        try:
            content = self._stderr_log_path(index).read_text(encoding="utf-8", errors="replace")
        except OSError:
            return None
        return content if tail_chars is None else content[-tail_chars:]

    @staticmethod
    def _close_pipes(process: subprocess.Popen) -> None:
        """Close the stdin/stdout pipes of *process*, ignoring errors from an already dead peer."""
        for stream in (process.stdin, process.stdout):
            if stream is not None and not stream.closed:
                # Flushing into a dead process raises BrokenPipeError.
                with contextlib.suppress(OSError):
                    stream.close()

    def _compile_worker_class(self):
        """
        Write the Java worker source to a temp directory and compile it.

        Raises:
            RuntimeError: If javac exits with a non-zero status.
        """
        start_time = time.perf_counter()

        try:
            self.temp_dir = Path(tempfile.mkdtemp(prefix="saxon_worker_"))

            java_file = self.temp_dir / "SaxonWorker.java"
            java_file.write_text(SAXON_WORKER_JAVA, encoding="utf-8")

            classpath = self._resolve_saxon_classpath()

            # The source is written as UTF-8 and contains non-ASCII comments,
            # so tell javac instead of relying on the platform default.
            javac_cmd = [
                str(self._javac_path()),
                "-encoding",
                "UTF-8",
                "-cp",
                classpath,
                str(java_file),
            ]
            logger.debug(f"Kompiliere SaxonWorker: {' '.join(javac_cmd)}")

            result = subprocess.run(javac_cmd, capture_output=True, text=True, timeout=30)
            if result.returncode != 0:
                raise RuntimeError(f"Java-Kompilierung fehlgeschlagen: {result.stderr}")

            self.worker_class_path = self.temp_dir

            self.metrics.compilation_time_seconds = time.perf_counter() - start_time
            logger.info(
                f"SaxonWorker erfolgreich kompiliert: {self.temp_dir} "
                f"({self.metrics.compilation_time_seconds:.3f}s)"
            )

        except Exception as e:
            logger.error(f"Fehler beim Kompilieren von SaxonWorker: {e}")
            raise

    def _spawn_worker(self, index: int, *, append_log: bool = False) -> subprocess.Popen:
        """
        Start one worker JVM and verify that it survives start-up.

        Args:
            index: Worker index; determines the stderr log file name.
            append_log: Append to an existing stderr log instead of truncating
                it (used on restart so the crash output is kept).

        Returns:
            The running worker process.

        Raises:
            RuntimeError: If the process exits right after being started.
        """
        cmd = [str(self.java_vm_path), "-cp", self._worker_classpath, "SaxonWorker"]
        stderr_log = self._stderr_log_path(index)

        # The child inherits its own handle to the log file, so the parent's
        # handle can be closed as soon as Popen returns.
        with open(stderr_log, "a" if append_log else "w", encoding="utf-8") as stderr_file:
            process = subprocess.Popen(
                cmd,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=stderr_file,
                text=True,
                encoding="utf-8",  # must match the charset used by the Java side
                errors="replace",
                bufsize=1,  # line buffered
            )

        logger.debug(f"Worker {index} gestartet (PID: {process.pid}, stderr: {stderr_log})")

        # Give the JVM a moment to fail fast (bad classpath, missing class, ...).
        time.sleep(0.1)

        if process.poll() is not None:
            self._close_pipes(process)
            stderr_content = self._read_stderr_log(index) or ""
            raise RuntimeError(
                f"Worker {index} ist sofort beendet (Exit Code: {process.returncode})\nstderr:\n{stderr_content}"
            )

        return process

    def _restart_worker(self, index: int) -> subprocess.Popen:
        """Replace the dead worker at *index* with a fresh process and return it."""
        self._close_pipes(self.workers[index])
        process = self._spawn_worker(index, append_log=True)
        self.workers[index] = process
        return process

    def _start_workers(self):
        """
        Start ``num_workers`` worker processes and record their start-up times.

        Raises:
            RuntimeError: If a worker exits immediately after start.
        """
        # Worker class directory first, then the Saxon classpath. Empty parts
        # are dropped: an empty classpath entry would mean "current directory".
        classpath_parts = [str(self.worker_class_path), self._resolve_saxon_classpath()]
        self._worker_classpath = os.pathsep.join(part for part in classpath_parts if part)

        self.worker_log_dir = self.log_dir if self.log_dir else self.temp_dir
        if self.log_dir:
            self.worker_log_dir.mkdir(parents=True, exist_ok=True)

        for i in range(self.num_workers):
            worker_start_time = time.perf_counter()
            try:
                process = self._spawn_worker(i)
            except Exception as e:
                logger.error(f"Fehler beim Starten von Worker {i}: {e}")
                raise

            self.workers.append(process)
            self.worker_locks.append(threading.Lock())
            self._idle_workers.put(i)
            self.metrics.worker_start_times.append(time.perf_counter() - worker_start_time)

        self.metrics.calculate_aggregates()

        logger.info(
            f"{len(self.workers)} Saxon-Worker erfolgreich gestartet "
            f"(Summe: {self.metrics.total_worker_start_time_seconds:.3f}s, "
            f"Durchschnitt: {self.metrics.average_worker_start_time_seconds:.3f}s)"
        )

    def transform(
        self, source_xml: Path, xsl_stylesheet: Path, output_fo: Path, xslt_params: dict[str, str]
    ) -> tuple[bool, str]:
        """
        Run one XSLT transformation on a worker from the pool.

        Blocks until a worker is idle. A worker that has died since its last
        job is restarted before the job is sent.

        Args:
            source_xml: Path to the XML input file.
            xsl_stylesheet: Path to the XSL stylesheet file.
            output_fo: Path to the FO output file.
            xslt_params: XSLT parameters (name -> value); may be empty or None.

        Returns:
            ``(success, message)``; on failure the message describes the error.
        """
        if self._closed or not self.workers:
            return False, "Worker-Fehler: Keine Worker verfügbar"

        params_str = "|||".join(f"{key}={value}" for key, value in (xslt_params or {}).items())
        fields = [str(source_xml), str(xsl_stylesheet), str(output_fo), params_str]

        # Tabs separate fields and a line break ends the job, so either one
        # inside a field would corrupt the protocol for this and later jobs.
        if any(ch in field for field in fields for ch in "\t\r\n"):
            return False, "Worker-Fehler: Job-Felder dürfen keine Tabulator- oder Zeilenumbruchzeichen enthalten"

        job = "\t".join(fields) + "\n"

        # Blocks until any worker is idle.
        worker_idx = self._idle_workers.get()
        try:
            with self.worker_locks[worker_idx]:
                return self._run_job(worker_idx, job, Path(source_xml).name)
        except Exception as e:
            logger.error(f"Fehler bei Worker {worker_idx}: {e}")
            return False, f"Worker-Fehler: {str(e)}"
        finally:
            self._idle_workers.put(worker_idx)

    def _run_job(self, worker_idx: int, job: str, source_name: str) -> tuple[bool, str]:
        """Send *job* to worker *worker_idx* and translate its one-line response."""
        worker = self.workers[worker_idx]

        if worker.poll() is not None:
            # Worker died since its last job: report it, then try to replace it.
            error_msg = f"Worker {worker_idx} ist beendet (Exit: {worker.returncode})"
            stderr_content = self._read_stderr_log(worker_idx)
            if stderr_content is not None:
                error_msg += f"\nstderr:\n{stderr_content}"
            logger.error(error_msg)
            try:
                worker = self._restart_worker(worker_idx)
            except Exception as e:
                logger.error(f"Fehler beim Starten von Worker {worker_idx}: {e}")
                return False, error_msg

        logger.debug(f"Sende Job an Worker {worker_idx}: {source_name}")

        worker.stdin.write(job)
        worker.stdin.flush()

        response = worker.stdout.readline().strip()
        logger.debug(f"Worker {worker_idx} Antwort: '{response}'")

        if response == "OK":
            return True, "Erfolgreich"

        if response.startswith("ERROR:"):
            error_msg = response[6:].strip()
            return False, f"Saxon-Fehler: {error_msg}"

        if not response:
            # An empty read means the worker closed stdout, i.e. it crashed.
            message = f"Worker {worker_idx} crashed (keine Antwort)"
            stderr_tail = self._read_stderr_log(worker_idx, tail_chars=500)
            if stderr_tail is not None:
                message += f"\nstderr:\n{stderr_tail}"
            return False, message

        return False, f"Unerwartete Antwort: {response}"

    def measure_ram_usage(self) -> tuple[float, float, list[float]]:
        """
        Measure the current resident memory of all running worker processes.

        Returns:
            ``(total_mb, average_mb, per_worker_mb)``; workers that are no
            longer running or cannot be inspected are left out.
        """
        ram_per_worker: list[float] = []

        for i, worker in enumerate(self.workers):
            if worker.poll() is not None:
                logger.warning(f"Worker {i} ist nicht mehr aktiv (kann RAM nicht messen)")
                continue
            try:
                rss_bytes = psutil.Process(worker.pid).memory_info().rss
            except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
                logger.warning(f"Konnte RAM für Worker {i} nicht messen: {e}")
                continue
            ram_per_worker.append(rss_bytes / (1024 * 1024))

        total_ram = sum(ram_per_worker)
        average_ram = total_ram / len(ram_per_worker) if ram_per_worker else 0.0

        return total_ram, average_ram, ram_per_worker

    def capture_ram_before_transform(self):
        """Record the workers' RAM usage before the first transformation in the metrics."""
        total, average, per_worker = self.measure_ram_usage()

        self.metrics.ram_before_transform_mb_per_worker = per_worker
        self.metrics.total_ram_before_mb = total
        self.metrics.average_ram_before_mb = average

        logger.info(
            f"RAM vor Transformation: {self.metrics.total_ram_before_mb:.1f} MB "
            f"(Durchschnitt: {self.metrics.average_ram_before_mb:.1f} MB/Worker)"
        )

    def capture_ram_after_transform(self):
        """Record the workers' RAM usage after all transformations in the metrics."""
        total, average, per_worker = self.measure_ram_usage()

        self.metrics.ram_after_transform_mb_per_worker = per_worker
        self.metrics.total_ram_after_mb = total
        self.metrics.average_ram_after_mb = average

        logger.info(
            f"RAM nach Transformation: {self.metrics.total_ram_after_mb:.1f} MB "
            f"(Durchschnitt: {self.metrics.average_ram_after_mb:.1f} MB/Worker)"
        )

    def shutdown(self):
        """
        Stop all worker processes and delete the temporary directory.

        Each running worker is asked to exit and gets two seconds to do so
        before it is killed. Calling this method more than once is a no-op.
        """
        if self._closed:
            return
        self._closed = True

        logger.info("Beende Saxon-Worker-Pool...")

        for i, worker in enumerate(self.workers):
            try:
                if worker.poll() is None and worker.stdin and not worker.stdin.closed:
                    worker.stdin.write("EXIT\n")
                    worker.stdin.flush()

                worker.wait(timeout=2)
                logger.debug(f"Worker {i} beendet")

            except subprocess.TimeoutExpired:
                worker.kill()
                worker.wait()  # reap the process
                logger.warning(f"Worker {i} musste gekillt werden")
            except Exception as e:
                logger.error(f"Fehler beim Beenden von Worker {i}: {e}")
            finally:
                # Never leave a JVM behind, whatever went wrong above.
                if worker.poll() is None:
                    worker.kill()
                    worker.wait()
                self._close_pipes(worker)

        if self.temp_dir and self.temp_dir.exists():
            try:
                shutil.rmtree(self.temp_dir)
                logger.debug(f"Temporäres Verzeichnis gelöscht: {self.temp_dir}")
            except Exception as e:
                logger.warning(f"Konnte temporäres Verzeichnis nicht löschen: {e}")

        logger.info("Saxon-Worker-Pool beendet")

    def __enter__(self):
        """Return the pool itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Shut the pool down when the ``with`` block is left."""
        self.shutdown()