| name | video-upload-patterns |
| description | Video upload patterns for YouTube, TikTok, and Vimeo. Use when uploading videos to platforms, managing video metadata, scheduling video releases, or handling bulk video uploads. |
Video Upload Patterns
Best practices for uploading videos to major platforms.
YouTube Upload
Basic Upload
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from google.oauth2.credentials import Credentials
def upload_video_youtube(
credentials: Credentials,
file_path: str,
title: str,
description: str,
tags: list[str],
category_id: str = "22", # People & Blogs
privacy: str = "private"
) -> dict:
"""Upload video to YouTube."""
youtube = build('youtube', 'v3', credentials=credentials)
body = {
"snippet": {
"title": title,
"description": description,
"tags": tags,
"categoryId": category_id
},
"status": {
"privacyStatus": privacy,
"selfDeclaredMadeForKids": False
}
}
media = MediaFileUpload(
file_path,
mimetype='video/*',
resumable=True,
chunksize=1024*1024 # 1MB chunks
)
request = youtube.videos().insert(
part="snippet,status",
body=body,
media_body=media
)
response = None
while response is None:
status, response = request.next_chunk()
if status:
print(f"Uploading: {int(status.progress() * 100)}%")
return response
def set_thumbnail(
credentials: Credentials,
video_id: str,
thumbnail_path: str
):
"""Set custom thumbnail for video."""
youtube = build('youtube', 'v3', credentials=credentials)
media = MediaFileUpload(thumbnail_path, mimetype='image/jpeg')
return youtube.thumbnails().set(
videoId=video_id,
media_body=media
).execute()
def schedule_video(
credentials: Credentials,
video_id: str,
publish_at: str # ISO 8601 format
):
"""Schedule video for future publication."""
youtube = build('youtube', 'v3', credentials=credentials)
return youtube.videos().update(
part="status",
body={
"id": video_id,
"status": {
"privacyStatus": "private",
"publishAt": publish_at
}
}
).execute()
Resumable Upload with Progress
import os
import time
from googleapiclient.errors import HttpError
class YouTubeUploader:
MAX_RETRIES = 10
RETRIABLE_STATUS_CODES = [500, 502, 503, 504]
def __init__(self, credentials):
self.youtube = build('youtube', 'v3', credentials=credentials)
def upload_with_retry(
self,
file_path: str,
metadata: dict,
progress_callback=None
) -> dict:
"""Upload with automatic retry on failure."""
file_size = os.path.getsize(file_path)
media = MediaFileUpload(
file_path,
mimetype='video/*',
resumable=True,
chunksize=5 * 1024 * 1024 # 5MB chunks
)
request = self.youtube.videos().insert(
part="snippet,status",
body=metadata,
media_body=media
)
response = None
retry = 0
while response is None:
try:
status, response = request.next_chunk()
if status:
progress = status.progress()
bytes_uploaded = int(file_size * progress)
if progress_callback:
progress_callback(progress, bytes_uploaded, file_size)
except HttpError as e:
if e.resp.status in self.RETRIABLE_STATUS_CODES:
retry += 1
if retry > self.MAX_RETRIES:
raise
sleep_seconds = 2 ** retry
print(f"Retry {retry}, sleeping {sleep_seconds}s")
time.sleep(sleep_seconds)
else:
raise
return response
TikTok Upload
Content Posting API
import requests
from dataclasses import dataclass
from typing import Optional
@dataclass
class TikTokVideo:
video_url: str # URL to video file
title: str
privacy_level: str = "SELF_ONLY" # SELF_ONLY, MUTUAL_FOLLOW_FRIENDS, FOLLOWER_OF_CREATOR, PUBLIC_TO_EVERYONE
disable_duet: bool = False
disable_comment: bool = False
disable_stitch: bool = False
class TikTokUploader:
def __init__(self, access_token: str):
self.access_token = access_token
self.base_url = "https://open.tiktokapis.com/v2"
self.headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
def init_video_upload(self, video: TikTokVideo) -> dict:
"""Initialize video upload and get upload URL."""
response = requests.post(
f"{self.base_url}/post/publish/video/init/",
headers=self.headers,
json={
"post_info": {
"title": video.title,
"privacy_level": video.privacy_level,
"disable_duet": video.disable_duet,
"disable_comment": video.disable_comment,
"disable_stitch": video.disable_stitch
},
"source_info": {
"source": "PULL_FROM_URL",
"video_url": video.video_url
}
}
)
return response.json()
def check_publish_status(self, publish_id: str) -> dict:
"""Check status of video publishing."""
response = requests.post(
f"{self.base_url}/post/publish/status/fetch/",
headers=self.headers,
json={"publish_id": publish_id}
)
return response.json()
def upload_from_file(self, file_path: str, video: TikTokVideo) -> dict:
"""Upload video from local file (requires file hosting)."""
# TikTok requires video URL, so you need to host the file first
# This is a placeholder for the workflow
raise NotImplementedError(
"TikTok requires video URL. Host file and use init_video_upload()"
)
Direct Upload (Chunk-based)
class TikTokDirectUploader:
"""Direct upload using chunk-based approach."""
def __init__(self, access_token: str):
self.access_token = access_token
self.base_url = "https://open.tiktokapis.com/v2"
def init_upload(
self,
file_size: int,
chunk_size: int = 10 * 1024 * 1024 # 10MB
) -> dict:
"""Initialize direct upload."""
total_chunks = (file_size + chunk_size - 1) // chunk_size
response = requests.post(
f"{self.base_url}/post/publish/inbox/video/init/",
headers={
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json"
},
json={
"source_info": {
"source": "FILE_UPLOAD",
"video_size": file_size,
"chunk_size": chunk_size,
"total_chunk_count": total_chunks
}
}
)
return response.json()
def upload_chunk(
self,
upload_url: str,
chunk_data: bytes,
chunk_index: int,
total_chunks: int
):
"""Upload a single chunk."""
response = requests.put(
upload_url,
headers={
"Content-Type": "video/mp4",
"Content-Range": f"bytes {chunk_index * len(chunk_data)}-{(chunk_index + 1) * len(chunk_data) - 1}/{total_chunks * len(chunk_data)}"
},
data=chunk_data
)
return response
def upload_file(self, file_path: str) -> dict:
"""Upload complete file in chunks."""
import os
file_size = os.path.getsize(file_path)
chunk_size = 10 * 1024 * 1024 # 10MB
# Initialize
init_response = self.init_upload(file_size, chunk_size)
upload_url = init_response['data']['upload_url']
publish_id = init_response['data']['publish_id']
# Upload chunks
with open(file_path, 'rb') as f:
chunk_index = 0
total_chunks = (file_size + chunk_size - 1) // chunk_size
while True:
chunk = f.read(chunk_size)
if not chunk:
break
self.upload_chunk(upload_url, chunk, chunk_index, total_chunks)
chunk_index += 1
print(f"Uploaded chunk {chunk_index}/{total_chunks}")
return {"publish_id": publish_id}
Vimeo Upload
import vimeo
class VimeoUploader:
def __init__(self, token: str, key: str, secret: str):
self.client = vimeo.VimeoClient(
token=token,
key=key,
secret=secret
)
def upload_video(
self,
file_path: str,
name: str,
description: str = "",
privacy: str = "nobody" # anybody, nobody, password, disable
) -> dict:
"""Upload video to Vimeo."""
uri = self.client.upload(file_path, data={
'name': name,
'description': description,
'privacy': {
'view': privacy
}
})
# Get video details
response = self.client.get(uri)
return response.json()
def upload_with_progress(
self,
file_path: str,
name: str,
progress_callback=None
) -> dict:
"""Upload with progress tracking."""
def progress_handler(uploaded, total):
if progress_callback:
progress_callback(uploaded / total, uploaded, total)
uri = self.client.upload(
file_path,
data={'name': name},
progress=progress_handler
)
return self.client.get(uri).json()
def replace_video(self, video_id: str, file_path: str) -> dict:
"""Replace existing video file."""
uri = f"/videos/{video_id}"
return self.client.replace(uri, file_path)
def set_thumbnail(self, video_id: str, time_code: float) -> dict:
"""Set thumbnail from video frame."""
response = self.client.post(
f"/videos/{video_id}/pictures",
data={'time': time_code, 'active': True}
)
return response.json()
def add_to_folder(self, video_id: str, folder_id: str):
"""Add video to folder/project."""
self.client.put(f"/me/projects/{folder_id}/videos/{video_id}")
Bulk Upload Manager
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Optional
from pathlib import Path
class UploadPlatform(Enum):
YOUTUBE = "youtube"
TIKTOK = "tiktok"
VIMEO = "vimeo"
class UploadStatus(Enum):
PENDING = "pending"
UPLOADING = "uploading"
PROCESSING = "processing"
COMPLETE = "complete"
FAILED = "failed"
@dataclass
class VideoUploadJob:
file_path: Path
title: str
description: str
tags: list[str]
platforms: list[UploadPlatform]
thumbnail_path: Optional[Path] = None
scheduled_time: Optional[str] = None
status: UploadStatus = UploadStatus.PENDING
results: dict = None
def __post_init__(self):
self.results = {}
class BulkUploadManager:
def __init__(
self,
youtube_creds=None,
tiktok_token=None,
vimeo_client=None
):
self.youtube_creds = youtube_creds
self.tiktok_token = tiktok_token
self.vimeo_client = vimeo_client
self.jobs: list[VideoUploadJob] = []
def add_job(self, job: VideoUploadJob):
"""Add upload job to queue."""
self.jobs.append(job)
async def process_job(self, job: VideoUploadJob):
"""Process single upload job."""
job.status = UploadStatus.UPLOADING
for platform in job.platforms:
try:
if platform == UploadPlatform.YOUTUBE and self.youtube_creds:
result = upload_video_youtube(
self.youtube_creds,
str(job.file_path),
job.title,
job.description,
job.tags
)
job.results['youtube'] = result
elif platform == UploadPlatform.VIMEO and self.vimeo_client:
result = self.vimeo_client.upload_video(
str(job.file_path),
job.title,
job.description
)
job.results['vimeo'] = result
except Exception as e:
job.results[platform.value] = {"error": str(e)}
job.status = UploadStatus.COMPLETE
async def process_all(self, concurrent_limit: int = 3):
"""Process all jobs with concurrency limit."""
semaphore = asyncio.Semaphore(concurrent_limit)
async def process_with_limit(job):
async with semaphore:
await self.process_job(job)
await asyncio.gather(
*[process_with_limit(job) for job in self.jobs]
)
def get_status_report(self) -> dict:
"""Get status of all jobs."""
return {
"total": len(self.jobs),
"pending": sum(1 for j in self.jobs if j.status == UploadStatus.PENDING),
"uploading": sum(1 for j in self.jobs if j.status == UploadStatus.UPLOADING),
"complete": sum(1 for j in self.jobs if j.status == UploadStatus.COMPLETE),
"failed": sum(1 for j in self.jobs if j.status == UploadStatus.FAILED),
"jobs": [
{
"file": str(j.file_path),
"status": j.status.value,
"results": j.results
}
for j in self.jobs
]
}
Metadata Templates
from dataclasses import dataclass
from typing import Optional
from string import Template
@dataclass
class VideoMetadataTemplate:
title_template: str
description_template: str
tags: list[str]
category: str
def render(self, **kwargs) -> dict:
"""Render template with variables."""
return {
"title": Template(self.title_template).safe_substitute(**kwargs),
"description": Template(self.description_template).safe_substitute(**kwargs),
"tags": self.tags,
"category": self.category
}
# Example templates
GAMING_TEMPLATE = VideoMetadataTemplate(
title_template="$game_name - $episode_title | Episode $episode_num",
description_template="""
$episode_title
Welcome back to $game_name! In this episode, we $episode_summary.
Timestamps:
$timestamps
Subscribe for more $game_name content!
#$game_tag #gaming #letsplay
""".strip(),
tags=["gaming", "letsplay", "gameplay"],
category="20" # Gaming
)
TUTORIAL_TEMPLATE = VideoMetadataTemplate(
title_template="$topic Tutorial - $subtitle | $series_name",
description_template="""
Learn $topic in this comprehensive tutorial!
In this video:
$outline
Resources:
$resources
Don't forget to like and subscribe!
""".strip(),
tags=["tutorial", "howto", "learn"],
category="27" # Education
)
Upload Validation
import subprocess
import os
from dataclasses import dataclass
@dataclass
class VideoValidation:
is_valid: bool
duration: float
resolution: str
codec: str
file_size: int
errors: list[str]
def validate_video(file_path: str, platform: str = "youtube") -> VideoValidation:
"""Validate video meets platform requirements."""
errors = []
# Get video info
probe = subprocess.run([
"ffprobe", "-v", "quiet",
"-print_format", "json",
"-show_format", "-show_streams",
file_path
], capture_output=True, text=True)
import json
info = json.loads(probe.stdout)
video_stream = next(s for s in info['streams'] if s['codec_type'] == 'video')
duration = float(info['format']['duration'])
file_size = int(info['format']['size'])
resolution = f"{video_stream['width']}x{video_stream['height']}"
codec = video_stream['codec_name']
# Platform-specific validation
if platform == "youtube":
if duration > 12 * 3600: # 12 hours
errors.append("Video exceeds 12 hour limit")
if file_size > 256 * 1024 * 1024 * 1024: # 256GB
errors.append("File exceeds 256GB limit")
elif platform == "tiktok":
if duration > 10 * 60: # 10 minutes
errors.append("Video exceeds 10 minute limit")
if file_size > 4 * 1024 * 1024 * 1024: # 4GB
errors.append("File exceeds 4GB limit")
elif platform == "vimeo":
if duration > 12 * 3600:
errors.append("Video exceeds 12 hour limit")
return VideoValidation(
is_valid=len(errors) == 0,
duration=duration,
resolution=resolution,
codec=codec,
file_size=file_size,
errors=errors
)