Files
xsl-validator/src/transform.py
T

600 lines
23 KiB
Python
Raw Normal View History

"""
Transformations-Engine für XSL-FO PDF-Generierung.
Dieses Modul implementiert die Transformations-Pipeline:
1. XML → FO (Saxon XSLT Transformation)
2. FO → PDF (Apache FOP)
3. PDF-Vergleich (diff-pdf)
"""
import logging
import subprocess
from pathlib import Path
from datetime import datetime
from typing import Any, Optional, TYPE_CHECKING
if TYPE_CHECKING:
from saxon_pool import SaxonWorkerPool
from saxon_pool_s9api import SaxonWorkerPoolS9Api
from fop_pool import FopWorkerPool
from xsl_dependencies import XslDependencyGraph
logger = logging.getLogger(__name__)
# Globaler Saxon-Worker-Pool (wird von MainWindow initialisiert)
# Kann entweder JAXP oder s9api Variante sein
_saxon_worker_pool: Optional["SaxonWorkerPool | SaxonWorkerPoolS9Api"] = None
# Globaler FOP-Worker-Pool (wird von MainWindow initialisiert)
_fop_worker_pool: Optional["FopWorkerPool"] = None
def set_saxon_worker_pool(pool: Optional["SaxonWorkerPool | SaxonWorkerPoolS9Api"]):
"""Setzt den globalen Saxon-Worker-Pool."""
global _saxon_worker_pool
_saxon_worker_pool = pool
if pool:
logger.info(f"Saxon-Worker-Pool aktiviert mit {pool.num_workers} Workern")
else:
logger.info("Saxon-Worker-Pool deaktiviert (Fallback auf subprocess)")
def get_saxon_worker_pool() -> Optional["SaxonWorkerPool | SaxonWorkerPoolS9Api"]:
"""Gibt den aktuellen globalen Saxon-Worker-Pool zurück."""
return _saxon_worker_pool
def set_fop_worker_pool(pool: Optional["FopWorkerPool"]):
"""Setzt den globalen FOP-Worker-Pool."""
global _fop_worker_pool
_fop_worker_pool = pool
if pool:
logger.info(f"FOP-Worker-Pool aktiviert mit {pool.num_workers} Workern")
else:
logger.info("FOP-Worker-Pool deaktiviert (Fallback auf subprocess)")
def get_fop_worker_pool() -> Optional["FopWorkerPool"]:
"""Gibt den aktuellen globalen FOP-Worker-Pool zurück."""
return _fop_worker_pool
class TransformationJob:
"""
Repräsentiert einen einzelnen Transformations-Job.
Ähnlich zur TestFall-Klasse in validate-xls.py, aber für DocuMentor angepasst.
"""
# Klassenweiter Cache für Saxon-Classpaths (Performance-Optimierung)
_classpath_cache: dict[Path, str] = {}
def __init__(
self,
project_dir: Path,
xml_file: Path,
xsl_file: Path,
xslt_params: dict[str, str],
java_vm_path: Path,
saxon_jar_path: Path,
apache_fop_dir: Path,
diff_pdf_path: Path,
diff_pdf_params: list[str],
xsl_id: tuple | None = None,
fop_config_dir: Path | None = None,
dependency_graph: Optional["XslDependencyGraph"] = None,
):
"""
Initialisiert einen Transformations-Job.
Args:
project_dir: Pfad zum Projekt-Verzeichnis
xml_file: Relative Pfad zur XML-Eingabedatei (relativ zu project_dir)
xsl_file: Absolute Pfad zur XSL-Stylesheet-Datei
xslt_params: Dictionary mit XSLT-Parametern
java_vm_path: Pfad zur Java VM Binary
saxon_jar_path: Pfad zur Saxon JAR-Datei
apache_fop_dir: Pfad zum Apache FOP-Verzeichnis
diff_pdf_path: Pfad zur diff-pdf Binary
diff_pdf_params: Standard-Parameter für diff-pdf
xsl_id: ID der XSL-Datei (als Tuple)
fop_config_dir: Optionaler Pfad zum FOP-Config-Verzeichnis (überschreibt Standardpfad)
dependency_graph: Optionaler XSL-Abhängigkeitsgraph für Import/Include-Prüfung
"""
self.project_dir = project_dir
self.xml_file = xml_file # Relativ
self.xsl_file = xsl_file # Absolut
self.xslt_params = xslt_params
self.xsl_id = xsl_id
# Tool-Pfade
self.java_vm_path = java_vm_path
self.saxon_jar_path = saxon_jar_path
self.apache_fop_dir = apache_fop_dir
self.fop_config_dir = fop_config_dir
self.diff_pdf_path = diff_pdf_path
self.diff_pdf_params = diff_pdf_params
self.dependency_graph = dependency_graph
# Ausgabe-Verzeichnisse im Projektordner
self.new_dir = project_dir / "new"
self.ref_dir = project_dir / "ref"
self.diff_dir = project_dir / "diff"
# Stelle sicher, dass Ausgabe-Verzeichnisse existieren
self.new_dir.mkdir(exist_ok=True)
self.ref_dir.mkdir(exist_ok=True)
self.diff_dir.mkdir(exist_ok=True)
# Dateinamen basierend auf XML-Datei + XSL-ID
base_name = self.xml_file.stem
# Füge XSL-ID zum Dateinamen hinzu, falls vorhanden
if xsl_id:
# Konvertiere Tuple (1, 2, 3) zu String "1_2_3"
xsl_id_str = "_".join(str(x) for x in xsl_id)
file_name_base = f"{base_name}_xsl_{xsl_id_str}"
else:
file_name_base = base_name
self.temp_fo = self.new_dir / f"{file_name_base}.fo"
self.new_pdf = self.new_dir / f"{file_name_base}.pdf"
self.ref_pdf = self.ref_dir / f"{file_name_base}.pdf"
self.diff_pdf = self.diff_dir / f"{file_name_base}.pdf"
# Apache FOP Binaries (plattformabhängig)
import sys
if sys.platform == "win32":
self.fop_cmd = self.apache_fop_dir / "fop.cmd"
else:
self.fop_cmd = self.apache_fop_dir / "fop"
# FOP-Konfigurationsdatei: Verwende fop_config_dir falls angegeben, sonst Standardpfad
if self.fop_config_dir:
self.fop_conf = self.fop_config_dir / "fop.xconf"
else:
self.fop_conf = self.apache_fop_dir / "conf" / "fop.xconf"
def is_up_to_date(self) -> bool:
"""
Prüft, ob die Transformation aktuell ist.
Returns:
bool: True wenn New-PDF existiert und aktueller ist als alle Inputs
"""
if not self.new_pdf.exists():
logger.debug(f"New-PDF existiert nicht: {self.new_pdf}")
return False
output_mtime = self.new_pdf.stat().st_mtime
# Prüfe XML-Datei
xml_abs = self.project_dir / self.xml_file
if xml_abs.exists() and xml_abs.stat().st_mtime > output_mtime:
logger.debug(f"XML-Datei ist neuer: {xml_abs}")
return False
# Prüfe XSL-Datei
if self.xsl_file.exists() and self.xsl_file.stat().st_mtime > output_mtime:
logger.debug(f"XSL-Datei ist neuer: {self.xsl_file}")
return False
# Prüfe importierte/inkludierte XSL-Dateien (transitiv)
if self.dependency_graph and self.xsl_file.exists():
for dep_xsl in self.dependency_graph.get_dependencies(self.xsl_file):
if dep_xsl.exists() and dep_xsl.stat().st_mtime > output_mtime:
logger.debug(f"Importierte XSL-Datei ist neuer: {dep_xsl}")
return False
logger.debug(f"Transformation ist aktuell: {self.new_pdf}")
return True
def transform_saxon(self, force: bool = False) -> tuple[bool, str]:
"""
Führt XSLT-Transformation mit Saxon aus: XML → FO.
Args:
force: Wenn True, wird Transformation auch bei aktuellem Output durchgeführt
Returns:
tuple[bool, str]: (Erfolg, Fehlermeldung/Info)
"""
if not force and self.is_up_to_date():
logger.info(f"Transformation übersprungen (aktuell): {self.xml_file.name}")
return True, "Übersprungen (aktuell)"
xml_abs = self.project_dir / self.xml_file
# Prüfe ob Eingabedateien existieren
if not xml_abs.exists():
error_msg = f"XML-Datei nicht gefunden: {xml_abs}"
logger.error(error_msg)
return False, error_msg
if not self.xsl_file.exists():
error_msg = f"XSL-Datei nicht gefunden: {self.xsl_file}"
logger.error(error_msg)
return False, error_msg
logger.info(f"Starte Saxon-Transformation: {self.xml_file.name}")
# Versuche zuerst den Worker-Pool zu nutzen (schneller!)
global _saxon_worker_pool
if _saxon_worker_pool:
try:
success, message = _saxon_worker_pool.transform(
source_xml=xml_abs,
xsl_stylesheet=self.xsl_file,
output_fo=self.temp_fo,
xslt_params=self.xslt_params,
)
if success:
logger.info(f"Saxon-Transformation erfolgreich (Worker-Pool): {self.xml_file.name}")
else:
logger.error(f"Saxon-Transformation fehlgeschlagen (Worker-Pool): {message}")
return success, message
except Exception as e:
logger.warning(f"Worker-Pool-Fehler, Fallback auf subprocess: {e}")
# Fallback auf subprocess unten
# Fallback: Traditionelle subprocess-Methode (langsamer, aber robuster)
# XSLT-Parameter formatieren
params = [f"{key}={value}" for key, value in self.xslt_params.items()]
# Hole Classpath aus Cache oder erstelle ihn
saxon_dir = self.saxon_jar_path.parent
if saxon_dir not in TransformationJob._classpath_cache:
# Sammle alle JAR-Dateien im Saxon-Verzeichnis für den Classpath
import glob
all_jars = glob.glob(str(saxon_dir / "*.jar"))
# Sammle auch alle JARs aus dem lib-Unterordner (z.B. xmlresolver)
lib_dir = saxon_dir / "lib"
if lib_dir.exists() and lib_dir.is_dir():
lib_jars = glob.glob(str(lib_dir / "*.jar"))
all_jars.extend(lib_jars)
logger.debug(f"Zusätzliche JARs aus lib-Verzeichnis gefunden: {len(lib_jars)}")
# Verwende alle JARs im Classpath (getrennt durch : auf Linux/Mac, ; auf Windows)
import sys
classpath_separator = ";" if sys.platform == "win32" else ":"
classpath = classpath_separator.join(all_jars)
# Cache den Classpath für zukünftige Jobs
TransformationJob._classpath_cache[saxon_dir] = classpath
logger.debug(f"Classpath für {saxon_dir} gecacht")
else:
classpath = TransformationJob._classpath_cache[saxon_dir]
logger.debug("Classpath aus Cache verwendet")
# Saxon-Kommandozeile
cmd_line = [
str(self.java_vm_path),
"-cp",
classpath,
"net.sf.saxon.Transform",
f"-s:{xml_abs}",
f"-xsl:{self.xsl_file}",
f"-o:{self.temp_fo}",
*params,
]
logger.debug(f"Kommandozeile (subprocess fallback): {' '.join(cmd_line)}")
try:
result = subprocess.run(
cmd_line,
capture_output=True,
text=True,
timeout=120, # 2 Minuten Timeout
)
# Saxon Ausgaben loggen
if result.stdout:
logger.debug(f"Saxon StdOut:\n{result.stdout}")
if result.stderr:
logger.debug(f"Saxon StdErr:\n{result.stderr}")
if result.returncode == 0:
logger.info(f"Saxon-Transformation erfolgreich (subprocess): {self.xml_file.name}")
return True, "Erfolgreich"
else:
error_msg = (
f"Saxon-Fehler (Exit {result.returncode}):\nStdOut: {result.stdout}\nStdErr: {result.stderr}"
)
logger.error(error_msg)
return False, error_msg
except subprocess.TimeoutExpired:
error_msg = "Saxon-Transformation Timeout (>120s)"
logger.error(error_msg)
return False, error_msg
except Exception as e:
error_msg = f"Unerwarteter Fehler bei Saxon-Transformation: {str(e)}"
logger.error(error_msg)
return False, error_msg
def build_pdf(self, force: bool = False) -> tuple[bool, str]:
"""
Generiert PDF aus FO-Datei mit Apache FOP: FO → PDF.
Args:
force: Wenn True, wird Build auch bei aktuellem Output durchgeführt
Returns:
tuple[bool, str]: (Erfolg, Fehlermeldung/Info)
"""
if not force and self.is_up_to_date():
logger.info(f"PDF-Build übersprungen (aktuell): {self.xml_file.name}")
return True, "Übersprungen (aktuell)"
# Prüfe ob FO-Datei existiert
if not self.temp_fo.exists():
error_msg = f"FO-Datei nicht gefunden: {self.temp_fo}"
logger.error(error_msg)
return False, error_msg
logger.info(f"Starte Apache FOP PDF-Generierung: {self.xml_file.name}")
# Versuche zuerst den Worker-Pool zu nutzen (schneller!)
global _fop_worker_pool
if _fop_worker_pool:
try:
success, message = _fop_worker_pool.build_pdf(
input_fo=self.temp_fo,
output_pdf=self.new_pdf,
)
if success:
logger.info(f"FOP PDF-Generierung erfolgreich (Worker-Pool): {self.xml_file.name}")
# Temporäre FO-Datei löschen
if self.temp_fo.exists():
try:
self.temp_fo.unlink()
logger.debug(f"Temporäre FO-Datei gelöscht: {self.temp_fo}")
except Exception as e:
logger.warning(f"Konnte FO-Datei nicht löschen: {e}")
# Wenn kein Ref-PDF existiert, erstelle es
if not self.ref_pdf.exists():
try:
import shutil
shutil.copy2(self.new_pdf, self.ref_pdf)
logger.info(f"Ref-PDF erstellt: {self.ref_pdf}")
except Exception as e:
logger.warning(f"Konnte Ref-PDF nicht erstellen: {e}")
return True, "Erfolgreich"
else:
logger.error(f"FOP PDF-Generierung fehlgeschlagen (Worker-Pool): {message}")
return False, message
except Exception as e:
logger.warning(f"FOP Worker-Pool-Fehler, Fallback auf subprocess: {e}")
# Fallback auf subprocess unten
# Fallback: Traditionelle subprocess-Methode (langsamer, aber robuster)
# Apache FOP Kommandozeile
cmd_line = [
str(self.fop_cmd),
"-c",
str(self.fop_conf) if self.fop_conf.exists() else "",
"-r",
"-fo",
str(self.temp_fo),
"-pdf",
str(self.new_pdf),
]
# Entferne leere Config-Parameter wenn fop.xconf nicht existiert
if not self.fop_conf.exists():
cmd_line = [c for c in cmd_line if c not in ["-c", ""]]
logger.info(f"Starte Apache FOP PDF-Generierung: {self.xml_file.name}")
logger.debug(f"Kommandozeile: {' '.join(cmd_line)}")
try:
result = subprocess.run(
cmd_line,
capture_output=True,
text=True,
timeout=180, # 3 Minuten Timeout
)
# Apache FOP Ausgaben loggen
if result.stdout:
logger.debug(f"FOP StdOut:\n{result.stdout}")
if result.stderr:
logger.debug(f"FOP StdErr:\n{result.stderr}")
# Temporäre FO-Datei löschen
if self.temp_fo.exists():
try:
self.temp_fo.unlink()
logger.debug(f"Temporäre FO-Datei gelöscht: {self.temp_fo}")
except Exception as e:
logger.warning(f"Konnte FO-Datei nicht löschen: {e}")
if result.returncode == 0:
# Wenn kein Ref-PDF existiert, erstelle es
if not self.ref_pdf.exists():
try:
import shutil
shutil.copy2(self.new_pdf, self.ref_pdf)
logger.info(f"Ref-PDF erstellt: {self.ref_pdf}")
except Exception as e:
logger.warning(f"Konnte Ref-PDF nicht erstellen: {e}")
logger.info(f"PDF-Generierung erfolgreich: {self.new_pdf}")
return True, "Erfolgreich"
else:
error_msg = f"FOP-Fehler (Exit {result.returncode}):\nStdOut: {result.stdout}\nStdErr: {result.stderr}"
logger.error(error_msg)
return False, error_msg
except subprocess.TimeoutExpired:
error_msg = "FOP PDF-Generierung Timeout (>180s)"
logger.error(error_msg)
return False, error_msg
except Exception as e:
error_msg = f"Unerwarteter Fehler bei PDF-Generierung: {str(e)}"
logger.error(error_msg)
return False, error_msg
def compare_pdf(self) -> tuple[bool, str]:
"""
Vergleicht New-PDF mit Ref-PDF und erstellt ggf. Diff-PDF.
Returns:
tuple[bool, str]: (PDFs sind identisch, Fehlermeldung/Info)
"""
# Prüfe ob beide PDFs existieren
if not self.ref_pdf.exists():
info_msg = "Kein Ref-PDF vorhanden (wird beim nächsten Build erstellt)"
logger.info(info_msg)
return True, info_msg
if not self.new_pdf.exists():
error_msg = f"New-PDF nicht gefunden: {self.new_pdf}"
logger.error(error_msg)
return False, error_msg
logger.info(f"Vergleiche PDFs: {self.xml_file.name}")
# Erster Vergleich (ohne Diff-Generierung)
cmd_compare = [
str(self.diff_pdf_path),
*self.diff_pdf_params,
str(self.ref_pdf),
str(self.new_pdf),
]
logger.debug(f"Kommandozeile Vergleich: {' '.join(cmd_compare)}")
try:
result = subprocess.run(
cmd_compare,
capture_output=True,
text=True,
timeout=60, # 1 Minute Timeout
)
if result.returncode == 0:
# PDFs sind identisch
logger.info(f"PDFs sind identisch: {self.xml_file.name}")
# Lösche altes Diff-PDF falls vorhanden
if self.diff_pdf.exists():
try:
self.diff_pdf.unlink()
logger.debug(f"Diff-PDF gelöscht (nicht mehr nötig): {self.diff_pdf}")
except Exception as e:
logger.warning(f"Konnte Diff-PDF nicht löschen: {e}")
return True, "PDFs sind identisch"
else:
# PDFs unterscheiden sich - erstelle Diff-PDF
logger.info(f"PDFs unterscheiden sich, erstelle Diff-PDF: {self.xml_file.name}")
cmd_diff = [
str(self.diff_pdf_path),
f"--output-diff={self.diff_pdf}",
*self.diff_pdf_params,
"--mark-differences",
str(self.ref_pdf),
str(self.new_pdf),
]
logger.debug(f"Kommandozeile Diff: {' '.join(cmd_diff)}")
result_diff = subprocess.run(
cmd_diff,
capture_output=True,
text=True,
timeout=90, # 1.5 Minuten Timeout
)
if result_diff.returncode == 0 or self.diff_pdf.exists():
logger.info(f"Diff-PDF erstellt: {self.diff_pdf}")
return False, f"Unterschiede gefunden - Diff-PDF: {self.diff_pdf.name}"
else:
error_msg = f"Diff-PDF-Erstellung fehlgeschlagen: {result_diff.stderr}"
logger.error(error_msg)
return False, error_msg
except subprocess.TimeoutExpired:
error_msg = "PDF-Vergleich Timeout"
logger.error(error_msg)
return False, error_msg
except Exception as e:
error_msg = f"Unerwarteter Fehler beim PDF-Vergleich: {str(e)}"
logger.error(error_msg)
return False, error_msg
def run_full_pipeline(self, force: bool = False) -> dict[str, Any]:
"""
Führt die komplette Transformations-Pipeline aus:
1. Saxon-Transformation (XML → FO)
2. PDF-Generierung (FO → PDF)
3. PDF-Vergleich
Args:
force: Wenn True, werden alle Schritte ausgeführt (ignoriert Up-to-Date)
Returns:
dict: Ergebnis-Dictionary mit Status und Meldungen
"""
start_time = datetime.now()
result = {
"success": False,
"xml_file": str(self.xml_file),
"xsl_id": self.xsl_id,
"steps": {},
"duration": None,
"new_pdf": str(self.new_pdf) if self.new_pdf.exists() else None,
"diff_pdf": str(self.diff_pdf) if self.diff_pdf.exists() else None,
}
logger.info(f"Starte Transformations-Pipeline: {self.xml_file.name}")
# Schritt 1: Saxon-Transformation
success_saxon, msg_saxon = self.transform_saxon(force=force)
result["steps"]["saxon"] = {"success": success_saxon, "message": msg_saxon}
if not success_saxon:
result["success"] = False
result["duration"] = (datetime.now() - start_time).total_seconds()
return result
# Schritt 2: PDF-Generierung
success_build, msg_build = self.build_pdf(force=force)
result["steps"]["build"] = {"success": success_build, "message": msg_build}
if not success_build:
result["success"] = False
result["duration"] = (datetime.now() - start_time).total_seconds()
return result
# Schritt 3: PDF-Vergleich
pdfs_identical, msg_compare = self.compare_pdf()
result["steps"]["compare"] = {"identical": pdfs_identical, "message": msg_compare}
result["pdfs_identical"] = pdfs_identical
# Pipeline erfolgreich abgeschlossen
result["success"] = True
result["duration"] = (datetime.now() - start_time).total_seconds()
logger.info(f"Pipeline abgeschlossen: {self.xml_file.name} ({result['duration']:.2f}s)")
return result