Debugging: Verbesserte Fehlerdiagnose für Saxon Worker Pool
- Leitet stderr jedes Workers in separate Log-Dateien um (worker_N_stderr.log) - Fügt Startup-Health-Check hinzu: Prüft nach 100ms ob Worker noch läuft - Fügt Pre-Transform-Check hinzu: Validiert Worker-Status vor jedem Job - Zeigt stderr-Inhalt in Fehlermeldungen wenn Worker crashen - Erweitert Debug-Logging für Job-Submission und Worker-Antworten Dies hilft, die Ursache der "broken pipe" Fehler zu identifizieren. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
+49
-2
@@ -207,11 +207,15 @@ class SaxonWorkerPool:
|
|||||||
# Starte JVM-Prozess mit SaxonWorker
|
# Starte JVM-Prozess mit SaxonWorker
|
||||||
cmd = [str(self.java_vm_path), "-cp", full_classpath, "SaxonWorker"]
|
cmd = [str(self.java_vm_path), "-cp", full_classpath, "SaxonWorker"]
|
||||||
|
|
||||||
|
# Öffne stderr-Log-Datei für diesen Worker
|
||||||
|
stderr_log = self.temp_dir / f"worker_{i}_stderr.log"
|
||||||
|
stderr_file = open(stderr_log, "w", encoding="utf-8")
|
||||||
|
|
||||||
process = subprocess.Popen(
|
process = subprocess.Popen(
|
||||||
cmd,
|
cmd,
|
||||||
stdin=subprocess.PIPE,
|
stdin=subprocess.PIPE,
|
||||||
stdout=subprocess.PIPE,
|
stdout=subprocess.PIPE,
|
||||||
stderr=subprocess.PIPE,
|
stderr=stderr_file, # Redirect stderr to file
|
||||||
text=True,
|
text=True,
|
||||||
bufsize=1, # Line buffered
|
bufsize=1, # Line buffered
|
||||||
)
|
)
|
||||||
@@ -219,7 +223,22 @@ class SaxonWorkerPool:
|
|||||||
self.workers.append(process)
|
self.workers.append(process)
|
||||||
self.worker_locks.append(threading.Lock())
|
self.worker_locks.append(threading.Lock())
|
||||||
|
|
||||||
logger.debug(f"Worker {i} gestartet (PID: {process.pid})")
|
logger.debug(f"Worker {i} gestartet (PID: {process.pid}, stderr: {stderr_log})")
|
||||||
|
|
||||||
|
# Warte kurz damit Worker initialisieren kann
|
||||||
|
import time
|
||||||
|
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
# Prüfe ob Worker noch läuft
|
||||||
|
if process.poll() is not None:
|
||||||
|
# Worker ist bereits beendet - Fehler!
|
||||||
|
stderr_file.close()
|
||||||
|
with open(stderr_log, "r") as f:
|
||||||
|
stderr_content = f.read()
|
||||||
|
raise RuntimeError(
|
||||||
|
f"Worker {i} ist sofort beendet (Exit Code: {process.returncode})\nstderr:\n{stderr_content}"
|
||||||
|
)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Fehler beim Starten von Worker {i}: {e}")
|
logger.error(f"Fehler beim Starten von Worker {i}: {e}")
|
||||||
@@ -259,12 +278,29 @@ class SaxonWorkerPool:
|
|||||||
try:
|
try:
|
||||||
worker = self.workers[worker_idx]
|
worker = self.workers[worker_idx]
|
||||||
|
|
||||||
|
# Prüfe ob Worker noch läuft
|
||||||
|
if worker.poll() is not None:
|
||||||
|
# Worker ist tot!
|
||||||
|
stderr_log = self.temp_dir / f"worker_{worker_idx}_stderr.log"
|
||||||
|
try:
|
||||||
|
with open(stderr_log, "r") as f:
|
||||||
|
stderr_content = f.read()
|
||||||
|
error_msg = (
|
||||||
|
f"Worker {worker_idx} ist beendet (Exit: {worker.returncode})\nstderr:\n{stderr_content}"
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
error_msg = f"Worker {worker_idx} ist beendet (Exit: {worker.returncode})"
|
||||||
|
logger.error(error_msg)
|
||||||
|
return False, error_msg
|
||||||
|
|
||||||
# Formatiere Parameter
|
# Formatiere Parameter
|
||||||
params_str = "|||".join([f"{key}={value}" for key, value in xslt_params.items()])
|
params_str = "|||".join([f"{key}={value}" for key, value in xslt_params.items()])
|
||||||
|
|
||||||
# Erstelle Job-String (Tab-separated)
|
# Erstelle Job-String (Tab-separated)
|
||||||
job = f"{source_xml}\t{xsl_stylesheet}\t{output_fo}\t{params_str}\n"
|
job = f"{source_xml}\t{xsl_stylesheet}\t{output_fo}\t{params_str}\n"
|
||||||
|
|
||||||
|
logger.debug(f"Sende Job an Worker {worker_idx}: {source_xml.name}")
|
||||||
|
|
||||||
# Sende Job an Worker
|
# Sende Job an Worker
|
||||||
worker.stdin.write(job)
|
worker.stdin.write(job)
|
||||||
worker.stdin.flush()
|
worker.stdin.flush()
|
||||||
@@ -272,12 +308,23 @@ class SaxonWorkerPool:
|
|||||||
# Warte auf Antwort
|
# Warte auf Antwort
|
||||||
response = worker.stdout.readline().strip()
|
response = worker.stdout.readline().strip()
|
||||||
|
|
||||||
|
logger.debug(f"Worker {worker_idx} Antwort: '{response}'")
|
||||||
|
|
||||||
if response == "OK":
|
if response == "OK":
|
||||||
return True, "Erfolgreich"
|
return True, "Erfolgreich"
|
||||||
elif response.startswith("ERROR:"):
|
elif response.startswith("ERROR:"):
|
||||||
error_msg = response[6:].strip()
|
error_msg = response[6:].strip()
|
||||||
return False, f"Saxon-Fehler: {error_msg}"
|
return False, f"Saxon-Fehler: {error_msg}"
|
||||||
else:
|
else:
|
||||||
|
# Leere Antwort bedeutet Worker ist crashed
|
||||||
|
if not response:
|
||||||
|
stderr_log = self.temp_dir / f"worker_{worker_idx}_stderr.log"
|
||||||
|
try:
|
||||||
|
with open(stderr_log, "r") as f:
|
||||||
|
stderr_content = f.read()[-500:] # Letzte 500 Zeichen
|
||||||
|
return False, f"Worker {worker_idx} crashed (keine Antwort)\nstderr:\n{stderr_content}"
|
||||||
|
except Exception:
|
||||||
|
return False, f"Worker {worker_idx} crashed (keine Antwort)"
|
||||||
return False, f"Unerwartete Antwort: {response}"
|
return False, f"Unerwartete Antwort: {response}"
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|||||||
Reference in New Issue
Block a user