Files
xsl-validator/src/worker_pool_base.py
T

313 lines
12 KiB
Python
Raw Normal View History

"""
BaseWorkerPool - Gemeinsame Basisklasse für JVM-Worker-Pools.
Enthält alle gemeinsamen Methoden für SaxonWorkerPool, SaxonWorkerPoolS9Api
und FopWorkerPool: Kompilierung, Worker-Start, RAM-Messung und Shutdown.
"""
import glob
import logging
import shutil
import subprocess
import sys
import tempfile
import threading
import time
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Optional
# Prevents console windows from popping up for subprocesses when running as a
# PyInstaller EXE (Windows); on every other platform no creation flags are set.
_SUBPROCESS_FLAGS = subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0
import psutil
from worker_metrics import WorkerPoolMetrics
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Separator between Java classpath entries: ";" on Windows, ":" everywhere else.
_CLASSPATH_SEP = ";" if sys.platform == "win32" else ":"
def build_jar_classpath(base_dir: Path, include_lib: bool = True) -> str:
    """
    Build a Java classpath from all JAR files found in a directory.

    JARs directly inside ``base_dir`` come first, followed (optionally) by the
    JARs inside its ``lib/`` subdirectory. Within each directory the entries
    are sorted so the resulting classpath is reproducible across runs and
    file systems.

    Args:
        base_dir: Main directory containing the JAR files.
        include_lib: Whether the ``lib/`` subdirectory is searched as well.

    Returns:
        The classpath string, joined with the platform-specific separator.
        Empty if no JAR file was found.
    """
    search_dirs = [base_dir]
    if include_lib:
        search_dirs.append(base_dir / "lib")

    all_jars = []
    for directory in search_dirs:
        # Escape the directory part: glob metacharacters in the path itself
        # (e.g. "tools[1]") must be matched literally, only "*.jar" is a pattern.
        pattern = str(Path(glob.escape(str(directory))) / "*.jar")
        # A missing directory simply yields no matches.
        all_jars.extend(sorted(glob.glob(pattern)))
    return _CLASSPATH_SEP.join(all_jars)
