309 lines
12 KiB
Python
309 lines
12 KiB
Python
|
|
"""
|
||
|
|
BaseWorkerPool - Gemeinsame Basisklasse für JVM-Worker-Pools.
|
||
|
|
|
||
|
|
Enthält alle gemeinsamen Methoden für SaxonWorkerPool, SaxonWorkerPoolS9Api
|
||
|
|
und FopWorkerPool: Kompilierung, Worker-Start, RAM-Messung und Shutdown.
|
||
|
|
"""
|
||
|
|
|
||
|
|
import glob
|
||
|
|
import logging
|
||
|
|
import shutil
|
||
|
|
import subprocess
|
||
|
|
import sys
|
||
|
|
import tempfile
|
||
|
|
import threading
|
||
|
|
import time
|
||
|
|
from abc import ABC, abstractmethod
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import Optional
|
||
|
|
|
||
|
|
import psutil
|
||
|
|
|
||
|
|
from worker_metrics import WorkerPoolMetrics
|
||
|
|
|
||
|
|
logger = logging.getLogger(__name__)

# Separator between classpath entries: ";" on Windows, ":" on every other platform.
if sys.platform == "win32":
    _CLASSPATH_SEP = ";"
else:
    _CLASSPATH_SEP = ":"
|
||
|
|
|
||
|
|
|
||
|
|
def build_jar_classpath(base_dir: Path, include_lib: bool = True) -> str:
    """
    Build a classpath from all JAR files in a directory.

    JARs located directly in ``base_dir`` come first, followed (optionally) by
    the JARs in ``base_dir/lib``. Within each directory the entries are sorted,
    so the classpath order -- and therefore which JAR wins when two of them
    contain the same class -- does not depend on the file-system listing order.

    Args:
        base_dir: Main directory containing JAR files.
        include_lib: Whether the ``lib/`` subdirectory is included as well.

    Returns:
        Classpath string joined with the platform-specific separator
        (``_CLASSPATH_SEP``); an empty string if no JAR files are found.
    """
    def _sorted_jars(directory: Path) -> list[str]:
        # Escape the directory part so that names containing glob metacharacters
        # ("[", "*", "?") are matched literally; only "*.jar" acts as a pattern.
        pattern = str(Path(glob.escape(str(directory))) / "*.jar")
        return sorted(glob.glob(pattern))

    all_jars = _sorted_jars(base_dir)
    if include_lib:
        lib_dir = base_dir / "lib"
        if lib_dir.is_dir():
            all_jars.extend(_sorted_jars(lib_dir))
    return _CLASSPATH_SEP.join(all_jars)
