Claude Code Plugins

Community-maintained marketplace

Feedback
0
0

Interact with Dagster data orchestration platform running locally or on Kubernetes. Use when Claude needs to monitor Dagster runs, get run logs, list assets/jobs, materialize assets, launch jobs, or debug pipeline failures. Supports both local Dagster dev server and Kubernetes deployments where each job run is a separate pod.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name dagster-local
description Interact with Dagster data orchestration platform running locally or on Kubernetes. Use when Claude needs to monitor Dagster runs, get run logs, list assets/jobs, materialize assets, launch jobs, or debug pipeline failures. Supports both local Dagster dev server and Kubernetes deployments where each job run is a separate pod.

Dagster Local Skill

Programmatic interaction with Dagster via GraphQL API and Kubernetes for pod-level logs.

Quick Start

from scripts.dagster_client import DagsterClient

client = DagsterClient()  # defaults to http://localhost:3000/graphql

# List all assets
assets = client.list_assets()

# Get recent runs
runs = client.get_recent_runs(limit=10)

# Get logs for a specific run
logs = client.get_run_logs(run_id="abc123")

Configuration

client = DagsterClient(graphql_url="http://localhost:3000/graphql")

Available Operations

Query Operations

Function Purpose
list_repositories() List all code locations/repositories
list_jobs(repo_location, repo_name) List jobs in a repository
list_assets(repo_location, repo_name) List assets in a repository
get_recent_runs(limit) Get recent run history
get_run_info(run_id) Get detailed run info and status
get_run_logs(run_id) Get event logs for a run
get_asset_info(asset_key) Get asset details and dependencies

Mutation Operations

Function Purpose
launch_job(repo_location, repo_name, job_name, config) Launch a job run
materialize_asset(asset_key, repo_location, repo_name) Materialize an asset
terminate_run(run_id) Terminate an in-progress run

Kubernetes Integration

When Dagster runs on K8s (each run = separate pod):

from scripts.k8s_logs import get_pod_logs_for_run, get_dagster_pods

# Get pod logs for a specific run
logs = get_pod_logs_for_run(run_id="abc123", namespace="dagster")

# List all Dagster-related pods
pods = get_dagster_pods(namespace="dagster")

Debugging Workflow

  1. Check recent runs: get_recent_runs() to find failed runs
  2. Get run details: get_run_info(run_id) for status and error summary
  3. Get Dagster logs: get_run_logs(run_id) for step-level events
  4. Get pod logs (K8s): get_pod_logs_for_run(run_id) for stdout/stderr

Common Patterns

Wait for Run Completion

import time

def wait_for_run(client, run_id, timeout=300):
    start = time.time()
    while time.time() - start < timeout:
        info = client.get_run_info(run_id)
        status = info.get("status")
        if status in ["SUCCESS", "FAILURE", "CANCELED"]:
            return info
        time.sleep(5)
    raise TimeoutError(f"Run {run_id} did not complete")

Get Failure Reason

def get_failure_reason(client, run_id):
    logs = client.get_run_logs(run_id)
    failures = [e for e in logs.get("events", []) if "error" in e]
    return failures[-1] if failures else None

GraphQL Reference

For advanced queries, see references/graphql_queries.md.