| name | youtube-harvester |
| description | Extract transcripts and metadata from YouTube videos |
YouTube Harvester Skill
Extract and ingest YouTube video transcripts into RAG with proper chunking and metadata.
Overview
YouTube is a rich source of tutorials, lectures, and explanations. This skill covers:
- Transcript extraction (manual and auto-generated)
- Timestamp-aware chunking
- Playlist and channel harvesting
- Metadata enrichment
Prerequisites
# Install yt-dlp for video metadata and subtitles
pip install yt-dlp
# Install youtube-transcript-api for cleaner transcript access
pip install youtube-transcript-api
# Optional: for audio transcription fallback
pip install openai-whisper
Extraction Methods
Method 1: youtube-transcript-api (Recommended)
Best for clean transcript text with timestamps.
#!/usr/bin/env python3
"""Extract YouTube transcripts using youtube-transcript-api."""
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api.formatters import TextFormatter
import json
import re
from typing import Dict, List, Optional
from datetime import datetime
def extract_video_id(url: str) -> str:
"""Extract video ID from various YouTube URL formats."""
patterns = [
r'(?:v=|/v/|youtu\.be/)([a-zA-Z0-9_-]{11})',
r'(?:embed/)([a-zA-Z0-9_-]{11})',
]
for pattern in patterns:
match = re.search(pattern, url)
if match:
return match.group(1)
raise ValueError(f"Could not extract video ID from: {url}")
def get_transcript(video_id: str, languages: List[str] = ['en']) -> List[Dict]:
"""
Fetch transcript for a video.
Args:
video_id: YouTube video ID
languages: Preferred languages in order
Returns:
List of transcript segments with text, start, duration
"""
try:
# Try to get manual captions first
transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
try:
transcript = transcript_list.find_manually_created_transcript(languages)
        except Exception:
# Fall back to auto-generated
transcript = transcript_list.find_generated_transcript(languages)
return transcript.fetch()
except Exception as e:
print(f"Error fetching transcript: {e}")
return []
def format_timestamp(seconds: float) -> str:
"""Convert seconds to HH:MM:SS format."""
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = int(seconds % 60)
if hours > 0:
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
return f"{minutes:02d}:{secs:02d}"
def get_video_metadata(video_id: str) -> Dict:
"""Get video metadata using yt-dlp."""
import subprocess
import json
result = subprocess.run(
['yt-dlp', '--dump-json', '--no-download', f'https://youtube.com/watch?v={video_id}'],
capture_output=True,
text=True
)
if result.returncode == 0:
data = json.loads(result.stdout)
return {
'title': data.get('title'),
'channel': data.get('channel'),
'channel_id': data.get('channel_id'),
'upload_date': data.get('upload_date'),
'duration': data.get('duration'),
'view_count': data.get('view_count'),
'description': data.get('description', '')[:500], # Truncate
'tags': data.get('tags', [])[:10], # Limit tags
}
return {}
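Putting the pieces together, a minimal usage sketch (the URL is a placeholder, and the segments are assumed to be the text/start/duration dicts returned by get_transcript above):
if __name__ == "__main__":
    url = "https://youtube.com/watch?v=dQw4w9WgXcQ"  # placeholder video
    video_id = extract_video_id(url)
    metadata = get_video_metadata(video_id)
    segments = get_transcript(video_id)
    print(f"{metadata.get('title')} ({metadata.get('channel')})")
    # Show the first few segments with human-readable timestamps
    for seg in segments[:5]:
        print(f"[{format_timestamp(seg['start'])}] {seg['text']}")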
Method 2: yt-dlp Subtitles
Better for batch processing, and a useful fallback when the transcript API is rate-limited.
#!/bin/bash
# Extract subtitles using yt-dlp
VIDEO_URL="$1"
OUTPUT_DIR="${2:-.}"
# Download auto-generated subtitles
yt-dlp \
--write-auto-sub \
--sub-lang en \
--sub-format vtt \
--skip-download \
--output "$OUTPUT_DIR/%(title)s.%(ext)s" \
"$VIDEO_URL"
# Convert VTT to plain text
for vtt in "$OUTPUT_DIR"/*.vtt; do
# Remove VTT formatting, keep just text
sed -e '/^WEBVTT/d' \
-e '/^Kind:/d' \
-e '/^Language:/d' \
-e '/^[0-9][0-9]:[0-9][0-9]/d' \
-e '/-->/d' \
-e 's/<[^>]*>//g' \
-e '/^$/d' \
"$vtt" > "${vtt%.vtt}.txt"
done
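If a video has no captions at all, the optional Whisper fallback from the Prerequisites can fill the gap: download the audio with yt-dlp, transcribe it locally, and reshape the result into the same segment format used above. A rough sketch, with an illustrative output path and model size:
import subprocess
import whisper  # openai-whisper, listed in Prerequisites
def transcribe_with_whisper(video_url: str) -> list:
    """Fallback transcript: extract audio with yt-dlp, transcribe with Whisper."""
    subprocess.run(
        ['yt-dlp', '-x', '--audio-format', 'mp3', '-o', 'audio.%(ext)s', video_url],
        check=True
    )
    model = whisper.load_model("base")  # larger models are more accurate but slower
    result = model.transcribe("audio.mp3")
    # Whisper segments carry start/end; convert to the text/start/duration shape
    return [
        {'text': s['text'].strip(), 'start': s['start'], 'duration': s['end'] - s['start']}
        for s in result['segments']
    ]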
Chunking Strategies
Strategy 1: Time-Based Chunks
Split transcript into fixed time intervals.
def chunk_by_time(
transcript: List[Dict],
chunk_duration: int = 300 # 5 minutes
) -> List[Dict]:
"""
Chunk transcript by time intervals.
Args:
transcript: List of transcript segments
chunk_duration: Seconds per chunk
"""
chunks = []
current_chunk = {
'text': '',
'start': 0,
'end': 0,
'segments': []
}
for segment in transcript:
segment_start = segment['start']
# Check if we need to start a new chunk
if segment_start >= current_chunk['start'] + chunk_duration:
if current_chunk['text']:
chunks.append(current_chunk)
current_chunk = {
'text': '',
'start': segment_start,
'end': segment_start,
'segments': []
}
current_chunk['text'] += ' ' + segment['text']
current_chunk['end'] = segment['start'] + segment.get('duration', 0)
current_chunk['segments'].append(segment)
# Don't forget the last chunk
if current_chunk['text']:
chunks.append(current_chunk)
return chunks
Strategy 2: Topic-Based Chunks
Split when the topic appears to change, using gaps in speech as a proxy for topic boundaries.
def chunk_by_topic(
transcript: List[Dict],
gap_threshold: float = 5.0, # Seconds of silence indicating topic change
min_chunk_size: int = 100 # Minimum words per chunk
) -> List[Dict]:
"""
Chunk transcript by topic boundaries.
    Uses gaps in speech (combined with a minimum chunk size) to identify topic changes.
"""
chunks = []
current_chunk = {
'text': '',
'start': 0,
'end': 0,
'word_count': 0
}
prev_end = 0
for segment in transcript:
segment_start = segment['start']
gap = segment_start - prev_end
word_count = len(segment['text'].split())
# Check for topic boundary
is_boundary = (
gap > gap_threshold and
current_chunk['word_count'] >= min_chunk_size
)
if is_boundary:
if current_chunk['text']:
chunks.append(current_chunk)
current_chunk = {
'text': '',
'start': segment_start,
'end': segment_start,
'word_count': 0
}
current_chunk['text'] += ' ' + segment['text']
current_chunk['end'] = segment_start + segment.get('duration', 0)
current_chunk['word_count'] += word_count
prev_end = current_chunk['end']
if current_chunk['text']:
chunks.append(current_chunk)
return chunks
Strategy 3: Semantic Chunks
Use embeddings to find natural topic boundaries.
def chunk_by_semantics(
transcript: List[Dict],
similarity_threshold: float = 0.7,
window_size: int = 5
) -> List[Dict]:
"""
Chunk based on semantic similarity between segments.
Groups semantically similar consecutive segments together.
"""
from sentence_transformers import SentenceTransformer
import numpy as np
model = SentenceTransformer('all-MiniLM-L6-v2')
# Combine segments into windows for more stable embeddings
windows = []
for i in range(0, len(transcript), window_size):
window_text = ' '.join(
s['text'] for s in transcript[i:i+window_size]
)
windows.append({
'text': window_text,
'start': transcript[i]['start'],
'end': transcript[min(i+window_size-1, len(transcript)-1)]['start'],
'segments': transcript[i:i+window_size]
})
# Get embeddings
embeddings = model.encode([w['text'] for w in windows])
# Find boundaries where similarity drops
chunks = []
current_chunk = windows[0].copy() if windows else None
for i in range(1, len(windows)):
similarity = np.dot(embeddings[i-1], embeddings[i]) / (
np.linalg.norm(embeddings[i-1]) * np.linalg.norm(embeddings[i])
)
if similarity < similarity_threshold:
# Topic boundary detected
if current_chunk:
chunks.append(current_chunk)
current_chunk = windows[i].copy()
else:
# Continue current chunk
current_chunk['text'] += ' ' + windows[i]['text']
current_chunk['end'] = windows[i]['end']
current_chunk['segments'].extend(windows[i]['segments'])
if current_chunk:
chunks.append(current_chunk)
return chunks
Full Harvesting Pipeline
#!/usr/bin/env python3
"""Complete YouTube harvesting pipeline."""
import json
from typing import List, Dict, Optional
from datetime import datetime
async def harvest_youtube_video(
url: str,
collection: str,
chunk_strategy: str = "time", # time, topic, semantic
chunk_size: int = 300
) -> Dict:
"""
Harvest a single YouTube video into RAG.
Args:
url: YouTube video URL
collection: Target RAG collection
chunk_strategy: How to chunk the transcript
chunk_size: Size parameter for chunking
Returns:
Harvest report
"""
video_id = extract_video_id(url)
# Get metadata
metadata = get_video_metadata(video_id)
if not metadata:
return {"status": "error", "error": "Could not fetch metadata"}
# Get transcript
transcript = get_transcript(video_id)
if not transcript:
return {"status": "error", "error": "No transcript available"}
# Chunk based on strategy
if chunk_strategy == "time":
chunks = chunk_by_time(transcript, chunk_size)
elif chunk_strategy == "topic":
chunks = chunk_by_topic(transcript)
elif chunk_strategy == "semantic":
chunks = chunk_by_semantics(transcript)
else:
chunks = chunk_by_time(transcript, chunk_size)
# Ingest each chunk
ingested = 0
for i, chunk in enumerate(chunks):
chunk_metadata = {
"source_type": "youtube",
"source_url": url,
"video_id": video_id,
"title": metadata.get("title"),
"channel": metadata.get("channel"),
"upload_date": metadata.get("upload_date"),
"duration_seconds": metadata.get("duration"),
"timestamp_start": format_timestamp(chunk["start"]),
"timestamp_end": format_timestamp(chunk["end"]),
"chunk_index": i,
"total_chunks": len(chunks),
"harvested_at": datetime.now().isoformat()
}
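        # 'ingest' below is assumed to be the host RAG system's ingestion function;
        # it is not defined in this skill.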
await ingest(
content=chunk["text"].strip(),
collection=collection,
metadata=chunk_metadata,
doc_id=f"yt_{video_id}_chunk_{i}"
)
ingested += 1
return {
"status": "success",
"video_id": video_id,
"title": metadata.get("title"),
"chunks": ingested,
"collection": collection
}
async def harvest_youtube_playlist(
playlist_url: str,
collection: str,
**kwargs
) -> Dict:
"""Harvest all videos in a playlist."""
import subprocess
# Get playlist video IDs
result = subprocess.run(
['yt-dlp', '--flat-playlist', '--print', 'id', playlist_url],
capture_output=True,
text=True
)
    video_ids = result.stdout.strip().splitlines()
results = []
for video_id in video_ids:
url = f"https://youtube.com/watch?v={video_id}"
result = await harvest_youtube_video(url, collection, **kwargs)
results.append(result)
success = sum(1 for r in results if r.get("status") == "success")
return {
"status": "success",
"videos_processed": len(video_ids),
"videos_succeeded": success,
"videos_failed": len(video_ids) - success,
"details": results
}
async def harvest_youtube_channel(
channel_url: str,
collection: str,
max_videos: int = 50,
**kwargs
) -> Dict:
"""Harvest recent videos from a channel."""
import subprocess
# Get recent video IDs from channel
result = subprocess.run(
['yt-dlp', '--flat-playlist', '--print', 'id',
'--playlist-end', str(max_videos), channel_url],
capture_output=True,
text=True
)
    video_ids = result.stdout.strip().splitlines()
results = []
for video_id in video_ids:
if video_id:
url = f"https://youtube.com/watch?v={video_id}"
result = await harvest_youtube_video(url, collection, **kwargs)
results.append(result)
success = sum(1 for r in results if r.get("status") == "success")
return {
"status": "success",
"videos_processed": len(video_ids),
"videos_succeeded": success,
"collection": collection
}
Metadata Schema
# YouTube video chunk metadata
source_type: youtube
source_url: https://youtube.com/watch?v=...
video_id: dQw4w9WgXcQ
title: "Video Title"
channel: "Channel Name"
channel_id: UC...
upload_date: "20240101"
duration_seconds: 930
timestamp_start: "05:30"
timestamp_end: "10:00"
chunk_index: 2
total_chunks: 12
harvested_at: "2024-01-01T12:00:00Z"
tags: [tag1, tag2]
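The timestamp metadata also enables chunk-level citations: YouTube's t query parameter takes an offset in seconds, so a chunk's raw start time (keep chunk["start"] around if you want this) can be turned into a deep link. A small illustrative helper (the function name is ours):
def chunk_deep_link(video_id: str, start_seconds: float) -> str:
    """Build a YouTube URL that starts playback at the chunk's start time."""
    return f"https://youtube.com/watch?v={video_id}&t={int(start_seconds)}s"
# chunk_deep_link("dQw4w9WgXcQ", 330) -> "https://youtube.com/watch?v=dQw4w9WgXcQ&t=330s"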
Error Handling
| Error | Handling |
|---|---|
| No transcript available | Log, skip, note in report |
| Private/deleted video | Skip with error note |
| Age-restricted | May need authentication |
| Rate limited | Back off and retry with delay (see the sketch below) |
| API quota exceeded | Switch to yt-dlp method |
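For the rate-limited case, a minimal backoff wrapper around get_transcript might look like the sketch below (attempt count and delays are arbitrary). Note that get_transcript also returns an empty list when a video simply has no transcript, so retries alone cannot distinguish the two cases.
import time
def get_transcript_with_retry(video_id: str, attempts: int = 3, base_delay: float = 5.0) -> list:
    """Retry transcript fetches with exponential backoff on transient failures."""
    for attempt in range(attempts):
        segments = get_transcript(video_id)
        if segments:
            return segments
        # Back off 5s, 10s, 20s, ... before the next attempt
        time.sleep(base_delay * (2 ** attempt))
    return []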
Usage Examples
# Single video
result = await harvest_youtube_video(
url="https://youtube.com/watch?v=VIDEO_ID",
collection="ml_tutorials",
chunk_strategy="topic"
)
# Playlist
result = await harvest_youtube_playlist(
playlist_url="https://youtube.com/playlist?list=PLAYLIST_ID",
collection="course_lectures",
chunk_strategy="time",
chunk_size=600 # 10-minute chunks
)
# Channel (recent videos)
result = await harvest_youtube_channel(
channel_url="https://youtube.com/@ChannelName",
collection="channel_content",
max_videos=20
)
Refinement Notes
Track improvements as you use this skill.
- Transcript extraction tested
- Chunking strategies compared
- Metadata enrichment working
- Playlist harvesting tested
- Channel following implemented
- Error handling robust