mirror of
https://github.com/darkzoul5/YoutubePlaylistSync.git
synced 2026-07-03 04:23:59 +03:00
add code comments to queue manager and service.py
This commit is contained in:
@@ -37,6 +37,13 @@ class DownloadJob:
|
||||
|
||||
|
||||
class QueueManager:
|
||||
"""A small asyncio worker pool for download jobs.
|
||||
|
||||
Jobs are pushed into a shared queue and processed by a fixed number of
|
||||
background tasks. This keeps the downloader concurrency bounded without
|
||||
forcing the caller to manage worker lifetimes directly.
|
||||
"""
|
||||
|
||||
def __init__(self, concurrency: int = 2) -> None:
|
||||
self._queue: "asyncio.Queue[DownloadJob]" = asyncio.Queue()
|
||||
self._concurrency = max(1, concurrency)
|
||||
@@ -44,6 +51,7 @@ class QueueManager:
|
||||
self._stopped = asyncio.Event()
|
||||
|
||||
async def start(self, worker_coro):
|
||||
"""Start the worker tasks that drain the queue."""
|
||||
async def runner(idx: int):
|
||||
while not self._stopped.is_set():
|
||||
job = await self._queue.get()
|
||||
@@ -55,13 +63,16 @@ class QueueManager:
|
||||
self._workers = [asyncio.create_task(runner(i)) for i in range(self._concurrency)]
|
||||
|
||||
async def stop(self):
|
||||
"""Cancel all worker tasks and mark the queue as stopped."""
|
||||
self._stopped.set()
|
||||
for w in self._workers:
|
||||
w.cancel()
|
||||
self._workers.clear()
|
||||
|
||||
async def enqueue(self, job: DownloadJob):
|
||||
"""Add a job to the shared queue."""
|
||||
await self._queue.put(job)
|
||||
|
||||
async def join(self) -> None:
|
||||
"""Block until every queued job has been acknowledged."""
|
||||
await self._queue.join()
|
||||
|
||||
@@ -13,6 +13,13 @@ from ..utils.yt import extract_playlist_id
|
||||
|
||||
|
||||
class SyncService:
|
||||
"""High-level orchestration for a single playlist sync pass.
|
||||
|
||||
The service pulls the latest remote playlist snapshot, persists the
|
||||
playlist and item metadata in the database, and asks the diff engine to
|
||||
compare the remote state with the local filesystem.
|
||||
"""
|
||||
|
||||
def __init__(self, db: Database) -> None:
|
||||
self.db = db
|
||||
self.scanner = PlaylistScanner()
|
||||
@@ -28,6 +35,12 @@ class SyncService:
|
||||
return [".mp4"]
|
||||
|
||||
def sync_from_config(self, playlist_cfg: dict) -> List[SyncAction]:
|
||||
"""Return the sync actions required to bring one playlist in sync.
|
||||
|
||||
This method does not apply any changes itself. It normalizes the
|
||||
configuration, refreshes the playlist/item records in SQLite, and then
|
||||
computes the actions needed for the configured download mode.
|
||||
"""
|
||||
url: str = playlist_cfg.get("url")
|
||||
mode: str = playlist_cfg.get("download_mode", "video")
|
||||
save_path = Path(playlist_cfg.get("save_path", "./downloads")).resolve()
|
||||
|
||||
Reference in New Issue
Block a user