---
name: archive-reprocessing
description: Flexible, version-tracked reprocessing system for archive transformations using design patterns (Strategy, Template Method, Observer). Activate when working with tools/scripts/lib/, reprocessing scripts, transform versions, archive transformations, metadata transformers, or incremental processing workflows.
---
# Archive Reprocessing System
**Auto-activates when:** working with tools/scripts/lib/, reprocessing scripts, transform versions, or discussing archive transformations.
## Quick Reference
**When to use:**
- New transformations on existing archives (flattening, normalization)
- After version bumps (v1.0 → v1.1)
- Vocabulary/model updates
- Regenerating derived outputs
**When NOT to use:**
- Adding new videos → `ingest_youtube.py`
- Rebuilding Qdrant cache → `reingest_from_archive.py`
**Scripts:**
- `reprocess_qdrant_metadata.py` - Flattening + weights (fast, no LLM)
- `reprocess_normalized_tags.py` - Tag normalization (slow, LLM calls)
**Common commands:**
```bash
# Dry run (test 10 archives)
uv run python tools/scripts/reprocess_qdrant_metadata.py --dry-run --limit 10

# Full run
uv run python tools/scripts/reprocess_qdrant_metadata.py
```
## Design
Strategy + Template Method + Observer patterns for incremental, version-tracked reprocessing:
- Pluggable transformers (Strategy) eliminate copy-paste
- Template Method for consistent workflows
- Observer hooks for progress tracking
- Semantic versioning that works during development (not git-dependent)
- Incremental processing skips unchanged archives
## Core Components
### 1. Version Registry (`transform_versions.py`)
```python
VERSIONS = {
    "normalizer": "v1.0",
    "vocabulary": "v1",
    "qdrant_flattener": "v1.0",
    "weight_calculator": "v1.0",
    "llm_model": "claude-3-5-haiku-20241022",
}
```
**Bump versions on:** logic changes (v1.0 → v1.1), vocabulary updates (v1 → v2), breaking changes (v1.x → v2.0).
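The snippets below call `get_version()` and `get_transform_manifest()` from this module. A minimal sketch of what those helpers might look like if they read straight off `VERSIONS` (the real implementations live in `transform_versions.py`):

```python
# Sketch only - the real helpers live in tools/scripts/lib/transform_versions.py.
def get_version(key: str) -> str:
    """Look up the current version for a single transform key."""
    return VERSIONS[key]

def get_transform_manifest(keys: list[str] | None = None) -> dict[str, str]:
    """Snapshot of current versions, stored alongside each derived output."""
    if keys is None:
        return dict(VERSIONS)
    return {key: VERSIONS[key] for key in keys}
```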
### 2. Metadata Transformers (`metadata_transformers.py`)
```python
from tools.scripts.lib.metadata_transformers import (
    QdrantMetadataFlattener,
    RecommendationWeightCalculator,
    create_qdrant_transformer,
)

# Pre-built
transformer = create_qdrant_transformer()
metadata = transformer.transform(archive_data)

# Custom
class MyTransformer(BaseTransformer):
    def get_version(self) -> str:
        return get_version("my_transformer")

    def transform(self, archive_data: dict) -> dict:
        return {"transformed": True}
```
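`BaseTransformer` is the Strategy interface the custom class implements. Its definition isn't shown here; a plausible shape, assuming it is an ABC exposing exactly the two methods subclasses override:

```python
from abc import ABC, abstractmethod

class BaseTransformer(ABC):
    """Strategy interface sketch: one pluggable transformation per subclass."""

    @abstractmethod
    def get_version(self) -> str:
        """Current version string, used for staleness checks."""

    @abstractmethod
    def transform(self, archive_data: dict) -> dict:
        """Map raw archive data to the derived output payload."""
```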
### 3. Reprocessing Pipeline (`reprocessing_pipeline.py`)
```python
import json

from tools.scripts.lib.reprocessing_pipeline import (
    BaseReprocessingPipeline,
    ConsoleHooks,
)

class MyPipeline(BaseReprocessingPipeline):
    def get_output_type(self) -> str:
        return "my_transformation_v1"

    def get_version_keys(self) -> list[str]:
        return ["my_transformer"]

    def process_archive(self, archive: YouTubeArchive) -> str:
        result = ...  # derive your output from `archive` here
        return json.dumps(result)

pipeline = MyPipeline(hooks=ConsoleHooks())
stats = pipeline.run(limit=10)
```
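The Template Method payoff is that `run()` owns the whole workflow (loading, staleness checks, observer callbacks) while subclasses supply only the three methods above. A rough sketch of that orchestration; everything except the three overrides (`_load_archives`, `_is_current`, `_store`, `archive.video_id`) is a hypothetical internal named for illustration:

```python
import time
from abc import ABC, abstractmethod

class BaseReprocessingPipeline(ABC):
    """Template Method sketch: run() owns the loop, subclasses fill the gaps."""

    def __init__(self, hooks=None):
        self.hooks = hooks

    # --- overridden by subclasses (shown in the example above) ---
    @abstractmethod
    def get_output_type(self) -> str: ...

    @abstractmethod
    def get_version_keys(self) -> list[str]: ...

    @abstractmethod
    def process_archive(self, archive) -> str: ...

    # --- assumed internals, named here for illustration only ---
    def _load_archives(self, limit):  # hypothetical: fetch archives from the store
        raise NotImplementedError

    def _is_current(self, archive) -> bool:  # hypothetical staleness check
        raise NotImplementedError

    def _store(self, archive, output: str):  # hypothetical writer call
        raise NotImplementedError

    def run(self, limit: int | None = None) -> dict:
        archives = self._load_archives(limit)
        stats = {"processed": 0, "skipped": 0}
        if self.hooks:
            self.hooks.on_start(len(archives))
        for archive in archives:
            if self._is_current(archive):   # skip unchanged archives
                stats["skipped"] += 1
                continue
            start = time.monotonic()
            output = self.process_archive(archive)  # subclass hook
            self._store(archive, output)
            stats["processed"] += 1
            if self.hooks:
                self.hooks.on_archive_success(archive.video_id, time.monotonic() - start)
        if self.hooks:
            self.hooks.on_complete(stats)
        return stats
```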
## Common Tasks
```bash
# Dry run (test)
uv run python tools/scripts/reprocess_qdrant_metadata.py --dry-run --limit 10
uv run python tools/scripts/reprocess_normalized_tags.py --dry-run --limit 3

# Full run (~12min Qdrant, ~2hr normalization for 470 videos)
uv run python tools/scripts/reprocess_qdrant_metadata.py
uv run python tools/scripts/reprocess_normalized_tags.py

# After version bump (only processes stale archives)
# Edit: tools/scripts/lib/transform_versions.py → "normalizer": "v1.1"
uv run python tools/scripts/reprocess_normalized_tags.py

# Options
--no-context     # Skip semantic retrieval (faster, lower quality)
--no-vocabulary  # Skip vocabulary normalization
```
## Archive Data Structure
```json
{
  "llm_outputs": [
    {"output_type": "tags", "cost_usd": 0.0012, "model": "claude-3-5-haiku-20241022"}
  ],
  "derived_outputs": [
    {
      "output_type": "normalized_metadata_v1",
      "transformer_version": "v1.0+v1",
      "transform_manifest": {"normalizer": "v1.0", "vocabulary": "v1"},
      "source_outputs": ["tags"]
    }
  ]
}
```
`llm_outputs` cost money and are permanent; `derived_outputs` are free to regenerate and version-tracked.
## Incremental Processing
**Staleness detection:** compare the stored `transform_manifest` against the current `VERSIONS`. Reprocess an archive if they differ; skip it if they match.
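A minimal sketch of that check, assuming the stored manifest is the dict from the data structure above; `is_stale` is an illustrative name, not necessarily the library's:

```python
from tools.scripts.lib.transform_versions import VERSIONS

def is_stale(stored_manifest: dict[str, str], version_keys: list[str]) -> bool:
    """True when any transform version changed since the output was written."""
    return any(stored_manifest.get(key) != VERSIONS[key] for key in version_keys)

# A "v1.0+v1" output stored under {"normalizer": "v1.0", "vocabulary": "v1"}
# becomes stale the moment VERSIONS["normalizer"] is bumped to "v1.1".
is_stale({"normalizer": "v1.0", "vocabulary": "v1"}, ["normalizer", "vocabulary"])
```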
**Performance:** first run (all 470 archives) takes ~12 min for Qdrant / ~2 hr for normalization. Subsequent runs finish in ~5 s (everything current is skipped). After a version bump, only the affected archives are reprocessed.
## Creating New Transformations
Steps:

1. Add a version to `transform_versions.py`: `"my_transformer": "v1.0"`
2. Create a transformer class with `get_version()` and `transform()` methods
3. Create a pipeline script extending `BaseReprocessingPipeline`
```python
# Transformer
class MyTransformer(BaseTransformer):
    def get_version(self) -> str:
        return get_version("my_transformer")

    def transform(self, archive_data: dict) -> dict:
        return {"result": "value"}

# Pipeline
class MyReprocessor(BaseReprocessingPipeline):
    def get_output_type(self) -> str:
        return "my_output_v1"

    def get_version_keys(self) -> list[str]:
        return ["my_transformer"]

    def process_archive(self, archive: YouTubeArchive) -> str:
        return json.dumps(MyTransformer().transform(archive.model_dump()))
```
## Archive Service Integration
```python
from tools.services.archive import create_local_archive_writer
from tools.scripts.lib.transform_versions import get_transform_manifest

writer = create_local_archive_writer()

# Add
writer.add_derived_output(
    video_id="abc123",
    output_type="my_transformation_v1",
    output_value=json.dumps(result),
    transformer_version="v1.0",
    transform_manifest=get_transform_manifest(),
    source_outputs=["tags"],
)

# Retrieve
archive = writer.get("abc123")
derived = archive.get_latest_derived_output("my_transformation_v1")
```
## Observer Hooks
```python
class MetricsHooks:
    def on_start(self, total: int):
        print(f"Starting {total} archives")

    def on_archive_success(self, video_id: str, elapsed: float):
        print(f"{video_id}: {elapsed:.2f}s")

    def on_complete(self, stats: dict):
        print(f"Done: {stats}")

pipeline = MyPipeline(hooks=MetricsHooks())
```
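Note that `MetricsHooks` subclasses nothing: the hooks appear to be duck-typed, so any object exposing `on_start`, `on_archive_success`, and `on_complete` should work in place of `ConsoleHooks`.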
## Documentation
- `tools/scripts/lib/README.md` - Complete system docs
- `tools/scripts/lib/QUICKSTART.md` - Quick reference
- `lessons/lesson-010/COMPLETE.md` - Tag normalization lesson