mirror of
https://github.com/darkzoul5/YoutubePlaylistSync.git
synced 2026-07-03 04:23:59 +03:00
feat(backend): Implemented executor, safe renames, recycle deletes, and real yt-dlp downloads.
Extended service to compute actions for audio, video, and both.
This commit is contained in:
@@ -18,9 +18,49 @@ class Downloader:
|
||||
async def handle_job(self, job: DownloadJob):
|
||||
try:
|
||||
job.state = JobState.DOWNLOADING
|
||||
# TODO: Implement actual download via yt-dlp Python API or subprocess
|
||||
# For now, mark as completed without side effects.
|
||||
await self._download(job)
|
||||
job.state = JobState.COMPLETED
|
||||
except Exception as exc: # pragma: no cover - placeholder
|
||||
except Exception as exc: # pragma: no cover - environment dependent
|
||||
job.state = JobState.FAILED
|
||||
job.error = str(exc)
|
||||
|
||||
async def _download(self, job: DownloadJob):
|
||||
# Use yt-dlp Python API, executed in a worker thread
|
||||
import asyncio
|
||||
|
||||
def run():
|
||||
import yt_dlp # type: ignore
|
||||
|
||||
outtmpl = str(job.output_path)
|
||||
if job.mode == "audio":
|
||||
ydl_opts = {
|
||||
"format": "bestaudio/best",
|
||||
"outtmpl": outtmpl,
|
||||
"postprocessors": [
|
||||
{
|
||||
"key": "FFmpegExtractAudio",
|
||||
"preferredcodec": "mp3",
|
||||
"preferredquality": "0",
|
||||
}
|
||||
],
|
||||
"noplaylist": True,
|
||||
"quiet": True,
|
||||
"no_warnings": True,
|
||||
}
|
||||
else: # video
|
||||
ydl_opts = {
|
||||
"format": "bestvideo+bestaudio/best",
|
||||
"merge_output_format": "mp4",
|
||||
"outtmpl": outtmpl,
|
||||
"noplaylist": True,
|
||||
"quiet": True,
|
||||
"no_warnings": True,
|
||||
}
|
||||
|
||||
if self.ffmpeg_path:
|
||||
ydl_opts["ffmpeg_location"] = self.ffmpeg_path
|
||||
|
||||
with yt_dlp.YoutubeDL(ydl_opts) as ydl: # type: ignore[attr-defined]
|
||||
ydl.download([job.url])
|
||||
|
||||
await asyncio.to_thread(run)
|
||||
|
||||
@@ -3,6 +3,7 @@ from __future__ import annotations
|
||||
import asyncio
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from ..models import PlaylistItem
|
||||
@@ -21,7 +22,9 @@ class JobState(str, Enum):
|
||||
@dataclass
|
||||
class DownloadJob:
|
||||
item: PlaylistItem
|
||||
output_name: Optional[str] = None
|
||||
output_path: Optional[Path] = None
|
||||
url: Optional[str] = None
|
||||
mode: str = "audio" # audio|video
|
||||
state: JobState = JobState.QUEUED
|
||||
error: Optional[str] = None
|
||||
|
||||
|
||||
@@ -0,0 +1,100 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from typing import Iterable, List
|
||||
|
||||
from ..download.queue_manager import DownloadJob, QueueManager
|
||||
from ..download.workers import default_worker
|
||||
from ..models import SyncAction, SyncActionType
|
||||
from ..sync.reorder import safe_multi_rename
|
||||
|
||||
|
||||
class ActionExecutor:
|
||||
def __init__(self, concurrency: int = 2) -> None:
|
||||
self.concurrency = max(1, concurrency)
|
||||
|
||||
async def execute(self, actions: Iterable[SyncAction], playlist_cfg: dict) -> None:
|
||||
save_path = Path(playlist_cfg.get("save_path", "./downloads")).resolve()
|
||||
mode = playlist_cfg.get("download_mode", "audio")
|
||||
|
||||
# Prepare roots
|
||||
audio_root = save_path / "audio"
|
||||
video_root = save_path / "video"
|
||||
audio_root.mkdir(parents=True, exist_ok=True)
|
||||
video_root.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# First, handle renames safely in batch per extension
|
||||
await self._apply_renames(actions, audio_root, video_root)
|
||||
|
||||
# Then, recycle deletions
|
||||
self._apply_deletions(actions, audio_root, video_root)
|
||||
|
||||
# Finally, perform downloads concurrently
|
||||
await self._apply_downloads(actions, mode, audio_root, video_root)
|
||||
|
||||
async def _apply_renames(self, actions: Iterable[SyncAction], audio_root: Path, video_root: Path) -> None:
|
||||
audio_renames = []
|
||||
video_renames = []
|
||||
for a in actions:
|
||||
if a.type != SyncActionType.RENAME or not a.from_name or not a.to_name:
|
||||
continue
|
||||
if a.to_name.endswith(".mp3"):
|
||||
audio_renames.append((audio_root / a.from_name, audio_root / a.to_name))
|
||||
elif a.to_name.endswith(".mp4"):
|
||||
video_renames.append((video_root / a.from_name, video_root / a.to_name))
|
||||
|
||||
if audio_renames:
|
||||
safe_multi_rename(audio_renames)
|
||||
if video_renames:
|
||||
safe_multi_rename(video_renames)
|
||||
|
||||
def _apply_deletions(self, actions: Iterable[SyncAction], audio_root: Path, video_root: Path) -> None:
|
||||
recycle_audio = audio_root.parent / ".recycle" / "audio"
|
||||
recycle_video = video_root.parent / ".recycle" / "video"
|
||||
recycle_audio.mkdir(parents=True, exist_ok=True)
|
||||
recycle_video.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
for a in actions:
|
||||
if a.type != SyncActionType.DELETE or not a.from_name:
|
||||
continue
|
||||
if a.from_name.endswith(".mp3"):
|
||||
src = audio_root / a.from_name
|
||||
dst = recycle_audio / a.from_name
|
||||
else:
|
||||
src = video_root / a.from_name
|
||||
dst = recycle_video / a.from_name
|
||||
if src.exists():
|
||||
try:
|
||||
if dst.exists():
|
||||
dst.unlink()
|
||||
shutil.move(str(src), str(dst))
|
||||
except Exception:
|
||||
# fallback to delete if move fails
|
||||
try:
|
||||
src.unlink()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
async def _apply_downloads(self, actions: Iterable[SyncAction], mode: str, audio_root: Path, video_root: Path) -> None:
|
||||
queue = QueueManager(concurrency=self.concurrency)
|
||||
|
||||
async def worker(job: DownloadJob):
|
||||
await default_worker(job)
|
||||
|
||||
await queue.start(worker)
|
||||
try:
|
||||
for a in actions:
|
||||
if a.type != SyncActionType.DOWNLOAD or not a.item or not a.to_name:
|
||||
continue
|
||||
is_audio = a.to_name.endswith(".mp3")
|
||||
root = audio_root if is_audio else video_root
|
||||
output_path = root / a.to_name
|
||||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
url = f"https://www.youtube.com/watch?v={a.item.video_id}"
|
||||
job = DownloadJob(item=a.item, output_path=output_path, url=url, mode=("audio" if is_audio else "video"))
|
||||
await queue.enqueue(job)
|
||||
finally:
|
||||
await queue._queue.join() # wait for all jobs
|
||||
await queue.stop()
|
||||
@@ -0,0 +1,43 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Dict, Iterable, Tuple
|
||||
|
||||
|
||||
def safe_multi_rename(renames: Iterable[Tuple[Path, Path]]) -> None:
|
||||
"""
|
||||
Apply multiple renames safely using a two-pass strategy to avoid
|
||||
name collisions. Each item is a tuple (src_path, dst_path).
|
||||
"""
|
||||
temp_suffix = ".renametemp"
|
||||
planned = list(renames)
|
||||
existing_dests = {dst for _, dst in planned}
|
||||
|
||||
# Pass 1: move all sources that would collide to temporary names
|
||||
temps: Dict[Path, Path] = {}
|
||||
for src, dst in planned:
|
||||
if not src.exists():
|
||||
continue
|
||||
if src.name == dst.name:
|
||||
continue
|
||||
# If destination exists or another source will become destination, use temp
|
||||
if dst.exists() or dst in existing_dests:
|
||||
tmp = src.with_suffix(src.suffix + temp_suffix)
|
||||
# Ensure unique temp
|
||||
i = 0
|
||||
while tmp.exists():
|
||||
i += 1
|
||||
tmp = src.with_name(src.name + f".{i}" + temp_suffix)
|
||||
src.rename(tmp)
|
||||
temps[tmp] = dst
|
||||
else:
|
||||
# direct rename safe
|
||||
src.rename(dst)
|
||||
|
||||
# Pass 2: move all temp files to their final destinations
|
||||
for tmp, dst in temps.items():
|
||||
if not tmp.exists():
|
||||
continue
|
||||
if dst.exists():
|
||||
dst.unlink()
|
||||
tmp.rename(dst)
|
||||
@@ -19,12 +19,14 @@ class SyncService:
|
||||
self.scanner = PlaylistScanner()
|
||||
self.diff = DiffEngine()
|
||||
|
||||
def _mode_to_extension(self, mode: str) -> str:
|
||||
def _mode_to_extensions(self, mode: str) -> list[str]:
|
||||
if mode == "audio":
|
||||
return ".mp3"
|
||||
return [".mp3"]
|
||||
if mode == "video":
|
||||
return ".mp4"
|
||||
return ".mp3" # default for MVP
|
||||
return [".mp4"]
|
||||
if mode == "both":
|
||||
return [".mp3", ".mp4"]
|
||||
return [".mp3"]
|
||||
|
||||
def sync_from_config(self, playlist_cfg: dict) -> List[dict]:
|
||||
url: str = playlist_cfg.get("url")
|
||||
@@ -33,8 +35,6 @@ class SyncService:
|
||||
save_path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
playlist_id = extract_playlist_id(url) or url
|
||||
ext = self._mode_to_extension(mode)
|
||||
|
||||
items = self.scanner.scan(url, playlist_id)
|
||||
|
||||
sanitized: List[PlaylistItem] = []
|
||||
@@ -76,11 +76,14 @@ class SyncService:
|
||||
downloaded=bool(row["downloaded"]),
|
||||
)
|
||||
|
||||
mode_dir = "audio" if ext == ".mp3" else "video"
|
||||
fs_root = (save_path / mode_dir)
|
||||
fs_entries = list_files(fs_root, [ext])
|
||||
|
||||
actions = self.diff.compute_actions(sanitized, db_index, fs_entries, ext)
|
||||
exts = self._mode_to_extensions(mode)
|
||||
merged_actions = []
|
||||
for ext in exts:
|
||||
mode_dir = "audio" if ext == ".mp3" else "video"
|
||||
fs_root = (save_path / mode_dir)
|
||||
fs_entries = list_files(fs_root, [ext])
|
||||
actions = self.diff.compute_actions(sanitized, db_index, fs_entries, ext)
|
||||
merged_actions.extend(actions)
|
||||
|
||||
return [
|
||||
{
|
||||
@@ -89,5 +92,5 @@ class SyncService:
|
||||
"from_name": a.from_name,
|
||||
"to_name": a.to_name,
|
||||
}
|
||||
for a in actions
|
||||
for a in merged_actions
|
||||
]
|
||||
|
||||
Reference in New Issue
Block a user