mirror of
https://github.com/darkzoul5/YoutubePlaylistSync.git
synced 2026-07-03 04:23:59 +03:00
feat(backend): add queue system; add retry system
This commit is contained in:
@@ -34,6 +34,9 @@ Create/edit `config/yt-playlist-config.json`:
|
||||
```json
|
||||
{
|
||||
"ffmpeg_path": "./bin/ffmpeg.exe",
|
||||
"max_parallel_downloads": 2,
|
||||
"retry_max_retries": 2,
|
||||
"retry_delay_seconds": 1.5,
|
||||
"playlists": [
|
||||
{
|
||||
"url": "https://www.youtube.com/playlist?list=YOUR_PLAYLIST_ID",
|
||||
@@ -46,10 +49,14 @@ Create/edit `config/yt-playlist-config.json`:
|
||||
```
|
||||
|
||||
Defaults:
|
||||
|
||||
- `ffmpeg_path`: `./bin/ffmpeg.exe` (Windows) or `./bin/ffmpeg` (Linux)
|
||||
- `download_mode`: `video`
|
||||
- `max_download_quality`: `1080p`
|
||||
- `save_path`: `./downloads`
|
||||
- `max_parallel_downloads`: `2`
|
||||
- `retry_max_retries`: `2`
|
||||
- `retry_delay_seconds`: `1.5`
|
||||
|
||||
`max_download_quality`:
|
||||
|
||||
@@ -62,6 +69,12 @@ Defaults:
|
||||
- `audio`: download muxed `.mp4`, extract `.mp3`, delete the `.mp4`
|
||||
- `both`: download muxed `.mp4`, extract `.mp3`, keep both files
|
||||
|
||||
Queue / retry:
|
||||
|
||||
- `max_parallel_downloads`: number of concurrent download workers.
|
||||
- `retry_max_retries`: how many times a failed download job is retried.
|
||||
- `retry_delay_seconds`: base delay before retry; increases with backoff.
|
||||
|
||||
## Run
|
||||
|
||||
- Compute-only:
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
{
|
||||
"ffmpeg_path": "./bin/ffmpeg.exe",
|
||||
"max_parallel_downloads": 2,
|
||||
"retry_max_retries": 2,
|
||||
"retry_delay_seconds": 1.5,
|
||||
"playlists": [
|
||||
{
|
||||
"url": "https://www.youtube.com/playlist?list=YOUR_PLAYLIST_ID_HERE",
|
||||
|
||||
@@ -18,6 +18,9 @@ DEFAULT_CONFIG: Dict[str, Any] = {
|
||||
"max_download_quality": "1080p",
|
||||
"save_path": "./downloads",
|
||||
"ffmpeg_path": _default_ffmpeg_path(),
|
||||
"max_parallel_downloads": 2,
|
||||
"retry_max_retries": 2,
|
||||
"retry_delay_seconds": 1.5,
|
||||
}
|
||||
|
||||
|
||||
@@ -63,6 +66,9 @@ class Settings:
|
||||
"max_download_quality": self.data.get("max_download_quality", DEFAULT_CONFIG["max_download_quality"]),
|
||||
"save_path": self.data.get("save_path", DEFAULT_CONFIG["save_path"]),
|
||||
"ffmpeg_path": self.data.get("ffmpeg_path", DEFAULT_CONFIG["ffmpeg_path"]),
|
||||
"max_parallel_downloads": self.data.get("max_parallel_downloads", DEFAULT_CONFIG["max_parallel_downloads"]),
|
||||
"retry_max_retries": self.data.get("retry_max_retries", DEFAULT_CONFIG["retry_max_retries"]),
|
||||
"retry_delay_seconds": self.data.get("retry_delay_seconds", DEFAULT_CONFIG["retry_delay_seconds"]),
|
||||
}
|
||||
|
||||
results: List[Dict[str, Any]] = []
|
||||
|
||||
@@ -59,3 +59,6 @@ class QueueManager:
|
||||
|
||||
async def enqueue(self, job: DownloadJob):
|
||||
await self._queue.put(job)
|
||||
|
||||
async def join(self) -> None:
|
||||
await self._queue.join()
|
||||
|
||||
@@ -8,7 +8,7 @@ from ..models import FilesystemEntry, PlaylistItem, SyncAction, SyncActionType
|
||||
class DiffEngine:
|
||||
"""
|
||||
Compares remote playlist items, database state, and filesystem to
|
||||
produce a list of actions. Initial MVP computes DOWNLOAD/RENAME/REORDER
|
||||
produce a list of actions. Initial MVP computes DOWNLOAD/RENAME/DELETE
|
||||
based on simple filename scheme "0001 - Title.ext".
|
||||
"""
|
||||
|
||||
|
||||
@@ -128,12 +128,28 @@ class ActionExecutor:
|
||||
|
||||
async def _apply_downloads(self, actions: Iterable[SyncAction], mode: str, audio_root: Path, video_root: Path, playlist_cfg: dict) -> None:
|
||||
playlist_id = extract_playlist_id(playlist_cfg.get("url", "")) or playlist_cfg.get("url", "")
|
||||
queue = QueueManager(concurrency=self.concurrency)
|
||||
concurrency_cfg = playlist_cfg.get("max_parallel_downloads", self.concurrency)
|
||||
try:
|
||||
concurrency = int(concurrency_cfg) if concurrency_cfg is not None else self.concurrency
|
||||
except Exception:
|
||||
concurrency = self.concurrency
|
||||
queue = QueueManager(concurrency=concurrency)
|
||||
|
||||
retry_max_cfg = playlist_cfg.get("retry_max_retries", 2)
|
||||
retry_delay_cfg = playlist_cfg.get("retry_delay_seconds", 1.5)
|
||||
try:
|
||||
retry_max_retries = int(retry_max_cfg) if retry_max_cfg is not None else 2
|
||||
except Exception:
|
||||
retry_max_retries = 2
|
||||
try:
|
||||
retry_delay_seconds = float(retry_delay_cfg) if retry_delay_cfg is not None else 1.5
|
||||
except Exception:
|
||||
retry_delay_seconds = 1.5
|
||||
|
||||
async def worker(job: DownloadJob):
|
||||
if self.bus and job.item:
|
||||
await self.bus.publish("DownloadStarted", {"playlist_id": playlist_id, "video_id": job.item.video_id, "target": str(job.output_path)})
|
||||
await default_worker(job)
|
||||
await default_worker(job, max_retries=retry_max_retries, delay_seconds=retry_delay_seconds)
|
||||
|
||||
await queue.start(worker)
|
||||
try:
|
||||
@@ -223,7 +239,7 @@ class ActionExecutor:
|
||||
jobs.append(job)
|
||||
await queue.enqueue(job)
|
||||
finally:
|
||||
await queue._queue.join() # wait for all jobs
|
||||
await queue.join() # wait for all jobs
|
||||
await queue.stop()
|
||||
|
||||
# Persist DB updates for completed jobs
|
||||
|
||||
@@ -0,0 +1,46 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from src.app.core.models import PlaylistItem, SyncAction, SyncActionType
|
||||
from src.app.core.sync.executor import ActionExecutor
|
||||
from src.app.core.sync.reorder import safe_multi_rename
|
||||
|
||||
|
||||
def test_safe_multi_rename_swaps_files(tmp_path: Path):
|
||||
a = tmp_path / "0001 - A.mp4"
|
||||
b = tmp_path / "0002 - B.mp4"
|
||||
a.write_text("A", encoding="utf-8")
|
||||
b.write_text("B", encoding="utf-8")
|
||||
|
||||
safe_multi_rename([(a, b), (b, a)])
|
||||
|
||||
assert (tmp_path / "0001 - A.mp4").read_text(encoding="utf-8") == "B"
|
||||
assert (tmp_path / "0002 - B.mp4").read_text(encoding="utf-8") == "A"
|
||||
|
||||
|
||||
def test_executor_deletes_to_recycle(tmp_path: Path):
|
||||
class StubDB:
|
||||
def clear_file_state(self, playlist_id: str, video_id: str) -> None:
|
||||
return None
|
||||
|
||||
executor = ActionExecutor(StubDB()) # type: ignore[arg-type]
|
||||
|
||||
save_root = tmp_path / "downloads"
|
||||
audio_root = save_root / "audio"
|
||||
video_root = save_root / "video"
|
||||
audio_root.mkdir(parents=True, exist_ok=True)
|
||||
video_root.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
victim = audio_root / "0001 - X.mp3"
|
||||
victim.write_text("x", encoding="utf-8")
|
||||
|
||||
item = PlaylistItem(playlist_id="p", video_id="v", title="t", playlist_index=1, local_filename=victim.name, downloaded=True)
|
||||
action = SyncAction(SyncActionType.DELETE, item=item, from_name=victim.name)
|
||||
|
||||
executor._apply_deletions([action], audio_root, video_root, {"url": "p"}) # type: ignore[attr-defined]
|
||||
|
||||
assert not victim.exists()
|
||||
recycled = save_root / ".recycle" / "audio" / victim.name
|
||||
assert recycled.exists()
|
||||
|
||||
Reference in New Issue
Block a user