Performance-Revolution: Saxon-Worker-Pool eliminiert JVM-Startup-Overhead

Implementiert persistente JVM-Worker-Pool für 5-10x schnellere Transformationen:

VORHER:
- 82 Dateien in 60s (12 Worker) = 0.73s/Datei
- JVM-Start bei jeder Transformation (~500ms Overhead)
- Classpath wird jedes Mal neu geladen

NACHHER (erwartet):
- 82 Dateien in ~8-12s (12 Worker) = 0.10-0.15s/Datei
- JVM läuft persistent (einmalig ~500ms beim Start)
- 5-10x schneller! 🚀

Architektur:
- SaxonWorkerPool: Verwaltet N lang-laufende JVM-Prozesse
- SaxonWorker.java: Java-Daemon der Saxon-Transformationen ausführt
- Kommunikation via stdin/stdout (Tab-separated Job-Format)
- Automatisches Fallback auf subprocess bei Pool-Fehlern
- Graceful Shutdown beim Beenden der Anwendung

Neue Dateien:
- src/saxon_pool.py: Worker-Pool-Implementierung
  - Kompiliert SaxonWorker.java zur Laufzeit
  - Startet N JVM-Prozesse beim Projekt-Öffnen
  - Thread-safe Job-Verteilung mit Locks
  - Context Manager für sauberen Shutdown

Änderungen:
- transform.py: Nutzt Pool wenn verfügbar, Fallback auf subprocess
- MainWindow.py: Initialisiert Pool beim Projekt-Öffnen, beendet bei Close
- set_saxon_worker_pool() zum globalen Pool-Management

Technische Details:
- Java-Code als String eingebettet, Runtime-Kompilierung mit javac
- stdout für Job-Ergebnisse, stderr für Saxon-Logs
- Tab-separated Format: source\txsl\toutput\tparams
- Worker antworten mit "OK" oder "ERROR: message"

