diff --git a/src/app/core/sync/executor.py b/src/app/core/sync/executor.py index f4e70cd..b7689ce 100644 --- a/src/app/core/sync/executor.py +++ b/src/app/core/sync/executor.py @@ -18,12 +18,25 @@ from ..utils.rate_limit import is_youtube_rate_limit_error class ActionExecutor: + """Apply sync actions against the filesystem and persist their outcome. + + The executor is the imperative half of the sync pipeline: it publishes + lifecycle events, performs safe renames and deletions, coordinates the + download queue, and updates the database after each job completes. + """ + def __init__(self, db: Database, concurrency: int = 2, event_bus: EventBus | None = None) -> None: self.concurrency = max(1, concurrency) self.db = db self.bus = event_bus async def execute(self, actions: Iterable[SyncAction], playlist_cfg: dict, *, cancel_check=None, pause_check=None) -> None: + """Execute a batch of sync actions for one playlist. + + The workflow is intentionally ordered: announce the sync, wait for any + pause state to clear, validate dependencies, perform renames, recycle + deletions, and finally run downloads with bounded concurrency. + """ actions_list = list(actions) playlist_id = extract_playlist_id(playlist_cfg.get("url", "")) or playlist_cfg.get("url", "") start = time.monotonic() @@ -123,6 +136,7 @@ class ActionExecutor: ensure_ffmpeg_available(str(ffmpeg_hint) if ffmpeg_hint is not None else None) async def _apply_renames(self, actions: Iterable[SyncAction], audio_root: Path, video_root: Path, playlist_cfg: dict) -> None: + """Apply all rename actions in batches separated by output type.""" playlist_id = extract_playlist_id(playlist_cfg.get("url", "")) or playlist_cfg.get("url", "") audio_renames = [] video_renames = [] @@ -152,6 +166,7 @@ class ActionExecutor: await self.bus.publish("RenameApplied", {"playlist_id": playlist_id, "video_id": a.item.video_id, "to": a.to_name}) def _apply_deletions(self, actions: Iterable[SyncAction], audio_root: Path, video_root: Path, playlist_cfg: dict) -> None: + """Recycle or remove files that no longer belong to the playlist.""" playlist_id = extract_playlist_id(playlist_cfg.get("url", "")) or playlist_cfg.get("url", "") recycle_audio = audio_root.parent / ".recycle" / "audio" recycle_video = video_root.parent / ".recycle" / "video" @@ -198,6 +213,7 @@ class ActionExecutor: cancel_check=None, pause_check=None, ) -> None: + """Queue and run download jobs, then persist their final state.""" playlist_id = extract_playlist_id(playlist_cfg.get("url", "")) or playlist_cfg.get("url", "") loop = asyncio.get_running_loop() concurrency_cfg = playlist_cfg.get("max_parallel_downloads", self.concurrency)