""" Saxon Worker Pool - Persistente JVM-Prozesse für schnelle XSLT-Transformationen. Eliminiert JVM-Startup-Overhead durch Vorinitialisierung von N Worker-Prozessen. Jeder Worker läuft als Daemon und verarbeitet mehrere Transformationen nacheinander. """ import logging from pathlib import Path from typing import Optional from worker_pool_base import BaseWorkerPool, build_jar_classpath logger = logging.getLogger(__name__) # Java-Worker-Code (wird zur Laufzeit kompiliert) SAXON_WORKER_JAVA = """ import javax.xml.transform.*; import javax.xml.transform.stream.*; import java.io.*; import java.util.*; public class SaxonWorker { public static void main(String[] args) { BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); String line; // Create TransformerFactory once and reuse TransformerFactory factory = TransformerFactory.newInstance(); // Cache für kompilierte Stylesheets (Performance-Optimierung) Map templatesCache = new HashMap<>(); System.err.println("SaxonWorker started and ready (using JAXP Transformer API with stylesheet caching)"); System.err.flush(); try { while ((line = reader.readLine()) != null) { System.err.println("DEBUG: Received line: " + line.substring(0, Math.min(100, line.length()))); System.err.flush(); if ("EXIT".equals(line.trim())) { System.err.println("SaxonWorker exiting"); break; } try { // Parse job System.err.println("DEBUG: Parsing job..."); System.err.flush(); String[] parts = line.split("\\t"); System.err.println("DEBUG: Parts count: " + parts.length); System.err.flush(); if (parts.length < 3) { System.out.println("ERROR: Invalid job format"); System.out.flush(); continue; } String sourceXml = parts[0]; String xslStylesheet = parts[1]; String outputFo = parts[2]; // Prüfe ob Stylesheet bereits im Cache ist Templates templates; if (templatesCache.containsKey(xslStylesheet)) { templates = templatesCache.get(xslStylesheet); System.err.println("DEBUG: Using cached stylesheet: " + xslStylesheet); System.err.flush(); } else { System.err.println("DEBUG: Compiling and caching stylesheet: " + xslStylesheet); System.err.flush(); StreamSource xslSource = new StreamSource(new File(xslStylesheet)); templates = factory.newTemplates(xslSource); templatesCache.put(xslStylesheet, templates); System.err.println("DEBUG: Stylesheet compiled and cached (cache size: " + templatesCache.size() + ")"); System.err.flush(); } System.err.println("DEBUG: Creating transformer from cached template..."); System.err.flush(); // Create Source and Result objects StreamSource xmlSource = new StreamSource(new File(sourceXml)); StreamResult result = new StreamResult(new File(outputFo)); // Create transformer from templates Transformer transformer = templates.newTransformer(); // Set parameters if present if (parts.length > 3 && !parts[3].isEmpty()) { String[] params = parts[3].split("\\\\|\\\\|\\\\|"); for (String param : params) { if (!param.isEmpty() && param.contains("=")) { String[] kv = param.split("=", 2); transformer.setParameter(kv[0], kv[1]); System.err.println("DEBUG: Set parameter: " + kv[0] + " = " + kv[1]); } } System.err.flush(); } System.err.println("DEBUG: Running transformation..."); System.err.flush(); // Capture errors via ErrorListener final StringBuilder errors = new StringBuilder(); transformer.setErrorListener(new ErrorListener() { @Override public void warning(TransformerException e) { errors.append("WARNING: ").append(e.getMessage()).append("\\n"); } @Override public void error(TransformerException e) { errors.append("ERROR: ").append(e.getMessage()).append("\\n"); } @Override public void fatalError(TransformerException e) throws TransformerException { errors.append("FATAL: ").append(e.getMessage()).append("\\n"); throw e; } }); // Run transformation transformer.transform(xmlSource, result); System.err.println("DEBUG: Transformation completed"); System.err.flush(); // Check for errors if (errors.length() > 0) { System.out.println("ERROR: " + errors.toString().trim()); } else { System.out.println("OK"); } System.out.flush(); } catch (TransformerException e) { System.err.println("DEBUG: Transformer exception: " + e.getClass().getName()); System.err.flush(); e.printStackTrace(System.err); String errorMsg = e.getMessage(); if (errorMsg == null || errorMsg.isEmpty()) { errorMsg = e.getClass().getSimpleName(); } System.out.println("ERROR: " + errorMsg); System.out.flush(); } catch (Exception e) { System.err.println("DEBUG: Job processing exception: " + e.getClass().getName()); System.err.flush(); e.printStackTrace(System.err); System.out.println("ERROR: " + (e.getMessage() != null ? e.getMessage() : e.getClass().getName())); System.out.flush(); } } } catch (IOException e) { System.err.println("SaxonWorker I/O error: " + e.getMessage()); e.printStackTrace(System.err); } } } """ class SaxonWorkerPool(BaseWorkerPool): """ Pool von lang-laufenden JVM-Prozessen für Saxon-Transformationen (JAXP/XSLT 1.0). Eliminiert JVM-Startup-Overhead durch Wiederverwendung von N Worker-Prozessen. """ def __init__( self, num_workers: int, java_vm_path: Path, saxon_jar_path: Path, classpath_cache: dict[Path, str], log_dir: Optional[Path] = None, ): super().__init__(num_workers, java_vm_path, log_dir) self.saxon_jar_path = saxon_jar_path self.classpath_cache = classpath_cache self._compile_worker_class() self._start_workers() logger.info(f"SaxonWorkerPool initialisiert mit {num_workers} Workern") # --- Abstrakte Properties --- @property def _pool_name(self) -> str: return "Saxon" @property def _java_source_code(self) -> str: return SAXON_WORKER_JAVA @property def _java_class_name(self) -> str: return "SaxonWorker" @property def _temp_dir_prefix(self) -> str: return "saxon_worker_" @property def _worker_init_sleep(self) -> float: return 0.1 # --- Abstrakte Methoden --- def _get_classpath(self) -> str: saxon_dir = self.saxon_jar_path.parent if saxon_dir not in self.classpath_cache: self.classpath_cache[saxon_dir] = build_jar_classpath(saxon_dir) return self.classpath_cache[saxon_dir] def _build_worker_cmd(self, full_classpath: str) -> list[str]: return [str(self.java_vm_path), "-cp", full_classpath, "SaxonWorker"] def _stderr_log_name(self, i: int) -> str: return f"worker_{i}_stderr.log" # --- Saxon-spezifische Job-Methode --- def transform( self, source_xml: Path, xsl_stylesheet: Path, output_fo: Path, xslt_params: dict[str, str] ) -> tuple[bool, str]: """ Führt eine XSLT-Transformation mit einem Worker aus dem Pool aus. Args: source_xml: Pfad zur XML-Eingabedatei xsl_stylesheet: Pfad zur XSL-Stylesheet-Datei output_fo: Pfad zur FO-Ausgabedatei xslt_params: Dictionary mit XSLT-Parametern Returns: tuple[bool, str]: (Erfolg, Fehlermeldung/Info) """ worker_idx = self._acquire_worker() try: worker = self.workers[worker_idx] if worker.poll() is not None: stderr_content = self._read_stderr_log(worker_idx) error_msg = f"Worker {worker_idx} ist beendet (Exit: {worker.returncode})\nstderr:\n{stderr_content}" logger.error(error_msg) return False, error_msg params_str = "|||".join([f"{k}={v}" for k, v in xslt_params.items()]) job = f"{source_xml}\t{xsl_stylesheet}\t{output_fo}\t{params_str}\n" logger.debug(f"Sende Job an Worker {worker_idx}: {source_xml.name}") worker.stdin.write(job) worker.stdin.flush() response = worker.stdout.readline().strip() logger.debug(f"Worker {worker_idx} Antwort: '{response}'") if response == "OK": return True, "Erfolgreich" elif response.startswith("ERROR:"): return False, f"Saxon-Fehler: {response[6:].strip()}" elif not response: stderr_content = self._read_stderr_log(worker_idx, tail=500) return False, f"Worker {worker_idx} crashed (keine Antwort)\nstderr:\n{stderr_content}" else: return False, f"Unerwartete Antwort: {response}" except Exception as e: logger.error(f"Fehler bei Worker {worker_idx}: {e}") return False, f"Worker-Fehler: {str(e)}" finally: self.worker_locks[worker_idx].release()