mirror of
https://github.com/darkzoul5/YoutubePlaylistSync.git
synced 2026-07-03 04:23:59 +03:00
feat(backend): add Download progress; add SyncStarted, SyncFinished, SyncSummary
This commit is contained in:
@@ -68,6 +68,7 @@ class Downloader:
|
||||
|
||||
def run():
|
||||
import yt_dlp # type: ignore
|
||||
from pathlib import Path
|
||||
|
||||
class _QuietLogger:
|
||||
def debug(self, msg):
|
||||
@@ -97,6 +98,38 @@ class Downloader:
|
||||
"logger": _QuietLogger(),
|
||||
}
|
||||
|
||||
progress_cb = getattr(job, "progress_callback", None)
|
||||
if progress_cb is not None:
|
||||
def hook(d):
|
||||
try:
|
||||
payload = {
|
||||
"status": d.get("status"),
|
||||
"downloaded_bytes": d.get("downloaded_bytes"),
|
||||
"total_bytes": d.get("total_bytes") or d.get("total_bytes_estimate"),
|
||||
"speed": d.get("speed"),
|
||||
"eta": d.get("eta"),
|
||||
"filename": d.get("filename"),
|
||||
}
|
||||
total = payload.get("total_bytes")
|
||||
done = payload.get("downloaded_bytes")
|
||||
if total and done is not None:
|
||||
payload["progress"] = float(done) / float(total)
|
||||
progress_cb(payload)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
ydl_opts["progress_hooks"] = [hook]
|
||||
|
||||
# If user provided an ffmpeg path, pass it through to yt-dlp so it doesn't rely on PATH.
|
||||
ffmpeg_hint = getattr(job, "ffmpeg_path", None) or self.ffmpeg_path
|
||||
if ffmpeg_hint:
|
||||
try:
|
||||
p = Path(str(ffmpeg_hint))
|
||||
if p.exists():
|
||||
ydl_opts["ffmpeg_location"] = str(p)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
with yt_dlp.YoutubeDL(ydl_opts) as ydl: # type: ignore[attr-defined]
|
||||
ydl.download([job.url])
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ import asyncio
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
from typing import Any, Callable, Optional
|
||||
|
||||
from ..models import PlaylistItem
|
||||
|
||||
@@ -29,6 +29,8 @@ class DownloadJob:
|
||||
error: Optional[str] = None
|
||||
ffmpeg_path: Optional[str] = None
|
||||
max_download_quality: Optional[str] = None
|
||||
playlist_id: Optional[str] = None
|
||||
progress_callback: Optional[Callable[[dict[str, Any]], None]] = None
|
||||
audio_output_path: Optional[Path] = None # when mode=video and we also want mp3
|
||||
keep_video: bool = True
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from typing import Iterable, List
|
||||
@@ -22,7 +23,24 @@ class ActionExecutor:
|
||||
self.bus = event_bus
|
||||
|
||||
async def execute(self, actions: Iterable[SyncAction], playlist_cfg: dict) -> None:
|
||||
self._preflight_dependencies(actions, playlist_cfg)
|
||||
actions_list = list(actions)
|
||||
playlist_id = extract_playlist_id(playlist_cfg.get("url", "")) or playlist_cfg.get("url", "")
|
||||
start = time.monotonic()
|
||||
counts: dict[str, int] = {}
|
||||
for a in actions_list:
|
||||
counts[a.type.name] = counts.get(a.type.name, 0) + 1
|
||||
|
||||
if self.bus:
|
||||
await self.bus.publish(
|
||||
"SyncStarted",
|
||||
{
|
||||
"playlist_id": playlist_id,
|
||||
"actions_total": sum(counts.values()),
|
||||
"counts": dict(counts),
|
||||
},
|
||||
)
|
||||
|
||||
self._preflight_dependencies(actions_list, playlist_cfg)
|
||||
|
||||
save_path = Path(playlist_cfg.get("save_path", "./downloads")).resolve()
|
||||
mode = playlist_cfg.get("download_mode", "video")
|
||||
@@ -34,13 +52,23 @@ class ActionExecutor:
|
||||
video_root.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# First, handle renames safely in batch per extension
|
||||
await self._apply_renames(actions, audio_root, video_root, playlist_cfg)
|
||||
await self._apply_renames(actions_list, audio_root, video_root, playlist_cfg)
|
||||
|
||||
# Then, recycle deletions
|
||||
self._apply_deletions(actions, audio_root, video_root, playlist_cfg)
|
||||
self._apply_deletions(actions_list, audio_root, video_root, playlist_cfg)
|
||||
|
||||
# Finally, perform downloads concurrently
|
||||
await self._apply_downloads(actions, mode, audio_root, video_root, playlist_cfg)
|
||||
await self._apply_downloads(actions_list, mode, audio_root, video_root, playlist_cfg)
|
||||
|
||||
duration_s = round(time.monotonic() - start, 3)
|
||||
summary = {
|
||||
"playlist_id": playlist_id,
|
||||
"duration_s": duration_s,
|
||||
"counts": dict(counts),
|
||||
}
|
||||
if self.bus:
|
||||
await self.bus.publish("SyncSummary", dict(summary))
|
||||
await self.bus.publish("SyncFinished", dict(summary))
|
||||
|
||||
def _preflight_dependencies(self, actions: Iterable[SyncAction], playlist_cfg: dict) -> None:
|
||||
"""
|
||||
@@ -128,6 +156,7 @@ class ActionExecutor:
|
||||
|
||||
async def _apply_downloads(self, actions: Iterable[SyncAction], mode: str, audio_root: Path, video_root: Path, playlist_cfg: dict) -> None:
|
||||
playlist_id = extract_playlist_id(playlist_cfg.get("url", "")) or playlist_cfg.get("url", "")
|
||||
loop = asyncio.get_running_loop()
|
||||
concurrency_cfg = playlist_cfg.get("max_parallel_downloads", self.concurrency)
|
||||
try:
|
||||
concurrency = int(concurrency_cfg) if concurrency_cfg is not None else self.concurrency
|
||||
@@ -147,6 +176,18 @@ class ActionExecutor:
|
||||
retry_delay_seconds = 1.5
|
||||
|
||||
async def worker(job: DownloadJob):
|
||||
job.playlist_id = playlist_id
|
||||
|
||||
if self.bus:
|
||||
def _progress_cb(info: dict):
|
||||
payload = dict(info)
|
||||
payload.setdefault("playlist_id", playlist_id)
|
||||
if job.item:
|
||||
payload.setdefault("video_id", job.item.video_id)
|
||||
loop.call_soon_threadsafe(asyncio.create_task, self.bus.publish("DownloadProgress", payload))
|
||||
|
||||
job.progress_callback = _progress_cb
|
||||
|
||||
if self.bus and job.item:
|
||||
await self.bus.publish("DownloadStarted", {"playlist_id": playlist_id, "video_id": job.item.video_id, "target": str(job.output_path)})
|
||||
await default_worker(job, max_retries=retry_max_retries, delay_seconds=retry_delay_seconds)
|
||||
|
||||
@@ -0,0 +1,85 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
from src.app.core.download.downloader import Downloader
|
||||
from src.app.core.download.queue_manager import DownloadJob
|
||||
from src.app.core.events.event_bus import EventBus
|
||||
from src.app.core.models import PlaylistItem, SyncAction, SyncActionType
|
||||
from src.app.core.sync.executor import ActionExecutor
|
||||
|
||||
|
||||
def test_executor_emits_sync_events(tmp_path):
|
||||
published: list[tuple[str, dict]] = []
|
||||
|
||||
class TestBus(EventBus):
|
||||
async def publish(self, event_name: str, payload: dict) -> None: # type: ignore[override]
|
||||
published.append((event_name, dict(payload)))
|
||||
|
||||
class StubDB:
|
||||
def update_local_filename(self, playlist_id: str, video_id: str, filename: str) -> None:
|
||||
return None
|
||||
|
||||
def mark_downloaded(self, playlist_id: str, video_id: str, downloaded: bool) -> None:
|
||||
return None
|
||||
|
||||
bus = TestBus()
|
||||
ex = ActionExecutor(StubDB(), concurrency=1, event_bus=bus) # type: ignore[arg-type]
|
||||
|
||||
item = PlaylistItem(playlist_id="p", video_id="v", title="t", playlist_index=1)
|
||||
actions = [SyncAction(SyncActionType.SKIP, item=item, to_name="0001 - t.mp4")]
|
||||
|
||||
asyncio.run(ex.execute(actions, {"url": "p", "save_path": str(tmp_path)}))
|
||||
|
||||
names = [n for n, _ in published]
|
||||
assert "SyncStarted" in names
|
||||
assert "SyncSummary" in names
|
||||
assert "SyncFinished" in names
|
||||
|
||||
summary = [p for n, p in published if n == "SyncSummary"][0]
|
||||
assert summary["playlist_id"] == "p"
|
||||
assert "duration_s" in summary
|
||||
assert isinstance(summary["counts"], dict)
|
||||
|
||||
|
||||
def test_downloader_progress_hook_calls_callback(tmp_path, monkeypatch):
|
||||
callbacks: list[dict] = []
|
||||
|
||||
class DummyYDL:
|
||||
def __init__(self, opts):
|
||||
self.opts = opts
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
def download(self, urls):
|
||||
hooks = self.opts.get("progress_hooks") or []
|
||||
for h in hooks:
|
||||
h({"status": "downloading", "downloaded_bytes": 50, "total_bytes": 100, "speed": 1.0, "eta": 1, "filename": "x"})
|
||||
h({"status": "finished", "downloaded_bytes": 100, "total_bytes": 100, "speed": 1.0, "eta": 0, "filename": "x"})
|
||||
|
||||
dummy = type("yt_dlp", (), {"YoutubeDL": DummyYDL})
|
||||
monkeypatch.setitem(sys.modules, "yt_dlp", dummy)
|
||||
|
||||
ffmpeg = tmp_path / "ffmpeg"
|
||||
ffmpeg.write_text("x", encoding="utf-8")
|
||||
|
||||
job = DownloadJob(
|
||||
item=PlaylistItem(playlist_id="p", video_id="v", title="t", playlist_index=1),
|
||||
url="https://example.invalid",
|
||||
output_path=tmp_path / "out.mp4",
|
||||
ffmpeg_path=str(ffmpeg),
|
||||
)
|
||||
job.progress_callback = lambda payload: callbacks.append(dict(payload))
|
||||
|
||||
dl = Downloader()
|
||||
asyncio.run(dl._download(job)) # type: ignore[attr-defined]
|
||||
|
||||
assert callbacks
|
||||
assert any("progress" in c for c in callbacks)
|
||||
|
||||
Reference in New Issue
Block a user