class BaseWorkerPool(ABC):
    """
    Abstract base class for JVM-backed worker pools.

    Bundles what the concrete pools share: compiling the Java worker class,
    starting the worker processes, handing out idle workers, measuring RAM
    usage and shutting everything down. Usable as a context manager; leaving
    the ``with`` block calls :meth:`shutdown`.

    Concrete subclasses must implement:
        - _pool_name: display name used in log messages
        - _java_source_code: Java source code of the worker class
        - _java_class_name: name of the Java class (without ".java")
        - _temp_dir_prefix: prefix of the temporary directory
        - _worker_init_sleep: seconds to wait after starting a worker
        - _get_classpath(): classpath for compilation and execution
        - _build_worker_cmd(full_classpath): worker start command as a list
        - _stderr_log_name(i): file name of the stderr log of worker i
    """

    # Seconds to sleep between two scans for an idle worker in _acquire_worker().
    _ACQUIRE_POLL_INTERVAL = 0.01
    # Seconds to wait for a worker process to be reaped after it was killed.
    _KILL_WAIT_TIMEOUT = 5

    def __init__(self, num_workers: int, java_vm_path: Path, log_dir: Optional[Path] = None):
        """
        Store the configuration; no process is started here.

        Args:
            num_workers: Number of worker processes to start.
            java_vm_path: Path to the ``java`` executable.
            log_dir: Directory for the workers' stderr logs. When omitted,
                the temporary compilation directory is used instead.
        """
        self.num_workers = num_workers
        self.java_vm_path = java_vm_path
        self.log_dir = log_dir

        # Worker processes, one lock per worker, and the open stderr log files
        self.workers: list[subprocess.Popen] = []
        self.worker_locks: list[threading.Lock] = []
        self._worker_stderr_files: list = []

        # Temporary directory holding the compiled Java class
        self.temp_dir: Optional[Path] = None
        self.worker_class_path: Optional[Path] = None
        self.worker_log_dir: Optional[Path] = None

        # Performance metrics
        self.metrics = WorkerPoolMetrics()

    # --- Abstract interface ---

    @property
    @abstractmethod
    def _pool_name(self) -> str:
        """Display name used in log messages (e.g. 'Saxon', 'FOP')."""

    @property
    @abstractmethod
    def _java_source_code(self) -> str:
        """Java source code of the worker class."""

    @property
    @abstractmethod
    def _java_class_name(self) -> str:
        """Name of the Java class without suffix (e.g. 'SaxonWorker')."""

    @property
    @abstractmethod
    def _temp_dir_prefix(self) -> str:
        """Prefix of the temporary directory (e.g. 'saxon_worker_')."""

    @property
    @abstractmethod
    def _worker_init_sleep(self) -> float:
        """Seconds to wait after starting a worker."""

    @abstractmethod
    def _get_classpath(self) -> str:
        """Return the classpath string used for compilation and worker start."""

    @abstractmethod
    def _build_worker_cmd(self, full_classpath: str) -> list[str]:
        """Return the command list that starts one worker."""

    @abstractmethod
    def _stderr_log_name(self, i: int) -> str:
        """Return the file name of the stderr log of worker ``i``."""

    # --- Shared implementations ---

    @staticmethod
    def _derive_javac_path(java_vm_path: Path) -> str:
        """
        Return the path of the ``javac`` executable next to ``java_vm_path``.

        Only the file name is rewritten. Directory components that happen to
        contain "java" (e.g. ``/usr/lib/jvm/java-17/bin/java``) stay untouched,
        and an extension such as ``.exe`` is preserved.
        """
        java_path = Path(java_vm_path)
        if not java_path.name:
            # Nothing that looks like a file name; fall back to plain substitution.
            return str(java_vm_path).replace("java", "javac")
        if java_path.stem.lower() in ("java", "javaw"):
            javac_name = "javac" + java_path.suffix
        else:
            javac_name = java_path.name.replace("java", "javac")
        return str(java_path.with_name(javac_name))

    def _compile_worker_class(self):
        """
        Compile the Java worker class into a fresh temporary directory.

        Sets ``temp_dir`` and ``worker_class_path`` and records the elapsed
        time in ``metrics.compilation_time_seconds``.

        Raises:
            RuntimeError: If ``javac`` exits with a non-zero return code.
            Exception: Any other failure is logged and re-raised unchanged.
        """
        start_time = time.time()
        try:
            self.temp_dir = Path(tempfile.mkdtemp(prefix=self._temp_dir_prefix))
            java_file = self.temp_dir / f"{self._java_class_name}.java"
            java_file.write_text(self._java_source_code, encoding="utf-8")

            classpath = self._get_classpath()
            javac_cmd = [self._derive_javac_path(self.java_vm_path), "-cp", classpath, str(java_file)]
            logger.debug(f"Kompiliere {self._java_class_name}: {' '.join(javac_cmd[:3])}...")

            result = subprocess.run(
                javac_cmd,
                capture_output=True,
                text=True,
                timeout=30,
                creationflags=_SUBPROCESS_FLAGS,
            )
            if result.returncode != 0:
                raise RuntimeError(f"Java-Kompilierung fehlgeschlagen: {result.stderr}")

            self.worker_class_path = self.temp_dir
            self.metrics.compilation_time_seconds = time.time() - start_time
            logger.info(
                f"{self._java_class_name} erfolgreich kompiliert: {self.temp_dir} "
                f"({self.metrics.compilation_time_seconds:.3f}s)"
            )
        except Exception as e:
            logger.error(f"Fehler beim Kompilieren von {self._java_class_name}: {e}")
            raise

    def _start_workers(self):
        """
        Start ``num_workers`` worker processes.

        Each worker gets piped stdin/stdout and a dedicated stderr log file.
        After starting a worker the method sleeps for ``_worker_init_sleep``
        seconds and then checks that the process is still alive.

        Raises:
            RuntimeError: If a worker has already exited after the init sleep;
                the message contains the worker's stderr output.
            Exception: Any other start-up failure is logged and re-raised.
        """
        classpath = self._get_classpath()
        full_classpath = str(self.worker_class_path) + _CLASSPATH_SEP + classpath

        self.worker_log_dir = self.log_dir if self.log_dir else self.temp_dir
        if self.log_dir:
            self.worker_log_dir.mkdir(parents=True, exist_ok=True)

        cmd = self._build_worker_cmd(full_classpath)
        for i in range(self.num_workers):
            worker_start_time = time.time()
            try:
                stderr_log = self.worker_log_dir / self._stderr_log_name(i)
                stderr_file = open(stderr_log, "w", encoding="utf-8")
                # Registered right away so shutdown() closes it even if Popen fails.
                self._worker_stderr_files.append(stderr_file)

                process = subprocess.Popen(
                    cmd,
                    stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE,
                    stderr=stderr_file,
                    text=True,
                    bufsize=1,
                    creationflags=_SUBPROCESS_FLAGS,
                )
                self.workers.append(process)
                self.worker_locks.append(threading.Lock())
                logger.debug(f"{self._pool_name} Worker {i} gestartet (PID: {process.pid}, stderr: {stderr_log})")

                time.sleep(self._worker_init_sleep)
                if process.poll() is not None:
                    stderr_file.close()
                    # Tolerant read: a decoding problem in the log must not
                    # mask the actual "worker died" error.
                    stderr_content = self._read_stderr_log(i)
                    raise RuntimeError(
                        f"{self._pool_name} Worker {i} ist sofort beendet "
                        f"(Exit Code: {process.returncode})\nstderr:\n{stderr_content}"
                    )
                self.metrics.worker_start_times.append(time.time() - worker_start_time)
            except Exception as e:
                logger.error(f"Fehler beim Starten von {self._pool_name} Worker {i}: {e}")
                raise

        self.metrics.calculate_aggregates()
        logger.info(
            f"{len(self.workers)} {self._pool_name}-Worker erfolgreich gestartet "
            f"(Summe: {self.metrics.total_worker_start_time_seconds:.3f}s, "
            f"Durchschnitt: {self.metrics.average_worker_start_time_seconds:.3f}s)"
        )

    def _acquire_worker(self) -> int:
        """
        Return the index of an idle worker, blocking until one is available.

        The returned worker's lock is held by the caller, who is responsible
        for releasing ``self.worker_locks[index]`` afterwards.

        Raises:
            RuntimeError: If the pool has no workers at all.
        """
        if not self.worker_locks:
            raise RuntimeError(f"{self._pool_name}-Worker-Pool hat keine Worker")
        while True:
            for i, lock in enumerate(self.worker_locks):
                if lock.acquire(blocking=False):
                    return i
            # All workers busy. Poll instead of blocking on one specific lock,
            # so that whichever worker is released first gets picked up.
            time.sleep(self._ACQUIRE_POLL_INTERVAL)

    def _read_stderr_log(self, worker_idx: int, tail: int = 0) -> str:
        """
        Read the stderr log file of a worker (best effort).

        Args:
            worker_idx: Index of the worker whose log is read.
            tail: If non-zero, only the last ``tail`` characters are returned.

        Returns:
            The log content, or an empty string if it cannot be read.
        """
        try:
            stderr_log = self.worker_log_dir / self._stderr_log_name(worker_idx)
            # The file is written by the worker process itself, so its encoding
            # is not under our control; replace undecodable bytes.
            with open(stderr_log, "r", errors="replace") as f:
                content = f.read()
            return content[-tail:] if tail else content
        except Exception:
            return ""

    def measure_ram_usage(self) -> tuple[float, float, list[float]]:
        """
        Measure the current RAM usage (RSS) of all running worker processes.

        Workers that have exited or cannot be inspected are skipped with a
        warning and do not contribute to the result.

        Returns:
            tuple: (total_mb, average_mb, per_worker_mb_list)
        """
        ram_per_worker = []
        for i, worker in enumerate(self.workers):
            try:
                if worker.poll() is not None:
                    logger.warning(f"{self._pool_name} Worker {i} ist nicht mehr aktiv (kann RAM nicht messen)")
                    continue
                rss_bytes = psutil.Process(worker.pid).memory_info().rss
                ram_per_worker.append(rss_bytes / (1024 * 1024))
            except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
                logger.warning(f"Konnte RAM für {self._pool_name} Worker {i} nicht messen: {e}")

        total_ram = sum(ram_per_worker)
        average_ram = total_ram / len(ram_per_worker) if ram_per_worker else 0.0
        return total_ram, average_ram, ram_per_worker

    def capture_ram_before_transform(self):
        """Record the RAM usage before the first transformation in the metrics."""
        total, average, per_worker = self.measure_ram_usage()
        self.metrics.ram_before_transform_mb_per_worker = per_worker
        self.metrics.total_ram_before_mb = total
        self.metrics.average_ram_before_mb = average
        logger.info(
            f"RAM vor Transformation ({self._pool_name}): {total:.1f} MB (Ø {average:.1f} MB/Worker)"
        )

    def capture_ram_after_transform(self):
        """Record the RAM usage after all transformations in the metrics."""
        total, average, per_worker = self.measure_ram_usage()
        self.metrics.ram_after_transform_mb_per_worker = per_worker
        self.metrics.total_ram_after_mb = total
        self.metrics.average_ram_after_mb = average
        logger.info(
            f"RAM nach Transformation ({self._pool_name}): {total:.1f} MB (Ø {average:.1f} MB/Worker)"
        )

    def _stop_worker(self, i: int, worker: subprocess.Popen) -> None:
        """Ask worker ``i`` to exit, kill it if necessary, then reap it and close its pipes."""
        try:
            # Only talk to workers that are still running; writing into the
            # pipe of an already exited process would fail.
            if worker.poll() is None and worker.stdin and not worker.stdin.closed:
                worker.stdin.write("EXIT\n")
                worker.stdin.flush()
            worker.wait(timeout=2)
            logger.debug(f"{self._pool_name} Worker {i} beendet")
        except subprocess.TimeoutExpired:
            worker.kill()
            logger.warning(f"{self._pool_name} Worker {i} musste gekillt werden")
        except Exception as e:
            logger.error(f"Fehler beim Beenden von {self._pool_name} Worker {i}: {e}")
        finally:
            self._reap_worker(i, worker)

    def _reap_worker(self, i: int, worker: subprocess.Popen) -> None:
        """Make sure worker ``i`` is dead and waited for, and close its stdin/stdout pipes."""
        try:
            if worker.poll() is None:
                worker.kill()
            # Collect the exit status so no zombie remains and the process has
            # released its stderr log file before the directory is removed.
            worker.wait(timeout=self._KILL_WAIT_TIMEOUT)
        except Exception as e:
            logger.warning(f"{self._pool_name} Worker {i} konnte nicht vollständig beendet werden: {e}")
        for pipe in (worker.stdin, worker.stdout):
            if pipe is None:
                continue
            try:
                pipe.close()
            except OSError:
                # Flushing buffered data into a dead process may fail; the
                # pipe is released regardless.
                pass

    def shutdown(self):
        """
        Stop all worker processes and release every resource of the pool.

        Each worker is sent ``EXIT`` and given two seconds to terminate before
        it is killed. Afterwards the stderr log files are closed and the
        temporary directory is removed. Failures are logged, never raised, and
        calling the method more than once is harmless.
        """
        logger.info(f"Beende {self._pool_name}-Worker-Pool...")
        for i, worker in enumerate(self.workers):
            self._stop_worker(i, worker)

        for f in self._worker_stderr_files:
            try:
                f.close()
            except Exception:
                # Best effort: a log file that fails to close must not abort shutdown.
                pass
        self._worker_stderr_files.clear()

        if self.temp_dir and self.temp_dir.exists():
            try:
                shutil.rmtree(self.temp_dir)
                logger.debug(f"Temporäres Verzeichnis gelöscht: {self.temp_dir}")
            except Exception as e:
                logger.warning(f"Konnte temporäres Verzeichnis nicht löschen: {e}")
        logger.info(f"{self._pool_name}-Worker-Pool beendet")

    def __enter__(self):
        """Return the pool itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Shut the pool down on leaving the ``with`` block; exceptions propagate."""
        self.shutdown()