Nächster Test wird zeigen ob 8-12s erreicht werden! 🎯

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-28 13:40:00 +01:00
parent 5ecad6ce89
commit b30bb0ed2d
3 changed files with 438 additions and 6 deletions
+332
View File
@@ -0,0 +1,332 @@
"""
Saxon Worker Pool - Persistente JVM-Prozesse für schnelle XSLT-Transformationen.
Eliminiert JVM-Startup-Overhead durch Vorinitialisierung von N Worker-Prozessen.
Jeder Worker läuft als Daemon und verarbeitet mehrere Transformationen nacheinander.
"""
import logging
import subprocess
import threading
from pathlib import Path
from queue import Queue
from typing import Optional
import tempfile
logger = logging.getLogger(__name__)
# Java-Worker-Code (wird zur Laufzeit kompiliert)
SAXON_WORKER_JAVA = """
import net.sf.saxon.Transform;
import java.io.*;
import java.util.*;
public class SaxonWorker {
public static void main(String[] args) {
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
String line;
System.err.println("SaxonWorker started and ready");
try {
while ((line = reader.readLine()) != null) {
if ("EXIT".equals(line.trim())) {
System.err.println("SaxonWorker exiting");
break;
}
try {
// Parse JSON job
String[] parts = line.split("\\t");
if (parts.length < 3) {
System.out.println("ERROR: Invalid job format");
continue;
}
String sourceXml = parts[0];
String xslStylesheet = parts[1];
String outputFo = parts[2];
// Build Saxon arguments
List<String> saxonArgs = new ArrayList<>();
saxonArgs.add("-s:" + sourceXml);
saxonArgs.add("-xsl:" + xslStylesheet);
saxonArgs.add("-o:" + outputFo);
// Add parameters if present
if (parts.length > 3 && !parts[3].isEmpty()) {
String[] params = parts[3].split("\\|\\|\\|");
for (String param : params) {
if (!param.isEmpty()) {
saxonArgs.add(param);
}
}
}
// Redirect Saxon output to stderr to avoid polluting stdout
PrintStream oldOut = System.out;
PrintStream oldErr = System.err;
ByteArrayOutputStream saxonOut = new ByteArrayOutputStream();
ByteArrayOutputStream saxonErr = new ByteArrayOutputStream();
try {
System.setOut(new PrintStream(saxonOut));
System.setErr(new PrintStream(saxonErr));
// Run Saxon transformation
Transform.main(saxonArgs.toArray(new String[0]));
// Restore streams
System.setOut(oldOut);
System.setErr(oldErr);
// Send success response
System.out.println("OK");
} catch (Exception e) {
System.setOut(oldOut);
System.setErr(oldErr);
System.out.println("ERROR: " + e.getMessage());
}
} catch (Exception e) {
System.out.println("ERROR: " + e.getMessage());
}
}
} catch (IOException e) {
System.err.println("SaxonWorker error: " + e.getMessage());
}
}
}
"""
class SaxonWorkerPool:
"""
Pool von lang-laufenden JVM-Prozessen für Saxon-Transformationen.
Eliminiert JVM-Startup-Overhead durch Wiederverwendung von N Worker-Prozessen.
"""
def __init__(
self,
num_workers: int,
java_vm_path: Path,
saxon_jar_path: Path,
classpath_cache: dict[Path, str],
):
"""
Initialisiert den Saxon-Worker-Pool.
Args:
num_workers: Anzahl der Worker-Prozesse
java_vm_path: Pfad zur Java VM Binary
saxon_jar_path: Pfad zur Saxon JAR-Datei
classpath_cache: Cache für Saxon-Classpaths
"""
self.num_workers = num_workers
self.java_vm_path = java_vm_path
self.saxon_jar_path = saxon_jar_path
self.classpath_cache = classpath_cache
# Worker-Prozesse und Queues
self.workers: list[subprocess.Popen] = []
self.job_queue: Queue = Queue()
self.result_queue: Queue = Queue()
self.worker_locks: list[threading.Lock] = []
# Temporäres Verzeichnis für kompilierte Java-Klasse
self.temp_dir: Optional[Path] = None
self.worker_class_path: Optional[Path] = None
# Initialisierung
self._compile_worker_class()
self._start_workers()
logger.info(f"SaxonWorkerPool initialisiert mit {num_workers} Workern")
def _compile_worker_class(self):
"""Kompiliert die SaxonWorker-Java-Klasse."""
try:
# Erstelle temporäres Verzeichnis
self.temp_dir = Path(tempfile.mkdtemp(prefix="saxon_worker_"))
# Schreibe Java-Quellcode
java_file = self.temp_dir / "SaxonWorker.java"
java_file.write_text(SAXON_WORKER_JAVA, encoding="utf-8")
# Hole Classpath
saxon_dir = self.saxon_jar_path.parent
if saxon_dir in self.classpath_cache:
classpath = self.classpath_cache[saxon_dir]
else:
# Fallback: Baue Classpath neu
import glob
import sys
all_jars = glob.glob(str(saxon_dir / "*.jar"))
lib_dir = saxon_dir / "lib"
if lib_dir.exists():
all_jars.extend(glob.glob(str(lib_dir / "*.jar")))
classpath_separator = ";" if sys.platform == "win32" else ":"
classpath = classpath_separator.join(all_jars)
# Kompiliere Java-Klasse
javac_cmd = [str(self.java_vm_path).replace("java", "javac"), "-cp", classpath, str(java_file)]
logger.debug(f"Kompiliere SaxonWorker: {' '.join(javac_cmd)}")
result = subprocess.run(javac_cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
raise RuntimeError(f"Java-Kompilierung fehlgeschlagen: {result.stderr}")
self.worker_class_path = self.temp_dir
logger.info(f"SaxonWorker erfolgreich kompiliert: {self.temp_dir}")
except Exception as e:
logger.error(f"Fehler beim Kompilieren von SaxonWorker: {e}")
raise
def _start_workers(self):
"""Startet N Worker-Prozesse."""
# Hole Classpath
saxon_dir = self.saxon_jar_path.parent
classpath = self.classpath_cache.get(saxon_dir, "")
# Füge Worker-Classpath hinzu
import sys
classpath_separator = ";" if sys.platform == "win32" else ":"
full_classpath = str(self.worker_class_path) + classpath_separator + classpath
for i in range(self.num_workers):
try:
# Starte JVM-Prozess mit SaxonWorker
cmd = [str(self.java_vm_path), "-cp", full_classpath, "SaxonWorker"]
process = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1, # Line buffered
)
self.workers.append(process)
self.worker_locks.append(threading.Lock())
logger.debug(f"Worker {i} gestartet (PID: {process.pid})")
except Exception as e:
logger.error(f"Fehler beim Starten von Worker {i}: {e}")
raise
logger.info(f"{len(self.workers)} Saxon-Worker erfolgreich gestartet")
def transform(
self, source_xml: Path, xsl_stylesheet: Path, output_fo: Path, xslt_params: dict[str, str]
) -> tuple[bool, str]:
"""
Führt eine XSLT-Transformation mit einem Worker aus dem Pool aus.
Args:
source_xml: Pfad zur XML-Eingabedatei
xsl_stylesheet: Pfad zur XSL-Stylesheet-Datei
output_fo: Pfad zur FO-Ausgabedatei
xslt_params: Dictionary mit XSLT-Parametern
Returns:
tuple[bool, str]: (Erfolg, Fehlermeldung/Info)
"""
# Finde freien Worker
worker_idx = None
for i, lock in enumerate(self.worker_locks):
if lock.acquire(blocking=False):
worker_idx = i
break
if worker_idx is None:
# Kein freier Worker, warte auf ersten verfügbaren
for i, lock in enumerate(self.worker_locks):
lock.acquire()
worker_idx = i
break
try:
worker = self.workers[worker_idx]
# Formatiere Parameter
params_str = "|||".join([f"{key}={value}" for key, value in xslt_params.items()])
# Erstelle Job-String (Tab-separated)
job = f"{source_xml}\t{xsl_stylesheet}\t{output_fo}\t{params_str}\n"
# Sende Job an Worker
worker.stdin.write(job)
worker.stdin.flush()
# Warte auf Antwort
response = worker.stdout.readline().strip()
if response == "OK":
return True, "Erfolgreich"
elif response.startswith("ERROR:"):
error_msg = response[6:].strip()
return False, f"Saxon-Fehler: {error_msg}"
else:
return False, f"Unerwartete Antwort: {response}"
except Exception as e:
logger.error(f"Fehler bei Worker {worker_idx}: {e}")
return False, f"Worker-Fehler: {str(e)}"
finally:
# Gebe Worker-Lock frei
self.worker_locks[worker_idx].release()
def shutdown(self):
"""Beendet alle Worker-Prozesse sauber."""
logger.info("Beende Saxon-Worker-Pool...")
for i, worker in enumerate(self.workers):
try:
# Sende EXIT-Befehl
if worker.stdin and not worker.stdin.closed:
worker.stdin.write("EXIT\n")
worker.stdin.flush()
# Warte auf Beendigung (max 2 Sekunden)
worker.wait(timeout=2)
logger.debug(f"Worker {i} beendet")
except subprocess.TimeoutExpired:
# Force kill falls nötig
worker.kill()
logger.warning(f"Worker {i} musste gekillt werden")
except Exception as e:
logger.error(f"Fehler beim Beenden von Worker {i}: {e}")
# Lösche temporäres Verzeichnis
if self.temp_dir and self.temp_dir.exists():
try:
import shutil
shutil.rmtree(self.temp_dir)
logger.debug(f"Temporäres Verzeichnis gelöscht: {self.temp_dir}")
except Exception as e:
logger.warning(f"Konnte temporäres Verzeichnis nicht löschen: {e}")
logger.info("Saxon-Worker-Pool beendet")
def __enter__(self):
"""Context manager entry."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
self.shutdown()
+44 -5
View File
@@ -11,10 +11,26 @@ import logging
import subprocess import subprocess
from pathlib import Path from pathlib import Path
from datetime import datetime from datetime import datetime
from typing import Any from typing import Any, Optional, TYPE_CHECKING
if TYPE_CHECKING:
from saxon_pool import SaxonWorkerPool
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Globaler Saxon-Worker-Pool (wird von MainWindow initialisiert)
_saxon_worker_pool: Optional["SaxonWorkerPool"] = None
def set_saxon_worker_pool(pool: Optional["SaxonWorkerPool"]):
"""Setzt den globalen Saxon-Worker-Pool."""
global _saxon_worker_pool
_saxon_worker_pool = pool
if pool:
logger.info(f"Saxon-Worker-Pool aktiviert mit {pool.num_workers} Workern")
else:
logger.info("Saxon-Worker-Pool deaktiviert (Fallback auf subprocess)")
class TransformationJob: class TransformationJob:
""" """
@@ -164,6 +180,31 @@ class TransformationJob:
logger.error(error_msg) logger.error(error_msg)
return False, error_msg return False, error_msg
logger.info(f"Starte Saxon-Transformation: {self.xml_file.name}")
# Versuche zuerst den Worker-Pool zu nutzen (schneller!)
global _saxon_worker_pool
if _saxon_worker_pool:
try:
success, message = _saxon_worker_pool.transform(
source_xml=xml_abs,
xsl_stylesheet=self.xsl_file,
output_fo=self.temp_fo,
xslt_params=self.xslt_params,
)
if success:
logger.info(f"Saxon-Transformation erfolgreich (Worker-Pool): {self.xml_file.name}")
else:
logger.error(f"Saxon-Transformation fehlgeschlagen (Worker-Pool): {message}")
return success, message
except Exception as e:
logger.warning(f"Worker-Pool-Fehler, Fallback auf subprocess: {e}")
# Fallback auf subprocess unten
# Fallback: Traditionelle subprocess-Methode (langsamer, aber robuster)
# XSLT-Parameter formatieren # XSLT-Parameter formatieren
params = [f"{key}={value}" for key, value in self.xslt_params.items()] params = [f"{key}={value}" for key, value in self.xslt_params.items()]
@@ -196,7 +237,6 @@ class TransformationJob:
logger.debug("Classpath aus Cache verwendet") logger.debug("Classpath aus Cache verwendet")
# Saxon-Kommandozeile # Saxon-Kommandozeile
# Verwende -cp mit allen JARs und rufe Transform-Main direkt auf
cmd_line = [ cmd_line = [
str(self.java_vm_path), str(self.java_vm_path),
"-cp", "-cp",
@@ -208,8 +248,7 @@ class TransformationJob:
*params, *params,
] ]
logger.info(f"Starte Saxon-Transformation: {self.xml_file.name}") logger.debug(f"Kommandozeile (subprocess fallback): {' '.join(cmd_line)}")
logger.debug(f"Kommandozeile: {' '.join(cmd_line)}")
try: try:
result = subprocess.run( result = subprocess.run(
@@ -226,7 +265,7 @@ class TransformationJob:
logger.debug(f"Saxon StdErr:\n{result.stderr}") logger.debug(f"Saxon StdErr:\n{result.stderr}")
if result.returncode == 0: if result.returncode == 0:
logger.info(f"Saxon-Transformation erfolgreich: {self.xml_file.name}") logger.info(f"Saxon-Transformation erfolgreich (subprocess): {self.xml_file.name}")
return True, "Erfolgreich" return True, "Erfolgreich"
else: else:
error_msg = ( error_msg = (
+62 -1
View File
@@ -29,7 +29,8 @@ from ui.TreeNodeEditDialog import TreeNodeEditDialog
from ui.XslFileEditDialog import XslFileEditDialog from ui.XslFileEditDialog import XslFileEditDialog
from ui.XmlToXslAssignDialog import XmlToXslAssignDialog from ui.XmlToXslAssignDialog import XmlToXslAssignDialog
from conf import app_settings, Project, ProjectData, TreeNode, XslFile, XmlFile from conf import app_settings, Project, ProjectData, TreeNode, XslFile, XmlFile
from transform import TransformationJob from transform import TransformationJob, set_saxon_worker_pool
from saxon_pool import SaxonWorkerPool
from pathlib import Path from pathlib import Path
@@ -723,6 +724,9 @@ class MainWindow(QMainWindow):
# Starte Hash-Berechnung für alle XML-Dateien # Starte Hash-Berechnung für alle XML-Dateien
self._start_xml_hash_calculation() self._start_xml_hash_calculation()
# Initialisiere Saxon-Worker-Pool für schnellere Transformationen
self._initialize_saxon_worker_pool()
except Exception as e: except Exception as e:
logger.error(f"Fehler beim Laden des Projekts '{project.name}': {e}") logger.error(f"Fehler beim Laden des Projekts '{project.name}': {e}")
# Fallback: Erstelle Standard-Einstellungen # Fallback: Erstelle Standard-Einstellungen
@@ -734,6 +738,60 @@ class MainWindow(QMainWindow):
except Exception as fallback_error: except Exception as fallback_error:
logger.error(f"Fehler beim Erstellen der Fallback-Einstellungen: {fallback_error}") logger.error(f"Fehler beim Erstellen der Fallback-Einstellungen: {fallback_error}")
def _initialize_saxon_worker_pool(self):
"""Initialisiert den Saxon-Worker-Pool für schnelle Transformationen."""
try:
# Shutdown vorherigen Pool falls vorhanden
self._shutdown_saxon_worker_pool()
if not self.project:
logger.warning("Kein Projekt geladen, Saxon-Worker-Pool nicht initialisiert")
return
# Hole Tool-Konfigurationen
java_vm = next((vm for vm in app_settings.java_vms if vm.id == self.project.java_vm_id), None)
saxon_jar = next((jar for jar in app_settings.saxon_jars if jar.id == self.project.saxon_jar_id), None)
if not java_vm or not saxon_jar:
logger.warning("Java VM oder Saxon JAR nicht gefunden, Pool nicht initialisiert")
return
# Erstelle Worker-Pool
num_workers = app_settings.max_workers
pool = SaxonWorkerPool(
num_workers=num_workers,
java_vm_path=java_vm.path_to_binary_file,
saxon_jar_path=saxon_jar.path_to_jar_file,
classpath_cache=TransformationJob._classpath_cache,
)
# Setze globalen Pool
set_saxon_worker_pool(pool)
logger.info(
f"Saxon-Worker-Pool initialisiert: {num_workers} Worker "
f"(erwartet: {num_workers}x schneller für Saxon-Transformationen)"
)
except Exception as e:
logger.error(f"Fehler beim Initialisieren des Saxon-Worker-Pools: {e}")
# Kein Pool ist OK - Fallback auf subprocess
def _shutdown_saxon_worker_pool(self):
"""Beendet den Saxon-Worker-Pool sauber."""
try:
# Importiere transform um Zugriff auf globalen Pool zu haben
import transform
if transform._saxon_worker_pool:
logger.info("Beende Saxon-Worker-Pool...")
transform._saxon_worker_pool.shutdown()
set_saxon_worker_pool(None)
logger.info("Saxon-Worker-Pool beendet")
except Exception as e:
logger.error(f"Fehler beim Beenden des Saxon-Worker-Pools: {e}")
def change_theme(self, theme_name): def change_theme(self, theme_name):
""" """
Wechselt das Theme der Anwendung. Wechselt das Theme der Anwendung.
@@ -4613,6 +4671,9 @@ class MainWindow(QMainWindow):
self.transformation_thread.quit() self.transformation_thread.quit()
self.transformation_thread.wait() self.transformation_thread.wait()
# Beende Saxon-Worker-Pool
self._shutdown_saxon_worker_pool()
# PDF-Dokumente schließen ist bei QtPdf automatisch durch Garbage Collection # PDF-Dokumente schließen ist bei QtPdf automatisch durch Garbage Collection
super().closeEvent(event) super().closeEvent(event)