
automation-workflows-skill

@Useforclaude/skills-claude

Master automation workflows for repetitive tasks using Make, n8n, Zapier, Python, and Bash. Use for content automation, video production pipelines, data processing, marketing workflows, AI content generation, batch operations, webhook integrations, API orchestration, and production-ready automation systems. Also use for Thai keywords "ทำให้อัตโนมัติ", "ระบบอัตโนมัติ", "อัตโนมัติ", "ประมวลผลหลายไฟล์", "batch process", "workflow", "ลำดับงาน", "ลดขั้นตอน", "ทำงานซ้ำ", "ไม่ต้องทำเอง", "Make.com", "n8n", "Zapier", "Python script", "Bash script", "ประหยัดเวลา", "ทำงานรวด", "ระบบทำงานเอง".

Install Skill

1. Download skill
2. Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3. Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please review the skill's instructions and verify them before using it.

SKILL.md

name: automation-workflows-skill
description: Master automation workflows for repetitive tasks using Make, n8n, Zapier, Python, and Bash. Use for content automation, video production pipelines, data processing, marketing workflows, AI content generation, batch operations, webhook integrations, API orchestration, and production-ready automation systems. Also use for Thai keywords "ทำให้อัตโนมัติ", "ระบบอัตโนมัติ", "อัตโนมัติ", "ประมวลผลหลายไฟล์", "batch process", "workflow", "ลำดับงาน", "ลดขั้นตอน", "ทำงานซ้ำ", "ไม่ต้องทำเอง", "Make.com", "n8n", "Zapier", "Python script", "Bash script", "ประหยัดเวลา", "ทำงานรวด", "ระบบทำงานเอง".

🤖 Automation Workflows Skill - Production-Grade Task Automation

Version: 1.0 | Last Updated: 2025-10-26 | Expertise Level: Automation Engineer / DevOps | Platforms: Make.com, n8n, Zapier, Python, Bash, GitHub Actions


📋 Table of Contents

  1. Platform Comparison
  2. Workflow Design Patterns
  3. Content Automation
  4. Video Production Automation
  5. Data Processing Automation
  6. Marketing Automation
  7. Security Best Practices
  8. Monitoring & Logging
  9. Cost Optimization
  10. Production Deployment
  11. Advanced Patterns
  12. Quick Start Examples

🎯 Platform Comparison

Quick Reference Table

Platform | Type | Price | Best For | Pros | Cons
Make.com | Visual | $9-299/mo | Complex workflows | Visual editor, powerful router | Learning curve
n8n | Open-source | Free / self-hosted | Privacy, customization | Self-hosted, unlimited | Requires server setup
Zapier | Cloud | $20-599/mo | Simple integrations | Easy to use, 5000+ apps | Expensive at scale
Python | Code | Free | Custom logic | Full control, unlimited | Requires coding
Bash | Code | Free | System operations | Fast, native | Limited complexity
GitHub Actions | CI/CD | Free tier | Dev workflows | Git integration | GitHub-centric

When to Use Which Platform

Use Make.com when:

  • You need visual workflow builder
  • Complex branching/routing logic
  • Non-technical team members need access
  • Budget allows $9-50/month

Use n8n when:

  • Privacy/data sovereignty required
  • High-volume operations (cost-effective)
  • Need custom integrations
  • Have server infrastructure

Use Zapier when:

  • Simple trigger-action workflows
  • Need specific app integrations (Zapier has most)
  • Budget allows higher cost
  • No-code requirement

Use Python when:

  • Complex data processing
  • AI/ML integration required
  • Custom business logic
  • Need full control

Use Bash when:

  • File system operations
  • Command-line tool orchestration
  • Server maintenance
  • Simple sequential tasks

🎨 Workflow Design Patterns

1. Trigger-Action Pattern (Simple)

Structure:

Trigger → Action → Done

Example: New Email → Save to Database

# n8n workflow
Trigger: Gmail - New Email
Action: Database - Insert Row
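
The same pattern in plain Python, as a minimal sketch: a small Flask webhook receives the trigger payload and inserts a row into SQLite (the endpoint path, table, and field names are illustrative, not tied to any specific integration):

import sqlite3
from flask import Flask, request

app = Flask(__name__)
DB_PATH = "events.db"

def init_db():
    """Create the target table if it does not exist yet."""
    with sqlite3.connect(DB_PATH) as conn:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS emails (sender TEXT, subject TEXT, body TEXT)"
        )

@app.route("/new-email", methods=["POST"])
def new_email():
    """Trigger: incoming webhook → Action: insert a row."""
    data = request.json
    with sqlite3.connect(DB_PATH) as conn:
        conn.execute(
            "INSERT INTO emails (sender, subject, body) VALUES (?, ?, ?)",
            (data.get("from"), data.get("subject"), data.get("body")),
        )
    return {"status": "saved"}

if __name__ == "__main__":
    init_db()
    app.run(port=5000)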

Use cases:

  • Save form submissions to database
  • Send notification on new file upload
  • Log webhook events

2. Trigger-Filter-Action Pattern (Conditional)

Structure:

Trigger → Filter → Action A (if true)
                 → Action B (if false)

Example: New Lead → Qualify → Route to Sales/Marketing

# Python pseudo-code
def process_lead(lead):
    if lead['score'] >= 80:
        send_to_sales(lead)
    else:
        add_to_nurture_campaign(lead)

Use cases:

  • Lead scoring and routing
  • Content approval workflows
  • Error handling (retry vs alert)

3. Multi-Step Processing Pipeline

Structure:

Trigger → Step 1 → Step 2 → Step 3 → ... → Done

Example: Video Production Pipeline

# Bash pipeline
#!/bin/bash

# Step 1: Download video
youtube-dl "$VIDEO_URL" -o input.mp4

# Step 2: Transcribe with Whisper
whisper input.mp4 --model large-v3 --language th --output_format srt

# Step 3: Translate subtitles
python translate_srt.py input.srt output_en.srt

# Step 4: Synthesize TTS
python tts_from_srt.py output_en.srt audio_en.mp3

# Step 5: Replace audio
ffmpeg -i input.mp4 -i audio_en.mp3 -c:v copy -c:a aac -map 0:v:0 -map 1:a:0 output.mp4

# Step 6: Upload to storage
aws s3 cp output.mp4 s3://my-bucket/videos/

Use cases:

  • Video production automation
  • Data ETL pipelines
  • Multi-step content generation

4. Fan-Out Pattern (Parallel Processing)

Structure:

Trigger → Split → [Task 1, Task 2, Task 3] (parallel) → Merge → Done

Example: Generate Social Media Content for All Platforms

// Make.com scenario
Input: Blog post URL

Fan-out:
  - Generate Twitter thread (parallel)
  - Generate LinkedIn post (parallel)
  - Generate Instagram caption (parallel)
  - Generate Facebook post (parallel)

Merge: Collect all outputs
Action: Schedule to respective platforms
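
In code, the fan-out/merge step maps naturally onto asyncio.gather. A minimal sketch, assuming a generate() coroutine that wraps whatever AI call you use (the function and platform names below are placeholders):

import asyncio

async def generate(platform, post_url):
    """Placeholder for an AI call that drafts platform-specific copy."""
    await asyncio.sleep(0.1)  # simulate the API round trip
    return f"{platform} draft for {post_url}"

async def fan_out(post_url):
    platforms = ["twitter", "linkedin", "instagram", "facebook"]

    # Fan-out: run all generations in parallel
    drafts = await asyncio.gather(*(generate(p, post_url) for p in platforms))

    # Merge: collect results keyed by platform, ready for scheduling
    return dict(zip(platforms, drafts))

if __name__ == "__main__":
    results = asyncio.run(fan_out("https://example.com/blog-post"))
    for platform, draft in results.items():
        print(platform, "→", draft)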

Use cases:

  • Multi-platform content distribution
  • Parallel API calls
  • Batch image processing

5. Event-Driven Architecture

Structure:

Event Source → Event Bus → Multiple Subscribers

Example: E-commerce Order Events

New Order Event →
  ├─ Inventory System (deduct stock)
  ├─ Email Service (send confirmation)
  ├─ Analytics (track conversion)
  ├─ Shipping (create label)
  └─ Accounting (record revenue)

Implementation (n8n + Webhook):

{
  "nodes": [
    {
      "name": "Webhook",
      "type": "n8n-nodes-base.webhook",
      "parameters": {
        "path": "new-order"
      }
    },
    {
      "name": "Update Inventory",
      "type": "n8n-nodes-base.httpRequest"
    },
    {
      "name": "Send Email",
      "type": "n8n-nodes-base.emailSend"
    },
    {
      "name": "Track Analytics",
      "type": "n8n-nodes-base.httpRequest"
    }
  ]
}
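
For a code-only setup, the same idea can be sketched as a tiny in-process event bus: subscribers register handlers per event name, and publishing an event fans the payload out to every handler (the handler names are illustrative):

from collections import defaultdict

# Event bus: event name → list of subscriber callbacks
subscribers = defaultdict(list)

def subscribe(event_name, handler):
    subscribers[event_name].append(handler)

def publish(event_name, payload):
    """Deliver the payload to every subscriber of this event."""
    for handler in subscribers[event_name]:
        handler(payload)

# Example subscribers for a "new_order" event
def update_inventory(order):
    print("Deducting stock for", order["sku"])

def send_confirmation(order):
    print("Emailing confirmation to", order["email"])

subscribe("new_order", update_inventory)
subscribe("new_order", send_confirmation)

publish("new_order", {"sku": "ABC-123", "email": "buyer@example.com"})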

Use cases:

  • Microservices communication
  • Real-time notifications
  • Multi-system synchronization

📝 Content Automation

1. Automated Blog Post Generation (AI-Powered)

Workflow:

Google Sheets (topics) → OpenAI (generate) → WordPress (publish) → Social Media (share)

n8n Implementation:

{
  "nodes": [
    {
      "name": "Schedule Trigger",
      "type": "n8n-nodes-base.cron",
      "parameters": {
        "triggerTimes": {
          "item": [
            {
              "hour": 9,
              "minute": 0
            }
          ]
        }
      }
    },
    {
      "name": "Get Topic from Sheet",
      "type": "n8n-nodes-base.googleSheets",
      "parameters": {
        "operation": "read",
        "range": "A2:B2"
      }
    },
    {
      "name": "Generate Content (OpenAI)",
      "type": "n8n-nodes-base.openAi",
      "parameters": {
        "resource": "text",
        "operation": "complete",
        "prompt": "Write a 1000-word blog post about: {{$json[\"topic\"]}}",
        "model": "gpt-4"
      }
    },
    {
      "name": "Publish to WordPress",
      "type": "n8n-nodes-base.wordpress",
      "parameters": {
        "operation": "create",
        "title": "{{$json[\"title\"]}}",
        "content": "{{$json[\"content\"]}}"
      }
    },
    {
      "name": "Share on Twitter",
      "type": "n8n-nodes-base.twitter",
      "parameters": {
        "text": "New post: {{$json[\"title\"]}} {{$json[\"url\"]}}"
      }
    }
  ]
}

Python Alternative (More Control):

import openai
import requests
from datetime import datetime

# Configuration
OPENAI_API_KEY = "your-key"
WORDPRESS_URL = "https://yourblog.com"
WORDPRESS_AUTH = ("username", "password")

def generate_blog_post(topic):
    """Generate blog post using OpenAI."""
    openai.api_key = OPENAI_API_KEY

    prompt = f"""Write a comprehensive 1000-word blog post about: {topic}

    Include:
    - Engaging introduction
    - 3-5 main sections with headers
    - Practical examples
    - Conclusion with call-to-action

    Format in HTML."""

    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "You are an expert content writer."},
            {"role": "user", "content": prompt}
        ],
        max_tokens=2000
    )

    return response.choices[0].message.content

def publish_to_wordpress(title, content):
    """Publish post to WordPress via REST API."""
    url = f"{WORDPRESS_URL}/wp-json/wp/v2/posts"

    data = {
        "title": title,
        "content": content,
        "status": "publish"
    }

    response = requests.post(url, auth=WORDPRESS_AUTH, json=data)
    return response.json()

# Main workflow
if __name__ == "__main__":
    topics = [
        "10 Marketing Automation Strategies for 2025",
        "How to Build a Content Pipeline with AI",
        "Video Production Automation: Complete Guide"
    ]

    for topic in topics:
        print(f"Generating: {topic}")
        content = generate_blog_post(topic)

        result = publish_to_wordpress(topic, content)
        print(f"Published: {result['link']}")

2. Social Media Content Calendar Automation

Workflow:

Notion (content calendar) → Buffer/Hootsuite (schedule) → Analytics (track)

Make.com Scenario:

1. Trigger: Every Monday 9 AM
2. Get upcoming posts from Notion database
3. For each post:
   - Generate platform-specific variants (Twitter, LinkedIn, Instagram)
   - Generate images (Canva API or DALL-E)
   - Schedule to Buffer/Hootsuite
4. Send summary email to team

Python Implementation:

import requests
from datetime import datetime, timedelta
from notion_client import Client

# Configuration
NOTION_TOKEN = "your-notion-token"
NOTION_DATABASE_ID = "your-database-id"
BUFFER_TOKEN = "your-buffer-token"

# Initialize Notion
notion = Client(auth=NOTION_TOKEN)

def get_upcoming_posts():
    """Get posts scheduled for next 7 days."""
    today = datetime.now()
    next_week = today + timedelta(days=7)

    results = notion.databases.query(
        database_id=NOTION_DATABASE_ID,
        filter={
            "and": [
                {
                    "property": "Publish Date",
                    "date": {
                        "on_or_after": today.isoformat()
                    }
                },
                {
                    "property": "Publish Date",
                    "date": {
                        "before": next_week.isoformat()
                    }
                },
                {
                    "property": "Status",
                    "select": {
                        "equals": "Ready"
                    }
                }
            ]
        }
    )

    return results['results']

def adapt_for_platform(content, platform):
    """Adapt content for specific platform."""
    if platform == "twitter":
        # Limit to 280 chars, add hashtags
        return content[:250] + " #marketing #automation"

    elif platform == "linkedin":
        # More professional, longer form
        return f"{content}\n\nWhat are your thoughts? Share in comments."

    elif platform == "instagram":
        # Visual focus, hashtags
        return f"{content}\n\n📸✨\n\n#contentmarketing #automation #digitalmarketing"

    return content

def get_profile_id(platform):
    """Map platform name to a Buffer profile ID (placeholder values)."""
    profile_ids = {
        "twitter": "your-twitter-profile-id",
        "linkedin": "your-linkedin-profile-id",
        "instagram": "your-instagram-profile-id"
    }
    return profile_ids[platform]

def schedule_to_buffer(content, platform, scheduled_time):
    """Schedule post to Buffer."""
    url = "https://api.bufferapp.com/1/updates/create.json"

    data = {
        "access_token": BUFFER_TOKEN,
        "text": content,
        "profile_ids[]": get_profile_id(platform),
        "scheduled_at": int(scheduled_time.timestamp())
    }

    response = requests.post(url, data=data)
    return response.json()

# Main automation
posts = get_upcoming_posts()

for post in posts:
    title = post['properties']['Title']['title'][0]['plain_text']
    content = post['properties']['Content']['rich_text'][0]['plain_text']
    # Parse the ISO date string into a datetime for Buffer scheduling
    publish_date = datetime.fromisoformat(post['properties']['Publish Date']['date']['start'])

    # Schedule to all platforms
    platforms = ['twitter', 'linkedin', 'instagram']

    for platform in platforms:
        adapted_content = adapt_for_platform(content, platform)
        result = schedule_to_buffer(adapted_content, platform, publish_date)
        print(f"Scheduled {title} to {platform}: {result}")

3. Automated Newsletter Generation

Workflow:

RSS Feeds → Curate Top Articles → Generate Summary (AI) → Mailchimp → Send

Python Script:

import feedparser
import openai
from mailchimp_marketing import Client as MailchimpClient

# Configuration
OPENAI_API_KEY = "your-key"
MAILCHIMP_API_KEY = "your-key"
MAILCHIMP_SERVER = "us1"
MAILCHIMP_LIST_ID = "your-list-id"

# Initialize clients
openai.api_key = OPENAI_API_KEY
mailchimp = MailchimpClient()
mailchimp.set_config({
    "api_key": MAILCHIMP_API_KEY,
    "server": MAILCHIMP_SERVER
})

def fetch_articles_from_feeds(feeds, limit=10):
    """Fetch latest articles from RSS feeds."""
    articles = []

    for feed_url in feeds:
        feed = feedparser.parse(feed_url)

        for entry in feed.entries[:limit]:
            articles.append({
                "title": entry.title,
                "link": entry.link,
                "summary": entry.summary if hasattr(entry, 'summary') else '',
                "published": entry.published
            })

    # Sort by publish date, get top 5
    articles.sort(key=lambda x: x['published'], reverse=True)
    return articles[:5]

def generate_newsletter_content(articles):
    """Generate newsletter content using AI."""
    articles_text = "\n\n".join([
        f"Title: {a['title']}\nLink: {a['link']}\nSummary: {a['summary']}"
        for a in articles
    ])

    prompt = f"""Create an engaging newsletter from these articles:

{articles_text}

Format:
- Catchy introduction paragraph
- For each article: Title (linked), 2-sentence summary, why it matters
- Closing paragraph with CTA

Use friendly, enthusiastic tone. Output in HTML."""

    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "You are a newsletter editor."},
            {"role": "user", "content": prompt}
        ]
    )

    return response.choices[0].message.content

def send_newsletter(subject, html_content):
    """Send newsletter via Mailchimp."""
    campaign = mailchimp.campaigns.create({
        "type": "regular",
        "recipients": {"list_id": MAILCHIMP_LIST_ID},
        "settings": {
            "subject_line": subject,
            "from_name": "Your Newsletter",
            "reply_to": "newsletter@yourdomain.com"
        }
    })

    mailchimp.campaigns.set_content(campaign['id'], {
        "html": html_content
    })

    mailchimp.campaigns.send(campaign['id'])
    return campaign

# Main automation
if __name__ == "__main__":
    # Define RSS feeds to monitor
    feeds = [
        "https://techcrunch.com/feed/",
        "https://www.theverge.com/rss/index.xml",
        "https://feeds.arstechnica.com/arstechnica/index"
    ]

    # Fetch and curate
    articles = fetch_articles_from_feeds(feeds)

    # Generate newsletter
    content = generate_newsletter_content(articles)

    # Send
    from datetime import datetime
    subject = f"Your Weekly Tech Digest - {datetime.now().strftime('%B %d, %Y')}"
    campaign = send_newsletter(subject, content)

    print(f"Newsletter sent! Campaign ID: {campaign['id']}")

🎬 Video Production Automation

1. Complete Video Dubbing Pipeline

Workflow:

Input Video → Transcribe (Whisper) → Translate → TTS (Edge-TTS) → Audio Sync → Replace Audio → Output

Full Production Script:

#!/bin/bash
# video_dubbing_pipeline.sh
# Complete automation for video dubbing

set -e  # Exit on error

# ============================================
# Configuration
# ============================================
INPUT_VIDEO="$1"
SOURCE_LANG="${2:-th}"  # Default: Thai
TARGET_LANG="${3:-en}"  # Default: English
OUTPUT_DIR="output"
TEMP_DIR="temp"

# Voices
TTS_VOICE_MALE="en-US-GuyNeural"
TTS_VOICE_FEMALE="en-US-JennyNeural"

# ============================================
# Setup
# ============================================
mkdir -p "$OUTPUT_DIR" "$TEMP_DIR"

echo "🎬 Starting Video Dubbing Pipeline"
echo "=================================="
echo "Input: $INPUT_VIDEO"
echo "Source Language: $SOURCE_LANG"
echo "Target Language: $TARGET_LANG"
echo ""

# ============================================
# Step 1: Extract Audio
# ============================================
echo "📥 Step 1/6: Extracting audio..."
ffmpeg -i "$INPUT_VIDEO" \
  -vn -acodec pcm_s16le \
  -ar 44100 -ac 2 \
  "$TEMP_DIR/original_audio.wav" \
  -y -loglevel error

echo "✅ Audio extracted"

# ============================================
# Step 2: Transcribe with Whisper
# ============================================
echo "🎤 Step 2/6: Transcribing with Whisper..."
whisper "$TEMP_DIR/original_audio.wav" \
  --model large-v3 \
  --language "$SOURCE_LANG" \
  --output_format srt \
  --output_dir "$TEMP_DIR" \
  --word_timestamps True

mv "$TEMP_DIR/original_audio.srt" "$TEMP_DIR/source.srt"
echo "✅ Transcription complete"

# ============================================
# Step 3: Translate Subtitles
# ============================================
echo "🌐 Step 3/6: Translating subtitles..."
python3 <<EOF
import re
from deep_translator import GoogleTranslator

def parse_srt(filepath):
    with open(filepath, 'r', encoding='utf-8') as f:
        content = f.read()

    pattern = r'(\d+)\n(\d{2}:\d{2}:\d{2},\d{3}) --> (\d{2}:\d{2}:\d{2},\d{3})\n(.*?)(?=\n\n|\Z)'
    matches = re.findall(pattern, content, re.DOTALL)

    subtitles = []
    for match in matches:
        subtitles.append({
            'index': int(match[0]),
            'start': match[1],
            'end': match[2],
            'text': match[3].strip()
        })

    return subtitles

def translate_srt(source_path, target_path, source_lang, target_lang):
    translator = GoogleTranslator(source=source_lang, target=target_lang)
    subtitles = parse_srt(source_path)

    with open(target_path, 'w', encoding='utf-8') as f:
        for sub in subtitles:
            translated_text = translator.translate(sub['text'])

            f.write(f"{sub['index']}\n")
            f.write(f"{sub['start']} --> {sub['end']}\n")
            f.write(f"{translated_text}\n\n")

translate_srt('$TEMP_DIR/source.srt', '$TEMP_DIR/target.srt', '$SOURCE_LANG', '$TARGET_LANG')
print("✅ Translation complete")
EOF

# ============================================
# Step 4: Synthesize TTS Audio
# ============================================
echo "🔊 Step 4/6: Generating TTS audio..."
python3 <<EOF
import asyncio
import edge_tts
from datetime import datetime, timedelta
import re

def parse_timestamp(ts):
    """Convert SRT timestamp to seconds."""
    h, m, s_ms = ts.split(':')
    s, ms = s_ms.split(',')
    total_seconds = int(h) * 3600 + int(m) * 60 + int(s) + int(ms) / 1000
    return total_seconds

def parse_srt(filepath):
    with open(filepath, 'r', encoding='utf-8') as f:
        content = f.read()

    pattern = r'(\d+)\n(\d{2}:\d{2}:\d{2},\d{3}) --> (\d{2}:\d{2}:\d{2},\d{3})\n(.*?)(?=\n\n|\Z)'
    matches = re.findall(pattern, content, re.DOTALL)

    subtitles = []
    for match in matches:
        subtitles.append({
            'index': int(match[0]),
            'start_time': parse_timestamp(match[1]),
            'end_time': parse_timestamp(match[2]),
            'text': match[3].strip()
        })

    return subtitles

async def synthesize_segment(text, output_path, voice):
    communicate = edge_tts.Communicate(text, voice)
    await communicate.save(output_path)

async def synthesize_all_segments(srt_path, voice):
    subtitles = parse_srt(srt_path)

    tasks = []
    for sub in subtitles:
        output_path = f"$TEMP_DIR/segment_{sub['index']:04d}.mp3"
        tasks.append(synthesize_segment(sub['text'], output_path, voice))

    await asyncio.gather(*tasks)

# Run synthesis
asyncio.run(synthesize_all_segments('$TEMP_DIR/target.srt', '$TTS_VOICE_MALE'))
print("✅ TTS synthesis complete")
EOF

# ============================================
# Step 5: Merge Audio Segments with Timeline Sync
# ============================================
echo "🔗 Step 5/6: Merging audio with timeline sync..."
python3 <<EOF
import subprocess
import re
from pathlib import Path

def parse_timestamp(ts):
    h, m, s_ms = ts.split(':')
    s, ms = s_ms.split(',')
    return int(h) * 3600 + int(m) * 60 + int(s) + int(ms) / 1000

def parse_srt(filepath):
    with open(filepath, 'r', encoding='utf-8') as f:
        content = f.read()

    pattern = r'(\d+)\n(\d{2}:\d{2}:\d{2},\d{3}) --> (\d{2}:\d{2}:\d{2},\d{3})\n(.*?)(?=\n\n|\Z)'
    matches = re.findall(pattern, content, re.DOTALL)

    return [(int(m[0]), parse_timestamp(m[1]), parse_timestamp(m[2])) for m in matches]

def get_audio_duration(filepath):
    cmd = ['ffprobe', '-v', 'error', '-show_entries', 'format=duration',
           '-of', 'default=noprint_wrappers=1:nokey=1', str(filepath)]
    result = subprocess.run(cmd, capture_output=True, text=True)
    return float(result.stdout.strip())

# Parse SRT for timeline
segments = parse_srt('$TEMP_DIR/target.srt')

# Build concat file with silence
concat_file = '$TEMP_DIR/concat.txt'
with open(concat_file, 'w') as f:
    current_time = 0

    for idx, start_time, end_time in segments:
        segment_path = f'$TEMP_DIR/segment_{idx:04d}.mp3'

        # Add silence if there's a gap
        if start_time > current_time:
            silence_duration = start_time - current_time
            f.write(f"file 'silence_{idx}.mp3'\n")

            # Generate silence
            subprocess.run([
                'ffmpeg', '-f', 'lavfi', '-i', f'anullsrc=r=44100:cl=stereo',
                '-t', str(silence_duration), '-y',
                f'$TEMP_DIR/silence_{idx}.mp3'
            ], capture_output=True)

        # Add segment
        f.write(f"file 'segment_{idx:04d}.mp3'\n")

        # Update current time
        audio_duration = get_audio_duration(segment_path)
        current_time = start_time + audio_duration

# Concatenate all segments
subprocess.run([
    'ffmpeg', '-f', 'concat', '-safe', '0',
    '-i', concat_file,
    '-c', 'copy', '-y',
    '$TEMP_DIR/merged_audio.mp3'
], capture_output=True)

print("✅ Audio merged with timeline sync")
EOF

# ============================================
# Step 6: Replace Video Audio
# ============================================
echo "🎥 Step 6/6: Replacing video audio..."
OUTPUT_FILE="$OUTPUT_DIR/$(basename "$INPUT_VIDEO" .mp4)_dubbed_$TARGET_LANG.mp4"

ffmpeg -i "$INPUT_VIDEO" \
  -i "$TEMP_DIR/merged_audio.mp3" \
  -c:v copy \
  -c:a aac \
  -b:a 192k \
  -map 0:v:0 \
  -map 1:a:0 \
  -shortest \
  -y \
  "$OUTPUT_FILE" \
  -loglevel error

echo "✅ Video audio replaced"

# ============================================
# Cleanup
# ============================================
echo ""
echo "🧹 Cleaning up temporary files..."
rm -rf "$TEMP_DIR"

# ============================================
# Summary
# ============================================
echo ""
echo "🎉 Pipeline Complete!"
echo "=================================="
echo "Output: $OUTPUT_FILE"

# Get file size
FILE_SIZE=$(du -h "$OUTPUT_FILE" | cut -f1)
echo "Size: $FILE_SIZE"

# Get duration
DURATION=$(ffprobe -v error -show_entries format=duration \
  -of default=noprint_wrappers=1:nokey=1 "$OUTPUT_FILE" | cut -d. -f1)
echo "Duration: ${DURATION}s"
echo ""
echo "✅ All done!"

Usage:

chmod +x video_dubbing_pipeline.sh
./video_dubbing_pipeline.sh input.mp4 th en

2. Batch Video Processing (100+ Videos)

Scenario: Process entire YouTube channel for dubbing

Workflow:

Get video list → Download (parallel) → Process each → Upload to storage → Update database

Python Orchestrator:

import subprocess
import asyncio
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
import yt_dlp
import boto3

# Configuration
MAX_PARALLEL = 4
S3_BUCKET = "dubbed-videos"
OUTPUT_BUCKET = "output"

def download_video(url, output_dir):
    """Download video using yt-dlp."""
    ydl_opts = {
        'format': 'best',
        'outtmpl': f'{output_dir}/%(id)s.%(ext)s',
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        filepath = ydl.prepare_filename(info)

    return filepath

def process_video(video_path, source_lang, target_lang):
    """Process single video through dubbing pipeline."""
    cmd = [
        './video_dubbing_pipeline.sh',
        video_path,
        source_lang,
        target_lang
    ]

    result = subprocess.run(cmd, capture_output=True, text=True)

    if result.returncode != 0:
        raise Exception(f"Processing failed: {result.stderr}")

    # Get output path from the "Output: ..." line of the pipeline's stdout
    output_lines = [line for line in result.stdout.splitlines() if line.startswith('Output:')]
    output_path = output_lines[-1].split('Output: ', 1)[-1].strip()
    return output_path

def upload_to_s3(filepath, bucket, key):
    """Upload file to S3."""
    s3 = boto3.client('s3')
    s3.upload_file(filepath, bucket, key)
    return f"s3://{bucket}/{key}"

async def process_video_async(url, source_lang, target_lang):
    """Process single video (async wrapper).

    Download/process/upload are blocking calls, so run them in worker threads
    via asyncio.to_thread so the semaphore-limited tasks actually run
    concurrently instead of blocking the event loop.
    """
    try:
        # Download
        print(f"⬇️  Downloading: {url}")
        video_path = await asyncio.to_thread(download_video, url, "downloads")

        # Process
        print(f"⚙️  Processing: {video_path}")
        output_path = await asyncio.to_thread(process_video, video_path, source_lang, target_lang)

        # Upload
        print(f"⬆️  Uploading: {output_path}")
        key = Path(output_path).name
        s3_url = await asyncio.to_thread(upload_to_s3, output_path, S3_BUCKET, key)

        print(f"✅ Complete: {s3_url}")
        return {"url": url, "output": s3_url, "status": "success"}

    except Exception as e:
        print(f"❌ Failed: {url} - {str(e)}")
        return {"url": url, "error": str(e), "status": "failed"}

async def process_batch(urls, source_lang, target_lang, max_parallel=4):
    """Process multiple videos with concurrency limit."""
    semaphore = asyncio.Semaphore(max_parallel)

    async def bounded_process(url):
        async with semaphore:
            return await process_video_async(url, source_lang, target_lang)

    tasks = [bounded_process(url) for url in urls]
    results = await asyncio.gather(*tasks)

    return results

# Main execution
if __name__ == "__main__":
    # Example: Process 100 videos
    video_urls = [
        "https://youtube.com/watch?v=xxx",
        "https://youtube.com/watch?v=yyy",
        # ... 98 more URLs
    ]

    results = asyncio.run(process_batch(
        video_urls,
        source_lang="th",
        target_lang="en",
        max_parallel=MAX_PARALLEL
    ))

    # Summary
    success_count = sum(1 for r in results if r['status'] == 'success')
    failed_count = len(results) - success_count

    print(f"\n📊 Batch Processing Summary")
    print(f"===========================")
    print(f"Total: {len(results)}")
    print(f"Success: {success_count}")
    print(f"Failed: {failed_count}")

    # Save results
    import json
    with open('batch_results.json', 'w') as f:
        json.dump(results, f, indent=2)

3. Automated Video Editing (Template-Based)

Use Case: Generate 100 product demo videos from templates

Tools: FFmpeg + Python + Template System

Template Structure:

[Intro 5s] → [Product Shot 10s] → [Features 15s] → [CTA 5s] → [Outro 5s]

Python Script:

import subprocess
from pathlib import Path
import json

class VideoTemplate:
    def __init__(self, template_config):
        self.config = template_config

    def render(self, data, output_path):
        """Render video from template and data."""
        segments = []

        # Intro
        intro_clip = self.create_intro(data['product_name'])
        segments.append(intro_clip)

        # Product shot
        product_clip = self.overlay_text(
            video_path=data['product_video'],
            text=data['product_name'],
            duration=10
        )
        segments.append(product_clip)

        # Features (dynamic)
        for idx, feature in enumerate(data['features']):
            feature_clip = self.create_feature_slide(
                feature['title'],
                feature['description'],
                feature['icon']
            )
            segments.append(feature_clip)

        # CTA
        cta_clip = self.create_cta(data['cta_text'], data['cta_url'])
        segments.append(cta_clip)

        # Outro
        outro_clip = self.config['outro_template']
        segments.append(outro_clip)

        # Concatenate all
        self.concatenate_segments(segments, output_path)

        return output_path

    def create_intro(self, product_name):
        """Generate intro with animated text."""
        # Avoid spaces in the temp filename (product names may contain them)
        output = f"temp/intro_{product_name.replace(' ', '_')}.mp4"

        cmd = f"""
        ffmpeg -f lavfi -i color=c=black:s=1920x1080:d=5 \
          -vf "drawtext=text='{product_name}':fontsize=72:fontcolor=white:x=(w-text_w)/2:y=(h-text_h)/2:enable='between(t,1,4)'" \
          -y {output}
        """

        subprocess.run(cmd, shell=True, capture_output=True)
        return output

    def overlay_text(self, video_path, text, duration):
        """Overlay text on video."""
        output = f"temp/overlay_{Path(video_path).stem}.mp4"

        cmd = f"""
        ffmpeg -i {video_path} -t {duration} \
          -vf "drawtext=text='{text}':fontsize=48:fontcolor=white:x=50:y=50:box=1:boxcolor=black@0.5:boxborderw=10" \
          -y {output}
        """

        subprocess.run(cmd, shell=True, capture_output=True)
        return output

    def create_feature_slide(self, title, description, icon_path):
        """Create feature slide with icon and text."""
        output = f"temp/feature_{title.replace(' ', '_')}.mp4"

        # Use FFmpeg complex filter to composite icon + text
        cmd = f"""
        ffmpeg -f lavfi -i color=c=#1a1a2e:s=1920x1080:d=5 \
          -i {icon_path} \
          -filter_complex "[1]scale=200:-1[icon];[0][icon]overlay=100:300[bg];[bg]drawtext=text='{title}':fontsize=64:fontcolor=white:x=350:y=300[titled];[titled]drawtext=text='{description}':fontsize=32:fontcolor=gray:x=350:y=400" \
          -y {output}
        """

        subprocess.run(cmd, shell=True, capture_output=True)
        return output

    def create_cta(self, cta_text, cta_url):
        """Create call-to-action slide."""
        output = f"temp/cta.mp4"

        cmd = f"""
        ffmpeg -f lavfi -i color=c=#ff6b6b:s=1920x1080:d=5 \
          -vf "drawtext=text='{cta_text}':fontsize=72:fontcolor=white:x=(w-text_w)/2:y=400,drawtext=text='{cta_url}':fontsize=48:fontcolor=white:x=(w-text_w)/2:y=600" \
          -y {output}
        """

        subprocess.run(cmd, shell=True, capture_output=True)
        return output

    def concatenate_segments(self, segments, output_path):
        """Concatenate all segments into final video."""
        concat_file = "temp/concat.txt"

        with open(concat_file, 'w') as f:
            for segment in segments:
                f.write(f"file '{Path(segment).absolute()}'\n")

        cmd = f"""
        ffmpeg -f concat -safe 0 -i {concat_file} \
          -c:v libx264 -preset medium -crf 23 \
          -y {output_path}
        """

        subprocess.run(cmd, shell=True, capture_output=True)

# Usage
template = VideoTemplate({
    'outro_template': 'assets/outro.mp4'
})

# Generate 100 product videos
products = [
    {
        'product_name': 'Product A',
        'product_video': 'footage/product_a.mp4',
        'features': [
            {'title': 'Fast', 'description': '10x faster processing', 'icon': 'icons/speed.png'},
            {'title': 'Secure', 'description': 'End-to-end encryption', 'icon': 'icons/lock.png'},
        ],
        'cta_text': 'Get Started Today!',
        'cta_url': 'www.product-a.com'
    },
    # ... 99 more products
]

for idx, product in enumerate(products):
    output_path = f"output/product_{idx+1:03d}.mp4"
    template.render(product, output_path)
    print(f"✅ Generated: {output_path}")

📊 Data Processing Automation

1. Web Scraping Pipeline

Workflow:

Cron → Scrape Websites → Clean Data → Save to Database → Send Alert

Python Script (with Selenium for dynamic content):

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from bs4 import BeautifulSoup
import pandas as pd
from datetime import datetime
import psycopg2
import smtplib
from email.mime.text import MIMEText

# Configuration
DATABASE_URL = "postgresql://user:pass@localhost/db"
SMTP_CONFIG = {
    'host': 'smtp.gmail.com',
    'port': 587,
    'username': 'your@email.com',
    'password': 'your-password'
}

def scrape_website(url):
    """Scrape dynamic website using Selenium."""
    # Setup headless Chrome
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")

    driver = webdriver.Chrome(options=chrome_options)

    try:
        driver.get(url)
        driver.implicitly_wait(10)  # Wait for JS to load

        # Extract data
        products = []
        elements = driver.find_elements(By.CLASS_NAME, "product-card")

        for element in elements:
            try:
                title = element.find_element(By.CLASS_NAME, "title").text
                price = element.find_element(By.CLASS_NAME, "price").text
                rating = element.find_element(By.CLASS_NAME, "rating").text
                link = element.find_element(By.TAG_NAME, "a").get_attribute("href")

                products.append({
                    'title': title,
                    'price': float(price.replace('$', '').replace(',', '')),
                    'rating': float(rating),
                    'link': link,
                    'scraped_at': datetime.now()
                })
            except Exception as e:
                print(f"Error extracting product: {e}")
                continue

        return products

    finally:
        driver.quit()

def clean_data(products):
    """Clean and validate scraped data."""
    df = pd.DataFrame(products)

    # Remove duplicates
    df.drop_duplicates(subset=['link'], inplace=True)

    # Remove invalid prices
    df = df[df['price'] > 0]

    # Normalize ratings to 0-5 scale
    df['rating'] = df['rating'].clip(0, 5)

    return df

def save_to_database(df):
    """Save data to PostgreSQL."""
    conn = psycopg2.connect(DATABASE_URL)
    cursor = conn.cursor()

    for _, row in df.iterrows():
        cursor.execute("""
            INSERT INTO products (title, price, rating, link, scraped_at)
            VALUES (%s, %s, %s, %s, %s)
            ON CONFLICT (link) DO UPDATE SET
                price = EXCLUDED.price,
                rating = EXCLUDED.rating,
                scraped_at = EXCLUDED.scraped_at
        """, (row['title'], row['price'], row['rating'], row['link'], row['scraped_at']))

    conn.commit()
    cursor.close()
    conn.close()

def send_alert(subject, message):
    """Send email alert."""
    msg = MIMEText(message)
    msg['Subject'] = subject
    msg['From'] = SMTP_CONFIG['username']
    msg['To'] = 'recipient@email.com'

    server = smtplib.SMTP(SMTP_CONFIG['host'], SMTP_CONFIG['port'])
    server.starttls()
    server.login(SMTP_CONFIG['username'], SMTP_CONFIG['password'])
    server.send_message(msg)
    server.quit()

# Main automation
if __name__ == "__main__":
    urls = [
        "https://example.com/products",
        "https://competitor.com/items",
    ]

    all_products = []

    for url in urls:
        print(f"Scraping: {url}")
        products = scrape_website(url)
        all_products.extend(products)

    # Clean data
    df = clean_data(all_products)

    # Save to database
    save_to_database(df)

    # Send summary email
    summary = f"""
    Scraping Complete
    =================
    Total products: {len(df)}
    Average price: ${df['price'].mean():.2f}
    Highest rated: {df.loc[df['rating'].idxmax(), 'title']}
    """

    send_alert("Daily Scraping Report", summary)
    print("✅ Pipeline complete!")

Schedule with Cron:

# Run every day at 2 AM
0 2 * * * cd /path/to/project && python3 scrape_pipeline.py

2. API Data Aggregation

Use Case: Aggregate data from multiple APIs (Twitter, Reddit, News)

n8n Workflow:

{
  "nodes": [
    {
      "name": "Schedule",
      "type": "n8n-nodes-base.cron",
      "parameters": {
        "triggerTimes": {"item": [{"hour": 6}]}
      }
    },
    {
      "name": "Fetch Twitter",
      "type": "n8n-nodes-base.twitter",
      "parameters": {
        "operation": "search",
        "searchText": "#automation"
      }
    },
    {
      "name": "Fetch Reddit",
      "type": "n8n-nodes-base.reddit",
      "parameters": {
        "resource": "subreddit",
        "operation": "getPosts",
        "subreddit": "automation"
      }
    },
    {
      "name": "Fetch News",
      "type": "n8n-nodes-base.httpRequest",
      "parameters": {
        "url": "https://newsapi.org/v2/everything?q=automation"
      }
    },
    {
      "name": "Merge Data",
      "type": "n8n-nodes-base.merge",
      "parameters": {
        "mode": "append"
      }
    },
    {
      "name": "Analyze Sentiment (AI)",
      "type": "n8n-nodes-base.openAi",
      "parameters": {
        "operation": "complete",
        "prompt": "Analyze sentiment (positive/negative/neutral): {{$json.text}}"
      }
    },
    {
      "name": "Save to Airtable",
      "type": "n8n-nodes-base.airtable",
      "parameters": {
        "operation": "append"
      }
    }
  ]
}

📈 Marketing Automation

1. Lead Scoring & Routing

Workflow:

New Lead (Webhook) → Calculate Score → If > 80 → Send to Sales
                                     → If 50-80 → Nurture Campaign
                                     → If < 50 → Newsletter Only

Make.com Scenario:

Trigger: Webhook (new lead from form)

Router:
  Route 1 (Score >= 80):
    - Create deal in CRM (HubSpot)
    - Assign to sales rep
    - Send Slack notification to sales channel
    - Send personalized email from sales rep

  Route 2 (Score 50-79):
    - Add to nurture campaign (Mailchimp)
    - Send lead magnet email
    - Tag as "warm lead"

  Route 3 (Score < 50):
    - Add to newsletter list
    - Tag as "cold lead"

All routes:
  - Log to Google Sheets
  - Update analytics dashboard

Python Implementation:

def calculate_lead_score(lead):
    """Calculate lead score based on attributes."""
    score = 0

    # Company size
    if lead.get('company_size', '') == 'Enterprise (1000+)':
        score += 30
    elif lead.get('company_size', '') == 'Mid-market (100-1000)':
        score += 20
    elif lead.get('company_size', '') == 'SMB (10-100)':
        score += 10

    # Job title
    if any(title in lead.get('job_title', '').lower() for title in ['ceo', 'cto', 'vp', 'director']):
        score += 25
    elif 'manager' in lead.get('job_title', '').lower():
        score += 15

    # Industry
    if lead.get('industry') in ['Technology', 'SaaS', 'E-commerce']:
        score += 20

    # Engagement
    if lead.get('email_opens', 0) > 5:
        score += 10
    if lead.get('website_visits', 0) > 3:
        score += 10
    if lead.get('downloaded_resources', 0) > 0:
        score += 15

    return min(score, 100)  # Cap at 100

def route_lead(lead):
    """Route lead based on score."""
    score = calculate_lead_score(lead)
    lead['score'] = score

    if score >= 80:
        # Hot lead - send to sales
        create_deal_in_crm(lead)
        assign_to_sales_rep(lead)
        send_slack_alert(f"🔥 Hot lead: {lead['name']} (Score: {score})")

    elif score >= 50:
        # Warm lead - nurture campaign
        add_to_nurture_campaign(lead)
        send_lead_magnet(lead)

    else:
        # Cold lead - newsletter
        add_to_newsletter(lead)

    # Log all leads
    log_to_database(lead)

    return lead

2. Abandoned Cart Recovery

Workflow:

Cart Abandoned → Wait 1 hour → Send Email 1 (gentle reminder)
              → Wait 24 hours → Send Email 2 (discount offer)
              → Wait 48 hours → Send Email 3 (last chance)

n8n Workflow:

{
  "nodes": [
    {
      "name": "Webhook - Cart Abandoned",
      "type": "n8n-nodes-base.webhook"
    },
    {
      "name": "Wait 1 Hour",
      "type": "n8n-nodes-base.wait",
      "parameters": {"amount": 1, "unit": "hours"}
    },
    {
      "name": "Check if Purchased",
      "type": "n8n-nodes-base.httpRequest",
      "parameters": {
        "url": "https://api.yourstore.com/orders/{{$json.user_id}}"
      }
    },
    {
      "name": "If Not Purchased",
      "type": "n8n-nodes-base.if",
      "parameters": {
        "conditions": {
          "boolean": [
            {"value1": "{{$json.purchased}}", "value2": false}
          ]
        }
      }
    },
    {
      "name": "Send Email 1",
      "type": "n8n-nodes-base.emailSend",
      "parameters": {
        "subject": "You left something in your cart!",
        "text": "Complete your purchase..."
      }
    },
    {
      "name": "Wait 24 Hours",
      "type": "n8n-nodes-base.wait",
      "parameters": {"amount": 24, "unit": "hours"}
    },
    {
      "name": "Send Email 2 (Discount)",
      "type": "n8n-nodes-base.emailSend",
      "parameters": {
        "subject": "10% off your cart!",
        "text": "Use code SAVE10..."
      }
    },
    {
      "name": "Wait 48 Hours",
      "type": "n8n-nodes-base.wait",
      "parameters": {"amount": 48, "unit": "hours"}
    },
    {
      "name": "Send Email 3 (Final)",
      "type": "n8n-nodes-base.emailSend",
      "parameters": {
        "subject": "Last chance - cart expiring soon!",
        "text": "Final reminder..."
      }
    }
  ]
}

🔐 Security Best Practices

1. API Key Management

❌ Never Do This:

# Hardcoded API key - DANGEROUS!
OPENAI_API_KEY = "sk-1234567890abcdef"

✅ Use Environment Variables:

import os
from dotenv import load_dotenv

load_dotenv()  # Load from .env file

OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
if not OPENAI_API_KEY:
    raise ValueError("OPENAI_API_KEY not found in environment")

.env file:

OPENAI_API_KEY=sk-1234567890abcdef
DATABASE_URL=postgresql://user:pass@localhost/db
AWS_ACCESS_KEY_ID=AKIA...
AWS_SECRET_ACCESS_KEY=...

.gitignore:

.env
secrets/
*.key

2. Webhook Security

Verify webhook signatures:

import hmac
import hashlib

def verify_webhook_signature(payload, signature, secret):
    """Verify webhook came from trusted source."""
    expected_signature = hmac.new(
        secret.encode(),
        payload.encode(),
        hashlib.sha256
    ).hexdigest()

    return hmac.compare_digest(expected_signature, signature)

# Usage in Flask
import os
from flask import Flask, request, abort

WEBHOOK_SECRET = os.getenv('WEBHOOK_SECRET', '')

app = Flask(__name__)

@app.route('/webhook', methods=['POST'])
def webhook():
    payload = request.get_data(as_text=True)
    signature = request.headers.get('X-Hub-Signature-256', '')

    if not verify_webhook_signature(payload, signature, WEBHOOK_SECRET):
        abort(403, 'Invalid signature')

    # Process webhook
    data = request.json
    process_event(data)

    return {'status': 'success'}

3. Rate Limiting

Prevent API abuse:

from ratelimit import limits, sleep_and_retry
import requests

# Max 10 calls per minute
@sleep_and_retry
@limits(calls=10, period=60)
def call_api(url):
    """Rate-limited API call."""
    response = requests.get(url)
    return response.json()

# Usage
for url in urls:
    data = call_api(url)  # Automatically rate-limited

4. Error Handling & Retry Logic

Exponential backoff:

import time
from functools import wraps
import requests

def retry_with_backoff(retries=3, backoff_in_seconds=1):
    """Retry decorator with exponential backoff."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            x = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if x == retries:
                        raise

                    wait = backoff_in_seconds * 2 ** x
                    print(f"Error: {e}. Retrying in {wait}s...")
                    time.sleep(wait)
                    x += 1
        return wrapper
    return decorator

# Usage
@retry_with_backoff(retries=5, backoff_in_seconds=2)
def fetch_data_from_api(url):
    response = requests.get(url)
    response.raise_for_status()
    return response.json()

📊 Monitoring & Logging

1. Structured Logging

Use JSON logs for automation:

import logging
import json
from datetime import datetime

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_data = {
            'timestamp': datetime.utcnow().isoformat(),
            'level': record.levelname,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
        }

        if record.exc_info:
            log_data['exception'] = self.formatException(record.exc_info)

        return json.dumps(log_data)

# Setup logger
logger = logging.getLogger(__name__)
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Usage
logger.info("Automation started", extra={'workflow_id': '123'})
logger.error("API call failed", extra={'url': 'https://api.example.com'})

2. Monitoring Dashboard (Prometheus + Grafana)

Expose metrics:

from prometheus_client import Counter, Histogram, start_http_server
import time

# Define metrics
workflow_runs = Counter('workflow_runs_total', 'Total workflow executions', ['status'])
workflow_duration = Histogram('workflow_duration_seconds', 'Workflow execution time')

# Start metrics server
start_http_server(8000)

# Instrument your automation
@workflow_duration.time()
def run_workflow():
    try:
        # Your workflow logic
        time.sleep(2)

        workflow_runs.labels(status='success').inc()
        return {'status': 'success'}

    except Exception as e:
        workflow_runs.labels(status='failed').inc()
        raise

Grafana dashboard:

- Total workflow runs (success vs failed)
- Workflow duration (p50, p95, p99)
- Error rate over time
- API call latency

💰 Cost Optimization

1. Choose Right Tool for Volume

Cost comparison (1M operations/month):

Make.com: $299/mo (Premium plan, 100K ops) × 10 = $2,990/mo
n8n (self-hosted): $50/mo (VPS) + $0 (unlimited ops) = $50/mo
Python (serverless): AWS Lambda free tier (1M requests) = $0

Recommendation:
- < 10K ops/mo: Zapier/Make (ease of use)
- 10K - 100K ops/mo: Make.com or n8n
- > 100K ops/mo: n8n (self-hosted) or Python

2. Batch Operations

❌ Inefficient - 1 API call per item:

for item in items:  # 1000 items
    api.create(item)  # 1000 API calls = expensive

✅ Efficient - Batch API calls:

# Batch 100 items per call
batch_size = 100
for i in range(0, len(items), batch_size):
    batch = items[i:i+batch_size]
    api.create_batch(batch)  # 10 API calls = 100x cheaper

3. Cache Results

Save API costs with caching:

from functools import lru_cache
import redis
import json

# In-memory cache (simple)
@lru_cache(maxsize=1000)
def get_user_data(user_id):
    # Expensive API call
    return api.get_user(user_id)

# Redis cache (distributed)
redis_client = redis.Redis(host='localhost', port=6379)

def get_cached(key, fetch_func, ttl=3600):
    """Get from cache or fetch and cache."""
    cached = redis_client.get(key)

    if cached:
        return json.loads(cached)

    # Fetch fresh data
    data = fetch_func()

    # Cache for 1 hour
    redis_client.setex(key, ttl, json.dumps(data))

    return data

# Usage
user = get_cached(f'user:{user_id}', lambda: api.get_user(user_id))

🚀 Production Deployment

1. Docker Containerization

Dockerfile for automation:

FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Install FFmpeg (for video automation)
RUN apt-get update && apt-get install -y ffmpeg && rm -rf /var/lib/apt/lists/*

# Copy application
COPY . .

# Run automation
CMD ["python", "automation.py"]

docker-compose.yml:

version: '3.8'

services:
  automation:
    build: .
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - DATABASE_URL=${DATABASE_URL}
    volumes:
      - ./data:/app/data
      - ./logs:/app/logs
    restart: unless-stopped

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  postgres:
    image: postgres:15-alpine
    environment:
      POSTGRES_PASSWORD: ${DB_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data

volumes:
  postgres_data:

2. GitHub Actions CI/CD

.github/workflows/deploy.yml:

name: Deploy Automation

on:
  push:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest

      - name: Run tests
        run: pytest

  deploy:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Build Docker image
        run: docker build -t automation:latest .

      - name: Deploy to server
        run: |
          docker-compose down
          docker-compose up -d

3. Scheduled Workflows

Cron + Logging:

#!/bin/bash
# run_automation.sh

# Configuration
LOG_DIR="/var/log/automation"
DATE=$(date +%Y-%m-%d_%H-%M-%S)
LOG_FILE="$LOG_DIR/automation_$DATE.log"

# Create log directory
mkdir -p "$LOG_DIR"

# Run automation with logging
{
    echo "==================================="
    echo "Automation started at $(date)"
    echo "==================================="

    python3 /path/to/automation.py

    EXIT_CODE=$?

    echo "==================================="
    echo "Automation finished at $(date)"
    echo "Exit code: $EXIT_CODE"
    echo "==================================="

    # Send alert if failed
    if [ $EXIT_CODE -ne 0 ]; then
        echo "Automation failed!" | mail -s "Automation Error" admin@example.com
    fi

} 2>&1 | tee "$LOG_FILE"

# Cleanup old logs (keep last 30 days)
find "$LOG_DIR" -name "automation_*.log" -mtime +30 -delete

Crontab:

# Run every day at 2 AM
0 2 * * * /path/to/run_automation.sh

# Run every hour
0 * * * * /path/to/hourly_automation.sh

# Run every 15 minutes
*/15 * * * * /path/to/frequent_automation.sh

🎓 Advanced Patterns

1. State Machine Workflow

Use for complex multi-step processes:

from enum import Enum

class OrderState(Enum):
    CREATED = "created"
    PAID = "paid"
    PROCESSING = "processing"
    SHIPPED = "shipped"
    DELIVERED = "delivered"
    CANCELLED = "cancelled"

class OrderStateMachine:
    def __init__(self, order_id):
        self.order_id = order_id
        self.state = OrderState.CREATED

    def transition(self, event):
        """Handle state transitions."""
        transitions = {
            (OrderState.CREATED, 'payment_received'): self._handle_payment,
            (OrderState.PAID, 'start_processing'): self._handle_processing,
            (OrderState.PROCESSING, 'shipped'): self._handle_shipment,
            (OrderState.SHIPPED, 'delivered'): self._handle_delivery,
        }

        handler = transitions.get((self.state, event))
        if handler:
            handler()
        else:
            raise ValueError(f"Invalid transition: {self.state} + {event}")

    def _handle_payment(self):
        # Send confirmation email
        send_email(self.order_id, "Payment confirmed")

        # Update inventory
        deduct_inventory(self.order_id)

        # Update state
        self.state = OrderState.PAID

    def _handle_processing(self):
        # Create shipping label
        create_label(self.order_id)

        # Notify warehouse
        notify_warehouse(self.order_id)

        self.state = OrderState.PROCESSING

    # ... other handlers

📚 Resources & Tools

Tools & Platforms

  • Make.com - Visual automation builder
  • n8n - Open-source workflow automation
  • Zapier - No-code automation (5000+ apps)
  • Integromat - Advanced automation (now Make.com)
  • Apache Airflow - Data pipeline orchestration
  • Prefect - Modern workflow orchestration
  • Temporal - Durable execution framework

APIs & Integrations

  • RapidAPI - 40,000+ APIs marketplace
  • Postman - API testing and automation
  • Webhook.site - Test webhooks instantly
  • Pipedream - Serverless integration platform

✅ Production Checklist

Before deploying automation to production:

Planning:

  • Defined clear success criteria
  • Documented workflow diagram
  • Identified failure points
  • Planned rollback strategy

Security:

  • API keys in environment variables (not hardcoded)
  • Webhook signature verification
  • Rate limiting implemented
  • Input validation on all external data

Error Handling:

  • Try-catch blocks on all API calls
  • Retry logic with exponential backoff
  • Dead letter queue for failed items
  • Alerts for critical errors

Monitoring:

  • Logging (JSON structured logs)
  • Metrics (success rate, duration, errors)
  • Alerts (email/Slack for failures)
  • Dashboard (Grafana/custom)

Testing:

  • Unit tests for core logic
  • Integration tests with real APIs (sandbox)
  • Load testing for high-volume scenarios
  • Manual testing end-to-end

Documentation:

  • README with setup instructions
  • Workflow diagram
  • API documentation
  • Runbook for common issues

Cost Optimization:

  • Batch operations where possible
  • Caching for repeated data
  • Right-sized compute resources
  • Monitoring costs (set budgets/alerts)

🎯 Quick Start Examples

Example 1: Daily Content Automation (5 minutes setup)

Goal: Auto-post daily motivation quote to Twitter

Setup (Make.com):

1. New scenario
2. Add Schedule trigger (every day 9 AM)
3. Add HTTP module → GET https://api.quotable.io/random
4. Add Twitter module → Post tweet
   Text: "{{content}}" - {{author}}
5. Activate

Done! Auto-posts every morning.
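
If you would rather run this from a cron job than Make.com, a rough Python equivalent is sketched below (it assumes a Twitter/X developer account with tweepy v4 credentials in environment variables; treat it as a starting point, not a drop-in):

import os
import requests
import tweepy

def fetch_quote():
    """Get a random quote from the same API used in the Make.com scenario."""
    data = requests.get("https://api.quotable.io/random", timeout=10).json()
    return f"\"{data['content']}\" - {data['author']}"

def post_tweet(text):
    client = tweepy.Client(
        consumer_key=os.getenv("TWITTER_API_KEY"),
        consumer_secret=os.getenv("TWITTER_API_SECRET"),
        access_token=os.getenv("TWITTER_ACCESS_TOKEN"),
        access_token_secret=os.getenv("TWITTER_ACCESS_SECRET"),
    )
    client.create_tweet(text=text)

if __name__ == "__main__":
    post_tweet(fetch_quote())

Schedule it with cron (e.g. 0 9 * * *) to match the 9 AM trigger.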


Example 2: Video Subtitle Generation (10 minutes)

Goal: Auto-generate SRT subtitles for MP4 videos

Bash script:

#!/bin/bash
# auto_subtitle.sh

VIDEO="$1"

# Extract audio
ffmpeg -i "$VIDEO" -vn -acodec pcm_s16le -ar 44100 audio.wav -y

# Transcribe (Whisper names the output after the input file: audio.srt)
whisper audio.wav --model medium --output_format srt

# Rename subtitles to match the video and clean up
mv audio.srt "${VIDEO%.mp4}.srt"
rm audio.wav

echo "✅ Subtitles: ${VIDEO%.mp4}.srt"

Usage:

chmod +x auto_subtitle.sh
./auto_subtitle.sh myvideo.mp4

Example 3: Form to Spreadsheet (No code, 2 minutes)

Goal: Save Google Form responses to Sheet + send email

Setup (Zapier):

1. Trigger: Google Forms - New Response
2. Action: Google Sheets - Create Row
3. Action: Gmail - Send Email
   To: {{email}}
   Subject: Thanks for your submission!
4. Turn on

Done! Fully automated.


🤝 Contributing & Community

Want to improve this skill? Contribute automation examples, fix errors, or suggest new patterns!


Last Updated: 2025-10-26 | Version: 1.0 | License: MIT | Author: Claude Code Skills Team

🤖 This skill is production-ready and maintained. Happy automating!


🔧 CODING ULTIMATE STACK: Must Load Together

This skill is Layer 3: Implementation of THE CODING ULTIMATE STACK system.

Same Layer (Implementation - Load All 8, i.e. this skill plus the 7 below):

  • python-best-practices-skill - Pythonic code, PEP 8, type hints
  • javascript-modern-skill - ES6+, async/await, modern JS
  • code-quality-standards-skill - Clean code, SOLID, refactoring, code smells
  • document-conversion-skill - MD → PDF, HTML → PDF, Pandoc
  • odoo-development-skill - ERP development, Odoo customization
  • excel-expert-skill - Data manipulation, advanced Excel
  • ffmpeg-video-processing-skill - Video processing pipelines

Next Layer (Quality & Testing - Load 3-5):

  • code-quality-standards-skill - Clean code, SOLID, refactoring, code smells
  • debug-methodology-skill - Codex systematic debugging, trace execution
  • security-best-practices-skill - OWASP, authentication, security audit
  • git-safety-skill - Safe version control, branching strategies

Deployment Layer (Load 2-3):

  • git-safety-skill - Safe version control, branching strategies
  • automation-workflows-skill - Workflow automation, batch processing
  • security-best-practices-skill - OWASP, authentication, security audit

Auto-Loading Modes:

  • Default Stack (12 skills): Triggers on "code", "เขียนโค้ด", "programming"
  • Aggressive Stack (18 skills): Triggers on "architecture", "scalability", "รีแฟคเตอร์"
  • Ultimate Stack (25 skills): Triggers on "ultimate stack", "production-ready", "ช่วยเต็มที่"

Pro Workflow:

  1. Novice: Use this skill alone → Basic implementation
  2. Intermediate: This + 2-3 same-layer skills → 2-3x quality
  3. Expert: Full Layer 3 + all layers → Production-grade code

Power Level: This skill + full stack = 800/1000 (maximum development expertise)