| name | dagster-orchestration |
| description | ALWAYS USE when working with Dagster assets, resources, IO managers, schedules, sensors, or dbt integration. CRITICAL for: @asset decorators, @dbt_assets, DbtCliResource, ConfigurableResource, IO managers, partitions. Enforces CATALOG-AS-CONTROL-PLANE architecture - ALL Iceberg writes via catalog (Polaris/Glue). Provides pluggable orchestration patterns abstractable to Airflow/Prefect. Compute abstraction: DuckDB (default), Spark, Snowflake - all via dbt. |
Dagster Orchestration with dbt Integration
Critical Architecture: Catalog-as-Control-Plane
⚠️ NEVER write directly to storage. ALL table operations MUST flow through the catalog:
┌─────────────────────────────────────────────────────────────┐
│ DAGSTER ORCHESTRATION │
│ (Schedule → Sensor → Asset Graph → Materialization) │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ dbt TRANSFORMATIONS │
│ (SQL owns transformations - NEVER parse SQL in Python) │
└─────────────────────────────────────────────────────────────┘
│
┌───────────────┼───────────────┐
│ │ │
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ DuckDB │ │ Spark │ │Snowflake│
│(default)│ │ (scale) │ │(analytic)│
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
└───────────────┼───────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ POLARIS / GLUE CATALOG (REST API) │
│ ⚡ CONTROL PLANE - ACID, Schema, Access, Governance ⚡ │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ ICEBERG TABLES │
│ (S3 / Azure / GCS) │
└─────────────────────────────────────────────────────────────┘
Why Catalog-as-Control-Plane?
- ACID transactions across ALL compute engines
- Schema evolution coordination (engines see same schema)
- Access control and governance (row/column masking)
- Multi-engine interoperability (DuckDB + Spark + dbt query same tables)
- See: references/catalog-control-plane.md
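The same rule applies outside dbt. A minimal PyIceberg sketch (the endpoint and env var names are assumptions, reusing the variables shown later in this doc) of what "through the catalog" means in practice: tables are resolved and committed via the catalog identifier, never via a direct object-store path.

```python
# Minimal sketch, assuming a Polaris REST endpoint and the env var names used
# elsewhere in this doc. Tables are loaded and committed through the catalog,
# never by pointing at an s3:// / abfss:// / gs:// path.
import os

import pyarrow as pa
from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    "polaris",
    **{
        "type": "rest",
        "uri": os.environ["POLARIS_URI"],
        "credential": f"{os.environ['POLARIS_CLIENT_ID']}:{os.environ['POLARIS_CLIENT_SECRET']}",
        "warehouse": "demo_catalog",
    },
)

orders = catalog.load_table("silver.orders")                   # resolved by the catalog
orders.append(pa.table({"order_id": [1], "amount": [42.0]}))   # ACID commit via the catalog
```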
Pluggable Orchestration Design
Design assets as pure functions runnable in ANY orchestrator:
| Concept | Dagster | Airflow | Prefect |
|---|---|---|---|
| Unit of Work | `@asset` | `@task` | `@task` |
| Dependencies | Asset deps | Task deps | Task deps |
| Scheduling | `@schedule` | DAG schedule | Deployment |
| Event-driven | `@sensor` | Sensor | Event handlers |
| Configuration | `ConfigurableResource` | Connection/Variable | Block |
See: references/orchestration-abstraction.md
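A minimal sketch of that abstraction (function and asset names are illustrative): keep the transformation as a plain Python function and make the orchestrator-specific decorator a thin shim, so porting to Airflow or Prefect only replaces the wrapper.

```python
# Sketch of the pure-function pattern; build_daily_metrics is an illustrative name.
import pyarrow as pa
from dagster import asset


def build_daily_metrics(orders: pa.Table) -> pa.Table:
    """Plain Python: callable from Dagster, Airflow, Prefect, or a unit test."""
    return orders.group_by("order_date").aggregate([("amount", "sum")])


@asset
def daily_metrics(orders: pa.Table) -> pa.Table:
    """Thin Dagster shim; only this wrapper changes if the orchestrator does."""
    return build_daily_metrics(orders)
```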
Pre-Implementation Checklist
Step 1: Verify Runtime Environment
# ALWAYS run first
python -c "import dagster; print(f'Dagster {dagster.__version__}')"
python -c "import dagster_dbt; print(f'dagster-dbt {dagster_dbt.__version__}')"
python -c "import dagster_iceberg; print(f'dagster-iceberg installed')"
dbt --version
Step 2: Discover Existing Patterns
# Find Dagster definitions
rg "@asset|@multi_asset|@dbt_assets" --type py
rg "ConfigurableResource|IOManager" --type py
rg "dg.Definitions|Definitions\(" --type py
# Find dbt project
find . -name "dbt_project.yml"
find . -name "manifest.json" -path "*/target/*"
# Check catalog configuration
cat platform.yaml | grep -A 20 "catalogs:"
Step 3: Understand Platform Configuration
# Two-tier config: platform.yaml (credentials) + floe.yaml (logical refs)
cat platform.yaml # Engineers NEVER see credentials in code
cat floe.yaml # Data engineers reference: catalog: default
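A hypothetical sketch of how the two tiers compose at runtime (the keys shown are assumptions, not the actual schema): floe.yaml carries only a logical catalog name, and platform.yaml resolves it to connection details.

```python
# Hypothetical resolution of a logical catalog reference; the exact keys are assumptions.
import yaml

with open("platform.yaml") as f:
    platform = yaml.safe_load(f)   # platform team: credentials, endpoints
with open("floe.yaml") as f:
    floe = yaml.safe_load(f)       # data engineers: logical references only

catalog_name = floe["catalog"]                    # e.g. "default"
catalog_cfg = platform["catalogs"][catalog_name]  # resolved at runtime, never hardcoded in assets
```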
dbt Integration (Primary Pattern)
Pattern 1: Load dbt Assets from Manifest
from pathlib import Path
from dagster import AssetExecutionContext, Definitions
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets
dbt_project = DbtProject(
project_dir=Path(__file__).parent / "dbt",
packaged_project_dir=Path(__file__).parent / "dbt-project",
)
dbt_project.prepare_if_dev() # Hot-reload in dev
@dbt_assets(manifest=dbt_project.manifest_path)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
"""Execute dbt build - yields Dagster events for each model."""
yield from dbt.cli(["build"], context=context).stream()
defs = Definitions(
assets=[my_dbt_assets],
resources={"dbt": DbtCliResource(project_dir=dbt_project)},
)
Pattern 2: Custom DagsterDbtTranslator
from typing import Any, Mapping, Optional
from dagster import AssetKey
from dagster_dbt import DagsterDbtTranslator, DagsterDbtTranslatorSettings
class FloeDbTranslator(DagsterDbtTranslator):
"""Translator for floe-runtime architecture."""
def __init__(self):
super().__init__(
settings=DagsterDbtTranslatorSettings(
enable_code_references=True,
enable_source_tests_as_checks=True,
)
)
def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
"""Map dbt models to Dagster asset keys with namespace."""
schema = dbt_resource_props.get("schema", "default")
name = dbt_resource_props["name"]
return AssetKey([schema, name])
def get_group_name(self, dbt_resource_props: Mapping[str, Any]) -> Optional[str]:
"""Group by dbt folder structure."""
fqn = dbt_resource_props.get("fqn", [])
return fqn[1] if len(fqn) > 2 else None
def get_metadata(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]:
"""Extract governance metadata from dbt model meta."""
meta = dbt_resource_props.get("meta", {})
return {
"classification": meta.get("classification"),
"owner": meta.get("owner"),
"sla": meta.get("sla"),
}
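Attach the translator by passing it to @dbt_assets (reusing dbt_project and the imports from Pattern 1), so asset keys, groups, and metadata all come from FloeDbTranslator:

```python
@dbt_assets(
    manifest=dbt_project.manifest_path,
    dagster_dbt_translator=FloeDbTranslator(),
)
def translated_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
```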
See: references/dbt-integration.md for complete patterns
Pattern 3: Partitioned dbt Assets
from dagster import DailyPartitionsDefinition
daily_partitions = DailyPartitionsDefinition(start_date="2024-01-01")
@dbt_assets(
manifest=dbt_project.manifest_path,
partitions_def=daily_partitions,
)
def partitioned_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
partition_date = context.partition_key
yield from dbt.cli(
["build", "--vars", f'{{"run_date": "{partition_date}"}}'],
context=context,
).stream()
Compute Target Integration
DuckDB (Default - Ephemeral Compute via dbt-duckdb)
DuckDB reads/writes via catalog ATTACH:
-- dbt-duckdb plugin automatically executes:
ATTACH 'demo_catalog' AS polaris_catalog (
TYPE ICEBERG,
CLIENT_ID '{{ env_var("POLARIS_CLIENT_ID") }}',
CLIENT_SECRET '{{ env_var("POLARIS_CLIENT_SECRET") }}',
ENDPOINT '{{ env_var("POLARIS_URI") }}'
);
Snowflake (Analytical Compute)
-- External Iceberg via Polaris integration
CREATE OR REPLACE ICEBERG TABLE gold.metrics
CATALOG = 'polaris_catalog'
EXTERNAL_VOLUME = 'iceberg_volume'
AS SELECT * FROM silver.orders;
Spark (Distributed Compute)
from dagster import asset
from dagster_pyspark import PySparkResource  # assumes dagster-pyspark supplies the Spark session


@asset(kinds={"spark"})
def spark_asset(spark: PySparkResource):
    """Reads and writes address tables through the catalog namespace."""
    spark.spark_session.sql("""
        INSERT INTO polaris_catalog.gold.metrics
        SELECT * FROM polaris_catalog.silver.orders
    """)
See: references/compute-abstraction.md
IO Manager Patterns
Iceberg IO Manager (Catalog-Controlled)
import os

from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.io_manager.arrow import PyArrowIcebergIOManager

iceberg_io_manager = PyArrowIcebergIOManager(
    name="polaris_catalog",
    config=IcebergCatalogConfig(
        properties={
            "type": "rest",
            "uri": "http://polaris:8181/api/catalog",
            # credentials come from the environment, never from code
            "credential": f"{os.environ['POLARIS_CLIENT_ID']}:{os.environ['POLARIS_CLIENT_SECRET']}",
            "warehouse": "demo_catalog",
        }
    ),
    namespace="default",
)
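A sketch of an asset materialized through this IO manager (asset name and payload are illustrative): the asset returns a PyArrow table and the IO manager commits it as an Iceberg table via the catalog, so the asset itself never touches object storage.

```python
import pyarrow as pa
from dagster import Definitions, asset


@asset(io_manager_key="io_manager")
def bronze_orders() -> pa.Table:
    # illustrative payload; in practice produced by an extract step
    return pa.table({"order_id": [1, 2], "amount": [10.0, 25.5]})


defs = Definitions(
    assets=[bronze_orders],
    resources={"io_manager": iceberg_io_manager},
)
```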
See: references/io-managers.md
Environment-Based Resource Switching
import os

def get_resources_for_env() -> dict:
    """Resolve the resource set for the current deployment target."""
    env = os.getenv("DAGSTER_DEPLOYMENT", "local")
    base_resources = {"dbt": DbtCliResource(project_dir=dbt_project)}
    if env == "local":
        # local_iceberg_io / prod_iceberg_io are project-specific factories that
        # return a catalog-backed IO manager (see the Iceberg IO manager above)
        return {**base_resources, "io_manager": local_iceberg_io()}
    if env == "production":
        return {**base_resources, "io_manager": prod_iceberg_io()}
    raise ValueError(f"Unknown DAGSTER_DEPLOYMENT: {env}")
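Usage sketch (reusing Definitions and my_dbt_assets from Pattern 1): resolve the resource set once at definition time so the same asset code runs locally and in production unchanged.

```python
defs = Definitions(
    assets=[my_dbt_assets],
    resources=get_resources_for_env(),
)
```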
Validation Workflow
Before Implementation
- Verified Dagster + dagster-dbt versions
- Located dbt project and manifest.json
- Understood catalog configuration (Polaris/Glue)
- Identified compute targets (DuckDB/Snowflake/Spark)
- Read /docs/ for the CompiledArtifacts contract
During Implementation
- Using @dbt_assets for dbt models
- Custom DagsterDbtTranslator for metadata
- IO manager uses catalog (NOT direct storage writes)
- Resources configured per environment
- Partitions aligned with dbt vars
After Implementation
- Run dagster dev and verify assets appear
- Materialize assets manually
- Verify data lineage in UI
- Check Polaris catalog for table metadata
- Test schedules/sensors
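For the last item, a minimal schedule sketch over the dbt assets from Pattern 1 (job name and cron string are illustrative); register both objects in Definitions via jobs= and schedules=.

```python
from dagster import AssetSelection, ScheduleDefinition, define_asset_job

daily_dbt_job = define_asset_job(
    name="daily_dbt_job",
    selection=AssetSelection.assets(my_dbt_assets),
)

daily_dbt_schedule = ScheduleDefinition(
    job=daily_dbt_job,
    cron_schedule="0 6 * * *",  # 06:00 UTC daily
)
```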
Anti-Patterns to Avoid
❌ Don't write to Iceberg without going through catalog
❌ Don't hardcode compute logic (use dbt for SQL transforms)
❌ Don't mix Dagster partitions with dbt incremental without alignment
❌ Don't use deprecated load_assets_from_dbt_manifest()
❌ Don't bypass DbtCliResource for dbt execution
❌ Don't store credentials in code (use EnvVar or secret_ref; see the sketch after this list)
❌ Don't parse SQL in Python (dbt owns SQL)
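A minimal sketch of the credentials rule (resource and env var names are assumptions): declare plain string fields on a ConfigurableResource and bind them with EnvVar so values are read from the environment at runtime, not baked into code.

```python
from dagster import ConfigurableResource, EnvVar


class PolarisCatalogResource(ConfigurableResource):
    uri: str
    client_id: str
    client_secret: str


polaris = PolarisCatalogResource(
    uri=EnvVar("POLARIS_URI"),
    client_id=EnvVar("POLARIS_CLIENT_ID"),
    client_secret=EnvVar("POLARIS_CLIENT_SECRET"),
)
```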
Reference Documentation
| Document | Purpose |
|---|---|
| references/dbt-integration.md | Complete dbt-Dagster patterns |
| references/compute-abstraction.md | DuckDB, Spark, Snowflake patterns |
| references/io-managers.md | Iceberg IO managers, storage layer |
| references/orchestration-abstraction.md | CRITICAL architecture doc companion: pluggable Airflow/Prefect patterns |
| references/catalog-control-plane.md | CRITICAL architecture doc |
| API-REFERENCE.md | Dagster SDK quick reference |
Quick Reference: Research Queries
When uncertain, search:
- "Dagster dbt_assets decorator examples 2025"
- "DagsterDbtTranslator custom implementation 2025"
- "dagster-iceberg PyArrowIcebergIOManager 2025"
- "DuckDB Iceberg REST catalog ATTACH 2025"
Remember: Design for abstraction. Dagster orchestrates, dbt owns SQL, catalog controls storage.