"""FOP worker pool - persistent JVM processes for fast PDF generation.

Eliminates JVM startup overhead by pre-starting N worker processes. Each
worker is a long-running child process that handles several FO -> PDF
transformations one after another.

Line protocol (UTF-8 on both sides):

* request:  one line "<input.fo><TAB><output.pdf>", or "EXIT" to stop
* response: one line "OK" or "ERROR: <message>"
"""

import glob
import logging
import os
import shutil
import subprocess
import tempfile
import threading
import time
from pathlib import Path
from queue import Queue
from typing import IO, Optional

logger = logging.getLogger(__name__)

# Java worker source code; written to a temp dir and compiled at runtime by
# FopWorkerPool._compile_worker_class().
FOP_WORKER_JAVA = """
import org.apache.fop.apps.*;
import org.xml.sax.SAXException;
import javax.xml.transform.*;
import javax.xml.transform.sax.SAXResult;
import javax.xml.transform.stream.StreamSource;
import java.io.*;
import java.net.URI;
import java.nio.charset.StandardCharsets;

public class FopWorker {
    public static void main(String[] args) {
        // Decode job lines as UTF-8 to match the Python side (non-ASCII paths).
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in, StandardCharsets.UTF_8));
        String line;

        System.err.println("FopWorker starting...");
        System.err.flush();

        // Create FopFactory once and reuse (major performance boost!)
        FopFactory fopFactory = null;
        try {
            // Check if config file is provided as first argument
            if (args.length > 0 && !args[0].isEmpty()) {
                File configFile = new File(args[0]);
                if (configFile.exists()) {
                    System.err.println("Loading FOP config: " + configFile.getAbsolutePath());
                    fopFactory = FopFactory.newInstance(configFile);
                } else {
                    System.err.println("Config file not found, using default configuration");
                    fopFactory = FopFactory.newInstance(new File(".").toURI());
                }
            } else {
                System.err.println("No config file specified, using default FOP configuration");
                fopFactory = FopFactory.newInstance(new File(".").toURI());
            }
            System.err.println("FopWorker started and ready");
            System.err.flush();
        } catch (Exception e) {
            System.err.println("FATAL: Failed to initialize FopFactory: " + e.getMessage());
            e.printStackTrace(System.err);
            System.exit(1);
        }

        try {
            while ((line = reader.readLine()) != null) {
                System.err.println("DEBUG: Received line: " + line.substring(0, Math.min(100, line.length())));
                System.err.flush();

                if ("EXIT".equals(line.trim())) {
                    System.err.println("FopWorker exiting");
                    break;
                }

                try {
                    // Parse job
                    System.err.println("DEBUG: Parsing job...");
                    System.err.flush();
                    String[] parts = line.split("\\t");
                    System.err.println("DEBUG: Parts count: " + parts.length);
                    System.err.flush();
                    if (parts.length < 2) {
                        System.out.println("ERROR: Invalid job format");
                        System.out.flush();
                        continue;
                    }

                    String inputFo = parts[0];
                    String outputPdf = parts[1];
                    System.err.println("DEBUG: Input FO: " + inputFo);
                    System.err.println("DEBUG: Output PDF: " + outputPdf);
                    System.err.flush();

                    // Create FOUserAgent for this transformation
                    FOUserAgent foUserAgent = fopFactory.newFOUserAgent();

                    // Note: Event Listener für detailliertes Error-Logging könnte hier hinzugefügt werden,
                    // aber ist nicht kritisch - Fehler werden durch Exceptions gefangen

                    // Create output stream. getParentFile() is null for a bare file
                    // name, so resolve to an absolute path first.
                    File outputFile = new File(outputPdf);
                    File outputDir = outputFile.getAbsoluteFile().getParentFile();
                    if (outputDir != null) {
                        outputDir.mkdirs();
                    }
                    OutputStream out = new BufferedOutputStream(new FileOutputStream(outputFile));

                    try {
                        System.err.println("DEBUG: Creating Fop instance...");
                        System.err.flush();
                        // Create Fop instance
                        Fop fop = fopFactory.newFop(MimeConstants.MIME_PDF, foUserAgent, out);

                        System.err.println("DEBUG: Setting up transformer...");
                        System.err.flush();
                        // Setup Transformer
                        TransformerFactory transformerFactory = TransformerFactory.newInstance();
                        Transformer transformer = transformerFactory.newTransformer();

                        // Setup input and output
                        Source src = new StreamSource(new File(inputFo));
                        Result res = new SAXResult(fop.getDefaultHandler());

                        System.err.println("DEBUG: Running FOP transformation...");
                        System.err.flush();
                        // Run transformation
                        transformer.transform(src, res);
                        System.err.println("DEBUG: FOP transformation completed");
                        System.err.flush();
                    } finally {
                        out.close();
                    }

                    // Transformation erfolgreich
                    System.out.println("OK");
                    System.out.flush();
                } catch (Exception e) {
                    System.err.println("DEBUG: Job processing exception: " + e.getClass().getName());
                    System.err.flush();
                    e.printStackTrace(System.err);
                    String errorMsg = e.getMessage();
                    if (errorMsg == null || errorMsg.isEmpty()) {
                        errorMsg = e.getClass().getSimpleName();
                    }
                    System.out.println("ERROR: " + errorMsg);
                    System.out.flush();
                }
            }
        } catch (IOException e) {
            System.err.println("FopWorker I/O error: " + e.getMessage());
            e.printStackTrace(System.err);
        }
    }
}
"""


