Claude Code Plugins

Community-maintained marketplace

Feedback

dlt (data load tool) patterns for SignalRoom ETL pipelines. Use when creating sources, debugging pipeline failures, understanding schema evolution, or implementing incremental loading.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name dlt
description dlt (data load tool) patterns for SignalRoom ETL pipelines. Use when creating sources, debugging pipeline failures, understanding schema evolution, or implementing incremental loading.

dlt Data Load Tool

Core Concepts

dlt handles extract, normalize, and load. You define sources and resources; dlt handles schema inference, table creation, and loading.

Source Structure

src/signalroom/sources/{source_name}/
└── __init__.py  # Contains @dlt.source and @dlt.resource

Creating a New Source

import dlt
from signalroom.common import settings

@dlt.source(name="my_source")
def my_source():
    """Source docstring appears in dlt metadata."""

    @dlt.resource(write_disposition="append", primary_key="id")
    def my_resource():
        yield from fetch_data()

    return [my_resource]

Register in Pipeline Runner

Add to src/signalroom/pipelines/runner.py:

SOURCES = {
    "my_source": "signalroom.sources.my_source:my_source",
}

Write Dispositions

Mode Use Case Behavior
append Immutable events (clicks, conversions) Always insert new rows
merge Mutable entities (campaigns, contacts) Upsert by primary_key
replace Full refresh (feature flags, config) Drop and recreate table

Incremental Loading

Only fetch new data since last run:

@dlt.resource(write_disposition="append", primary_key="id")
def events(
    updated_at: dlt.sources.incremental[str] = dlt.sources.incremental(
        "updated_at",
        initial_value="2024-01-01"
    )
):
    # Only fetches records after last loaded timestamp
    yield from api.get_events(since=updated_at.last_value)

WARNING: High-Volume Sources

dlt.sources.incremental tracks every row for deduplication. If many rows share the same cursor value, this causes O(n²) performance.

Rows per cursor value Overhead Recommendation
< 100 Negligible Use incremental
100 - 1,000 Noticeable Monitor performance
> 1,000 Severe Use file-level state instead

For high-volume sources (like S3 CSV imports), use dlt.current.resource_state() for file-level tracking:

@dlt.resource(write_disposition="merge", primary_key=["file_name", "row_id"])
def csv_resource():
    state = dlt.current.resource_state()
    last_date = state.get("last_file_date", "2024-01-01")

    for file in get_files_since(last_date):
        yield from process_file(file)
        state["last_file_date"] = file.date  # Manual state update

Primary Keys

Required for merge disposition:

# Single key
@dlt.resource(primary_key="id")

# Composite key
@dlt.resource(primary_key=["date", "affiliate_id"])

Schema Evolution

dlt auto-evolves schemas. New columns added automatically. To see current schema:

SELECT * FROM {schema}._dlt_loads ORDER BY inserted_at DESC LIMIT 5;

Debugging Failed Loads

Check dlt metadata tables

-- Recent loads
SELECT load_id, schema_name, status, inserted_at
FROM {schema}._dlt_loads
ORDER BY inserted_at DESC LIMIT 10;

-- Pipeline state
SELECT * FROM {schema}._dlt_pipeline_state;

Common Errors

"Primary key violation"

  • Using append when you need merge
  • Duplicate records in source data

"Column type mismatch"

  • Schema evolved incompatibly
  • Fix: Drop table or add explicit column hints

"Connection refused"

  • Check Supabase pooler settings (port 6543, user format)

Drop Pending Packages

If pipeline is stuck:

dlt pipeline {pipeline_name} drop-pending-packages

SignalRoom Sources

Source Write Mode Primary Key State Tracking
s3_exports merge _file_name, _row_id File-level (resource_state)
everflow merge date, affiliate_id, advertiser_id Row-level (incremental)
redtrack merge date, source_id Row-level (incremental)

Testing Locally

Use DuckDB for fast local testing:

pipeline = dlt.pipeline(
    pipeline_name="test",
    destination="duckdb",
    dataset_name="test"
)

Resources