""" Saxon Worker Pool (s9api) - Persistente JVM-Prozesse für XSLT 2.0/3.0 Transformationen. Diese Variante verwendet die Saxon s9api API anstatt JAXP und ist für XSLT 2.0 und 3.0 geeignet. Eliminiert JVM-Startup-Overhead durch Vorinitialisierung von N Worker-Prozessen. Jeder Worker läuft als Daemon und verarbeitet mehrere Transformationen nacheinander. """ import logging from pathlib import Path from typing import Optional from worker_pool_base import BaseWorkerPool, build_jar_classpath logger = logging.getLogger(__name__) # Java-Worker-Code für s9api (wird zur Laufzeit kompiliert) SAXON_S9API_WORKER_JAVA = """ import net.sf.saxon.s9api.*; import javax.xml.transform.stream.StreamSource; import java.io.*; import java.util.HashMap; import java.util.Map; public class SaxonS9ApiWorker { public static void main(String[] args) { BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); String line; // Create Processor once and reuse (equivalent to TransformerFactory) Processor processor = new Processor(false); // Cache für kompilierte Stylesheets (Performance-Optimierung) Map stylesheetCache = new HashMap<>(); System.err.println("SaxonS9ApiWorker started and ready (using s9api for XSLT 2.0/3.0 with stylesheet caching)"); System.err.flush(); try { while ((line = reader.readLine()) != null) { System.err.println("DEBUG: Received line: " + line.substring(0, Math.min(100, line.length()))); System.err.flush(); if ("EXIT".equals(line.trim())) { System.err.println("SaxonS9ApiWorker exiting"); break; } try { // Parse job System.err.println("DEBUG: Parsing job..."); System.err.flush(); String[] parts = line.split("\\\\t"); System.err.println("DEBUG: Parts count: " + parts.length); System.err.flush(); if (parts.length < 3) { System.out.println("ERROR: Invalid job format"); System.out.flush(); continue; } String sourceXml = parts[0]; String xslStylesheet = parts[1]; String outputFo = parts[2]; // Prüfe ob Stylesheet bereits im Cache ist XsltExecutable executable; if (stylesheetCache.containsKey(xslStylesheet)) { executable = stylesheetCache.get(xslStylesheet); System.err.println("DEBUG: Using cached stylesheet: " + xslStylesheet); System.err.flush(); } else { System.err.println("DEBUG: Compiling and caching stylesheet: " + xslStylesheet); System.err.flush(); XsltCompiler compiler = processor.newXsltCompiler(); executable = compiler.compile(new StreamSource(new File(xslStylesheet))); stylesheetCache.put(xslStylesheet, executable); System.err.println("DEBUG: Stylesheet compiled and cached (cache size: " + stylesheetCache.size() + ")"); System.err.flush(); } System.err.println("DEBUG: Creating transformer..."); System.err.flush(); // Create transformer XsltTransformer transformer = executable.load(); // Set source transformer.setSource(new StreamSource(new File(sourceXml))); // Set destination Serializer serializer = processor.newSerializer(new File(outputFo)); transformer.setDestination(serializer); // Set parameters if present if (parts.length > 3 && !parts[3].isEmpty()) { String[] params = parts[3].split("\\\\|\\\\|\\\\|"); for (String param : params) { if (!param.isEmpty() && param.contains("=")) { String[] kv = param.split("=", 2); transformer.setParameter(new QName(kv[0]), new XdmAtomicValue(kv[1])); System.err.println("DEBUG: Set parameter: " + kv[0] + " = " + kv[1]); } } System.err.flush(); } System.err.println("DEBUG: Running transformation..."); System.err.flush(); // Transform transformer.transform(); System.err.println("DEBUG: Transformation completed"); System.err.flush(); System.out.println("OK"); System.out.flush(); } catch (SaxonApiException e) { System.err.println("DEBUG: SaxonApiException: " + e.getClass().getName()); System.err.flush(); e.printStackTrace(System.err); String errorMsg = e.getMessage(); if (errorMsg == null || errorMsg.isEmpty()) { errorMsg = e.getClass().getSimpleName(); } System.out.println("ERROR: " + errorMsg); System.out.flush(); } catch (Exception e) { System.err.println("DEBUG: Job processing exception: " + e.getClass().getName()); System.err.flush(); e.printStackTrace(System.err); System.out.println("ERROR: " + (e.getMessage() != null ? e.getMessage() : e.getClass().getName())); System.out.flush(); } } } catch (IOException e) { System.err.println("SaxonS9ApiWorker I/O error: " + e.getMessage()); e.printStackTrace(System.err); } } } """ class SaxonWorkerPoolS9Api(BaseWorkerPool): """ Pool von lang-laufenden JVM-Prozessen für Saxon-Transformationen mit s9api. Diese Variante verwendet die Saxon s9api API anstatt JAXP und unterstützt vollständig XSLT 2.0 und 3.0 Transformationen. """ def __init__( self, num_workers: int, java_vm_path: Path, saxon_jar_path: Path, classpath_cache: dict[Path, str], log_dir: Optional[Path] = None, ): super().__init__(num_workers, java_vm_path, log_dir) self.saxon_jar_path = saxon_jar_path self.classpath_cache = classpath_cache self._compile_worker_class() self._start_workers() logger.info(f"SaxonWorkerPoolS9Api initialisiert mit {num_workers} Workern (XSLT 2.0/3.0)") # --- Abstrakte Properties --- @property def _pool_name(self) -> str: return "Saxon-S9Api" @property def _java_source_code(self) -> str: return SAXON_S9API_WORKER_JAVA @property def _java_class_name(self) -> str: return "SaxonS9ApiWorker" @property def _temp_dir_prefix(self) -> str: return "saxon_s9api_worker_" @property def _worker_init_sleep(self) -> float: return 0.1 # --- Abstrakte Methoden --- def _get_classpath(self) -> str: saxon_dir = self.saxon_jar_path.parent if saxon_dir not in self.classpath_cache: self.classpath_cache[saxon_dir] = build_jar_classpath(saxon_dir) logger.debug(f"Classpath für {saxon_dir} neu erstellt und gecacht") return self.classpath_cache[saxon_dir] def _build_worker_cmd(self, full_classpath: str) -> list[str]: return [str(self.java_vm_path), "-cp", full_classpath, "SaxonS9ApiWorker"] def _stderr_log_name(self, i: int) -> str: return f"s9api_worker_{i}_stderr.log" # --- Saxon-s9api-spezifische Job-Methode --- def transform( self, source_xml: Path, xsl_stylesheet: Path, output_fo: Path, xslt_params: dict[str, str] ) -> tuple[bool, str]: """ Führt eine XSLT-Transformation mit einem Worker aus dem Pool aus. Args: source_xml: Pfad zur XML-Eingabedatei xsl_stylesheet: Pfad zur XSL-Stylesheet-Datei output_fo: Pfad zur FO-Ausgabedatei xslt_params: Dictionary mit XSLT-Parametern Returns: tuple[bool, str]: (Erfolg, Fehlermeldung/Info) """ worker_idx = self._acquire_worker() try: worker = self.workers[worker_idx] if worker.poll() is not None: stderr_content = self._read_stderr_log(worker_idx) error_msg = ( f"S9Api Worker {worker_idx} ist beendet (Exit: {worker.returncode})\nstderr:\n{stderr_content}" ) logger.error(error_msg) return False, error_msg params_str = "|||".join([f"{k}={v}" for k, v in xslt_params.items()]) job = f"{source_xml}\t{xsl_stylesheet}\t{output_fo}\t{params_str}\n" logger.debug(f"Sende Job an S9Api Worker {worker_idx}: {source_xml.name}") worker.stdin.write(job) worker.stdin.flush() response = worker.stdout.readline().strip() logger.debug(f"S9Api Worker {worker_idx} Antwort: '{response}'") if response == "OK": return True, "Erfolgreich" elif response.startswith("ERROR:"): return False, f"Saxon-Fehler (s9api): {response[6:].strip()}" elif not response: stderr_content = self._read_stderr_log(worker_idx, tail=500) return False, f"S9Api Worker {worker_idx} crashed (keine Antwort)\nstderr:\n{stderr_content}" else: return False, f"Unerwartete Antwort: {response}" except Exception as e: logger.error(f"Fehler bei S9Api Worker {worker_idx}: {e}") return False, f"Worker-Fehler: {str(e)}" finally: self.worker_locks[worker_idx].release()