1
0
mirror of https://github.com/darkzoul5/YoutubePlaylistSync.git synced 2026-07-04 04:53:58 +03:00

8 Commits

Author SHA1 Message Date
dark_zoul e7f1dbc1f7 fix 2026-05-16 16:32:28 +03:00
dark_zoul 8e3a7e3920 establish config defaults 2026-05-16 16:22:24 +03:00
dark_zoul 5d4cba3df3 refactor: remove docker related files; 2026-05-16 16:20:37 +03:00
dark_zoul b17913e21b readme: update :) 2026-05-16 16:14:57 +03:00
dark_zoul 98fab7838a refactor: remove old tests 2026-05-16 16:13:50 +03:00
dark_zoul 1928f70928 feat: make sure max_video_quality setting is working 2026-05-16 16:07:52 +03:00
dark_zoul 4cd6255b0f readme: update config section 2026-05-16 15:58:05 +03:00
dark_zoul e8535d335d readme: update confg and dependencies sections 2026-05-16 15:55:30 +03:00
24 changed files with 159 additions and 811 deletions
+21 -9
View File
@@ -19,33 +19,45 @@ Local-first YouTube playlist synchronization client.
## Requirements ## Requirements
- Python 3.10+ - Python 3.10+
- `yt-dlp` (pip) - `ffmpeg` (needed for `audio` and `both` modes)
- `ffmpeg` (only needed for audio extraction / "both" mode)
Install: Quick start:
```bash Download the latest release from [releases](https://github.com/darkzoul5/YoutubePlaylistSyncThing/releases) page
pip install -U yt-dlp
```
## Configure ## Configure
On first run, the app will auto-create a default `config/yt-playlist-config.json` (if missing).
Create/edit `config/yt-playlist-config.json`: Create/edit `config/yt-playlist-config.json`:
```json ```json
{ {
"ffmpeg_path": "./bin/ffmpeg.exe",
"playlists": [ "playlists": [
{ {
"url": "https://www.youtube.com/playlist?list=YOUR_PLAYLIST_ID", "url": "https://www.youtube.com/playlist?list=YOUR_PLAYLIST_ID",
"download_mode": "audio", "download_mode": "video",
"max_download_quality": "1080p",
"save_path": "./downloads" "save_path": "./downloads"
} }
], ]
"ffmpeg_path": "./ffmpeg"
} }
``` ```
Defaults:
- `ffmpeg_path`: `./bin/ffmpeg.exe` (Windows) or `./bin/ffmpeg` (Linux)
- `download_mode`: `video`
- `max_download_quality`: `1080p`
- `save_path`: `./downloads`
`max_download_quality`:
- Limits yt-dlp download quality (e.g. `"1080p"`, `"720p"`, `"360p"`). This only affects the downloaded video format selection.
- If the requested max quality isn't available for a video, the best available quality is chosen.
`download_mode`: `download_mode`:
- `video`: download playlist videos as muxed `.mp4` (no ffmpeg processing) - `video`: download playlist videos as muxed `.mp4` (no ffmpeg processing)
- `audio`: download muxed `.mp4`, extract `.mp3`, delete the `.mp4` - `audio`: download muxed `.mp4`, extract `.mp3`, delete the `.mp4`
- `both`: download muxed `.mp4`, extract `.mp3`, keep both files - `both`: download muxed `.mp4`, extract `.mp3`, keep both files
+4 -4
View File
@@ -1,11 +1,11 @@
{ {
"ffmpeg_path": "./bin/ffmpeg.exe",
"playlists": [ "playlists": [
{ {
"url": "https://www.youtube.com/playlist?list=YOUR_PLAYLIST_ID_HERE", "url": "https://www.youtube.com/playlist?list=YOUR_PLAYLIST_ID_HERE",
"download_mode": "audio", "download_mode": "video",
"max_video_quality": "1080p", "max_download_quality": "1080p",
"save_path": "./downloads" "save_path": "./downloads"
} }
], ]
"ffmpeg_path": "./ffmpeg"
} }
-89
View File
@@ -1,89 +0,0 @@
.gitea/
.github/
.venv/
./bin/
# Python bytecode
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
build/
dist/
*.egg-info/
.eggs/
pip-wheel-metadata/
# Installer logs
pip-log.txt
# Virtual environments
venv/
ENV/
env/
env.bak/
venv.bak/
# pyenv
.python-version
# Test and coverage
.pytest_cache/
.coverage
coverage.xml
htmlcov/
# Type checkers
.mypy_cache/
.dmypy.json
dmypy.json
# Pyright
.pyright/
# IDEs and editors
.vscode/
.idea/
*.sublime-workspace
*.sublime-project
# OS files
.DS_Store
Thumbs.db
# Logs
*.log
logs/
# Local config and secrets (do NOT include if you intentionally want them in image)
config/yt-playlist-config.json
.env
.env.*
*.secret
secrets.json
# Docker files and Compose (ignore local overrides)
Dockerfile*
docker-compose*.yml
docker-compose*.yaml
# Git and VCS
.git/
.gitignore
# Gitea and CI artifacts
.gitea/workflows/
dist/
# Node (if present)
node_modules/
# Poetry / Pipenv
Pipfile.lock
poetry.lock
# compiled python
*.pyc
-23
View File
@@ -1,23 +0,0 @@
FROM python:3.13-alpine
WORKDIR /app
# Copy application code (package) and bootstrap
COPY yt-playlist-main.py /app/
COPY src/ /app/
COPY config/ /app/config/
# Copy helper binaries from the build context (which includes extracted artifacts)
COPY bin/ffmpeg /app/bin/ffmpeg
COPY bin/yt-dlp /app/bin/yt-dlp
COPY bin/aria2c /app/bin/aria2c
# Copy entrypoint that maps environment variables to CLI flags
COPY docker-entrypoint.sh /app/docker-entrypoint.sh
RUN chmod +x /app/docker-entrypoint.sh && chmod +x /app/bin/* || true
# Put the bundled bin directory first in PATH
ENV PATH="/app/bin:${PATH}"
ENTRYPOINT ["/app/docker-entrypoint.sh"]
CMD [""]
-9
View File
@@ -1,9 +0,0 @@
version: '3.8'
services:
ytplst:
image: git.darkzoul.org/dark_zoul/ytplst:latest
container_name: ytplst
restart: no
volumes:
- /path/to/downloads:/app/downloads
- /path/to/config:/app/config
-146
View File
@@ -1,146 +0,0 @@
#!/bin/sh
# Entry point for the ytplaylist container.
set -e
# Map environment variables to CLI flags
ARGS=""
if [ "${YTPL_DEBUG:-0}" != "0" ]; then
ARGS="$ARGS --debug"
fi
if [ "${YTPL_PRUNE:-0}" != "0" ]; then
ARGS="$ARGS --prune"
fi
if [ "${YTPL_YES:-0}" != "0" ]; then
ARGS="$ARGS --yes"
fi
if [ -n "${YTPL_CONFIG}" ]; then
ARGS="$ARGS --config ${YTPL_CONFIG}"
fi
# If environment-based configuration is provided, materialize it into /app/config/yt-playlist-config.json
# Supported methods (priority order):
# 1) YTPL_CONFIG_JSON -> full JSON payload for the entire config
# 2) YTPL_PLAYLISTS_JSON -> JSON array assigned to 'playlists' key in the base config
# 3) PLAYLIST_{N}_{FIELD} env vars, e.g. PLAYLIST_0_URL, PLAYLIST_0_DOWNLOAD_MODE, etc.
# Top-level overrides (optional): YTPL_MAX_PARALLEL_DOWNLOADS, YTPL_ARIA2C_CONNECTIONS, YTPL_MAX_VIDEO_QUALITY, YTPL_DOWNLOAD_MODE
if [ -n "${YTPL_CONFIG_JSON:-}" ] || [ -n "${YTPL_PLAYLISTS_JSON:-}" ] || env | grep -q '^PLAYLIST_' ; then
python - <<'PY'
import os, json, sys
from pathlib import Path
config_dir = Path('/app/config')
config_dir.mkdir(parents=True, exist_ok=True)
config_path = config_dir / 'yt-playlist-config.json'
# Load existing config if present, otherwise start with a minimal default
base = {
'playlists': [
{
'url': 'https://www.youtube.com/playlist?list=YOUR_PLAYLIST_ID_HERE',
'download_mode': 'audio',
'max_video_quality': '1080p',
'save_path': './downloads',
'archive': 'archive.txt'
}
],
'yt_dlp_path': 'yt-dlp',
'ffmpeg_path': 'ffmpeg',
'aria2c_path': 'aria2c',
'max_parallel_downloads': 10,
'aria2c_connections': 8,
}
if config_path.exists():
try:
with config_path.open('r', encoding='utf-8') as f:
base = json.load(f)
except Exception:
# if existing file is invalid, continue with our base and overwrite below
pass
# 1) Full config JSON
cfg_json = os.environ.get('YTPL_CONFIG_JSON')
if cfg_json:
try:
cfg = json.loads(cfg_json)
with config_path.open('w', encoding='utf-8') as f:
json.dump(cfg, f, indent=2)
except Exception as e:
print('ERROR: failed to parse YTPL_CONFIG_JSON:', e, file=sys.stderr)
sys.exit(1)
sys.exit(0)
# 2) Playlists JSON
pl_json = os.environ.get('YTPL_PLAYLISTS_JSON')
if pl_json:
try:
playlists = json.loads(pl_json)
if isinstance(playlists, list):
base['playlists'] = playlists
else:
raise ValueError('YTPL_PLAYLISTS_JSON must be a JSON array')
except Exception as e:
print('ERROR: failed to parse YTPL_PLAYLISTS_JSON:', e, file=sys.stderr)
sys.exit(1)
# 3) Indexed PLAYLIST_{N}_{FIELD} variables
playlists = {}
for k, v in os.environ.items():
if not k.startswith('PLAYLIST_'):
continue
parts = k.split('_', 2)
if len(parts) < 3:
continue
_, idx, field = parts
try:
i = int(idx)
except Exception:
continue
playlists.setdefault(i, {})[field.lower()] = v
if playlists:
# convert to ordered list
built = [playlists[i] for i in sorted(playlists.keys())]
base['playlists'] = built
# Top-level overrides
overrides = {
'max_parallel_downloads': 'YTPL_MAX_PARALLEL_DOWNLOADS',
'aria2c_connections': 'YTPL_ARIA2C_CONNECTIONS',
'max_video_quality': 'YTPL_MAX_VIDEO_QUALITY',
'download_mode': 'YTPL_DOWNLOAD_MODE',
}
for key, envname in overrides.items():
if envname in os.environ and os.environ[envname] != '':
val = os.environ[envname]
# cast numbers where appropriate
if key in ('max_parallel_downloads', 'aria2c_connections'):
try:
val = int(val)
except Exception:
pass
base[key] = val
# Write resulting config
try:
with config_path.open('w', encoding='utf-8') as f:
json.dump(base, f, indent=2)
except Exception as e:
print('ERROR: failed to write config file:', e, file=sys.stderr)
sys.exit(1)
PY
fi
# Allow the user to pass extra args to the container
if [ "$#" -gt 0 ]; then
exec python -m ytplaylist.cli $ARGS "$@"
else
exec python -m ytplaylist.cli $ARGS
fi
+41 -12
View File
@@ -1,37 +1,66 @@
from __future__ import annotations from __future__ import annotations
import json import json
import os
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
def _default_ffmpeg_path() -> str:
if os.name == "nt":
return "./bin/ffmpeg.exe"
return "./bin/ffmpeg"
DEFAULT_CONFIG: Dict[str, Any] = { DEFAULT_CONFIG: Dict[str, Any] = {
"playlists": [], "playlists": [],
"download_mode": "audio", "download_mode": "video",
"max_video_quality": "1080p", "max_download_quality": "1080p",
"save_path": "./downloads", "save_path": "./downloads",
"ffmpeg_path": "ffmpeg", "ffmpeg_path": _default_ffmpeg_path(),
} }
class Settings: class Settings:
def __init__(self, config_path: Optional[Path] = None) -> None: def __init__(self) -> None:
base_dir = Path("config") base_dir = Path("config")
base_dir.mkdir(parents=True, exist_ok=True) base_dir.mkdir(parents=True, exist_ok=True)
self.path = (config_path or (base_dir / "yt-playlist-config.json")).resolve() self.path = (base_dir / "yt-playlist-config.json").resolve()
self.data: Dict[str, Any] = dict(DEFAULT_CONFIG) self.data: Dict[str, Any] = dict(DEFAULT_CONFIG)
if self.path.exists():
try: # Ensure there is always a config file at the default path.
self.data.update(json.loads(self.path.read_text(encoding="utf-8"))) if not self.path.exists():
except Exception: self._write_default_config(self.path)
# Leave defaults if invalid JSON; validation can be added later.
pass self._load_from_path(self.path)
def _load_from_path(self, path: Path) -> None:
try:
self.data.update(json.loads(path.read_text(encoding="utf-8")))
except Exception:
# Leave defaults if invalid JSON; validation can be added later.
pass
def _write_default_config(self, path: Path) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
default_payload: Dict[str, Any] = {
"playlists": [
{
"url": "https://www.youtube.com/playlist?list=YOUR_PLAYLIST_ID",
"download_mode": "video",
"max_download_quality": "1080p",
"save_path": "./downloads",
}
],
"ffmpeg_path": _default_ffmpeg_path(),
}
path.write_text(json.dumps(default_payload, indent=2) + "\n", encoding="utf-8")
@property @property
def playlists(self) -> List[Dict[str, Any]]: def playlists(self) -> List[Dict[str, Any]]:
global_defaults = { global_defaults = {
"download_mode": self.data.get("download_mode", DEFAULT_CONFIG["download_mode"]), "download_mode": self.data.get("download_mode", DEFAULT_CONFIG["download_mode"]),
"max_video_quality": self.data.get("max_video_quality", DEFAULT_CONFIG["max_video_quality"]), "max_download_quality": self.data.get("max_download_quality", DEFAULT_CONFIG["max_download_quality"]),
"save_path": self.data.get("save_path", DEFAULT_CONFIG["save_path"]), "save_path": self.data.get("save_path", DEFAULT_CONFIG["save_path"]),
"ffmpeg_path": self.data.get("ffmpeg_path", DEFAULT_CONFIG["ffmpeg_path"]), "ffmpeg_path": self.data.get("ffmpeg_path", DEFAULT_CONFIG["ffmpeg_path"]),
} }
+32 -1
View File
@@ -15,6 +15,35 @@ class Downloader:
self.yt_dlp_path = yt_dlp_path self.yt_dlp_path = yt_dlp_path
self.ffmpeg_path = ffmpeg_path self.ffmpeg_path = ffmpeg_path
@staticmethod
def build_format(max_download_quality) -> str:
def parse_height_cap(value) -> int | None:
if value is None:
return 1080
if isinstance(value, int):
return value if value > 0 else None
s = str(value).strip().lower()
if not s:
return 1080
if s in {"best", "max", "auto"}:
return None
if s in {"none", "null"}:
return 1080
digits = "".join(ch for ch in s if ch.isdigit())
if not digits:
return None
try:
cap = int(digits)
except Exception:
return None
return cap if cap > 0 else None
cap = parse_height_cap(max_download_quality)
if cap is not None:
#if the requested cap isn't available, we still download the best mp4.
return f"best[ext=mp4][acodec!=none][vcodec!=none][height<={cap}]/best[ext=mp4][height<={cap}]/best[ext=mp4]"
return "best[ext=mp4][acodec!=none][vcodec!=none]/best[ext=mp4]"
async def handle_job(self, job: DownloadJob): async def handle_job(self, job: DownloadJob):
try: try:
job.state = JobState.DOWNLOADING job.state = JobState.DOWNLOADING
@@ -53,12 +82,14 @@ class Downloader:
outtmpl = str(job.output_path) outtmpl = str(job.output_path)
fmt = self.build_format(getattr(job, "max_download_quality", None))
# All modes download a single muxed mp4 when possible. # All modes download a single muxed mp4 when possible.
# This avoids any ffmpeg-driven merging during the download step, satisfying: # This avoids any ffmpeg-driven merging during the download step, satisfying:
# - video: "original file, no processing" # - video: "original file, no processing"
# - audio/both: extraction is done separately after download # - audio/both: extraction is done separately after download
ydl_opts = { ydl_opts = {
"format": "best[ext=mp4][acodec!=none][vcodec!=none]/best[ext=mp4]", "format": fmt,
"outtmpl": outtmpl, "outtmpl": outtmpl,
"noplaylist": True, "noplaylist": True,
"quiet": True, "quiet": True,
+1
View File
@@ -28,6 +28,7 @@ class DownloadJob:
state: JobState = JobState.QUEUED state: JobState = JobState.QUEUED
error: Optional[str] = None error: Optional[str] = None
ffmpeg_path: Optional[str] = None ffmpeg_path: Optional[str] = None
max_download_quality: Optional[str] = None
audio_output_path: Optional[Path] = None # when mode=video and we also want mp3 audio_output_path: Optional[Path] = None # when mode=video and we also want mp3
keep_video: bool = True keep_video: bool = True
+5 -1
View File
@@ -25,7 +25,7 @@ class ActionExecutor:
self._preflight_dependencies(actions, playlist_cfg) self._preflight_dependencies(actions, playlist_cfg)
save_path = Path(playlist_cfg.get("save_path", "./downloads")).resolve() save_path = Path(playlist_cfg.get("save_path", "./downloads")).resolve()
mode = playlist_cfg.get("download_mode", "audio") mode = playlist_cfg.get("download_mode", "video")
# Prepare roots # Prepare roots
audio_root = save_path / "audio" audio_root = save_path / "audio"
@@ -152,6 +152,7 @@ class ActionExecutor:
d["video"] = a.to_name d["video"] = a.to_name
ffmpeg_cfg = str(playlist_cfg.get("ffmpeg_path", "ffmpeg")) if playlist_cfg.get("ffmpeg_path") is not None else None ffmpeg_cfg = str(playlist_cfg.get("ffmpeg_path", "ffmpeg")) if playlist_cfg.get("ffmpeg_path") is not None else None
max_quality_cfg = playlist_cfg.get("max_download_quality")
temp_video_root = video_root / ".tmp" temp_video_root = video_root / ".tmp"
temp_video_root.mkdir(parents=True, exist_ok=True) temp_video_root.mkdir(parents=True, exist_ok=True)
@@ -176,6 +177,7 @@ class ActionExecutor:
url=url, url=url,
mode="video", mode="video",
ffmpeg_path=ffmpeg_cfg, ffmpeg_path=ffmpeg_cfg,
max_download_quality=max_quality_cfg,
audio_output_path=audio_path, audio_output_path=audio_path,
) )
jobs.append(job) jobs.append(job)
@@ -200,6 +202,7 @@ class ActionExecutor:
url=url, url=url,
mode="video", mode="video",
ffmpeg_path=ffmpeg_cfg, ffmpeg_path=ffmpeg_cfg,
max_download_quality=max_quality_cfg,
audio_output_path=audio_path, audio_output_path=audio_path,
keep_video=False, keep_video=False,
) )
@@ -215,6 +218,7 @@ class ActionExecutor:
url=url, url=url,
mode="video", mode="video",
ffmpeg_path=ffmpeg_cfg, ffmpeg_path=ffmpeg_cfg,
max_download_quality=max_quality_cfg,
) )
jobs.append(job) jobs.append(job)
await queue.enqueue(job) await queue.enqueue(job)
-9
View File
@@ -1,9 +0,0 @@
import pytest
from tests.dummy_config import DummyConfig
@pytest.fixture
def dummy_config():
"""Return a fresh DummyConfig instance for tests to customize."""
return DummyConfig()
-19
View File
@@ -1,19 +0,0 @@
import os
class DummyConfig:
"""Small test configuration object used by unit and integration tests.
Adjust attributes via environment variables where appropriate.
"""
yt_dlp_path = os.getenv("YTDLP_PATH", "yt-dlp")
ffmpeg_path = os.getenv("FFMPEG_PATH", "ffmpeg")
aria2c_path = os.getenv("ARIA2C_PATH", "aria2c")
max_parallel_downloads = int(os.getenv("TEST_MAX_PARALLEL", "2"))
aria2c_connections = int(os.getenv("TEST_ARIA2C_CONN", "2"))
download_mode = os.getenv("TEST_DOWNLOAD_MODE", "audio")
max_video_quality = os.getenv("TEST_MAX_VIDEO_QUALITY", "1080p")
# runtime flags
debug = False
non_interactive = False
prune = False
-154
View File
@@ -1,154 +0,0 @@
"""
Full integration test (opt-in):
- Set environment variable INTEGRATION_TEST=1 to enable
- Optionally set TEST_PLAYLIST_URL to a full playlist URL; otherwise the built-in playlist id will be used
This script will attempt to download real audio/video for a small playlist (3 items).
It will run three modes: audio, video, and both. It is intentionally opt-in to avoid accidental large downloads.
"""
import os
import sys
import shutil
import time
from pathlib import Path
from src.old.downloader import PlaylistDownloader
from tests.dummy_config import DummyConfig
# Make imports robust when running the script directly from different working directories.
# Ensure the repository root is on sys.path so the script can import `src`.
REPO_ROOT = Path(__file__).resolve().parents[1]
if str(REPO_ROOT) not in sys.path:
sys.path.insert(0, str(REPO_ROOT))
if not os.getenv("INTEGRATION_TEST"):
print("Skipping full integration test (set INTEGRATION_TEST=1 to enable)")
sys.exit(0)
# Prefer local ./bin/ executables for integration runs when available.
# Set environment variables before importing TempConfig so its class attributes
# pick up these overridden paths.
bin_dir = REPO_ROOT / "bin" / "linux"
if bin_dir.exists():
ytdlp_path = bin_dir / "yt-dlp"
if ytdlp_path.exists():
os.environ.setdefault("YTDLP_PATH", str(ytdlp_path))
print(f"Using local yt-dlp at: {ytdlp_path}")
ffmpeg_path = bin_dir / "ffmpeg"
if ffmpeg_path.exists():
os.environ.setdefault("FFMPEG_PATH", str(ffmpeg_path))
print(f"Using local ffmpeg at: {ffmpeg_path}")
aria2c_path = bin_dir / "aria2c"
if aria2c_path.exists():
os.environ.setdefault("ARIA2C_PATH", str(aria2c_path))
print(f"Using local aria2c at: {aria2c_path}")
# allow caller to override playlist url via env
playlist_url = os.getenv("TEST_PLAYLIST_URL")
if not playlist_url:
# Use provided playlist id (3 videos)
playlist_id = "PLUmRr21IDW9WCW87FnbWAbIwwZHbf-lAz"
playlist_url = f"https://www.youtube.com/playlist?list={playlist_id}"
print(f"Using playlist URL: {playlist_url}")
cfg_base = DummyConfig()
# ensure yt-dlp exists
if not shutil.which(str(cfg_base.yt_dlp_path)):
print(f"yt-dlp binary not found at '{cfg_base.yt_dlp_path}'. Please install yt-dlp or set YTDLP_PATH environment variable.")
sys.exit(2)
MODES = ["audio", "video", "both"]
root_tmp = Path("./tests/tmp_integration_full")
root_tmp.mkdir(parents=True, exist_ok=True)
failed = False
for mode in MODES:
print(f"\n=== Running mode: {mode} ===")
cfg = DummyConfig()
# Allow enabling verbose subprocess output from CI by setting YTPL_DEBUG=1
cfg.debug = bool(os.getenv("YTPL_DEBUG", "0") == "1")
cfg.download_mode = mode
# make downloads single-threaded for predictability
cfg.max_parallel_downloads = 1
cfg.aria2c_connections = 1
save_path = root_tmp / mode
# ensure a clean directory per run
if save_path.exists():
try:
shutil.rmtree(save_path)
except Exception:
pass
playlist = {"url": playlist_url, "save_path": str(save_path), "archive": f"archive_{mode}.txt"}
downloader = PlaylistDownloader(cfg, playlist, 0)
# Print resolved binary paths for debugging
try:
print(f"Resolved yt-dlp path: {cfg.yt_dlp_path}")
print(f"Resolved ffmpeg path: {cfg.ffmpeg_path}")
print(f"Resolved aria2c path: {cfg.aria2c_path}")
except Exception:
pass
try:
start = time.time()
downloader.update()
dur = time.time() - start
print(f"Mode {mode} completed in {dur:.1f}s")
# basic verifications
if mode in ("audio", "both"):
audio_folder = save_path / "audio"
mp3s = list(audio_folder.glob("*.mp3")) if audio_folder.exists() else []
print(f"Found {len(mp3s)} mp3 files in {audio_folder}")
if len(mp3s) < 3:
print(f"Expected >=3 mp3 files for mode={mode}, found {len(mp3s)}")
failed = True
if mode in ("video", "both"):
video_folder = save_path / "video"
mp4s = list(video_folder.glob("*.mp4")) if video_folder.exists() else []
print(f"Found {len(mp4s)} mp4 files in {video_folder}")
if len(mp4s) < 3:
print(f"Expected >=3 mp4 files for mode={mode}, found {len(mp4s)}")
failed = True
# check archive has entries
archive_file = (save_path / f"archive_{mode}.txt")
if archive_file.exists():
lines = [line for line in archive_file.read_text(encoding='utf-8').splitlines() if line.strip()]
print(f"Archive {archive_file} contains {len(lines)} lines")
if len(lines) < 3:
print(f"Expected archive to contain >=3 lines, found {len(lines)}")
# Not necessarily fatal; mark failure but continue
failed = True
else:
print(f"Archive file {archive_file} not found")
failed = True
except Exception as ex:
print(f"Exception during mode {mode}: {ex}")
failed = True
# cleanup to avoid leaving large files around
try:
if save_path.exists():
shutil.rmtree(save_path)
print(f"Cleaned up {save_path}")
except Exception as ex:
print(f"Failed to clean up {save_path}: {ex}")
# final cleanup
try:
if root_tmp.exists():
shutil.rmtree(root_tmp)
except Exception:
pass
if failed:
print("Integration full workflow test encountered failures.")
sys.exit(3)
print("Integration full workflow test completed successfully")
sys.exit(0)
-23
View File
@@ -1,23 +0,0 @@
import logging
from src.old.manager import PlaylistManager
from tests.dummy_config import DummyConfig
def test_run_with_prune_disabled():
logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(message)s")
cfg = DummyConfig()
cfg.playlists = [{"url": None, "save_path": "tests/tmp_test", "archive": "archive.txt"}]
m = PlaylistManager(cfg, debug=False)
# should complete without raising
m.run()
def test_run_with_prune_enabled_non_interactive():
logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(message)s")
cfg = DummyConfig()
cfg.playlists = [{"url": None, "save_path": "tests/tmp_test", "archive": "archive.txt"}]
cfg.prune = True
cfg.non_interactive = True
m = PlaylistManager(cfg, debug=False)
# should complete without raising
m.run()
-45
View File
@@ -1,45 +0,0 @@
import logging
import subprocess
from types import SimpleNamespace
import src.old.cli as cli_mod
class DummyCompleted(SimpleNamespace):
pass
def test_update_yt_dlp_success(monkeypatch, caplog):
called = {"count": 0}
def fake_run(args, check=True, **kw):
called["count"] += 1
return DummyCompleted(returncode=0)
monkeypatch.setattr(subprocess, "run", fake_run)
caplog.set_level(logging.INFO)
cli_mod.update_yt_dlp("yt-dlp", debug=False)
assert called["count"] == 1
assert any("up to date" in r.message.lower() for r in caplog.records)
def test_update_yt_dlp_failure(monkeypatch, caplog):
def raise_called(*a, **k):
raise subprocess.CalledProcessError(1, cmd=a[0])
monkeypatch.setattr(subprocess, "run", raise_called)
caplog.set_level(logging.WARNING)
cli_mod.update_yt_dlp("yt-dlp", debug=False)
assert any("could not update yt-dlp" in r.message.lower() or "could not update" in r.message.lower() for r in caplog.records)
def test_configure_logging_sets_levels():
# ensure calling configure_logging flips global root logger level
# clear existing handlers so basicConfig can take effect in test
logging.root.handlers.clear()
cli_mod.configure_logging(True)
assert logging.getLogger().getEffectiveLevel() == logging.DEBUG
logging.root.handlers.clear()
cli_mod.configure_logging(False)
assert logging.getLogger().getEffectiveLevel() == logging.INFO
-28
View File
@@ -1,28 +0,0 @@
import json
from src.old.config import ConfigLoader
def test_config_loader_reads_properties(tmp_path, monkeypatch):
# create a minimal config file with known binary names that exist on PATH
cfg = {
"playlists": [{"url": "https://www.youtube.com/playlist?list=FAKE", "save_path": "./tmp", "archive": "archive.txt"}],
"yt_dlp_path": "python",
"ffmpeg_path": "python",
"aria2c_path": "python",
"max_parallel_downloads": 3,
"aria2c_connections": 2,
}
p = tmp_path / "yt-playlist-config.json"
p.write_text(json.dumps(cfg), encoding="utf-8")
# Use absolute path so ConfigLoader doesn't try to create ./config
loader = ConfigLoader(str(p))
assert loader.playlists == cfg["playlists"]
assert loader.yt_dlp_path == "python"
assert loader.ffmpeg_path == "python"
assert loader.aria2c_path == "python"
assert loader.max_parallel_downloads == 3
assert loader.aria2c_connections == 2
+15
View File
@@ -0,0 +1,15 @@
from __future__ import annotations
from src.app.core.download.downloader import Downloader
def test_build_format_defaults_to_best_mp4():
fmt = Downloader.build_format(None)
assert "height<=1080" in fmt
assert fmt.endswith("/best[ext=mp4]")
def test_build_format_applies_height_cap():
fmt = Downloader.build_format("720p")
assert "height<=720" in fmt
-40
View File
@@ -1,40 +0,0 @@
import subprocess
import shutil
from src.old.downloader import PlaylistDownloader
from tests.dummy_config import DummyConfig
def test_download_video_invalid_mode(tmp_path):
cfg = DummyConfig()
playlist = {"url": "https://www.youtube.com/playlist?list=FAKE", "save_path": str(tmp_path)}
dl = PlaylistDownloader(cfg, playlist, 0)
dl.download_mode = "invalid_mode"
video = {"id": "X1", "title": "Test"}
assert dl.download_video(video, 1) is False
def test_download_video_both_mode_ffmpeg_missing(monkeypatch, tmp_path, caplog):
cfg = DummyConfig()
playlist = {"url": "https://www.youtube.com/playlist?list=FAKE", "save_path": str(tmp_path)}
dl = PlaylistDownloader(cfg, playlist, 0)
dl.download_mode = "both"
video = {"id": "X1", "title": "Test"}
# monkeypatch _run to simulate successful video download and ffmpeg extraction failure path
def fake_run(*args, **kwargs):
# accept label or other kwargs; simulate successful call
return subprocess.CompletedProcess(args, 0)
monkeypatch.setattr(PlaylistDownloader, "_run", fake_run)
# Ensure ffmpeg is not found
monkeypatch.setattr(shutil, "which", lambda p: None)
# Should not raise; will log a warning about ffmpeg missing
caplog.set_level("WARNING")
ok = dl.download_video(video, 1)
# For 'both' mode the function returns True when video download succeeded (we simulate that)
assert ok is True
assert any("ffmpeg not found" in r.message.lower() or "ffmpeg failed" in r.message.lower() for r in caplog.records) or True
@@ -1,24 +0,0 @@
from pathlib import Path
from src.old.downloader import PlaylistDownloader
from tests.dummy_config import DummyConfig
def test_sanitize_title_and_get_file_path(tmp_path):
cfg = DummyConfig()
playlist = {"url": None, "save_path": str(tmp_path)}
dl = PlaylistDownloader(cfg, playlist, 0)
# illegal chars should be replaced and trimmed; fallback_id used when title becomes empty
title = ' My: <>:"/\\|?*Title '
safe = dl.sanitize_title(title, "ABC123")
# ensure no illegal characters remain
assert all(c not in safe for c in dl.illegal_chars)
# empty title should return fallback id
assert dl.sanitize_title(" ", "FALLBACK") == "FALLBACK"
# get_file_path uses save_path and zero-padded index
path = dl.get_file_path(5, "SongName")
assert isinstance(path, Path)
assert path.name.startswith("005 - SongName")
-48
View File
@@ -1,48 +0,0 @@
import json
import subprocess
from types import SimpleNamespace
from src.old.downloader import PlaylistDownloader
from tests.dummy_config import DummyConfig
class DummyCompleted(SimpleNamespace):
pass
def test_fetch_videos_parses_entries(monkeypatch, tmp_path):
cfg = DummyConfig()
playlist = {"url": "https://www.youtube.com/playlist?list=FAKE", "save_path": str(tmp_path)}
dl = PlaylistDownloader(cfg, playlist, 0)
entries = [{"id": "A1", "title": "Song 1"}, {"id": "B2", "title": "Song 2"}]
out = json.dumps({"entries": entries})
def fake_run(args, capture_output=True, text=True, check=True):
return DummyCompleted(stdout=out)
monkeypatch.setattr(subprocess, "run", fake_run)
res = dl.fetch_videos()
assert isinstance(res, list)
assert len(res) == 2
assert res[0]["id"] == "A1"
def test_fetch_videos_handles_private_and_errors(monkeypatch, tmp_path, caplog):
cfg = DummyConfig()
playlist = {"url": "https://www.youtube.com/playlist?list=FAKE", "save_path": str(tmp_path)}
dl = PlaylistDownloader(cfg, playlist, 0)
# simulate CalledProcessError with 'private' message
def raise_called(*a, **k):
e = subprocess.CalledProcessError(1, cmd=a[0])
e.stderr = "This playlist is private"
raise e
monkeypatch.setattr(subprocess, "run", raise_called)
caplog.set_level("WARNING")
res = dl.fetch_videos()
assert res == []
assert dl.skip is True
-26
View File
@@ -1,26 +0,0 @@
import logging
from tests.dummy_config import DummyConfig
from src.old.manager import PlaylistManager
def test_manager_warns_and_sleeps(monkeypatch, caplog):
# Avoid actually sleeping during the test
slept = {"called": False}
def fake_sleep(sec):
slept["called"] = True
# monkeypatch the sleep used inside the manager module
monkeypatch.setattr("src.manager.time.sleep", fake_sleep)
caplog.set_level(logging.WARNING)
cfg = DummyConfig()
cfg.max_parallel_downloads = 11
cfg.aria2c_connections = 10
cfg.playlists = []
m = PlaylistManager(cfg, debug=False)
m.run()
assert slept["called"] is True
assert any("may overload your network" in rec.getMessage() for rec in caplog.records)
@@ -1,23 +0,0 @@
import logging
from src.old.manager import PlaylistManager
from tests.dummy_config import DummyConfig
def test_run_with_prune_disabled():
logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(message)s")
cfg = DummyConfig()
cfg.playlists = [{"url": None, "save_path": "tests/tmp_test", "archive": "archive.txt"}]
m = PlaylistManager(cfg, debug=False)
# should complete without raising
m.run()
def test_run_with_prune_enabled_non_interactive():
logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(message)s")
cfg = DummyConfig()
cfg.playlists = [{"url": None, "save_path": "tests/tmp_test", "archive": "archive.txt"}]
cfg.prune = True
cfg.non_interactive = True
m = PlaylistManager(cfg, debug=False)
# should complete without raising
m.run()
-78
View File
@@ -1,78 +0,0 @@
from pathlib import Path
from src.old.downloader import PlaylistDownloader
from tests.dummy_config import DummyConfig
def touch(p: Path):
p.parent.mkdir(parents=True, exist_ok=True)
p.write_text("x")
def test_renumber_audio_and_cleanup(tmp_path, monkeypatch):
cfg = DummyConfig()
playlist = {"url": "FAKE", "save_path": str(tmp_path)}
dl = PlaylistDownloader(cfg, playlist, 0)
# set download mode to audio and create only audio files
dl.download_mode = "audio"
entries = [
{"id": "ID1", "title": "First Song"},
{"id": "ID2", "title": "Second Song"},
]
a1 = tmp_path / "audio" / "oldname First Song.mp3"
a2 = tmp_path / "audio" / "zzz Second Song.mp3"
touch(a1)
touch(a2)
# On Windows os.rename may fail when target exists; use os.replace to allow
# overwrite semantics for the duration of this test.
import os as _os
monkeypatch.setattr(Path, "rename", lambda self, target: _os.replace(self, target))
dl.renumber_all_tracks(entries)
audio_files = list((tmp_path / "audio").glob("*.mp3"))
# On some platforms the renaming logic may overwrite targets; assert at least
# one audio file was produced and that its name contains one of the titles.
assert audio_files
assert any("First Song" in f.name or "Second Song" in f.name for f in audio_files)
# Now test cleanup_removed_tracks: create a stray file not in entries
stray = tmp_path / "audio" / "999 - NotInPlaylist.mp3"
touch(stray)
# ensure prune=False -> no deletion
dl.prune = False
dl.cleanup_removed_tracks(entries)
assert stray.exists()
# Now enable prune and non_interactive so deletion occurs without input
dl.prune = True
dl.non_interactive = True
dl.cleanup_removed_tracks(entries)
assert not stray.exists()
def test_renumber_video(tmp_path, monkeypatch):
cfg = DummyConfig()
playlist = {"url": "FAKE", "save_path": str(tmp_path)}
dl = PlaylistDownloader(cfg, playlist, 0)
dl.download_mode = "video"
entries = [
{"id": "ID1", "title": "Alpha"},
{"id": "ID2", "title": "Beta"},
]
v1 = tmp_path / "video" / "something Alpha.mp4"
v2 = tmp_path / "video" / "something Beta.mp4"
touch(v1)
touch(v2)
import os as _os
monkeypatch.setattr(Path, "rename", lambda self, target: _os.replace(self, target))
dl.renumber_all_tracks(entries)
video_files = list((tmp_path / "video").glob("*.mp4"))
assert video_files
assert any("Alpha" in f.name or "Beta" in f.name for f in video_files)
+40
View File
@@ -0,0 +1,40 @@
from __future__ import annotations
import json
import os
from pathlib import Path
from src.app.config.settings import Settings
def test_settings_creates_root_config_if_missing(tmp_path, monkeypatch):
monkeypatch.chdir(tmp_path)
cfg_path = tmp_path / "config" / "yt-playlist-config.json"
assert not cfg_path.exists()
settings = Settings()
assert settings.path == cfg_path.resolve()
assert cfg_path.exists()
data = json.loads(cfg_path.read_text(encoding="utf-8"))
assert "playlists" in data
assert data.get("ffmpeg_path") == ("./bin/ffmpeg.exe" if os.name == "nt" else "./bin/ffmpeg")
assert data["playlists"][0].get("download_mode") == "video"
assert data["playlists"][0].get("max_download_quality") == "1080p"
assert data["playlists"][0].get("save_path") == "./downloads"
def test_settings_reads_config_from_default_location(tmp_path, monkeypatch):
monkeypatch.chdir(tmp_path)
cfg_path = tmp_path / "config" / "yt-playlist-config.json"
cfg_path.parent.mkdir(parents=True, exist_ok=True)
cfg_path.write_text(json.dumps({"playlists": [{"url": "X"}]}), encoding="utf-8")
settings = Settings()
assert settings.path == cfg_path.resolve()
assert settings.playlists and settings.playlists[0]["url"] == "X"
assert settings.playlists[0]["download_mode"] == "video"
assert settings.playlists[0]["max_download_quality"] == "1080p"
assert settings.playlists[0]["save_path"] == "./downloads"