Claude Code Plugins

Community-maintained marketplace

Feedback

streaming-patterns

@mindmorass/reflex
0
0

Live streaming patterns for YouTube, Twitch, and OBS. Use when setting up live streams, configuring stream keys, RTMP workflows, multi-platform streaming, or real-time broadcast automation.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name streaming-patterns
description Live streaming patterns for YouTube, Twitch, and OBS. Use when setting up live streams, configuring stream keys, RTMP workflows, multi-platform streaming, or real-time broadcast automation.

Live Streaming Patterns

Best practices for live streaming to YouTube, Twitch, and other platforms.

Platform Configuration

YouTube Live

from googleapiclient.discovery import build
from google.oauth2.credentials import Credentials

def create_youtube_broadcast(
    credentials: Credentials,
    title: str,
    description: str,
    scheduled_start: str,
    privacy: str = "unlisted"
):
    """Create a YouTube live broadcast."""
    youtube = build('youtube', 'v3', credentials=credentials)

    # Create broadcast
    broadcast = youtube.liveBroadcasts().insert(
        part="snippet,status,contentDetails",
        body={
            "snippet": {
                "title": title,
                "description": description,
                "scheduledStartTime": scheduled_start
            },
            "status": {
                "privacyStatus": privacy,
                "selfDeclaredMadeForKids": False
            },
            "contentDetails": {
                "enableAutoStart": True,
                "enableAutoStop": True,
                "enableDvr": True,
                "recordFromStart": True
            }
        }
    ).execute()

    # Create stream
    stream = youtube.liveStreams().insert(
        part="snippet,cdn",
        body={
            "snippet": {
                "title": f"Stream for {title}"
            },
            "cdn": {
                "frameRate": "60fps",
                "ingestionType": "rtmp",
                "resolution": "1080p"
            }
        }
    ).execute()

    # Bind stream to broadcast
    youtube.liveBroadcasts().bind(
        part="id,contentDetails",
        id=broadcast['id'],
        streamId=stream['id']
    ).execute()

    return {
        "broadcast_id": broadcast['id'],
        "stream_key": stream['cdn']['ingestionInfo']['streamName'],
        "rtmp_url": stream['cdn']['ingestionInfo']['ingestionAddress'],
        "watch_url": f"https://youtube.com/watch?v={broadcast['id']}"
    }


def transition_broadcast(credentials: Credentials, broadcast_id: str, status: str):
    """Transition broadcast status: testing, live, complete."""
    youtube = build('youtube', 'v3', credentials=credentials)

    return youtube.liveBroadcasts().transition(
        broadcastStatus=status,
        id=broadcast_id,
        part="status"
    ).execute()

Twitch

import requests

class TwitchAPI:
    def __init__(self, client_id: str, access_token: str):
        self.client_id = client_id
        self.access_token = access_token
        self.base_url = "https://api.twitch.tv/helix"
        self.headers = {
            "Client-ID": client_id,
            "Authorization": f"Bearer {access_token}"
        }

    def get_stream_key(self, broadcaster_id: str) -> str:
        """Get stream key for broadcaster."""
        response = requests.get(
            f"{self.base_url}/streams/key",
            headers=self.headers,
            params={"broadcaster_id": broadcaster_id}
        )
        return response.json()['data'][0]['stream_key']

    def update_stream_info(
        self,
        broadcaster_id: str,
        title: str,
        game_id: str = None,
        language: str = "en"
    ):
        """Update stream title and category."""
        data = {
            "broadcaster_id": broadcaster_id,
            "title": title,
            "broadcaster_language": language
        }
        if game_id:
            data["game_id"] = game_id

        return requests.patch(
            f"{self.base_url}/channels",
            headers=self.headers,
            json=data
        )

    def get_stream_status(self, user_login: str) -> dict:
        """Check if channel is live."""
        response = requests.get(
            f"{self.base_url}/streams",
            headers=self.headers,
            params={"user_login": user_login}
        )
        data = response.json()['data']
        return data[0] if data else None

    def create_clip(self, broadcaster_id: str) -> dict:
        """Create clip from live stream."""
        response = requests.post(
            f"{self.base_url}/clips",
            headers=self.headers,
            params={"broadcaster_id": broadcaster_id}
        )
        return response.json()['data'][0]

RTMP Streaming

FFmpeg RTMP Push

# Stream to YouTube
ffmpeg -re -i input.mp4 \
    -c:v libx264 -preset veryfast -maxrate 4500k -bufsize 9000k \
    -pix_fmt yuv420p -g 60 \
    -c:a aac -b:a 160k -ar 44100 \
    -f flv "rtmp://a.rtmp.youtube.com/live2/YOUR_STREAM_KEY"

# Stream to Twitch
ffmpeg -re -i input.mp4 \
    -c:v libx264 -preset veryfast -maxrate 6000k -bufsize 12000k \
    -pix_fmt yuv420p -g 60 \
    -c:a aac -b:a 160k -ar 44100 \
    -f flv "rtmp://live.twitch.tv/app/YOUR_STREAM_KEY"

