What it does
Indexes transcripts from any public channel
Timestamp-linked answers
Topic clustering
No YouTube API key required
Stack
YouTube Transcript API · OpenAI · ChromaDB · Streamlit
Deploy on
✓ Streamlit Cloud (free) · ✓ Local
Full source code
Install commands are in the top comments. Copy and run.
# YouTube Channel Research RAG Agent
# Indexes video transcripts — ask questions across hundreds of videos
# pip install youtube-transcript-api yt-dlp openai chromadb streamlit langchain-openai
import streamlit as st
from youtube_transcript_api import YouTubeTranscriptApi
from urllib.parse import urlparse, parse_qs
import yt_dlp
import chromadb
from chromadb.utils import embedding_functions
from openai import OpenAI
import re, json
from typing import Optional
# ── SETUP ─────────────────────────────────────────────────────────
import os
# The OpenAI key comes from the environment; it is None when the variable is unset.
OPENAI_KEY = os.getenv("OPENAI_API_KEY")

# Chat-completion client used by search_and_answer().
openai_client = OpenAI(api_key=OPENAI_KEY)

# Embedding function handed to Chroma so documents and queries are embedded
# with the same OpenAI model.
openai_ef = embedding_functions.OpenAIEmbeddingFunction(
    model_name="text-embedding-3-small",
    api_key=OPENAI_KEY,
)

# Chroma client rooted at ./youtube_chroma, and the single collection that
# holds every transcript chunk.
chroma_client = chromadb.PersistentClient(path="./youtube_chroma")
collection = chroma_client.get_or_create_collection(
    name="youtube_transcripts",
    embedding_function=openai_ef,
)
# ── YOUTUBE HELPERS ────────────────────────────────────────────────
def extract_video_id(url: str) -> Optional[str]:
    """Extract the video ID from a YouTube URL.

    Recognised forms (scheme and sub-domain are optional):

    * ``youtube.com/watch?v=ID`` - ``v`` may appear anywhere in the query
      string, e.g. ``watch?feature=share&v=ID``
    * ``youtu.be/ID``
    * ``youtube.com/embed/ID``, ``/shorts/ID``, ``/live/ID``, ``/v/ID``
    * ``youtube-nocookie.com/embed/ID``

    Args:
        url: The URL (or URL-like string) to inspect.

    Returns:
        The video ID, or ``None`` when ``url`` is empty or matches no known form.
    """
    if not url:
        return None

    pattern = (
        r'(?:'
        # Lazy optional prefix so the FIRST ``v=`` parameter wins when
        # several are present.
        r'youtube\.com/watch\?(?:[^#\s]*?&)??v='
        r'|youtu\.be/'
        r'|youtube(?:-nocookie)?\.com/(?:embed|shorts|live|v)/'
        r')'
        # The ID ends at the next query/fragment/path delimiter or whitespace.
        r'([^&\n?#/\s]+)'
    )
    match = re.search(pattern, url)
    return match.group(1) if match else None
