Claude Code Plugins

Community-maintained marketplace

Feedback

video-upload-patterns

@mindmorass/reflex
0
0

Video upload patterns for YouTube, TikTok, and Vimeo. Use when uploading videos to platforms, managing video metadata, scheduling video releases, or handling bulk video uploads.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name video-upload-patterns
description Video upload patterns for YouTube, TikTok, and Vimeo. Use when uploading videos to platforms, managing video metadata, scheduling video releases, or handling bulk video uploads.

Video Upload Patterns

Best practices for uploading videos to major platforms.

YouTube Upload

Basic Upload

from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from google.oauth2.credentials import Credentials

def upload_video_youtube(
    credentials: Credentials,
    file_path: str,
    title: str,
    description: str,
    tags: list[str],
    category_id: str = "22",  # People & Blogs
    privacy: str = "private"
) -> dict:
    """Upload video to YouTube."""
    youtube = build('youtube', 'v3', credentials=credentials)

    body = {
        "snippet": {
            "title": title,
            "description": description,
            "tags": tags,
            "categoryId": category_id
        },
        "status": {
            "privacyStatus": privacy,
            "selfDeclaredMadeForKids": False
        }
    }

    media = MediaFileUpload(
        file_path,
        mimetype='video/*',
        resumable=True,
        chunksize=1024*1024  # 1MB chunks
    )

    request = youtube.videos().insert(
        part="snippet,status",
        body=body,
        media_body=media
    )

    response = None
    while response is None:
        status, response = request.next_chunk()
        if status:
            print(f"Uploading: {int(status.progress() * 100)}%")

    return response


def set_thumbnail(
    credentials: Credentials,
    video_id: str,
    thumbnail_path: str
):
    """Set custom thumbnail for video."""
    youtube = build('youtube', 'v3', credentials=credentials)

    media = MediaFileUpload(thumbnail_path, mimetype='image/jpeg')

    return youtube.thumbnails().set(
        videoId=video_id,
        media_body=media
    ).execute()


def schedule_video(
    credentials: Credentials,
    video_id: str,
    publish_at: str  # ISO 8601 format
):
    """Schedule video for future publication."""
    youtube = build('youtube', 'v3', credentials=credentials)

    return youtube.videos().update(
        part="status",
        body={
            "id": video_id,
            "status": {
                "privacyStatus": "private",
                "publishAt": publish_at
            }
        }
    ).execute()

Resumable Upload with Progress

import os
import time
from googleapiclient.errors import HttpError

class YouTubeUploader:
    MAX_RETRIES = 10
    RETRIABLE_STATUS_CODES = [500, 502, 503, 504]

    def __init__(self, credentials):
        self.youtube = build('youtube', 'v3', credentials=credentials)

    def upload_with_retry(
        self,
        file_path: str,
        metadata: dict,
        progress_callback=None
    ) -> dict:
        """Upload with automatic retry on failure."""
        file_size = os.path.getsize(file_path)

        media = MediaFileUpload(
            file_path,
            mimetype='video/*',
            resumable=True,
            chunksize=5 * 1024 * 1024  # 5MB chunks
        )

        request = self.youtube.videos().insert(
            part="snippet,status",
            body=metadata,
            media_body=media
        )

        response = None
        retry = 0

        while response is None:
            try:
                status, response = request.next_chunk()
                if status:
                    progress = status.progress()
                    bytes_uploaded = int(file_size * progress)
                    if progress_callback:
                        progress_callback(progress, bytes_uploaded, file_size)

            except HttpError as e:
                if e.resp.status in self.RETRIABLE_STATUS_CODES:
                    retry += 1
                    if retry > self.MAX_RETRIES:
                        raise
                    sleep_seconds = 2 ** retry
                    print(f"Retry {retry}, sleeping {sleep_seconds}s")
                    time.sleep(sleep_seconds)
                else:
                    raise

        return response

TikTok Upload

Content Posting API