# Stream desktop (macOS)
ffmpeg -f avfoundation -framerate 30 -i "1:0" \
    -c:v libx264 -preset ultrafast -tune zerolatency \
    -c:a aac -b:a 128k \
    -f flv "rtmp://destination/stream_key"

# Stream desktop (Linux)
ffmpeg -f x11grab -framerate 30 -video_size 1920x1080 -i :0.0 \
    -f pulse -i default \
    -c:v libx264 -preset ultrafast -tune zerolatency \
    -c:a aac -b:a 128k \
    -f flv "rtmp://destination/stream_key"

Multi-Platform Streaming

# Using tee muxer to stream to multiple platforms
ffmpeg -re -i input.mp4 \
    -c:v libx264 -preset veryfast -b:v 4500k \
    -c:a aac -b:a 160k \
    -f tee -map 0:v -map 0:a \
    "[f=flv]rtmp://a.rtmp.youtube.com/live2/YT_KEY|\
     [f=flv]rtmp://live.twitch.tv/app/TWITCH_KEY|\
     [f=flv]rtmp://live-api-s.facebook.com:443/rtmp/FB_KEY"

Python RTMP Handler

import subprocess
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class StreamDestination:
    name: str
    rtmp_url: str
    stream_key: str

    @property
    def full_url(self) -> str:
        return f"{self.rtmp_url}/{self.stream_key}"


class MultiStreamer:
    def __init__(
        self,
        input_source: str,
        destinations: List[StreamDestination],
        video_bitrate: str = "4500k",
        audio_bitrate: str = "160k"
    ):
        self.input_source = input_source
        self.destinations = destinations
        self.video_bitrate = video_bitrate
        self.audio_bitrate = audio_bitrate
        self.process: Optional[subprocess.Popen] = None

    def build_command(self) -> List[str]:
        """Build FFmpeg command for multi-platform streaming."""
        cmd = [
            "ffmpeg",
            "-re", "-i", self.input_source,
            "-c:v", "libx264",
            "-preset", "veryfast",
            "-b:v", self.video_bitrate,
            "-maxrate", self.video_bitrate,
            "-bufsize", str(int(self.video_bitrate[:-1]) * 2) + "k",
            "-pix_fmt", "yuv420p",
            "-g", "60",
            "-c:a", "aac",
            "-b:a", self.audio_bitrate,
            "-ar", "44100"
        ]

        if len(self.destinations) == 1:
            cmd.extend(["-f", "flv", self.destinations[0].full_url])
        else:
            # Use tee muxer for multiple destinations
            tee_outputs = "|".join(
                f"[f=flv]{dest.full_url}" for dest in self.destinations
            )
            cmd.extend([
                "-f", "tee",
                "-map", "0:v", "-map", "0:a",
                tee_outputs
            ])

        return cmd

    def start(self):
        """Start streaming."""
        cmd = self.build_command()
        self.process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )

    def stop(self):
        """Stop streaming."""
        if self.process:
            self.process.terminate()
            self.process.wait()

OBS WebSocket Integration

import obswebsocket
from obswebsocket import obsws, requests as obs_requests

class OBSController:
    def __init__(self, host: str = "localhost", port: int = 4455, password: str = ""):
        self.ws = obsws(host, port, password)

    def connect(self):
        self.ws.connect()

    def disconnect(self):
        self.ws.disconnect()

    def start_streaming(self):
        """Start OBS streaming."""
        self.ws.call(obs_requests.StartStream())

    def stop_streaming(self):
        """Stop OBS streaming."""
        self.ws.call(obs_requests.StopStream())

    def start_recording(self):
        """Start OBS recording."""
        self.ws.call(obs_requests.StartRecord())

    def stop_recording(self):
        """Stop OBS recording."""
        self.ws.call(obs_requests.StopRecord())

    def switch_scene(self, scene_name: str):
        """Switch to a different scene."""
        self.ws.call(obs_requests.SetCurrentProgramScene(sceneName=scene_name))

    def get_scenes(self) -> list:
        """Get list of available scenes."""
        response = self.ws.call(obs_requests.GetSceneList())
        return [scene['sceneName'] for scene in response.getScenes()]

    def set_source_visibility(self, scene: str, source: str, visible: bool):
        """Show or hide a source in a scene."""
        self.ws.call(obs_requests.SetSceneItemEnabled(
            sceneName=scene,
            sceneItemId=self._get_source_id(scene, source),
            sceneItemEnabled=visible
        ))

    def _get_source_id(self, scene: str, source: str) -> int:
        """Get source ID by name."""
        response = self.ws.call(obs_requests.GetSceneItemId(
            sceneName=scene,
            sourceName=source
        ))
        return response.getSceneItemId()

    def set_stream_settings(self, server: str, key: str):
        """Update stream settings."""
        self.ws.call(obs_requests.SetStreamServiceSettings(
            streamServiceType="rtmp_common",
            streamServiceSettings={
                "server": server,
                "key": key
            }
        ))

