Initial commit

This commit is contained in:
2026-03-29 15:04:15 -04:00
parent e67c991697
commit b69d1a4408
31 changed files with 2520 additions and 0 deletions
+55
View File
@@ -0,0 +1,55 @@
# Python
__pycache__/
*.py[cod]
*.pyo
*.pyd
*.egg
*.egg-info/
dist/
build/
eggs/
parts/
var/
sdist/
wheels/
*.egg-link
.installed.cfg
lib/
lib64/
pip-wheel-metadata/
share/python-wheels/
.Python
env/
venv/
.venv/
ENV/
.env
# PyInstaller
*.spec.bak
dist/
build/
# PyCharm
.idea/
*.iml
*.iws
*.ipr
out/
# Vim
*.swp
*.swo
*~
.netrwhist
tags
# OS
.DS_Store
Thumbs.db
# Local runner (contains credentials)
run.bat
# Claude session files
claude_resume
+68
View File
@@ -0,0 +1,68 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
MovieTagger is a PySide6/Qt desktop application for managing and tagging movie video files. It parses filenames, queries the TMDB API for metadata, and renames files to a standardized format.
## Commands
```bash
# Install dependencies (including dev tools)
pip install -e ".[dev]"
# Run in development
python -m movietagger
# or
python main.py
# Build standalone executable (PyInstaller)
pyinstaller MovieTagger.spec
```
**Required environment variables** (set in `~/.config/MovieTagger/config.ini` or shell):
- `TMDB_KEY` — TMDB API key
- `TMDB_LANGUAGE` — e.g. `en-US`
- `TMDB_REGION` — e.g. `US`
There are no tests or linting configurations.
## Architecture
### Layer Structure
```
core/ — Pure business logic (no Qt imports)
services/ — External API clients
qt/ — All UI and threading (Qt-dependent)
qt/workers/ — Background QThread workers
```
### Data Flow
1. **Load**: `LoadWorker` scans directories, calls `naming.parse_video_stem()` to extract `MovieRow` objects (title, year, tags, existing TMDB/IMDB IDs).
2. **Display**: `MovieTableModel` + `MovieProxyModel` hold and filter the `MovieRow` list in the main table view.
3. **Search**: `TmdbSearchWorker` (or `BatchScanWorker` for bulk) calls `TmdbService`, returns `MoviePick` objects. The user selects a pick via `MoviePickDialog`, which updates the row's IDs.
4. **Save**: `SaveWorker` calls `naming.build_video_stem()` on each row to construct the new filename, then renames files on disk.
### Key Design Patterns
- **Threading**: Workers are `QObject` subclasses moved to `QThread`. Communication is strictly via Qt signals/slots. Cancellation uses `threading.Event`.
- **Proxy model**: `MovieProxyModel` wraps `MovieTableModel` — handles both filtering (hide already-matched rows) and token-based title search without modifying the source model.
- **Config**: `core/config.py` reads/writes `~/.config/MovieTagger/config.ini` via `platformdirs`. Settings schema is defined as a `SETTINGS` list of `SettingKind`-typed dicts in `config.py`.
- **Rate limiting**: `RateLimiter` in `core/ratelimiter.py` throttles TMDB API calls across concurrent workers.
### Filename Format
Parsed and generated by `core/naming.py`:
```
Title (YEAR) [tag1] [tag2] {edition-X} {tmdb-12345}.mkv
```
- Tags are freeform bracketed tokens (e.g. `[Directors Cut]`, `[2160p]`)
- Edition is `{edition-...}`
- Provider IDs are `{tmdb-...}` or `{imdb-...}`
### IMDB Service
`services/imdb_service.py` and `qt/workers/imdb_search_worker.py` are stubs — they exist in the UI but return no results.
+40
View File
@@ -0,0 +1,40 @@
# -*- mode: python ; coding: utf-8 -*-
block_cipher = None
a = Analysis(
['main.py'],
pathex=["."],
binaries=[],
datas=[('assets', 'assets')],
hiddenimports=[],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[],
noarchive=False,
optimize=0,
)
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.datas,
[],
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
name='MovieTagger',
icon=['assets\\app.ico'],
console=True,
)
BIN
View File
Binary file not shown.

After

Width:  |  Height:  |  Size: 76 KiB

