SKILL.md

name: dagster-orchestration
description: ALWAYS USE when working with Dagster assets, resources, IO managers, schedules, sensors, or dbt integration. CRITICAL for: @asset decorators, @dbt_assets, DbtCliResource, ConfigurableResource, IO managers, partitions. Enforces CATALOG-AS-CONTROL-PLANE architecture - ALL Iceberg writes via catalog (Polaris/Glue). Provides pluggable orchestration patterns abstractable to Airflow/Prefect. Compute abstraction: DuckDB (default), Spark, Snowflake - all via dbt.

Dagster Orchestration with dbt Integration

Critical Architecture: Catalog-as-Control-Plane

⚠️ NEVER write directly to storage. ALL table operations MUST flow through catalog:

┌─────────────────────────────────────────────────────────────┐
│                   DAGSTER ORCHESTRATION                      │
│  (Schedule → Sensor → Asset Graph → Materialization)         │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                    dbt TRANSFORMATIONS                       │
│  (SQL owns transformations - NEVER parse SQL in Python)      │
└─────────────────────────────────────────────────────────────┘
                              │
              ┌───────────────┼───────────────┐
              │               │               │
              ▼               ▼               ▼
        ┌─────────┐     ┌─────────┐     ┌─────────┐
        │ DuckDB  │     │  Spark  │     │Snowflake│
        │(default)│     │ (scale) │     │(analytic)│
        └────┬────┘     └────┬────┘     └────┬────┘
              │               │               │
              └───────────────┼───────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│               POLARIS / GLUE CATALOG (REST API)              │
│     ⚡ CONTROL PLANE - ACID, Schema, Access, Governance ⚡   │
└─────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────┐
│                    ICEBERG TABLES                            │
│                   (S3 / Azure / GCS)                         │
└─────────────────────────────────────────────────────────────┘

Why Catalog-as-Control-Plane?

  • ACID transactions across ALL compute engines
  • Schema evolution coordination (engines see same schema)
  • Access control and governance (row/column masking)
  • Multi-engine interoperability (DuckDB, Spark, and Snowflake query the same tables)
  • See: references/catalog-control-plane.md

Pluggable Orchestration Design

Design assets as pure functions runnable in ANY orchestrator:

| Concept | Dagster | Airflow | Prefect |
|---|---|---|---|
| Unit of Work | @asset | @task | @task |
| Dependencies | Asset deps | Task deps | Task deps |
| Scheduling | @schedule | DAG schedule | Deployment |
| Event-driven | @sensor | Sensor | Event handlers |
| Configuration | ConfigurableResource | Connection/Variable | Block |

See: references/orchestration-abstraction.md
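
A minimal sketch of the "pure function" idea, assuming a hypothetical `Order` record, `total_revenue` function, and `sample_orders` helper (none of these appear in the project). The business logic is plain Python, and the Dagster wrapper is the only orchestrator-specific piece. The guarded import only lets the sketch run where Dagster is not installed.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass(frozen=True)
class Order:  # hypothetical record type, for illustration only
    order_id: str
    amount: float


def total_revenue(orders: Iterable[Order]) -> float:
    """Pure logic: no orchestrator imports, no I/O, easy to unit test."""
    return sum(order.amount for order in orders)


def sample_orders() -> List[Order]:
    """Stand-in for a real upstream source (hypothetical data)."""
    return [Order("a-1", 10.0), Order("a-2", 5.5)]


# Thin adapter: only this wrapper changes when moving to Airflow or Prefect.
try:
    from dagster import asset
except ImportError:  # Dagster not installed; the pure function still works
    asset = None

if asset is not None:

    @asset
    def revenue() -> float:
        return total_revenue(sample_orders())
```

Because `total_revenue` knows nothing about Dagster, the same function could be called from an Airflow or Prefect `@task` without modification.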

Pre-Implementation Checklist

Step 1: Verify Runtime Environment

# ALWAYS run first
python -c "import dagster; print(f'Dagster {dagster.__version__}')"
python -c "import dagster_dbt; print(f'dagster-dbt {dagster_dbt.__version__}')"
python -c "import dagster_iceberg; print('dagster-iceberg installed')"
dbt --version
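
The same check can be scripted so that a missing package is reported rather than raising an import error. This is a sketch using the standard library's `importlib.metadata`. The distribution names in `REQUIRED` are assumptions based on the packages named above (`dbt-core` is the distribution that provides the `dbt` CLI), and `installed_version` and `check_environment` are illustrative helper names.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Dict, Optional

# Assumed PyPI distribution names for the packages checked above.
REQUIRED = ["dagster", "dagster-dbt", "dagster-iceberg", "dbt-core"]


def installed_version(dist_name: str) -> Optional[str]:
    """Return the installed version string, or None if the package is absent."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


def check_environment(dist_names=REQUIRED) -> Dict[str, Optional[str]]:
    """Map each required distribution to its version (None when missing)."""
    return {name: installed_version(name) for name in dist_names}


if __name__ == "__main__":
    for name, found in check_environment().items():
        print(f"{name}: {found or 'NOT INSTALLED'}")
```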

Step 2: Discover Existing Patterns

# Find Dagster definitions
rg "@asset|@multi_asset|@dbt_assets" --type py
rg "ConfigurableResource|IOManager" --type py
rg "dg.Definitions|Definitions\(" --type py

# Find dbt project
find . -name "dbt_project.yml"
find . -name "manifest.json" -path "*/target/*"

# Check catalog configuration
grep -A 20 "catalogs:" platform.yaml

Step 3: Understand Platform Configuration

# Two-tier config: platform.yaml (credentials) + floe.yaml (logical refs)
cat platform.yaml    # Engineers NEVER see credentials in code
cat floe.yaml        # Data engineers reference: catalog: default
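
To illustrate the two-tier idea, the sketch below resolves the logical `catalog: default` reference from floe.yaml against a `catalogs:` section in platform.yaml. Only those two keys come from the text above. The nested fields (`type`, `uri`) and the `resolve_catalog` helper are hypothetical, and the files are shown as already-parsed dictionaries to keep the example dependency-free.

```python
from typing import Any, Dict

# Parsed platform.yaml (hypothetical shape; only `catalogs:` comes from the text).
platform_config: Dict[str, Any] = {
    "catalogs": {
        "default": {"type": "rest", "uri": "http://polaris:8181/api/catalog"},
    }
}

# Parsed floe.yaml: a logical reference only, no connection details.
floe_config: Dict[str, Any] = {"catalog": "default"}


def resolve_catalog(floe: Dict[str, Any], platform: Dict[str, Any]) -> Dict[str, Any]:
    """Look up the catalog named in floe.yaml inside platform.yaml."""
    name = floe["catalog"]
    try:
        return platform["catalogs"][name]
    except KeyError:
        raise KeyError(f"catalog {name!r} is not defined in platform.yaml") from None
```

The pipeline code only ever names a catalog. Connection details stay in the platform tier and can change without touching floe.yaml.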

dbt Integration (Primary Pattern)

Pattern 1: Load dbt Assets from Manifest

from pathlib import Path
from dagster import AssetExecutionContext, Definitions
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets

dbt_project = DbtProject(
    project_dir=Path(__file__).parent / "dbt",
    packaged_project_dir=Path(__file__).parent / "dbt-project",
)
dbt_project.prepare_if_dev()  # Hot-reload in dev

@dbt_assets(manifest=dbt_project.manifest_path)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    """Execute dbt build - yields Dagster events for each model."""
    yield from dbt.cli(["build"], context=context).stream()

defs = Definitions(
    assets=[my_dbt_assets],
    resources={"dbt": DbtCliResource(project_dir=dbt_project)},
)

Pattern 2: Custom DagsterDbtTranslator

from typing import Any, Mapping, Optional
from dagster import AssetKey
from dagster_dbt import DagsterDbtTranslator, DagsterDbtTranslatorSettings

class FloeDbtTranslator(DagsterDbtTranslator):
    """Translator for floe-runtime architecture."""
    
    def __init__(self):
        super().__init__(
            settings=DagsterDbtTranslatorSettings(
                enable_code_references=True,
                enable_source_tests_as_checks=True,
            )
        )
    
    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
        """Map dbt models to Dagster asset keys with namespace."""
        schema = dbt_resource_props.get("schema", "default")
        name = dbt_resource_props["name"]
        return AssetKey([schema, name])
    
    def get_group_name(self, dbt_resource_props: Mapping[str, Any]) -> Optional[str]:
        """Group by dbt folder structure."""
        fqn = dbt_resource_props.get("fqn", [])
        return fqn[1] if len(fqn) > 2 else None
    
    def get_metadata(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]:
        """Extract governance metadata from dbt model meta."""
        meta = dbt_resource_props.get("meta", {})
        return {
            "classification": meta.get("classification"),
            "owner": meta.get("owner"),
            "sla": meta.get("sla"),
        }

See: references/dbt-integration.md for complete patterns
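
To see what the key and group mapping in Pattern 2 produces, the sketch below repeats the same logic as plain functions and applies it to two hand-written property dictionaries. The dictionaries are hypothetical stand-ins for manifest nodes (real nodes carry many more fields), and `asset_key_path` returns the list that would be passed to `AssetKey`.

```python
from typing import Any, List, Mapping, Optional


def asset_key_path(props: Mapping[str, Any]) -> List[str]:
    """Same rule as get_asset_key: [schema, model name], schema defaults to 'default'."""
    return [props.get("schema", "default"), props["name"]]


def group_name(props: Mapping[str, Any]) -> Optional[str]:
    """Same rule as get_group_name: first folder under the project, if any."""
    fqn = props.get("fqn", [])
    return fqn[1] if len(fqn) > 2 else None


# Hypothetical node for models/staging/stg_orders.sql
nested = {"name": "stg_orders", "schema": "silver", "fqn": ["floe", "staging", "stg_orders"]}
# Hypothetical model directly under models/, with no schema recorded
top_level = {"name": "orders", "fqn": ["floe", "orders"]}

print(asset_key_path(nested), group_name(nested))        # ['silver', 'stg_orders'] staging
print(asset_key_path(top_level), group_name(top_level))  # ['default', 'orders'] None
```

Models that sit directly under `models/` get no group from this rule, so Dagster would fall back to its default grouping for them.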

Pattern 3: Partitioned dbt Assets

import json

from dagster import AssetExecutionContext, DailyPartitionsDefinition
from dagster_dbt import DbtCliResource, dbt_assets

# Reuses dbt_project from Pattern 1.
daily_partitions = DailyPartitionsDefinition(start_date="2024-01-01")

@dbt_assets(
    manifest=dbt_project.manifest_path,
    partitions_def=daily_partitions,
)
def partitioned_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # Pass the partition key to dbt as a var; json.dumps handles the quoting.
    dbt_vars = {"run_date": context.partition_key}
    yield from dbt.cli(
        ["build", "--vars", json.dumps(dbt_vars)],
        context=context,
    ).stream()
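
When a dbt incremental model filters on a date range rather than a single day, it can help to pass both ends of the partition window. The sketch below derives them from a daily partition key using only the standard library. `run_date` matches the var used above, while `start_date`, `end_date`, and both helper names are hypothetical.

```python
import json
from datetime import date, timedelta
from typing import Dict, List


def dbt_vars_for_partition(partition_key: str) -> Dict[str, str]:
    """Turn a daily partition key (YYYY-MM-DD) into a half-open [start, end) window."""
    start = date.fromisoformat(partition_key)
    end = start + timedelta(days=1)
    return {
        "run_date": partition_key,
        "start_date": start.isoformat(),  # hypothetical var name
        "end_date": end.isoformat(),      # hypothetical var name
    }


def dbt_build_args(partition_key: str) -> List[str]:
    """Build the argument list that would be handed to dbt.cli(...)."""
    return ["build", "--vars", json.dumps(dbt_vars_for_partition(partition_key))]
```

Keeping the window half-open means adjacent partitions never overlap, which is one way to keep Dagster partitions and dbt incremental filters aligned.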

Compute Target Integration

DuckDB (Default - Ephemeral Compute via dbt-duckdb)

DuckDB reads/writes via catalog ATTACH:

-- dbt-duckdb plugin automatically executes:
ATTACH 'demo_catalog' AS polaris_catalog (
    TYPE ICEBERG,
    CLIENT_ID '{{ env_var("POLARIS_CLIENT_ID") }}',
    CLIENT_SECRET '{{ env_var("POLARIS_CLIENT_SECRET") }}',
    ENDPOINT '{{ env_var("POLARIS_URI") }}'
);

Snowflake (Analytical Compute)

-- External Iceberg via Polaris integration
CREATE OR REPLACE ICEBERG TABLE gold.metrics
    CATALOG = 'polaris_catalog'
    EXTERNAL_VOLUME = 'iceberg_volume'
AS SELECT * FROM silver.orders;

Spark (Distributed Compute)

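from dagster import asset
from dagster_pyspark import PySparkResource  # exposes .spark_session

@asset(kinds={"spark"})
def spark_asset(spark: PySparkResource):
    # Catalog-qualified table names keep the write going through Polaris.
    spark.spark_session.sql("""
        INSERT INTO polaris_catalog.gold.metrics
        SELECT * FROM polaris_catalog.silver.orders
    """)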

See: references/compute-abstraction.md

IO Manager Patterns

Iceberg IO Manager (Catalog-Controlled)

import os

from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.io_manager.arrow import PyArrowIcebergIOManager

# Credentials come from the environment, never from code.
client_id = os.environ["POLARIS_CLIENT_ID"]
client_secret = os.environ["POLARIS_CLIENT_SECRET"]

iceberg_io_manager = PyArrowIcebergIOManager(
    name="polaris_catalog",
    config=IcebergCatalogConfig(
        properties={
            "type": "rest",
            "uri": "http://polaris:8181/api/catalog",
            "credential": f"{client_id}:{client_secret}",
            "warehouse": "demo_catalog",
        }
    ),
    namespace="default",
)

See: references/io-managers.md

Environment-Based Resource Switching

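import os

from dagster_dbt import DbtCliResource

def get_resources_for_env() -> dict:
    """Pick resources by deployment.

    local_iceberg_io() and prod_iceberg_io() are assumed to be defined elsewhere.
    """
    env = os.getenv("DAGSTER_DEPLOYMENT", "local")

    base_resources = {"dbt": DbtCliResource(project_dir=dbt_project)}

    if env == "local":
        return {**base_resources, "io_manager": local_iceberg_io()}
    if env == "production":
        return {**base_resources, "io_manager": prod_iceberg_io()}
    # Fail loudly instead of silently returning None.
    raise ValueError(f"Unknown DAGSTER_DEPLOYMENT: {env!r}")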

Validation Workflow

Before Implementation

  • Verified Dagster + dagster-dbt versions
  • Located dbt project and manifest.json
  • Understood catalog configuration (Polaris/Glue)
  • Identified compute targets (DuckDB/Snowflake/Spark)
  • Read /docs/ for CompiledArtifacts contract

During Implementation

  • Using @dbt_assets for dbt models
  • Custom DagsterDbtTranslator for metadata
  • IO manager uses catalog (NOT direct storage writes)
  • Resources configured per environment
  • Partitions aligned with dbt vars

After Implementation

  • Run dagster dev - verify assets appear
  • Materialize assets manually
  • Verify data lineage in UI
  • Check Polaris catalog for table metadata
  • Test schedules/sensors

Anti-Patterns to Avoid

  ❌ Don't write to Iceberg without going through catalog
  ❌ Don't hardcode compute logic (use dbt for SQL transforms)
  ❌ Don't mix Dagster partitions with dbt incremental without alignment
  ❌ Don't use deprecated load_assets_from_dbt_manifest()
  ❌ Don't bypass DbtCliResource for dbt execution
  ❌ Don't store credentials in code (use EnvVar or secret_ref)
  ❌ Don't parse SQL in Python (dbt owns SQL)
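
As one way to honor the credentials rule in plain Python, the sketch below builds the `client_id:client_secret` string from the `POLARIS_CLIENT_ID` and `POLARIS_CLIENT_SECRET` environment variables used earlier, and fails with a clear message when either is unset. The `polaris_credential` helper is hypothetical. Inside Dagster resources, `EnvVar` serves the same purpose.

```python
import os

REQUIRED_VARS = ("POLARIS_CLIENT_ID", "POLARIS_CLIENT_SECRET")


def polaris_credential() -> str:
    """Build the catalog credential from the environment, never from literals."""
    missing = [name for name in REQUIRED_VARS if not os.environ.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    client_id = os.environ["POLARIS_CLIENT_ID"]
    client_secret = os.environ["POLARIS_CLIENT_SECRET"]
    return f"{client_id}:{client_secret}"
```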

Reference Documentation

| Document | Purpose |
|---|---|
| references/dbt-integration.md | Complete dbt-Dagster patterns |
| references/compute-abstraction.md | DuckDB, Spark, Snowflake patterns |
| references/io-managers.md | Iceberg IO managers, storage layer |
| references/orchestration-abstraction.md | Pluggable Airflow/Prefect patterns |
| references/catalog-control-plane.md | CRITICAL architecture doc |
| API-REFERENCE.md | Dagster SDK quick reference |

Quick Reference: Research Queries

When uncertain, search:

  • "Dagster dbt_assets decorator examples 2025"
  • "DagsterDbtTranslator custom implementation 2025"
  • "dagster-iceberg PyArrowIcebergIOManager 2025"
  • "DuckDB Iceberg REST catalog ATTACH 2025"

Remember: Design for abstraction. Dagster orchestrates, dbt owns SQL, catalog controls storage.