From e5ad786bcf6c9b9c47eebbd31e97d72589d388f0 Mon Sep 17 00:00:00 2001 From: DARKZOUL5 Date: Fri, 15 May 2026 14:32:48 +0300 Subject: [PATCH] feat(backend): Implemented executor, safe renames, recycle deletes, and real yt-dlp downloads. Extended service to compute actions for audio, video, and both. --- src/app/core/download/downloader.py | 46 +++++++++++- src/app/core/download/queue_manager.py | 5 +- src/app/core/sync/executor.py | 100 +++++++++++++++++++++++++ src/app/core/sync/reorder.py | 43 +++++++++++ src/app/core/sync/service.py | 27 ++++--- 5 files changed, 205 insertions(+), 16 deletions(-) create mode 100644 src/app/core/sync/executor.py create mode 100644 src/app/core/sync/reorder.py diff --git a/src/app/core/download/downloader.py b/src/app/core/download/downloader.py index b03b29c..b2cedfe 100644 --- a/src/app/core/download/downloader.py +++ b/src/app/core/download/downloader.py @@ -18,9 +18,49 @@ class Downloader: async def handle_job(self, job: DownloadJob): try: job.state = JobState.DOWNLOADING - # TODO: Implement actual download via yt-dlp Python API or subprocess - # For now, mark as completed without side effects. + await self._download(job) job.state = JobState.COMPLETED - except Exception as exc: # pragma: no cover - placeholder + except Exception as exc: # pragma: no cover - environment dependent job.state = JobState.FAILED job.error = str(exc) + + async def _download(self, job: DownloadJob): + # Use yt-dlp Python API, executed in a worker thread + import asyncio + + def run(): + import yt_dlp # type: ignore + + outtmpl = str(job.output_path) + if job.mode == "audio": + ydl_opts = { + "format": "bestaudio/best", + "outtmpl": outtmpl, + "postprocessors": [ + { + "key": "FFmpegExtractAudio", + "preferredcodec": "mp3", + "preferredquality": "0", + } + ], + "noplaylist": True, + "quiet": True, + "no_warnings": True, + } + else: # video + ydl_opts = { + "format": "bestvideo+bestaudio/best", + "merge_output_format": "mp4", + "outtmpl": outtmpl, + "noplaylist": True, + "quiet": True, + "no_warnings": True, + } + + if self.ffmpeg_path: + ydl_opts["ffmpeg_location"] = self.ffmpeg_path + + with yt_dlp.YoutubeDL(ydl_opts) as ydl: # type: ignore[attr-defined] + ydl.download([job.url]) + + await asyncio.to_thread(run) diff --git a/src/app/core/download/queue_manager.py b/src/app/core/download/queue_manager.py index cd49bcc..ee20a3f 100644 --- a/src/app/core/download/queue_manager.py +++ b/src/app/core/download/queue_manager.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio from dataclasses import dataclass from enum import Enum +from pathlib import Path from typing import Optional from ..models import PlaylistItem @@ -21,7 +22,9 @@ class JobState(str, Enum): @dataclass class DownloadJob: item: PlaylistItem - output_name: Optional[str] = None + output_path: Optional[Path] = None + url: Optional[str] = None + mode: str = "audio" # audio|video state: JobState = JobState.QUEUED error: Optional[str] = None diff --git a/src/app/core/sync/executor.py b/src/app/core/sync/executor.py new file mode 100644 index 0000000..001372b --- /dev/null +++ b/src/app/core/sync/executor.py @@ -0,0 +1,100 @@ +from __future__ import annotations + +import asyncio +import shutil +from pathlib import Path +from typing import Iterable, List + +from ..download.queue_manager import DownloadJob, QueueManager +from ..download.workers import default_worker +from ..models import SyncAction, SyncActionType +from ..sync.reorder import safe_multi_rename + + +class ActionExecutor: + def __init__(self, concurrency: int = 2) -> None: + self.concurrency = max(1, concurrency) + + async def execute(self, actions: Iterable[SyncAction], playlist_cfg: dict) -> None: + save_path = Path(playlist_cfg.get("save_path", "./downloads")).resolve() + mode = playlist_cfg.get("download_mode", "audio") + + # Prepare roots + audio_root = save_path / "audio" + video_root = save_path / "video" + audio_root.mkdir(parents=True, exist_ok=True) + video_root.mkdir(parents=True, exist_ok=True) + + # First, handle renames safely in batch per extension + await self._apply_renames(actions, audio_root, video_root) + + # Then, recycle deletions + self._apply_deletions(actions, audio_root, video_root) + + # Finally, perform downloads concurrently + await self._apply_downloads(actions, mode, audio_root, video_root) + + async def _apply_renames(self, actions: Iterable[SyncAction], audio_root: Path, video_root: Path) -> None: + audio_renames = [] + video_renames = [] + for a in actions: + if a.type != SyncActionType.RENAME or not a.from_name or not a.to_name: + continue + if a.to_name.endswith(".mp3"): + audio_renames.append((audio_root / a.from_name, audio_root / a.to_name)) + elif a.to_name.endswith(".mp4"): + video_renames.append((video_root / a.from_name, video_root / a.to_name)) + + if audio_renames: + safe_multi_rename(audio_renames) + if video_renames: + safe_multi_rename(video_renames) + + def _apply_deletions(self, actions: Iterable[SyncAction], audio_root: Path, video_root: Path) -> None: + recycle_audio = audio_root.parent / ".recycle" / "audio" + recycle_video = video_root.parent / ".recycle" / "video" + recycle_audio.mkdir(parents=True, exist_ok=True) + recycle_video.mkdir(parents=True, exist_ok=True) + + for a in actions: + if a.type != SyncActionType.DELETE or not a.from_name: + continue + if a.from_name.endswith(".mp3"): + src = audio_root / a.from_name + dst = recycle_audio / a.from_name + else: + src = video_root / a.from_name + dst = recycle_video / a.from_name + if src.exists(): + try: + if dst.exists(): + dst.unlink() + shutil.move(str(src), str(dst)) + except Exception: + # fallback to delete if move fails + try: + src.unlink() + except Exception: + pass + + async def _apply_downloads(self, actions: Iterable[SyncAction], mode: str, audio_root: Path, video_root: Path) -> None: + queue = QueueManager(concurrency=self.concurrency) + + async def worker(job: DownloadJob): + await default_worker(job) + + await queue.start(worker) + try: + for a in actions: + if a.type != SyncActionType.DOWNLOAD or not a.item or not a.to_name: + continue + is_audio = a.to_name.endswith(".mp3") + root = audio_root if is_audio else video_root + output_path = root / a.to_name + output_path.parent.mkdir(parents=True, exist_ok=True) + url = f"https://www.youtube.com/watch?v={a.item.video_id}" + job = DownloadJob(item=a.item, output_path=output_path, url=url, mode=("audio" if is_audio else "video")) + await queue.enqueue(job) + finally: + await queue._queue.join() # wait for all jobs + await queue.stop() diff --git a/src/app/core/sync/reorder.py b/src/app/core/sync/reorder.py new file mode 100644 index 0000000..b15e383 --- /dev/null +++ b/src/app/core/sync/reorder.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Dict, Iterable, Tuple + + +def safe_multi_rename(renames: Iterable[Tuple[Path, Path]]) -> None: + """ + Apply multiple renames safely using a two-pass strategy to avoid + name collisions. Each item is a tuple (src_path, dst_path). + """ + temp_suffix = ".renametemp" + planned = list(renames) + existing_dests = {dst for _, dst in planned} + + # Pass 1: move all sources that would collide to temporary names + temps: Dict[Path, Path] = {} + for src, dst in planned: + if not src.exists(): + continue + if src.name == dst.name: + continue + # If destination exists or another source will become destination, use temp + if dst.exists() or dst in existing_dests: + tmp = src.with_suffix(src.suffix + temp_suffix) + # Ensure unique temp + i = 0 + while tmp.exists(): + i += 1 + tmp = src.with_name(src.name + f".{i}" + temp_suffix) + src.rename(tmp) + temps[tmp] = dst + else: + # direct rename safe + src.rename(dst) + + # Pass 2: move all temp files to their final destinations + for tmp, dst in temps.items(): + if not tmp.exists(): + continue + if dst.exists(): + dst.unlink() + tmp.rename(dst) diff --git a/src/app/core/sync/service.py b/src/app/core/sync/service.py index 4b251d0..675a33e 100644 --- a/src/app/core/sync/service.py +++ b/src/app/core/sync/service.py @@ -19,12 +19,14 @@ class SyncService: self.scanner = PlaylistScanner() self.diff = DiffEngine() - def _mode_to_extension(self, mode: str) -> str: + def _mode_to_extensions(self, mode: str) -> list[str]: if mode == "audio": - return ".mp3" + return [".mp3"] if mode == "video": - return ".mp4" - return ".mp3" # default for MVP + return [".mp4"] + if mode == "both": + return [".mp3", ".mp4"] + return [".mp3"] def sync_from_config(self, playlist_cfg: dict) -> List[dict]: url: str = playlist_cfg.get("url") @@ -33,8 +35,6 @@ class SyncService: save_path.mkdir(parents=True, exist_ok=True) playlist_id = extract_playlist_id(url) or url - ext = self._mode_to_extension(mode) - items = self.scanner.scan(url, playlist_id) sanitized: List[PlaylistItem] = [] @@ -76,11 +76,14 @@ class SyncService: downloaded=bool(row["downloaded"]), ) - mode_dir = "audio" if ext == ".mp3" else "video" - fs_root = (save_path / mode_dir) - fs_entries = list_files(fs_root, [ext]) - - actions = self.diff.compute_actions(sanitized, db_index, fs_entries, ext) + exts = self._mode_to_extensions(mode) + merged_actions = [] + for ext in exts: + mode_dir = "audio" if ext == ".mp3" else "video" + fs_root = (save_path / mode_dir) + fs_entries = list_files(fs_root, [ext]) + actions = self.diff.compute_actions(sanitized, db_index, fs_entries, ext) + merged_actions.extend(actions) return [ { @@ -89,5 +92,5 @@ class SyncService: "from_name": a.from_name, "to_name": a.to_name, } - for a in actions + for a in merged_actions ]