+4
View File
@@ -0,0 +1,4 @@
from movietagger.app import main
if __name__ == '__main__':
raise SystemExit(main())
+25
View File
@@ -0,0 +1,25 @@
[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "movietagger"
version = "0.2.0"
requires-python = ">=3.11"
dependencies = [
'PySide6',
'QtAwesome',
'platformdirs',
'themoviedb',
]
[tool.setuptools]
package-dir = {"" = "src"}
[tool.setuptools.packages.find]
where = ["src"]
[project.optional-dependencies]
dev = [
'pyinstaller'
]
+1
View File
@@ -0,0 +1 @@
__all__ = ['app']
+4
View File
@@ -0,0 +1,4 @@
from movietagger.app import main
if __name__ == '__main__':
main()
+26
View File
@@ -0,0 +1,26 @@
import sys
from pathlib import Path
from PySide6 import QtWidgets, QtGui
from movietagger.core.config import load_config
from movietagger.qt.main_window import MovieTaggerWindow
def resource_path(rel: str) -> str:
base = Path(getattr(sys, "_MEIPASS", Path(__file__).resolve().parents[2]))
return str(base / rel)
def main() -> int:
load_config()
app = QtWidgets.QApplication([])
app.setWindowIcon(QtGui.QIcon(resource_path("assets/app.ico")))
app.setStyle("Fusion")
w = MovieTaggerWindow()
w.resize(1500, 700)
w.show()
app.exec()
return 0
+6
View File
@@ -0,0 +1,6 @@
from . import config
from . import datatypes
from . import naming
from . import ratelimiter
__all__ = ['config', 'datatypes', 'naming', 'ratelimiter']
+100
View File
@@ -0,0 +1,100 @@
import configparser
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Sequence, Callable
import platformdirs
from movietagger.core.datatypes import SettingKind
APP_NAME = "MovieTagger"
APP_AUTHOR = "Blue Parity"
TMDB_REQUIRED_VARS = ("TMDB_KEY", "TMDB_LANGUAGE", "TMDB_REGION")
@dataclass(frozen=True)
class SettingSpec:
section: str
key: str
label: str
kind: SettingKind
default: str = ""
required: bool = False
choices: Optional[Sequence[str]] = None
validator: Optional[Callable[[str], Optional[str]]] = None # returns error msg or None
SETTINGS = [
SettingSpec("tmdb", "TMDB_KEY", "TMDB API Key", kind=SettingKind.SECRET, default="", required=True),
SettingSpec("tmdb", "TMDB_LANGUAGE", "TMDB Language (ISO 639-1)", kind=SettingKind.STRING, default="en-US", required=True),
SettingSpec("tmdb", "TMDB_REGION", "TMDB Region (ISO 3166-1)", kind=SettingKind.STRING, default="US", required=True),
]
def __get_config_file() -> Path:
return platformdirs.user_config_path(appauthor=APP_AUTHOR, appname=APP_NAME) / "config.ini"
def __create_parser() -> configparser.ConfigParser:
config = configparser.ConfigParser()
config.optionxform = str # preserve key casing
return config
# def _allowed_map() -> dict[str, set[str]]:
# allowed: dict[str, set[str]] = {}
# for s in SETTINGS:
# allowed.setdefault(s.section, set()).add(s.key)
# return allowed
#
#
# def __reconcile_config() -> bool:
# return True
def __create_blank_config_file(file: Path) -> None:
config = configparser.ConfigParser()
config.optionxform = str # preserve key casing
config['env'] = {
"TMDB_KEY": "",
"TMDB_LANGUAGE": "en-US",
"TMDB_REGION": "US"
}
with open(file, 'w') as configfile:
# noinspection PyTypeChecker
config.write(configfile)
def load_config(override_env=False) -> None:
config_file = __get_config_file()
if not config_file.parent.exists():
config_file.parent.mkdir(parents=True, exist_ok=True)
if not config_file.exists():
__create_blank_config_file(config_file)
config = configparser.ConfigParser()
config.optionxform = str
config.read(config_file, encoding='utf-8')
section = "env"
if section in config:
for key, value in config[section].items():
if override_env or not os.getenv(key):
os.environ[key] = value
def save_config(vars=list[tuple[str, str, str]]):
pass
def require_tmdb_env() -> None:
missing = [n for n in TMDB_REQUIRED_VARS if not os.getenv(n)]
if missing:
raise RuntimeError(f"Missing required environment variables: {", ".join(missing)}")
def require_imdb_env() -> None:
pass
+64
View File
@@ -0,0 +1,64 @@
import enum
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
class Provider(enum.Enum):
InvalidProvider = -1
TMDB = 0x0
IMDB = 0x1
class SettingKind(enum.Enum):
STRING = 0x0
CHOICE = 0x1
SECRET = 0x2
@dataclass(frozen=True)
class RenameOp:
src_row: int
old_path: Path
new_path: Path
@dataclass
class MoviePick:
provider: Provider
id: str
title: str
year: Optional[int] = None
overview: str = ""
poster_url: str = ""
@dataclass
class MovieRow:
path: Path
title: str
year: int
tags: list[str]
edition: Optional[str] = None
tmdb_id: Optional[str] = None
imdb_id: Optional[str] = None
valid: bool = True
status: str = ""
original_title: str = ""
canonical_title: str = "" # raw TMDB/IMDB title
original_year: int = 0
def title_changed(self) -> bool:
return self.title != self.original_title
def year_changed(self) -> bool:
return self.year != self.original_year
def is_matched(self) -> bool:
return bool((self.tmdb_id and self.tmdb_id.strip()) or (self.imdb_id and self.imdb_id.strip()))
def preferred_id(self) -> tuple[Optional[str], Optional[str]]:
# TMDB overrules IMDB
if self.tmdb_id and self.tmdb_id.strip():
return "tmdb", self.tmdb_id.strip()
if self.imdb_id and self.imdb_id.strip():
return "imdb", self.imdb_id.strip()
return None, None
+95
View File
@@ -0,0 +1,95 @@
import re
from pathlib import Path
from typing import Optional, Sequence
from movietagger.core.datatypes import MovieRow
VIDEO_EXTS = {".mkv", ".mp4"}
FILENAME_TAGS_RE = re.compile(
r"""^
(?P<name>.+?)\s*
\(\s*(?P<year>\d{4})\s*\)
(?P<brackets>(?:\s*\[[^]]+])*) # 0+ [tag]
(?:\s*\{edition-(?P<edition>[^}]+)})? # optional edition
(?:\s*\{(?P<id_kind>tmdb|imdb)-(?P<id_value>[^}]+)})? # optional id
$""",
re.VERBOSE | re.IGNORECASE,
)
BRACKET_RE = re.compile(r"\[([^]]+)]")
IMDB_ID_RE = re.compile(r"^tt\d{7,10}$", re.IGNORECASE)
TMDB_ID_RE = re.compile(r"^\d+$")
WINDOWS_ILLEGAL = {
": ": " - ",
"<": "",
">": "",
'"': "",
"/": "",
"\\": "",
"|": "",
"?": "",
"*": "",
}
def clean_tag_list(tags: list[str]) -> list[str]:
# Keep order, trim whitespace, drop empties
out: list[str] = []
for t in tags:
t = t.strip()
if t:
out.append(t)
return out
def _norm_title(s: str) -> str:
# Lowercase, strip punctuation/spaces — used only for title comparison
s = (s or "").casefold()
return re.sub(r"[^a-z0-9]+", "", s)
def sanitize_title_for_windows(title: str) -> str:
out = title
for bad, repl in WINDOWS_ILLEGAL.items():
out = out.replace(bad, repl)
return out.rstrip(" .")
def is_confident_single_pick(row_title: str, row_year: int | None, pick_title: str, pick_year: int | None) -> bool:
# If we have a year, require exact year match
if row_year and pick_year and pick_year != row_year:
return False
# Title check: exact normalized match or one contains the other (handles minor punctuation differences)
a = _norm_title(row_title)
b = _norm_title(pick_title)
if not a or not b:
return False
if a == b:
return True
if a in b or b in a:
return True
return False
def build_video_stem(row: MovieRow) -> str:
"""
Convert row data to filename stem (no extension):
Title (YEAR) [tag]... {edition-...}? {tmdb-...|imdb-...}?
TMDB wins if both are present.
"""
parts = [f"{row.title} ({row.year})"]
for t in clean_tag_list(row.tags):
parts.append(f"[{t}]")
if row.edition and row.edition.strip():
parts.append(f"{{edition-{row.edition.strip()}}}")
kind, val = row.preferred_id()
if kind and val:
parts.append(f"{{{kind}-{val}}}")
return " ".join(parts)
+21
View File
@@ -0,0 +1,21 @@
import time
import threading
class RateLimiter:
def __init__(self, max_per_sec: float):
self._min_interval = 1.0 / float(max_per_sec)
self._lock = threading.Lock()
self._next_time = 0.0
def acquire(self, cancel_event: threading.Event | None = None) -> None:
while True:
if cancel_event and cancel_event.is_set():
return
with self._lock:
now = time.monotonic()
wait_s = self._next_time - now
if wait_s <= 0:
self._next_time = now + self._min_interval
return
# Sleep outside lock
time.sleep(min(wait_s, 0.25))
+1
View File
@@ -0,0 +1 @@
__all__ = ['main_window']
+116
View File
@@ -0,0 +1,116 @@
from __future__ import annotations
import configparser
from typing import Any
from PySide6 import QtWidgets, QtCore
from movietagger.core.config import SETTINGS, normalize_value, save_config, export_to_env
class ConfigDialog(QtWidgets.QDialog):
"""
Schema-driven config editor. Builds a form from movietagger.core.config.SETTINGS.
On save:
- updates the provided ConfigParser
- writes config.ini
- exports values to os.environ (override existing env)
"""
configSaved = QtCore.Signal()
def __init__(self, cfg: configparser.ConfigParser, parent: QtWidgets.QWidget | None = None):
super().__init__(parent)
self.setWindowTitle("Configuration")
self.setModal(True)
self._cfg = cfg
self._widgets: dict[tuple[str, str], Any] = {}
self._build_ui()
def _build_ui(self) -> None:
root = QtWidgets.QVBoxLayout(self)
form = QtWidgets.QFormLayout()
form.setLabelAlignment(QtCore.Qt.AlignmentFlag.AlignRight)
root.addLayout(form)
# Build fields from SETTINGS
for spec in SETTINGS:
section, key = spec.section, spec.key
cur = self._cfg.get(section, key, fallback=spec.default)
if spec.kind == "choice":
w = QtWidgets.QComboBox()
for c in (spec.choices or []):
w.addItem(str(c), str(c))
idx = w.findData(cur)
if idx >= 0:
w.setCurrentIndex(idx)
else:
w = QtWidgets.QLineEdit()
w.setText(cur)
if spec.kind == "secret":
w.setEchoMode(QtWidgets.QLineEdit.EchoMode.Password)
w.setPlaceholderText("Paste key here")
else:
w.setPlaceholderText(spec.default)
self._widgets[(section, key)] = w
form.addRow(spec.label + ":", w)
# Helper text
hint = QtWidgets.QLabel("TMDB Language examples: en-US, fr-FR, ja-JP")
hint.setWordWrap(True)
hint.setStyleSheet("color: #777;")
root.addWidget(hint)
# Buttons
btn_row = QtWidgets.QHBoxLayout()
btn_row.addStretch(1)
self._btn_cancel = QtWidgets.QPushButton("Cancel")
self._btn_save = QtWidgets.QPushButton("Save")
self._btn_cancel.clicked.connect(self.reject)
self._btn_save.clicked.connect(self._on_save_clicked)
btn_row.addWidget(self._btn_cancel)
btn_row.addWidget(self._btn_save)
root.addLayout(btn_row)
def _on_save_clicked(self) -> None:
# Validate + write back to cfg
for spec in SETTINGS:
section, key = spec.section, spec.key
w = self._widgets[(section, key)]
if isinstance(w, QtWidgets.QComboBox):
val = str(w.currentData() or "").strip()
else:
val = str(w.text()).strip()
val = normalize_value(section, key, val)
if spec.required and not val:
QtWidgets.QMessageBox.warning(self, "Missing value", f"{spec.label} is required.")
return
if spec.validator:
err = spec.validator(val)
if err:
QtWidgets.QMessageBox.warning(self, "Invalid value", err)
return
if section not in self._cfg:
self._cfg[section] = {}
self._cfg[section][key] = val
# Persist + export to env (override)
save_config(self._cfg)
export_to_env(self._cfg, override_env=True)
self.configSaved.emit()
self.accept()
+72
View File
@@ -0,0 +1,72 @@
import qtawesome as qta
from PySide6 import QtCore, QtWidgets, QtGui
class SearchIconDelegate(QtWidgets.QStyledItemDelegate):
"""
Paints a magnifier icon at the far-right of the cell (4px padding).
Clicking the icon emits searchClicked(proxy_row).
- Keeps default edit/selection behavior unless the icon was clicked.
- Does not create widgets; scales well for large tables.
"""
searchClicked = QtCore.Signal(int) # proxy row
def __init__(
self,
parent: QtCore.QObject | None = None,
*,
right_padding: int = 4,
icon_size: QtCore.QSize = QtCore.QSize(16, 16),
):
super().__init__(parent)
self._right_padding = int(right_padding)
self._icon_size = QtCore.QSize(icon_size)
self._icon = qta.icon("mdi.movie-search")
def _icon_rect(self, cell_rect: QtCore.QRect) -> QtCore.QRect:
# Always pinned to the far-right with 2px padding (or configured padding).
w = self._icon_size.width()
h = self._icon_size.height()
x = cell_rect.right() - self._right_padding - w + 1 # +1 because right() is inclusive
y = cell_rect.top() + (cell_rect.height() - h) // 2
return QtCore.QRect(x, y, w, h)
def initStyleOption(self, option: QtWidgets.QStyleOptionViewItem, index: QtCore.QModelIndex) -> None:
super().initStyleOption(option, index)
# Reserve space so text doesn't go under the icon
option.rect = option.rect.adjusted(0, 0, -(self._icon_size.width() + self._right_padding + 4), 0)
def paint(self, painter: QtGui.QPainter, option: QtWidgets.QStyleOptionViewItem, index: QtCore.QModelIndex):
# Let the base delegate paint the cell text/selection/etc.
super().paint(painter, option, index)
# Draw icon on top, at pinned position.
cell_rect = option.widget.visualRect(index) if option.widget else option.rect
icon_rect = self._icon_rect(cell_rect)
# If cell is too narrow, icon rect might overlap; still draw (you can guard if desired).
mode = QtGui.QIcon.Mode.Selected if (
option.state & QtWidgets.QStyle.State_Selected) else QtGui.QIcon.Mode.Normal
state = QtGui.QIcon.State.On
self._icon.paint(painter, icon_rect, QtCore.Qt.AlignmentFlag.AlignCenter, mode, state)
def editorEvent(
self,
event: QtCore.QEvent,
model: QtCore.QAbstractItemModel,
option: QtWidgets.QStyleOptionViewItem,
index: QtCore.QModelIndex,
) -> bool:
# Only intercept mouse release; everything else (double click, keypress) behaves normally.
if event.type() == QtCore.QEvent.Type.MouseButtonRelease:
mouse = event # type: ignore[assignment]
if isinstance(mouse, QtGui.QMouseEvent):
if mouse.button() == QtCore.Qt.MouseButton.LeftButton:
if self._icon_rect(option.rect).contains(mouse.pos()):
# index.row() is a proxy row if the view model is a proxy (typical).
self.searchClicked.emit(index.row())
return True # event handled; don't start editing
return super().editorEvent(event, model, option, index)
+178
View File
@@ -0,0 +1,178 @@
from typing import Optional
from PySide6 import QtCore, QtGui, QtWidgets, QtNetwork
from movietagger.core.datatypes import MoviePick
class PosterCache(QtCore.QObject):
pixmap_ready = QtCore.Signal(str)
def __init__(self, parent=None):
super().__init__(parent)
self._nam = QtNetwork.QNetworkAccessManager(self)
self._cache: dict[str, QtGui.QPixmap] = {}
self._pending: set[str] = set()
def get(self, url: str) -> Optional[QtGui.QPixmap]:
return self._cache.get(url)
def request(self, url: str) -> None:
if not url or url in self._cache or url in self._pending:
return
self._pending.add(url)
reply = self._nam.get(QtNetwork.QNetworkRequest(QtCore.QUrl(url)))
reply.finished.connect(lambda r=reply, u=url: self._finish(r, u))
def _finish(self, reply: QtNetwork.QNetworkReply, url: str) -> None:
self._pending.discard(url)
if reply.error() == QtNetwork.QNetworkReply.NetworkError.NoError:
data = bytes(reply.readAll().data())
pm = QtGui.QPixmap()
if pm.loadFromData(data):
self._cache[url] = pm
reply.deleteLater()
self.pixmap_ready.emit(url)
class PicksModel(QtCore.QAbstractListModel):
PickRole = QtCore.Qt.ItemDataRole.UserRole + 1
def __init__(self, picks: list[MoviePick], parent=None):
super().__init__(parent)
self._picks = picks
def rowCount(self, parent=QtCore.QModelIndex()) -> int:
return len(self._picks)
def data(self, index: QtCore.QModelIndex, role: int = int(QtCore.Qt.ItemDataRole.DisplayRole)):
if not index.isValid():
return None
p = self._picks[index.row()]
if role == QtCore.Qt.ItemDataRole.DisplayRole:
y = f" ({p.year})" if p.year else ""
return f"{p.title}{y}{p.provider.name}: {p.id}"
if role == self.PickRole:
return p
return None
class PickDelegate(QtWidgets.QStyledItemDelegate):
def __init__(self, cache: PosterCache, parent=None):
super().__init__(parent)
self._cache = cache
self._placeholder = QtGui.QPixmap(64, 96)
self._placeholder.fill(QtGui.QColor(230, 230, 230))
def sizeHint(self, option, index):
return QtCore.QSize(option.rect.width(), 112)
def paint(self, painter: QtGui.QPainter, option: QtWidgets.QStyleOptionViewItem, index: QtCore.QModelIndex):
painter.save()
pick: MoviePick = index.data(PicksModel.PickRole)
rect = option.rect
if option.state & QtWidgets.QStyle.State_Selected:
painter.fillRect(rect, option.palette.highlight())
text_color = option.palette.highlightedText().color()
else:
painter.fillRect(rect, option.palette.base())
text_color = option.palette.text().color()
inner = rect.adjusted(8, 6, -8, -6)
poster_rect = QtCore.QRect(inner.left(), inner.top(), 64, 96)
pm = self._cache.get(pick.poster_url) if pick.poster_url else None
if pm is None:
painter.drawPixmap(poster_rect, self._placeholder)
if pick.poster_url:
self._cache.request(pick.poster_url)
else:
painter.drawPixmap(
poster_rect,
pm.scaled(poster_rect.size(),
QtCore.Qt.AspectRatioMode.KeepAspectRatioByExpanding,
QtCore.Qt.TransformationMode.SmoothTransformation),
)
x = poster_rect.right() + 10
text_rect = QtCore.QRect(x, inner.top(), inner.right() - x, inner.height())
title_line = f"{pick.title}" + (f" ({pick.year})" if pick.year else "") + f"{pick.provider.name}: {pick.id}"
overview = (pick.overview or "").replace("\n", " ").strip()
if len(overview) > 260:
overview = overview[:260].rstrip() + ""
painter.setPen(text_color)
title_font = option.font
title_font.setBold(True)
title_font.setPointSize(title_font.pointSize() + 1)
painter.setFont(title_font)
painter.drawText(text_rect.adjusted(0, 0, 0, -52), QtCore.Qt.TextFlag.TextSingleLine, title_line)
painter.setFont(option.font)
painter.drawText(text_rect.adjusted(0, 30, 0, 0), QtCore.Qt.TextFlag.TextWordWrap, overview)
painter.restore()
class MoviePickDialog(QtWidgets.QDialog):
def __init__(self, parent, query_title: str, query_year: Optional[int], picks: list[MoviePick]):
super().__init__(parent)
self.setWindowTitle("Select Match")
self.resize(900, 600)
self._selected: Optional[MoviePick] = None
header = QtWidgets.QLabel(f"<b>Search:</b> {query_title}" + (f" ({query_year})" if query_year else ""))
header.setWordWrap(True)
self.cache = PosterCache(self)
self.model = PicksModel(picks, self)
self.view = QtWidgets.QListView()
self.view.setModel(self.model)
self.view.setSelectionMode(QtWidgets.QAbstractItemView.SelectionMode.SingleSelection)
self.delegate = PickDelegate(self.cache, self.view)
self.view.setItemDelegate(self.delegate)
self.cache.pixmap_ready.connect(lambda _url: self.view.viewport().update())
self.btn_select = QtWidgets.QPushButton("Select")
self.btn_select.setEnabled(False)
self.btn_cancel = QtWidgets.QPushButton("Cancel")
btns = QtWidgets.QHBoxLayout()
btns.addStretch(1)
btns.addWidget(self.btn_cancel)
btns.addWidget(self.btn_select)
layout = QtWidgets.QVBoxLayout(self)
layout.addWidget(header)
layout.addWidget(self.view, 1)
layout.addLayout(btns)
self.view.selectionModel().selectionChanged.connect(self._sel_changed)
self.view.doubleClicked.connect(lambda _idx: self._accept())
self.btn_select.clicked.connect(self._accept)
self.btn_cancel.clicked.connect(self.reject)
def _sel_changed(self, *_):
self.btn_select.setEnabled(bool(self.view.selectionModel().selectedIndexes()))
def _accept(self):
idxs = self.view.selectionModel().selectedIndexes()
if not idxs:
return
self._selected = idxs[0].data(PicksModel.PickRole)
self.accept()
@property
def selected(self) -> Optional[MoviePick]:
return self._selected
class OptionsDialog(QtWidgets.QDialog):
def __init__(self, parent):
super().__init__(parent)
+841
View File
@@ -0,0 +1,841 @@
import pathlib
import webbrowser
from functools import partial
from typing import Sequence, Optional
import qtawesome as qta
from PySide6 import QtWidgets, QtGui, QtCore
from movietagger.core.datatypes import MovieRow, MoviePick, Provider, RenameOp
from movietagger.core.naming import build_video_stem, is_confident_single_pick, sanitize_title_for_windows
from movietagger.qt.delegates import SearchIconDelegate
from movietagger.qt.dialogs import MoviePickDialog
from movietagger.qt.models import MovieTableModel
from movietagger.qt.proxy import MovieProxyModel
from movietagger.qt.workers import *
class MovieTaggerWindow(QtWidgets.QMainWindow):
# -------------------------
# Main UI
# -------------------------
def __init__(self, parent=None):
super().__init__(parent)
self.disabled_when_loading = []
self.setWindowTitle("Movie Tagger")
self._current_progress_msg = ""
self._imdb_thread = None
self._imdb_worker = None
self._scan_thread = None
self._scan_worker = None
self._save_thread = None
self._save_worker = None
self._build_menubar()
self._build_statusbar()
self._build_content()
def _build_menubar(self):
menubar = self.menuBar()
### --- File menu ---
file_menu = menubar.addMenu("&File")
file_open_files = QtGui.QAction("Open &Files...", self)
file_open_files.setIcon(qta.icon("fa6s.file-video"))
file_open_files.setShortcut(QtGui.QKeySequence.StandardKey.Open)
file_open_files.triggered.connect(self._open_files) # or expose a public method
file_open_dir = QtGui.QAction("Open &Directory...", self)
file_open_dir.setIcon(qta.icon("fa6s.folder-open"))
file_open_dir.triggered.connect(partial(self._open_directory, recurse=False))
file_open_dir_r = QtGui.QAction("Open Directory (&Recursive)...", self)
file_open_dir_r.setIcon(qta.icon("fa6s.folder-tree"))
file_open_dir_r.triggered.connect(partial(self._open_directory, recurse=True))
file_exit = QtGui.QAction("E&xit", self)
file_exit.setIcon(qta.icon("fa6s.power-off"))
file_exit.setShortcut(QtGui.QKeySequence.StandardKey.Quit)
file_exit.triggered.connect(self.close)
file_menu.addAction(file_open_files)
file_menu.addAction(file_open_dir)
file_menu.addAction(file_open_dir_r)
file_menu.addSeparator()
file_menu.addAction(file_exit)
self.disabled_when_loading.append(file_open_files)
self.disabled_when_loading.append(file_open_dir)
self.disabled_when_loading.append(file_open_dir_r)
### --- View menu ---
view_menu = menubar.addMenu("&View")
self.view_hide_matched = QtGui.QAction("Hide &Matched", self)
self.view_hide_matched.setCheckable(True)
self.view_hide_matched.setChecked(False)
self.view_hide_matched.triggered.connect(self._on_hide_matched_toggled)
view_focus_filter = QtGui.QAction("Focus &Filter", self)
view_focus_filter.setIcon(qta.icon("ri.focus-3-line"))
view_focus_filter.setShortcut("Ctrl+F")
view_focus_filter.triggered.connect(
lambda: self.filter_edit.setFocus(QtCore.Qt.FocusReason.ShortcutFocusReason))
view_options = QtGui.QAction("&Configuration", self)
view_options.setIcon(qta.icon("msc.settings"))
# view_options.triggered.connect() TODO
view_menu.addAction(self.view_hide_matched)
view_menu.addSeparator()
view_menu.addAction(view_focus_filter)
view_menu.addSeparator()
view_menu.addAction(view_options)
### --- Exec menu ---
run_menu = menubar.addMenu("&Run")
self.run_auto_scan = QtGui.QAction("&Scan && Auto Match", self)
self.run_auto_scan.setIcon(qta.icon("msc.run-all"))
self.run_auto_scan.setShortcut("Ctrl+S")
self.run_auto_scan.setEnabled(False)
self.run_auto_scan.triggered.connect(self._start_async_scan)
run_menu.addAction(self.run_auto_scan)
# Wrap the menubar to add left padding
wrapper = QtWidgets.QWidget()
layout = QtWidgets.QHBoxLayout(wrapper)
layout.setContentsMargins(2, 8, 0, 0) # <-- left padding here
layout.setSpacing(0)
layout.addWidget(menubar)
# Replace the menu-bar area with your wrapper
self.setMenuWidget(wrapper)
def _build_statusbar(self):
self.status_lbl = QtWidgets.QLabel("Loaded: 0 | Unmatched: 0 | Visible: 0")
self.cancel_btn = QtWidgets.QPushButton()
self.cancel_btn.setIcon(QtWidgets.QApplication.style().standardIcon(QtWidgets.QStyle.SP_TabCloseButton))
self.cancel_btn.setToolTip("Cancel")
self.cancel_btn.setFixedSize(24, 24)
self.cancel_btn.hide()
self.busy = QtWidgets.QProgressBar()
self.busy.setFixedWidth(200)
self.busy.setTextVisible(False)
self.busy.setRange(0, 0)
self.busy.hide()
status_bar = self.statusBar()
wrapper = QtWidgets.QWidget()
layout = QtWidgets.QHBoxLayout(wrapper)
layout.setContentsMargins(7, 0, 0, 5)
layout.addWidget(self.status_lbl, 1)
layout.addWidget(self.cancel_btn, 0, QtCore.Qt.AlignmentFlag.AlignRight)
layout.addWidget(self.busy, 0, QtCore.Qt.AlignmentFlag.AlignRight)
status_bar.addWidget(wrapper, 1)
def _build_content(self):
self.movie_tool_widget = QtWidgets.QWidget()
self._autosave = False
self._autoselect = True
self.setWindowTitle("Movie Tagger")
self._load_thread: Optional[QtCore.QThread] = None
self._load_worker: Optional[LoadWorker] = None
self._tmdb_thread: Optional[QtCore.QThread] = None
self._tmdb_worker: Optional[TmdbSearchWorker] = None
self._imdb_thread: Optional[QtCore.QThread] = None
self._imdb_worker: Optional[ImdbSearchWorker] = None
self.filter_edit = QtWidgets.QLineEdit()
self.filter_edit.setPlaceholderText("Filter tokens (AND): dune 2021 hdr ...")
self.hide_matched_chk = QtWidgets.QCheckBox("Hide matched")
self.auto_select_chk = QtWidgets.QCheckBox("Auto-select if 1 result")
self.auto_select_chk.setChecked(self._autoselect)
self.autosave_chk = QtWidgets.QCheckBox("Auto-rename after ID selection")
self.autosave_chk.setChecked(self._autosave)
self.rename_selected_btn = QtWidgets.QPushButton("Rename")
self.rename_selected_btn.setIcon(qta.icon("mdi6.rename"))
self.rename_selected_btn.setToolTip("Rename Selected Files")
self.rename_selected_btn.setFixedSize(80, 30)
self.rename_selected_btn.setEnabled(False)
self.rename_selected_btn.setFlat(True)
# Table
self.table = QtWidgets.QTableView()
self.table.setSelectionBehavior(QtWidgets.QAbstractItemView.SelectionBehavior.SelectRows)
self.table.setSelectionMode(QtWidgets.QAbstractItemView.SelectionMode.ExtendedSelection)
self.table.setSortingEnabled(True)
self.table.verticalHeader().setVisible(False)
self.table.setAlternatingRowColors(True)
self.table.setWordWrap(False)
self.table.setContextMenuPolicy(QtCore.Qt.ContextMenuPolicy.CustomContextMenu)
self.table.customContextMenuRequested.connect(self._on_table_context_menu)
# Layout
top = QtWidgets.QHBoxLayout()
top.addWidget(self.filter_edit, 1)
top.addWidget(self.hide_matched_chk)
top.addWidget(self.auto_select_chk)
top.addWidget(self.autosave_chk)
top.addWidget(self.rename_selected_btn)
central = QtWidgets.QWidget()
self.setCentralWidget(central)
layout = QtWidgets.QVBoxLayout(central)
layout.addLayout(top)
layout.addWidget(self.table, 1)
# Model + proxy
self.model = MovieTableModel([])
self.proxy = MovieProxyModel(self)
self.proxy.setSourceModel(self.model)
self.table.setModel(self.proxy)
# Action delegate
self.tmdb_search_delegate = SearchIconDelegate(self.table, right_padding=4, icon_size=QtCore.QSize(16, 16))
self.imdb_search_delegate = SearchIconDelegate(self.table, right_padding=4, icon_size=QtCore.QSize(16, 16))
self.table.setItemDelegateForColumn(MovieTableModel.COL_TMDB, self.tmdb_search_delegate)
self.table.setItemDelegateForColumn(MovieTableModel.COL_IMDB, self.imdb_search_delegate)
self.tmdb_search_delegate.searchClicked.connect(self._on_find_tmdb_clicked)
self.imdb_search_delegate.searchClicked.connect(self._on_find_imdb_clicked)
# Column sizing
self.table.setColumnWidth(MovieTableModel.COL_TITLE, 400)
self.table.setColumnWidth(MovieTableModel.COL_YEAR, 70)
self.table.setColumnWidth(MovieTableModel.COL_TAGS, 120)
self.table.setColumnWidth(MovieTableModel.COL_TMDB, 110)
self.table.setColumnWidth(MovieTableModel.COL_IMDB, 140)
self.table.horizontalHeader().setStretchLastSection(True)
self.movie_tool_widget.setLayout(layout)
self.setCentralWidget(self.movie_tool_widget)
# Signals
self.autosave_chk.toggled.connect(self._set_autosave)
self.auto_select_chk.toggled.connect(self._set_autoselect)
self.cancel_btn.clicked.connect(self._cancel_worker)
self.rename_selected_btn.clicked.connect(self._bulk_save)
self.filter_edit.textChanged.connect(self._on_filter_text_changed)
self.hide_matched_chk.clicked.connect(self._on_hide_matched_toggled)
self.table.selectionModel().selectionChanged.connect(self._on_selection_changed)
self._update_status()
def _on_table_context_menu(self, pos: QtCore.QPoint) -> None:
idx = self.table.indexAt(pos)
enable_single = len(self.table.selectionModel().selectedRows()) == 1
if not idx.isValid():
return
proxy_row = idx.row()
src_row = self._source_row_from_proxy_row(proxy_row)
row = self.model.get_row(src_row)
menu = QtWidgets.QMenu(self)
act_rename = menu.addAction("Rename")
act_rename.setIcon(qta.icon("mdi6.rename"))
menu.addSeparator()
act_tmdb = menu.addAction("Search TMDB")
act_tmdb.setIcon(qta.icon("mdi.movie-search"))
act_tmdb.setEnabled(enable_single)
act_tmdb_url = menu.addAction("TMDB Listing")
act_tmdb_url.setIcon(qta.icon("mdi.web"))
act_tmdb_url.setEnabled(enable_single and row.tmdb_id is not None)
menu.addSeparator()
act_imdb = menu.addAction("Search IMDB")
act_imdb.setIcon(qta.icon("mdi.movie-search"))
act_imdb.setEnabled(enable_single)
act_imdb_url = menu.addAction("IMDB Listing")
act_imdb_url.setIcon(qta.icon("mdi.web"))
act_imdb_url.setEnabled(enable_single and row.imdb_id is not None)
menu.addSeparator()
act_restore_title = menu.addAction("Restore original title")
act_restore_title.setIcon(qta.icon("mdi.restore"))
act_restore_title.setEnabled(enable_single and row.title_changed())
act_restore_year = menu.addAction("Restore original year")
act_restore_year.setIcon(qta.icon("mdi.restore"))
act_restore_year.setEnabled(enable_single and row.year_changed())
chosen = menu.exec(self.table.viewport().mapToGlobal(pos))
if chosen is None:
return
if chosen == act_rename:
self._bulk_save()
if chosen == act_tmdb:
self._on_find_tmdb_clicked(proxy_row)
pass
if chosen == act_imdb:
# self._on_find_imdb_clicked(proxy_row)
pass
if chosen == act_tmdb_url:
self._on_open_tmdb_url(src_row)
if chosen == act_imdb_url:
self._on_open_imdb_url(src_row)
if chosen == act_restore_title:
self.model.restore_original_title(src_row)
self._update_status()
if self._autosave:
self._save_rows([src_row])
return
if chosen == act_restore_year:
self.model.restore_original_year(src_row)
self._update_status()
if self._autosave:
self._save_rows([src_row])
return
def _source_row_from_proxy_row(self, proxy_row: int) -> int:
proxy_index = self.proxy.index(proxy_row, 0)
src_index = self.proxy.mapToSource(proxy_index)
return src_index.row()
# -------------------------
# URLs
# -------------------------
def _on_open_tmdb_url(self, src_row: int) -> None:
row = self.model.get_row(src_row)
webbrowser.open(f"https://www.themoviedb.org/movie/{row.tmdb_id}")
def _on_open_imdb_url(self, src_row: int) -> None:
row = self.model.get_row(src_row)
webbrowser.open(f"https://www.imdb.com/title/{row.imdb_id}")
# -------------------------
# Filter / toggles
# -------------------------
def _set_autosave(self, checked: bool) -> None:
if self._autosave == checked:
return
self._autosave = checked
self.autosave_chk.setChecked(checked)
def _set_autoselect(self, checked: bool) -> None:
self._autoselect = checked
def _on_filter_text_changed(self, text: str) -> None:
self.proxy.set_filter_text(text)
self._update_status()
def _on_hide_matched_toggled(self, checked: bool) -> None:
self.proxy.set_hide_matched(checked)
self.hide_matched_chk.setChecked(checked)
self.view_hide_matched.setChecked(checked)
self._update_status()
def _on_selection_changed(self, *_args) -> None:
self.rename_selected_btn.setEnabled(bool(self.table.selectionModel().selectedRows()))
self._update_status()
# -------------------------
# Loading
# -------------------------
def _open_files(self) -> None:
files, _ = QtWidgets.QFileDialog.getOpenFileNames(
self,
"Select video files",
"",
"Video Files (*.mkv *.mp4)",
)
if not files:
return
self._start_async_file_load(files=files, directory=None, recurse=False)
def _open_directory(self, recurse: bool) -> None:
directory = QtWidgets.QFileDialog.getExistingDirectory(self, "Select directory")
if not directory:
return
self._start_async_file_load(files=[], directory=directory, recurse=recurse)
# -------------------------
# Workers
# -------------------------
def _enter_async_state(self, minimum: int, maximum: int) -> None:
self.cancel_btn.show()
self.busy.show()
self.busy.setRange(minimum, maximum)
for widget in self.disabled_when_loading:
widget.setEnabled(False)
def _exit_async_state(self) -> None:
self.cancel_btn.hide()
self.busy.hide()
self.busy.setRange(0, 0)
for widget in self.disabled_when_loading:
widget.setEnabled(True)
def _on_worker_progress(self, done: int, total: int) -> None:
if total > 0:
self.busy.setValue(done)
self.status_lbl.setText(f"{self._current_progress_msg} {done}/{total}")
else:
self.status_lbl.setText(f"{self._current_progress_msg} {done}")
def _cancel_worker(self) -> None:
if self._load_worker is not None:
self._load_worker.cancel()
self.status_lbl.setText("Cancelling file load...")
if self._scan_worker is not None:
self._scan_worker.cancel()
self.status_lbl.setText("Cancelling batch scan...")
if self._save_worker is not None:
self._save_worker.cancel()
self.status_lbl.setText("Cancelling file save...")
# -------------------------
# Load Files
# -------------------------
def _start_async_file_load(self, files: Sequence[str], directory: Optional[str], recurse: Optional[bool]) -> None:
if self._load_thread is not None:
QtWidgets.QMessageBox.information(self, "Loading", "File load is already running.")
return
# UI into loading state
self._enter_async_state(0, 0)
self._current_progress_msg = "Loading videos..."
self._load_thread = QtCore.QThread(self)
self._load_worker = LoadWorker(
files=files,
directory=directory,
recursive=recurse,
)
self._load_worker.moveToThread(self._load_thread)
self._load_thread.started.connect(self._load_worker.run)
self._load_worker.progress.connect(self._on_worker_progress)
self._load_worker.finished.connect(self._on_load_finished)
self._load_worker.cancelled.connect(self._on_load_cancelled)
self._load_worker.error.connect(self._on_load_error)
# Cleanup
self._load_worker.finished.connect(self._load_thread.quit)
self._load_worker.cancelled.connect(self._load_thread.quit)
self._load_worker.error.connect(self._load_thread.quit)
self._load_thread.finished.connect(self._on_load_cleanup)
self._load_thread.start()
def _on_load_finished(self, rows: list[MovieRow]) -> None:
self._apply_loaded_rows(rows)
self._exit_async_state()
self._update_status()
self.run_auto_scan.setEnabled(True)
def _on_load_cancelled(self, rows: list[MovieRow]) -> None:
self._apply_loaded_rows(rows)
self._exit_async_state()
# Keep a little hint that it was partial
if rows:
self.status_lbl.setText("Load cancelled (partial list)")
def _on_load_error(self, msg: str) -> None:
self._exit_async_state()
QtWidgets.QMessageBox.critical(self, "Load error", msg)
self._update_status()
def _on_load_cleanup(self) -> None:
if self._load_worker is not None:
self._load_worker.deleteLater()
self._load_worker = None
if self._load_thread is not None:
self._load_thread.deleteLater()
self._load_thread = None
def _apply_loaded_rows(self, rows: list[MovieRow]) -> None:
self.model = MovieTableModel(rows)
self.proxy.setSourceModel(self.model)
self.table.setModel(self.proxy)
self.table.sortByColumn(MovieTableModel.COL_TITLE, QtCore.Qt.SortOrder.AscendingOrder)
self.rename_selected_btn.setEnabled(bool(rows))
# -------------------------
# Scan & Match
# -------------------------
def _start_async_scan(self) -> None:
# Disable auto-saving
self._set_autosave(False)
# Avoid double-start
if self._scan_thread is not None:
QtWidgets.QMessageBox.information(self, "Scanning", "Auto-Scan is already running.")
return
rows = []
for proxy_row in range(self.proxy.rowCount()):
src_row = self._source_row_from_proxy_row(proxy_row)
r = self.model.get_row(src_row)
if r.tmdb_id: # already matched
continue
rows.append((src_row, r.title, r.year))
if not rows:
self.status_lbl.setText("No unmatched files to scan.")
return
self._enter_async_state(0, len(rows))
self._current_progress_msg = "Scanning videos..."
self._scan_thread = QtCore.QThread(self)
self._scan_worker = None
try:
self._scan_worker = BatchScanWorker(rows=rows)
except RuntimeError as e:
self.status_lbl.setText(f"Error in TDMB module: {e}")
self._on_scan_cleanup()
return
self._scan_worker.moveToThread(self._scan_thread)
self._scan_thread.started.connect(self._scan_worker.run)
self._scan_worker.progress.connect(self._on_worker_progress)
self._scan_worker.resultReady.connect(self._on_auto_scan_result)
self._scan_worker.errorReady.connect(self._on_auto_scan_error)
# Cleanup
self._scan_worker.finished.connect(self._scan_thread.quit)
self._scan_thread.finished.connect(self._on_scan_cleanup)
self._scan_thread.start()
def _on_scan_cleanup(self) -> None:
self._exit_async_state()
if self._scan_worker is not None:
self._scan_worker.deleteLater()
self._scan_worker = None
if self._scan_thread is not None:
self._scan_thread.deleteLater()
self._scan_thread = None
self._update_status()
def _on_auto_scan_result(self, src_row: int, pick: MoviePick) -> None:
if not pick:
return
r = self.model.get_row(src_row)
warn = ""
if sanitize_title_for_windows(pick.title) != r.title:
warn = "(WARN: title changed)"
if pick.provider == Provider.TMDB:
self.model.set_tmdb_selection(src_row, pick, status=f"{pick.provider.name}: auto-selected {warn}")
elif pick.provider == Provider.IMDB:
# TODO
pass
def _on_auto_scan_error(self, src_row: int, msg: str) -> None:
r = self.model.get_row(src_row)
self.model.set_ids_for_row(src_row, r.tmdb_id, r.imdb_id, status=f"TMDB error: {msg}")
# -------------------------
# Find ID action
# -------------------------
def _on_find_imdb_clicked(self, proxy_row: int) -> None:
src_row = self._source_row_from_proxy_row(proxy_row)
row = self.model.get_row(src_row)
if not row.valid:
QtWidgets.QMessageBox.warning(self, "Invalid filename",
"Fix the filename or edit fields in-table first.")
return
# Prevent overlapping IMDB searches
if self._imdb_thread is not None:
QtWidgets.QMessageBox.information(self, "IMDB Search", "An IMDB search is already running.")
return
# UI feedback (use your status bar)
self.status_lbl.setText(f"Searching IMDB... {row.title} ({row.year})")
# Start worker thread for TMDB search
self._imdb_thread = QtCore.QThread(self)
self._imdb_worker = ImdbSearchWorker(
src_row_idx=src_row,
title=row.title,
year=row.year if row.year else None,
)
self._imdb_worker.moveToThread(self._imdb_thread)
self._imdb_thread.started.connect(self._imdb_worker.run)
self._imdb_worker.finished.connect(self._on_search_finished)
self._imdb_worker.error.connect(self._on_search_error)
# Cleanup
self._imdb_worker.finished.connect(self._imdb_thread.quit)
self._imdb_worker.error.connect(self._imdb_thread.quit)
self._imdb_thread.finished.connect(self._on_imdb_search_cleanup)
self._imdb_thread.start()
def _on_find_tmdb_clicked(self, proxy_row: int) -> None:
src_row = self._source_row_from_proxy_row(proxy_row)
row = self.model.get_row(src_row)
if not row.valid:
QtWidgets.QMessageBox.warning(self, "Invalid filename",
"Fix the filename or edit fields in-table first.")
return
# Prevent overlapping TMDB searches
if self._tmdb_thread is not None:
QtWidgets.QMessageBox.information(self, "TMDB Search", "A TMDB search is already running.")
return
# UI feedback (use your status bar)
self.status_lbl.setText(f"Searching TMDB... {row.title} ({row.year})")
# Start worker thread for TMDB search
self._tmdb_thread = QtCore.QThread(self)
self._tmdb_worker = None
try:
self._tmdb_worker = TmdbSearchWorker(
src_row_idx=src_row,
title=row.title,
year=row.year if row.year else None,
)
except RuntimeError as e:
self.status_lbl.setText(f"Error in TDMB module: {e}")
self._on_tmdb_search_cleanup()
return
self._tmdb_worker.moveToThread(self._tmdb_thread)
self._tmdb_thread.started.connect(self._tmdb_worker.run)
self._tmdb_worker.finished.connect(self._on_search_finished)
self._tmdb_worker.error.connect(self._on_search_error)
# Cleanup
self._tmdb_worker.finished.connect(self._tmdb_thread.quit)
self._tmdb_worker.error.connect(self._tmdb_thread.quit)
self._tmdb_thread.finished.connect(self._on_tmdb_search_cleanup)
self._tmdb_thread.start()
def _on_search_finished(self, src_row: int, picks: list[MoviePick], provider: Provider) -> None:
if not picks:
self.model.set_ids_for_row(src_row, self.model.get_row(src_row).tmdb_id,
self.model.get_row(src_row).imdb_id,
status=f"{provider.name}: no results")
self._update_status()
return
dirty = False
warn = ""
# Auto-select if only one result (guarded)
if self._autoselect and len(picks) == 1:
pick = picks[0]
r = self.model.get_row(src_row)
if is_confident_single_pick(r.title, r.year, pick.title, pick.year):
if sanitize_title_for_windows(pick.title) != r.title:
warn = "(WARN: title changed)"
if provider == Provider.TMDB:
self.model.set_tmdb_selection(src_row, pick, status=f"{provider.name}: auto-selected {warn}")
elif provider == Provider.IMDB:
# TODO
pass
self._update_status()
if self._autosave:
self._save_rows([src_row])
return
r = self.model.get_row(src_row)
dlg = MoviePickDialog(self, query_title=r.title, query_year=r.year if r.year else None, picks=picks)
if dlg.exec() != QtWidgets.QDialog.DialogCode.Accepted or dlg.selected is None:
self.model.set_ids_for_row(src_row, r.tmdb_id, r.imdb_id, status=f"{provider.name}: cancelled")
self._update_status()
return
chosen = dlg.selected
if sanitize_title_for_windows(chosen.title) != r.title:
dirty = True
warn = "(WARN: title changed)"
self.model.set_tmdb_selection(src_row, chosen, status=f"{provider.name}: selected {warn}")
self._update_status()
if not dirty and self._autosave:
self._save_rows([src_row])
def _on_search_error(self, src_row: int, msg: str, provider: Provider) -> None:
r = self.model.get_row(src_row)
self.model.set_ids_for_row(src_row, r.tmdb_id, r.imdb_id, status=f"{provider.name} error: {msg}")
self._update_status()
def _on_tmdb_search_cleanup(self) -> None:
if self._tmdb_worker is not None:
self._tmdb_worker.deleteLater()
self._tmdb_worker = None
if self._tmdb_thread is not None:
self._tmdb_thread.deleteLater()
self._tmdb_thread = None
def _on_imdb_search_cleanup(self) -> None:
if self._imdb_worker is not None:
self._imdb_worker.deleteLater()
self._imdb_worker = None
if self._imdb_thread is not None:
self._imdb_thread.deleteLater()
self._imdb_thread = None
# -------------------------
# Save = rename files on disk
# -------------------------
def _save_rows(self, src_rows: list[int]) -> None:
"""
Rename files on disk to match the row data.
- Preserves extension
- Writes tags as [tag] blocks, edition as {edition-...}, id as {tmdb-...} or {imdb-...}
- TMDB wins if both set
- If target exists, skip and set status
"""
if not src_rows:
return
# Confirm if bulk (optional)
if len(src_rows) > 1:
resp = QtWidgets.QMessageBox.question(
self,
"Rename files",
f"Rename {len(src_rows)} file(s) on disk?",
QtWidgets.QMessageBox.StandardButton.Yes | QtWidgets.QMessageBox.StandardButton.No,
)
if resp != QtWidgets.QMessageBox.StandardButton.Yes:
return
ops: list[RenameOp] = []
for row_idx in src_rows:
row = self.model.get_row(row_idx)
if not row.valid:
self.model.update_path_and_status(row_idx, row.path, "Rename skipped: invalid format")
continue
new_stem = build_video_stem(row)
new_path = row.path.with_name(new_stem + row.path.suffix)
if new_path == row.path:
self.model.update_path_and_status(row_idx, row.path, "No change")
continue
if new_path.exists():
self.model.update_path_and_status(row_idx, row.path, "Rename skipped: target exists")
continue
ops.append(RenameOp(row_idx, row.path, new_path))
if not ops:
return
self._enter_async_state(0, len(ops))
self._current_progress_msg = "Saving files..."
self._save_thread = QtCore.QThread(self)
self._save_worker = SaveWorker(ops)
self._save_worker.moveToThread(self._save_thread)
#
self._save_thread.started.connect(self._save_worker.run)
self._save_worker.resultReady.connect(self._on_save_result)
self._save_worker.progress.connect(self._on_worker_progress)
self._save_worker.finished.connect(self._save_thread.quit)
self._save_thread.finished.connect(self._on_save_cleanup)
self._save_thread.start()
self._update_status()
def _bulk_save(self) -> None:
proxy_rows = [idx.row() for idx in self.table.selectionModel().selectedRows()]
src_rows = [self._source_row_from_proxy_row(r) for r in proxy_rows]
self._save_rows(src_rows)
def _on_save_result(self, src_row: int, ok: bool, msg: str, file_path: pathlib.Path):
if ok:
self.model.update_path_and_status(src_row, file_path, "Renamed")
else:
self.model.update_path_and_status(src_row, file_path, f"Rename failed: {msg}")
pass
def _on_save_cleanup(self):
self._exit_async_state()
if self._save_worker is not None:
self._save_worker.deleteLater()
self._save_worker = None
if self._save_thread is not None:
self._save_thread.deleteLater()
self._save_thread = None
# -------------------------
# Status
# -------------------------
def _update_status(self) -> None:
rows = self.model.rows()
total = len(rows)
unmatched = sum(1 for r in rows if r.valid and not r.is_matched())
invalid = sum(1 for r in rows if not r.valid)
visible = self.proxy.rowCount()
parts = [f"Loaded: {total}", f"Unmatched: {unmatched}", f"Visible: {visible}"]
if invalid:
parts.append(f"Invalid: {invalid}")
self.status_lbl.setText(" | ".join(parts))
+199
View File
@@ -0,0 +1,199 @@
from datetime import date
from pathlib import Path
from typing import Optional
from PySide6 import QtCore, QtGui
from movietagger.core.datatypes import MovieRow, MoviePick
from movietagger.core.naming import IMDB_ID_RE, TMDB_ID_RE, clean_tag_list, sanitize_title_for_windows
class MovieTableModel(QtCore.QAbstractTableModel):
COL_TITLE = 0
COL_YEAR = 1
COL_TAGS = 2
COL_EDITION = 3
COL_TMDB = 4
COL_IMDB = 5
COL_PATH = 6
COL_STATUS = 7
HEADERS = ["Title", "Year", "Tags (CSV)", "Edition", "TMDB", "IMDB", "Path", "Status"]
def __init__(self, rows: list[MovieRow]):
super().__init__()
self._rows = rows
def rowCount(self, parent=QtCore.QModelIndex()) -> int:
return len(self._rows)
def columnCount(self, parent=QtCore.QModelIndex()) -> int:
return len(self.HEADERS)
def headerData(self, section: int, orientation: QtCore.Qt.Orientation,
role: int = QtCore.Qt.ItemDataRole.DisplayRole):
if orientation == QtCore.Qt.Orientation.Horizontal and role == QtCore.Qt.ItemDataRole.DisplayRole:
return self.HEADERS[section]
return None
def flags(self, index: QtCore.QModelIndex) -> QtCore.Qt.ItemFlag:
if not index.isValid():
return QtCore.Qt.ItemFlag.NoItemFlags
base = QtCore.Qt.ItemFlag.ItemIsSelectable | QtCore.Qt.ItemFlag.ItemIsEnabled
if index.column() in (
self.COL_TITLE, self.COL_YEAR, self.COL_TAGS, self.COL_EDITION, self.COL_TMDB, self.COL_IMDB):
base |= QtCore.Qt.ItemFlag.ItemIsEditable
return base
def data(self, index: QtCore.QModelIndex, role: int = QtCore.Qt.ItemDataRole.DisplayRole):
if not index.isValid():
return None
row = self._rows[index.row()]
col = index.column()
if role == QtCore.Qt.ItemDataRole.DisplayRole or role == QtCore.Qt.ItemDataRole.EditRole:
if col == self.COL_TITLE:
return row.title
if col == self.COL_YEAR:
return "" if not row.year else str(row.year)
if col == self.COL_TAGS:
return ", ".join(row.tags)
if col == self.COL_EDITION:
return row.edition or ""
if col == self.COL_TMDB:
return row.tmdb_id or ""
if col == self.COL_IMDB:
return row.imdb_id or ""
if col == self.COL_PATH:
return str(row.path)
if col == self.COL_STATUS:
return row.status
return ""
if role == QtCore.Qt.ItemDataRole.ToolTipRole:
if not row.valid:
return (
"Invalid format. Expected:\n"
"Title (YEAR) [tag]* {edition-XYZ}? {tmdb-123|imdb-tt1234567}? .mkv|.mp4\n"
"Strict order after YEAR: [tags] then {edition} then {id}."
)
elif col == self.COL_STATUS:
return row.status or None
elif col == self.COL_PATH:
return str(row.path) or None
if role == QtCore.Qt.ItemDataRole.ForegroundRole and not row.valid:
return QtGui.QBrush(QtGui.QColor("red"))
return None
def setData(self, index: QtCore.QModelIndex, value, role: int = QtCore.Qt.ItemDataRole.DisplayRole) -> bool:
if role != QtCore.Qt.ItemDataRole.EditRole or not index.isValid():
return False
row = self._rows[index.row()]
col = index.column()
text = str(value).strip()
row.status = ""
if col == self.COL_TITLE:
if text is not None:
row.title = text
row.canonical_title = text
elif col == self.COL_YEAR:
try:
year = int(text)
if 1887 < year <= date.today().year:
row.year = year
else:
row.status = "Invalid Year"
except ValueError:
row.status = "Invalid Year"
elif col == self.COL_TAGS:
parts = [p.strip() for p in text.split(",")]
row.tags = clean_tag_list(parts)
elif col == self.COL_EDITION:
row.edition = text or None
elif col == self.COL_TMDB:
if text and not TMDB_ID_RE.match(text):
row.status = "Bad TMDB id"
else:
row.tmdb_id = text or None
elif col == self.COL_IMDB:
if text and not IMDB_ID_RE.match(text):
row.status = "Bad IMDB id"
else:
row.imdb_id = text or None
elif col == self.COL_STATUS:
row.status = text
else:
return False
self.dataChanged.emit(index, index, [QtCore.Qt.ItemDataRole.DisplayRole, QtCore.Qt.ItemDataRole.EditRole])
return True
def get_row(self, row_idx: int) -> MovieRow:
return self._rows[row_idx]
def rows(self) -> list[MovieRow]:
return self._rows
def set_ids_for_row(self, row_idx: int, tmdb_id: Optional[str], imdb_id: Optional[str], status: str = "") -> None:
row = self._rows[row_idx]
row.tmdb_id = (tmdb_id or "").strip() or None
row.imdb_id = (imdb_id or "").strip() or None
if status:
row.status = status
left = self.index(row_idx, self.COL_TMDB)
right = self.index(row_idx, self.COL_STATUS)
self.dataChanged.emit(left, right, [QtCore.Qt.ItemDataRole.DisplayRole])
def update_path_and_status(self, row_idx: int, new_path: Path, status: str) -> None:
row = self._rows[row_idx]
row.path = new_path
row.status = status
left = self.index(row_idx, self.COL_PATH)
right = self.index(row_idx, self.COL_STATUS)
self.dataChanged.emit(left, right, [QtCore.Qt.ItemDataRole.DisplayRole])
def set_tmdb_selection(self, row_idx: int, pick: MoviePick, status: str = "TMDB: selected") -> None:
row = self._rows[row_idx]
row.tmdb_id = str(pick.id)
# Update canonical metadata from TMDB
if pick.title and pick.title.strip():
row.title = sanitize_title_for_windows(pick.title.strip())
row.canonical_title = pick.title.strip()
if pick.year:
row.year = int(pick.year)
row.status = status
left = self.index(row_idx, self.COL_TITLE)
right = self.index(row_idx, self.COL_STATUS)
self.dataChanged.emit(left, right, [QtCore.Qt.ItemDataRole.DisplayRole, QtCore.Qt.ItemDataRole.EditRole])
def restore_original_title(self, row_idx: int) -> None:
r = self._rows[row_idx]
if r.original_title:
r.title = r.original_title
r.canonical_title = ""
r.status = "Restored original title"
left = self.index(row_idx, self.COL_TMDB)
right = self.index(row_idx, self.COL_STATUS)
self.dataChanged.emit(left, right, [QtCore.Qt.ItemDataRole.DisplayRole])
def restore_original_year(self, row_idx: int) -> None:
r = self._rows[row_idx]
if r.original_year:
r.year = r.original_year
r.status = "Restored original year"
left = self.index(row_idx, self.COL_TMDB)
right = self.index(row_idx, self.COL_STATUS)
self.dataChanged.emit(left, right, [QtCore.Qt.ItemDataRole.DisplayRole])
+96
View File
@@ -0,0 +1,96 @@
from typing import cast
from PySide6 import QtCore
from movietagger.core.datatypes import Provider
from movietagger.core.naming import MovieRow
from movietagger.qt.models import MovieTableModel
class MovieProxyModel(QtCore.QSortFilterProxyModel):
def __init__(self, parent=None):
super().__init__(parent)
self._hide_matched = False
self._tokens: list[str] = []
self.setSortCaseSensitivity(QtCore.Qt.CaseSensitivity.CaseInsensitive)
def _compare_ids(self, left, right, *, provider: Provider) -> bool:
l = self.sourceModel().data(left, QtCore.Qt.ItemDataRole.DisplayRole)
r = self.sourceModel().data(right, QtCore.Qt.ItemDataRole.DisplayRole)
# Normalize empty values → always sort last
if not l and not r:
return False
if not l:
return False
if not r:
return True
try:
if provider == Provider.TMDB:
return int(l) < int(r)
if provider == Provider.IMDB:
# IMDB IDs like "tt0123456"
return int(l.lstrip("tt")) < int(r.lstrip("tt"))
except ValueError:
# Fallback to string compare if malformed
return str(l) < str(r)
return False
def lessThan(self, left, right) -> bool:
col = left.column()
# TMDB column
if col == MovieTableModel.COL_TMDB:
return self._compare_ids(left, right, provider=Provider.TMDB)
# IMDB column
if col == MovieTableModel.COL_IMDB:
return self._compare_ids(left, right, provider=Provider.IMDB)
return super().lessThan(left, right)
def set_hide_matched(self, hide: bool) -> None:
self._hide_matched = hide
self.invalidateFilter()
def set_filter_text(self, text: str) -> None:
self._tokens = [t.casefold() for t in text.split() if t.strip()]
self.invalidateFilter()
def filterAcceptsRow(self, source_row: int, source_parent: QtCore.QModelIndex) -> bool:
model = cast(MovieTableModel, self.sourceModel())
if model is None:
return True
row: MovieRow = model.get_row(source_row)
if self._hide_matched and row.is_matched():
return False
if not self._tokens:
return True
flags = []
if not row.valid:
flags.append("invalid")
haystack = " ".join(
[
row.title or "",
row.original_title or "",
str(row.year) if row.year else "",
", ".join(row.tags) if row.tags else "",
row.edition or "",
row.tmdb_id or "",
row.imdb_id or "",
str(row.path) if row.path else "",
row.status or "",
*flags,
]
).casefold()
return all(tok in haystack for tok in self._tokens)
View File
+7
View File
@@ -0,0 +1,7 @@
from .load_worker import LoadWorker
from .tmdb_search_worker import TmdbSearchWorker
from .imdb_search_worker import ImdbSearchWorker
from .batch_scan_worker import BatchScanWorker
from .save_worker import SaveWorker
__all__ = ['LoadWorker', 'TmdbSearchWorker', 'ImdbSearchWorker', 'BatchScanWorker', 'SaveWorker']
@@ -0,0 +1,119 @@
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor, wait, FIRST_COMPLETED
from PySide6 import QtCore
from movietagger.core.config import require_tmdb_env
from movietagger.core.datatypes import MoviePick
from movietagger.core.naming import is_confident_single_pick
from movietagger.core.ratelimiter import RateLimiter
from movietagger.services.tmdb_service import tmdb_search_picks_sync
class BatchScanWorker(QtCore.QObject):
finished = QtCore.Signal()
progress = QtCore.Signal(int, int) # done, total
resultReady = QtCore.Signal(int, object)
errorReady = QtCore.Signal(int, str) # src_row_idx, message
def __init__(self, rows: list[tuple[int, str, int]], *, max_workers: int = 10):
"""
rows: list of (src_row, title, year)
We pass a snapshot so the worker doesn't touch the model from another thread.
"""
super().__init__()
self._rows = rows
self._cancel_event = threading.Event()
self._max_workers = max(1, int(max_workers))
self._limiter = RateLimiter(max_per_sec=20)
self._emit_interval_s = 0.10
require_tmdb_env()
@QtCore.Slot()
def cancel(self) -> None:
self._cancel_event.set()
@QtCore.Slot()
def run(self) -> None:
total = len(self._rows)
done = 0
if total == 0:
self.finished.emit()
return
self.progress.emit(done, total)
row_iter = iter(self._rows)
pending: dict[Future, int] = {} # future -> src_row
last_emit = 0.0
def maybe_emit_progress(force: bool = False) -> None:
nonlocal last_emit
now = time.monotonic()
if force or (now - last_emit) >= self._emit_interval_s:
last_emit = now
self.progress.emit(done, total)
def submit_next(ex: ThreadPoolExecutor) -> None:
if self._cancel_event.is_set():
return
row = next(row_iter, None)
if row is None:
return
src_row, title, year = row
pending[ex.submit(self._scan_one_row, src_row, title, year)] = src_row
try:
with ThreadPoolExecutor(max_workers=self._max_workers) as ex:
# Prime the pool
for _ in range(self._max_workers):
submit_next(ex)
while pending:
if self._cancel_event.is_set():
for f in list(pending):
f.cancel()
break
done_set, _ = wait(pending, return_when=FIRST_COMPLETED)
for f in done_set:
src_row = pending.pop(f)
done += 1
if not f.cancelled():
try:
res = f.result()
if res is not None:
self.resultReady.emit(*res)
except Exception as e:
self.errorReady.emit(src_row, str(e))
maybe_emit_progress()
submit_next(ex)
finally:
self.finished.emit()
def _scan_one_row(self, src_row: int, title: str, year: int) -> tuple[int, MoviePick] | None:
"""
Runs in a thread-pool thread. No Qt objects, no model access.
Returns a MoviePick (or None) to send back to UI thread.
"""
self._limiter.acquire(self._cancel_event)
if self._cancel_event.is_set():
return None
try:
picks = tmdb_search_picks_sync(title, year)
except Exception:
# IMDB fallback will go here when implemented
raise
if len(picks) == 1:
pick = picks[0]
if is_confident_single_pick(title, year, pick.title, pick.year):
return src_row, pick
@@ -0,0 +1,25 @@
from typing import Optional
from PySide6 import QtCore
from movietagger.core.datatypes import Provider
from movietagger.services.imdb_service import imdb_search_picks_sync
class ImdbSearchWorker(QtCore.QObject):
finished = QtCore.Signal(int, list, Provider) # src_row_idx, picks
error = QtCore.Signal(int, str, Provider) # src_row_idx, message
def __init__(self, src_row_idx: int, title: str, year: Optional[int]):
super().__init__()
self.src_row_idx = src_row_idx
self.title = title
self.year = year
@QtCore.Slot()
def run(self) -> None:
try:
picks = imdb_search_picks_sync(self.title, self.year)
self.finished.emit(self.src_row_idx, picks, Provider.IMDB)
except Exception as e:
self.error.emit(self.src_row_idx, str(e), Provider.IMDB)
+153
View File
@@ -0,0 +1,153 @@
import os
import time
from pathlib import Path
from typing import Optional, Sequence
from PySide6 import QtCore
from movietagger.core.naming import MovieRow, VIDEO_EXTS, FILENAME_TAGS_RE, clean_tag_list, \
BRACKET_RE, TMDB_ID_RE, IMDB_ID_RE
def _parse_video_filename(path: Path) -> MovieRow:
stem = path.stem
m = FILENAME_TAGS_RE.match(stem)
if not m:
return MovieRow(path=path, title=stem, year=0, tags=[], valid=False, status="Invalid format")
title = (m.group("name") or "").strip()
year = int(m.group("year"))
brackets_raw = m.group("brackets") or ""
tags = clean_tag_list([t for t in BRACKET_RE.findall(brackets_raw)])
edition = (m.group("edition") or "").strip() or None
id_kind = (m.group("id_kind") or "").lower() or None
id_value = (m.group("id_value") or "").strip() or None
tmdb_id: Optional[str] = None
imdb_id: Optional[str] = None
if id_kind and id_value:
if id_kind == "tmdb":
if not TMDB_ID_RE.match(id_value):
return MovieRow(path=path, title=title, year=year, tags=tags, edition=edition,
valid=False, status="Bad TMDB id")
tmdb_id = id_value
elif id_kind == "imdb":
if not IMDB_ID_RE.match(id_value):
return MovieRow(path=path, title=title, year=year, tags=tags, edition=edition,
valid=False, status="Bad IMDB id")
imdb_id = id_value
row = MovieRow(
path=path,
title=title,
year=year,
tags=tags,
edition=edition,
tmdb_id=tmdb_id,
imdb_id=imdb_id,
valid=True,
status=""
)
row.original_title = row.title
row.original_year = row.year
return row
class LoadWorker(QtCore.QObject):
started = QtCore.Signal()
progress = QtCore.Signal(int, int) # done, total
finished = QtCore.Signal(list) # list[MovieRow]
cancelled = QtCore.Signal(list) # partial list[MovieRow]
error = QtCore.Signal(str)
def __init__(self, files: Sequence[str], directory: Optional[str], recursive: bool):
super().__init__()
self._files = list(files)
self._directory = directory
self._recursive = recursive
self._cancel = False
def cancel(self) -> None:
self._cancel = True
def _collect_video_files(self, files: Sequence[str], directory: Optional[str] = None, recursive: bool = False) -> \
list[Path]:
paths: list[Path] = []
last_emit = 0.0
emit_interval = 0.1 # seconds (100ms)
for f in files:
if self._cancel:
break
p = Path(f)
if p.is_file() and p.suffix.lower() in VIDEO_EXTS:
paths.append(p)
self.progress.emit(0, len(paths))
exts_tuple = tuple(VIDEO_EXTS)
if directory:
if recursive:
for root, _, files in os.walk(directory):
if self._cancel:
break
for name in files:
if self._cancel:
break
# cheap string check first
if name.lower().endswith(exts_tuple):
paths.append(Path(os.path.join(root, name)))
now = time.monotonic()
if now - last_emit >= emit_interval:
last_emit = now
self.progress.emit(0, len(paths))
else:
with os.scandir(directory) as it:
for entry in it:
if self._cancel:
break
if entry.is_file():
if entry.name.lower().endswith(exts_tuple):
paths.append(Path(entry.path))
now = time.monotonic()
if now - last_emit >= emit_interval:
last_emit = now
self.progress.emit(0, len(paths))
# de-dupe stable
seen: set[Path] = set()
out: list[Path] = []
for p in paths:
if p not in seen:
seen.add(p)
out.append(p)
return out
@QtCore.Slot()
def run(self) -> None:
last_emit = 0.0
emit_interval = 0.1 # seconds (100ms)
self.started.emit()
try:
paths = self._collect_video_files(files=self._files, directory=self._directory, recursive=self._recursive)
total = len(paths)
rows: list[MovieRow] = []
self.progress.emit(0, total)
for i, p in enumerate(paths, start=1):
if self._cancel:
self.cancelled.emit(rows)
return
rows.append(_parse_video_filename(p))
now = time.monotonic()
if now - last_emit >= emit_interval or i == total:
last_emit = now
self.progress.emit(i, total)
self.finished.emit(rows)
except Exception as e:
self.error.emit(str(e))
+66
View File
@@ -0,0 +1,66 @@
import os
import threading
import time
from PySide6 import QtCore
from movietagger.core.datatypes import RenameOp
class SaveWorker(QtCore.QObject):
progress = QtCore.Signal(int, int) # done, total
resultReady = QtCore.Signal(int, bool, str, str) # (src_row, ok, message, new_path_str)
finished = QtCore.Signal()
def __init__(self, ops: list[RenameOp], *, emit_interval_s: float = 0.10, parent=None):
super().__init__(parent)
self._ops = ops
self._emit_interval_s = float(emit_interval_s)
self._cancel = threading.Event()
@QtCore.Slot()
def cancel(self) -> None:
self._cancel.set()
@QtCore.Slot()
def run(self) -> None:
total = len(self._ops)
done = 0
last_emit = 0.0
self.progress.emit(0, total)
for op in self._ops:
if self._cancel.is_set():
break
ok = True
msg = "Renamed"
try:
# Ensure destination directory exists (usually same dir, but safe)
op.new_path.parent.mkdir(parents=True, exist_ok=True)
# On Windows, Path.rename can fail across volumes; os.replace is a bit more robust.
os.replace(str(op.old_path), str(op.new_path))
except FileExistsError:
ok = False
msg = "Target already exists"
except PermissionError as e:
ok = False
msg = f"Permission error: {e}"
except OSError as e:
ok = False
msg = f"OS error: {e}"
done += 1
self.resultReady.emit(op.src_row, ok, msg, str(op.new_path) if ok else str(op.old_path))
now = time.monotonic()
if (now - last_emit) >= self._emit_interval_s:
last_emit = now
self.progress.emit(done, total)
# final progress tick
self.progress.emit(done, total)
self.finished.emit()
@@ -0,0 +1,28 @@
from typing import Optional
from PySide6 import QtCore
from movietagger.core.config import require_tmdb_env
from movietagger.core.datatypes import Provider
from movietagger.services.tmdb_service import tmdb_search_picks_sync
class TmdbSearchWorker(QtCore.QObject):
finished = QtCore.Signal(int, list, Provider) # src_row_idx, picks
error = QtCore.Signal(int, str, Provider) # src_row_idx, message
def __init__(self, src_row_idx: int, title: str, year: Optional[int]):
super().__init__()
self.src_row_idx = src_row_idx
self.title = title
self.year = year
require_tmdb_env()
@QtCore.Slot()
def run(self) -> None:
try:
picks = tmdb_search_picks_sync(self.title, self.year)
self.finished.emit(self.src_row_idx, picks, Provider.TMDB)
except Exception as e:
self.error.emit(self.src_row_idx, str(e), Provider.TMDB)
+1
View File
@@ -0,0 +1 @@
__all__ = ['tmdb_service', 'imdb_service']
+48
View File
@@ -0,0 +1,48 @@
import asyncio
from typing import Any, Optional
from movietagger.core.datatypes import MoviePick, Provider
def _safe_int(x: Any) -> Optional[int]:
try:
return int(x)
except ValueError:
return None
TMDB_POSTER_BASE = "https://image.tmdb.org/t/p/w92"
def _to_pick(obj: Any) -> MoviePick:
# Be defensive: the library objects are attribute-based
mid = getattr(obj, "id")
title = getattr(obj, "title", None) or getattr(obj, "name", "") or ""
year = _safe_int(getattr(obj, "year", None))
overview = getattr(obj, "overview", "") or ""
poster_url = ""
poster_path = getattr(obj, "poster_path", None)
if poster_path:
poster_url = f"{TMDB_POSTER_BASE}{poster_path}"
elif hasattr(obj, "poster_url"):
try:
val = obj.poster_url
if callable(val):
val = val()
if isinstance(val, str) and val:
poster_url = val
except TypeError:
pass
# The README indicates poster_url exists on details; search results may or may not.
# poster_url = getattr(obj, "poster_url", "") or ""
return MoviePick(provider=Provider.IMDB, id=mid, title=title, year=year, overview=overview, poster_url=poster_url)
async def imdb_search_picks_async(title: str, year: Optional[int]) -> list[MoviePick]:
return []
def imdb_search_picks_sync(title: str, year: Optional[int]) -> list[MoviePick]:
return asyncio.run(imdb_search_picks_async(title, year))
+61
View File
@@ -0,0 +1,61 @@
import asyncio
import pprint
from typing import Any, Optional
from themoviedb import aioTMDb
from movietagger.core.datatypes import MoviePick, Provider
TMDB_POSTER_BASE = "https://image.tmdb.org/t/p/w92"
def _safe_int(x: Any) -> Optional[int]:
try:
return int(x)
except ValueError:
return None
def _to_pick(obj: Any) -> MoviePick:
# Be defensive: the library objects are attribute-based
movie_id = getattr(obj, "id")
title = getattr(obj, "title", None) or getattr(obj, "name", "") or ""
year = _safe_int(getattr(obj, "year", None))
overview = getattr(obj, "overview", "") or ""
poster_url = ""
poster_path = getattr(obj, "poster_path", None)
if poster_path:
poster_url = f"{TMDB_POSTER_BASE}{poster_path}"
elif hasattr(obj, "poster_url"):
try:
val = obj.poster_url
if callable(val):
val = val()
if isinstance(val, str) and val:
poster_url = val
except TypeError:
pass
# The README indicates poster_url exists on details; search results may or may not.
# poster_url = getattr(obj, "poster_url", "") or ""
return MoviePick(provider=Provider.TMDB, id=movie_id, title=title, year=year, overview=overview, poster_url=poster_url)
async def tmdb_search_picks_async(title: str, year: Optional[int]) -> list[MoviePick]:
tmdb = aioTMDb()
results = await tmdb.search().movies(title)
picks = [_to_pick(m) for m in results if m.year is not None]
# If year was provided: filter client-side --- also grab surrounding years, to be safe
if year:
exactish = [p for p in picks if (year - 2 <= p.year <= year + 2)]
if exactish:
picks = exactish
return picks
def tmdb_search_picks_sync(title: str, year: Optional[int]) -> list[MoviePick]:
return asyncio.run(tmdb_search_picks_async(title, year))