Claude Code Plugins

Community-maintained marketplace

Feedback

AI Workflow Orchestrator

@Krosebrook/source-of-truth-monorepo
1
0

Expert guidance for building AI-powered workflows with n8n, Zapier, and custom orchestration systems. Use when automating workflows, integrating AI agents, or building no-code/low-code automation.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name AI Workflow Orchestrator
description Expert guidance for building AI-powered workflows with n8n, Zapier, and custom orchestration systems. Use when automating workflows, integrating AI agents, or building no-code/low-code automation.
version 1.0.0
allowed-tools Read, Write, Edit, Bash

AI Workflow Orchestrator

Enterprise AI workflow automation with n8n, Zapier, and custom orchestration.

n8n Workflow Patterns

AI Agent Workflow

{
  "nodes": [
    {
      "name": "Webhook",
      "type": "n8n-nodes-base.webhook",
      "typeVersion": 1,
      "position": [250, 300],
      "webhookId": "user-query"
    },
    {
      "name": "OpenAI Chat",
      "type": "@n8n/n8n-nodes-langchain.lmChatOpenAi",
      "typeVersion": 1,
      "position": [450, 300],
      "parameters": {
        "model": "gpt-4-turbo",
        "temperature": 0.7,
        "systemMessage": "You are a helpful assistant."
      }
    },
    {
      "name": "Vector Store Query",
      "type": "@n8n/n8n-nodes-langchain.vectorStorePinecone",
      "typeVersion": 1,
      "position": [450, 150],
      "parameters": {
        "operation": "retrieve",
        "topK": 5
      }
    },
    {
      "name": "Response Formatter",
      "type": "n8n-nodes-base.function",
      "typeVersion": 1,
      "position": [650, 300],
      "parameters": {
        "functionCode": "return items.map(item => ({\n  json: {\n    response: item.json.response,\n    sources: item.json.sources,\n    confidence: item.json.confidence\n  }\n}));"
      }
    }
  ],
  "connections": {
    "Webhook": {
      "main": [[{ "node": "Vector Store Query", "type": "main", "index": 0 }]]
    },
    "Vector Store Query": {
      "main": [[{ "node": "OpenAI Chat", "type": "main", "index": 0 }]]
    },
    "OpenAI Chat": {
      "main": [[{ "node": "Response Formatter", "type": "main", "index": 0 }]]
    }
  }
}

Multi-Agent Orchestration

// n8n Custom Node for Agent Orchestration
import { IExecuteFunctions } from 'n8n-core';
import { INodeExecutionData, INodeType, INodeTypeDescription } from 'n8n-workflow';

export class MultiAgentOrchestrator implements INodeType {
  description: INodeTypeDescription = {
    displayName: 'Multi-Agent Orchestrator',
    name: 'multiAgentOrchestrator',
    group: ['transform'],
    version: 1,
    description: 'Orchestrate multiple AI agents',
    defaults: {
      name: 'Multi-Agent Orchestrator',
    },
    inputs: ['main'],
    outputs: ['main'],
    properties: [
      {
        displayName: 'Agents',
        name: 'agents',
        type: 'fixedCollection',
        typeOptions: {
          multipleValues: true,
        },
        default: {},
        options: [
          {
            name: 'agentValues',
            displayName: 'Agent',
            values: [
              {
                displayName: 'Agent Name',
                name: 'name',
                type: 'string',
                default: '',
              },
              {
                displayName: 'Agent Type',
                name: 'type',
                type: 'options',
                options: [
                  { name: 'Researcher', value: 'researcher' },
                  { name: 'Writer', value: 'writer' },
                  { name: 'Reviewer', value: 'reviewer' },
                ],
                default: 'researcher',
              },
            ],
          },
        ],
      },
    ],
  };

  async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
    const items = this.getInputData();
    const returnData: INodeExecutionData[] = [];

    for (let itemIndex = 0; itemIndex < items.length; itemIndex++) {
      const input = this.getNodeParameter('input', itemIndex, '') as string;
      const agents = this.getNodeParameter('agents', itemIndex, {}) as any;

      const results: any[] = [];

      for (const agent of agents.agentValues || []) {
        const result = await this.executeAgent(agent.type, input);
        results.push({ agent: agent.name, result });
      }

      returnData.push({
        json: {
          input,
          results,
          summary: this.summarizeResults(results),
        },
      });
    }

    return [returnData];
  }

  private async executeAgent(type: string, input: string): Promise<string> {
    // Agent execution logic
    return `Result from ${type} agent`;
  }

  private summarizeResults(results: any[]): string {
    return results.map(r => r.result).join(' ');
  }
}

Zapier Integration Patterns

AI-Powered Email Automation

// Zapier Custom Code Action
const inputData = inputData || {};
const { email_content, sender } = inputData;

// Call OpenAI API
const response = await fetch('https://api.openai.com/v1/chat/completions', {
  method: 'POST',
  headers: {
    'Authorization': `Bearer ${process.env.OPENAI_API_KEY}`,
    'Content-Type': 'application/json',
  },
  body: JSON.stringify({
    model: 'gpt-4-turbo',
    messages: [
      {
        role: 'system',
        content: 'You are an email assistant. Categorize and draft responses.',
      },
      {
        role: 'user',
        content: `Categorize and draft a response to this email:\n\n${email_content}`,
      },
    ],
  }),
});

const data = await response.json();
const ai_response = data.choices[0].message.content;

