""" XSL-Abhängigkeitsgraph für die Erkennung transitiver Imports/Includes. Dieses Modul parst XSL-Dateien und baut einen Abhängigkeitsgraph auf, um alle direkt und transitiv importierten/inkludierten Dateien zu ermitteln. Der Graph wird im Speicher gehalten und bei Änderungen (mtime) automatisch invalidiert. """ import logging import time from lxml import etree from pathlib import Path logger = logging.getLogger(__name__) _XSL_NAMESPACE = "http://www.w3.org/1999/XSL/Transform" def _parse_import_include_hrefs(content: str) -> list[str]: """Parst XSL-Dateiinhalt und gibt alle href-Werte von xsl:import/xsl:include zurück.""" try: root = etree.fromstring(content.encode("utf-8")) except etree.XMLSyntaxError: return [] hrefs = [] for tag in ("import", "include"): for elem in root.iter(f"{{{_XSL_NAMESPACE}}}{tag}"): href = elem.get("href") if href: hrefs.append(href) return hrefs class XslDependencyGraph: """ Verwaltet einen Cache von XSL-Abhängigkeiten (import/include). Für jede XSL-Datei wird die Menge aller transitiv referenzierten XSL-Dateien gespeichert. Der Cache wird über die mtime der Dateien automatisch invalidiert. """ def __init__(self): # Cache: xsl_path -> (mtime_bei_parse, set[Path] aller Abhängigkeiten) self._cache: dict[Path, tuple[float, set[Path]]] = {} def get_dependencies(self, xsl_file: Path) -> set[Path]: """ Gibt alle transitiven Abhängigkeiten einer XSL-Datei zurück. Args: xsl_file: Absoluter Pfad zur XSL-Datei Returns: set[Path]: Menge aller transitiv importierten/inkludierten XSL-Dateien """ xsl_file = xsl_file.resolve() if not xsl_file.exists(): return set() current_mtime = xsl_file.stat().st_mtime # Cache-Hit prüfen if xsl_file in self._cache: cached_mtime, cached_deps = self._cache[xsl_file] if cached_mtime == current_mtime: return cached_deps # Neu parsen deps = self._resolve_recursive(xsl_file, set()) self._cache[xsl_file] = (current_mtime, deps) logger.debug(f"XSL-Abhängigkeiten aufgelöst für {xsl_file.name}: {len(deps)} Abhängigkeit(en)") return deps def _resolve_recursive(self, xsl_file: Path, visited: set[Path]) -> set[Path]: """ Löst rekursiv alle import/include-Referenzen auf. Args: xsl_file: Absoluter Pfad zur XSL-Datei visited: Bereits besuchte Dateien (Zykluserkennung) Returns: set[Path]: Alle transitiven Abhängigkeiten (ohne die Datei selbst) """ xsl_file = xsl_file.resolve() if xsl_file in visited or not xsl_file.exists(): return set() visited.add(xsl_file) result: set[Path] = set() try: content = xsl_file.read_text(encoding="utf-8", errors="replace") except Exception as e: logger.warning(f"Konnte XSL-Datei nicht lesen: {xsl_file} ({e})") return result # Finde alle import/include-Referenzen for href in _parse_import_include_hrefs(content): referenced_path = (xsl_file.parent / href).resolve() if referenced_path.exists() and referenced_path not in visited: result.add(referenced_path) # Rekursiv Abhängigkeiten der referenzierten Datei auflösen result.update(self._resolve_recursive(referenced_path, visited)) return result def get_reverse_dependencies(self, xsl_file: Path, xsl_root_dir: Path) -> set[Path]: """ Gibt alle Dateien zurück, die die gegebene XSL-Datei direkt oder transitiv importieren/inkludieren. Args: xsl_file: Absoluter Pfad zur XSL-Datei xsl_root_dir: Wurzelverzeichnis der XSL-Dateien (für den Scan) Returns: set[Path]: Menge aller Dateien, die diese Datei importieren """ xsl_file = xsl_file.resolve() result: set[Path] = set() # Scanne alle XSL-Dateien im Verzeichnis if not xsl_root_dir.exists(): return result for candidate in xsl_root_dir.rglob("*.xsl"): candidate = candidate.resolve() if candidate == xsl_file: continue deps = self.get_dependencies(candidate) if xsl_file in deps: result.add(candidate) return result def build_full_graph(self, xsl_root_dir: Path) -> dict[Path, set[Path]]: """ Baut den vollständigen Abhängigkeitsgraph für alle XSL-Dateien in einem Verzeichnis auf. Args: xsl_root_dir: Wurzelverzeichnis der XSL-Dateien Returns: dict[Path, set[Path]]: Mapping von XSL-Datei zu ihren Abhängigkeiten """ graph: dict[Path, set[Path]] = {} if not xsl_root_dir.exists(): return graph start = time.perf_counter() for xsl_file in sorted(xsl_root_dir.rglob("*.xsl")): xsl_file = xsl_file.resolve() deps = self.get_dependencies(xsl_file) graph[xsl_file] = deps elapsed = time.perf_counter() - start logger.info(f"Vollständiger XSL-Graph aufgebaut: {len(graph)} Dateien in {elapsed:.3f}s") return graph def _get_direct_dependencies(self, xsl_file: Path) -> set[Path]: """ Gibt nur die direkten (nicht-transitiven) import/include-Abhängigkeiten zurück. Args: xsl_file: Absoluter Pfad zur XSL-Datei Returns: set[Path]: Menge der direkt importierten/inkludierten XSL-Dateien """ xsl_file = xsl_file.resolve() result: set[Path] = set() if not xsl_file.exists(): return result try: content = xsl_file.read_text(encoding="utf-8", errors="replace") except Exception as e: logger.warning(f"Konnte XSL-Datei nicht lesen: {xsl_file} ({e})") return result for href in _parse_import_include_hrefs(content): referenced_path = (xsl_file.parent / href).resolve() if referenced_path.exists(): result.add(referenced_path) return result def build_direct_graph(self, xsl_root_dir: Path) -> dict[Path, set[Path]]: """ Baut einen Graph mit nur direkten (nicht-transitiven) Abhängigkeiten auf. Args: xsl_root_dir: Wurzelverzeichnis der XSL-Dateien Returns: dict[Path, set[Path]]: Mapping von XSL-Datei zu ihren direkten Abhängigkeiten """ graph: dict[Path, set[Path]] = {} if not xsl_root_dir.exists(): return graph start = time.perf_counter() for xsl_file in sorted(xsl_root_dir.rglob("*.xsl")): xsl_file = xsl_file.resolve() graph[xsl_file] = self._get_direct_dependencies(xsl_file) elapsed = time.perf_counter() - start logger.info(f"Direkter XSL-Graph aufgebaut: {len(graph)} Dateien in {elapsed:.3f}s") return graph def invalidate(self, xsl_file: Path | None = None): """ Invalidiert den Cache für eine bestimmte Datei oder den gesamten Cache. Args: xsl_file: Pfad zur XSL-Datei oder None für vollständige Invalidierung """ if xsl_file is None: self._cache.clear() logger.debug("XSL-Abhängigkeitscache vollständig invalidiert") else: xsl_file = xsl_file.resolve() # Entferne nicht nur den direkten Eintrag, sondern auch alle Einträge, # die diese Datei als Abhängigkeit haben keys_to_remove = [xsl_file] for cached_path, (_, deps) in self._cache.items(): if xsl_file in deps: keys_to_remove.append(cached_path) for key in keys_to_remove: self._cache.pop(key, None) if keys_to_remove: logger.debug(f"XSL-Abhängigkeitscache invalidiert für {len(keys_to_remove)} Einträg(e)")