"""
FOP worker pool - persistent JVM processes for fast PDF generation.

Eliminates JVM startup overhead by pre-initializing N worker processes.
Each worker runs as a daemon and processes several FO→PDF transformations
one after another.
"""

import glob
import logging
from pathlib import Path
from typing import Optional

from worker_pool_base import BaseWorkerPool, _CLASSPATH_SEP

logger = logging.getLogger(__name__)


# Java worker source (compiled at runtime).
#
# Line protocol, as implemented by the Java code below:
#   stdin : one job per line, "<input.fo>\t<output.pdf>"; the line "EXIT"
#           ends the read loop.
#   stdout: exactly one response line per job, "OK" or "ERROR: <message>".
#   stderr: diagnostics only.
FOP_WORKER_JAVA = """
import org.apache.fop.apps.*;
import org.xml.sax.SAXException;

import javax.xml.transform.*;
import javax.xml.transform.sax.SAXResult;
import javax.xml.transform.stream.StreamSource;
import java.io.*;
import java.net.URI;

public class FopWorker {
    public static void main(String[] args) {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        String line;

        System.err.println("FopWorker starting...");
        System.err.flush();

        // Create FopFactory once and reuse (major performance boost!)
        FopFactory fopFactory = null;
        try {
            // Check if config file is provided as first argument
            if (args.length > 0 && !args[0].isEmpty()) {
                File configFile = new File(args[0]);
                if (configFile.exists()) {
                    System.err.println("Loading FOP config: " + configFile.getAbsolutePath());
                    fopFactory = FopFactory.newInstance(configFile);
                } else {
                    System.err.println("Config file not found, using default configuration");
                    fopFactory = FopFactory.newInstance(new File(".").toURI());
                }
            } else {
                System.err.println("No config file specified, using default FOP configuration");
                fopFactory = FopFactory.newInstance(new File(".").toURI());
            }
            System.err.println("FopWorker started and ready");
            System.err.flush();
        } catch (Exception e) {
            System.err.println("FATAL: Failed to initialize FopFactory: " + e.getMessage());
            e.printStackTrace(System.err);
            System.exit(1);
        }

        try {
            while ((line = reader.readLine()) != null) {
                System.err.println("DEBUG: Received line: " + line.substring(0, Math.min(100, line.length())));
                System.err.flush();

                if ("EXIT".equals(line.trim())) {
                    System.err.println("FopWorker exiting");
                    break;
                }

                try {
                    // Parse job
                    System.err.println("DEBUG: Parsing job...");
                    System.err.flush();

                    String[] parts = line.split("\\t");
                    System.err.println("DEBUG: Parts count: " + parts.length);
                    System.err.flush();

                    if (parts.length < 2) {
                        System.out.println("ERROR: Invalid job format");
                        System.out.flush();
                        continue;
                    }

                    String inputFo = parts[0];
                    String outputPdf = parts[1];

                    System.err.println("DEBUG: Input FO: " + inputFo);
                    System.err.println("DEBUG: Output PDF: " + outputPdf);
                    System.err.flush();

                    // Create FOUserAgent for this transformation
                    FOUserAgent foUserAgent = fopFactory.newFOUserAgent();

                    // Note: Event Listener für detailliertes Error-Logging könnte hier hinzugefügt werden,
                    // aber ist nicht kritisch - Fehler werden durch Exceptions gefangen

                    // Create output stream
                    File outputFile = new File(outputPdf);
                    // getParentFile() returns null for a bare file name, so resolve the
                    // absolute path first and guard against a missing parent.
                    File parentDir = outputFile.getAbsoluteFile().getParentFile();
                    if (parentDir != null) {
                        parentDir.mkdirs();
                    }
                    OutputStream out = new BufferedOutputStream(new FileOutputStream(outputFile));

                    try {
                        System.err.println("DEBUG: Creating Fop instance...");
                        System.err.flush();

                        // Create Fop instance
                        Fop fop = fopFactory.newFop(MimeConstants.MIME_PDF, foUserAgent, out);

                        System.err.println("DEBUG: Setting up transformer...");
                        System.err.flush();

                        // Setup Transformer
                        TransformerFactory transformerFactory = TransformerFactory.newInstance();
                        Transformer transformer = transformerFactory.newTransformer();

                        // Setup input and output
                        Source src = new StreamSource(new File(inputFo));
                        Result res = new SAXResult(fop.getDefaultHandler());

                        System.err.println("DEBUG: Running FOP transformation...");
                        System.err.flush();

                        // Run transformation
                        transformer.transform(src, res);

                        System.err.println("DEBUG: FOP transformation completed");
                        System.err.flush();
                    } finally {
                        out.close();
                    }

                    // Transformation erfolgreich
                    System.out.println("OK");
                    System.out.flush();

                } catch (Exception e) {
                    System.err.println("DEBUG: Job processing exception: " + e.getClass().getName());
                    System.err.flush();
                    e.printStackTrace(System.err);

                    String errorMsg = e.getMessage();
                    if (errorMsg == null || errorMsg.isEmpty()) {
                        errorMsg = e.getClass().getSimpleName();
                    }
                    System.out.println("ERROR: " + errorMsg);
                    System.out.flush();
                }
            }
        } catch (IOException e) {
            System.err.println("FopWorker I/O error: " + e.getMessage());
            e.printStackTrace(System.err);
        }
    }
}
"""


