2026-04-15 20:11:02 +02:00
|
|
|
"""Tests für SmtcController (Windows/SMTC)."""
|
|
|
|
|
|
|
|
|
|
import sys
|
|
|
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
|
|
|
|
|
|
|
|
import pytest
|
|
|
|
|
|
|
|
|
|
# Module-wide marker: every test in this file is skipped unless running on Windows.
pytestmark = pytest.mark.skipif(sys.platform != "win32", reason="SMTC is Windows-only")
|
|
|
|
|
|
2026-04-15 20:13:14 +02:00
|
|
|
# The winrt import is guarded so the module can still be imported (and then
# skipped via ``pytestmark``) on platforms other than Windows.
if sys.platform == "win32":
    from winrt.windows.media.control import (
        GlobalSystemMediaTransportControlsSessionPlaybackStatus as Status,
    )

    # Short aliases for the two playback states used by the tests below.
    PLAYING, PAUSED = Status.PLAYING, Status.PAUSED
|
2026-04-15 20:11:02 +02:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def _make_session(aumid: str, status) -> MagicMock:
|
|
|
|
|
"""Erzeugt eine gemockte SMTC-Session mit gegebenem PlaybackStatus."""
|
|
|
|
|
session = MagicMock()
|
|
|
|
|
session.source_app_user_model_id = aumid
|
|
|
|
|
info = MagicMock()
|
|
|
|
|
info.playback_status = status
|
|
|
|
|
session.get_playback_info = MagicMock(return_value=info)
|
|
|
|
|
session.try_pause_async = AsyncMock()
|
|
|
|
|
session.try_play_async = AsyncMock()
|
|
|
|
|
return session
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _make_manager(sessions: list) -> MagicMock:
|
|
|
|
|
"""Erzeugt einen gemockten SMTC-Manager mit gegebenen Sessions."""
|
|
|
|
|
manager = MagicMock()
|
|
|
|
|
manager.get_sessions = MagicMock(return_value=sessions)
|
|
|
|
|
return manager
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_pause_is_noop_when_smtc_unreachable(monkeypatch, caplog):
    """pause() must tolerate a failing manager lookup.

    ``_ensure_manager`` is replaced by a stub that raises ``RuntimeError``.
    The test expects ``pause()`` to return normally, ``_paused`` to be empty,
    and at least one captured log record mentioning SMTC (in any letter case).
    """
    from whisper_local.media._smtc import SmtcController

    controller = SmtcController()
    monkeypatch.setattr(
        controller,
        "_ensure_manager",
        AsyncMock(side_effect=RuntimeError("kein SMTC")),
    )

    with caplog.at_level("WARNING"):
        await controller.pause()

    assert controller._paused == []
    # The case-insensitive match already covers the literal "SMTC" spelling,
    # so a separate case-sensitive check would be redundant.
    assert any("smtc" in record.message.lower() for record in caplog.records)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_pause_skips_reconnect_after_smtc_failure(monkeypatch):
    """After one failed manager lookup, later pause() calls must not retry it."""
    from whisper_local.media._smtc import SmtcController

    attempts = 0

    async def always_fails():
        # Count every invocation before failing, so the test can tell how
        # often the controller tried to reach the manager.
        nonlocal attempts
        attempts += 1
        raise RuntimeError("kein SMTC")

    ctrl = SmtcController()
    monkeypatch.setattr(ctrl, "_ensure_manager", always_fails)

    # Three pause requests in a row; only the first may reach the stub.
    for _ in range(3):
        await ctrl.pause()

    assert attempts == 1