class FopWorkerPool:
    """Pool of long-running JVM processes for Apache FOP PDF generation.

    Avoids JVM startup overhead by reusing ``num_workers`` worker processes.
    ``build_pdf`` may be called from several threads: each call borrows one
    idle worker exclusively for the duration of its job.
    """

    def __init__(
        self,
        num_workers: int,
        java_vm_path: Path,
        apache_fop_dir: Path,
        fop_config_file: Optional[Path] = None,
        log_dir: Optional[Path] = None,
    ):
        """Compile the Java worker class and start the worker JVMs.

        Args:
            num_workers: Number of worker processes (must be >= 1).
            java_vm_path: Path to the ``java`` binary; ``javac`` is expected in
                the same directory.
            apache_fop_dir: Apache FOP directory; JARs are collected from its
                ``build/`` and ``lib/`` subdirectories.
            fop_config_file: Optional path to an ``fop.xconf`` configuration
                file (passed to the workers only if it exists).
            log_dir: Optional directory for the workers' stderr logs. Defaults
                to the pool's temporary directory, which is removed on shutdown.

        Raises:
            ValueError: if ``num_workers`` is less than 1.
            RuntimeError: if no FOP JARs are found, compilation fails or a worker
                exits right after starting. Other errors from compilation or
                process startup (e.g. ``OSError``) propagate unchanged. In every
                failure case, already-started workers, open log files and the
                temporary directory are cleaned up first.
        """
        if num_workers < 1:
            raise ValueError(f"num_workers muss >= 1 sein (erhalten: {num_workers})")

        self.num_workers = num_workers
        self.java_vm_path = Path(java_vm_path)
        self.apache_fop_dir = Path(apache_fop_dir)
        self.fop_config_file = Path(fop_config_file) if fop_config_file is not None else None
        self.log_dir = Path(log_dir) if log_dir is not None else None

        # Worker processes, one lock per worker (held while a job runs).
        self.workers: list[subprocess.Popen] = []
        self.worker_locks: list[threading.Lock] = []
        # Not used by this class; kept so existing attribute access keeps working.
        self.job_queue: Queue = Queue()
        self.result_queue: Queue = Queue()
        # Indices of idle workers; build_pdf() blocks on get() until one is free.
        self._idle_workers: Queue = Queue()
        # Our handles of the per-worker stderr log files (closed in shutdown()).
        self._stderr_files: list[IO[str]] = []
        self._shutdown_done = False

        # Temporary directory holding the compiled Java class (and, by default, logs).
        self.temp_dir: Optional[Path] = None
        self.worker_class_path: Optional[Path] = None
        self.worker_log_dir: Optional[Path] = None

        # Classpath for FOP
        self.fop_classpath: Optional[str] = None

        try:
            self._build_fop_classpath()
            self._compile_worker_class()
            self._start_workers()
        except Exception:
            # Do not leak JVMs, file handles or the temp dir on a half-built pool.
            self.shutdown()
            raise

        logger.info(f"FopWorkerPool initialisiert mit {num_workers} Workern")

    def _build_fop_classpath(self):
        """Build ``self.fop_classpath`` from the JARs in ``build/`` and ``lib/``.

        JARs are sorted per directory so the classpath is deterministic;
        ``build/`` JARs come first.

        Raises:
            RuntimeError: if no JAR file is found.
        """
        all_jars = sorted(glob.glob(str(self.apache_fop_dir / "build" / "*.jar")))

        lib_dir = self.apache_fop_dir / "lib"
        if lib_dir.is_dir():
            all_jars.extend(sorted(glob.glob(str(lib_dir / "*.jar"))))

        if not all_jars:
            raise RuntimeError(f"Keine FOP JAR-Dateien gefunden in {self.apache_fop_dir}")

        # os.pathsep is ";" on Windows and ":" elsewhere.
        self.fop_classpath = os.pathsep.join(all_jars)
        logger.debug(f"FOP Classpath: {len(all_jars)} JARs")

    @staticmethod
    def _javac_path(java_vm_path: Path) -> Path:
        """Return the ``javac`` binary located next to *java_vm_path*.

        Only the file name is rewritten (``java`` -> ``javac``,
        ``java.exe`` -> ``javac.exe``). Directory names such as
        ``/usr/lib/jvm/java-17`` are left untouched.
        """
        java_vm_path = Path(java_vm_path)
        return java_vm_path.with_name(java_vm_path.name.replace("java", "javac", 1))

    def _compile_worker_class(self):
        """Write ``FopWorker.java`` into a new temp dir and compile it there.

        Sets ``self.temp_dir`` and ``self.worker_class_path``.

        Raises:
            RuntimeError: if ``javac`` exits with a non-zero status. Other errors
                (missing ``javac``, 30 s timeout) are logged and re-raised.
        """
        try:
            self.temp_dir = Path(tempfile.mkdtemp(prefix="fop_worker_"))

            java_file = self.temp_dir / "FopWorker.java"
            java_file.write_text(FOP_WORKER_JAVA, encoding="utf-8")

            javac_cmd = [
                str(self._javac_path(self.java_vm_path)),
                # The source was written as UTF-8 above; don't rely on the platform default.
                "-encoding",
                "UTF-8",
                "-cp",
                self.fop_classpath,
                "-d",
                str(self.temp_dir),
                str(java_file),
            ]
            logger.debug(f"Kompiliere FopWorker: {javac_cmd[0]} ...")
            result = subprocess.run(javac_cmd, capture_output=True, text=True, timeout=30)

            if result.returncode != 0:
                raise RuntimeError(f"Java-Kompilierung fehlgeschlagen: {result.stderr}")

            self.worker_class_path = self.temp_dir
            logger.info(f"FopWorker erfolgreich kompiliert: {self.temp_dir}")

        except Exception as e:
            logger.error(f"Fehler beim Kompilieren von FopWorker: {e}")
            raise

    def _stderr_log_path(self, worker_idx: int) -> Path:
        """Return the stderr log file path of worker *worker_idx*."""
        return self.worker_log_dir / f"fop_worker_{worker_idx}_stderr.log"

    def _worker_failure_message(self, worker_idx: int, headline: str, tail: Optional[int] = None) -> str:
        """Return *headline* followed by the worker's stderr log, if it is readable.

        Args:
            worker_idx: Index of the worker whose log is appended.
            headline: First line of the message.
            tail: If given, only the last *tail* characters of the log are used.
        """
        try:
            with open(self._stderr_log_path(worker_idx), encoding="utf-8", errors="replace") as f:
                stderr_content = f.read()
        except OSError:
            return headline
        if tail is not None:
            stderr_content = stderr_content[-tail:]
        return f"{headline}\nstderr:\n{stderr_content}"

    def _start_workers(self):
        """Start ``num_workers`` FopWorker JVMs and check that each survives startup.

        Each worker's stderr is redirected to ``fop_worker_<i>_stderr.log`` in
        ``self.worker_log_dir`` (``log_dir`` if given, otherwise the temp dir).
        Workers that pass the startup check are registered as idle.

        Raises:
            RuntimeError: if a worker has already exited after the grace period.
        """
        full_classpath = os.pathsep.join([str(self.worker_class_path), self.fop_classpath])

        self.worker_log_dir = self.log_dir if self.log_dir else self.temp_dir
        if self.log_dir:
            self.worker_log_dir.mkdir(parents=True, exist_ok=True)

        # Same command for every worker; fop.xconf is passed only if it exists.
        cmd = [str(self.java_vm_path), "-cp", full_classpath, "FopWorker"]
        if self.fop_config_file and self.fop_config_file.exists():
            cmd.append(str(self.fop_config_file))

        for i in range(self.num_workers):
            try:
                stderr_log = self._stderr_log_path(i)
                stderr_file = open(stderr_log, "w", encoding="utf-8")
                self._stderr_files.append(stderr_file)
                process = subprocess.Popen(
                    cmd,
                    stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE,
                    stderr=stderr_file,
                    text=True,
                    # The Java side reads/writes UTF-8; don't depend on the locale.
                    encoding="utf-8",
                    errors="replace",
                    bufsize=1,  # line buffered
                )
            except Exception as e:
                logger.error(f"Fehler beim Starten von FOP Worker {i}: {e}")
                raise
            self.workers.append(process)
            self.worker_locks.append(threading.Lock())
            logger.debug(f"FOP Worker {i} gestartet (PID: {process.pid}, stderr: {stderr_log})")

        # One shared grace period (instead of 0.2 s per worker) so that a JVM
        # failing at startup, e.g. due to a bad classpath or config, has exited.
        time.sleep(0.2)

        for i, process in enumerate(self.workers):
            if process.poll() is not None:
                error_msg = self._worker_failure_message(
                    i, f"FOP Worker {i} ist sofort beendet (Exit Code: {process.returncode})"
                )
                logger.error(f"Fehler beim Starten von FOP Worker {i}: {error_msg}")
                raise RuntimeError(error_msg)
            self._idle_workers.put(i)

        logger.info(f"{len(self.workers)} FOP-Worker erfolgreich gestartet")

    def build_pdf(self, input_fo: Path, output_pdf: Path) -> tuple[bool, str]:
        """Render *input_fo* to *output_pdf* with an idle worker from the pool.

        Blocks until any worker is idle. Each worker processes one job at a
        time, so concurrent calls from several threads are serialised per worker.

        Args:
            input_fo: Path to the FO input file.
            output_pdf: Path to the PDF output file.

        Returns:
            ``(True, "Erfolgreich")`` on success. Otherwise ``(False, message)``
            describing the FOP error, a dead or crashed worker (with its stderr
            log), an invalid path, or an unexpected response. Worker and
            protocol errors are reported this way rather than raised.
        """
        # Take whichever worker becomes free first.
        worker_idx = self._idle_workers.get()
        try:
            with self.worker_locks[worker_idx]:
                return self._run_job(worker_idx, input_fo, output_pdf)
        finally:
            self._idle_workers.put(worker_idx)

    def _run_job(self, worker_idx: int, input_fo: Path, output_pdf: Path) -> tuple[bool, str]:
        """Send one job to worker *worker_idx* and translate its reply (see ``build_pdf``)."""
        worker = self.workers[worker_idx]
        try:
            if worker.poll() is not None:
                error_msg = self._worker_failure_message(
                    worker_idx, f"FOP Worker {worker_idx} ist beendet (Exit: {worker.returncode})"
                )
                logger.error(error_msg)
                return False, error_msg

            input_fo = Path(input_fo)
            output_pdf = Path(output_pdf)

            # A tab or line break would split the job line and desync the protocol.
            for field in (str(input_fo), str(output_pdf)):
                if any(ch in field for ch in "\t\r\n"):
                    return False, f"Ungültiger Pfad (Tabulator/Zeilenumbruch nicht erlaubt): {field!r}"

            job = f"{input_fo}\t{output_pdf}\n"
            logger.debug(f"Sende FOP-Job an Worker {worker_idx}: {input_fo.name} → {output_pdf.name}")

            worker.stdin.write(job)
            worker.stdin.flush()

            response = worker.stdout.readline().strip()
            logger.debug(f"FOP Worker {worker_idx} Antwort: '{response}'")

            if response == "OK":
                return True, "Erfolgreich"
            if response.startswith("ERROR:"):
                return False, f"FOP-Fehler: {response[6:].strip()}"
            if not response:
                # EOF on stdout: the JVM died while processing the job.
                return False, self._worker_failure_message(
                    worker_idx, f"FOP Worker {worker_idx} crashed (keine Antwort)", tail=500
                )
            return False, f"Unerwartete Antwort: {response}"

        except Exception as e:
            logger.error(f"Fehler bei FOP Worker {worker_idx}: {e}")
            return False, f"Worker-Fehler: {str(e)}"

    def _stop_worker(self, worker_idx: int, worker: subprocess.Popen) -> None:
        """Send EXIT to one worker, kill it after 2 s if needed, and close its pipes."""
        try:
            if worker.poll() is None:
                try:
                    worker.stdin.write("EXIT\n")
                    worker.stdin.flush()
                except (OSError, ValueError):
                    # Broken pipe or closed stdin: the worker is already going
                    # away; wait() below still reaps it.
                    pass
                worker.wait(timeout=2)
            logger.debug(f"FOP Worker {worker_idx} beendet")
        except subprocess.TimeoutExpired:
            worker.kill()
            worker.wait()  # reap the killed process so it does not linger as a zombie
            logger.warning(f"FOP Worker {worker_idx} musste gekillt werden")
        except Exception as e:
            logger.error(f"Fehler beim Beenden von FOP Worker {worker_idx}: {e}")
        finally:
            for pipe in (worker.stdin, worker.stdout):
                if pipe is not None:
                    try:
                        pipe.close()
                    except OSError:
                        pass

    def shutdown(self):
        """Stop all workers, close pipes and log files, and delete the temp dir.

        Idempotent: a second call (e.g. an explicit ``shutdown()`` followed by
        ``__exit__``) returns immediately.
        """
        if self._shutdown_done:
            return
        self._shutdown_done = True
        logger.info("Beende FOP-Worker-Pool...")

        for i, worker in enumerate(self.workers):
            self._stop_worker(i, worker)

        # Close our copies of the stderr log handles before removing the temp
        # dir (open files block deletion on Windows).
        for stderr_file in self._stderr_files:
            stderr_file.close()
        self._stderr_files.clear()

        if self.temp_dir and self.temp_dir.exists():
            try:
                shutil.rmtree(self.temp_dir)
                logger.debug(f"Temporäres Verzeichnis gelöscht: {self.temp_dir}")
            except Exception as e:
                logger.warning(f"Konnte temporäres Verzeichnis nicht löschen: {e}")

        logger.info("FOP-Worker-Pool beendet")

    def __enter__(self):
        """Context manager entry: return the pool itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: shut the pool down (exceptions are not suppressed)."""
        self.shutdown()