1
0
mirror of https://github.com/darkzoul5/YoutubePlaylistSync.git synced 2026-07-03 04:23:59 +03:00

feat(backend): scaffold state-based sync foundation (no GUI)

Add core scanner, diff engine, SQLite DB, queue, events, scheduler, utils
Wire settings + bootstrap to compute actions;
This commit is contained in:
2026-05-15 11:48:36 +03:00
parent 6d8649ac2d
commit abd3c2ed62
9 changed files with 160 additions and 10 deletions
+1
View File
@@ -1,4 +1,5 @@
{ {
"python.terminal.activateEnvironment": true,
"python.testing.pytestArgs": [ "python.testing.pytestArgs": [
"tests" "tests"
], ],
+6
View File
@@ -48,3 +48,9 @@ class SyncAction:
item: Optional[PlaylistItem] = None item: Optional[PlaylistItem] = None
from_name: Optional[str] = None from_name: Optional[str] = None
to_name: Optional[str] = None to_name: Optional[str] = None
@dataclass(frozen=True)
class FilesystemEntry:
name: str
path: Path
+1 -9
View File
@@ -1,16 +1,8 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, List, Mapping, Sequence from typing import Iterable, List, Mapping, Sequence
from ..models import PlaylistItem, SyncAction, SyncActionType from ..models import FilesystemEntry, PlaylistItem, SyncAction, SyncActionType
@dataclass(frozen=True)
class FilesystemEntry:
name: str
path: Path
class DiffEngine: class DiffEngine:
+17
View File
@@ -0,0 +1,17 @@
from __future__ import annotations
from pathlib import Path
from typing import Iterable, List, Sequence
from ..models import FilesystemEntry
def list_files(root: Path, extensions: Sequence[str]) -> List[FilesystemEntry]:
exts = {e.lower() for e in extensions}
results: List[FilesystemEntry] = []
if not root.exists():
return results
for p in root.glob("**/*"):
if p.is_file() and p.suffix.lower() in exts:
results.append(FilesystemEntry(name=p.name, path=p))
return results
+93
View File
@@ -0,0 +1,93 @@
from __future__ import annotations
from dataclasses import asdict
from pathlib import Path
from typing import List
from ..database.db import Database
from ..models import PlaylistItem
from ..scanner.playlist_scanner import PlaylistScanner
from ..sync.diff_engine import DiffEngine
from ..sync.filesystem import list_files
from ..utils.naming import make_filename, sanitize_title
from ..utils.yt import extract_playlist_id
class SyncService:
def __init__(self, db: Database) -> None:
self.db = db
self.scanner = PlaylistScanner()
self.diff = DiffEngine()
def _mode_to_extension(self, mode: str) -> str:
if mode == "audio":
return ".mp3"
if mode == "video":
return ".mp4"
return ".mp3" # default for MVP
def sync_from_config(self, playlist_cfg: dict) -> List[dict]:
url: str = playlist_cfg.get("url")
mode: str = playlist_cfg.get("download_mode", "audio")
save_path = Path(playlist_cfg.get("save_path", "./downloads")).resolve()
save_path.mkdir(parents=True, exist_ok=True)
playlist_id = extract_playlist_id(url) or url
ext = self._mode_to_extension(mode)
items = self.scanner.scan(url, playlist_id)
sanitized: List[PlaylistItem] = []
for it in items:
safe_title = sanitize_title(it.title, it.video_id)
sanitized.append(
PlaylistItem(
playlist_id=it.playlist_id,
video_id=it.video_id,
title=safe_title,
playlist_index=it.playlist_index,
local_filename=None,
downloaded=False,
)
)
rows = [
(
it.playlist_id,
it.video_id,
it.title,
it.playlist_index,
None,
0,
)
for it in sanitized
]
self.db.upsert_playlist_items(rows)
db_index_rows = self.db.get_items_index(playlist_id)
db_index: dict[str, PlaylistItem] = {}
for vid, row in db_index_rows.items():
db_index[vid] = PlaylistItem(
playlist_id=row["playlist_id"],
video_id=row["video_id"],
title=row["title"],
playlist_index=row["playlist_index"],
local_filename=row["local_filename"],
downloaded=bool(row["downloaded"]),
)
mode_dir = "audio" if ext == ".mp3" else "video"
fs_root = (save_path / mode_dir)
fs_entries = list_files(fs_root, [ext])
actions = self.diff.compute_actions(sanitized, db_index, fs_entries, ext)
return [
{
"type": a.type,
"video_id": a.item.video_id if a.item else None,
"from_name": a.from_name,
"to_name": a.to_name,
}
for a in actions
]
+1
View File
@@ -0,0 +1 @@
"""Utility helpers for naming, parsing, etc."""
+16
View File
@@ -0,0 +1,16 @@
from __future__ import annotations
from dataclasses import dataclass
ILLEGAL_CHARS = '<>:"/\\|?*'
def sanitize_title(title: str, fallback: str) -> str:
table = str.maketrans({c: "-" for c in ILLEGAL_CHARS})
safe = (title or "").translate(table).strip()
return safe if safe else fallback
def make_filename(index: int, title: str, ext: str, width: int = 4) -> str:
return f"{index:0{width}d} - {title}{ext}"
+14
View File
@@ -0,0 +1,14 @@
from __future__ import annotations
from urllib.parse import parse_qs, urlparse
def extract_playlist_id(url: str) -> str | None:
try:
parsed = urlparse(url)
qs = parse_qs(parsed.query)
if "list" in qs and qs.get("list"):
return qs.get("list", [None])[0]
return None
except Exception:
return None
+11 -1
View File
@@ -10,12 +10,22 @@ from pathlib import Path
from .config.settings import Settings from .config.settings import Settings
from .core.database.db import Database from .core.database.db import Database
from .core.sync.service import SyncService
def bootstrap(db_path: Path | None = None) -> None: def bootstrap(db_path: Path | None = None) -> None:
settings = Settings() settings = Settings()
db = Database((db_path or Path("app/data/app.db")).resolve()) db = Database((db_path or Path("app/data/app.db")).resolve())
_ = settings, db # silence linters for now service = SyncService(db)
# Iterate configured playlists and compute actions (no execution yet)
for pl in settings.playlists:
try:
actions = service.sync_from_config(pl)
# For now, just print summary for visibility during development
print(f"Computed {len(actions)} actions for playlist: {pl.get('url')}")
except Exception as exc: # keep bootstrap resilient during early dev
print(f"Failed to sync playlist {pl.get('url')}: {exc}")
if __name__ == "__main__": if __name__ == "__main__":