|
2026-04-15 20:14:36 +02:00
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_pause_with_no_sessions_is_noop(monkeypatch):
    """pause() with a manager that lists no sessions leaves ``_paused`` empty."""
    from whisper_local.media._smtc import SmtcController

    ctrl = SmtcController()
    manager_stub = AsyncMock(return_value=_make_manager([]))
    monkeypatch.setattr(ctrl, "_ensure_manager", manager_stub)

    await ctrl.pause()

    assert ctrl._paused == []
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_pause_pauses_all_playing_sessions(monkeypatch):
    """Every session reported as PLAYING is paused once and recorded in order."""
    from whisper_local.media._smtc import SmtcController

    spotify = _make_session("Spotify", PLAYING)
    edge = _make_session("msedge", PLAYING)
    ctrl = SmtcController()
    manager_stub = AsyncMock(return_value=_make_manager([spotify, edge]))
    monkeypatch.setattr(ctrl, "_ensure_manager", manager_stub)

    await ctrl.pause()

    for session in (spotify, edge):
        session.try_pause_async.assert_awaited_once()
    assert ctrl._paused == ["Spotify", "msedge"]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_pause_skips_already_paused_sessions(monkeypatch):
    """Only the PLAYING session is paused; the PAUSED one is left untouched."""
    from whisper_local.media._smtc import SmtcController

    active = _make_session("Spotify", PLAYING)
    idle = _make_session("msedge", PAUSED)
    ctrl = SmtcController()
    manager_stub = AsyncMock(return_value=_make_manager([active, idle]))
    monkeypatch.setattr(ctrl, "_ensure_manager", manager_stub)

    await ctrl.pause()

    active.try_pause_async.assert_awaited_once()
    idle.try_pause_async.assert_not_awaited()
    # Only the session that was actually playing is remembered.
    assert ctrl._paused == ["Spotify"]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_pause_logs_and_continues_when_session_fails(monkeypatch, caplog):
    """A session whose pause call raises must not stop the remaining sessions.

    The first session's ``try_pause_async`` raises ``RuntimeError``. The test
    expects the second session to be paused and recorded, and a captured log
    record that names the failing session.
    """
    from whisper_local.media._smtc import SmtcController

    failing = _make_session("broken", PLAYING)
    failing.try_pause_async = AsyncMock(
        side_effect=RuntimeError("Verbindung verloren")
    )
    healthy = _make_session("Spotify", PLAYING)
    ctrl = SmtcController()
    manager_stub = AsyncMock(return_value=_make_manager([failing, healthy]))
    monkeypatch.setattr(ctrl, "_ensure_manager", manager_stub)

    with caplog.at_level("WARNING"):
        await ctrl.pause()

    healthy.try_pause_async.assert_awaited_once()
    assert ctrl._paused == ["Spotify"]
    assert any("broken" in record.message for record in caplog.records)
|
2026-04-16 18:15:16 +02:00
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_resume_with_empty_paused_list_is_noop(monkeypatch):
    """resume() with nothing recorded in ``_paused`` never asks for the manager."""
    from whisper_local.media._smtc import SmtcController

    ctrl = SmtcController()
    ctrl._paused = []
    manager_lookup = AsyncMock()
    monkeypatch.setattr(ctrl, "_ensure_manager", manager_lookup)

    await ctrl.resume()

    manager_lookup.assert_not_awaited()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_resume_plays_only_previously_paused(monkeypatch):
    """Only sessions listed in ``_paused`` are resumed; the list is then empty."""
    from whisper_local.media._smtc import SmtcController

    remembered = _make_session("Spotify", PAUSED)
    unrelated = _make_session("msedge", PAUSED)
    ctrl = SmtcController()
    # Only Spotify was paused by the controller; msedge was paused by someone else.
    ctrl._paused = ["Spotify"]
    manager_stub = AsyncMock(return_value=_make_manager([remembered, unrelated]))
    monkeypatch.setattr(ctrl, "_ensure_manager", manager_stub)

    await ctrl.resume()

    remembered.try_play_async.assert_awaited_once()
    unrelated.try_play_async.assert_not_awaited()
    assert ctrl._paused == []
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_resume_skips_disappeared_session(monkeypatch, caplog):
    """A remembered session that no longer exists is logged and skipped.

    ``_paused`` names ``gone_app`` and ``Spotify`` but the manager only lists
    Spotify. The test expects Spotify to be resumed, ``_paused`` to be empty
    afterwards, and a captured log record that mentions ``gone_app``.
    """
    from whisper_local.media._smtc import SmtcController

    survivor = _make_session("Spotify", PAUSED)
    ctrl = SmtcController()
    ctrl._paused = ["gone_app", "Spotify"]
    manager_stub = AsyncMock(return_value=_make_manager([survivor]))
    monkeypatch.setattr(ctrl, "_ensure_manager", manager_stub)

    with caplog.at_level("WARNING"):
        await ctrl.resume()

    survivor.try_play_async.assert_awaited_once()
    assert ctrl._paused == []
    assert any("gone_app" in record.message for record in caplog.records)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_resume_logs_and_continues_when_session_fails(monkeypatch, caplog):
    """A session whose play call raises must not stop the remaining sessions.

    The first session's ``try_play_async`` raises ``RuntimeError``. The test
    expects the second session to be resumed, ``_paused`` to be empty
    afterwards, and a captured log record that names the failing session.
    """
    from whisper_local.media._smtc import SmtcController

    failing = _make_session("broken", PAUSED)
    failing.try_play_async = AsyncMock(
        side_effect=RuntimeError("Verbindung verloren")
    )
    healthy = _make_session("Spotify", PAUSED)
    ctrl = SmtcController()
    ctrl._paused = ["broken", "Spotify"]
    manager_stub = AsyncMock(return_value=_make_manager([failing, healthy]))
    monkeypatch.setattr(ctrl, "_ensure_manager", manager_stub)

    with caplog.at_level("WARNING"):
        await ctrl.resume()

    healthy.try_play_async.assert_awaited_once()
    assert ctrl._paused == []
    assert any("broken" in record.message for record in caplog.records)
|