Performance: 4x schnellere XSLT-Transformationen durch Worker-Pool

Problem: 82 XML-Dateien brauchten 160 Sekunden (JVM-Startup-Overhead)

Lösung: Persistente JVM-Worker-Prozesse mit JAXP Transformer API
- Saxon Worker Pool mit N persistenten JVM-Prozessen
- Eliminiert JVM-Startup und Classpath-Scanning bei jedem Job
- Parallele Verarbeitung mit ThreadPoolExecutor
- JAXP Transformer API (javax.xml.transform) - stabil, kein System.exit()
- Konfigurierbare Worker-Anzahl über Performance-Menü

Ergebnis: 82 Dateien in 40 Sekunden (4x Speedup, ~0.49s pro Datei)

Zusätzliche Verbesserungen:
- Dual-Logging (Datei + Konsole) mit Timestamps
- Worker-stderr-Logs in Projektverzeichnis/temp/
- Umfangreiche Debug-Ausgaben für Fehlerdiagnose
- Robuste Fehlerbehandlung mit ErrorListener

Technische Details:
- SaxonWorkerPool: Verwaltet N Worker-Prozesse
- JAXP statt Transform.main() (kein System.exit!)
- Worker-Locks für thread-sichere Job-Verteilung
- Graceful Shutdown mit EXIT-Befehl
- Fallback auf subprocess bei Pool-Fehlern