class FopWorkerPool(BaseWorkerPool):
    """
    Pool of long-running JVM processes for Apache FOP PDF generation.

    Eliminates JVM startup overhead by reusing N worker processes.
    """

    def __init__(
        self,
        num_workers: int,
        java_vm_path: Path,
        apache_fop_dir: Path,
        fop_config_file: Optional[Path] = None,
        log_dir: Optional[Path] = None,
    ):
        """
        Build the FOP classpath, compile the Java worker and start the workers.

        Args:
            num_workers: Number of workers; forwarded to BaseWorkerPool.
            java_vm_path: Path to the Java executable used to launch workers.
            apache_fop_dir: Apache FOP installation directory; JARs are
                collected from its ``build`` and ``lib`` subdirectories.
            fop_config_file: Optional FOP configuration file. It is passed to
                the worker only if it exists when the command is built.
            log_dir: Forwarded to BaseWorkerPool.

        Raises:
            RuntimeError: If no FOP JAR files are found under apache_fop_dir.
        """
        super().__init__(num_workers, java_vm_path, log_dir)
        self.apache_fop_dir = apache_fop_dir
        self.fop_config_file = fop_config_file
        self.fop_classpath: Optional[str] = None

        # Order matters: the classpath must exist before the worker class is
        # compiled and the workers are launched.
        self._build_fop_classpath()
        # NOTE(review): _compile_worker_class/_start_workers are not defined in
        # this file - presumably inherited from BaseWorkerPool.
        self._compile_worker_class()
        self._start_workers()

        logger.info(f"FopWorkerPool initialisiert mit {num_workers} Workern")

    def _build_fop_classpath(self):
        """
        Build the classpath for Apache FOP and store it in ``self.fop_classpath``.

        JARs from ``<apache_fop_dir>/build`` come first, followed by those from
        ``<apache_fop_dir>/lib``. Each group is sorted because glob results
        have no guaranteed order; sorting keeps the classpath reproducible.

        Raises:
            RuntimeError: If neither directory contains a JAR file.
        """
        all_jars = sorted(glob.glob(str(self.apache_fop_dir / "build" / "*.jar")))

        lib_dir = self.apache_fop_dir / "lib"
        if lib_dir.is_dir():
            all_jars.extend(sorted(glob.glob(str(lib_dir / "*.jar"))))

        if not all_jars:
            raise RuntimeError(f"Keine FOP JAR-Dateien gefunden in {self.apache_fop_dir}")

        self.fop_classpath = _CLASSPATH_SEP.join(all_jars)
        logger.debug(f"FOP Classpath: {len(all_jars)} JARs")

    # --- Abstract properties ---

    @property
    def _pool_name(self) -> str:
        """Short name identifying this pool."""
        return "FOP"

    @property
    def _java_source_code(self) -> str:
        """Java source of the worker class."""
        return FOP_WORKER_JAVA

    @property
    def _java_class_name(self) -> str:
        """Name of the Java class declared in the worker source."""
        return "FopWorker"

    @property
    def _temp_dir_prefix(self) -> str:
        """Prefix for this pool's temporary directory."""
        return "fop_worker_"

    @property
    def _worker_init_sleep(self) -> float:
        """Worker initialization wait (presumably seconds - confirm in base class)."""
        return 0.2  # FOP takes a little longer to initialize

    # --- Abstract methods ---

    def _get_classpath(self) -> str:
        """Return the FOP classpath assembled by ``_build_fop_classpath``."""
        return self.fop_classpath

    def _build_worker_cmd(self, full_classpath: str) -> list[str]:
        """
        Build the command line that launches one FopWorker JVM.

        Args:
            full_classpath: Classpath handed to ``java -cp``.

        Returns:
            The argument list; the FOP config file path is appended as the
            worker's first argument only if it is set and exists.
        """
        cmd = [str(self.java_vm_path), "-cp", full_classpath, "FopWorker"]
        if self.fop_config_file and self.fop_config_file.exists():
            cmd.append(str(self.fop_config_file))
        return cmd

    def _stderr_log_name(self, i: int) -> str:
        """Return the stderr log file name for worker index ``i``."""
        return f"fop_worker_{i}_stderr.log"

    # --- FOP-specific job method ---

    def build_pdf(self, input_fo: Path, output_pdf: Path) -> tuple[bool, str]:
        """
        Generate a PDF from an FO file using one worker from the pool.

        The job is sent to the worker as a single tab-separated line and
        exactly one response line is read back.

        Args:
            input_fo: Path to the FO input file.
            output_pdf: Path to the PDF output file.

        Returns:
            tuple[bool, str]: (success, error message/info). Failures are
            reported through the return value; exceptions raised while
            talking to the worker are caught and converted.
        """
        # The worker protocol is line-based and tab-separated, so a tab, CR or
        # LF inside a path would shift fields or split the job into several
        # lines and desynchronize request/response pairing for later jobs.
        # Reject such paths before a worker is acquired.
        for label, path in (("input_fo", input_fo), ("output_pdf", output_pdf)):
            path_text = str(path)
            if any(ch in path_text for ch in "\t\r\n"):
                error_msg = (
                    f"Ungültiger Pfad für {label} "
                    f"(Tab/Zeilenumbruch nicht erlaubt): {path_text!r}"
                )
                logger.error(error_msg)
                return False, error_msg

        worker_idx = self._acquire_worker()

        try:
            worker = self.workers[worker_idx]

            # A worker that already exited cannot take the job; report its
            # exit code together with its stderr log.
            if worker.poll() is not None:
                stderr_content = self._read_stderr_log(worker_idx)
                error_msg = f"FOP Worker {worker_idx} ist beendet (Exit: {worker.returncode})\nstderr:\n{stderr_content}"
                logger.error(error_msg)
                return False, error_msg

            job = f"{input_fo}\t{output_pdf}\n"
            logger.debug(f"Sende FOP-Job an Worker {worker_idx}: {input_fo.name} → {output_pdf.name}")

            worker.stdin.write(job)
            worker.stdin.flush()

            response = worker.stdout.readline().strip()
            logger.debug(f"FOP Worker {worker_idx} Antwort: '{response}'")

            if response == "OK":
                return True, "Erfolgreich"
            elif response.startswith("ERROR:"):
                return False, f"FOP-Fehler: {response[6:].strip()}"
            elif not response:
                # An empty response line is treated as a crashed worker.
                stderr_content = self._read_stderr_log(worker_idx, tail=500)
                return False, f"FOP Worker {worker_idx} crashed (keine Antwort)\nstderr:\n{stderr_content}"
            else:
                return False, f"Unerwartete Antwort: {response}"

        except Exception as e:
            # Boundary handler: callers get a (False, message) result instead
            # of an exception; the traceback is kept in the log.
            logger.exception(f"Fehler bei FOP Worker {worker_idx}: {e}")
            return False, f"Worker-Fehler: {str(e)}"
        finally:
            # Always hand the worker back, whatever the outcome.
            self.worker_locks[worker_idx].release()