| name | dagster-local |
| description | Interact with Dagster data orchestration platform running locally or on Kubernetes. Use when Claude needs to monitor Dagster runs, get run logs, list assets/jobs, materialize assets, launch jobs, or debug pipeline failures. Supports both local Dagster dev server and Kubernetes deployments where each job run is a separate pod. |
Dagster Local Skill
Programmatic interaction with Dagster through its GraphQL API, plus Kubernetes access for pod-level logs.
Quick Start
from scripts.dagster_client import DagsterClient
client = DagsterClient() # defaults to http://localhost:3000/graphql
# List all assets
assets = client.list_assets()
# Get recent runs
runs = client.get_recent_runs(limit=10)
# Get logs for a specific run
logs = client.get_run_logs(run_id="abc123")
Configuration
client = DagsterClient(graphql_url="http://localhost:3000/graphql")
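When the webserver is not on the default port, the endpoint can be resolved from the environment before the client is constructed. This is a minimal sketch: the variable name `DAGSTER_GRAPHQL_URL` and the helper `resolve_graphql_url` are hypothetical and not defined by this skill.

```python
import os

# Default endpoint used by DagsterClient() according to the Quick Start.
DEFAULT_GRAPHQL_URL = "http://localhost:3000/graphql"


def resolve_graphql_url(env=None):
    """Return the GraphQL endpoint, preferring an optional environment override."""
    env = os.environ if env is None else env
    # DAGSTER_GRAPHQL_URL is a hypothetical variable name chosen for this example.
    url = env.get("DAGSTER_GRAPHQL_URL", "").strip()
    return url or DEFAULT_GRAPHQL_URL
```

The result can then be passed as `DagsterClient(graphql_url=resolve_graphql_url())`.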
Available Operations
Query Operations
| Function | Purpose |
|---|---|
| `list_repositories()` | List all code locations/repositories |
| `list_jobs(repo_location, repo_name)` | List jobs in a repository |
| `list_assets(repo_location, repo_name)` | List assets in a repository |
| `get_recent_runs(limit)` | Get recent run history |
| `get_run_info(run_id)` | Get detailed run info and status |
| `get_run_logs(run_id)` | Get event logs for a run |
| `get_asset_info(asset_key)` | Get asset details and dependencies |
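Each query operation ultimately sends a GraphQL document to the endpoint. The sketch below shows roughly what a recent-runs request could look like using only the standard library. The query uses Dagster's `runsOrError` field, but the helper names (`build_graphql_request`, `run_query`) are illustrative, and the skill's own client may issue different queries.

```python
import json
import urllib.request

# Recent-runs query; the selected fields are a small illustrative subset.
RECENT_RUNS_QUERY = """
query RecentRuns($limit: Int) {
  runsOrError(limit: $limit) {
    __typename
    ... on Runs {
      results { runId jobName status }
    }
  }
}
"""


def build_graphql_request(url, query, variables=None):
    """Build a POST request carrying a GraphQL query and its variables."""
    body = json.dumps({"query": query, "variables": variables or {}}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def run_query(url, query, variables=None, timeout=30):
    """Send the request and return the `data` payload, raising on GraphQL errors."""
    request = build_graphql_request(url, query, variables)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        payload = json.load(response)
    if payload.get("errors"):
        raise RuntimeError(f"GraphQL errors: {payload['errors']}")
    return payload["data"]
```

For example, `run_query("http://localhost:3000/graphql", RECENT_RUNS_QUERY, {"limit": 10})` would return the raw response data, assuming a Dagster webserver is listening at that address.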
Mutation Operations
| Function | Purpose |
|---|---|
| `launch_job(repo_location, repo_name, job_name, config)` | Launch a job run |
| `materialize_asset(asset_key, repo_location, repo_name)` | Materialize an asset |
| `terminate_run(run_id)` | Terminate an in-progress run |
Kubernetes Integration
When Dagster runs on K8s (each run = separate pod):
from scripts.k8s_logs import get_pod_logs_for_run, get_dagster_pods
# Get pod logs for a specific run
logs = get_pod_logs_for_run(run_id="abc123", namespace="dagster")
# List all Dagster-related pods
pods = get_dagster_pods(namespace="dagster")
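For a manual cross-check, the same pod logs can be fetched with `kubectl`. This sketch assumes the run pods carry a `dagster/run-id` label, which is worth confirming with `kubectl get pods --show-labels` in your deployment. The helper names are hypothetical and do not describe how `scripts.k8s_logs` is implemented.

```python
import subprocess


def kubectl_logs_command(run_id, namespace="dagster", tail=200):
    """Build a kubectl command that selects pods by an assumed run-id label."""
    # Assumption: run pods are labelled dagster/run-id=<run_id>.
    selector = f"dagster/run-id={run_id}"
    return [
        "kubectl", "logs",
        "--namespace", namespace,
        "--selector", selector,
        "--all-containers=true",
        f"--tail={tail}",
    ]


def fetch_pod_logs(run_id, namespace="dagster", tail=200):
    """Run kubectl and return the combined stdout of the matching pods."""
    result = subprocess.run(
        kubectl_logs_command(run_id, namespace, tail),
        capture_output=True,
        text=True,
        check=True,  # raise CalledProcessError if kubectl exits non-zero
    )
    return result.stdout
```

Keeping command construction separate from execution makes the selector easy to inspect or adjust without a cluster at hand.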
Debugging Workflow
- Check recent runs: `get_recent_runs()` to find failed runs
- Get run details: `get_run_info(run_id)` for status and error summary
- Get Dagster logs: `get_run_logs(run_id)` for step-level events
- Get pod logs (K8s): `get_pod_logs_for_run(run_id)` for stdout/stderr
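The first three steps above can be chained into one helper. This is a sketch under stated assumptions: `get_recent_runs` is taken to return a newest-first list of dicts with `runId` and `status` keys, and `get_run_logs` a dict with an `events` list. Neither shape is confirmed by this document, and `triage_latest_failure` is a hypothetical name.

```python
def triage_latest_failure(client, limit=10):
    """Collect details for the most recent failed run, or return None if none failed."""
    runs = client.get_recent_runs(limit=limit)
    # Assumed shape: each run is a dict with "runId" and "status".
    failed = [run for run in runs if run.get("status") == "FAILURE"]
    if not failed:
        return None

    run_id = failed[0]["runId"]  # assumes runs are ordered newest first
    info = client.get_run_info(run_id)
    logs = client.get_run_logs(run_id)
    # Keep only events that carry a non-empty "error" field.
    error_events = [e for e in logs.get("events", []) if e.get("error")]
    return {"run_id": run_id, "info": info, "error_events": error_events}
```

On Kubernetes, the returned `run_id` can then be passed to `get_pod_logs_for_run` to pull stdout/stderr for the same run.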
Common Patterns
Wait for Run Completion
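import time

# Terminal run statuses; any other status means the run is still in progress.
TERMINAL_STATUSES = {"SUCCESS", "FAILURE", "CANCELED"}

def wait_for_run(client, run_id, timeout=300, poll_interval=5):
    """Poll until the run reaches a terminal status or `timeout` seconds elapse."""
    start = time.monotonic()  # monotonic clock is unaffected by system time changes
    while time.monotonic() - start < timeout:
        info = client.get_run_info(run_id)
        if info.get("status") in TERMINAL_STATUSES:
            return info
        time.sleep(poll_interval)
    raise TimeoutError(f"Run {run_id} did not complete within {timeout}s")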
Get Failure Reason
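def get_failure_reason(client, run_id):
    """Return the last event with a non-empty "error" field, or None."""
    logs = client.get_run_logs(run_id)
    # Use .get() so events whose "error" key is present but null are skipped.
    failures = [e for e in logs.get("events", []) if e.get("error")]
    return failures[-1] if failures else None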
GraphQL Reference
For advanced queries, see references/graphql_queries.md.