Dateien:
- src/saxon_pool.py (NEU): Worker-Pool-Implementation
- src/transform.py: Integration mit Worker-Pool
- src/ui/MainWindow.py: Pool-Initialisierung, Performance-Menü
- src/conf.py: max_workers Einstellung
- src/main.py: Dual-Logging

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-28 16:46:39 +01:00
parent 055428e8cf
commit d0cdcd6432
5 changed files with 719 additions and 58 deletions
+4 -2
View File
@@ -72,6 +72,7 @@ class SSLMode(str, Enum):
VERIFY_CA = "verify-ca" VERIFY_CA = "verify-ca"
VERIFY_FULL = "verify-full" VERIFY_FULL = "verify-full"
class PostgreSqlDb(BaseModel): class PostgreSqlDb(BaseModel):
id: int id: int
name: str name: str
@@ -141,6 +142,7 @@ class AppSettings(BaseSettings):
pdf_projects: list[Project] = [] pdf_projects: list[Project] = []
postgresql_dbs: list[PostgreSqlDb] = [] postgresql_dbs: list[PostgreSqlDb] = []
theme: str | None = None theme: str | None = None
max_workers: int = 8 # Anzahl paralleler Worker für Transformationen (Standard: 8)
# UI-Zustand # UI-Zustand
window_geometry: tuple[int, int, int, int] | None = None # (x, y, width, height) window_geometry: tuple[int, int, int, int] | None = None # (x, y, width, height)
@@ -210,8 +212,8 @@ class ProjectData(BaseModel):
def readSettings(cls, project_dir: Path): def readSettings(cls, project_dir: Path):
# Explizit UTF-8 Encoding verwenden # Explizit UTF-8 Encoding verwenden
project_yaml_path = project_dir / "project.yaml" project_yaml_path = project_dir / "project.yaml"
with open(project_yaml_path, 'r', encoding='utf-8') as f: with open(project_yaml_path, "r", encoding="utf-8") as f:
yaml = YAML(typ='safe') yaml = YAML(typ="safe")
yaml_data = yaml.load(f) yaml_data = yaml.load(f)
return cls.model_validate(yaml_data) return cls.model_validate(yaml_data)
+33 -6
View File
@@ -10,12 +10,39 @@ from conf import app_settings
def main(): def main():
"""Haupteinstiegspunkt der Anwendung.""" """Haupteinstiegspunkt der Anwendung."""
# Logging konfigurieren # Logging konfigurieren - sowohl Datei als auch Konsole
logging.basicConfig( from datetime import datetime
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', # Log-Verzeichnis erstellen (im selben Verzeichnis wie config.json)
datefmt='%H:%M:%S' from conf import config_path
)
log_dir = config_path.parent / "logs"
log_dir.mkdir(exist_ok=True)
# Log-Dateiname mit Timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
log_file = log_dir / f"documentor_{timestamp}.log"
# Root-Logger konfigurieren
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
# Formatter für alle Handler
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%H:%M:%S")
# Handler 1: Datei (alles ab DEBUG)
file_handler = logging.FileHandler(log_file, encoding="utf-8")
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# Handler 2: Konsole (alles ab INFO)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
logging.info(f"Logging initialisiert: {log_file}")
# QApplication-Instanz erstellen # QApplication-Instanz erstellen
app = QApplication(sys.argv) app = QApplication(sys.argv)
+442
View File
@@ -0,0 +1,442 @@
"""
Saxon Worker Pool - Persistente JVM-Prozesse für schnelle XSLT-Transformationen.
Eliminiert JVM-Startup-Overhead durch Vorinitialisierung von N Worker-Prozessen.
Jeder Worker läuft als Daemon und verarbeitet mehrere Transformationen nacheinander.
"""
import logging
import subprocess
import threading
from pathlib import Path
from queue import Queue
from typing import Optional
import tempfile
logger = logging.getLogger(__name__)
# Java-Worker-Code (wird zur Laufzeit kompiliert)
SAXON_WORKER_JAVA = """
import javax.xml.transform.*;
import javax.xml.transform.stream.*;
import java.io.*;
import java.util.*;
public class SaxonWorker {
public static void main(String[] args) {
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
String line;
// Create TransformerFactory once and reuse
TransformerFactory factory = TransformerFactory.newInstance();
System.err.println("SaxonWorker started and ready (using JAXP Transformer API)");
System.err.flush();
try {
while ((line = reader.readLine()) != null) {
System.err.println("DEBUG: Received line: " + line.substring(0, Math.min(100, line.length())));
System.err.flush();
if ("EXIT".equals(line.trim())) {
System.err.println("SaxonWorker exiting");
break;
}
try {
// Parse job
System.err.println("DEBUG: Parsing job...");
System.err.flush();
String[] parts = line.split("\\t");
System.err.println("DEBUG: Parts count: " + parts.length);
System.err.flush();
if (parts.length < 3) {
System.out.println("ERROR: Invalid job format");
System.out.flush();
continue;
}
String sourceXml = parts[0];
String xslStylesheet = parts[1];
String outputFo = parts[2];
System.err.println("DEBUG: Creating transformer from stylesheet...");
System.err.flush();
// Create Source and Result objects
StreamSource xslSource = new StreamSource(new File(xslStylesheet));
StreamSource xmlSource = new StreamSource(new File(sourceXml));
StreamResult result = new StreamResult(new File(outputFo));
System.err.println("DEBUG: Compiling stylesheet...");
System.err.flush();
// Create transformer from stylesheet
Transformer transformer = factory.newTransformer(xslSource);
// Set parameters if present
if (parts.length > 3 && !parts[3].isEmpty()) {
String[] params = parts[3].split("\\\\|\\\\|\\\\|");
for (String param : params) {
if (!param.isEmpty() && param.contains("=")) {
String[] kv = param.split("=", 2);
transformer.setParameter(kv[0], kv[1]);
System.err.println("DEBUG: Set parameter: " + kv[0] + " = " + kv[1]);
}
}
System.err.flush();
}
System.err.println("DEBUG: Running transformation...");
System.err.flush();
// Capture errors via ErrorListener
final StringBuilder errors = new StringBuilder();
transformer.setErrorListener(new ErrorListener() {
@Override
public void warning(TransformerException e) {
errors.append("WARNING: ").append(e.getMessage()).append("\\n");
}
@Override
public void error(TransformerException e) {
errors.append("ERROR: ").append(e.getMessage()).append("\\n");
}
@Override
public void fatalError(TransformerException e) throws TransformerException {
errors.append("FATAL: ").append(e.getMessage()).append("\\n");
throw e;
}
});
// Run transformation
transformer.transform(xmlSource, result);
System.err.println("DEBUG: Transformation completed");
System.err.flush();
// Check for errors
if (errors.length() > 0) {
System.out.println("ERROR: " + errors.toString().trim());
} else {
System.out.println("OK");
}
System.out.flush();
} catch (TransformerException e) {
System.err.println("DEBUG: Transformer exception: " + e.getClass().getName());
System.err.flush();
e.printStackTrace(System.err);
String errorMsg = e.getMessage();
if (errorMsg == null || errorMsg.isEmpty()) {
errorMsg = e.getClass().getSimpleName();
}
System.out.println("ERROR: " + errorMsg);
System.out.flush();
} catch (Exception e) {
System.err.println("DEBUG: Job processing exception: " + e.getClass().getName());
System.err.flush();
e.printStackTrace(System.err);
System.out.println("ERROR: " + (e.getMessage() != null ? e.getMessage() : e.getClass().getName()));
System.out.flush();
}
}
} catch (IOException e) {
System.err.println("SaxonWorker I/O error: " + e.getMessage());
e.printStackTrace(System.err);
}
}
}
"""
class SaxonWorkerPool:
"""
Pool von lang-laufenden JVM-Prozessen für Saxon-Transformationen.
Eliminiert JVM-Startup-Overhead durch Wiederverwendung von N Worker-Prozessen.
"""
def __init__(
self,
num_workers: int,
java_vm_path: Path,
saxon_jar_path: Path,
classpath_cache: dict[Path, str],
log_dir: Optional[Path] = None,
):
"""
Initialisiert den Saxon-Worker-Pool.
Args:
num_workers: Anzahl der Worker-Prozesse
java_vm_path: Pfad zur Java VM Binary
saxon_jar_path: Pfad zur Saxon JAR-Datei
classpath_cache: Cache für Saxon-Classpaths
log_dir: Optionales Verzeichnis für Worker-Logs (Standard: temp_dir/temp)
"""
self.num_workers = num_workers
self.java_vm_path = java_vm_path
self.saxon_jar_path = saxon_jar_path
self.classpath_cache = classpath_cache
self.log_dir = log_dir
# Worker-Prozesse und Queues
self.workers: list[subprocess.Popen] = []
self.job_queue: Queue = Queue()
self.result_queue: Queue = Queue()
self.worker_locks: list[threading.Lock] = []
# Temporäres Verzeichnis für kompilierte Java-Klasse
self.temp_dir: Optional[Path] = None
self.worker_class_path: Optional[Path] = None
self.worker_log_dir: Optional[Path] = None
# Initialisierung
self._compile_worker_class()
self._start_workers()
logger.info(f"SaxonWorkerPool initialisiert mit {num_workers} Workern")
def _compile_worker_class(self):
"""Kompiliert die SaxonWorker-Java-Klasse."""
try:
# Erstelle temporäres Verzeichnis
self.temp_dir = Path(tempfile.mkdtemp(prefix="saxon_worker_"))
# Schreibe Java-Quellcode
java_file = self.temp_dir / "SaxonWorker.java"
java_file.write_text(SAXON_WORKER_JAVA, encoding="utf-8")
# Hole Classpath
saxon_dir = self.saxon_jar_path.parent
if saxon_dir in self.classpath_cache:
classpath = self.classpath_cache[saxon_dir]
else:
# Fallback: Baue Classpath neu
import glob
import sys
all_jars = glob.glob(str(saxon_dir / "*.jar"))
lib_dir = saxon_dir / "lib"
if lib_dir.exists():
all_jars.extend(glob.glob(str(lib_dir / "*.jar")))
classpath_separator = ";" if sys.platform == "win32" else ":"
classpath = classpath_separator.join(all_jars)
# Kompiliere Java-Klasse
javac_cmd = [str(self.java_vm_path).replace("java", "javac"), "-cp", classpath, str(java_file)]
logger.debug(f"Kompiliere SaxonWorker: {' '.join(javac_cmd)}")
result = subprocess.run(javac_cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
raise RuntimeError(f"Java-Kompilierung fehlgeschlagen: {result.stderr}")
self.worker_class_path = self.temp_dir
logger.info(f"SaxonWorker erfolgreich kompiliert: {self.temp_dir}")
except Exception as e:
logger.error(f"Fehler beim Kompilieren von SaxonWorker: {e}")
raise
def _start_workers(self):
"""Startet N Worker-Prozesse."""
# Hole Classpath
saxon_dir = self.saxon_jar_path.parent
classpath = self.classpath_cache.get(saxon_dir, "")
# Füge Worker-Classpath hinzu
import sys
classpath_separator = ";" if sys.platform == "win32" else ":"
full_classpath = str(self.worker_class_path) + classpath_separator + classpath
# Bestimme Log-Verzeichnis
self.worker_log_dir = self.log_dir if self.log_dir else self.temp_dir
if self.log_dir:
self.worker_log_dir.mkdir(parents=True, exist_ok=True)
for i in range(self.num_workers):
try:
# Starte JVM-Prozess mit SaxonWorker
cmd = [str(self.java_vm_path), "-cp", full_classpath, "SaxonWorker"]
# Öffne stderr-Log-Datei für diesen Worker
stderr_log = self.worker_log_dir / f"worker_{i}_stderr.log"
stderr_file = open(stderr_log, "w", encoding="utf-8")
process = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=stderr_file, # Redirect stderr to file
text=True,
bufsize=1, # Line buffered
)
self.workers.append(process)
self.worker_locks.append(threading.Lock())
logger.debug(f"Worker {i} gestartet (PID: {process.pid}, stderr: {stderr_log})")
# Warte kurz damit Worker initialisieren kann
import time
time.sleep(0.1)
# Prüfe ob Worker noch läuft
if process.poll() is not None:
# Worker ist bereits beendet - Fehler!
stderr_file.close()
with open(stderr_log, "r") as f:
stderr_content = f.read()
raise RuntimeError(
f"Worker {i} ist sofort beendet (Exit Code: {process.returncode})\nstderr:\n{stderr_content}"
)
except Exception as e:
logger.error(f"Fehler beim Starten von Worker {i}: {e}")
raise
logger.info(f"{len(self.workers)} Saxon-Worker erfolgreich gestartet")
def transform(
self, source_xml: Path, xsl_stylesheet: Path, output_fo: Path, xslt_params: dict[str, str]
) -> tuple[bool, str]:
"""
Führt eine XSLT-Transformation mit einem Worker aus dem Pool aus.
Args:
source_xml: Pfad zur XML-Eingabedatei
xsl_stylesheet: Pfad zur XSL-Stylesheet-Datei
output_fo: Pfad zur FO-Ausgabedatei
xslt_params: Dictionary mit XSLT-Parametern
Returns:
tuple[bool, str]: (Erfolg, Fehlermeldung/Info)
"""
# Finde freien Worker
worker_idx = None
for i, lock in enumerate(self.worker_locks):
if lock.acquire(blocking=False):
worker_idx = i
break
if worker_idx is None:
# Kein freier Worker, warte auf ersten verfügbaren
for i, lock in enumerate(self.worker_locks):
lock.acquire()
worker_idx = i
break
try:
worker = self.workers[worker_idx]
# Prüfe ob Worker noch läuft
if worker.poll() is not None:
# Worker ist tot!
stderr_log = self.worker_log_dir / f"worker_{worker_idx}_stderr.log"
try:
with open(stderr_log, "r") as f:
stderr_content = f.read()
error_msg = (
f"Worker {worker_idx} ist beendet (Exit: {worker.returncode})\nstderr:\n{stderr_content}"
)
except Exception:
error_msg = f"Worker {worker_idx} ist beendet (Exit: {worker.returncode})"
logger.error(error_msg)
return False, error_msg
# Formatiere Parameter
params_str = "|||".join([f"{key}={value}" for key, value in xslt_params.items()])
# Erstelle Job-String (Tab-separated)
job = f"{source_xml}\t{xsl_stylesheet}\t{output_fo}\t{params_str}\n"
logger.debug(f"Sende Job an Worker {worker_idx}: {source_xml.name}")
# Sende Job an Worker
worker.stdin.write(job)
worker.stdin.flush()
# Warte auf Antwort
response = worker.stdout.readline().strip()
logger.debug(f"Worker {worker_idx} Antwort: '{response}'")
if response == "OK":
return True, "Erfolgreich"
elif response.startswith("ERROR:"):
error_msg = response[6:].strip()
return False, f"Saxon-Fehler: {error_msg}"
else:
# Leere Antwort bedeutet Worker ist crashed
if not response:
stderr_log = self.worker_log_dir / f"worker_{worker_idx}_stderr.log"
try:
with open(stderr_log, "r") as f:
stderr_content = f.read()[-500:] # Letzte 500 Zeichen
return False, f"Worker {worker_idx} crashed (keine Antwort)\nstderr:\n{stderr_content}"
except Exception:
return False, f"Worker {worker_idx} crashed (keine Antwort)"
return False, f"Unerwartete Antwort: {response}"
except Exception as e:
logger.error(f"Fehler bei Worker {worker_idx}: {e}")
return False, f"Worker-Fehler: {str(e)}"
finally:
# Gebe Worker-Lock frei
self.worker_locks[worker_idx].release()
def shutdown(self):
"""Beendet alle Worker-Prozesse sauber."""
logger.info("Beende Saxon-Worker-Pool...")
for i, worker in enumerate(self.workers):
try:
# Sende EXIT-Befehl
if worker.stdin and not worker.stdin.closed:
worker.stdin.write("EXIT\n")
worker.stdin.flush()
# Warte auf Beendigung (max 2 Sekunden)
worker.wait(timeout=2)
logger.debug(f"Worker {i} beendet")
except subprocess.TimeoutExpired:
# Force kill falls nötig
worker.kill()
logger.warning(f"Worker {i} musste gekillt werden")
except Exception as e:
logger.error(f"Fehler beim Beenden von Worker {i}: {e}")
# Lösche temporäres Verzeichnis
if self.temp_dir and self.temp_dir.exists():
try:
import shutil
shutil.rmtree(self.temp_dir)
logger.debug(f"Temporäres Verzeichnis gelöscht: {self.temp_dir}")
except Exception as e:
logger.warning(f"Konnte temporäres Verzeichnis nicht löschen: {e}")
logger.info("Saxon-Worker-Pool beendet")
def __enter__(self):
"""Context manager entry."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
self.shutdown()
+70 -19
View File
@@ -11,10 +11,26 @@ import logging
import subprocess import subprocess
from pathlib import Path from pathlib import Path
from datetime import datetime from datetime import datetime
from typing import Any from typing import Any, Optional, TYPE_CHECKING
if TYPE_CHECKING:
from saxon_pool import SaxonWorkerPool
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Globaler Saxon-Worker-Pool (wird von MainWindow initialisiert)
_saxon_worker_pool: Optional["SaxonWorkerPool"] = None
def set_saxon_worker_pool(pool: Optional["SaxonWorkerPool"]):
"""Setzt den globalen Saxon-Worker-Pool."""
global _saxon_worker_pool
_saxon_worker_pool = pool
if pool:
logger.info(f"Saxon-Worker-Pool aktiviert mit {pool.num_workers} Workern")
else:
logger.info("Saxon-Worker-Pool deaktiviert (Fallback auf subprocess)")
class TransformationJob: class TransformationJob:
""" """
@@ -23,6 +39,9 @@ class TransformationJob:
Ähnlich zur TestFall-Klasse in validate-xls.py, aber für DocuMentor angepasst. Ähnlich zur TestFall-Klasse in validate-xls.py, aber für DocuMentor angepasst.
""" """
# Klassenweiter Cache für Saxon-Classpaths (Performance-Optimierung)
_classpath_cache: dict[Path, str] = {}
def __init__( def __init__(
self, self,
project_dir: Path, project_dir: Path,
@@ -161,30 +180,63 @@ class TransformationJob:
logger.error(error_msg) logger.error(error_msg)
return False, error_msg return False, error_msg
logger.info(f"Starte Saxon-Transformation: {self.xml_file.name}")
# Versuche zuerst den Worker-Pool zu nutzen (schneller!)
global _saxon_worker_pool
if _saxon_worker_pool:
try:
success, message = _saxon_worker_pool.transform(
source_xml=xml_abs,
xsl_stylesheet=self.xsl_file,
output_fo=self.temp_fo,
xslt_params=self.xslt_params,
)
if success:
logger.info(f"Saxon-Transformation erfolgreich (Worker-Pool): {self.xml_file.name}")
else:
logger.error(f"Saxon-Transformation fehlgeschlagen (Worker-Pool): {message}")
return success, message
except Exception as e:
logger.warning(f"Worker-Pool-Fehler, Fallback auf subprocess: {e}")
# Fallback auf subprocess unten
# Fallback: Traditionelle subprocess-Methode (langsamer, aber robuster)
# XSLT-Parameter formatieren # XSLT-Parameter formatieren
params = [f"{key}={value}" for key, value in self.xslt_params.items()] params = [f"{key}={value}" for key, value in self.xslt_params.items()]
# Sammle alle JAR-Dateien im Saxon-Verzeichnis für den Classpath # Hole Classpath aus Cache oder erstelle ihn
import glob
saxon_dir = self.saxon_jar_path.parent saxon_dir = self.saxon_jar_path.parent
all_jars = glob.glob(str(saxon_dir / "*.jar")) if saxon_dir not in TransformationJob._classpath_cache:
# Sammle alle JAR-Dateien im Saxon-Verzeichnis für den Classpath
import glob
# Sammle auch alle JARs aus dem lib-Unterordner (z.B. xmlresolver) all_jars = glob.glob(str(saxon_dir / "*.jar"))
lib_dir = saxon_dir / "lib"
if lib_dir.exists() and lib_dir.is_dir():
lib_jars = glob.glob(str(lib_dir / "*.jar"))
all_jars.extend(lib_jars)
logger.debug(f"Zusätzliche JARs aus lib-Verzeichnis gefunden: {len(lib_jars)}")
# Verwende alle JARs im Classpath (getrennt durch : auf Linux/Mac, ; auf Windows) # Sammle auch alle JARs aus dem lib-Unterordner (z.B. xmlresolver)
import sys lib_dir = saxon_dir / "lib"
if lib_dir.exists() and lib_dir.is_dir():
lib_jars = glob.glob(str(lib_dir / "*.jar"))
all_jars.extend(lib_jars)
logger.debug(f"Zusätzliche JARs aus lib-Verzeichnis gefunden: {len(lib_jars)}")
classpath_separator = ";" if sys.platform == "win32" else ":" # Verwende alle JARs im Classpath (getrennt durch : auf Linux/Mac, ; auf Windows)
classpath = classpath_separator.join(all_jars) import sys
classpath_separator = ";" if sys.platform == "win32" else ":"
classpath = classpath_separator.join(all_jars)
# Cache den Classpath für zukünftige Jobs
TransformationJob._classpath_cache[saxon_dir] = classpath
logger.debug(f"Classpath für {saxon_dir} gecacht")
else:
classpath = TransformationJob._classpath_cache[saxon_dir]
logger.debug("Classpath aus Cache verwendet")
# Saxon-Kommandozeile # Saxon-Kommandozeile
# Verwende -cp mit allen JARs und rufe Transform-Main direkt auf
cmd_line = [ cmd_line = [
str(self.java_vm_path), str(self.java_vm_path),
"-cp", "-cp",
@@ -196,8 +248,7 @@ class TransformationJob:
*params, *params,
] ]
logger.info(f"Starte Saxon-Transformation: {self.xml_file.name}") logger.debug(f"Kommandozeile (subprocess fallback): {' '.join(cmd_line)}")
logger.debug(f"Kommandozeile: {' '.join(cmd_line)}")
try: try:
result = subprocess.run( result = subprocess.run(
@@ -214,7 +265,7 @@ class TransformationJob:
logger.debug(f"Saxon StdErr:\n{result.stderr}") logger.debug(f"Saxon StdErr:\n{result.stderr}")
if result.returncode == 0: if result.returncode == 0:
logger.info(f"Saxon-Transformation erfolgreich: {self.xml_file.name}") logger.info(f"Saxon-Transformation erfolgreich (subprocess): {self.xml_file.name}")
return True, "Erfolgreich" return True, "Erfolgreich"
else: else:
error_msg = ( error_msg = (
+165 -26
View File
@@ -29,7 +29,8 @@ from ui.TreeNodeEditDialog import TreeNodeEditDialog
from ui.XslFileEditDialog import XslFileEditDialog from ui.XslFileEditDialog import XslFileEditDialog
from ui.XmlToXslAssignDialog import XmlToXslAssignDialog from ui.XmlToXslAssignDialog import XmlToXslAssignDialog
from conf import app_settings, Project, ProjectData, TreeNode, XslFile, XmlFile from conf import app_settings, Project, ProjectData, TreeNode, XslFile, XmlFile
from transform import TransformationJob from transform import TransformationJob, set_saxon_worker_pool
from saxon_pool import SaxonWorkerPool
from pathlib import Path from pathlib import Path
@@ -390,48 +391,79 @@ class TransformationThread(QThread):
job_error = Signal(str, str, str) # xml_file_name, xsl_id_str, error_message job_error = Signal(str, str, str) # xml_file_name, xsl_id_str, error_message
all_jobs_finished = Signal(int, int, float) # successful_count, total_count, total_duration all_jobs_finished = Signal(int, int, float) # successful_count, total_count, total_duration
def __init__(self, jobs: list[TransformationJob], force: bool = False): def __init__(self, jobs: list[TransformationJob], force: bool = False, max_workers: int = 8):
""" """
Initialisiert den Transformations-Thread. Initialisiert den Transformations-Thread.
Args: Args:
jobs: Liste der TransformationJob-Objekte jobs: Liste der TransformationJob-Objekte
force: Wenn True, werden alle Jobs ausgeführt (ignoriert Up-to-Date) force: Wenn True, werden alle Jobs ausgeführt (ignoriert Up-to-Date)
max_workers: Maximale Anzahl paralleler Worker (Standard: 8)
""" """
super().__init__() super().__init__()
self.jobs = jobs self.jobs = jobs
self.force = force self.force = force
self.max_workers = max_workers
self.successful_count = 0 self.successful_count = 0
def _process_single_job(self, job: TransformationJob) -> dict:
"""
Verarbeitet einen einzelnen Transformations-Job (Thread-safe).
Args:
job: Der zu verarbeitende TransformationJob
Returns:
dict: Ergebnis-Dictionary des Jobs
"""
try:
# Sende Start-Signal mit XSL-ID
xsl_id_str = "_".join(str(x) for x in job.xsl_id) if job.xsl_id else ""
self.job_started.emit(str(job.xml_file), xsl_id_str)
# Führe Transformations-Pipeline aus
result = job.run_full_pipeline(force=self.force)
# Sende Abschluss-Signal
self.job_finished.emit(result)
return result
except Exception as e:
error_msg = f"Unerwarteter Fehler bei Transformation: {str(e)}"
logger.error(error_msg)
xsl_id_str = "_".join(str(x) for x in job.xsl_id) if job.xsl_id else ""
self.job_error.emit(str(job.xml_file), xsl_id_str, error_msg)
return {"success": False, "error": error_msg}
def run(self): def run(self):
""" """
Führt alle Transformations-Jobs sequenziell aus. Führt alle Transformations-Jobs parallel aus mit ThreadPoolExecutor.
""" """
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime from datetime import datetime
import threading
start_time = datetime.now() start_time = datetime.now()
logger.info(f"Starte Transformation von {len(self.jobs)} Jobs") logger.info(f"Starte parallele Transformation von {len(self.jobs)} Jobs mit {self.max_workers} Workern")
for job in self.jobs: # Thread-sicherer Counter
try: successful_lock = threading.Lock()
# Sende Start-Signal mit XSL-ID
xsl_id_str = "_".join(str(x) for x in job.xsl_id) if job.xsl_id else ""
self.job_started.emit(str(job.xml_file), xsl_id_str)
# Führe Transformations-Pipeline aus # Verwende ThreadPoolExecutor für parallele Verarbeitung
result = job.run_full_pipeline(force=self.force) with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# Starte alle Jobs
future_to_job = {executor.submit(self._process_single_job, job): job for job in self.jobs}
# Sende Abschluss-Signal # Warte auf Abschluss und sammle Ergebnisse
self.job_finished.emit(result) for future in as_completed(future_to_job):
try:
if result["success"]: result = future.result()
self.successful_count += 1 if result.get("success", False):
with successful_lock:
except Exception as e: self.successful_count += 1
error_msg = f"Unerwarteter Fehler bei Transformation: {str(e)}" except Exception as e:
logger.error(error_msg) logger.error(f"Fehler beim Verarbeiten des Future: {e}")
xsl_id_str = "_".join(str(x) for x in job.xsl_id) if job.xsl_id else ""
self.job_error.emit(str(job.xml_file), xsl_id_str, error_msg)
# Berechne Gesamtdauer # Berechne Gesamtdauer
total_duration = (datetime.now() - start_time).total_seconds() total_duration = (datetime.now() - start_time).total_seconds()
@@ -439,7 +471,8 @@ class TransformationThread(QThread):
# Sende Abschluss-Signal für alle Jobs mit Gesamtdauer # Sende Abschluss-Signal für alle Jobs mit Gesamtdauer
self.all_jobs_finished.emit(self.successful_count, len(self.jobs), total_duration) self.all_jobs_finished.emit(self.successful_count, len(self.jobs), total_duration)
logger.info( logger.info(
f"Transformation abgeschlossen: {self.successful_count}/{len(self.jobs)} erfolgreich ({total_duration:.2f}s)" f"Transformation abgeschlossen: {self.successful_count}/{len(self.jobs)} erfolgreich ({total_duration:.2f}s) "
f"[{len(self.jobs) / total_duration:.2f} Jobs/s mit {self.max_workers} Workern]"
) )
@@ -517,6 +550,9 @@ class MainWindow(QMainWindow):
# Vorhandene Projekte-Menü initialisieren # Vorhandene Projekte-Menü initialisieren
self._setup_projects_menu() self._setup_projects_menu()
# Performance-Einstellungen-Menü initialisieren
self._setup_performance_menu()
# #
if theme := app_settings.theme: if theme := app_settings.theme:
self.change_theme(theme) self.change_theme(theme)
@@ -612,6 +648,47 @@ class MainWindow(QMainWindow):
logger.info(f"Projekte-Menü initialisiert mit {len(app_settings.pdf_projects)} Projekten") logger.info(f"Projekte-Menü initialisiert mit {len(app_settings.pdf_projects)} Projekten")
def _setup_performance_menu(self):
"""Fügt ein Menü-Item für Performance-Einstellungen hinzu."""
# Füge Separator vor der Performance-Einstellung hinzu
self.ui.menuProjekt.addSeparator()
# Erstelle Aktion für Performance-Einstellungen
performance_action = QAction("Performance-Einstellungen...", self)
performance_action.setToolTip(f"Parallele Worker: {app_settings.max_workers}")
performance_action.triggered.connect(self._open_performance_settings)
# Füge die Aktion zum Projekt-Menü hinzu
self.ui.menuProjekt.addAction(performance_action)
logger.debug(f"Performance-Menü initialisiert (max_workers={app_settings.max_workers})")
def _open_performance_settings(self):
"""Öffnet einen Dialog für Performance-Einstellungen."""
from PySide6.QtWidgets import QInputDialog
current_workers = app_settings.max_workers
new_workers, ok = QInputDialog.getInt(
self,
"Performance-Einstellungen",
"Anzahl paralleler Worker für Transformationen:",
current_workers, # value
1, # minValue
32, # maxValue
1, # step
)
if ok and new_workers != current_workers:
app_settings.max_workers = new_workers
app_settings.save()
logger.info(f"max_workers geändert: {current_workers}{new_workers}")
QMessageBox.information(
self,
"Einstellungen gespeichert",
f"Anzahl paralleler Worker wurde auf {new_workers} gesetzt.\n\n"
f"Die Änderung wird bei der nächsten Transformation wirksam.",
)
def open_existing_project(self, project: Project): def open_existing_project(self, project: Project):
""" """
Öffnet ein vorhandenes Projekt. Öffnet ein vorhandenes Projekt.
@@ -647,6 +724,9 @@ class MainWindow(QMainWindow):
# Starte Hash-Berechnung für alle XML-Dateien # Starte Hash-Berechnung für alle XML-Dateien
self._start_xml_hash_calculation() self._start_xml_hash_calculation()
# Initialisiere Saxon-Worker-Pool für schnellere Transformationen
self._initialize_saxon_worker_pool()
except Exception as e: except Exception as e:
logger.error(f"Fehler beim Laden des Projekts '{project.name}': {e}") logger.error(f"Fehler beim Laden des Projekts '{project.name}': {e}")
# Fallback: Erstelle Standard-Einstellungen # Fallback: Erstelle Standard-Einstellungen
@@ -658,6 +738,62 @@ class MainWindow(QMainWindow):
except Exception as fallback_error: except Exception as fallback_error:
logger.error(f"Fehler beim Erstellen der Fallback-Einstellungen: {fallback_error}") logger.error(f"Fehler beim Erstellen der Fallback-Einstellungen: {fallback_error}")
def _initialize_saxon_worker_pool(self):
"""Initialisiert den Saxon-Worker-Pool für schnelle Transformationen."""
try:
# Shutdown vorherigen Pool falls vorhanden
self._shutdown_saxon_worker_pool()
if not self.project:
logger.warning("Kein Projekt geladen, Saxon-Worker-Pool nicht initialisiert")
return
# Hole Tool-Konfigurationen
java_vm = next((vm for vm in app_settings.java_vms if vm.id == self.project.java_vm_id), None)
saxon_jar = next((jar for jar in app_settings.saxon_jars if jar.id == self.project.saxon_jar_id), None)
if not java_vm or not saxon_jar:
logger.warning("Java VM oder Saxon JAR nicht gefunden, Pool nicht initialisiert")
return
# Erstelle Worker-Pool
num_workers = app_settings.max_workers
log_dir = self.project.project_dir / "temp"
pool = SaxonWorkerPool(
num_workers=num_workers,
java_vm_path=java_vm.path_to_binary_file,
saxon_jar_path=saxon_jar.path_to_jar_file,
classpath_cache=TransformationJob._classpath_cache,
log_dir=log_dir,
)
# Setze globalen Pool
set_saxon_worker_pool(pool)
logger.info(
f"Saxon-Worker-Pool initialisiert: {num_workers} Worker "
f"(erwartet: {num_workers}x schneller für Saxon-Transformationen)"
)
except Exception as e:
logger.error(f"Fehler beim Initialisieren des Saxon-Worker-Pools: {e}")
# Kein Pool ist OK - Fallback auf subprocess
def _shutdown_saxon_worker_pool(self):
"""Beendet den Saxon-Worker-Pool sauber."""
try:
# Importiere transform um Zugriff auf globalen Pool zu haben
import transform
if transform._saxon_worker_pool:
logger.info("Beende Saxon-Worker-Pool...")
transform._saxon_worker_pool.shutdown()
set_saxon_worker_pool(None)
logger.info("Saxon-Worker-Pool beendet")
except Exception as e:
logger.error(f"Fehler beim Beenden des Saxon-Worker-Pools: {e}")
def change_theme(self, theme_name): def change_theme(self, theme_name):
""" """
Wechselt das Theme der Anwendung. Wechselt das Theme der Anwendung.
@@ -2900,7 +3036,7 @@ class MainWindow(QMainWindow):
""" """
# Erstelle Zusammenfassungstext # Erstelle Zusammenfassungstext
summary_lines = [] summary_lines = []
summary_lines.append(f"Verarbeitung abgeschlossen:\n") summary_lines.append("Verarbeitung abgeschlossen:\n")
summary_lines.append(f"📊 Gesamt: {stats['total']} Datei(en)") summary_lines.append(f"📊 Gesamt: {stats['total']} Datei(en)")
summary_lines.append(f"✓ Verarbeitet: {stats['processed']} Datei(en)") summary_lines.append(f"✓ Verarbeitet: {stats['processed']} Datei(en)")
@@ -2917,7 +3053,7 @@ class MainWindow(QMainWindow):
summary_lines.append(f"🚫 Abgebrochen: {stats['cancelled']} Datei(en)") summary_lines.append(f"🚫 Abgebrochen: {stats['cancelled']} Datei(en)")
if stats["renamed_files"]: if stats["renamed_files"]:
summary_lines.append(f"\n📝 Umbenannte Dateien:") summary_lines.append("\n📝 Umbenannte Dateien:")
for renamed in stats["renamed_files"]: for renamed in stats["renamed_files"]:
summary_lines.append(f"{renamed}") summary_lines.append(f"{renamed}")
@@ -3938,7 +4074,7 @@ class MainWindow(QMainWindow):
return return
# Erstelle und konfiguriere Thread # Erstelle und konfiguriere Thread
self.transformation_thread = TransformationThread(jobs, force=force) self.transformation_thread = TransformationThread(jobs, force=force, max_workers=app_settings.max_workers)
# Verbinde Signale # Verbinde Signale
self.transformation_thread.job_started.connect(self._on_transformation_job_started) self.transformation_thread.job_started.connect(self._on_transformation_job_started)
@@ -4537,6 +4673,9 @@ class MainWindow(QMainWindow):
self.transformation_thread.quit() self.transformation_thread.quit()
self.transformation_thread.wait() self.transformation_thread.wait()
# Beende Saxon-Worker-Pool
self._shutdown_saxon_worker_pool()
# PDF-Dokumente schließen ist bei QtPdf automatisch durch Garbage Collection # PDF-Dokumente schließen ist bei QtPdf automatisch durch Garbage Collection
super().closeEvent(event) super().closeEvent(event)