import requests
from dataclasses import dataclass
from typing import Optional

@dataclass
class TikTokVideo:
    video_url: str  # URL to video file
    title: str
    privacy_level: str = "SELF_ONLY"  # SELF_ONLY, MUTUAL_FOLLOW_FRIENDS, FOLLOWER_OF_CREATOR, PUBLIC_TO_EVERYONE
    disable_duet: bool = False
    disable_comment: bool = False
    disable_stitch: bool = False


class TikTokUploader:
    def __init__(self, access_token: str):
        self.access_token = access_token
        self.base_url = "https://open.tiktokapis.com/v2"
        self.headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json"
        }

    def init_video_upload(self, video: TikTokVideo) -> dict:
        """Initialize video upload and get upload URL."""
        response = requests.post(
            f"{self.base_url}/post/publish/video/init/",
            headers=self.headers,
            json={
                "post_info": {
                    "title": video.title,
                    "privacy_level": video.privacy_level,
                    "disable_duet": video.disable_duet,
                    "disable_comment": video.disable_comment,
                    "disable_stitch": video.disable_stitch
                },
                "source_info": {
                    "source": "PULL_FROM_URL",
                    "video_url": video.video_url
                }
            }
        )
        return response.json()

    def check_publish_status(self, publish_id: str) -> dict:
        """Check status of video publishing."""
        response = requests.post(
            f"{self.base_url}/post/publish/status/fetch/",
            headers=self.headers,
            json={"publish_id": publish_id}
        )
        return response.json()

    def upload_from_file(self, file_path: str, video: TikTokVideo) -> dict:
        """Upload video from local file (requires file hosting)."""
        # TikTok requires video URL, so you need to host the file first
        # This is a placeholder for the workflow
        raise NotImplementedError(
            "TikTok requires video URL. Host file and use init_video_upload()"
        )

Direct Upload (Chunk-based)

class TikTokDirectUploader:
    """Direct upload using chunk-based approach."""

    def __init__(self, access_token: str):
        self.access_token = access_token
        self.base_url = "https://open.tiktokapis.com/v2"

    def init_upload(
        self,
        file_size: int,
        chunk_size: int = 10 * 1024 * 1024  # 10MB
    ) -> dict:
        """Initialize direct upload."""
        total_chunks = (file_size + chunk_size - 1) // chunk_size

        response = requests.post(
            f"{self.base_url}/post/publish/inbox/video/init/",
            headers={
                "Authorization": f"Bearer {self.access_token}",
                "Content-Type": "application/json"
            },
            json={
                "source_info": {
                    "source": "FILE_UPLOAD",
                    "video_size": file_size,
                    "chunk_size": chunk_size,
                    "total_chunk_count": total_chunks
                }
            }
        )
        return response.json()

    def upload_chunk(
        self,
        upload_url: str,
        chunk_data: bytes,
        chunk_index: int,
        total_chunks: int
    ):
        """Upload a single chunk."""
        response = requests.put(
            upload_url,
            headers={
                "Content-Type": "video/mp4",
                "Content-Range": f"bytes {chunk_index * len(chunk_data)}-{(chunk_index + 1) * len(chunk_data) - 1}/{total_chunks * len(chunk_data)}"
            },
            data=chunk_data
        )
        return response

    def upload_file(self, file_path: str) -> dict:
        """Upload complete file in chunks."""
        import os

        file_size = os.path.getsize(file_path)
        chunk_size = 10 * 1024 * 1024  # 10MB

        # Initialize
        init_response = self.init_upload(file_size, chunk_size)
        upload_url = init_response['data']['upload_url']
        publish_id = init_response['data']['publish_id']

        # Upload chunks
        with open(file_path, 'rb') as f:
            chunk_index = 0
            total_chunks = (file_size + chunk_size - 1) // chunk_size

            while True:
                chunk = f.read(chunk_size)
                if not chunk:
                    break

                self.upload_chunk(upload_url, chunk, chunk_index, total_chunks)
                chunk_index += 1
                print(f"Uploaded chunk {chunk_index}/{total_chunks}")

        return {"publish_id": publish_id}

