From decc4c675dd8efc868c0890456563be685a44b8b Mon Sep 17 00:00:00 2001 From: DARKZOUL5 Date: Sat, 16 May 2026 17:53:37 +0300 Subject: [PATCH] feat(backend): add queue system; add retry system --- README.md | 13 ++++++++ config/yt-playlist-config.example.json | 3 ++ src/app/config/settings.py | 6 ++++ src/app/core/download/queue_manager.py | 3 ++ src/app/core/sync/diff_engine.py | 2 +- src/app/core/sync/executor.py | 22 ++++++++++-- tests/test_reorder_and_delete.py | 46 ++++++++++++++++++++++++++ 7 files changed, 91 insertions(+), 4 deletions(-) create mode 100644 tests/test_reorder_and_delete.py diff --git a/README.md b/README.md index 6eafc28..037214d 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,9 @@ Create/edit `config/yt-playlist-config.json`: ```json { "ffmpeg_path": "./bin/ffmpeg.exe", + "max_parallel_downloads": 2, + "retry_max_retries": 2, + "retry_delay_seconds": 1.5, "playlists": [ { "url": "https://www.youtube.com/playlist?list=YOUR_PLAYLIST_ID", @@ -46,10 +49,14 @@ Create/edit `config/yt-playlist-config.json`: ``` Defaults: + - `ffmpeg_path`: `./bin/ffmpeg.exe` (Windows) or `./bin/ffmpeg` (Linux) - `download_mode`: `video` - `max_download_quality`: `1080p` - `save_path`: `./downloads` +- `max_parallel_downloads`: `2` +- `retry_max_retries`: `2` +- `retry_delay_seconds`: `1.5` `max_download_quality`: @@ -62,6 +69,12 @@ Defaults: - `audio`: download muxed `.mp4`, extract `.mp3`, delete the `.mp4` - `both`: download muxed `.mp4`, extract `.mp3`, keep both files +Queue / retry: + +- `max_parallel_downloads`: number of concurrent download workers. +- `retry_max_retries`: how many times a failed download job is retried. +- `retry_delay_seconds`: base delay before retry; increases with backoff. + ## Run - Compute-only: diff --git a/config/yt-playlist-config.example.json b/config/yt-playlist-config.example.json index 05fd760..61f6e98 100644 --- a/config/yt-playlist-config.example.json +++ b/config/yt-playlist-config.example.json @@ -1,5 +1,8 @@ { "ffmpeg_path": "./bin/ffmpeg.exe", + "max_parallel_downloads": 2, + "retry_max_retries": 2, + "retry_delay_seconds": 1.5, "playlists": [ { "url": "https://www.youtube.com/playlist?list=YOUR_PLAYLIST_ID_HERE", diff --git a/src/app/config/settings.py b/src/app/config/settings.py index 724bd82..3f06e24 100644 --- a/src/app/config/settings.py +++ b/src/app/config/settings.py @@ -18,6 +18,9 @@ DEFAULT_CONFIG: Dict[str, Any] = { "max_download_quality": "1080p", "save_path": "./downloads", "ffmpeg_path": _default_ffmpeg_path(), + "max_parallel_downloads": 2, + "retry_max_retries": 2, + "retry_delay_seconds": 1.5, } @@ -63,6 +66,9 @@ class Settings: "max_download_quality": self.data.get("max_download_quality", DEFAULT_CONFIG["max_download_quality"]), "save_path": self.data.get("save_path", DEFAULT_CONFIG["save_path"]), "ffmpeg_path": self.data.get("ffmpeg_path", DEFAULT_CONFIG["ffmpeg_path"]), + "max_parallel_downloads": self.data.get("max_parallel_downloads", DEFAULT_CONFIG["max_parallel_downloads"]), + "retry_max_retries": self.data.get("retry_max_retries", DEFAULT_CONFIG["retry_max_retries"]), + "retry_delay_seconds": self.data.get("retry_delay_seconds", DEFAULT_CONFIG["retry_delay_seconds"]), } results: List[Dict[str, Any]] = [] diff --git a/src/app/core/download/queue_manager.py b/src/app/core/download/queue_manager.py index eaf9148..bf85399 100644 --- a/src/app/core/download/queue_manager.py +++ b/src/app/core/download/queue_manager.py @@ -59,3 +59,6 @@ class QueueManager: async def enqueue(self, job: DownloadJob): await self._queue.put(job) + + async def join(self) -> None: + await self._queue.join() diff --git a/src/app/core/sync/diff_engine.py b/src/app/core/sync/diff_engine.py index 9ebee55..20593d6 100644 --- a/src/app/core/sync/diff_engine.py +++ b/src/app/core/sync/diff_engine.py @@ -8,7 +8,7 @@ from ..models import FilesystemEntry, PlaylistItem, SyncAction, SyncActionType class DiffEngine: """ Compares remote playlist items, database state, and filesystem to - produce a list of actions. Initial MVP computes DOWNLOAD/RENAME/REORDER + produce a list of actions. Initial MVP computes DOWNLOAD/RENAME/DELETE based on simple filename scheme "0001 - Title.ext". """ diff --git a/src/app/core/sync/executor.py b/src/app/core/sync/executor.py index dcfb02e..bfab456 100644 --- a/src/app/core/sync/executor.py +++ b/src/app/core/sync/executor.py @@ -128,12 +128,28 @@ class ActionExecutor: async def _apply_downloads(self, actions: Iterable[SyncAction], mode: str, audio_root: Path, video_root: Path, playlist_cfg: dict) -> None: playlist_id = extract_playlist_id(playlist_cfg.get("url", "")) or playlist_cfg.get("url", "") - queue = QueueManager(concurrency=self.concurrency) + concurrency_cfg = playlist_cfg.get("max_parallel_downloads", self.concurrency) + try: + concurrency = int(concurrency_cfg) if concurrency_cfg is not None else self.concurrency + except Exception: + concurrency = self.concurrency + queue = QueueManager(concurrency=concurrency) + + retry_max_cfg = playlist_cfg.get("retry_max_retries", 2) + retry_delay_cfg = playlist_cfg.get("retry_delay_seconds", 1.5) + try: + retry_max_retries = int(retry_max_cfg) if retry_max_cfg is not None else 2 + except Exception: + retry_max_retries = 2 + try: + retry_delay_seconds = float(retry_delay_cfg) if retry_delay_cfg is not None else 1.5 + except Exception: + retry_delay_seconds = 1.5 async def worker(job: DownloadJob): if self.bus and job.item: await self.bus.publish("DownloadStarted", {"playlist_id": playlist_id, "video_id": job.item.video_id, "target": str(job.output_path)}) - await default_worker(job) + await default_worker(job, max_retries=retry_max_retries, delay_seconds=retry_delay_seconds) await queue.start(worker) try: @@ -223,7 +239,7 @@ class ActionExecutor: jobs.append(job) await queue.enqueue(job) finally: - await queue._queue.join() # wait for all jobs + await queue.join() # wait for all jobs await queue.stop() # Persist DB updates for completed jobs diff --git a/tests/test_reorder_and_delete.py b/tests/test_reorder_and_delete.py new file mode 100644 index 0000000..bf0a124 --- /dev/null +++ b/tests/test_reorder_and_delete.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +from pathlib import Path + +from src.app.core.models import PlaylistItem, SyncAction, SyncActionType +from src.app.core.sync.executor import ActionExecutor +from src.app.core.sync.reorder import safe_multi_rename + + +def test_safe_multi_rename_swaps_files(tmp_path: Path): + a = tmp_path / "0001 - A.mp4" + b = tmp_path / "0002 - B.mp4" + a.write_text("A", encoding="utf-8") + b.write_text("B", encoding="utf-8") + + safe_multi_rename([(a, b), (b, a)]) + + assert (tmp_path / "0001 - A.mp4").read_text(encoding="utf-8") == "B" + assert (tmp_path / "0002 - B.mp4").read_text(encoding="utf-8") == "A" + + +def test_executor_deletes_to_recycle(tmp_path: Path): + class StubDB: + def clear_file_state(self, playlist_id: str, video_id: str) -> None: + return None + + executor = ActionExecutor(StubDB()) # type: ignore[arg-type] + + save_root = tmp_path / "downloads" + audio_root = save_root / "audio" + video_root = save_root / "video" + audio_root.mkdir(parents=True, exist_ok=True) + video_root.mkdir(parents=True, exist_ok=True) + + victim = audio_root / "0001 - X.mp3" + victim.write_text("x", encoding="utf-8") + + item = PlaylistItem(playlist_id="p", video_id="v", title="t", playlist_index=1, local_filename=victim.name, downloaded=True) + action = SyncAction(SyncActionType.DELETE, item=item, from_name=victim.name) + + executor._apply_deletions([action], audio_root, video_root, {"url": "p"}) # type: ignore[attr-defined] + + assert not victim.exists() + recycled = save_root / ".recycle" / "audio" / victim.name + assert recycled.exists() +