---
name: dlt
description: dlt (data load tool) patterns for SignalRoom ETL pipelines. Use when creating sources, debugging pipeline failures, understanding schema evolution, or implementing incremental loading.
---
# dlt Data Load Tool

## Core Concepts
dlt handles extract, normalize, and load: you define sources and resources, and dlt takes care of schema inference, table creation, and loading.
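For orientation, here is a minimal end-to-end sketch of those three steps (illustrative only; the `numbers` resource and pipeline names are made up for the example):

```python
import dlt


@dlt.resource(write_disposition="append")
def numbers():
    # Extract: yield plain dicts (dataframes and Arrow tables also work).
    yield from ({"n": i} for i in range(3))


# Normalize + load happen inside run(): dlt infers the schema,
# creates the table, and loads the rows into the destination.
pipeline = dlt.pipeline(pipeline_name="demo", destination="duckdb", dataset_name="demo")
print(pipeline.run(numbers()))
```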
## Source Structure

```text
src/signalroom/sources/{source_name}/
└── __init__.py    # Contains @dlt.source and @dlt.resource
```
## Creating a New Source

```python
import dlt
from signalroom.common import settings


@dlt.source(name="my_source")
def my_source():
    """Source docstring appears in dlt metadata."""

    @dlt.resource(write_disposition="append", primary_key="id")
    def my_resource():
        yield from fetch_data()

    return [my_resource]
```
## Register in Pipeline Runner

Add to `src/signalroom/pipelines/runner.py`:

```python
SOURCES = {
    "my_source": "signalroom.sources.my_source:my_source",
}
```
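The `"module:function"` value suggests the runner resolves each source by import path at run time. A minimal sketch of that lookup, assuming the real `runner.py` may do it differently (`load_source` is a hypothetical helper):

```python
import importlib


def load_source(path: str):
    """Resolve a 'package.module:attribute' path to its @dlt.source function."""
    module_name, _, attr = path.partition(":")
    return getattr(importlib.import_module(module_name), attr)


# e.g. source_fn = load_source(SOURCES["my_source"]); pipeline.run(source_fn())
```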
## Write Dispositions

| Mode | Use Case | Behavior |
|---|---|---|
| `append` | Immutable events (clicks, conversions) | Always insert new rows |
| `merge` | Mutable entities (campaigns, contacts) | Upsert by `primary_key` |
| `replace` | Full refresh (feature flags, config) | Drop and recreate table |
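For example, a mutable-entity resource would declare `merge` so re-loaded rows update in place rather than duplicate (a sketch; `fetch_campaigns()` is a hypothetical helper):

```python
@dlt.resource(write_disposition="merge", primary_key="id")
def campaigns():
    # Rows whose "id" already exists in the destination are upserted.
    yield from fetch_campaigns()
```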
## Incremental Loading

Only fetch new data since the last run:

```python
@dlt.resource(write_disposition="append", primary_key="id")
def events(
    updated_at: dlt.sources.incremental[str] = dlt.sources.incremental(
        "updated_at",
        initial_value="2024-01-01",
    )
):
    # Only fetches records after the last loaded timestamp
    yield from api.get_events(since=updated_at.last_value)
```
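To see the cursor advance, you can run the resource twice against local DuckDB (a sketch that assumes the `events` resource above and its hypothetical `api` client):

```python
pipeline = dlt.pipeline(
    pipeline_name="events_test",
    destination="duckdb",
    dataset_name="events_test",
)

pipeline.run(events())  # first run: everything since initial_value
pipeline.run(events())  # second run: only rows newer than the stored last_value
```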
### WARNING: High-Volume Sources

`dlt.sources.incremental` tracks every row for deduplication. If many rows share the same cursor value, this causes O(n²) performance.
| Rows per cursor value | Overhead | Recommendation |
|---|---|---|
| < 100 | Negligible | Use incremental |
| 100 - 1,000 | Noticeable | Monitor performance |
| > 1,000 | Severe | Use file-level state instead |
For high-volume sources (like S3 CSV imports), use `dlt.current.resource_state()` for file-level tracking:

```python
@dlt.resource(write_disposition="merge", primary_key=["file_name", "row_id"])
def csv_resource():
    state = dlt.current.resource_state()
    last_date = state.get("last_file_date", "2024-01-01")
    for file in get_files_since(last_date):
        yield from process_file(file)
        state["last_file_date"] = file.date  # Manual state update
```
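Resource state is saved as part of the pipeline state and committed together with each successful load, so the next run resumes from the stored `last_file_date` instead of re-scanning old files.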
## Primary Keys

Required for `merge` disposition:

```python
# Single key
@dlt.resource(primary_key="id")

# Composite key
@dlt.resource(primary_key=["date", "affiliate_id"])
```
## Schema Evolution

dlt auto-evolves schemas: new columns are added automatically. To review the most recent loads:

```sql
SELECT * FROM {schema}._dlt_loads ORDER BY inserted_at DESC LIMIT 5;
```
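To pin a column's type instead of letting dlt infer it (also the usual fix for the "Column type mismatch" error below), pass explicit column hints on the resource. A sketch with a hypothetical `amount` column and `fetch_transactions()` helper:

```python
@dlt.resource(
    write_disposition="append",
    columns={"amount": {"data_type": "decimal"}},  # explicit hint overrides inference
)
def transactions():
    yield from fetch_transactions()
```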
## Debugging Failed Loads

### Check dlt metadata tables

```sql
-- Recent loads
SELECT load_id, schema_name, status, inserted_at
FROM {schema}._dlt_loads
ORDER BY inserted_at DESC LIMIT 10;

-- Pipeline state
SELECT * FROM {schema}._dlt_pipeline_state;
```
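If you have local access to the machine where the pipeline ran, you can also inspect state from Python instead of SQL (a sketch; the pipeline name is an example):

```python
import dlt

pipeline = dlt.attach(pipeline_name="my_source")
print(pipeline.state)       # incremental cursors, schema versions, etc.
print(pipeline.last_trace)  # timing and outcome of the most recent run
```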
### Common Errors

**"Primary key violation"**
- Using `append` when you need `merge`
- Duplicate records in source data

**"Column type mismatch"**
- Schema evolved incompatibly
- Fix: drop the table or add explicit column hints (see the sketch under Schema Evolution)

**"Connection refused"**
- Check Supabase pooler settings (port 6543, user format)
### Drop Pending Packages

If the pipeline is stuck:

```bash
dlt pipeline {pipeline_name} drop-pending-packages
```
## SignalRoom Sources

| Source | Write Mode | Primary Key | State Tracking |
|---|---|---|---|
| `s3_exports` | `merge` | `_file_name`, `_row_id` | File-level (`resource_state`) |
| `everflow` | `merge` | `date`, `affiliate_id`, `advertiser_id` | Row-level (`incremental`) |
| `redtrack` | `merge` | `date`, `source_id` | Row-level (`incremental`) |
## Testing Locally

Use DuckDB for fast local testing:

```python
pipeline = dlt.pipeline(
    pipeline_name="test",
    destination="duckdb",
    dataset_name="test",
)
```
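To exercise a source against that local pipeline and peek at the loaded rows (a sketch; `my_source` and `my_resource` are the examples from above):

```python
load_info = pipeline.run(my_source())
print(load_info)

# Query the loaded table straight from the local DuckDB file.
with pipeline.sql_client() as client:
    print(client.execute_sql("SELECT count(*) FROM my_resource"))
```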
## Resources

- dlt Documentation
- Write Dispositions
- Schema Evolution
- SignalRoom API Reference: `docs/API_REFERENCE.md` — Live docs, auth, request/response examples