Vimeo Upload

import vimeo

class VimeoUploader:
    def __init__(self, token: str, key: str, secret: str):
        self.client = vimeo.VimeoClient(
            token=token,
            key=key,
            secret=secret
        )

    def upload_video(
        self,
        file_path: str,
        name: str,
        description: str = "",
        privacy: str = "nobody"  # anybody, nobody, password, disable
    ) -> dict:
        """Upload video to Vimeo."""
        uri = self.client.upload(file_path, data={
            'name': name,
            'description': description,
            'privacy': {
                'view': privacy
            }
        })

        # Get video details
        response = self.client.get(uri)
        return response.json()

    def upload_with_progress(
        self,
        file_path: str,
        name: str,
        progress_callback=None
    ) -> dict:
        """Upload with progress tracking."""
        def progress_handler(uploaded, total):
            if progress_callback:
                progress_callback(uploaded / total, uploaded, total)

        uri = self.client.upload(
            file_path,
            data={'name': name},
            progress=progress_handler
        )

        return self.client.get(uri).json()

    def replace_video(self, video_id: str, file_path: str) -> dict:
        """Replace existing video file."""
        uri = f"/videos/{video_id}"
        return self.client.replace(uri, file_path)

    def set_thumbnail(self, video_id: str, time_code: float) -> dict:
        """Set thumbnail from video frame."""
        response = self.client.post(
            f"/videos/{video_id}/pictures",
            data={'time': time_code, 'active': True}
        )
        return response.json()

    def add_to_folder(self, video_id: str, folder_id: str):
        """Add video to folder/project."""
        self.client.put(f"/me/projects/{folder_id}/videos/{video_id}")

Bulk Upload Manager

import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Optional
from pathlib import Path

class UploadPlatform(Enum):
    YOUTUBE = "youtube"
    TIKTOK = "tiktok"
    VIMEO = "vimeo"

class UploadStatus(Enum):
    PENDING = "pending"
    UPLOADING = "uploading"
    PROCESSING = "processing"
    COMPLETE = "complete"
    FAILED = "failed"

@dataclass
class VideoUploadJob:
    file_path: Path
    title: str
    description: str
    tags: list[str]
    platforms: list[UploadPlatform]
    thumbnail_path: Optional[Path] = None
    scheduled_time: Optional[str] = None
    status: UploadStatus = UploadStatus.PENDING
    results: dict = None

    def __post_init__(self):
        self.results = {}


class BulkUploadManager:
    def __init__(
        self,
        youtube_creds=None,
        tiktok_token=None,
        vimeo_client=None
    ):
        self.youtube_creds = youtube_creds
        self.tiktok_token = tiktok_token
        self.vimeo_client = vimeo_client
        self.jobs: list[VideoUploadJob] = []

    def add_job(self, job: VideoUploadJob):
        """Add upload job to queue."""
        self.jobs.append(job)

    async def process_job(self, job: VideoUploadJob):
        """Process single upload job."""
        job.status = UploadStatus.UPLOADING

        for platform in job.platforms:
            try:
                if platform == UploadPlatform.YOUTUBE and self.youtube_creds:
                    result = upload_video_youtube(
                        self.youtube_creds,
                        str(job.file_path),
                        job.title,
                        job.description,
                        job.tags
                    )
                    job.results['youtube'] = result

                elif platform == UploadPlatform.VIMEO and self.vimeo_client:
                    result = self.vimeo_client.upload_video(
                        str(job.file_path),
                        job.title,
                        job.description
                    )
                    job.results['vimeo'] = result

            except Exception as e:
                job.results[platform.value] = {"error": str(e)}

        job.status = UploadStatus.COMPLETE

    async def process_all(self, concurrent_limit: int = 3):
        """Process all jobs with concurrency limit."""
        semaphore = asyncio.Semaphore(concurrent_limit)

        async def process_with_limit(job):
            async with semaphore:
                await self.process_job(job)

        await asyncio.gather(
            *[process_with_limit(job) for job in self.jobs]
        )

    def get_status_report(self) -> dict:
        """Get status of all jobs."""
        return {
            "total": len(self.jobs),
            "pending": sum(1 for j in self.jobs if j.status == UploadStatus.PENDING),
            "uploading": sum(1 for j in self.jobs if j.status == UploadStatus.UPLOADING),
            "complete": sum(1 for j in self.jobs if j.status == UploadStatus.COMPLETE),
            "failed": sum(1 for j in self.jobs if j.status == UploadStatus.FAILED),
            "jobs": [
                {
                    "file": str(j.file_path),
                    "status": j.status.value,
                    "results": j.results
                }
                for j in self.jobs
            ]
        }

