Files

268 lines
10 KiB
Python
Raw Permalink Normal View History

"""
Saxon Worker Pool (s9api) - Persistente JVM-Prozesse für XSLT 2.0/3.0 Transformationen.
Diese Variante verwendet die Saxon s9api API anstatt JAXP und ist für XSLT 2.0 und 3.0 geeignet.
Eliminiert JVM-Startup-Overhead durch Vorinitialisierung von N Worker-Prozessen.
Jeder Worker läuft als Daemon und verarbeitet mehrere Transformationen nacheinander.
"""
import logging
from pathlib import Path
from typing import Optional
from worker_pool_base import BaseWorkerPool, build_jar_classpath
logger = logging.getLogger(__name__)
# Java source of the s9api worker; it is compiled at runtime.
#
# Wire protocol, as implemented by the Java code below:
#   stdin : one job per line, tab-separated:
#           <source xml> TAB <stylesheet> TAB <output file> [TAB k=v|||k=v...]
#           The literal line "EXIT" ends the worker loop.
#   stdout: exactly one line per job, either "OK" or "ERROR: <message>".
#   stderr: diagnostics only ("DEBUG: ..." lines and stack traces).
#
# Because the Python side reads a single stdout line per job, error messages
# are passed through singleLine() so that an exception message containing
# line breaks cannot spill into the response of the next job.
#
# NOTE: this is a non-raw Python string, so every backslash that must reach
# the Java compiler is doubled here ("\\\\t" becomes the Java literal "\\t").
SAXON_S9API_WORKER_JAVA = """
import net.sf.saxon.s9api.*;
import javax.xml.transform.stream.StreamSource;
import java.io.*;
import java.util.HashMap;
import java.util.Map;
public class SaxonS9ApiWorker {
// Collapse line breaks so that every response occupies exactly one stdout line
private static String singleLine(String msg) {
return msg.replace('\\r', ' ').replace('\\n', ' ');
}
public static void main(String[] args) {
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
String line;
// Create Processor once and reuse (equivalent to TransformerFactory)
Processor processor = new Processor(false);
// Cache für kompilierte Stylesheets (Performance-Optimierung)
Map<String, XsltExecutable> stylesheetCache = new HashMap<>();
System.err.println("SaxonS9ApiWorker started and ready (using s9api for XSLT 2.0/3.0 with stylesheet caching)");
System.err.flush();
try {
while ((line = reader.readLine()) != null) {
System.err.println("DEBUG: Received line: " + line.substring(0, Math.min(100, line.length())));
System.err.flush();
if ("EXIT".equals(line.trim())) {
System.err.println("SaxonS9ApiWorker exiting");
break;
}
try {
// Parse job
System.err.println("DEBUG: Parsing job...");
System.err.flush();
String[] parts = line.split("\\\\t");
System.err.println("DEBUG: Parts count: " + parts.length);
System.err.flush();
if (parts.length < 3) {
System.out.println("ERROR: Invalid job format");
System.out.flush();
continue;
}
String sourceXml = parts[0];
String xslStylesheet = parts[1];
String outputFo = parts[2];
// Prüfe ob Stylesheet bereits im Cache ist
XsltExecutable executable;
if (stylesheetCache.containsKey(xslStylesheet)) {
executable = stylesheetCache.get(xslStylesheet);
System.err.println("DEBUG: Using cached stylesheet: " + xslStylesheet);
System.err.flush();
} else {
System.err.println("DEBUG: Compiling and caching stylesheet: " + xslStylesheet);
System.err.flush();
XsltCompiler compiler = processor.newXsltCompiler();
executable = compiler.compile(new StreamSource(new File(xslStylesheet)));
stylesheetCache.put(xslStylesheet, executable);
System.err.println("DEBUG: Stylesheet compiled and cached (cache size: " + stylesheetCache.size() + ")");
System.err.flush();
}
System.err.println("DEBUG: Creating transformer...");
System.err.flush();
// Create transformer
XsltTransformer transformer = executable.load();
// Set source
transformer.setSource(new StreamSource(new File(sourceXml)));
// Set destination
Serializer serializer = processor.newSerializer(new File(outputFo));
transformer.setDestination(serializer);
// Set parameters if present
if (parts.length > 3 && !parts[3].isEmpty()) {
String[] params = parts[3].split("\\\\|\\\\|\\\\|");
for (String param : params) {
if (!param.isEmpty() && param.contains("=")) {
String[] kv = param.split("=", 2);
transformer.setParameter(new QName(kv[0]), new XdmAtomicValue(kv[1]));
System.err.println("DEBUG: Set parameter: " + kv[0] + " = " + kv[1]);
}
}
System.err.flush();
}
System.err.println("DEBUG: Running transformation...");
System.err.flush();
// Transform
transformer.transform();
System.err.println("DEBUG: Transformation completed");
System.err.flush();
System.out.println("OK");
System.out.flush();
} catch (SaxonApiException e) {
System.err.println("DEBUG: SaxonApiException: " + e.getClass().getName());
System.err.flush();
e.printStackTrace(System.err);
String errorMsg = e.getMessage();
if (errorMsg == null || errorMsg.isEmpty()) {
errorMsg = e.getClass().getSimpleName();
}
System.out.println("ERROR: " + singleLine(errorMsg));
System.out.flush();
} catch (Exception e) {
System.err.println("DEBUG: Job processing exception: " + e.getClass().getName());
System.err.flush();
e.printStackTrace(System.err);
System.out.println("ERROR: " + singleLine(e.getMessage() != null ? e.getMessage() : e.getClass().getName()));
System.out.flush();
}
}
} catch (IOException e) {
System.err.println("SaxonS9ApiWorker I/O error: " + e.getMessage());
e.printStackTrace(System.err);
}
}
}
"""
class SaxonWorkerPoolS9Api(BaseWorkerPool):
    """Pool of long-running JVM processes that run Saxon transformations via s9api.

    This variant drives the Saxon s9api API instead of JAXP and targets
    XSLT 2.0 and 3.0 stylesheets. Jobs are sent to a worker as one
    tab-separated line on its stdin and answered with one line on its stdout
    (see ``transform``).
    """

    def __init__(
        self,
        num_workers: int,
        java_vm_path: Path,
        saxon_jar_path: Path,
        classpath_cache: dict[Path, str],
        log_dir: Optional[Path] = None,
    ):
        """Compile the Java worker class and start the worker processes.

        Args:
            num_workers: Number of workers, forwarded to the base class.
            java_vm_path: Path of the Java executable used to launch workers.
            saxon_jar_path: Path of the Saxon JAR; its parent directory is the
                one passed to ``build_jar_classpath``.
            classpath_cache: Mapping from JAR directory to classpath string.
                It is read and extended in place by ``_get_classpath``, so a
                caller can share one dict between several pools.
            log_dir: Optional log directory, forwarded to the base class.
        """
        super().__init__(num_workers, java_vm_path, log_dir)
        self.saxon_jar_path = saxon_jar_path
        self.classpath_cache = classpath_cache
        # Both steps are inherited from BaseWorkerPool and run only after the
        # attributes above are set.
        self._compile_worker_class()
        self._start_workers()
        logger.info(f"SaxonWorkerPoolS9Api initialisiert mit {num_workers} Workern (XSLT 2.0/3.0)")

    # --- Properties required by the base class ---

    @property
    def _pool_name(self) -> str:
        """Name identifying this pool."""
        return "Saxon-S9Api"

    @property
    def _java_source_code(self) -> str:
        """Java source of the worker class."""
        return SAXON_S9API_WORKER_JAVA

    @property
    def _java_class_name(self) -> str:
        """Name of the Java class defined by ``_java_source_code``."""
        return "SaxonS9ApiWorker"

    @property
    def _temp_dir_prefix(self) -> str:
        """Prefix for the pool's temporary directory."""
        return "saxon_s9api_worker_"

    @property
    def _worker_init_sleep(self) -> float:
        """Start-up wait used by the base class (presumably seconds; confirm there)."""
        return 0.1

    # --- Methods required by the base class ---

    def _get_classpath(self) -> str:
        """Return the classpath for the Saxon JAR directory, building it at most once."""
        saxon_dir = self.saxon_jar_path.parent
        if saxon_dir not in self.classpath_cache:
            self.classpath_cache[saxon_dir] = build_jar_classpath(saxon_dir)
            logger.debug(f"Classpath für {saxon_dir} neu erstellt und gecacht")
        return self.classpath_cache[saxon_dir]

    def _build_worker_cmd(self, full_classpath: str) -> list[str]:
        """Return the argv list that launches one worker JVM."""
        return [str(self.java_vm_path), "-cp", full_classpath, "SaxonS9ApiWorker"]

    def _stderr_log_name(self, i: int) -> str:
        """Return the stderr log file name for worker ``i``."""
        return f"s9api_worker_{i}_stderr.log"

    # --- Saxon s9api specific job handling ---

    @staticmethod
    def _encode_job(source_xml, xsl_stylesheet, output_fo, xslt_params) -> str:
        """Serialise one job into the single-line wire format.

        The Java worker splits the line on tabs, splits the fourth field on
        ``|||`` and splits each parameter on its first ``=``. Any of those
        delimiters, or a line break, inside a value would be misparsed or
        would split the job into several lines, so such jobs are rejected.

        Raises:
            ValueError: If a field cannot be represented in the wire format.
        """
        fields = [
            ("source_xml", str(source_xml)),
            ("xsl_stylesheet", str(xsl_stylesheet)),
            ("output_fo", str(output_fo)),
        ]
        params = []
        # None is tolerated and treated as "no parameters".
        for key, value in (xslt_params or {}).items():
            key_text = f"{key}"
            if "=" in key_text:
                raise ValueError(f"Parametername darf kein '=' enthalten: {key_text!r}")
            params.append(f"{key_text}={value}")
        params_str = "|||".join(params)
        # Round-trip check: also catches a value ending in '|' that would merge
        # with the following separator.
        if params and params_str.split("|||") != params:
            raise ValueError("XSLT-Parameter enthalten die reservierte Trennfolge '|||'")
        fields.append(("xslt_params", params_str))
        for label, text in fields:
            if any(ch in text for ch in "\t\r\n"):
                raise ValueError(f"{label} enthält Tabulator oder Zeilenumbruch: {text!r}")
        return "\t".join(text for _, text in fields) + "\n"

    def transform(
        self, source_xml: Path, xsl_stylesheet: Path, output_fo: Path, xslt_params: dict[str, str]
    ) -> tuple[bool, str]:
        """Run one XSLT transformation on a worker from the pool.

        Args:
            source_xml: Path of the XML input file.
            xsl_stylesheet: Path of the XSL stylesheet file.
            output_fo: Path of the FO output file.
            xslt_params: XSLT parameters, sent to the worker as ``key=value``.

        Returns:
            tuple[bool, str]: ``(success, message)``. Failures are reported
            through the tuple; exceptions raised while talking to the worker
            are caught and converted into a failure message.
        """
        # Validate before taking a worker: a job that breaks the line protocol
        # would desynchronise request and response for all later jobs.
        try:
            job = self._encode_job(source_xml, xsl_stylesheet, output_fo, xslt_params)
        except ValueError as exc:
            error_msg = f"Ungültiger Job für S9Api Worker: {exc}"
            logger.error(error_msg)
            return False, error_msg

        worker_idx = self._acquire_worker()
        try:
            worker = self.workers[worker_idx]
            # A worker that has already exited cannot take the job.
            if worker.poll() is not None:
                stderr_content = self._read_stderr_log(worker_idx)
                error_msg = (
                    f"S9Api Worker {worker_idx} ist beendet (Exit: {worker.returncode})\nstderr:\n{stderr_content}"
                )
                logger.error(error_msg)
                return False, error_msg

            logger.debug(f"Sende Job an S9Api Worker {worker_idx}: {source_xml.name}")
            worker.stdin.write(job)
            worker.stdin.flush()

            # The worker answers every job with exactly one line.
            response = worker.stdout.readline().strip()
            logger.debug(f"S9Api Worker {worker_idx} Antwort: '{response}'")

            if response == "OK":
                return True, "Erfolgreich"
            if response.startswith("ERROR:"):
                return False, f"Saxon-Fehler (s9api): {response[6:].strip()}"
            if not response:
                # Empty read: the worker closed stdout without answering.
                stderr_content = self._read_stderr_log(worker_idx, tail=500)
                return False, f"S9Api Worker {worker_idx} crashed (keine Antwort)\nstderr:\n{stderr_content}"
            return False, f"Unerwartete Antwort: {response}"
        except Exception as e:
            logger.error(f"Fehler bei S9Api Worker {worker_idx}: {e}")
            return False, f"Worker-Fehler: {str(e)}"
        finally:
            # Always release the worker's lock (presumably taken by
            # _acquire_worker; confirm in BaseWorkerPool).
            self.worker_locks[worker_idx].release()