def get_channel_videos(channel_url: str, max_videos: int = 50) -> list[dict]:
    """List videos from a YouTube channel or playlist URL without downloading.

    Uses yt-dlp in flat-extraction mode, so only listing metadata is fetched.

    Args:
        channel_url: Channel or playlist URL handed to yt-dlp.
        max_videos: Upper bound on the number of videos returned. It is passed
            to yt-dlp as ``playlistend`` and also enforced on the final list.

    Returns:
        A list of dicts with keys ``id``, ``title``, ``url``, ``duration`` and
        ``upload_date``. Values other than ``id``/``url`` may be ``None`` when
        the listing does not provide them. Returns an empty list when yt-dlp
        yields no entries.
    """
    ydl_opts = {
        "quiet": True,
        "extract_flat": True,
        "playlistend": max_videos,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(channel_url, download=False)

    videos: list[dict] = []
    seen_ids: set = set()

    def collect(entries) -> None:
        """Append video entries to ``videos``, descending into nested playlists."""
        for entry in entries or []:
            if len(videos) >= max_videos:
                return
            if not entry:
                continue
            # NOTE(review): a bare channel URL may come back as a playlist of
            # per-tab playlists; descend instead of treating a tab as a video.
            nested = entry.get("entries")
            if nested is not None:
                collect(nested)
                continue
            video_id = entry.get("id")
            # Without an id the watch URL would be "...?v=None"; skip those,
            # and skip a video already seen under another tab.
            if not video_id or video_id in seen_ids:
                continue
            seen_ids.add(video_id)
            videos.append({
                "id": video_id,
                "title": entry.get("title"),
                "url": f"https://youtube.com/watch?v={video_id}",
                "duration": entry.get("duration"),
                "upload_date": entry.get("upload_date"),
            })

    # extract_info can return None, and "entries" may be absent or None.
    collect((info or {}).get("entries"))
    return videos
def get_transcript(video_id: str, languages: Optional[list[str]] = None) -> list[dict]:
    """Fetch a video's transcript as a list of timed segments.

    Args:
        video_id: YouTube video ID.
        languages: Language codes in order of preference. Defaults to
            ``["en", "en-US", "hi"]``.

    Returns:
        The transcript segments (dicts; ``chunk_transcript`` reads their
        ``text``, ``start`` and ``duration`` keys), or an empty list when the
        transcript cannot be retrieved. Failures are logged as warnings
        rather than raised, so one bad video does not abort a batch.
    """
    import logging  # local import keeps this block self-contained

    preferred = list(languages) if languages else ["en", "en-US", "hi"]
    try:
        if hasattr(YouTubeTranscriptApi, "get_transcript"):
            # Older youtube-transcript-api releases: static helper.
            return YouTubeTranscriptApi.get_transcript(video_id, languages=preferred)
        # Newer releases dropped the static helper in favour of an instance
        # ``fetch()``; ``to_raw_data()`` yields the same list-of-dicts shape.
        fetched = YouTubeTranscriptApi().fetch(video_id, languages=preferred)
        return fetched.to_raw_data()
    except Exception as exc:  # best-effort boundary: log, then skip this video
        logging.getLogger(__name__).warning(
            "Could not fetch transcript for %s: %s", video_id, exc
        )
        return []
def chunk_transcript(transcript: list, video_info: dict, chunk_size: int = 60) -> list[dict]:
    """Group transcript segments into chunks of roughly ``chunk_size`` seconds.

    Segments are accumulated until the span from the chunk's first segment
    start to the current segment end reaches ``chunk_size``; any remainder
    becomes a final, shorter chunk. Every chunk, including the final one,
    has the same keys.

    Args:
        transcript: Segments as dicts with ``text`` and ``start`` keys and an
            optional ``duration`` (3 seconds is assumed when missing).
        video_info: Dict with ``id``, ``title`` and ``url`` keys and an
            optional ``upload_date``.
        chunk_size: Target chunk length in seconds.

    Returns:
        A list of dicts with keys ``text``, ``video_id``, ``video_title``,
        ``video_url``, ``timestamp_start`` (int seconds), ``timestamp_url``
        and ``upload_date`` (empty string when unknown). Chunks whose text is
        empty after stripping are omitted.
    """
    chunks: list[dict] = []

    def flush(parts: list, start) -> None:
        """Turn the accumulated segment texts into one chunk record."""
        text = " ".join(parts).strip()
        if not text:
            # Nothing to embed or search for; skip whitespace-only chunks.
            return
        base_url = video_info["url"]
        start_seconds = int(start)
        # Append the time parameter correctly whether or not the URL already
        # carries a query string.
        separator = "&" if "?" in base_url else "?"
        chunks.append({
            "text": text,
            "video_id": video_info["id"],
            "video_title": video_info["title"],
            "video_url": base_url,
            "timestamp_start": start_seconds,
            "timestamp_url": f"{base_url}{separator}t={start_seconds}s",
            "upload_date": video_info.get("upload_date") or "",
        })

    parts: list = []
    chunk_start = 0
    for segment in transcript or []:
        if not parts:
            chunk_start = segment["start"]
        parts.append(segment["text"])
        chunk_end = segment["start"] + segment.get("duration", 3)
        if chunk_end - chunk_start >= chunk_size:
            flush(parts, chunk_start)
            parts = []

    if parts:
        flush(parts, chunk_start)
    return chunks
def index_videos(videos: list[dict], progress_callback=None) -> int:
    """Index a list of videos into the ChromaDB collection.

    For each video: skip it if any chunk with its ``video_id`` is already
    stored, otherwise fetch the transcript, chunk it and add the chunks.
    Videos without a transcript are skipped.

    Args:
        videos: Dicts as produced by ``get_channel_videos`` (``id``, ``title``,
            ``url``, ...).
        progress_callback: Optional ``callback(fraction, message)`` invoked
            before each video with ``fraction`` in ``[0, 1)``, and once more
            with ``1.0`` after the last video.

    Returns:
        The number of chunks newly added to the collection.
    """
    total_chunks = 0
    video_count = len(videos)

    for position, video in enumerate(videos):
        video_id = video["id"]

        if progress_callback:
            # The title can be None in flat listings; fall back to the id.
            label = str(video.get("title") or video_id or "")
            progress_callback(position / video_count, f"Indexing: {label[:50]}...")

        # Existence check only: one id is enough, so skip loading documents
        # and metadata for every stored chunk of this video.
        existing = collection.get(where={"video_id": video_id}, limit=1, include=[])
        if existing["ids"]:
            continue

        transcript = get_transcript(video_id)
        if not transcript:
            continue

        chunks = chunk_transcript(transcript, video)
        if not chunks:
            continue

        collection.add(
            documents=[chunk["text"] for chunk in chunks],
            # Values are stringified so None never reaches the metadata store;
            # search_and_answer() converts timestamp_start back with int().
            metadatas=[
                {key: str(value) for key, value in chunk.items() if key != "text"}
                for chunk in chunks
            ],
            ids=[f"{video_id}_{index}" for index in range(len(chunks))],
        )
        total_chunks += len(chunks)

    if progress_callback and video_count:
        progress_callback(1.0, "Indexing complete")
    return total_chunks
def search_and_answer(query: str, n_results: int = 6) -> dict:
    """Search indexed transcripts and generate an answer with timestamp citations.

    Args:
        query: The user's question.
        n_results: Maximum number of transcript chunks to retrieve; clamped
            to the number of chunks currently in the collection.

    Returns:
        A dict with ``answer`` (str) and ``sources`` (list of dicts with
        ``title``, ``url``, ``timestamp`` and ``excerpt``). When the query is
        blank, the collection is empty, or nothing is retrieved, ``answer`` is
        a fixed "no relevant content" message and ``sources`` is empty.
    """
    no_content_message = "No relevant content found. Try indexing more videos first."

    # A blank query has nothing to embed; answer without calling any service.
    if not query or not query.strip():
        return {"answer": no_content_message, "sources": []}

    available = collection.count()
    if available == 0:
        return {"answer": no_content_message, "sources": []}

    results = collection.query(
        query_texts=[query],
        n_results=max(1, min(n_results, available)),
    )
    documents = results["documents"][0] or []
    metadatas = results["metadatas"][0] or []
    if not documents:
        return {"answer": no_content_message, "sources": []}

    # Build the prompt context and the citation list in lockstep.
    context_parts = []
    sources = []
    for doc, meta in zip(documents, metadatas):
        meta = meta or {}
        # timestamp_start is stored as a string by index_videos().
        timestamp = int(meta.get("timestamp_start", 0))
        mins, secs = divmod(timestamp, 60)
        title = meta.get("video_title", "?")
        context_parts.append(f"[{title} at {mins}:{secs:02d}]\n{doc}")
        sources.append({
            "title": title,
            "url": meta.get("timestamp_url", ""),
            "timestamp": f"{mins}:{secs:02d}",
            "excerpt": doc[:120] + "...",
        })

    context = "\n\n---\n\n".join(context_parts)
    response = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "Answer questions based only on the provided YouTube transcript excerpts. Always mention which video and timestamp the information comes from. Be specific."},
            {"role": "user", "content": f"Context from transcripts:\n{context}\n\nQuestion: {query}"},
        ],
    )
    # The message content may be None; always hand the UI a string.
    answer = response.choices[0].message.content or ""
    return {"answer": answer, "sources": sources}
