1
0
mirror of https://github.com/darkzoul5/YoutubePlaylistSync.git synced 2026-07-03 04:23:59 +03:00

start work on project refactor;

create file structure
move old code to /src/old
This commit is contained in:
2026-05-15 10:52:10 +03:00
parent 0cea4cfcb8
commit 0ab96e4399
31 changed files with 443 additions and 14 deletions
+4 -4
View File
@@ -200,8 +200,8 @@ jobs:
- name: Set docker image names
run: |
VERSION="${{ steps.version.outputs.version }}"
echo "RELEASE_IMAGE=ghcr.io/${GITHUB_ACTOR}/ytpld:${VERSION}" >> $GITHUB_ENV
echo "LATEST_IMAGE=ghcr.io/${GITHUB_ACTOR}/ytpld:latest" >> $GITHUB_ENV
echo "RELEASE_IMAGE=ghcr.io/${GITHUB_ACTOR}/ytst:${VERSION}" >> $GITHUB_ENV
echo "LATEST_IMAGE=ghcr.io/${GITHUB_ACTOR}/ytst:latest" >> $GITHUB_ENV
- name: Build Docker image
run: |
@@ -248,9 +248,9 @@ jobs:
- name: Push Docker images
run: |
docker load -i "${{ github.workspace }}/artifacts/docker-images/docker-image.tar"
docker push ghcr.io/${GITHUB_ACTOR}/ytpld:${{ steps.version.outputs.version }}
docker push ghcr.io/${GITHUB_ACTOR}/ytst:${{ steps.version.outputs.version }}
docker load -i "${{ github.workspace }}/artifacts/docker-images/docker-image-latest.tar"
docker push ghcr.io/${GITHUB_ACTOR}/ytpld:latest
docker push ghcr.io/${GITHUB_ACTOR}/ytst:latest
- name: Create Release
uses: softprops/action-gh-release@v3
+10
View File
@@ -0,0 +1,10 @@
"""
App package: backend foundation for playlist sync (no GUI).
This package is the new, state-driven backend. It is intentionally
minimal at this stage and will be filled out iteratively.
"""
__all__ = [
"core",
]
+1
View File
@@ -0,0 +1 @@
"""Config loader for the new backend (separate from legacy)."""
+33
View File
@@ -0,0 +1,33 @@
from __future__ import annotations
import json
from pathlib import Path
from typing import Any, Dict, List, Optional
DEFAULT_CONFIG: Dict[str, Any] = {
"playlists": [],
"download_mode": "audio",
"max_video_quality": "1080p",
"save_path": "./downloads",
"yt_dlp_path": "yt-dlp",
"ffmpeg_path": "ffmpeg",
}
class Settings:
def __init__(self, config_path: Optional[Path] = None) -> None:
base_dir = Path("config")
base_dir.mkdir(parents=True, exist_ok=True)
self.path = (config_path or (base_dir / "yt-playlist-config.json")).resolve()
self.data: Dict[str, Any] = dict(DEFAULT_CONFIG)
if self.path.exists():
try:
self.data.update(json.loads(self.path.read_text(encoding="utf-8")))
except Exception:
# Leave defaults if invalid JSON; validation can be added later.
pass
@property
def playlists(self) -> List[Dict[str, Any]]:
return list(self.data.get("playlists", []))
+10
View File
@@ -0,0 +1,10 @@
"""Core backend modules (scanner, sync, download, db, scheduler, events)."""
__all__ = [
"scanner",
"sync",
"download",
"database",
"scheduler",
"events",
]
+1
View File
@@ -0,0 +1 @@
"""Database helpers (SQLite)."""
+63
View File
@@ -0,0 +1,63 @@
from __future__ import annotations
import sqlite3
from pathlib import Path
from typing import Iterable
SCHEMA = """
PRAGMA journal_mode=WAL;
CREATE TABLE IF NOT EXISTS playlists (
id TEXT PRIMARY KEY,
name TEXT,
url TEXT,
path TEXT,
mode TEXT,
auto_sync INTEGER,
sync_interval_minutes INTEGER,
last_sync TEXT
);
CREATE TABLE IF NOT EXISTS playlist_items (
playlist_id TEXT,
video_id TEXT,
title TEXT,
playlist_index INTEGER,
local_filename TEXT,
downloaded INTEGER,
last_seen TEXT,
PRIMARY KEY (playlist_id, video_id)
);
"""
class Database:
def __init__(self, db_path: Path) -> None:
self.path = db_path
self.path.parent.mkdir(parents=True, exist_ok=True)
self._conn = sqlite3.connect(self.path)
self._conn.row_factory = sqlite3.Row
self._migrate()
def _migrate(self) -> None:
with self._conn:
self._conn.executescript(SCHEMA)
def upsert_playlist_items(self, rows: Iterable[tuple]):
sql = (
"INSERT INTO playlist_items (playlist_id, video_id, title, playlist_index, local_filename, downloaded, last_seen) "
"VALUES (?, ?, ?, ?, ?, ?, datetime('now')) "
"ON CONFLICT(playlist_id, video_id) DO UPDATE SET "
"title=excluded.title, playlist_index=excluded.playlist_index, local_filename=excluded.local_filename, "
"downloaded=excluded.downloaded, last_seen=datetime('now')"
)
with self._conn:
self._conn.executemany(sql, rows)
def get_items_index(self, playlist_id: str) -> dict[str, sqlite3.Row]:
cur = self._conn.execute(
"SELECT * FROM playlist_items WHERE playlist_id = ?",
(playlist_id,),
)
return {row["video_id"]: row for row in cur.fetchall()}
+26
View File
@@ -0,0 +1,26 @@
from __future__ import annotations
from typing import Optional
from .queue_manager import DownloadJob, JobState
class Downloader:
"""
Thin wrapper around yt-dlp usage. For MVP, this is a placeholder
where actual download logic will land (audio/video/both).
"""
def __init__(self, yt_dlp_path: Optional[str] = None, ffmpeg_path: Optional[str] = None) -> None:
self.yt_dlp_path = yt_dlp_path
self.ffmpeg_path = ffmpeg_path
async def handle_job(self, job: DownloadJob):
try:
job.state = JobState.DOWNLOADING
# TODO: Implement actual download via yt-dlp Python API or subprocess
# For now, mark as completed without side effects.
job.state = JobState.COMPLETED
except Exception as exc: # pragma: no cover - placeholder
job.state = JobState.FAILED
job.error = str(exc)
+54
View File
@@ -0,0 +1,54 @@
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import Optional
from ..models import PlaylistItem
class JobState(str, Enum):
QUEUED = "Queued"
DOWNLOADING = "Downloading"
CONVERTING = "Converting"
COMPLETED = "Completed"
FAILED = "Failed"
SKIPPED = "Skipped"
CANCELLED = "Cancelled"
@dataclass
class DownloadJob:
item: PlaylistItem
output_name: Optional[str] = None
state: JobState = JobState.QUEUED
error: Optional[str] = None
class QueueManager:
def __init__(self, concurrency: int = 2) -> None:
self._queue: "asyncio.Queue[DownloadJob]" = asyncio.Queue()
self._concurrency = max(1, concurrency)
self._workers: list[asyncio.Task[None]] = []
self._stopped = asyncio.Event()
async def start(self, worker_coro):
async def runner(idx: int):
while not self._stopped.is_set():
job = await self._queue.get()
try:
await worker_coro(job)
finally:
self._queue.task_done()
self._workers = [asyncio.create_task(runner(i)) for i in range(self._concurrency)]
async def stop(self):
self._stopped.set()
for w in self._workers:
w.cancel()
self._workers.clear()
async def enqueue(self, job: DownloadJob):
await self._queue.put(job)
+9
View File
@@ -0,0 +1,9 @@
from __future__ import annotations
from .downloader import Downloader
from .queue_manager import DownloadJob
async def default_worker(job: DownloadJob):
dl = Downloader()
await dl.handle_job(job)
+21
View File
@@ -0,0 +1,21 @@
from __future__ import annotations
from collections import defaultdict
from typing import Any, Awaitable, Callable, DefaultDict, Dict, List
EventHandler = Callable[[Dict[str, Any]], Awaitable[None]]
class EventBus:
"""Simple async pub/sub event bus used by backend and (later) GUI."""
def __init__(self) -> None:
self._subs: DefaultDict[str, List[EventHandler]] = defaultdict(list)
def subscribe(self, event_name: str, handler: EventHandler) -> None:
self._subs[event_name].append(handler)
async def publish(self, event_name: str, payload: Dict[str, Any]) -> None:
for h in list(self._subs.get(event_name, [])):
await h(payload)
+50
View File
@@ -0,0 +1,50 @@
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Optional
class DownloadMode(str, Enum):
audio = "audio"
video = "video"
both = "both"
@dataclass(frozen=True)
class Playlist:
id: str
name: Optional[str]
url: str
path: Path
mode: DownloadMode = DownloadMode.audio
auto_sync: bool = False
sync_interval_minutes: int = 0
@dataclass(frozen=True)
class PlaylistItem:
playlist_id: str
video_id: str
title: str
playlist_index: int
local_filename: Optional[str] = None
downloaded: bool = False
class SyncActionType(str, Enum):
DOWNLOAD = "DOWNLOAD"
DELETE = "DELETE"
RENAME = "RENAME"
REORDER = "REORDER"
SKIP = "SKIP"
REPAIR = "REPAIR"
@dataclass(frozen=True)
class SyncAction:
type: SyncActionType
item: Optional[PlaylistItem] = None
from_name: Optional[str] = None
to_name: Optional[str] = None
+54
View File
@@ -0,0 +1,54 @@
from __future__ import annotations
from typing import List
from ..models import PlaylistItem
class PlaylistScanner:
"""
Fetches remote playlist entries using yt-dlp (no downloads).
This class intentionally avoids strict dependencies at import time. If
yt_dlp is unavailable, call sites should handle the raised ImportError.
"""
def __init__(self) -> None:
pass
def scan(self, playlist_url: str, playlist_id: str) -> List[PlaylistItem]:
try:
import yt_dlp # type: ignore
except Exception as exc: # pragma: no cover - environment dependent
raise ImportError("yt_dlp is required to scan playlists") from exc
ydl_opts = {
"extract_flat": True,
"skip_download": True,
"quiet": True,
"dump_single_json": True,
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl: # type: ignore[attr-defined]
info = ydl.extract_info(playlist_url, download=False)
entries = info.get("entries", []) if isinstance(info, dict) else []
items: List[PlaylistItem] = []
for idx, v in enumerate(entries, start=1):
if not v:
continue
title = v.get("title") or "[Unknown]"
if title in ("[Deleted video]", "[Private video]"):
continue
vid = v.get("id") or ""
if not vid:
continue
items.append(
PlaylistItem(
playlist_id=playlist_id,
video_id=vid,
title=title,
playlist_index=idx,
)
)
return items
+20
View File
@@ -0,0 +1,20 @@
from __future__ import annotations
from datetime import timedelta
from typing import Awaitable, Callable
class Scheduler:
"""
Lightweight placeholder for background scheduling. This can later be
swapped for APScheduler without changing call sites.
"""
def __init__(self) -> None:
self._jobs: list[tuple[timedelta, Callable[[], Awaitable[None]]]] = []
def every(self, interval: timedelta, coro_factory: Callable[[], Awaitable[None]]):
self._jobs.append((interval, coro_factory))
return self
# A full implementation will run an event loop and await jobs.
+55
View File
@@ -0,0 +1,55 @@
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, List, Mapping, Sequence
from ..models import PlaylistItem, SyncAction, SyncActionType
@dataclass(frozen=True)
class FilesystemEntry:
name: str
path: Path
class DiffEngine:
"""
Compares remote playlist items, database state, and filesystem to
produce a list of actions. Initial MVP computes DOWNLOAD/RENAME/REORDER
based on simple filename scheme "0001 - Title.ext".
"""
def compute_actions(
self,
remote: Sequence[PlaylistItem],
db_index: Mapping[str, PlaylistItem],
fs_entries: Iterable[FilesystemEntry],
extension: str,
) -> List[SyncAction]:
actions: List[SyncAction] = []
desired_names = {
item.video_id: f"{item.playlist_index:04d} - {item.title}{extension}"
for item in remote
}
fs_by_name = {e.name: e for e in fs_entries}
for item in remote:
desired_name = desired_names[item.video_id]
if item.local_filename == desired_name and desired_name in fs_by_name:
continue
if desired_name in fs_by_name:
actions.append(SyncAction(SyncActionType.RENAME, item=item, from_name=item.local_filename, to_name=desired_name))
continue
actions.append(SyncAction(SyncActionType.DOWNLOAD, item=item, to_name=desired_name))
known_ids = {i.video_id for i in remote}
for vid, db_item in db_index.items():
if vid not in known_ids and db_item.local_filename:
actions.append(SyncAction(SyncActionType.DELETE, item=db_item, from_name=db_item.local_filename))
return actions
+22
View File
@@ -0,0 +1,22 @@
from __future__ import annotations
"""
Entry point for the new backend (no GUI). For now, this only verifies
that configuration and database setup work. Future iterations will wire
up scanner, diff engine, queue, and scheduler.
"""
from pathlib import Path
from .config.settings import Settings
from .core.database.db import Database
def bootstrap(db_path: Path | None = None) -> None:
settings = Settings()
db = Database((db_path or Path("app/data/app.db")).resolve())
_ = settings, db # silence linters for now
if __name__ == "__main__":
bootstrap()
View File
View File
+1 -1
View File
@@ -11,7 +11,7 @@ import sys
import shutil
import time
from pathlib import Path
from src.downloader import PlaylistDownloader
from src.old.downloader import PlaylistDownloader
from tests.dummy_config import DummyConfig
# Make imports robust when running the script directly from different working directories.
# Ensure the repository root is on sys.path so the script can import `src`.
+1 -1
View File
@@ -1,5 +1,5 @@
import logging
from src.manager import PlaylistManager
from src.old.manager import PlaylistManager
from tests.dummy_config import DummyConfig
+1 -1
View File
@@ -2,7 +2,7 @@ import logging
import subprocess
from types import SimpleNamespace
import src.cli as cli_mod
import src.old.cli as cli_mod
class DummyCompleted(SimpleNamespace):
+1 -1
View File
@@ -1,6 +1,6 @@
import json
from src.config import ConfigLoader
from src.old.config import ConfigLoader
def test_config_loader_reads_properties(tmp_path, monkeypatch):
+1 -1
View File
@@ -1,7 +1,7 @@
import subprocess
import shutil
from src.downloader import PlaylistDownloader
from src.old.downloader import PlaylistDownloader
from tests.dummy_config import DummyConfig
+1 -1
View File
@@ -1,6 +1,6 @@
from pathlib import Path
from src.downloader import PlaylistDownloader
from src.old.downloader import PlaylistDownloader
from tests.dummy_config import DummyConfig
+1 -1
View File
@@ -2,7 +2,7 @@ import json
import subprocess
from types import SimpleNamespace
from src.downloader import PlaylistDownloader
from src.old.downloader import PlaylistDownloader
from tests.dummy_config import DummyConfig
+1 -1
View File
@@ -1,6 +1,6 @@
import logging
from tests.dummy_config import DummyConfig
from src.manager import PlaylistManager
from src.old.manager import PlaylistManager
def test_manager_warns_and_sleeps(monkeypatch, caplog):
+1 -1
View File
@@ -1,5 +1,5 @@
import logging
from src.manager import PlaylistManager
from src.old.manager import PlaylistManager
from tests.dummy_config import DummyConfig
+1 -1
View File
@@ -1,6 +1,6 @@
from pathlib import Path
from src.downloader import PlaylistDownloader
from src.old.downloader import PlaylistDownloader
from tests.dummy_config import DummyConfig