Files

284 lines
11 KiB
Python
Raw Permalink Normal View History

"""
Saxon Worker Pool - Persistente JVM-Prozesse für schnelle XSLT-Transformationen.
Eliminiert JVM-Startup-Overhead durch Vorinitialisierung von N Worker-Prozessen.
Jeder Worker läuft als Daemon und verarbeitet mehrere Transformationen nacheinander.
"""
import logging
from pathlib import Path
from typing import Optional
from worker_pool_base import BaseWorkerPool, build_jar_classpath
logger = logging.getLogger(__name__)
# Java source of the worker process (compiled at runtime).
#
# Protocol, as implemented by the code below:
#   stdin  - one job per line, tab-separated:
#            <source xml> TAB <stylesheet> TAB <output file> [TAB <params>]
#            where <params> is a list of "key=value" pairs joined by "|||".
#            The line "EXIT" ends the worker.
#   stdout - exactly one line per job: "OK" or "ERROR: <message>".
#   stderr - diagnostics only ("DEBUG: ..." lines and stack traces).
#
# This is a regular (non-raw) Python string, so every backslash that must
# reach the Java compiler is doubled here.
SAXON_WORKER_JAVA = """
import javax.xml.transform.*;
import javax.xml.transform.stream.*;
import java.io.*;
import java.util.*;

public class SaxonWorker {

    // The parent process reads exactly one stdout line per job. Messages
    // coming from the transformer can span several lines, so line breaks
    // are flattened before the response is written; otherwise the extra
    // lines would later be read as the answers to following jobs.
    private static void respond(String message) {
        System.out.println(message.replace('\\r', ' ').replace('\\n', ' ').trim());
        System.out.flush();
    }

    public static void main(String[] args) {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        String line;

        // Create TransformerFactory once and reuse
        TransformerFactory factory = TransformerFactory.newInstance();

        // Cache of compiled stylesheets, keyed by stylesheet path (performance optimisation)
        Map<String, Templates> templatesCache = new HashMap<>();

        System.err.println("SaxonWorker started and ready (using JAXP Transformer API with stylesheet caching)");
        System.err.flush();

        try {
            while ((line = reader.readLine()) != null) {
                System.err.println("DEBUG: Received line: " + line.substring(0, Math.min(100, line.length())));
                System.err.flush();

                if ("EXIT".equals(line.trim())) {
                    System.err.println("SaxonWorker exiting");
                    break;
                }

                try {
                    // Parse job
                    System.err.println("DEBUG: Parsing job...");
                    System.err.flush();
                    String[] parts = line.split("\\t");
                    System.err.println("DEBUG: Parts count: " + parts.length);
                    System.err.flush();

                    if (parts.length < 3) {
                        respond("ERROR: Invalid job format");
                        continue;
                    }

                    String sourceXml = parts[0];
                    String xslStylesheet = parts[1];
                    String outputFo = parts[2];

                    // Check whether the stylesheet is already in the cache
                    Templates templates;
                    if (templatesCache.containsKey(xslStylesheet)) {
                        templates = templatesCache.get(xslStylesheet);
                        System.err.println("DEBUG: Using cached stylesheet: " + xslStylesheet);
                        System.err.flush();
                    } else {
                        System.err.println("DEBUG: Compiling and caching stylesheet: " + xslStylesheet);
                        System.err.flush();
                        StreamSource xslSource = new StreamSource(new File(xslStylesheet));
                        templates = factory.newTemplates(xslSource);
                        templatesCache.put(xslStylesheet, templates);
                        System.err.println("DEBUG: Stylesheet compiled and cached (cache size: " + templatesCache.size() + ")");
                        System.err.flush();
                    }

                    System.err.println("DEBUG: Creating transformer from cached template...");
                    System.err.flush();

                    // Create Source and Result objects
                    StreamSource xmlSource = new StreamSource(new File(sourceXml));
                    StreamResult result = new StreamResult(new File(outputFo));

                    // Create transformer from templates
                    Transformer transformer = templates.newTransformer();

                    // Set parameters if present
                    if (parts.length > 3 && !parts[3].isEmpty()) {
                        String[] params = parts[3].split("\\\\|\\\\|\\\\|");
                        for (String param : params) {
                            if (!param.isEmpty() && param.contains("=")) {
                                String[] kv = param.split("=", 2);
                                transformer.setParameter(kv[0], kv[1]);
                                System.err.println("DEBUG: Set parameter: " + kv[0] + " = " + kv[1]);
                            }
                        }
                        System.err.flush();
                    }

                    System.err.println("DEBUG: Running transformation...");
                    System.err.flush();

                    // Capture errors via ErrorListener.
                    // NOTE(review): warnings land in the same buffer, so a
                    // transformation that only produced warnings is still
                    // answered with ERROR - confirm that this is intended.
                    final StringBuilder errors = new StringBuilder();
                    transformer.setErrorListener(new ErrorListener() {
                        @Override
                        public void warning(TransformerException e) {
                            errors.append("WARNING: ").append(e.getMessage()).append("\\n");
                        }

                        @Override
                        public void error(TransformerException e) {
                            errors.append("ERROR: ").append(e.getMessage()).append("\\n");
                        }

                        @Override
                        public void fatalError(TransformerException e) throws TransformerException {
                            errors.append("FATAL: ").append(e.getMessage()).append("\\n");
                            throw e;
                        }
                    });

                    // Run transformation
                    transformer.transform(xmlSource, result);
                    System.err.println("DEBUG: Transformation completed");
                    System.err.flush();

                    // Check for errors
                    if (errors.length() > 0) {
                        respond("ERROR: " + errors.toString().trim());
                    } else {
                        respond("OK");
                    }
                } catch (TransformerException e) {
                    System.err.println("DEBUG: Transformer exception: " + e.getClass().getName());
                    System.err.flush();
                    e.printStackTrace(System.err);
                    String errorMsg = e.getMessage();
                    if (errorMsg == null || errorMsg.isEmpty()) {
                        errorMsg = e.getClass().getSimpleName();
                    }
                    respond("ERROR: " + errorMsg);
                } catch (Exception e) {
                    System.err.println("DEBUG: Job processing exception: " + e.getClass().getName());
                    System.err.flush();
                    e.printStackTrace(System.err);
                    respond("ERROR: " + (e.getMessage() != null ? e.getMessage() : e.getClass().getName()));
                }
            }
        } catch (IOException e) {
            System.err.println("SaxonWorker I/O error: " + e.getMessage());
            e.printStackTrace(System.err);
        }
    }
}
"""
class SaxonWorkerPool(BaseWorkerPool):
    """Pool of long-running JVM processes for Saxon transformations (JAXP/XSLT 1.0).

    Reuses N worker processes so that the JVM start-up cost is not paid per
    transformation. Each job is written to a worker's stdin as a single
    tab-separated line and answered with a single line on its stdout.
    """

    def __init__(
        self,
        num_workers: int,
        java_vm_path: Path,
        saxon_jar_path: Path,
        classpath_cache: dict[Path, str],
        log_dir: Optional[Path] = None,
    ):
        """Set up the pool, then compile the worker class and start the workers.

        Args:
            num_workers: Number of worker processes (passed to the base class).
            java_vm_path: Path of the Java executable (passed to the base class).
            saxon_jar_path: Path of the Saxon JAR; its parent directory is
                used to build the classpath.
            classpath_cache: Caller-owned mapping from JAR directory to
                classpath string; read and filled by ``_get_classpath``.
            log_dir: Optional log directory (passed to the base class).
        """
        super().__init__(num_workers, java_vm_path, log_dir)
        self.saxon_jar_path = saxon_jar_path
        self.classpath_cache = classpath_cache
        # Both steps are inherited from BaseWorkerPool.
        self._compile_worker_class()
        self._start_workers()
        logger.info("SaxonWorkerPool initialisiert mit %s Workern", num_workers)

    # --- Abstract properties (hooks read by BaseWorkerPool) ---

    @property
    def _pool_name(self) -> str:
        """Short name of this pool."""
        return "Saxon"

    @property
    def _java_source_code(self) -> str:
        """Java source of the worker class."""
        return SAXON_WORKER_JAVA

    @property
    def _java_class_name(self) -> str:
        """Name of the Java class declared in ``_java_source_code``."""
        return "SaxonWorker"

    @property
    def _temp_dir_prefix(self) -> str:
        """Prefix for the temporary directory used by the base class."""
        return "saxon_worker_"

    @property
    def _worker_init_sleep(self) -> float:
        """Wait time after starting a worker (presumably seconds - confirm in base class)."""
        return 0.1

    # --- Abstract methods ---

    def _get_classpath(self) -> str:
        """Return the classpath for the Saxon JAR directory, cached per directory."""
        saxon_dir = self.saxon_jar_path.parent
        if saxon_dir not in self.classpath_cache:
            self.classpath_cache[saxon_dir] = build_jar_classpath(saxon_dir)
        return self.classpath_cache[saxon_dir]

    def _build_worker_cmd(self, full_classpath: str) -> list[str]:
        """Return the command line that launches one worker JVM."""
        return [str(self.java_vm_path), "-cp", full_classpath, "SaxonWorker"]

    def _stderr_log_name(self, i: int) -> str:
        """Return the stderr log file name for worker ``i``."""
        return f"worker_{i}_stderr.log"

    # --- Saxon-specific job handling ---

    @staticmethod
    def _build_job_line(source_xml, xsl_stylesheet, output_fo, xslt_params) -> str:
        """Encode one job as the line the Java worker parses.

        The format is ``source TAB stylesheet TAB output TAB params NEWLINE``
        with ``params`` being ``key=value`` pairs joined by ``|||``.

        Raises:
            ValueError: If a value contains a character sequence that the
                line format uses as a delimiter and therefore cannot be
                transported unambiguously.
        """
        pairs = []
        for key, value in (xslt_params or {}).items():
            # The worker splits each pair at the first "=".
            if "=" in str(key):
                raise ValueError(f"Parametername enthält '=': {key!r}")
            pairs.append(f"{key}={value}")
        params_str = "|||".join(pairs)
        # Round-trip check: splitting must give back exactly the pairs that
        # were joined. This also catches a "|" next to a separator.
        if pairs and params_str.split("|||") != pairs:
            raise ValueError("XSLT-Parameter kollidieren mit dem Trennzeichen '|||'")

        fields = [str(source_xml), str(xsl_stylesheet), str(output_fo), params_str]
        for field in fields:
            # Tab separates fields; CR/LF would end the job line early and
            # shift all following responses.
            if any(ch in field for ch in "\t\r\n"):
                raise ValueError(f"Tabulator oder Zeilenumbruch im Job-Feld: {field!r}")
        return "\t".join(fields) + "\n"

    def transform(
        self, source_xml: Path, xsl_stylesheet: Path, output_fo: Path, xslt_params: dict[str, str]
    ) -> tuple[bool, str]:
        """Run one XSLT transformation on a worker from the pool.

        Args:
            source_xml: Path of the XML input file.
            xsl_stylesheet: Path of the XSL stylesheet.
            output_fo: Path of the FO output file.
            xslt_params: XSLT parameters as name/value pairs; ``None`` is
                treated like an empty dict.

        Returns:
            ``(True, "Erfolgreich")`` on success, otherwise ``(False, message)``.
            Errors are reported through the return value; exceptions raised
            while talking to the worker are caught and logged.
        """
        # Validate before taking a worker so that a malformed job never
        # occupies one and never reaches the line-based protocol.
        try:
            job = self._build_job_line(source_xml, xsl_stylesheet, output_fo, xslt_params)
        except ValueError as exc:
            logger.error("Ungültiger Saxon-Job: %s", exc)
            return False, f"Ungültiger Job: {exc}"

        worker_idx = self._acquire_worker()
        try:
            worker = self.workers[worker_idx]
            if worker.poll() is not None:
                # The process has already exited; report its stderr log.
                stderr_content = self._read_stderr_log(worker_idx)
                error_msg = f"Worker {worker_idx} ist beendet (Exit: {worker.returncode})\nstderr:\n{stderr_content}"
                logger.error(error_msg)
                return False, error_msg

            logger.debug("Sende Job an Worker %s: %s", worker_idx, Path(source_xml).name)
            worker.stdin.write(job)
            worker.stdin.flush()

            # The worker answers every job with exactly one line.
            response = worker.stdout.readline().strip()
            logger.debug("Worker %s Antwort: '%s'", worker_idx, response)

            if response == "OK":
                return True, "Erfolgreich"
            if response.startswith("ERROR:"):
                return False, f"Saxon-Fehler: {response.removeprefix('ERROR:').strip()}"
            if not response:
                # Empty read: treated as a crashed worker.
                stderr_content = self._read_stderr_log(worker_idx, tail=500)
                return False, f"Worker {worker_idx} crashed (keine Antwort)\nstderr:\n{stderr_content}"
            return False, f"Unerwartete Antwort: {response}"
        except Exception as e:
            # Boundary of the pool API: failures are reported via the tuple.
            logger.error("Fehler bei Worker %s: %s", worker_idx, e)
            return False, f"Worker-Fehler: {str(e)}"
        finally:
            # Hand the worker back (lock presumably taken by _acquire_worker()).
            self.worker_locks[worker_idx].release()