Refactor: Gemeinsame Basisklassen für Worker-Pools und Parameter-Dialoge

- BaseWorkerPool (worker_pool_base.py): Eliminiert ~450 Zeilen Duplikation
  aus saxon_pool.py, saxon_pool_s9api.py und fop_pool.py; behebt stderr-Handle-Leak
- XsltParamsEditDialog (XsltParamsEditDialog.py): Gemeinsame Basisklasse für
  TreeNodeEditDialog und XslFileEditDialog; reduziert je 162 auf 8 Zeilen

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-09 19:49:57 +01:00
parent cb90f9e483
commit 37ebdff349
7 changed files with 552 additions and 1182 deletions
+39 -282
View File
@@ -5,16 +5,12 @@ Eliminiert JVM-Startup-Overhead durch Vorinitialisierung von N Worker-Prozessen.
Jeder Worker läuft als Daemon und verarbeitet mehrere FO→PDF Transformationen nacheinander.
"""
import glob
import logging
import subprocess
import threading
import time
import psutil
from pathlib import Path
from typing import Optional
import tempfile
from worker_metrics import WorkerPoolMetrics
from worker_pool_base import BaseWorkerPool, _CLASSPATH_SEP
logger = logging.getLogger(__name__)
@@ -164,7 +160,7 @@ public class FopWorker {
"""
class FopWorkerPool:
class FopWorkerPool(BaseWorkerPool):
"""
Pool von lang-laufenden JVM-Prozessen für Apache FOP PDF-Generierung.
@@ -179,53 +175,19 @@ class FopWorkerPool:
fop_config_file: Optional[Path] = None,
log_dir: Optional[Path] = None,
):
"""
Initialisiert den FOP-Worker-Pool.
Args:
num_workers: Anzahl der Worker-Prozesse
java_vm_path: Pfad zur Java VM Binary
apache_fop_dir: Pfad zum Apache FOP-Verzeichnis
fop_config_file: Optionaler Pfad zur fop.xconf Konfigurationsdatei
log_dir: Optionales Verzeichnis für Worker-Logs (Standard: temp_dir/tmp)
"""
self.num_workers = num_workers
self.java_vm_path = java_vm_path
super().__init__(num_workers, java_vm_path, log_dir)
self.apache_fop_dir = apache_fop_dir
self.fop_config_file = fop_config_file
self.log_dir = log_dir
# Worker-Prozesse
self.workers: list[subprocess.Popen] = []
self.worker_locks: list[threading.Lock] = []
# Temporäres Verzeichnis für kompilierte Java-Klasse
self.temp_dir: Optional[Path] = None
self.worker_class_path: Optional[Path] = None
self.worker_log_dir: Optional[Path] = None
# Classpath für FOP
self.fop_classpath: Optional[str] = None
# Performance-Metriken
self.metrics = WorkerPoolMetrics()
# Initialisierung
self._build_fop_classpath()
self._compile_worker_class()
self._start_workers()
logger.info(f"FopWorkerPool initialisiert mit {num_workers} Workern")
def _build_fop_classpath(self):
"""Erstellt den Classpath für Apache FOP."""
import glob
import sys
# Sammle alle JAR-Dateien im FOP-Verzeichnis
all_jars = glob.glob(str(self.apache_fop_dir / "build" / "*.jar"))
# FOP lib-Verzeichnis
lib_dir = self.apache_fop_dir / "lib"
if lib_dir.exists() and lib_dir.is_dir():
all_jars.extend(glob.glob(str(lib_dir / "*.jar")))
@@ -233,120 +195,46 @@ class FopWorkerPool:
if not all_jars:
raise RuntimeError(f"Keine FOP JAR-Dateien gefunden in {self.apache_fop_dir}")
classpath_separator = ";" if sys.platform == "win32" else ":"
self.fop_classpath = classpath_separator.join(all_jars)
self.fop_classpath = _CLASSPATH_SEP.join(all_jars)
logger.debug(f"FOP Classpath: {len(all_jars)} JARs")
def _compile_worker_class(self):
"""Kompiliert die FopWorker-Java-Klasse."""
start_time = time.time()
try:
# Erstelle temporäres Verzeichnis
self.temp_dir = Path(tempfile.mkdtemp(prefix="fop_worker_"))
# --- Abstrakte Properties ---
# Schreibe Java-Quellcode
java_file = self.temp_dir / "FopWorker.java"
java_file.write_text(FOP_WORKER_JAVA, encoding="utf-8")
@property
def _pool_name(self) -> str:
return "FOP"
# Kompiliere Java-Klasse
javac_cmd = [
str(self.java_vm_path).replace("java", "javac"),
"-cp",
self.fop_classpath,
str(java_file),
]
@property
def _java_source_code(self) -> str:
return FOP_WORKER_JAVA
logger.debug(f"Kompiliere FopWorker: {' '.join(javac_cmd[:3])}...")
@property
def _java_class_name(self) -> str:
return "FopWorker"
result = subprocess.run(javac_cmd, capture_output=True, text=True, timeout=30)
@property
def _temp_dir_prefix(self) -> str:
return "fop_worker_"
if result.returncode != 0:
raise RuntimeError(f"Java-Kompilierung fehlgeschlagen: {result.stderr}")
@property
def _worker_init_sleep(self) -> float:
return 0.2 # FOP braucht etwas länger zum Initialisieren
self.worker_class_path = self.temp_dir
# --- Abstrakte Methoden ---
# Speichere Kompilierungszeit
self.metrics.compilation_time_seconds = time.time() - start_time
def _get_classpath(self) -> str:
return self.fop_classpath
logger.info(
f"FopWorker erfolgreich kompiliert: {self.temp_dir} " f"({self.metrics.compilation_time_seconds:.3f}s)"
)
def _build_worker_cmd(self, full_classpath: str) -> list[str]:
cmd = [str(self.java_vm_path), "-cp", full_classpath, "FopWorker"]
if self.fop_config_file and self.fop_config_file.exists():
cmd.append(str(self.fop_config_file))
return cmd
except Exception as e:
logger.error(f"Fehler beim Kompilieren von FopWorker: {e}")
raise
def _stderr_log_name(self, i: int) -> str:
return f"fop_worker_{i}_stderr.log"
def _start_workers(self):
"""Startet N Worker-Prozesse."""
import sys
# Füge Worker-Classpath zum FOP-Classpath hinzu
classpath_separator = ";" if sys.platform == "win32" else ":"
full_classpath = str(self.worker_class_path) + classpath_separator + self.fop_classpath
# Bestimme Log-Verzeichnis
self.worker_log_dir = self.log_dir if self.log_dir else self.temp_dir
if self.log_dir:
self.worker_log_dir.mkdir(parents=True, exist_ok=True)
for i in range(self.num_workers):
worker_start_time = time.time()
try:
# Starte JVM-Prozess mit FopWorker
# Übergebe fop.xconf als Argument falls vorhanden
cmd = [str(self.java_vm_path), "-cp", full_classpath, "FopWorker"]
if self.fop_config_file and self.fop_config_file.exists():
cmd.append(str(self.fop_config_file))
# Öffne stderr-Log-Datei für diesen Worker
stderr_log = self.worker_log_dir / f"fop_worker_{i}_stderr.log"
stderr_file = open(stderr_log, "w", encoding="utf-8")
process = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=stderr_file, # Redirect stderr to file
text=True,
bufsize=1, # Line buffered
)
self.workers.append(process)
self.worker_locks.append(threading.Lock())
logger.debug(f"FOP Worker {i} gestartet (PID: {process.pid}, stderr: {stderr_log})")
# Warte kurz damit Worker initialisieren kann
time.sleep(0.2) # FOP braucht etwas länger zum Initialisieren
# Prüfe ob Worker noch läuft
if process.poll() is not None:
# Worker ist bereits beendet - Fehler!
stderr_file.close()
with open(stderr_log, "r") as f:
stderr_content = f.read()
raise RuntimeError(
f"FOP Worker {i} ist sofort beendet (Exit Code: {process.returncode})\nstderr:\n{stderr_content}"
)
# Speichere Worker-Startzeit
worker_elapsed = time.time() - worker_start_time
self.metrics.worker_start_times.append(worker_elapsed)
except Exception as e:
logger.error(f"Fehler beim Starten von FOP Worker {i}: {e}")
raise
# Berechne Aggregat-Werte für Worker-Startzeiten
self.metrics.calculate_aggregates()
logger.info(
f"{len(self.workers)} FOP-Worker erfolgreich gestartet "
f"(Summe: {self.metrics.total_worker_start_time_seconds:.3f}s, "
f"Durchschnitt: {self.metrics.average_worker_start_time_seconds:.3f}s)"
)
# --- FOP-spezifische Job-Methode ---
def build_pdf(self, input_fo: Path, output_pdf: Path) -> tuple[bool, str]:
"""
@@ -359,167 +247,36 @@ class FopWorkerPool:
Returns:
tuple[bool, str]: (Erfolg, Fehlermeldung/Info)
"""
# Finde freien Worker
worker_idx = None
for i, lock in enumerate(self.worker_locks):
if lock.acquire(blocking=False):
worker_idx = i
break
if worker_idx is None:
# Kein freier Worker, warte auf ersten verfügbaren
for i, lock in enumerate(self.worker_locks):
lock.acquire()
worker_idx = i
break
worker_idx = self._acquire_worker()
try:
worker = self.workers[worker_idx]
# Prüfe ob Worker noch läuft
if worker.poll() is not None:
# Worker ist tot!
stderr_log = self.worker_log_dir / f"fop_worker_{worker_idx}_stderr.log"
try:
with open(stderr_log, "r") as f:
stderr_content = f.read()
error_msg = (
f"FOP Worker {worker_idx} ist beendet (Exit: {worker.returncode})\nstderr:\n{stderr_content}"
)
except Exception:
error_msg = f"FOP Worker {worker_idx} ist beendet (Exit: {worker.returncode})"
stderr_content = self._read_stderr_log(worker_idx)
error_msg = f"FOP Worker {worker_idx} ist beendet (Exit: {worker.returncode})\nstderr:\n{stderr_content}"
logger.error(error_msg)
return False, error_msg
# Erstelle Job-String (Tab-separated)
job = f"{input_fo}\t{output_pdf}\n"
logger.debug(f"Sende FOP-Job an Worker {worker_idx}: {input_fo.name}{output_pdf.name}")
# Sende Job an Worker
worker.stdin.write(job)
worker.stdin.flush()
# Warte auf Antwort
response = worker.stdout.readline().strip()
logger.debug(f"FOP Worker {worker_idx} Antwort: '{response}'")
if response == "OK":
return True, "Erfolgreich"
elif response.startswith("ERROR:"):
error_msg = response[6:].strip()
return False, f"FOP-Fehler: {error_msg}"
return False, f"FOP-Fehler: {response[6:].strip()}"
elif not response:
stderr_content = self._read_stderr_log(worker_idx, tail=500)
return False, f"FOP Worker {worker_idx} crashed (keine Antwort)\nstderr:\n{stderr_content}"
else:
# Leere Antwort bedeutet Worker ist crashed
if not response:
stderr_log = self.worker_log_dir / f"fop_worker_{worker_idx}_stderr.log"
try:
with open(stderr_log, "r") as f:
stderr_content = f.read()[-500:] # Letzte 500 Zeichen
return False, f"FOP Worker {worker_idx} crashed (keine Antwort)\nstderr:\n{stderr_content}"
except Exception:
return False, f"FOP Worker {worker_idx} crashed (keine Antwort)"
return False, f"Unerwartete Antwort: {response}"
except Exception as e:
logger.error(f"Fehler bei FOP Worker {worker_idx}: {e}")
return False, f"Worker-Fehler: {str(e)}"
finally:
# Gebe Worker-Lock frei
self.worker_locks[worker_idx].release()
def measure_ram_usage(self) -> tuple[float, float, list[float]]:
"""
Misst den aktuellen RAM-Verbrauch aller Worker-Prozesse.
Returns:
tuple: (total_mb, average_mb, per_worker_mb_list)
"""
ram_per_worker = []
for i, worker in enumerate(self.workers):
try:
if worker.poll() is None: # Worker läuft noch
process = psutil.Process(worker.pid)
# Hole Speicherinfo (RSS = Resident Set Size in Bytes)
mem_info = process.memory_info()
ram_mb = mem_info.rss / (1024 * 1024) # Konvertiere zu MB
ram_per_worker.append(ram_mb)
else:
logger.warning(f"Worker {i} ist nicht mehr aktiv (kann RAM nicht messen)")
except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
logger.warning(f"Konnte RAM für Worker {i} nicht messen: {e}")
total_ram = sum(ram_per_worker)
average_ram = total_ram / len(ram_per_worker) if ram_per_worker else 0.0
return total_ram, average_ram, ram_per_worker
def capture_ram_before_transform(self):
"""Erfasst RAM-Verbrauch vor der ersten Transformation."""
total, average, per_worker = self.measure_ram_usage()
self.metrics.ram_before_transform_mb_per_worker = per_worker
self.metrics.total_ram_before_mb = total
self.metrics.average_ram_before_mb = average
logger.info(
f"RAM vor Transformation: {self.metrics.total_ram_before_mb:.1f} MB "
f"(Durchschnitt: {self.metrics.average_ram_before_mb:.1f} MB/Worker)"
)
def capture_ram_after_transform(self):
"""Erfasst RAM-Verbrauch nach allen Transformationen."""
total, average, per_worker = self.measure_ram_usage()
self.metrics.ram_after_transform_mb_per_worker = per_worker
self.metrics.total_ram_after_mb = total
self.metrics.average_ram_after_mb = average
logger.info(
f"RAM nach Transformation: {self.metrics.total_ram_after_mb:.1f} MB "
f"(Durchschnitt: {self.metrics.average_ram_after_mb:.1f} MB/Worker)"
)
def shutdown(self):
"""Beendet alle Worker-Prozesse sauber."""
logger.info("Beende FOP-Worker-Pool...")
for i, worker in enumerate(self.workers):
try:
# Sende EXIT-Befehl
if worker.stdin and not worker.stdin.closed:
worker.stdin.write("EXIT\n")
worker.stdin.flush()
# Warte auf Beendigung (max 2 Sekunden)
worker.wait(timeout=2)
logger.debug(f"FOP Worker {i} beendet")
except subprocess.TimeoutExpired:
# Force kill falls nötig
worker.kill()
logger.warning(f"FOP Worker {i} musste gekillt werden")
except Exception as e:
logger.error(f"Fehler beim Beenden von FOP Worker {i}: {e}")
# Lösche temporäres Verzeichnis
if self.temp_dir and self.temp_dir.exists():
try:
import shutil
shutil.rmtree(self.temp_dir)
logger.debug(f"Temporäres Verzeichnis gelöscht: {self.temp_dir}")
except Exception as e:
logger.warning(f"Konnte temporäres Verzeichnis nicht löschen: {e}")
logger.info("FOP-Worker-Pool beendet")
def __enter__(self):
"""Context manager entry."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
self.shutdown()
+38 -285
View File
@@ -6,15 +6,10 @@ Jeder Worker läuft als Daemon und verarbeitet mehrere Transformationen nacheina
"""
import logging
import subprocess
import threading
import time
import psutil
from pathlib import Path
from typing import Optional
import tempfile
from worker_metrics import WorkerPoolMetrics
from worker_pool_base import BaseWorkerPool, build_jar_classpath
logger = logging.getLogger(__name__)
@@ -175,9 +170,9 @@ public class SaxonWorker {
"""
class SaxonWorkerPool:
class SaxonWorkerPool(BaseWorkerPool):
"""
Pool von lang-laufenden JVM-Prozessen für Saxon-Transformationen.
Pool von lang-laufenden JVM-Prozessen für Saxon-Transformationen (JAXP/XSLT 1.0).
Eliminiert JVM-Startup-Overhead durch Wiederverwendung von N Worker-Prozessen.
"""
@@ -190,161 +185,51 @@ class SaxonWorkerPool:
classpath_cache: dict[Path, str],
log_dir: Optional[Path] = None,
):
"""
Initialisiert den Saxon-Worker-Pool.
Args:
num_workers: Anzahl der Worker-Prozesse
java_vm_path: Pfad zur Java VM Binary
saxon_jar_path: Pfad zur Saxon JAR-Datei
classpath_cache: Cache für Saxon-Classpaths
log_dir: Optionales Verzeichnis für Worker-Logs (Standard: temp_dir/tmp)
"""
self.num_workers = num_workers
self.java_vm_path = java_vm_path
super().__init__(num_workers, java_vm_path, log_dir)
self.saxon_jar_path = saxon_jar_path
self.classpath_cache = classpath_cache
self.log_dir = log_dir
# Worker-Prozesse
self.workers: list[subprocess.Popen] = []
self.worker_locks: list[threading.Lock] = []
# Temporäres Verzeichnis für kompilierte Java-Klasse
self.temp_dir: Optional[Path] = None
self.worker_class_path: Optional[Path] = None
self.worker_log_dir: Optional[Path] = None
# Performance-Metriken
self.metrics = WorkerPoolMetrics()
# Initialisierung
self._compile_worker_class()
self._start_workers()
logger.info(f"SaxonWorkerPool initialisiert mit {num_workers} Workern")
def _compile_worker_class(self):
"""Kompiliert die SaxonWorker-Java-Klasse."""
start_time = time.time()
try:
# Erstelle temporäres Verzeichnis
self.temp_dir = Path(tempfile.mkdtemp(prefix="saxon_worker_"))
# --- Abstrakte Properties ---
# Schreibe Java-Quellcode
java_file = self.temp_dir / "SaxonWorker.java"
java_file.write_text(SAXON_WORKER_JAVA, encoding="utf-8")
@property
def _pool_name(self) -> str:
return "Saxon"
# Hole Classpath
saxon_dir = self.saxon_jar_path.parent
if saxon_dir in self.classpath_cache:
classpath = self.classpath_cache[saxon_dir]
else:
# Fallback: Baue Classpath neu
import glob
import sys
@property
def _java_source_code(self) -> str:
return SAXON_WORKER_JAVA
all_jars = glob.glob(str(saxon_dir / "*.jar"))
lib_dir = saxon_dir / "lib"
if lib_dir.exists():
all_jars.extend(glob.glob(str(lib_dir / "*.jar")))
@property
def _java_class_name(self) -> str:
return "SaxonWorker"
classpath_separator = ";" if sys.platform == "win32" else ":"
classpath = classpath_separator.join(all_jars)
@property
def _temp_dir_prefix(self) -> str:
return "saxon_worker_"
# Kompiliere Java-Klasse
javac_cmd = [str(self.java_vm_path).replace("java", "javac"), "-cp", classpath, str(java_file)]
@property
def _worker_init_sleep(self) -> float:
return 0.1
logger.debug(f"Kompiliere SaxonWorker: {' '.join(javac_cmd)}")
# --- Abstrakte Methoden ---
result = subprocess.run(javac_cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
raise RuntimeError(f"Java-Kompilierung fehlgeschlagen: {result.stderr}")
self.worker_class_path = self.temp_dir
# Speichere Kompilierungszeit
self.metrics.compilation_time_seconds = time.time() - start_time
logger.info(
f"SaxonWorker erfolgreich kompiliert: {self.temp_dir} " f"({self.metrics.compilation_time_seconds:.3f}s)"
)
except Exception as e:
logger.error(f"Fehler beim Kompilieren von SaxonWorker: {e}")
raise
def _start_workers(self):
"""Startet N Worker-Prozesse."""
# Hole Classpath
def _get_classpath(self) -> str:
saxon_dir = self.saxon_jar_path.parent
classpath = self.classpath_cache.get(saxon_dir, "")
if saxon_dir not in self.classpath_cache:
self.classpath_cache[saxon_dir] = build_jar_classpath(saxon_dir)
return self.classpath_cache[saxon_dir]
# Füge Worker-Classpath hinzu
import sys
def _build_worker_cmd(self, full_classpath: str) -> list[str]:
return [str(self.java_vm_path), "-cp", full_classpath, "SaxonWorker"]
classpath_separator = ";" if sys.platform == "win32" else ":"
full_classpath = str(self.worker_class_path) + classpath_separator + classpath
def _stderr_log_name(self, i: int) -> str:
return f"worker_{i}_stderr.log"
# Bestimme Log-Verzeichnis
self.worker_log_dir = self.log_dir if self.log_dir else self.temp_dir
if self.log_dir:
self.worker_log_dir.mkdir(parents=True, exist_ok=True)
for i in range(self.num_workers):
worker_start_time = time.time()
try:
# Starte JVM-Prozess mit SaxonWorker
cmd = [str(self.java_vm_path), "-cp", full_classpath, "SaxonWorker"]
# Öffne stderr-Log-Datei für diesen Worker
stderr_log = self.worker_log_dir / f"worker_{i}_stderr.log"
stderr_file = open(stderr_log, "w", encoding="utf-8")
process = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=stderr_file, # Redirect stderr to file
text=True,
bufsize=1, # Line buffered
)
self.workers.append(process)
self.worker_locks.append(threading.Lock())
logger.debug(f"Worker {i} gestartet (PID: {process.pid}, stderr: {stderr_log})")
# Warte kurz damit Worker initialisieren kann
time.sleep(0.1)
# Prüfe ob Worker noch läuft
if process.poll() is not None:
# Worker ist bereits beendet - Fehler!
stderr_file.close()
with open(stderr_log, "r") as f:
stderr_content = f.read()
raise RuntimeError(
f"Worker {i} ist sofort beendet (Exit Code: {process.returncode})\nstderr:\n{stderr_content}"
)
# Speichere Worker-Startzeit
worker_elapsed = time.time() - worker_start_time
self.metrics.worker_start_times.append(worker_elapsed)
except Exception as e:
logger.error(f"Fehler beim Starten von Worker {i}: {e}")
raise
# Berechne Aggregat-Werte für Worker-Startzeiten
self.metrics.calculate_aggregates()
logger.info(
f"{len(self.workers)} Saxon-Worker erfolgreich gestartet "
f"(Summe: {self.metrics.total_worker_start_time_seconds:.3f}s, "
f"Durchschnitt: {self.metrics.average_worker_start_time_seconds:.3f}s)"
)
# --- Saxon-spezifische Job-Methode ---
def transform(
self, source_xml: Path, xsl_stylesheet: Path, output_fo: Path, xslt_params: dict[str, str]
@@ -361,170 +246,38 @@ class SaxonWorkerPool:
Returns:
tuple[bool, str]: (Erfolg, Fehlermeldung/Info)
"""
# Finde freien Worker
worker_idx = None
for i, lock in enumerate(self.worker_locks):
if lock.acquire(blocking=False):
worker_idx = i
break
if worker_idx is None:
# Kein freier Worker, warte auf ersten verfügbaren
for i, lock in enumerate(self.worker_locks):
lock.acquire()
worker_idx = i
break
worker_idx = self._acquire_worker()
try:
worker = self.workers[worker_idx]
# Prüfe ob Worker noch läuft
if worker.poll() is not None:
# Worker ist tot!
stderr_log = self.worker_log_dir / f"worker_{worker_idx}_stderr.log"
try:
with open(stderr_log, "r") as f:
stderr_content = f.read()
error_msg = (
f"Worker {worker_idx} ist beendet (Exit: {worker.returncode})\nstderr:\n{stderr_content}"
)
except Exception:
error_msg = f"Worker {worker_idx} ist beendet (Exit: {worker.returncode})"
stderr_content = self._read_stderr_log(worker_idx)
error_msg = f"Worker {worker_idx} ist beendet (Exit: {worker.returncode})\nstderr:\n{stderr_content}"
logger.error(error_msg)
return False, error_msg
# Formatiere Parameter
params_str = "|||".join([f"{key}={value}" for key, value in xslt_params.items()])
# Erstelle Job-String (Tab-separated)
params_str = "|||".join([f"{k}={v}" for k, v in xslt_params.items()])
job = f"{source_xml}\t{xsl_stylesheet}\t{output_fo}\t{params_str}\n"
logger.debug(f"Sende Job an Worker {worker_idx}: {source_xml.name}")
# Sende Job an Worker
worker.stdin.write(job)
worker.stdin.flush()
# Warte auf Antwort
response = worker.stdout.readline().strip()
logger.debug(f"Worker {worker_idx} Antwort: '{response}'")
if response == "OK":
return True, "Erfolgreich"
elif response.startswith("ERROR:"):
error_msg = response[6:].strip()
return False, f"Saxon-Fehler: {error_msg}"
return False, f"Saxon-Fehler: {response[6:].strip()}"
elif not response:
stderr_content = self._read_stderr_log(worker_idx, tail=500)
return False, f"Worker {worker_idx} crashed (keine Antwort)\nstderr:\n{stderr_content}"
else:
# Leere Antwort bedeutet Worker ist crashed
if not response:
stderr_log = self.worker_log_dir / f"worker_{worker_idx}_stderr.log"
try:
with open(stderr_log, "r") as f:
stderr_content = f.read()[-500:] # Letzte 500 Zeichen
return False, f"Worker {worker_idx} crashed (keine Antwort)\nstderr:\n{stderr_content}"
except Exception:
return False, f"Worker {worker_idx} crashed (keine Antwort)"
return False, f"Unerwartete Antwort: {response}"
except Exception as e:
logger.error(f"Fehler bei Worker {worker_idx}: {e}")
return False, f"Worker-Fehler: {str(e)}"
finally:
# Gebe Worker-Lock frei
self.worker_locks[worker_idx].release()
def measure_ram_usage(self) -> tuple[float, float, list[float]]:
"""
Misst den aktuellen RAM-Verbrauch aller Worker-Prozesse.
Returns:
tuple: (total_mb, average_mb, per_worker_mb_list)
"""
ram_per_worker = []
for i, worker in enumerate(self.workers):
try:
if worker.poll() is None: # Worker läuft noch
process = psutil.Process(worker.pid)
# Hole Speicherinfo (RSS = Resident Set Size in Bytes)
mem_info = process.memory_info()
ram_mb = mem_info.rss / (1024 * 1024) # Konvertiere zu MB
ram_per_worker.append(ram_mb)
else:
logger.warning(f"Worker {i} ist nicht mehr aktiv (kann RAM nicht messen)")
except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
logger.warning(f"Konnte RAM für Worker {i} nicht messen: {e}")
total_ram = sum(ram_per_worker)
average_ram = total_ram / len(ram_per_worker) if ram_per_worker else 0.0
return total_ram, average_ram, ram_per_worker
def capture_ram_before_transform(self):
"""Erfasst RAM-Verbrauch vor der ersten Transformation."""
total, average, per_worker = self.measure_ram_usage()
self.metrics.ram_before_transform_mb_per_worker = per_worker
self.metrics.total_ram_before_mb = total
self.metrics.average_ram_before_mb = average
logger.info(
f"RAM vor Transformation: {self.metrics.total_ram_before_mb:.1f} MB "
f"(Durchschnitt: {self.metrics.average_ram_before_mb:.1f} MB/Worker)"
)
def capture_ram_after_transform(self):
"""Erfasst RAM-Verbrauch nach allen Transformationen."""
total, average, per_worker = self.measure_ram_usage()
self.metrics.ram_after_transform_mb_per_worker = per_worker
self.metrics.total_ram_after_mb = total
self.metrics.average_ram_after_mb = average
logger.info(
f"RAM nach Transformation: {self.metrics.total_ram_after_mb:.1f} MB "
f"(Durchschnitt: {self.metrics.average_ram_after_mb:.1f} MB/Worker)"
)
def shutdown(self):
"""Beendet alle Worker-Prozesse sauber."""
logger.info("Beende Saxon-Worker-Pool...")
for i, worker in enumerate(self.workers):
try:
# Sende EXIT-Befehl
if worker.stdin and not worker.stdin.closed:
worker.stdin.write("EXIT\n")
worker.stdin.flush()
# Warte auf Beendigung (max 2 Sekunden)
worker.wait(timeout=2)
logger.debug(f"Worker {i} beendet")
except subprocess.TimeoutExpired:
# Force kill falls nötig
worker.kill()
logger.warning(f"Worker {i} musste gekillt werden")
except Exception as e:
logger.error(f"Fehler beim Beenden von Worker {i}: {e}")
# Lösche temporäres Verzeichnis
if self.temp_dir and self.temp_dir.exists():
try:
import shutil
shutil.rmtree(self.temp_dir)
logger.debug(f"Temporäres Verzeichnis gelöscht: {self.temp_dir}")
except Exception as e:
logger.warning(f"Konnte temporäres Verzeichnis nicht löschen: {e}")
logger.info("Saxon-Worker-Pool beendet")
def __enter__(self):
"""Context manager entry."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
self.shutdown()
+39 -303
View File
@@ -7,15 +7,10 @@ Jeder Worker läuft als Daemon und verarbeitet mehrere Transformationen nacheina
"""
import logging
import subprocess
import threading
import time
import psutil
from pathlib import Path
from typing import Optional
import tempfile
from worker_metrics import WorkerPoolMetrics
from worker_pool_base import BaseWorkerPool, build_jar_classpath
logger = logging.getLogger(__name__)
@@ -155,7 +150,7 @@ public class SaxonS9ApiWorker {
"""
class SaxonWorkerPoolS9Api:
class SaxonWorkerPoolS9Api(BaseWorkerPool):
"""
Pool von lang-laufenden JVM-Prozessen für Saxon-Transformationen mit s9api.
@@ -171,181 +166,52 @@ class SaxonWorkerPoolS9Api:
classpath_cache: dict[Path, str],
log_dir: Optional[Path] = None,
):
"""
Initialisiert den Saxon-Worker-Pool mit s9api.
Args:
num_workers: Anzahl der Worker-Prozesse
java_vm_path: Pfad zur Java VM Binary
saxon_jar_path: Pfad zur Saxon JAR-Datei
classpath_cache: Cache für Saxon-Classpaths
log_dir: Optionales Verzeichnis für Worker-Logs (Standard: temp_dir/tmp)
"""
self.num_workers = num_workers
self.java_vm_path = java_vm_path
super().__init__(num_workers, java_vm_path, log_dir)
self.saxon_jar_path = saxon_jar_path
self.classpath_cache = classpath_cache
self.log_dir = log_dir
# Worker-Prozesse
self.workers: list[subprocess.Popen] = []
self.worker_locks: list[threading.Lock] = []
# Temporäres Verzeichnis für kompilierte Java-Klasse
self.temp_dir: Optional[Path] = None
self.worker_class_path: Optional[Path] = None
self.worker_log_dir: Optional[Path] = None
# Performance-Metriken
self.metrics = WorkerPoolMetrics()
# Initialisierung
self._compile_worker_class()
self._start_workers()
logger.info(f"SaxonWorkerPoolS9Api initialisiert mit {num_workers} Workern (XSLT 2.0/3.0)")
def _compile_worker_class(self):
"""Kompiliert die SaxonS9ApiWorker-Java-Klasse."""
start_time = time.time()
try:
# Erstelle temporäres Verzeichnis
self.temp_dir = Path(tempfile.mkdtemp(prefix="saxon_s9api_worker_"))
# --- Abstrakte Properties ---
# Schreibe Java-Quellcode
java_file = self.temp_dir / "SaxonS9ApiWorker.java"
java_file.write_text(SAXON_S9API_WORKER_JAVA, encoding="utf-8")
@property
def _pool_name(self) -> str:
return "Saxon-S9Api"
# Hole Classpath
saxon_dir = self.saxon_jar_path.parent
if saxon_dir in self.classpath_cache:
classpath = self.classpath_cache[saxon_dir]
else:
# Fallback: Baue Classpath neu
import glob
import sys
@property
def _java_source_code(self) -> str:
return SAXON_S9API_WORKER_JAVA
all_jars = glob.glob(str(saxon_dir / "*.jar"))
lib_dir = saxon_dir / "lib"
if lib_dir.exists():
all_jars.extend(glob.glob(str(lib_dir / "*.jar")))
@property
def _java_class_name(self) -> str:
return "SaxonS9ApiWorker"
classpath_separator = ";" if sys.platform == "win32" else ":"
classpath = classpath_separator.join(all_jars)
@property
def _temp_dir_prefix(self) -> str:
return "saxon_s9api_worker_"
# Kompiliere Java-Klasse
javac_cmd = [str(self.java_vm_path).replace("java", "javac"), "-cp", classpath, str(java_file)]
@property
def _worker_init_sleep(self) -> float:
return 0.1
logger.debug(f"Kompiliere SaxonS9ApiWorker: {' '.join(javac_cmd)}")
# --- Abstrakte Methoden ---
result = subprocess.run(javac_cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
raise RuntimeError(f"Java-Kompilierung fehlgeschlagen: {result.stderr}")
self.worker_class_path = self.temp_dir
# Speichere Kompilierungszeit
self.metrics.compilation_time_seconds = time.time() - start_time
logger.info(
f"SaxonS9ApiWorker erfolgreich kompiliert: {self.temp_dir} "
f"({self.metrics.compilation_time_seconds:.3f}s)"
)
except Exception as e:
logger.error(f"Fehler beim Kompilieren von SaxonS9ApiWorker: {e}")
raise
def _start_workers(self):
"""Startet N Worker-Prozesse."""
# Hole Classpath
def _get_classpath(self) -> str:
saxon_dir = self.saxon_jar_path.parent
if saxon_dir in self.classpath_cache:
classpath = self.classpath_cache[saxon_dir]
else:
# Fallback: Baue Classpath neu (sollte nicht nötig sein, aber zur Sicherheit)
import glob
import sys
all_jars = glob.glob(str(saxon_dir / "*.jar"))
lib_dir = saxon_dir / "lib"
if lib_dir.exists():
all_jars.extend(glob.glob(str(lib_dir / "*.jar")))
classpath_separator = ";" if sys.platform == "win32" else ":"
classpath = classpath_separator.join(all_jars)
# Cache für zukünftige Verwendung
self.classpath_cache[saxon_dir] = classpath
if saxon_dir not in self.classpath_cache:
self.classpath_cache[saxon_dir] = build_jar_classpath(saxon_dir)
logger.debug(f"Classpath für {saxon_dir} neu erstellt und gecacht")
return self.classpath_cache[saxon_dir]
# Füge Worker-Classpath hinzu
import sys
def _build_worker_cmd(self, full_classpath: str) -> list[str]:
return [str(self.java_vm_path), "-cp", full_classpath, "SaxonS9ApiWorker"]
classpath_separator = ";" if sys.platform == "win32" else ":"
full_classpath = str(self.worker_class_path) + classpath_separator + classpath
def _stderr_log_name(self, i: int) -> str:
return f"s9api_worker_{i}_stderr.log"
logger.debug(f"S9Api Worker Classpath: {full_classpath[:200]}...")
# Bestimme Log-Verzeichnis
self.worker_log_dir = self.log_dir if self.log_dir else self.temp_dir
if self.log_dir:
self.worker_log_dir.mkdir(parents=True, exist_ok=True)
for i in range(self.num_workers):
worker_start_time = time.time()
try:
# Starte JVM-Prozess mit SaxonS9ApiWorker
cmd = [str(self.java_vm_path), "-cp", full_classpath, "SaxonS9ApiWorker"]
# Öffne stderr-Log-Datei für diesen Worker
stderr_log = self.worker_log_dir / f"s9api_worker_{i}_stderr.log"
stderr_file = open(stderr_log, "w", encoding="utf-8")
process = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=stderr_file, # Redirect stderr to file
text=True,
bufsize=1, # Line buffered
)
self.workers.append(process)
self.worker_locks.append(threading.Lock())
logger.debug(f"S9Api Worker {i} gestartet (PID: {process.pid}, stderr: {stderr_log})")
# Warte kurz damit Worker initialisieren kann
time.sleep(0.1)
# Prüfe ob Worker noch läuft
if process.poll() is not None:
# Worker ist bereits beendet - Fehler!
stderr_file.close()
with open(stderr_log, "r") as f:
stderr_content = f.read()
raise RuntimeError(
f"S9Api Worker {i} ist sofort beendet (Exit Code: {process.returncode})\nstderr:\n{stderr_content}"
)
# Speichere Worker-Startzeit
worker_elapsed = time.time() - worker_start_time
self.metrics.worker_start_times.append(worker_elapsed)
except Exception as e:
logger.error(f"Fehler beim Starten von S9Api Worker {i}: {e}")
raise
# Berechne Aggregat-Werte für Worker-Startzeiten
self.metrics.calculate_aggregates()
logger.info(
f"{len(self.workers)} Saxon-S9Api-Worker erfolgreich gestartet "
f"(Summe: {self.metrics.total_worker_start_time_seconds:.3f}s, "
f"Durchschnitt: {self.metrics.average_worker_start_time_seconds:.3f}s)"
)
# --- Saxon-s9api-spezifische Job-Methode ---
def transform(
self, source_xml: Path, xsl_stylesheet: Path, output_fo: Path, xslt_params: dict[str, str]
@@ -362,170 +228,40 @@ class SaxonWorkerPoolS9Api:
Returns:
tuple[bool, str]: (Erfolg, Fehlermeldung/Info)
"""
# Finde freien Worker
worker_idx = None
for i, lock in enumerate(self.worker_locks):
if lock.acquire(blocking=False):
worker_idx = i
break
if worker_idx is None:
# Kein freier Worker, warte auf ersten verfügbaren
for i, lock in enumerate(self.worker_locks):
lock.acquire()
worker_idx = i
break
worker_idx = self._acquire_worker()
try:
worker = self.workers[worker_idx]
# Prüfe ob Worker noch läuft
if worker.poll() is not None:
# Worker ist tot!
stderr_log = self.worker_log_dir / f"s9api_worker_{worker_idx}_stderr.log"
try:
with open(stderr_log, "r") as f:
stderr_content = f.read()
error_msg = (
f"S9Api Worker {worker_idx} ist beendet (Exit: {worker.returncode})\nstderr:\n{stderr_content}"
)
except Exception:
error_msg = f"S9Api Worker {worker_idx} ist beendet (Exit: {worker.returncode})"
stderr_content = self._read_stderr_log(worker_idx)
error_msg = (
f"S9Api Worker {worker_idx} ist beendet (Exit: {worker.returncode})\nstderr:\n{stderr_content}"
)
logger.error(error_msg)
return False, error_msg
# Formatiere Parameter
params_str = "|||".join([f"{key}={value}" for key, value in xslt_params.items()])
# Erstelle Job-String (Tab-separated)
params_str = "|||".join([f"{k}={v}" for k, v in xslt_params.items()])
job = f"{source_xml}\t{xsl_stylesheet}\t{output_fo}\t{params_str}\n"
logger.debug(f"Sende Job an S9Api Worker {worker_idx}: {source_xml.name}")
# Sende Job an Worker
worker.stdin.write(job)
worker.stdin.flush()
# Warte auf Antwort
response = worker.stdout.readline().strip()
logger.debug(f"S9Api Worker {worker_idx} Antwort: '{response}'")
if response == "OK":
return True, "Erfolgreich"
elif response.startswith("ERROR:"):
error_msg = response[6:].strip()
return False, f"Saxon-Fehler (s9api): {error_msg}"
return False, f"Saxon-Fehler (s9api): {response[6:].strip()}"
elif not response:
stderr_content = self._read_stderr_log(worker_idx, tail=500)
return False, f"S9Api Worker {worker_idx} crashed (keine Antwort)\nstderr:\n{stderr_content}"
else:
# Leere Antwort bedeutet Worker ist crashed
if not response:
stderr_log = self.worker_log_dir / f"s9api_worker_{worker_idx}_stderr.log"
try:
with open(stderr_log, "r") as f:
stderr_content = f.read()[-500:] # Letzte 500 Zeichen
return False, f"S9Api Worker {worker_idx} crashed (keine Antwort)\nstderr:\n{stderr_content}"
except Exception:
return False, f"S9Api Worker {worker_idx} crashed (keine Antwort)"
return False, f"Unerwartete Antwort: {response}"
except Exception as e:
logger.error(f"Fehler bei S9Api Worker {worker_idx}: {e}")
return False, f"Worker-Fehler: {str(e)}"
finally:
# Gebe Worker-Lock frei
self.worker_locks[worker_idx].release()
def measure_ram_usage(self) -> tuple[float, float, list[float]]:
"""
Misst den aktuellen RAM-Verbrauch aller Worker-Prozesse.
Returns:
tuple: (total_mb, average_mb, per_worker_mb_list)
"""
ram_per_worker = []
for i, worker in enumerate(self.workers):
try:
if worker.poll() is None: # Worker läuft noch
process = psutil.Process(worker.pid)
# Hole Speicherinfo (RSS = Resident Set Size in Bytes)
mem_info = process.memory_info()
ram_mb = mem_info.rss / (1024 * 1024) # Konvertiere zu MB
ram_per_worker.append(ram_mb)
else:
logger.warning(f"Worker {i} ist nicht mehr aktiv (kann RAM nicht messen)")
except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
logger.warning(f"Konnte RAM für Worker {i} nicht messen: {e}")
total_ram = sum(ram_per_worker)
average_ram = total_ram / len(ram_per_worker) if ram_per_worker else 0.0
return total_ram, average_ram, ram_per_worker
def capture_ram_before_transform(self):
"""Erfasst RAM-Verbrauch vor der ersten Transformation."""
total, average, per_worker = self.measure_ram_usage()
self.metrics.ram_before_transform_mb_per_worker = per_worker
self.metrics.total_ram_before_mb = total
self.metrics.average_ram_before_mb = average
logger.info(
f"RAM vor Transformation: {self.metrics.total_ram_before_mb:.1f} MB "
f"(Durchschnitt: {self.metrics.average_ram_before_mb:.1f} MB/Worker)"
)
def capture_ram_after_transform(self):
"""Erfasst RAM-Verbrauch nach allen Transformationen."""
total, average, per_worker = self.measure_ram_usage()
self.metrics.ram_after_transform_mb_per_worker = per_worker
self.metrics.total_ram_after_mb = total
self.metrics.average_ram_after_mb = average
logger.info(
f"RAM nach Transformation: {self.metrics.total_ram_after_mb:.1f} MB "
f"(Durchschnitt: {self.metrics.average_ram_after_mb:.1f} MB/Worker)"
)
def shutdown(self):
"""Beendet alle Worker-Prozesse sauber."""
logger.info("Beende Saxon-S9Api-Worker-Pool...")
for i, worker in enumerate(self.workers):
try:
# Sende EXIT-Befehl
if worker.stdin and not worker.stdin.closed:
worker.stdin.write("EXIT\n")
worker.stdin.flush()
# Warte auf Beendigung (max 2 Sekunden)
worker.wait(timeout=2)
logger.debug(f"S9Api Worker {i} beendet")
except subprocess.TimeoutExpired:
# Force kill falls nötig
worker.kill()
logger.warning(f"S9Api Worker {i} musste gekillt werden")
except Exception as e:
logger.error(f"Fehler beim Beenden von S9Api Worker {i}: {e}")
# Lösche temporäres Verzeichnis
if self.temp_dir and self.temp_dir.exists():
try:
import shutil
shutil.rmtree(self.temp_dir)
logger.debug(f"Temporäres Verzeichnis gelöscht: {self.temp_dir}")
except Exception as e:
logger.warning(f"Konnte temporäres Verzeichnis nicht löschen: {e}")
logger.info("Saxon-S9Api-Worker-Pool beendet")
def __enter__(self):
"""Context manager entry."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
self.shutdown()
+4 -156
View File
@@ -1,161 +1,9 @@
from PySide6.QtWidgets import QDialog, QTableWidgetItem, QMessageBox
from PySide6.QtCore import Qt
from ui.TreeNodeEditDialog_ui import Ui_TreeNodeEditDialog
from ui.XsltParamsEditDialog import XsltParamsEditDialog
class TreeNodeEditDialog(QDialog):
class TreeNodeEditDialog(XsltParamsEditDialog):
"""Dialog zur Bearbeitung von TreeNode-Objekten."""
def __init__(self, parent=None, node=None, parent_params=None):
"""
Initialisiert den Dialog.
Args:
parent: Übergeordnetes Widget
node: TreeNode-Objekt zum Bearbeiten
parent_params: Dictionary mit Eltern-Parametern (nur anzeigen)
"""
super().__init__(parent)
# UI einrichten
self.ui = Ui_TreeNodeEditDialog()
self.ui.setupUi(self)
# Node-Objekt speichern
self.node = node
self.parent_params = parent_params or {}
# Signale verbinden
self.ui.addParamButton.clicked.connect(self.add_parameter)
self.ui.removeParamButton.clicked.connect(self.remove_parameter)
# Tabellen konfigurieren
self._setup_tables()
# Daten laden
if self.node:
self._load_data()
def _setup_tables(self):
"""Konfiguriert die Tabellen."""
# XSLT Parameter Tabelle
self.ui.xsltParamsTable.setColumnWidth(0, 200)
self.ui.xsltParamsTable.setColumnWidth(1, 300)
self.ui.xsltParamsTable.horizontalHeader().setStretchLastSection(True)
# Eltern-Parameter Tabelle
self.ui.parentParamsTable.setColumnWidth(0, 200)
self.ui.parentParamsTable.setColumnWidth(1, 300)
self.ui.parentParamsTable.horizontalHeader().setStretchLastSection(True)
def _load_data(self):
"""Lädt die Daten des TreeNode in den Dialog."""
if not self.node:
return
# Bezeichnung setzen
self.ui.bezEdit.setText(str(self.node.bez) if self.node.bez else "")
# XSLT Parameter laden
self._load_xslt_params()
# Eltern-Parameter laden
self._load_parent_params()
def _load_xslt_params(self):
"""Lädt die XSLT Parameter in die Tabelle."""
if not self.node or not self.node.xslt_params:
return
params = self.node.xslt_params
self.ui.xsltParamsTable.setRowCount(len(params))
for row, (key, value) in enumerate(params.items()):
# Parameter-Name
key_item = QTableWidgetItem(str(key))
self.ui.xsltParamsTable.setItem(row, 0, key_item)
# Parameter-Wert
value_item = QTableWidgetItem(str(value))
self.ui.xsltParamsTable.setItem(row, 1, value_item)
def _load_parent_params(self):
"""Lädt die Eltern-Parameter in die Tabelle (nur anzeigen)."""
if not self.parent_params:
return
self.ui.parentParamsTable.setRowCount(len(self.parent_params))
for row, (key, value) in enumerate(self.parent_params.items()):
# Parameter-Name
key_item = QTableWidgetItem(str(key))
key_item.setFlags(key_item.flags() & ~Qt.ItemFlag.ItemIsEditable)
self.ui.parentParamsTable.setItem(row, 0, key_item)
# Parameter-Wert
value_item = QTableWidgetItem(str(value))
value_item.setFlags(value_item.flags() & ~Qt.ItemFlag.ItemIsEditable)
self.ui.parentParamsTable.setItem(row, 1, value_item)
def add_parameter(self):
"""Fügt einen neuen Parameter hinzu."""
row_count = self.ui.xsltParamsTable.rowCount()
self.ui.xsltParamsTable.insertRow(row_count)
# Leere Items hinzufügen
key_item = QTableWidgetItem("")
value_item = QTableWidgetItem("")
self.ui.xsltParamsTable.setItem(row_count, 0, key_item)
self.ui.xsltParamsTable.setItem(row_count, 1, value_item)
# Fokus auf den neuen Parameter setzen
self.ui.xsltParamsTable.setCurrentCell(row_count, 0)
def remove_parameter(self):
"""Entfernt den ausgewählten Parameter."""
current_row = self.ui.xsltParamsTable.currentRow()
if current_row >= 0:
self.ui.xsltParamsTable.removeRow(current_row)
def get_data(self):
"""
Gibt die bearbeiteten Daten zurück.
Returns:
dict: Dictionary mit den bearbeiteten Daten oder None bei Fehler
"""
# Bezeichnung prüfen
bez = self.ui.bezEdit.text().strip()
if not bez:
QMessageBox.warning(self, "Warnung", "Bitte geben Sie eine Bezeichnung ein.")
return None
# XSLT Parameter sammeln
xslt_params = {}
for row in range(self.ui.xsltParamsTable.rowCount()):
key_item = self.ui.xsltParamsTable.item(row, 0)
value_item = self.ui.xsltParamsTable.item(row, 1)
if key_item and value_item:
key = key_item.text().strip()
value = value_item.text().strip()
if key: # Nur Parameter mit nicht-leerem Schlüssel hinzufügen
xslt_params[key] = value
# CheckBox für Force-Transformation prüfen
force_transform = self.ui.alle_xml_transformieren.isChecked()
return {
"bez": bez,
"xslt_params": xslt_params,
"force_transform": force_transform
}
def accept(self):
"""Überschreibt accept() um Datenvalidierung durchzuführen."""
data = self.get_data()
if data is not None:
super().accept()
def _create_ui(self):
return Ui_TreeNodeEditDialog()
+4 -156
View File
@@ -1,161 +1,9 @@
from PySide6.QtWidgets import QDialog, QTableWidgetItem, QMessageBox
from PySide6.QtCore import Qt
from ui.XslFileEditDialog_ui import Ui_XslFileEditDialog
from ui.XsltParamsEditDialog import XsltParamsEditDialog
class XslFileEditDialog(QDialog):
class XslFileEditDialog(XsltParamsEditDialog):
"""Dialog zur Bearbeitung von XslFile-Objekten."""
def __init__(self, parent=None, node=None, parent_params=None):
"""
Initialisiert den Dialog.
Args:
parent: Übergeordnetes Widget
node: XslFile-Objekt zum Bearbeiten
parent_params: Dictionary mit Eltern-Parametern (nur anzeigen)
"""
super().__init__(parent)
# UI einrichten
self.ui = Ui_XslFileEditDialog()
self.ui.setupUi(self)
# Node-Objekt speichern
self.node = node
self.parent_params = parent_params or {}
# Signale verbinden
self.ui.addParamButton.clicked.connect(self.add_parameter)
self.ui.removeParamButton.clicked.connect(self.remove_parameter)
# Tabellen konfigurieren
self._setup_tables()
# Daten laden
if self.node:
self._load_data()
def _setup_tables(self):
"""Konfiguriert die Tabellen."""
# XSLT Parameter Tabelle
self.ui.xsltParamsTable.setColumnWidth(0, 200)
self.ui.xsltParamsTable.setColumnWidth(1, 300)
self.ui.xsltParamsTable.horizontalHeader().setStretchLastSection(True)
# Eltern-Parameter Tabelle
self.ui.parentParamsTable.setColumnWidth(0, 200)
self.ui.parentParamsTable.setColumnWidth(1, 300)
self.ui.parentParamsTable.horizontalHeader().setStretchLastSection(True)
def _load_data(self):
"""Lädt die Daten des XslFile in den Dialog."""
if not self.node:
return
# Bezeichnung setzen
self.ui.bezEdit.setText(str(self.node.bez) if self.node.bez else "")
# XSLT Parameter laden
self._load_xslt_params()
# Eltern-Parameter laden
self._load_parent_params()
def _load_xslt_params(self):
"""Lädt die XSLT Parameter in die Tabelle."""
if not self.node or not self.node.xslt_params:
return
params = self.node.xslt_params
self.ui.xsltParamsTable.setRowCount(len(params))
for row, (key, value) in enumerate(params.items()):
# Parameter-Name
key_item = QTableWidgetItem(str(key))
self.ui.xsltParamsTable.setItem(row, 0, key_item)
# Parameter-Wert
value_item = QTableWidgetItem(str(value))
self.ui.xsltParamsTable.setItem(row, 1, value_item)
def _load_parent_params(self):
"""Lädt die Eltern-Parameter in die Tabelle (nur anzeigen)."""
if not self.parent_params:
return
self.ui.parentParamsTable.setRowCount(len(self.parent_params))
for row, (key, value) in enumerate(self.parent_params.items()):
# Parameter-Name
key_item = QTableWidgetItem(str(key))
key_item.setFlags(key_item.flags() & ~Qt.ItemFlag.ItemIsEditable)
self.ui.parentParamsTable.setItem(row, 0, key_item)
# Parameter-Wert
value_item = QTableWidgetItem(str(value))
value_item.setFlags(value_item.flags() & ~Qt.ItemFlag.ItemIsEditable)
self.ui.parentParamsTable.setItem(row, 1, value_item)
def add_parameter(self):
"""Fügt einen neuen Parameter hinzu."""
row_count = self.ui.xsltParamsTable.rowCount()
self.ui.xsltParamsTable.insertRow(row_count)
# Leere Items hinzufügen
key_item = QTableWidgetItem("")
value_item = QTableWidgetItem("")
self.ui.xsltParamsTable.setItem(row_count, 0, key_item)
self.ui.xsltParamsTable.setItem(row_count, 1, value_item)
# Fokus auf den neuen Parameter setzen
self.ui.xsltParamsTable.setCurrentCell(row_count, 0)
def remove_parameter(self):
"""Entfernt den ausgewählten Parameter."""
current_row = self.ui.xsltParamsTable.currentRow()
if current_row >= 0:
self.ui.xsltParamsTable.removeRow(current_row)
def get_data(self):
"""
Gibt die bearbeiteten Daten zurück.
Returns:
dict: Dictionary mit den bearbeiteten Daten oder None bei Fehler
"""
# Bezeichnung prüfen
bez = self.ui.bezEdit.text().strip()
if not bez:
QMessageBox.warning(self, "Warnung", "Bitte geben Sie eine Bezeichnung ein.")
return None
# XSLT Parameter sammeln
xslt_params = {}
for row in range(self.ui.xsltParamsTable.rowCount()):
key_item = self.ui.xsltParamsTable.item(row, 0)
value_item = self.ui.xsltParamsTable.item(row, 1)
if key_item and value_item:
key = key_item.text().strip()
value = value_item.text().strip()
if key: # Nur Parameter mit nicht-leerem Schlüssel hinzufügen
xslt_params[key] = value
# CheckBox für Force-Transformation prüfen
force_transform = self.ui.alle_xml_transformieren.isChecked()
return {
"bez": bez,
"xslt_params": xslt_params,
"force_transform": force_transform
}
def accept(self):
"""Überschreibt accept() um Datenvalidierung durchzuführen."""
data = self.get_data()
if data is not None:
super().accept()
def _create_ui(self):
return Ui_XslFileEditDialog()
+120
View File
@@ -0,0 +1,120 @@
from PySide6.QtWidgets import QDialog, QTableWidgetItem, QMessageBox
from PySide6.QtCore import Qt
class XsltParamsEditDialog(QDialog):
"""Gemeinsame Basisklasse für Dialoge zur Bearbeitung von XSLT-Parametern."""
def __init__(self, parent=None, node=None, parent_params=None):
super().__init__(parent)
self.ui = self._create_ui()
self.ui.setupUi(self)
self.node = node
self.parent_params = parent_params or {}
self.ui.addParamButton.clicked.connect(self.add_parameter)
self.ui.removeParamButton.clicked.connect(self.remove_parameter)
self._setup_tables()
if self.node:
self._load_data()
def _create_ui(self):
"""Gibt eine Instanz der konkreten UI-Klasse zurück. Muss überschrieben werden."""
raise NotImplementedError
def _setup_tables(self):
"""Konfiguriert die Tabellen."""
self.ui.xsltParamsTable.setColumnWidth(0, 200)
self.ui.xsltParamsTable.setColumnWidth(1, 300)
self.ui.xsltParamsTable.horizontalHeader().setStretchLastSection(True)
self.ui.parentParamsTable.setColumnWidth(0, 200)
self.ui.parentParamsTable.setColumnWidth(1, 300)
self.ui.parentParamsTable.horizontalHeader().setStretchLastSection(True)
def _load_data(self):
"""Lädt die Daten des Knotens in den Dialog."""
if not self.node:
return
self.ui.bezEdit.setText(str(self.node.bez) if self.node.bez else "")
self._load_xslt_params()
self._load_parent_params()
def _load_xslt_params(self):
"""Lädt die XSLT-Parameter in die Tabelle."""
if not self.node or not self.node.xslt_params:
return
params = self.node.xslt_params
self.ui.xsltParamsTable.setRowCount(len(params))
for row, (key, value) in enumerate(params.items()):
self.ui.xsltParamsTable.setItem(row, 0, QTableWidgetItem(str(key)))
self.ui.xsltParamsTable.setItem(row, 1, QTableWidgetItem(str(value)))
def _load_parent_params(self):
"""Lädt die Eltern-Parameter in die Tabelle (nur anzeigen)."""
if not self.parent_params:
return
self.ui.parentParamsTable.setRowCount(len(self.parent_params))
for row, (key, value) in enumerate(self.parent_params.items()):
key_item = QTableWidgetItem(str(key))
key_item.setFlags(key_item.flags() & ~Qt.ItemFlag.ItemIsEditable)
self.ui.parentParamsTable.setItem(row, 0, key_item)
value_item = QTableWidgetItem(str(value))
value_item.setFlags(value_item.flags() & ~Qt.ItemFlag.ItemIsEditable)
self.ui.parentParamsTable.setItem(row, 1, value_item)
def add_parameter(self):
"""Fügt einen neuen Parameter hinzu."""
row_count = self.ui.xsltParamsTable.rowCount()
self.ui.xsltParamsTable.insertRow(row_count)
self.ui.xsltParamsTable.setItem(row_count, 0, QTableWidgetItem(""))
self.ui.xsltParamsTable.setItem(row_count, 1, QTableWidgetItem(""))
self.ui.xsltParamsTable.setCurrentCell(row_count, 0)
def remove_parameter(self):
"""Entfernt den ausgewählten Parameter."""
current_row = self.ui.xsltParamsTable.currentRow()
if current_row >= 0:
self.ui.xsltParamsTable.removeRow(current_row)
def get_data(self):
"""
Gibt die bearbeiteten Daten zurück.
Returns:
dict mit 'bez', 'xslt_params', 'force_transform', oder None bei Validierungsfehler
"""
bez = self.ui.bezEdit.text().strip()
if not bez:
QMessageBox.warning(self, "Warnung", "Bitte geben Sie eine Bezeichnung ein.")
return None
xslt_params = {}
for row in range(self.ui.xsltParamsTable.rowCount()):
key_item = self.ui.xsltParamsTable.item(row, 0)
value_item = self.ui.xsltParamsTable.item(row, 1)
if key_item and value_item:
key = key_item.text().strip()
if key:
xslt_params[key] = value_item.text().strip()
return {
"bez": bez,
"xslt_params": xslt_params,
"force_transform": self.ui.alle_xml_transformieren.isChecked(),
}
def accept(self):
"""Überschreibt accept() um Datenvalidierung durchzuführen."""
if self.get_data() is not None:
super().accept()
+308
View File
@@ -0,0 +1,308 @@
"""
BaseWorkerPool - Gemeinsame Basisklasse für JVM-Worker-Pools.
Enthält alle gemeinsamen Methoden für SaxonWorkerPool, SaxonWorkerPoolS9Api
und FopWorkerPool: Kompilierung, Worker-Start, RAM-Messung und Shutdown.
"""
import glob
import logging
import shutil
import subprocess
import sys
import tempfile
import threading
import time
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Optional
import psutil
from worker_metrics import WorkerPoolMetrics
logger = logging.getLogger(__name__)
_CLASSPATH_SEP = ";" if sys.platform == "win32" else ":"
def build_jar_classpath(base_dir: Path, include_lib: bool = True) -> str:
"""
Baut einen Classpath aus allen JAR-Dateien in einem Verzeichnis.
Args:
base_dir: Hauptverzeichnis mit JAR-Dateien
include_lib: Ob das lib/-Unterverzeichnis ebenfalls einbezogen wird
Returns:
Classpath-String (plattformspezifischer Trennzeichen)
"""
all_jars = glob.glob(str(base_dir / "*.jar"))
if include_lib:
lib_dir = base_dir / "lib"
if lib_dir.exists():
all_jars.extend(glob.glob(str(lib_dir / "*.jar")))
return _CLASSPATH_SEP.join(all_jars)
class BaseWorkerPool(ABC):
"""
Abstrakte Basisklasse für JVM-basierte Worker-Pools.
Konkrete Unterklassen müssen folgende Properties/Methoden implementieren:
- _pool_name: Anzeigename für Log-Meldungen
- _java_source_code: Java-Quellcode-String
- _java_class_name: Name der Java-Klasse (ohne .java)
- _temp_dir_prefix: Präfix für das temporäre Verzeichnis
- _worker_init_sleep: Wartezeit nach Worker-Start (Sekunden)
- _get_classpath(): Gibt den Classpath für Kompilierung und Ausführung zurück
- _build_worker_cmd(full_classpath): Gibt die Worker-Startkommando-Liste zurück
- _stderr_log_name(i): Gibt den Dateinamen der stderr-Logdatei für Worker i zurück
"""
def __init__(self, num_workers: int, java_vm_path: Path, log_dir: Optional[Path] = None):
self.num_workers = num_workers
self.java_vm_path = java_vm_path
self.log_dir = log_dir
# Worker-Prozesse
self.workers: list[subprocess.Popen] = []
self.worker_locks: list[threading.Lock] = []
self._worker_stderr_files: list = []
# Temporäres Verzeichnis für kompilierte Java-Klasse
self.temp_dir: Optional[Path] = None
self.worker_class_path: Optional[Path] = None
self.worker_log_dir: Optional[Path] = None
# Performance-Metriken
self.metrics = WorkerPoolMetrics()
# --- Abstrakte Schnittstelle ---
@property
@abstractmethod
def _pool_name(self) -> str:
"""Anzeigename für Log-Meldungen (z.B. 'Saxon', 'FOP')."""
@property
@abstractmethod
def _java_source_code(self) -> str:
"""Java-Quellcode-String der Worker-Klasse."""
@property
@abstractmethod
def _java_class_name(self) -> str:
"""Name der Java-Klasse ohne Suffix (z.B. 'SaxonWorker')."""
@property
@abstractmethod
def _temp_dir_prefix(self) -> str:
"""Präfix für das temporäre Verzeichnis (z.B. 'saxon_worker_')."""
@property
@abstractmethod
def _worker_init_sleep(self) -> float:
"""Wartezeit in Sekunden nach Worker-Start."""
@abstractmethod
def _get_classpath(self) -> str:
"""Gibt den Classpath-String für Kompilierung und Worker-Start zurück."""
@abstractmethod
def _build_worker_cmd(self, full_classpath: str) -> list[str]:
"""Gibt die Worker-Startkommando-Liste zurück."""
@abstractmethod
def _stderr_log_name(self, i: int) -> str:
"""Gibt den Dateinamen der stderr-Logdatei für Worker i zurück."""
# --- Gemeinsame Implementierungen ---
def _compile_worker_class(self):
"""Kompiliert die Worker-Java-Klasse ins temporäre Verzeichnis."""
start_time = time.time()
try:
self.temp_dir = Path(tempfile.mkdtemp(prefix=self._temp_dir_prefix))
java_file = self.temp_dir / f"{self._java_class_name}.java"
java_file.write_text(self._java_source_code, encoding="utf-8")
classpath = self._get_classpath()
javac_cmd = [str(self.java_vm_path).replace("java", "javac"), "-cp", classpath, str(java_file)]
logger.debug(f"Kompiliere {self._java_class_name}: {' '.join(javac_cmd[:3])}...")
result = subprocess.run(javac_cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
raise RuntimeError(f"Java-Kompilierung fehlgeschlagen: {result.stderr}")
self.worker_class_path = self.temp_dir
self.metrics.compilation_time_seconds = time.time() - start_time
logger.info(
f"{self._java_class_name} erfolgreich kompiliert: {self.temp_dir} "
f"({self.metrics.compilation_time_seconds:.3f}s)"
)
except Exception as e:
logger.error(f"Fehler beim Kompilieren von {self._java_class_name}: {e}")
raise
def _start_workers(self):
"""Startet N Worker-Prozesse."""
classpath = self._get_classpath()
full_classpath = str(self.worker_class_path) + _CLASSPATH_SEP + classpath
self.worker_log_dir = self.log_dir if self.log_dir else self.temp_dir
if self.log_dir:
self.worker_log_dir.mkdir(parents=True, exist_ok=True)
cmd = self._build_worker_cmd(full_classpath)
for i in range(self.num_workers):
worker_start_time = time.time()
try:
stderr_log = self.worker_log_dir / self._stderr_log_name(i)
stderr_file = open(stderr_log, "w", encoding="utf-8")
self._worker_stderr_files.append(stderr_file)
process = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=stderr_file,
text=True,
bufsize=1,
)
self.workers.append(process)
self.worker_locks.append(threading.Lock())
logger.debug(f"{self._pool_name} Worker {i} gestartet (PID: {process.pid}, stderr: {stderr_log})")
time.sleep(self._worker_init_sleep)
if process.poll() is not None:
stderr_file.close()
with open(stderr_log, "r") as f:
stderr_content = f.read()
raise RuntimeError(
f"{self._pool_name} Worker {i} ist sofort beendet "
f"(Exit Code: {process.returncode})\nstderr:\n{stderr_content}"
)
self.metrics.worker_start_times.append(time.time() - worker_start_time)
except Exception as e:
logger.error(f"Fehler beim Starten von {self._pool_name} Worker {i}: {e}")
raise
self.metrics.calculate_aggregates()
logger.info(
f"{len(self.workers)} {self._pool_name}-Worker erfolgreich gestartet "
f"(Summe: {self.metrics.total_worker_start_time_seconds:.3f}s, "
f"Durchschnitt: {self.metrics.average_worker_start_time_seconds:.3f}s)"
)
def _acquire_worker(self) -> int:
"""Gibt den Index eines freien Workers zurück (blockiert bis einer frei ist)."""
for i, lock in enumerate(self.worker_locks):
if lock.acquire(blocking=False):
return i
# Kein freier Worker — warte auf den ersten verfügbaren
for i, lock in enumerate(self.worker_locks):
lock.acquire()
return i
def _read_stderr_log(self, worker_idx: int, tail: int = 0) -> str:
"""Liest die stderr-Logdatei eines Workers (optional nur die letzten N Zeichen)."""
try:
stderr_log = self.worker_log_dir / self._stderr_log_name(worker_idx)
with open(stderr_log, "r") as f:
content = f.read()
return content[-tail:] if tail else content
except Exception:
return ""
def measure_ram_usage(self) -> tuple[float, float, list[float]]:
"""
Misst den aktuellen RAM-Verbrauch aller Worker-Prozesse.
Returns:
tuple: (total_mb, average_mb, per_worker_mb_list)
"""
ram_per_worker = []
for i, worker in enumerate(self.workers):
try:
if worker.poll() is None:
process = psutil.Process(worker.pid)
ram_mb = process.memory_info().rss / (1024 * 1024)
ram_per_worker.append(ram_mb)
else:
logger.warning(f"{self._pool_name} Worker {i} ist nicht mehr aktiv (kann RAM nicht messen)")
except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
logger.warning(f"Konnte RAM für {self._pool_name} Worker {i} nicht messen: {e}")
total_ram = sum(ram_per_worker)
average_ram = total_ram / len(ram_per_worker) if ram_per_worker else 0.0
return total_ram, average_ram, ram_per_worker
def capture_ram_before_transform(self):
"""Erfasst RAM-Verbrauch vor der ersten Transformation."""
total, average, per_worker = self.measure_ram_usage()
self.metrics.ram_before_transform_mb_per_worker = per_worker
self.metrics.total_ram_before_mb = total
self.metrics.average_ram_before_mb = average
logger.info(
f"RAM vor Transformation ({self._pool_name}): {total:.1f} MB (Ø {average:.1f} MB/Worker)"
)
def capture_ram_after_transform(self):
"""Erfasst RAM-Verbrauch nach allen Transformationen."""
total, average, per_worker = self.measure_ram_usage()
self.metrics.ram_after_transform_mb_per_worker = per_worker
self.metrics.total_ram_after_mb = total
self.metrics.average_ram_after_mb = average
logger.info(
f"RAM nach Transformation ({self._pool_name}): {total:.1f} MB (Ø {average:.1f} MB/Worker)"
)
def shutdown(self):
"""Beendet alle Worker-Prozesse sauber."""
logger.info(f"Beende {self._pool_name}-Worker-Pool...")
for i, worker in enumerate(self.workers):
try:
if worker.stdin and not worker.stdin.closed:
worker.stdin.write("EXIT\n")
worker.stdin.flush()
worker.wait(timeout=2)
logger.debug(f"{self._pool_name} Worker {i} beendet")
except subprocess.TimeoutExpired:
worker.kill()
logger.warning(f"{self._pool_name} Worker {i} musste gekillt werden")
except Exception as e:
logger.error(f"Fehler beim Beenden von {self._pool_name} Worker {i}: {e}")
for f in self._worker_stderr_files:
try:
f.close()
except Exception:
pass
self._worker_stderr_files.clear()
if self.temp_dir and self.temp_dir.exists():
try:
shutil.rmtree(self.temp_dir)
logger.debug(f"Temporäres Verzeichnis gelöscht: {self.temp_dir}")
except Exception as e:
logger.warning(f"Konnte temporäres Verzeichnis nicht löschen: {e}")
logger.info(f"{self._pool_name}-Worker-Pool beendet")
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown()