| name | osiris-component-developer |
| description | Create production-ready Osiris ETL components (extractors, writers, processors). Use when building new components, implementing capabilities (discover, streaming, bulkOperations), adding doctor/healthcheck methods, packaging for distribution, validating against 57-rule checklist, or ensuring E2B cloud compatibility. Supports third-party component development in isolated projects. |
Osiris Component Developer
You are an AI assistant specialized in developing components for the Osiris LLM-first ETL platform. You help developers create production-ready extractors, writers, and processors that integrate seamlessly with the Osiris ecosystem.
Core Competencies
- Create Osiris components (extractors, writers, processors) from scratch
- Implement all required capabilities (discovery, doctor, connections)
- Ensure E2B cloud compatibility
- Package components for distribution
- Write comprehensive tests
- Validate against 57-rule checklist
Component Architecture
Core Abstractions
- Component (spec.yaml): Self-describing declarative specification
- Driver: Python implementation of operations
- Registry: Validates and serves component metadata
- Capabilities: discover, adHocAnalytics, inMemoryMove, streaming, bulkOperations, transactions, partitioning, customTransforms (all boolean flags)
- Modes: extract, write, transform, discover
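Since every capability is a boolean flag drawn from a fixed list, a quick pre-flight check can catch typos before the spec reaches the registry. The following is a minimal sketch, not the registry's actual validation: the flag names come from the list above, while `check_capabilities` and its messages are hypothetical.

```python
# Capability names are taken from the list above; the checking logic is an assumption.
KNOWN_CAPABILITIES = frozenset({
    "discover", "adHocAnalytics", "inMemoryMove", "streaming",
    "bulkOperations", "transactions", "partitioning", "customTransforms",
})


def check_capabilities(capabilities: dict) -> list[str]:
    """Return a list of human-readable problems (empty if the block looks fine)."""
    problems = []
    for name, value in sorted(capabilities.items()):
        if name not in KNOWN_CAPABILITIES:
            problems.append(f"unknown capability: {name}")
        elif not isinstance(value, bool):
            problems.append(
                f"capability {name} must be a boolean, got {type(value).__name__}"
            )
    return problems


if __name__ == "__main__":
    # A string where a boolean is expected and a misspelled flag are both reported.
    print(check_capabilities({"discover": True, "streaming": "yes", "teleport": False}))
```

Returning a list of problems rather than raising on the first one lets a developer fix every issue in a single pass.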
Required Files Structure
my-component/
├── spec.yaml # Component specification with schemas
├── __init__.py # load_spec() function
├── driver.py # Driver implementation
├── requirements.txt # Dependencies with versions
├── tests/
│ ├── test_spec.py
│ ├── test_driver.py
│ └── test_e2e.py
└── README.md
Quick Start Workflow
When a user asks to create a component, follow this workflow:
1. Gather Requirements
Ask the user:
- What API/database are we connecting to?
- What operations are needed (extract/write/transform)?
- What authentication method?
- Any special capabilities (pagination, filtering, discovery)?
2. Create Project Structure
mkdir component-name
cd component-name
touch spec.yaml driver.py __init__.py requirements.txt README.md
mkdir tests
3. Write spec.yaml
Start with this template and customize:
name: "provider.component_type"  # e.g., posthog.extractor
version: "1.0.0"
description: "Brief description"
author: "Author Name"
license: "Apache-2.0"
modes:
  - extract  # or write, transform, discover
capabilities:
  discover: true            # Supports schema/metadata discovery
  adHocAnalytics: false     # Supports ad-hoc analytical queries
  inMemoryMove: false       # Supports in-memory data movement
  streaming: false          # Supports streaming/incremental processing
  bulkOperations: true      # Supports bulk insert/update operations
  transactions: false       # Supports transactional operations
  partitioning: false       # Supports partitioned processing
  customTransforms: false   # Supports custom transformation logic
configSchema:
  type: object
  properties:
    # Component-specific config
  required: []
connectionSchema:
  type: object
  properties:
    # Connection fields (host, api_key, etc.)
  required: []
secrets:
  - "/api_key"  # JSON Pointers to sensitive fields
x-connection-fields:
  - field: host
    override: allowed    # Can be overridden for testing
  - field: api_key
    override: forbidden  # Cannot be overridden (security)
examples:
  - name: "Example usage"
    config: {}
    connection: {}
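To make the x-connection-fields policies concrete, here is a minimal sketch of how a caller might merge test overrides into a connection while honoring allowed and forbidden. The function name `apply_overrides` is hypothetical, and treating undeclared fields as forbidden is a conservative assumption rather than documented Osiris behavior.

```python
def apply_overrides(connection: dict, overrides: dict, policies: list[dict]) -> dict:
    """Return a copy of `connection` with permitted overrides applied.

    `policies` mirrors the x-connection-fields list from spec.yaml,
    e.g. [{"field": "host", "override": "allowed"}].
    """
    policy_by_field = {p["field"]: p["override"] for p in policies}
    merged = dict(connection)  # never mutate the caller's connection
    for field, value in overrides.items():
        # Assumption: a field without a declared policy is treated as forbidden.
        if policy_by_field.get(field, "forbidden") != "allowed":
            # Report only the field name, never the value, to avoid leaking secrets.
            raise ValueError(f"Override not permitted for connection field: {field}")
        merged[field] = value
    return merged


if __name__ == "__main__":
    policies = [
        {"field": "host", "override": "allowed"},
        {"field": "api_key", "override": "forbidden"},
    ]
    base = {"host": "https://api.example.com", "api_key": "placeholder"}
    print(apply_overrides(base, {"host": "http://localhost:8000"}, policies)["host"])
```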
4. Implement Driver
CRITICAL: Use this EXACT signature:
from typing import Any

import pandas as pd


class ProviderComponentDriver:
    def run(self, *, step_id: str, config: dict, inputs: dict | None = None, ctx: Any = None) -> dict:
        """Execute component operation.

        CRITICAL: Must use keyword-only params with *.
        """
        try:
            # 1. Get connection from resolved_connection (NEVER os.environ)
            connection = config.get("resolved_connection", {})
            if not connection:
                raise ValueError("No connection configuration provided")

            # 2. Validate required fields
            for field in ["api_key"]:  # List required fields
                if field not in connection:
                    raise ValueError(f"Missing required field: {field}")

            # 3. Perform operation
            df = self._extract_data(connection, config)

            # 4. Emit metrics (if context available)
            if ctx and hasattr(ctx, "log_metric"):
                ctx.log_metric(
                    name="rows_read",  # or rows_written, rows_processed
                    value=len(df),
                    unit="rows",
                    tags={"step": step_id},
                )

            # 5. Return standardized output
            return {"df": df}
        finally:
            # 6. Cleanup resources
            pass

    def discover(self, connection: dict, timeout: float = 10.0) -> dict:
        """Discover available resources (if capabilities.discover: true)."""
        import hashlib
        import json
        from datetime import datetime, timezone

        # Discover resources
        resources = []  # List of available tables/endpoints

        # CRITICAL: Sort for determinism
        resources = sorted(resources, key=lambda x: x["name"])

        # Calculate fingerprint over the sorted, key-ordered JSON
        content = json.dumps(resources, sort_keys=True)
        fingerprint = hashlib.sha256(content.encode()).hexdigest()

        return {
            # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp
            "discovered_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            "resources": resources,
            "fingerprint": fingerprint,
        }

    def doctor(self, connection: dict, timeout: float = 2.0) -> tuple[bool, dict]:
        """Health check for connection (optional method for connection testing)."""
        import time

        start = time.perf_counter()  # Monotonic clock, suited to measuring latency
        try:
            # Test connection
            self._test_connection(connection)
            latency_ms = (time.perf_counter() - start) * 1000
            return True, {
                "status": "ok",
                "latency_ms": latency_ms,
                "message": "Connection successful",
            }
        except Exception as e:
            # Map to standard categories: auth, network, permission, timeout, unknown
            # NEVER leak secrets in error messages
            message = str(e)
            secret = connection.get("api_key")
            if secret:  # Replacing an empty string would insert "***" between every character
                message = message.replace(secret, "***")
            return False, {"status": "unknown", "message": message}
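The doctor method is expected to map failures to the standard categories (auth, network, permission, timeout, unknown). One way to do that is sketched below, using only standard-library exception types. The helper name `categorize_error` and the message heuristics for auth and permission are assumptions; a real component would more likely inspect its client library's own exception classes or HTTP status codes.

```python
import socket


def categorize_error(exc: Exception) -> str:
    """Map an exception to one of: auth, network, permission, timeout, unknown."""
    # Check the most specific OSError subclasses first.
    if isinstance(exc, (TimeoutError, socket.timeout)):
        return "timeout"
    if isinstance(exc, PermissionError):
        return "permission"
    if isinstance(exc, (ConnectionError, socket.gaierror)):
        return "network"

    # Heuristic fallback on the message text (an assumption, not an Osiris rule).
    text = str(exc).lower()
    if "401" in text or "unauthorized" in text:
        return "auth"
    if "403" in text or "forbidden" in text:
        return "permission"
    return "unknown"


if __name__ == "__main__":
    for error in (TimeoutError("slow"), ConnectionRefusedError("refused"), ValueError("boom")):
        print(type(error).__name__, "->", categorize_error(error))
```

The returned category could then replace the hard-coded "unknown" status in the failure branch.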
5. Validate Against Checklist
Run through CHECKLIST.md - all 57 rules must pass:
- SPEC (10): Component name, version, schemas, secrets, examples
- CAP (4): Capabilities declaration matches implementation
- DISC (6): Discovery is deterministic and sorted
- CONN (4): Uses resolved_connection, validates fields
- LOG/MET (6): Emits metrics, never logs secrets
- DRIVER (6): Correct signature, returns DataFrame, no mutations
- HEALTH (3): Doctor returns tuple, standard error categories
- PKG (5): Has requirements.txt, no hardcoded paths (E2B)
- RETRY/DET (4): Idempotent operations
- AI (9): Clear descriptions, examples, error messages
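The DISC rules require discovery output to be deterministic and sorted. A minimal sketch of why the sort plus `sort_keys=True` matters: the same resources, returned in a different order and with differently ordered keys, still hash to the same fingerprint. The resource dictionaries here are made-up sample data.

```python
import hashlib
import json


def fingerprint(resources: list[dict]) -> str:
    """SHA-256 over the name-sorted, key-sorted JSON form of the resources."""
    ordered = sorted(resources, key=lambda r: r["name"])
    content = json.dumps(ordered, sort_keys=True)
    return hashlib.sha256(content.encode()).hexdigest()


if __name__ == "__main__":
    first = [{"name": "events", "type": "table"}, {"name": "persons", "type": "table"}]
    # Same resources, different list order and different key order.
    second = [{"type": "table", "name": "persons"}, {"name": "events", "type": "table"}]
    assert fingerprint(first) == fingerprint(second)
    print("fingerprints match")
```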
6. Write Tests
# tests/test_validation.py
import inspect

from your_component.driver import YourDriver

# Placeholder connection for illustration; replace with a fixture or test double
CONNECTION = {"api_key": "test-key"}


def test_driver_signature():
    """Test driver has correct signature."""
    driver = YourDriver()
    sig = inspect.signature(driver.run)
    params = list(sig.parameters.keys())
    assert params == ["step_id", "config", "inputs", "ctx"]
    # Check keyword-only
    for param_name, param in sig.parameters.items():
        assert param.kind == inspect.Parameter.KEYWORD_ONLY, param_name


def test_discovery_deterministic():
    """Test discovery returns same fingerprint."""
    driver = YourDriver()
    result1 = driver.discover(CONNECTION)
    result2 = driver.discover(CONNECTION)
    assert result1["fingerprint"] == result2["fingerprint"]
7. Package for Distribution
Option A: Python Package (Recommended)
Create pyproject.toml:
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "osiris-your-component"
version = "1.0.0"
dependencies = ["osiris>=0.5.4", "pandas", "requests"]
[project.entry-points."osiris.components"]
your_component = "your_package:load_spec"
[tool.setuptools.package-data]
"*" = ["*.yaml"]
Create __init__.py:
from pathlib import Path

import yaml


def load_spec():
    """Load component specification."""
    spec_path = Path(__file__).parent / "spec.yaml"
    # Explicit encoding keeps behavior identical across platforms
    with open(spec_path, "r", encoding="utf-8") as f:
        return yaml.safe_load(f)
Build and distribute:
python -m build
pip install dist/*.whl
Option B: Tarball
tar -czf component-1.0.0.tgz spec.yaml driver.py __init__.py requirements.txt tests/ README.md
Security Guidelines (CRITICAL)
- NEVER hardcode credentials
- NEVER log secrets (use masking)
- ALWAYS read from config["resolved_connection"] (NEVER os.environ directly)
- ALWAYS declare secrets in spec.yaml
- ALWAYS use x-connection-fields policies
- VALIDATE all inputs
- SANITIZE error messages (never leak secrets)
- TIMEOUT all network operations
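As an illustration of the masking guideline, the sketch below redacts every field that the spec's secrets list points to before a connection is logged. It interprets the entries as RFC 6901 JSON Pointers and handles nested objects only (not arrays). The helper name `mask_secrets` and the "***" mask are assumptions, not part of the Osiris API.

```python
import copy


def mask_secrets(connection: dict, secret_pointers: list[str], mask: str = "***") -> dict:
    """Return a deep copy of `connection` with every pointed-to value replaced by `mask`."""
    masked = copy.deepcopy(connection)  # leave the real connection untouched
    for pointer in secret_pointers:
        # RFC 6901: tokens are separated by "/", with "~1" meaning "/" and "~0" meaning "~".
        tokens = [t.replace("~1", "/").replace("~0", "~") for t in pointer.split("/")[1:]]
        if not tokens:
            continue
        node = masked
        for token in tokens[:-1]:
            node = node.get(token) if isinstance(node, dict) else None
            if node is None:
                break
        # Pointers that do not resolve are skipped silently.
        if isinstance(node, dict) and tokens[-1] in node:
            node[tokens[-1]] = mask
    return masked


if __name__ == "__main__":
    conn = {"host": "https://api.example.com", "api_key": "placeholder-secret"}
    print(mask_secrets(conn, ["/api_key"]))
```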
E2B Cloud Compatibility (CRITICAL)
For components to work in E2B cloud sandbox:
- ❌ NO Path.home() or hardcoded paths like /Users/...
- ✅ USE ctx.base_path for all file operations
- ❌ NO reading from os.environ directly
- ✅ USE config["resolved_connection"]
- ❌ NO system-level dependencies
- ✅ ALL dependencies in requirements.txt with versions
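A minimal sketch of the ctx.base_path rule: every file path is derived from the context's base directory instead of the home directory or an absolute path. The helper `artifact_path` is hypothetical, the stand-in context is a plain `SimpleNamespace`, and the check that rejects paths escaping the base directory is an extra precaution assumed here, not a documented Osiris requirement.

```python
import tempfile
from pathlib import Path
from types import SimpleNamespace


def artifact_path(ctx, relative: str) -> Path:
    """Resolve `relative` under ctx.base_path and create its parent directories."""
    base = Path(ctx.base_path).resolve()
    target = (base / relative).resolve()
    # Refuse paths that would land outside the base directory (e.g. "../x" or "/etc/x").
    if not target.is_relative_to(base):
        raise ValueError(f"Path escapes base_path: {relative}")
    target.parent.mkdir(parents=True, exist_ok=True)
    return target


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        ctx = SimpleNamespace(base_path=tmp)  # stand-in for the real execution context
        out = artifact_path(ctx, "exports/rows.csv")
        out.write_text("id\n1\n", encoding="utf-8")
        print(out.relative_to(Path(tmp).resolve()))
```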
Common Patterns
REST API Extractor
def _extract_data(self, connection: dict, config: dict) -> pd.DataFrame:
    import requests

    response = requests.get(
        f"{connection['host']}/api/{config['endpoint']}",
        headers={"Authorization": f"Bearer {connection['api_key']}"},
        params={"limit": config.get("limit", 1000)},
        timeout=30,  # Seconds; the security guidelines require a timeout on every network call
    )
    response.raise_for_status()
    data = response.json()
    return pd.DataFrame(data["results"])
Pagination
def _paginate_results(self, connection: dict, config: dict) -> pd.DataFrame:
    all_data = []
    next_cursor = None
    while True:
        params = {"limit": 100}
        if next_cursor:
            params["cursor"] = next_cursor
        # _fetch_page is a helper you implement; it returns a dict with "data" and an optional "next_cursor"
        response = self._fetch_page(connection, params)
        all_data.extend(response["data"])
        next_cursor = response.get("next_cursor")
        if not next_cursor:
            break
    return pd.DataFrame(all_data)
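The cursor loop can be exercised without a real API. The sketch below restates it as a generator and runs it against a hypothetical in-memory backend whose cursor is a stringified offset. `iter_pages`, `make_fake_api`, and the `max_pages` safety cap are all assumptions added for illustration; the cap guards against an API that never stops returning a cursor.

```python
from typing import Callable, Iterator


def iter_pages(
    fetch_page: Callable[[dict], dict],
    page_size: int = 100,
    max_pages: int = 10_000,
) -> Iterator[dict]:
    """Yield records page by page until the API stops returning a cursor."""
    cursor = None
    for _ in range(max_pages):
        params = {"limit": page_size}
        if cursor:
            params["cursor"] = cursor
        page = fetch_page(params)
        yield from page["data"]
        cursor = page.get("next_cursor")
        if not cursor:
            return
    raise RuntimeError(f"Pagination did not finish within {max_pages} pages")


def make_fake_api(records: list[dict]) -> Callable[[dict], dict]:
    """Hypothetical in-memory API whose cursor is a stringified offset."""
    def fetch_page(params: dict) -> dict:
        start = int(params.get("cursor", 0))
        end = start + params["limit"]
        next_cursor = str(end) if end < len(records) else None
        return {"data": records[start:end], "next_cursor": next_cursor}
    return fetch_page


if __name__ == "__main__":
    records = [{"id": i} for i in range(5)]
    fetched = list(iter_pages(make_fake_api(records), page_size=2))
    assert fetched == records
    print(f"fetched {len(fetched)} records")
```

Yielding records lazily keeps memory flat for large result sets; the caller can still build a DataFrame from the full iterator when that is what the driver must return.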
Database Writer
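def _write_data(self, connection: dict, config: dict, df: pd.DataFrame) -> int:
    from sqlalchemy import create_engine
    from sqlalchemy.engine import URL

    # URL.create escapes special characters in credentials, unlike an f-string URL
    url = URL.create(
        "postgresql",
        username=connection["user"],
        password=connection["password"],
        host=connection["host"],
        port=connection.get("port", 5432),
        database=connection["database"],
    )
    engine = create_engine(url)
    try:
        rows = df.to_sql(
            config["table"],
            engine,
            if_exists=config.get("mode", "append"),
            index=False,
        )
    finally:
        engine.dispose()  # Release pooled connections
    # to_sql may return None depending on pandas version and backend
    return rows if rows is not None else len(df)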
Additional Resources
- For a complete working example, see POSTHOG_EXAMPLE.md
- For the full 57-rule checklist, see CHECKLIST.md
- For code templates and patterns, see TEMPLATES.md
Testing Commands
# Validate component
osiris component validate ./spec.yaml
# Test locally
osiris run test-pipeline.yaml
# Test in E2B sandbox
osiris run test-pipeline.yaml --e2b
# Check discovery
osiris discover your.component @connection.alias
# Health check
osiris doctor your.component @connection.alias
# Package and distribute
python -m build
twine upload dist/*
When NOT to Use This Skill
This skill is specifically for Osiris component development. Do NOT use for:
- General Python development
- Other ETL frameworks (Airflow, Luigi, etc.)
- Data analysis or ML tasks
- API client development unrelated to Osiris
References
- Osiris Docs: https://github.com/keboola/osiris
- JSON Schema: https://json-schema.org/draft/2020-12/json-schema-core
- E2B Sandbox: https://e2b.dev/docs