Claude Code Plugins

Community-maintained marketplace

Feedback

archive-reprocessing

@ilude/claude-code-config
3
0

Flexible, version-tracked reprocessing system for archive transformations using design patterns (Strategy, Template Method, Observer). Activate when working with tools/scripts/lib/, reprocessing scripts, transform versions, archive transformations, metadata transformers, or incremental processing workflows.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name archive-reprocessing
description Flexible, version-tracked reprocessing system for archive transformations using design patterns (Strategy, Template Method, Observer). Activate when working with tools/scripts/lib/, reprocessing scripts, transform versions, archive transformations, metadata transformers, or incremental processing workflows.

Archive Reprocessing System

Auto-activates when: Working with tools/scripts/lib/, reprocessing scripts, transform versions, or discussing archive transformations.

Quick Reference

When to use:

  • New transformations on existing archives (flattening, normalization)
  • After version bumps (v1.0 → v1.1)
  • Vocabulary/model updates
  • Regenerating derived outputs

When NOT to use:

  • Adding new videos → ingest_youtube.py
  • Rebuilding Qdrant cache → reingest_from_archive.py

Scripts:

  • reprocess_qdrant_metadata.py - Flattening + weights (fast, no LLM)
  • reprocess_normalized_tags.py - Tag normalization (slow, LLM calls)

Common commands:

# Dry run (test 10 archives)
uv run python tools/scripts/reprocess_qdrant_metadata.py --dry-run --limit 10

# Full run
uv run python tools/scripts/reprocess_qdrant_metadata.py

Design

Strategy + Template Method + Observer patterns for incremental, version-tracked reprocessing:

  • Pluggable transformers (Strategy) eliminate copy-paste
  • Template Method for consistent workflows
  • Observer hooks for progress tracking
  • Semantic versioning works during development (not git-dependent)
  • Incremental processing skips unchanged archives

Core Components

1. Version Registry (transform_versions.py)

VERSIONS = {
    "normalizer": "v1.0",
    "vocabulary": "v1",
    "qdrant_flattener": "v1.0",
    "weight_calculator": "v1.0",
    "llm_model": "claude-3-5-haiku-20241022",
}

Bump versions on: logic changes (v1.0 → v1.1), vocabulary updates (v1 → v2), breaking changes (v1.x → v2.0)

2. Metadata Transformers (metadata_transformers.py)

from tools.scripts.lib.metadata_transformers import (
    QdrantMetadataFlattener,
    RecommendationWeightCalculator,
    create_qdrant_transformer,
)

# Pre-built
transformer = create_qdrant_transformer()
metadata = transformer.transform(archive_data)

# Custom
class MyTransformer(BaseTransformer):
    def get_version(self) -> str:
        return get_version("my_transformer")

    def transform(self, archive_data: dict) -> dict:
        return {"transformed": True}

3. Reprocessing Pipeline (reprocessing_pipeline.py)

from tools.scripts.lib.reprocessing_pipeline import (
    BaseReprocessingPipeline,
    ConsoleHooks,
)

class MyPipeline(BaseReprocessingPipeline):
    def get_output_type(self) -> str:
        return "my_transformation_v1"

    def get_version_keys(self) -> list[str]:
        return ["my_transformer"]

    def process_archive(self, archive: YouTubeArchive) -> str:
        return json.dumps(result)

pipeline = MyPipeline(hooks=ConsoleHooks())
stats = pipeline.run(limit=10)

Common Tasks

# Dry run (test)
uv run python tools/scripts/reprocess_qdrant_metadata.py --dry-run --limit 10
uv run python tools/scripts/reprocess_normalized_tags.py --dry-run --limit 3

# Full run (~12min Qdrant, ~2hr normalization for 470 videos)
uv run python tools/scripts/reprocess_qdrant_metadata.py
uv run python tools/scripts/reprocess_normalized_tags.py

# After version bump (only processes stale archives)
# Edit: tools/scripts/lib/transform_versions.py → "normalizer": "v1.1"
uv run python tools/scripts/reprocess_normalized_tags.py

# Options
--no-context     # Skip semantic retrieval (faster, lower quality)
--no-vocabulary  # Skip vocabulary normalization

Archive Data Structure

{
  "llm_outputs": [
    {"output_type": "tags", "cost_usd": 0.0012, "model": "claude-3-5-haiku-20241022"}
  ],
  "derived_outputs": [
    {
      "output_type": "normalized_metadata_v1",
      "transformer_version": "v1.0+v1",
      "transform_manifest": {"normalizer": "v1.0", "vocabulary": "v1"},
      "source_outputs": ["tags"]
    }
  ]
}

llm_outputs cost money (permanent), derived_outputs free to regenerate (version-tracked)

Incremental Processing

Staleness detection: compare stored transform_manifest to current VERSIONS. Reprocess if changed, skip if match.

Performance: First run (all 470): ~12min Qdrant / ~2hr normalization. Subsequent runs: ~5s (skip if current). Version bump: only affected archives.

Creating New Transformations

Steps:

  1. Add version to transform_versions.py: "my_transformer": "v1.0"
  2. Create transformer class with get_version() and transform() methods
  3. Create pipeline script extending BaseReprocessingPipeline
# Transformer
class MyTransformer(BaseTransformer):
    def get_version(self) -> str:
        return get_version("my_transformer")

    def transform(self, archive_data: dict) -> dict:
        return {"result": "value"}

# Pipeline
class MyReprocessor(BaseReprocessingPipeline):
    def get_output_type(self) -> str:
        return "my_output_v1"

    def get_version_keys(self) -> list[str]:
        return ["my_transformer"]

    def process_archive(self, archive: YouTubeArchive) -> str:
        return json.dumps(MyTransformer().transform(archive.model_dump()))

Archive Service Integration

from tools.services.archive import create_local_archive_writer
from tools.scripts.lib.transform_versions import get_transform_manifest

writer = create_local_archive_writer()

# Add
writer.add_derived_output(
    video_id="abc123",
    output_type="my_transformation_v1",
    output_value=json.dumps(result),
    transformer_version="v1.0",
    transform_manifest=get_transform_manifest(),
    source_outputs=["tags"],
)

# Retrieve
archive = writer.get("abc123")
derived = archive.get_latest_derived_output("my_transformation_v1")

Observer Hooks

class MetricsHooks:
    def on_start(self, total: int):
        print(f"Starting {total} archives")
    def on_archive_success(self, video_id: str, elapsed: float):
        print(f"{video_id}: {elapsed:.2f}s")
    def on_complete(self, stats: dict):
        print(f"Done: {stats}")

pipeline = MyPipeline(hooks=MetricsHooks())

Documentation

  • tools/scripts/lib/README.md - Complete system docs
  • tools/scripts/lib/QUICKSTART.md - Quick reference
  • lessons/lesson-010/COMPLETE.md - Tag normalization lesson