diff --git a/src/app/core/download/downloader.py b/src/app/core/download/downloader.py index b839f9f..8b6801a 100644 --- a/src/app/core/download/downloader.py +++ b/src/app/core/download/downloader.py @@ -68,6 +68,7 @@ class Downloader: def run(): import yt_dlp # type: ignore + from pathlib import Path class _QuietLogger: def debug(self, msg): @@ -97,6 +98,38 @@ class Downloader: "logger": _QuietLogger(), } + progress_cb = getattr(job, "progress_callback", None) + if progress_cb is not None: + def hook(d): + try: + payload = { + "status": d.get("status"), + "downloaded_bytes": d.get("downloaded_bytes"), + "total_bytes": d.get("total_bytes") or d.get("total_bytes_estimate"), + "speed": d.get("speed"), + "eta": d.get("eta"), + "filename": d.get("filename"), + } + total = payload.get("total_bytes") + done = payload.get("downloaded_bytes") + if total and done is not None: + payload["progress"] = float(done) / float(total) + progress_cb(payload) + except Exception: + pass + + ydl_opts["progress_hooks"] = [hook] + + # If user provided an ffmpeg path, pass it through to yt-dlp so it doesn't rely on PATH. + ffmpeg_hint = getattr(job, "ffmpeg_path", None) or self.ffmpeg_path + if ffmpeg_hint: + try: + p = Path(str(ffmpeg_hint)) + if p.exists(): + ydl_opts["ffmpeg_location"] = str(p) + except Exception: + pass + with yt_dlp.YoutubeDL(ydl_opts) as ydl: # type: ignore[attr-defined] ydl.download([job.url]) diff --git a/src/app/core/download/queue_manager.py b/src/app/core/download/queue_manager.py index bf85399..24ebada 100644 --- a/src/app/core/download/queue_manager.py +++ b/src/app/core/download/queue_manager.py @@ -4,7 +4,7 @@ import asyncio from dataclasses import dataclass from enum import Enum from pathlib import Path -from typing import Optional +from typing import Any, Callable, Optional from ..models import PlaylistItem @@ -29,6 +29,8 @@ class DownloadJob: error: Optional[str] = None ffmpeg_path: Optional[str] = None max_download_quality: Optional[str] = None + playlist_id: Optional[str] = None + progress_callback: Optional[Callable[[dict[str, Any]], None]] = None audio_output_path: Optional[Path] = None # when mode=video and we also want mp3 keep_video: bool = True diff --git a/src/app/core/sync/executor.py b/src/app/core/sync/executor.py index bfab456..b8a60ae 100644 --- a/src/app/core/sync/executor.py +++ b/src/app/core/sync/executor.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import time import shutil from pathlib import Path from typing import Iterable, List @@ -22,7 +23,24 @@ class ActionExecutor: self.bus = event_bus async def execute(self, actions: Iterable[SyncAction], playlist_cfg: dict) -> None: - self._preflight_dependencies(actions, playlist_cfg) + actions_list = list(actions) + playlist_id = extract_playlist_id(playlist_cfg.get("url", "")) or playlist_cfg.get("url", "") + start = time.monotonic() + counts: dict[str, int] = {} + for a in actions_list: + counts[a.type.name] = counts.get(a.type.name, 0) + 1 + + if self.bus: + await self.bus.publish( + "SyncStarted", + { + "playlist_id": playlist_id, + "actions_total": sum(counts.values()), + "counts": dict(counts), + }, + ) + + self._preflight_dependencies(actions_list, playlist_cfg) save_path = Path(playlist_cfg.get("save_path", "./downloads")).resolve() mode = playlist_cfg.get("download_mode", "video") @@ -34,13 +52,23 @@ class ActionExecutor: video_root.mkdir(parents=True, exist_ok=True) # First, handle renames safely in batch per extension - await self._apply_renames(actions, audio_root, video_root, playlist_cfg) + await self._apply_renames(actions_list, audio_root, video_root, playlist_cfg) # Then, recycle deletions - self._apply_deletions(actions, audio_root, video_root, playlist_cfg) + self._apply_deletions(actions_list, audio_root, video_root, playlist_cfg) # Finally, perform downloads concurrently - await self._apply_downloads(actions, mode, audio_root, video_root, playlist_cfg) + await self._apply_downloads(actions_list, mode, audio_root, video_root, playlist_cfg) + + duration_s = round(time.monotonic() - start, 3) + summary = { + "playlist_id": playlist_id, + "duration_s": duration_s, + "counts": dict(counts), + } + if self.bus: + await self.bus.publish("SyncSummary", dict(summary)) + await self.bus.publish("SyncFinished", dict(summary)) def _preflight_dependencies(self, actions: Iterable[SyncAction], playlist_cfg: dict) -> None: """ @@ -128,6 +156,7 @@ class ActionExecutor: async def _apply_downloads(self, actions: Iterable[SyncAction], mode: str, audio_root: Path, video_root: Path, playlist_cfg: dict) -> None: playlist_id = extract_playlist_id(playlist_cfg.get("url", "")) or playlist_cfg.get("url", "") + loop = asyncio.get_running_loop() concurrency_cfg = playlist_cfg.get("max_parallel_downloads", self.concurrency) try: concurrency = int(concurrency_cfg) if concurrency_cfg is not None else self.concurrency @@ -147,6 +176,18 @@ class ActionExecutor: retry_delay_seconds = 1.5 async def worker(job: DownloadJob): + job.playlist_id = playlist_id + + if self.bus: + def _progress_cb(info: dict): + payload = dict(info) + payload.setdefault("playlist_id", playlist_id) + if job.item: + payload.setdefault("video_id", job.item.video_id) + loop.call_soon_threadsafe(asyncio.create_task, self.bus.publish("DownloadProgress", payload)) + + job.progress_callback = _progress_cb + if self.bus and job.item: await self.bus.publish("DownloadStarted", {"playlist_id": playlist_id, "video_id": job.item.video_id, "target": str(job.output_path)}) await default_worker(job, max_retries=retry_max_retries, delay_seconds=retry_delay_seconds) diff --git a/tests/test_event_surface.py b/tests/test_event_surface.py new file mode 100644 index 0000000..666cb56 --- /dev/null +++ b/tests/test_event_surface.py @@ -0,0 +1,85 @@ +from __future__ import annotations + +import asyncio +import sys +from pathlib import Path + +from src.app.core.download.downloader import Downloader +from src.app.core.download.queue_manager import DownloadJob +from src.app.core.events.event_bus import EventBus +from src.app.core.models import PlaylistItem, SyncAction, SyncActionType +from src.app.core.sync.executor import ActionExecutor + + +def test_executor_emits_sync_events(tmp_path): + published: list[tuple[str, dict]] = [] + + class TestBus(EventBus): + async def publish(self, event_name: str, payload: dict) -> None: # type: ignore[override] + published.append((event_name, dict(payload))) + + class StubDB: + def update_local_filename(self, playlist_id: str, video_id: str, filename: str) -> None: + return None + + def mark_downloaded(self, playlist_id: str, video_id: str, downloaded: bool) -> None: + return None + + bus = TestBus() + ex = ActionExecutor(StubDB(), concurrency=1, event_bus=bus) # type: ignore[arg-type] + + item = PlaylistItem(playlist_id="p", video_id="v", title="t", playlist_index=1) + actions = [SyncAction(SyncActionType.SKIP, item=item, to_name="0001 - t.mp4")] + + asyncio.run(ex.execute(actions, {"url": "p", "save_path": str(tmp_path)})) + + names = [n for n, _ in published] + assert "SyncStarted" in names + assert "SyncSummary" in names + assert "SyncFinished" in names + + summary = [p for n, p in published if n == "SyncSummary"][0] + assert summary["playlist_id"] == "p" + assert "duration_s" in summary + assert isinstance(summary["counts"], dict) + + +def test_downloader_progress_hook_calls_callback(tmp_path, monkeypatch): + callbacks: list[dict] = [] + + class DummyYDL: + def __init__(self, opts): + self.opts = opts + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def download(self, urls): + hooks = self.opts.get("progress_hooks") or [] + for h in hooks: + h({"status": "downloading", "downloaded_bytes": 50, "total_bytes": 100, "speed": 1.0, "eta": 1, "filename": "x"}) + h({"status": "finished", "downloaded_bytes": 100, "total_bytes": 100, "speed": 1.0, "eta": 0, "filename": "x"}) + + dummy = type("yt_dlp", (), {"YoutubeDL": DummyYDL}) + monkeypatch.setitem(sys.modules, "yt_dlp", dummy) + + ffmpeg = tmp_path / "ffmpeg" + ffmpeg.write_text("x", encoding="utf-8") + + job = DownloadJob( + item=PlaylistItem(playlist_id="p", video_id="v", title="t", playlist_index=1), + url="https://example.invalid", + output_path=tmp_path / "out.mp4", + ffmpeg_path=str(ffmpeg), + ) + job.progress_callback = lambda payload: callbacks.append(dict(payload)) + + dl = Downloader() + asyncio.run(dl._download(job)) # type: ignore[attr-defined] + + assert callbacks + assert any("progress" in c for c in callbacks) +