|
||
|
|
|
||
|
|
|
||
|
|
class BaseWorkerPool(ABC):
    """
    Abstract base class for JVM-based worker pools.

    Concrete subclasses must implement the following properties/methods:
    - _pool_name: display name used in log messages
    - _java_source_code: Java source code of the worker class
    - _java_class_name: name of the Java class (without .java)
    - _temp_dir_prefix: prefix for the temporary directory
    - _worker_init_sleep: wait time after starting a worker (seconds)
    - _get_classpath(): classpath used for compilation and for starting workers
    - _build_worker_cmd(full_classpath): command list used to start one worker
    - _stderr_log_name(i): file name of the stderr log file for worker i

    _start_workers() relies on a prior successful _compile_worker_class(),
    because it puts ``worker_class_path`` on the classpath and falls back to
    ``temp_dir`` for the stderr logs. shutdown() (also invoked by ``__exit__``)
    stops the workers and removes ``temp_dir``.
    """

    def __init__(self, num_workers: int, java_vm_path: Path, log_dir: Optional[Path] = None):
        self.num_workers = num_workers
        self.java_vm_path = java_vm_path
        self.log_dir = log_dir

        # Worker processes; workers[i] and worker_locks[i] belong to the same worker.
        self.workers: list[subprocess.Popen] = []
        self.worker_locks: list[threading.Lock] = []
        # Open stderr log file handles of the workers; closed in shutdown().
        self._worker_stderr_files: list = []

        # Temporary directory holding the compiled Java class
        self.temp_dir: Optional[Path] = None
        self.worker_class_path: Optional[Path] = None
        self.worker_log_dir: Optional[Path] = None

        # Performance metrics
        self.metrics = WorkerPoolMetrics()

    # --- Abstract interface ---

    @property
    @abstractmethod
    def _pool_name(self) -> str:
        """Display name for log messages (e.g. 'Saxon', 'FOP')."""

    @property
    @abstractmethod
    def _java_source_code(self) -> str:
        """Java source code of the worker class."""

    @property
    @abstractmethod
    def _java_class_name(self) -> str:
        """Name of the Java class without suffix (e.g. 'SaxonWorker')."""

    @property
    @abstractmethod
    def _temp_dir_prefix(self) -> str:
        """Prefix for the temporary directory (e.g. 'saxon_worker_')."""

    @property
    @abstractmethod
    def _worker_init_sleep(self) -> float:
        """Wait time in seconds after starting each worker."""

    @abstractmethod
    def _get_classpath(self) -> str:
        """Return the classpath string used for compilation and worker start."""

    @abstractmethod
    def _build_worker_cmd(self, full_classpath: str) -> list[str]:
        """Return the command list used to start one worker process."""

    @abstractmethod
    def _stderr_log_name(self, i: int) -> str:
        """Return the file name of the stderr log file for worker i."""

    # --- Shared implementations ---

    @staticmethod
    def _javac_path(java_vm_path: Path) -> Path:
        """
        Derive the javac executable from the java executable path.

        Only the file name is rewritten (``java`` -> ``javac``,
        ``java.exe`` -> ``javac.exe``); directory components such as
        ``/usr/lib/jvm/java-17-openjdk`` are left untouched.
        """
        java_path = Path(java_vm_path)
        return java_path.with_name(java_path.name.replace("java", "javac", 1))

    def _compile_worker_class(self):
        """
        Compile the worker Java class into a new temporary directory.

        Writes ``_java_source_code`` as ``<_java_class_name>.java`` (UTF-8) into a
        fresh temp dir and runs javac with the subclass classpath. On success it
        sets ``temp_dir`` and ``worker_class_path`` and records
        ``metrics.compilation_time_seconds``.

        Raises:
            RuntimeError: if javac exits with a non-zero status.
            subprocess.TimeoutExpired: if javac runs longer than 30 seconds.
            OSError: if the source cannot be written or javac cannot be started.

        On any failure the temporary directory is removed before re-raising.
        """
        start_time = time.perf_counter()
        temp_dir: Optional[Path] = None
        try:
            temp_dir = Path(tempfile.mkdtemp(prefix=self._temp_dir_prefix))
            self.temp_dir = temp_dir

            java_file = temp_dir / f"{self._java_class_name}.java"
            java_file.write_text(self._java_source_code, encoding="utf-8")

            classpath = self._get_classpath()
            # The source file is written as UTF-8, so state that explicitly instead of
            # relying on javac's default source encoding (platform-dependent on older JDKs).
            javac_cmd = [
                str(self._javac_path(self.java_vm_path)),
                "-cp", classpath,
                "-encoding", "UTF-8",
                str(java_file),
            ]

            logger.debug(f"Kompiliere {self._java_class_name}: {' '.join(javac_cmd[:3])}...")

            # errors="replace": undecodable compiler output must not hide the real error.
            result = subprocess.run(
                javac_cmd, capture_output=True, text=True, errors="replace", timeout=30
            )
            if result.returncode != 0:
                raise RuntimeError(f"Java-Kompilierung fehlgeschlagen: {result.stderr}")

            self.worker_class_path = temp_dir
            self.metrics.compilation_time_seconds = time.perf_counter() - start_time

            logger.info(
                f"{self._java_class_name} erfolgreich kompiliert: {self.temp_dir} "
                f"({self.metrics.compilation_time_seconds:.3f}s)"
            )

        except Exception as e:
            logger.error(f"Fehler beim Kompilieren von {self._java_class_name}: {e}")
            if temp_dir is not None:
                # Callers (e.g. a failing subclass constructor) may never reach
                # shutdown(), so do not leave the temporary directory behind.
                shutil.rmtree(temp_dir, ignore_errors=True)
                self.temp_dir = None
            raise

    def _start_workers(self):
        """
        Start ``num_workers`` worker processes.

        Each worker gets its stdin/stdout as text pipes and its stderr redirected to
        a log file in ``log_dir`` (created if needed) or, if no ``log_dir`` was given,
        in ``temp_dir``. After each start the method sleeps ``_worker_init_sleep``
        seconds and checks that the process is still alive.

        Raises:
            RuntimeError: if a worker has already exited after the init sleep; the
                message contains the worker's stderr output.

        Workers started before a failure remain in ``self.workers``; shutdown()
        stops them.
        """
        classpath = self._get_classpath()
        full_classpath = str(self.worker_class_path) + _CLASSPATH_SEP + classpath

        self.worker_log_dir = self.log_dir if self.log_dir else self.temp_dir
        if self.log_dir:
            self.worker_log_dir.mkdir(parents=True, exist_ok=True)

        cmd = self._build_worker_cmd(full_classpath)

        for i in range(self.num_workers):
            worker_start_time = time.perf_counter()
            try:
                stderr_log = self.worker_log_dir / self._stderr_log_name(i)
                stderr_file = open(stderr_log, "w", encoding="utf-8")
                # Tracked right away so shutdown() closes it even if Popen fails.
                self._worker_stderr_files.append(stderr_file)

                process = subprocess.Popen(
                    cmd,
                    stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE,
                    stderr=stderr_file,
                    text=True,
                    bufsize=1,
                )

                self.workers.append(process)
                self.worker_locks.append(threading.Lock())

                logger.debug(f"{self._pool_name} Worker {i} gestartet (PID: {process.pid}, stderr: {stderr_log})")

                # Give the JVM time to initialise, then detect an immediate crash.
                time.sleep(self._worker_init_sleep)

                if process.poll() is not None:
                    stderr_file.close()
                    stderr_content = self._read_stderr_log(i)
                    raise RuntimeError(
                        f"{self._pool_name} Worker {i} ist sofort beendet "
                        f"(Exit Code: {process.returncode})\nstderr:\n{stderr_content}"
                    )

                self.metrics.worker_start_times.append(time.perf_counter() - worker_start_time)

            except Exception as e:
                logger.error(f"Fehler beim Starten von {self._pool_name} Worker {i}: {e}")
                raise

        self.metrics.calculate_aggregates()
        logger.info(
            f"{len(self.workers)} {self._pool_name}-Worker erfolgreich gestartet "
            f"(Summe: {self.metrics.total_worker_start_time_seconds:.3f}s, "
            f"Durchschnitt: {self.metrics.average_worker_start_time_seconds:.3f}s)"
        )

    def _acquire_worker(self) -> int:
        """
        Acquire a free worker and return its index; blocks until one is free.

        The worker's lock (``worker_locks[i]``) is still held on return;
        releasing it is up to the caller.

        Raises:
            RuntimeError: if the pool has no workers (waiting would never end).
        """
        locks = self.worker_locks
        if not locks:
            raise RuntimeError(f"{self._pool_name}-Worker-Pool hat keine gestarteten Worker")

        rotation = 0
        while True:
            for i, lock in enumerate(locks):
                if lock.acquire(blocking=False):
                    return i
            # All workers busy: wait briefly on one lock (rotating through them),
            # then rescan. This picks up whichever worker frees up first within a
            # short bounded delay, instead of always blocking on worker 0.
            i = rotation % len(locks)
            if locks[i].acquire(timeout=0.01):
                return i
            rotation += 1

    def _read_stderr_log(self, worker_idx: int, tail: int = 0) -> str:
        """
        Return the contents of a worker's stderr log file.

        Args:
            worker_idx: Index of the worker whose log is read.
            tail: If > 0, only the last ``tail`` characters are returned.

        Returns:
            The (possibly truncated) log text, or "" if the file cannot be read.
            The worker process writes raw bytes to this file, which need not be
            valid UTF-8; undecodable bytes are replaced instead of raising.
        """
        try:
            stderr_log = self.worker_log_dir / self._stderr_log_name(worker_idx)
            with open(stderr_log, "r", encoding="utf-8", errors="replace") as f:
                content = f.read()
        except Exception:
            # Deliberately best effort: a missing/unreadable log yields "".
            return ""
        return content[-tail:] if tail > 0 else content

    def measure_ram_usage(self) -> tuple[float, float, list[float]]:
        """
        Measure the current RSS memory of all live worker processes.

        Workers that have exited or cannot be inspected are logged and skipped.

        Returns:
            tuple: (total_mb, average_mb, per_worker_mb_list); average is 0.0 if
            no worker could be measured.
        """
        ram_per_worker = []
        for i, worker in enumerate(self.workers):
            try:
                if worker.poll() is None:
                    process = psutil.Process(worker.pid)
                    ram_mb = process.memory_info().rss / (1024 * 1024)
                    ram_per_worker.append(ram_mb)
                else:
                    logger.warning(f"{self._pool_name} Worker {i} ist nicht mehr aktiv (kann RAM nicht messen)")
            except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
                logger.warning(f"Konnte RAM für {self._pool_name} Worker {i} nicht messen: {e}")

        total_ram = sum(ram_per_worker)
        average_ram = total_ram / len(ram_per_worker) if ram_per_worker else 0.0
        return total_ram, average_ram, ram_per_worker

    def capture_ram_before_transform(self):
        """Record worker RAM usage before the first transformation in ``metrics``."""
        total, average, per_worker = self.measure_ram_usage()
        self.metrics.ram_before_transform_mb_per_worker = per_worker
        self.metrics.total_ram_before_mb = total
        self.metrics.average_ram_before_mb = average
        logger.info(
            f"RAM vor Transformation ({self._pool_name}): {total:.1f} MB (Ø {average:.1f} MB/Worker)"
        )

    def capture_ram_after_transform(self):
        """Record worker RAM usage after all transformations in ``metrics``."""
        total, average, per_worker = self.measure_ram_usage()
        self.metrics.ram_after_transform_mb_per_worker = per_worker
        self.metrics.total_ram_after_mb = total
        self.metrics.average_ram_after_mb = average
        logger.info(
            f"RAM nach Transformation ({self._pool_name}): {total:.1f} MB (Ø {average:.1f} MB/Worker)"
        )

    def _reap_worker(self, i: int, worker: subprocess.Popen) -> None:
        """Ensure worker ``i`` is terminated and reaped, then close its pipes (best effort)."""
        try:
            if worker.poll() is None:
                worker.kill()
            # Reap the process so no zombie remains and it releases its files
            # (its stderr log may live inside temp_dir, which is deleted afterwards).
            worker.wait(timeout=5)
        except (OSError, subprocess.TimeoutExpired) as e:
            logger.warning(f"{self._pool_name} Worker {i} konnte nicht beendet werden: {e}")
        for stream in (worker.stdin, worker.stdout):
            if stream is not None and not stream.closed:
                try:
                    stream.close()
                except OSError:
                    # e.g. BrokenPipeError while flushing stdin of a dead worker.
                    pass

    def shutdown(self):
        """
        Stop all worker processes, close their stderr logs and delete ``temp_dir``.

        Each worker is asked to exit by writing ``EXIT`` to its stdin and is given
        2 seconds; workers that do not exit in time are killed. Every worker is
        reaped and its pipes are closed, so calling shutdown() again is harmless.
        """
        logger.info(f"Beende {self._pool_name}-Worker-Pool...")

        for i, worker in enumerate(self.workers):
            try:
                if worker.stdin and not worker.stdin.closed:
                    worker.stdin.write("EXIT\n")
                    worker.stdin.flush()
                worker.wait(timeout=2)
                logger.debug(f"{self._pool_name} Worker {i} beendet")
            except subprocess.TimeoutExpired:
                worker.kill()
                logger.warning(f"{self._pool_name} Worker {i} musste gekillt werden")
            except Exception as e:
                logger.error(f"Fehler beim Beenden von {self._pool_name} Worker {i}: {e}")
            finally:
                self._reap_worker(i, worker)

        for f in self._worker_stderr_files:
            try:
                f.close()
            except Exception:
                pass
        self._worker_stderr_files.clear()

        if self.temp_dir and self.temp_dir.exists():
            try:
                shutil.rmtree(self.temp_dir)
                logger.debug(f"Temporäres Verzeichnis gelöscht: {self.temp_dir}")
            except Exception as e:
                logger.warning(f"Konnte temporäres Verzeichnis nicht löschen: {e}")

        logger.info(f"{self._pool_name}-Worker-Pool beendet")

    def __enter__(self):
        """Return the pool itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Call shutdown(); exceptions raised in the ``with`` body are not suppressed."""
        self.shutdown()
|