# ── STREAMLIT UI ───────────────────────────────────────────────────
st.set_page_config(page_title="YouTube Research Agent", layout="wide", page_icon="▶️")
st.title("▶️ YouTube Research Agent")
st.caption("Index any YouTube channel — ask questions with timestamp citations")

# Number of source links shown under each answer, both live and on replay.
MAX_SOURCES_SHOWN = 3


def _render_sources(sources: list) -> None:
    """Render up to MAX_SOURCES_SHOWN timestamped source links."""
    for src in sources[:MAX_SOURCES_SHOWN]:
        st.markdown(f"▶️ [{src['title']} @ {src['timestamp']}]({src['url']})")


tab1, tab2, tab3 = st.tabs(["💬 Research", "📥 Index Videos", "📊 Library"])

# ── Index tab: fetch a channel/playlist listing and index its transcripts ──
with tab2:
    st.subheader("Index a YouTube Channel or Playlist")
    channel_url = st.text_input("Channel or Playlist URL", placeholder="https://www.youtube.com/@channelname")
    max_vids = st.slider("Max videos to index", 10, 200, 50)
    if st.button("Index Channel", type="primary") and channel_url:
        with st.spinner("Getting video list..."):
            try:
                videos = get_channel_videos(channel_url, max_vids)
            except Exception as exc:  # UI boundary: show the error, don't crash the page
                st.error(f"Could not fetch the video list: {exc}")
                videos = []
        if not videos:
            st.warning("No videos found for that URL.")
        else:
            st.info(f"Found {len(videos)} videos. Starting indexing...")
            progress_bar = st.progress(0)
            status_text = st.empty()

            def update_progress(pct, msg):
                progress_bar.progress(pct)
                status_text.text(msg)

            chunks = index_videos(videos, update_progress)
            # Make sure the bar ends at 100% however the indexer reported progress.
            progress_bar.progress(1.0)
            st.success(f"✅ Indexed {len(videos)} videos → {chunks} transcript chunks")

# ── Library tab: collection size plus a small sample of stored chunks ──
with tab3:
    st.subheader("Indexed Library")
    total = collection.count()
    st.metric("Total transcript chunks", total)
    if total > 0:
        sample = collection.peek(5)
        for meta in sample["metadatas"]:
            st.caption(f"▶️ {meta.get('video_title','?')} — {meta.get('timestamp_url','')}")

# ── Research tab: chat over the indexed transcripts ──
with tab1:
    if "yt_messages" not in st.session_state:
        st.session_state.yt_messages = []

    # Replay the conversation so far; sources render the same way as when the
    # answer was first shown.
    for msg in st.session_state.yt_messages:
        with st.chat_message(msg["role"]):
            st.markdown(msg["content"])
            _render_sources(msg.get("sources", []))

    if prompt := st.chat_input("Ask anything about the indexed videos..."):
        st.session_state.yt_messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.markdown(prompt)
        with st.chat_message("assistant"):
            with st.spinner("Searching transcripts..."):
                result = search_and_answer(prompt)
            st.markdown(result["answer"])
            _render_sources(result["sources"])
        st.session_state.yt_messages.append({
            "role": "assistant", "content": result["answer"], "sources": result["sources"]
        })