// Parse AI response
const category = ai_response.match(/Category: (.*)/)?.[1] || 'General';
const draft_response = ai_response.match(/Draft:([\s\S]*)/)?.[1]?.trim() || '';

output = {
  category,
  draft_response,
  priority: category === 'Urgent' ? 'high' : 'normal',
};

Custom Orchestration System

Workflow Engine

from typing import Dict, List, Callable, Any
from pydantic import BaseModel
import asyncio

class WorkflowStep(BaseModel):
    name: str
    function: str
    inputs: Dict[str, str] = {}
    condition: str | None = None

class Workflow(BaseModel):
    name: str
    steps: List[WorkflowStep]

class WorkflowEngine:
    def __init__(self):
        self.functions: Dict[str, Callable] = {}
        self.context: Dict[str, Any] = {}

    def register_function(self, name: str, func: Callable):
        """Register a function that can be used in workflows."""
        self.functions[name] = func

    async def execute_workflow(self, workflow: Workflow, initial_context: Dict[str, Any]):
        """Execute a workflow with the given context."""
        self.context = initial_context.copy()

        for step in workflow.steps:
            # Check condition
            if step.condition and not self._evaluate_condition(step.condition):
                continue

            # Prepare inputs
            inputs = {
                key: self._resolve_value(value)
                for key, value in step.inputs.items()
            }

            # Execute function
            func = self.functions.get(step.function)
            if not func:
                raise ValueError(f"Function not found: {step.function}")

            result = await func(**inputs) if asyncio.iscoroutinefunction(func) else func(**inputs)

            # Store result
            self.context[step.name] = result

        return self.context

    def _resolve_value(self, value: str) -> Any:
        """Resolve context variables in string values."""
        if value.startswith('$'):
            return self.context.get(value[1:])
        return value

    def _evaluate_condition(self, condition: str) -> bool:
        """Evaluate a condition expression."""
        return eval(condition, {}, self.context)

# Usage
engine = WorkflowEngine()

# Register AI functions
async def analyze_sentiment(text: str) -> str:
    # Call AI API
    return "positive"

async def generate_response(sentiment: str, text: str) -> str:
    # Call AI API
    return f"Response based on {sentiment} sentiment"

engine.register_function('analyze_sentiment', analyze_sentiment)
engine.register_function('generate_response', generate_response)

# Define workflow
workflow = Workflow(
    name="Email Response",
    steps=[
        WorkflowStep(
            name="sentiment",
            function="analyze_sentiment",
            inputs={"text": "$email_content"}
        ),
        WorkflowStep(
            name="response",
            function="generate_response",
            inputs={"sentiment": "$sentiment", "text": "$email_content"},
            condition="sentiment in ['positive', 'neutral']"
        ),
    ]
)

# Execute
result = await engine.execute_workflow(
    workflow,
    {"email_content": "Thank you for your help!"}
)

Agent Chain Pattern

interface Agent {
  name: string;
  execute: (input: any) => Promise<any>;
}

class AgentChain {
  private agents: Agent[] = [];

  add(agent: Agent): this {
    this.agents.push(agent);
    return this;
  }

  async execute(initialInput: any): Promise<any> {
    let result = initialInput;

    for (const agent of this.agents) {
      console.log(`Executing ${agent.name}...`);
      result = await agent.execute(result);
    }

    return result;
  }
}

// Usage
const chain = new AgentChain()
  .add({
    name: 'Researcher',
    execute: async (query: string) => {
      // Research logic
      return { query, sources: ['source1', 'source2'] };
    },
  })
  .add({
    name: 'Summarizer',
    execute: async (data: any) => {
      // Summarization logic
      return { ...data, summary: 'Summary of research' };
    },
  })
  .add({
    name: 'Formatter',
    execute: async (data: any) => {
      // Formatting logic
      return {
        title: data.query,
        content: data.summary,
        references: data.sources,
      };
    },
  });

const result = await chain.execute('What is quantum computing?');

Integration Patterns

Webhook Handlers

from fastapi import FastAPI, Request
from pydantic import BaseModel

app = FastAPI()

class ZapierWebhook(BaseModel):
    event: str
    data: dict

@app.post("/zapier/webhook")
async def handle_zapier_webhook(webhook: ZapierWebhook):
    """Handle incoming Zapier webhooks."""
    if webhook.event == "new_lead":
        await process_lead(webhook.data)
    elif webhook.event == "support_ticket":
        await process_ticket(webhook.data)

    return {"status": "processed"}

@app.post("/n8n/webhook/{workflow_id}")
async def handle_n8n_webhook(workflow_id: str, request: Request):
    """Handle incoming n8n webhooks."""
    data = await request.json()
    result = await execute_workflow(workflow_id, data)
    return result

Task Queues

from celery import Celery
from typing import Dict

celery = Celery('tasks', broker='redis://localhost:6379')

@celery.task
def process_with_ai(data: Dict) -> Dict:
    """Process data with AI agent in background."""
    result = ai_agent.process(data)
    notify_completion(result)
    return result

# Trigger from workflow
process_with_ai.delay({"text": "Process this"})

Best Practices

✅ Use idempotent operations ✅ Implement error handling and retries ✅ Add logging and monitoring ✅ Use task queues for long-running operations ✅ Implement rate limiting ✅ Version your workflows ✅ Test workflows thoroughly ✅ Use environment variables for secrets ✅ Implement rollback mechanisms ✅ Monitor workflow performance


When to Use: AI workflow automation, n8n/Zapier integrations, multi-agent orchestration, business process automation.