diff --git a/src/app/core/download/queue_manager.py b/src/app/core/download/queue_manager.py index eaa90c5..3bbc4a0 100644 --- a/src/app/core/download/queue_manager.py +++ b/src/app/core/download/queue_manager.py @@ -37,6 +37,13 @@ class DownloadJob: class QueueManager: + """A small asyncio worker pool for download jobs. + + Jobs are pushed into a shared queue and processed by a fixed number of + background tasks. This keeps the downloader concurrency bounded without + forcing the caller to manage worker lifetimes directly. + """ + def __init__(self, concurrency: int = 2) -> None: self._queue: "asyncio.Queue[DownloadJob]" = asyncio.Queue() self._concurrency = max(1, concurrency) @@ -44,6 +51,7 @@ class QueueManager: self._stopped = asyncio.Event() async def start(self, worker_coro): + """Start the worker tasks that drain the queue.""" async def runner(idx: int): while not self._stopped.is_set(): job = await self._queue.get() @@ -55,13 +63,16 @@ class QueueManager: self._workers = [asyncio.create_task(runner(i)) for i in range(self._concurrency)] async def stop(self): + """Cancel all worker tasks and mark the queue as stopped.""" self._stopped.set() for w in self._workers: w.cancel() self._workers.clear() async def enqueue(self, job: DownloadJob): + """Add a job to the shared queue.""" await self._queue.put(job) async def join(self) -> None: + """Block until every queued job has been acknowledged.""" await self._queue.join() diff --git a/src/app/core/sync/service.py b/src/app/core/sync/service.py index 67ca866..4a251d9 100644 --- a/src/app/core/sync/service.py +++ b/src/app/core/sync/service.py @@ -13,6 +13,13 @@ from ..utils.yt import extract_playlist_id class SyncService: + """High-level orchestration for a single playlist sync pass. + + The service pulls the latest remote playlist snapshot, persists the + playlist and item metadata in the database, and asks the diff engine to + compare the remote state with the local filesystem. + """ + def __init__(self, db: Database) -> None: self.db = db self.scanner = PlaylistScanner() @@ -28,6 +35,12 @@ class SyncService: return [".mp4"] def sync_from_config(self, playlist_cfg: dict) -> List[SyncAction]: + """Return the sync actions required to bring one playlist in sync. + + This method does not apply any changes itself. It normalizes the + configuration, refreshes the playlist/item records in SQLite, and then + computes the actions needed for the configured download mode. + """ url: str = playlist_cfg.get("url") mode: str = playlist_cfg.get("download_mode", "video") save_path = Path(playlist_cfg.get("save_path", "./downloads")).resolve()