Files
xsl-validator/src/saxon_pool.py
T

517 lines
20 KiB
Python
Raw Normal View History

"""
Saxon Worker Pool - Persistente JVM-Prozesse für schnelle XSLT-Transformationen.
Eliminiert JVM-Startup-Overhead durch Vorinitialisierung von N Worker-Prozessen.
Jeder Worker läuft als Daemon und verarbeitet mehrere Transformationen nacheinander.
"""
import logging
import subprocess
import threading
import time
import psutil
from pathlib import Path
from queue import Queue
from typing import Optional
import tempfile
from worker_metrics import WorkerPoolMetrics
logger = logging.getLogger(__name__)
# Java-Worker-Code (wird zur Laufzeit kompiliert)
SAXON_WORKER_JAVA = """
import javax.xml.transform.*;
import javax.xml.transform.stream.*;
import java.io.*;
import java.util.*;
public class SaxonWorker {
public static void main(String[] args) {
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
String line;
// Create TransformerFactory once and reuse
TransformerFactory factory = TransformerFactory.newInstance();
System.err.println("SaxonWorker started and ready (using JAXP Transformer API)");
System.err.flush();
try {
while ((line = reader.readLine()) != null) {
System.err.println("DEBUG: Received line: " + line.substring(0, Math.min(100, line.length())));
System.err.flush();
if ("EXIT".equals(line.trim())) {
System.err.println("SaxonWorker exiting");
break;
}
try {
// Parse job
System.err.println("DEBUG: Parsing job...");
System.err.flush();
String[] parts = line.split("\\t");
System.err.println("DEBUG: Parts count: " + parts.length);
System.err.flush();
if (parts.length < 3) {
System.out.println("ERROR: Invalid job format");
System.out.flush();
continue;
}
String sourceXml = parts[0];
String xslStylesheet = parts[1];
String outputFo = parts[2];
System.err.println("DEBUG: Creating transformer from stylesheet...");
System.err.flush();
// Create Source and Result objects
StreamSource xslSource = new StreamSource(new File(xslStylesheet));
StreamSource xmlSource = new StreamSource(new File(sourceXml));
StreamResult result = new StreamResult(new File(outputFo));
System.err.println("DEBUG: Compiling stylesheet...");
System.err.flush();
// Create transformer from stylesheet
Transformer transformer = factory.newTransformer(xslSource);
// Set parameters if present
if (parts.length > 3 && !parts[3].isEmpty()) {
String[] params = parts[3].split("\\\\|\\\\|\\\\|");
for (String param : params) {
if (!param.isEmpty() && param.contains("=")) {
String[] kv = param.split("=", 2);
transformer.setParameter(kv[0], kv[1]);
System.err.println("DEBUG: Set parameter: " + kv[0] + " = " + kv[1]);
}
}
System.err.flush();
}
System.err.println("DEBUG: Running transformation...");
System.err.flush();
// Capture errors via ErrorListener
final StringBuilder errors = new StringBuilder();
transformer.setErrorListener(new ErrorListener() {
@Override
public void warning(TransformerException e) {
errors.append("WARNING: ").append(e.getMessage()).append("\\n");
}
@Override
public void error(TransformerException e) {
errors.append("ERROR: ").append(e.getMessage()).append("\\n");
}
@Override
public void fatalError(TransformerException e) throws TransformerException {
errors.append("FATAL: ").append(e.getMessage()).append("\\n");
throw e;
}
});
// Run transformation
transformer.transform(xmlSource, result);
System.err.println("DEBUG: Transformation completed");
System.err.flush();
// Check for errors
if (errors.length() > 0) {
System.out.println("ERROR: " + errors.toString().trim());
} else {
System.out.println("OK");
}
System.out.flush();
} catch (TransformerException e) {
System.err.println("DEBUG: Transformer exception: " + e.getClass().getName());
System.err.flush();
e.printStackTrace(System.err);
String errorMsg = e.getMessage();
if (errorMsg == null || errorMsg.isEmpty()) {
errorMsg = e.getClass().getSimpleName();
}
System.out.println("ERROR: " + errorMsg);
System.out.flush();
} catch (Exception e) {
System.err.println("DEBUG: Job processing exception: " + e.getClass().getName());
System.err.flush();
e.printStackTrace(System.err);
System.out.println("ERROR: " + (e.getMessage() != null ? e.getMessage() : e.getClass().getName()));
System.out.flush();
}
}
} catch (IOException e) {
System.err.println("SaxonWorker I/O error: " + e.getMessage());
e.printStackTrace(System.err);
}
}
}
"""
class SaxonWorkerPool:
"""
Pool von lang-laufenden JVM-Prozessen für Saxon-Transformationen.
Eliminiert JVM-Startup-Overhead durch Wiederverwendung von N Worker-Prozessen.
"""
def __init__(
self,
num_workers: int,
java_vm_path: Path,
saxon_jar_path: Path,
classpath_cache: dict[Path, str],
log_dir: Optional[Path] = None,
):
"""
Initialisiert den Saxon-Worker-Pool.
Args:
num_workers: Anzahl der Worker-Prozesse
java_vm_path: Pfad zur Java VM Binary
saxon_jar_path: Pfad zur Saxon JAR-Datei
classpath_cache: Cache für Saxon-Classpaths
log_dir: Optionales Verzeichnis für Worker-Logs (Standard: temp_dir/temp)
"""
self.num_workers = num_workers
self.java_vm_path = java_vm_path
self.saxon_jar_path = saxon_jar_path
self.classpath_cache = classpath_cache
self.log_dir = log_dir
# Worker-Prozesse und Queues
self.workers: list[subprocess.Popen] = []
self.job_queue: Queue = Queue()
self.result_queue: Queue = Queue()
self.worker_locks: list[threading.Lock] = []
# Temporäres Verzeichnis für kompilierte Java-Klasse
self.temp_dir: Optional[Path] = None
self.worker_class_path: Optional[Path] = None
self.worker_log_dir: Optional[Path] = None
# Performance-Metriken
self.metrics = WorkerPoolMetrics()
# Initialisierung
self._compile_worker_class()
self._start_workers()
logger.info(f"SaxonWorkerPool initialisiert mit {num_workers} Workern")
def _compile_worker_class(self):
"""Kompiliert die SaxonWorker-Java-Klasse."""
start_time = time.time()
try:
# Erstelle temporäres Verzeichnis
self.temp_dir = Path(tempfile.mkdtemp(prefix="saxon_worker_"))
# Schreibe Java-Quellcode
java_file = self.temp_dir / "SaxonWorker.java"
java_file.write_text(SAXON_WORKER_JAVA, encoding="utf-8")
# Hole Classpath
saxon_dir = self.saxon_jar_path.parent
if saxon_dir in self.classpath_cache:
classpath = self.classpath_cache[saxon_dir]
else:
# Fallback: Baue Classpath neu
import glob
import sys
all_jars = glob.glob(str(saxon_dir / "*.jar"))
lib_dir = saxon_dir / "lib"
if lib_dir.exists():
all_jars.extend(glob.glob(str(lib_dir / "*.jar")))
classpath_separator = ";" if sys.platform == "win32" else ":"
classpath = classpath_separator.join(all_jars)
# Kompiliere Java-Klasse
javac_cmd = [str(self.java_vm_path).replace("java", "javac"), "-cp", classpath, str(java_file)]
logger.debug(f"Kompiliere SaxonWorker: {' '.join(javac_cmd)}")
result = subprocess.run(javac_cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
raise RuntimeError(f"Java-Kompilierung fehlgeschlagen: {result.stderr}")
self.worker_class_path = self.temp_dir
# Speichere Kompilierungszeit
self.metrics.compilation_time_seconds = time.time() - start_time
logger.info(
f"SaxonWorker erfolgreich kompiliert: {self.temp_dir} " f"({self.metrics.compilation_time_seconds:.3f}s)"
)
except Exception as e:
logger.error(f"Fehler beim Kompilieren von SaxonWorker: {e}")
raise
def _start_workers(self):
"""Startet N Worker-Prozesse."""
# Hole Classpath
saxon_dir = self.saxon_jar_path.parent
classpath = self.classpath_cache.get(saxon_dir, "")
# Füge Worker-Classpath hinzu
import sys
classpath_separator = ";" if sys.platform == "win32" else ":"
full_classpath = str(self.worker_class_path) + classpath_separator + classpath
# Bestimme Log-Verzeichnis
self.worker_log_dir = self.log_dir if self.log_dir else self.temp_dir
if self.log_dir:
self.worker_log_dir.mkdir(parents=True, exist_ok=True)
for i in range(self.num_workers):
worker_start_time = time.time()
try:
# Starte JVM-Prozess mit SaxonWorker
cmd = [str(self.java_vm_path), "-cp", full_classpath, "SaxonWorker"]
# Öffne stderr-Log-Datei für diesen Worker
stderr_log = self.worker_log_dir / f"worker_{i}_stderr.log"
stderr_file = open(stderr_log, "w", encoding="utf-8")
process = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=stderr_file, # Redirect stderr to file
text=True,
bufsize=1, # Line buffered
)
self.workers.append(process)
self.worker_locks.append(threading.Lock())
logger.debug(f"Worker {i} gestartet (PID: {process.pid}, stderr: {stderr_log})")
# Warte kurz damit Worker initialisieren kann
time.sleep(0.1)
# Prüfe ob Worker noch läuft
if process.poll() is not None:
# Worker ist bereits beendet - Fehler!
stderr_file.close()
with open(stderr_log, "r") as f:
stderr_content = f.read()
raise RuntimeError(
f"Worker {i} ist sofort beendet (Exit Code: {process.returncode})\nstderr:\n{stderr_content}"
)
# Speichere Worker-Startzeit
worker_elapsed = time.time() - worker_start_time
self.metrics.worker_start_times.append(worker_elapsed)
except Exception as e:
logger.error(f"Fehler beim Starten von Worker {i}: {e}")
raise
# Berechne Aggregat-Werte für Worker-Startzeiten
self.metrics.calculate_aggregates()
logger.info(
f"{len(self.workers)} Saxon-Worker erfolgreich gestartet "
f"(Summe: {self.metrics.total_worker_start_time_seconds:.3f}s, "
f"Durchschnitt: {self.metrics.average_worker_start_time_seconds:.3f}s)"
)
def transform(
self, source_xml: Path, xsl_stylesheet: Path, output_fo: Path, xslt_params: dict[str, str]
) -> tuple[bool, str]:
"""
Führt eine XSLT-Transformation mit einem Worker aus dem Pool aus.
Args:
source_xml: Pfad zur XML-Eingabedatei
xsl_stylesheet: Pfad zur XSL-Stylesheet-Datei
output_fo: Pfad zur FO-Ausgabedatei
xslt_params: Dictionary mit XSLT-Parametern
Returns:
tuple[bool, str]: (Erfolg, Fehlermeldung/Info)
"""
# Finde freien Worker
worker_idx = None
for i, lock in enumerate(self.worker_locks):
if lock.acquire(blocking=False):
worker_idx = i
break
if worker_idx is None:
# Kein freier Worker, warte auf ersten verfügbaren
for i, lock in enumerate(self.worker_locks):
lock.acquire()
worker_idx = i
break
try:
worker = self.workers[worker_idx]
# Prüfe ob Worker noch läuft
if worker.poll() is not None:
# Worker ist tot!
stderr_log = self.worker_log_dir / f"worker_{worker_idx}_stderr.log"
try:
with open(stderr_log, "r") as f:
stderr_content = f.read()
error_msg = (
f"Worker {worker_idx} ist beendet (Exit: {worker.returncode})\nstderr:\n{stderr_content}"
)
except Exception:
error_msg = f"Worker {worker_idx} ist beendet (Exit: {worker.returncode})"
logger.error(error_msg)
return False, error_msg
# Formatiere Parameter
params_str = "|||".join([f"{key}={value}" for key, value in xslt_params.items()])
# Erstelle Job-String (Tab-separated)
job = f"{source_xml}\t{xsl_stylesheet}\t{output_fo}\t{params_str}\n"
logger.debug(f"Sende Job an Worker {worker_idx}: {source_xml.name}")
# Sende Job an Worker
worker.stdin.write(job)
worker.stdin.flush()
# Warte auf Antwort
response = worker.stdout.readline().strip()
logger.debug(f"Worker {worker_idx} Antwort: '{response}'")
if response == "OK":
return True, "Erfolgreich"
elif response.startswith("ERROR:"):
error_msg = response[6:].strip()
return False, f"Saxon-Fehler: {error_msg}"
else:
# Leere Antwort bedeutet Worker ist crashed
if not response:
stderr_log = self.worker_log_dir / f"worker_{worker_idx}_stderr.log"
try:
with open(stderr_log, "r") as f:
stderr_content = f.read()[-500:] # Letzte 500 Zeichen
return False, f"Worker {worker_idx} crashed (keine Antwort)\nstderr:\n{stderr_content}"
except Exception:
return False, f"Worker {worker_idx} crashed (keine Antwort)"
return False, f"Unerwartete Antwort: {response}"
except Exception as e:
logger.error(f"Fehler bei Worker {worker_idx}: {e}")
return False, f"Worker-Fehler: {str(e)}"
finally:
# Gebe Worker-Lock frei
self.worker_locks[worker_idx].release()
def measure_ram_usage(self) -> tuple[float, float, list[float]]:
"""
Misst den aktuellen RAM-Verbrauch aller Worker-Prozesse.
Returns:
tuple: (total_mb, average_mb, per_worker_mb_list)
"""
ram_per_worker = []
for i, worker in enumerate(self.workers):
try:
if worker.poll() is None: # Worker läuft noch
process = psutil.Process(worker.pid)
# Hole Speicherinfo (RSS = Resident Set Size in Bytes)
mem_info = process.memory_info()
ram_mb = mem_info.rss / (1024 * 1024) # Konvertiere zu MB
ram_per_worker.append(ram_mb)
else:
logger.warning(f"Worker {i} ist nicht mehr aktiv (kann RAM nicht messen)")
except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
logger.warning(f"Konnte RAM für Worker {i} nicht messen: {e}")
total_ram = sum(ram_per_worker)
average_ram = total_ram / len(ram_per_worker) if ram_per_worker else 0.0
return total_ram, average_ram, ram_per_worker
def capture_ram_before_transform(self):
"""Erfasst RAM-Verbrauch vor der ersten Transformation."""
total, average, per_worker = self.measure_ram_usage()
self.metrics.ram_before_transform_mb_per_worker = per_worker
self.metrics.total_ram_before_mb = total
self.metrics.average_ram_before_mb = average
logger.info(
f"RAM vor Transformation: {self.metrics.total_ram_before_mb:.1f} MB "
f"(Durchschnitt: {self.metrics.average_ram_before_mb:.1f} MB/Worker)"
)
def capture_ram_after_transform(self):
"""Erfasst RAM-Verbrauch nach allen Transformationen."""
total, average, per_worker = self.measure_ram_usage()
self.metrics.ram_after_transform_mb_per_worker = per_worker
self.metrics.total_ram_after_mb = total
self.metrics.average_ram_after_mb = average
logger.info(
f"RAM nach Transformation: {self.metrics.total_ram_after_mb:.1f} MB "
f"(Durchschnitt: {self.metrics.average_ram_after_mb:.1f} MB/Worker)"
)
def shutdown(self):
"""Beendet alle Worker-Prozesse sauber."""
logger.info("Beende Saxon-Worker-Pool...")
for i, worker in enumerate(self.workers):
try:
# Sende EXIT-Befehl
if worker.stdin and not worker.stdin.closed:
worker.stdin.write("EXIT\n")
worker.stdin.flush()
# Warte auf Beendigung (max 2 Sekunden)
worker.wait(timeout=2)
logger.debug(f"Worker {i} beendet")
except subprocess.TimeoutExpired:
# Force kill falls nötig
worker.kill()
logger.warning(f"Worker {i} musste gekillt werden")
except Exception as e:
logger.error(f"Fehler beim Beenden von Worker {i}: {e}")
# Lösche temporäres Verzeichnis
if self.temp_dir and self.temp_dir.exists():
try:
import shutil
shutil.rmtree(self.temp_dir)
logger.debug(f"Temporäres Verzeichnis gelöscht: {self.temp_dir}")
except Exception as e:
logger.warning(f"Konnte temporäres Verzeichnis nicht löschen: {e}")
logger.info("Saxon-Worker-Pool beendet")
def __enter__(self):
"""Context manager entry."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
self.shutdown()