Stream Automation

Scheduled Stream

import asyncio
from datetime import datetime, timedelta
from typing import Callable

class StreamScheduler:
    def __init__(self):
        self.scheduled_streams = []

    async def schedule_stream(
        self,
        start_time: datetime,
        duration: timedelta,
        start_callback: Callable,
        stop_callback: Callable
    ):
        """Schedule a stream for a specific time."""
        now = datetime.now()
        delay = (start_time - now).total_seconds()

        if delay > 0:
            await asyncio.sleep(delay)

        # Start stream
        await start_callback()

        # Wait for duration
        await asyncio.sleep(duration.total_seconds())

        # Stop stream
        await stop_callback()


# Usage
async def main():
    scheduler = StreamScheduler()
    obs = OBSController()
    obs.connect()

    start_time = datetime.now() + timedelta(minutes=5)
    duration = timedelta(hours=2)

    await scheduler.schedule_stream(
        start_time=start_time,
        duration=duration,
        start_callback=lambda: obs.start_streaming(),
        stop_callback=lambda: obs.stop_streaming()
    )

Chat Bot Integration

from twitchio.ext import commands

class StreamBot(commands.Bot):
    def __init__(self, token: str, prefix: str, channels: list):
        super().__init__(token=token, prefix=prefix, initial_channels=channels)
        self.obs = OBSController()
        self.obs.connect()

    async def event_ready(self):
        print(f'Bot is ready | {self.nick}')

    async def event_message(self, message):
        if message.echo:
            return
        await self.handle_commands(message)

    @commands.command(name='scene')
    async def scene_command(self, ctx, scene_name: str):
        """Switch OBS scene via chat command."""
        if ctx.author.is_mod:
            try:
                self.obs.switch_scene(scene_name)
                await ctx.send(f"Switched to scene: {scene_name}")
            except Exception as e:
                await ctx.send(f"Error switching scene: {e}")

    @commands.command(name='brb')
    async def brb_command(self, ctx):
        """Switch to BRB scene."""
        if ctx.author.is_mod:
            self.obs.switch_scene("BRB")
            await ctx.send("Be right back!")

    @commands.command(name='back')
    async def back_command(self, ctx):
        """Switch back to main scene."""
        if ctx.author.is_mod:
            self.obs.switch_scene("Main")
            await ctx.send("We're back!")

Stream Quality Presets

from dataclasses import dataclass
from enum import Enum

class StreamQuality(Enum):
    LOW = "480p"
    MEDIUM = "720p"
    HIGH = "1080p"
    ULTRA = "1440p"

@dataclass
class EncodingPreset:
    resolution: str
    video_bitrate: str
    audio_bitrate: str
    framerate: int
    preset: str

QUALITY_PRESETS = {
    StreamQuality.LOW: EncodingPreset(
        resolution="854x480",
        video_bitrate="1500k",
        audio_bitrate="96k",
        framerate=30,
        preset="veryfast"
    ),
    StreamQuality.MEDIUM: EncodingPreset(
        resolution="1280x720",
        video_bitrate="3000k",
        audio_bitrate="128k",
        framerate=30,
        preset="veryfast"
    ),
    StreamQuality.HIGH: EncodingPreset(
        resolution="1920x1080",
        video_bitrate="4500k",
        audio_bitrate="160k",
        framerate=60,
        preset="veryfast"
    ),
    StreamQuality.ULTRA: EncodingPreset(
        resolution="2560x1440",
        video_bitrate="9000k",
        audio_bitrate="192k",
        framerate=60,
        preset="fast"
    )
}

Health Monitoring

import asyncio
from dataclasses import dataclass
from datetime import datetime

@dataclass
class StreamHealth:
    bitrate: float
    dropped_frames: int
    fps: float
    cpu_usage: float
    timestamp: datetime

class StreamMonitor:
    def __init__(self, obs: OBSController):
        self.obs = obs
        self.health_history: list[StreamHealth] = []

    async def monitor(self, interval: float = 5.0):
        """Continuously monitor stream health."""
        while True:
            try:
                stats = self.obs.ws.call(obs_requests.GetStats())
                health = StreamHealth(
                    bitrate=stats.getKbitsPerSec(),
                    dropped_frames=stats.getOutputSkippedFrames(),
                    fps=stats.getActiveFps(),
                    cpu_usage=stats.getCpuUsage(),
                    timestamp=datetime.now()
                )
                self.health_history.append(health)

                # Alert on issues
                if health.dropped_frames > 100:
                    print(f"Warning: High dropped frames: {health.dropped_frames}")
                if health.fps < 25:
                    print(f"Warning: Low FPS: {health.fps}")

            except Exception as e:
                print(f"Monitor error: {e}")

            await asyncio.sleep(interval)

References