Metadata Templates

from dataclasses import dataclass
from typing import Optional
from string import Template

@dataclass
class VideoMetadataTemplate:
    title_template: str
    description_template: str
    tags: list[str]
    category: str

    def render(self, **kwargs) -> dict:
        """Render template with variables."""
        return {
            "title": Template(self.title_template).safe_substitute(**kwargs),
            "description": Template(self.description_template).safe_substitute(**kwargs),
            "tags": self.tags,
            "category": self.category
        }


# Example templates
GAMING_TEMPLATE = VideoMetadataTemplate(
    title_template="$game_name - $episode_title | Episode $episode_num",
    description_template="""
$episode_title

Welcome back to $game_name! In this episode, we $episode_summary.

Timestamps:
$timestamps

Subscribe for more $game_name content!

#$game_tag #gaming #letsplay
    """.strip(),
    tags=["gaming", "letsplay", "gameplay"],
    category="20"  # Gaming
)

TUTORIAL_TEMPLATE = VideoMetadataTemplate(
    title_template="$topic Tutorial - $subtitle | $series_name",
    description_template="""
Learn $topic in this comprehensive tutorial!

In this video:
$outline

Resources:
$resources

Don't forget to like and subscribe!
    """.strip(),
    tags=["tutorial", "howto", "learn"],
    category="27"  # Education
)

Upload Validation

import subprocess
import os
from dataclasses import dataclass

@dataclass
class VideoValidation:
    is_valid: bool
    duration: float
    resolution: str
    codec: str
    file_size: int
    errors: list[str]

def validate_video(file_path: str, platform: str = "youtube") -> VideoValidation:
    """Validate video meets platform requirements."""
    errors = []

    # Get video info
    probe = subprocess.run([
        "ffprobe", "-v", "quiet",
        "-print_format", "json",
        "-show_format", "-show_streams",
        file_path
    ], capture_output=True, text=True)

    import json
    info = json.loads(probe.stdout)
    video_stream = next(s for s in info['streams'] if s['codec_type'] == 'video')

    duration = float(info['format']['duration'])
    file_size = int(info['format']['size'])
    resolution = f"{video_stream['width']}x{video_stream['height']}"
    codec = video_stream['codec_name']

    # Platform-specific validation
    if platform == "youtube":
        if duration > 12 * 3600:  # 12 hours
            errors.append("Video exceeds 12 hour limit")
        if file_size > 256 * 1024 * 1024 * 1024:  # 256GB
            errors.append("File exceeds 256GB limit")

    elif platform == "tiktok":
        if duration > 10 * 60:  # 10 minutes
            errors.append("Video exceeds 10 minute limit")
        if file_size > 4 * 1024 * 1024 * 1024:  # 4GB
            errors.append("File exceeds 4GB limit")

    elif platform == "vimeo":
        if duration > 12 * 3600:
            errors.append("Video exceeds 12 hour limit")

    return VideoValidation(
        is_valid=len(errors) == 0,
        duration=duration,
        resolution=resolution,
        codec=codec,
        file_size=file_size,
        errors=errors
    )

References