Learn 🧠 All Concepts (20) 🤖 What is an LLM? 📚 RAG Explained ⚡ AI Agents 💻 Run AI Locally 🇮🇳 AI in India 📖 Learn Tracks 🔧 DevOps Track ⚙️ AI Ops Track 🗺️ AI Engineer Roadmap
Tools 🔧 AI Tools Directory 🔓 Open Source AI ⭐ Top GitHub Repos ✦ Claude Skill Repos 🚀 Ready-to-Deploy Projects
Build 🏗️ Build Hub 🎯 Master Prompts 🧩 RAG Agents 🚀 App Megaprompts
Workflows ⚡ All Workflows (22) 🎥 Text to Video 🎞️ Image to Video 🔊 Text to Speech ♻️ Automation
Resources 🧪 Colab Notebooks ⚙️ n8n Workflows 📈 Algo Trading 💰 Passive Income
🗂️ Browse All Topics About AItheGuru
← RAG agents
▶️ RAG Agent · Research

YouTube Channel Research Agent

Index any YouTube channel or playlist. Ask "What topics did this creator cover in 2024?" or "Find all videos where they mention X tool."

Research · Beginner · Streamlit Cloud (free) · Local

Quick info

Category: Research
Difficulty: Beginner
Deploy on: Streamlit Cloud (free)

Get the code

Includes install commands in comments

What it does

Indexes transcripts from any public channel
Timestamp-linked answers
Topic clustering
No API key for YouTube

Stack

YouTube Transcript API · OpenAI · ChromaDB · Streamlit

Deploy on

✓ Streamlit Cloud (free) · ✓ Local

Full source code

Install commands are in the top comments. Copy and run.

# YouTube Channel Research RAG Agent
# Indexes video transcripts — ask questions across hundreds of videos
# pip install youtube-transcript-api yt-dlp openai chromadb streamlit langchain-openai

import json
import os
import re
from typing import Optional
from urllib.parse import urlparse, parse_qs

import chromadb
import streamlit as st
import yt_dlp
from chromadb.utils import embedding_functions
from openai import OpenAI
from youtube_transcript_api import YouTubeTranscriptApi

# ── SETUP ─────────────────────────────────────────────────────────
OPENAI_KEY = os.environ.get("OPENAI_API_KEY")

openai_client = OpenAI(api_key=OPENAI_KEY)
chroma_client = chromadb.PersistentClient(path="./youtube_chroma")
openai_ef = embedding_functions.OpenAIEmbeddingFunction(
    api_key=OPENAI_KEY,
    model_name="text-embedding-3-small",
)
collection = chroma_client.get_or_create_collection(
    "youtube_transcripts",
    embedding_function=openai_ef,
)

# Transcript languages to try, in order of preference.
TRANSCRIPT_LANGUAGES = ["en", "en-US", "hi"]

# Number of source links rendered under each assistant answer.
MAX_SOURCES_SHOWN = 3

# Compiled once; matches watch, short-link and embed URL shapes.
_VIDEO_ID_RE = re.compile(
    r'(?:youtube\.com/watch\?v=|youtu\.be/|youtube\.com/embed/)([^&\n?#]+)'
)


# ── YOUTUBE HELPERS ────────────────────────────────────────────────
def extract_video_id(url: str) -> Optional[str]:
    """Extract the video ID from a YouTube watch, youtu.be or embed URL.

    Returns:
        The ID string, or ``None`` when *url* matches none of those shapes.
    """
    match = _VIDEO_ID_RE.search(url)
    return match.group(1) if match else None


def get_channel_videos(channel_url: str, max_videos: int = 50) -> list[dict]:
    """List videos from a YouTube channel or playlist URL via yt-dlp.

    Args:
        channel_url: URL handed to ``yt_dlp.YoutubeDL.extract_info``.
        max_videos: Upper bound passed to yt-dlp as ``playlistend``.

    Returns:
        One dict per entry with ``id``, ``title``, ``url``, ``duration`` and
        ``upload_date`` keys. Entries without an ID are skipped because no
        usable watch URL can be built for them.
    """
    ydl_opts = {
        "quiet": True,
        "extract_flat": True,
        "playlistend": max_videos,
    }
    videos = []
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(channel_url, download=False)
        # `info` or its "entries" may be missing/None; treat both as "no videos".
        for entry in (info or {}).get("entries") or []:
            if not entry or not entry.get("id"):
                continue
            videos.append({
                "id": entry["id"],
                "title": entry.get("title"),
                "url": f"https://youtube.com/watch?v={entry['id']}",
                "duration": entry.get("duration"),
                "upload_date": entry.get("upload_date"),
            })
    return videos


def get_transcript(video_id: str) -> list[dict]:
    """Fetch a video's transcript as a list of timestamped segments.

    Works with both youtube-transcript-api interfaces: the static
    ``get_transcript`` helper where it exists, and the instance-based
    ``fetch(...).to_raw_data()`` otherwise.

    Returns:
        A list of segment dicts (``text``, ``start``, ``duration``), or an
        empty list when no transcript could be retrieved.
    """
    try:
        if hasattr(YouTubeTranscriptApi, "get_transcript"):
            return YouTubeTranscriptApi.get_transcript(
                video_id, languages=TRANSCRIPT_LANGUAGES
            )
        fetched = YouTubeTranscriptApi().fetch(
            video_id, languages=TRANSCRIPT_LANGUAGES
        )
        return fetched.to_raw_data()
    except Exception:
        # Deliberate best-effort: a video with disabled/missing captions (or a
        # transient fetch failure) is skipped instead of aborting the whole run.
        return []


def _build_chunk(parts: list, start: float, video_info: dict) -> dict:
    """Assemble one chunk record from collected segment texts and a start time."""
    start_seconds = int(start)
    return {
        "text": " ".join(parts).strip(),
        "video_id": video_info["id"],
        "video_title": video_info.get("title") or "Untitled video",
        "video_url": video_info["url"],
        "timestamp_start": start_seconds,
        "timestamp_url": f"{video_info['url']}&t={start_seconds}s",
        "upload_date": video_info.get("upload_date") or "",
    }


def chunk_transcript(transcript: list, video_info: dict, chunk_size: int = 60) -> list[dict]:
    """Group transcript segments into chunks of roughly *chunk_size* seconds.

    Args:
        transcript: Segment dicts with ``text`` and ``start`` (and optionally
            ``duration``; 3 seconds is assumed when absent).
        video_info: Dict with ``id``, ``title``, ``url`` and optionally
            ``upload_date``, as produced by ``get_channel_videos``.
        chunk_size: Minimum span in seconds before a chunk is closed.

    Returns:
        Chunk dicts that all share the same keys: ``text``, ``video_id``,
        ``video_title``, ``video_url``, ``timestamp_start``, ``timestamp_url``
        and ``upload_date``. Chunks whose text is empty after stripping are
        dropped.
    """
    chunks = []
    parts = []
    chunk_start = None

    for segment in transcript:
        if chunk_start is None:
            chunk_start = segment["start"]
        parts.append(segment["text"])
        chunk_end = segment["start"] + segment.get("duration", 3)

        if chunk_end - chunk_start >= chunk_size:
            chunk = _build_chunk(parts, chunk_start, video_info)
            if chunk["text"]:
                chunks.append(chunk)
            parts = []
            chunk_start = None

    # Flush the trailing partial chunk using the same record shape as the rest.
    if parts:
        chunk = _build_chunk(parts, chunk_start, video_info)
        if chunk["text"]:
            chunks.append(chunk)

    return chunks


def index_videos(videos: list[dict], progress_callback=None) -> int:
    """Index a list of videos into the ChromaDB collection.

    Videos already present in the collection, and videos without a
    transcript, are skipped.

    Args:
        videos: Video dicts as produced by ``get_channel_videos``.
        progress_callback: Optional ``callable(fraction, message)`` invoked
            before each video and once more with ``1.0`` when finished.

    Returns:
        The number of transcript chunks added during this call.
    """
    total_chunks = 0
    video_count = len(videos)

    for i, video in enumerate(videos):
        if progress_callback:
            # Title can be None in flat yt-dlp listings; fall back to the ID.
            label = str(video.get("title") or video["id"])
            progress_callback(i / video_count, f"Indexing: {label[:50]}...")

        # Check if already indexed; one matching record is enough to know.
        existing = collection.get(where={"video_id": video["id"]}, limit=1)
        if existing["ids"]:
            continue

        transcript = get_transcript(video["id"])
        if not transcript:
            continue

        chunks = chunk_transcript(transcript, video)
        if chunks:
            collection.add(
                documents=[c["text"] for c in chunks],
                # Every metadata value is stored as a string.
                metadatas=[
                    {k: str(v) for k, v in c.items() if k != "text"}
                    for c in chunks
                ],
                ids=[f"{video['id']}_{j}" for j in range(len(chunks))],
            )
            total_chunks += len(chunks)

    if progress_callback:
        progress_callback(1.0, "Indexing complete")

    return total_chunks


def search_and_answer(query: str, n_results: int = 6) -> dict:
    """Search indexed transcripts and generate an answer with timestamps.

    Args:
        query: The user's question.
        n_results: Maximum number of transcript chunks to retrieve.

    Returns:
        A dict with ``answer`` (str) and ``sources`` (list of dicts with
        ``title``, ``url``, ``timestamp`` and ``excerpt``).
    """
    no_content = {
        "answer": "No relevant content found. Try indexing more videos first.",
        "sources": [],
    }

    # Never request more results than the collection holds.
    available = collection.count()
    if available == 0:
        return no_content

    results = collection.query(
        query_texts=[query],
        n_results=min(n_results, available),
    )
    if not results["documents"][0]:
        return no_content

    # Build context with timestamps
    context_parts = []
    sources = []
    for doc, meta in zip(results["documents"][0], results["metadatas"][0]):
        timestamp = int(meta.get("timestamp_start", 0))
        mins, secs = divmod(timestamp, 60)
        context_parts.append(f"[{meta['video_title']} at {mins}:{secs:02d}]\n{doc}")
        sources.append({
            "title": meta["video_title"],
            "url": meta["timestamp_url"],
            "timestamp": f"{mins}:{secs:02d}",
            "excerpt": doc[:120] + "...",
        })

    context = "\n\n---\n\n".join(context_parts)
    response = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": (
                    "Answer questions based only on the provided YouTube "
                    "transcript excerpts. Always mention which video and "
                    "timestamp the information comes from. Be specific."
                ),
            },
            {
                "role": "user",
                "content": f"Context from transcripts:\n{context}\n\nQuestion: {query}",
            },
        ],
    )
    return {"answer": response.choices[0].message.content, "sources": sources}


# ── STREAMLIT UI ───────────────────────────────────────────────────
st.set_page_config(page_title="YouTube Research Agent", layout="wide", page_icon="▶️")
st.title("▶️ YouTube Research Agent")
st.caption("Index any YouTube channel — ask questions with timestamp citations")

tab1, tab2, tab3 = st.tabs(["💬 Research", "📥 Index Videos", "📊 Library"])

with tab2:
    st.subheader("Index a YouTube Channel or Playlist")
    channel_url = st.text_input(
        "Channel or Playlist URL",
        placeholder="https://www.youtube.com/@channelname",
    )
    max_vids = st.slider("Max videos to index", 10, 200, 50)

    if st.button("Index Channel", type="primary") and channel_url:
        with st.spinner("Getting video list..."):
            try:
                videos = get_channel_videos(channel_url, max_vids)
            except yt_dlp.utils.DownloadError as exc:
                # A bad or unreachable URL should not crash the whole page.
                st.error(f"Could not read that URL: {exc}")
                videos = []

        if videos:
            st.info(f"Found {len(videos)} videos. Starting indexing...")
            progress_bar = st.progress(0)
            status_text = st.empty()

            def update_progress(pct, msg):
                progress_bar.progress(pct)
                status_text.text(msg)

            chunks = index_videos(videos, update_progress)
            st.success(f"✅ Indexed {len(videos)} videos → {chunks} transcript chunks")
        else:
            st.warning("No videos found for that URL.")

with tab3:
    st.subheader("Indexed Library")
    total = collection.count()
    st.metric("Total transcript chunks", total)
    if total > 0:
        sample = collection.peek(5)
        for meta in sample["metadatas"]:
            st.caption(f"▶️ {meta.get('video_title','?')} — {meta.get('timestamp_url','')}")

with tab1:
    if "yt_messages" not in st.session_state:
        st.session_state.yt_messages = []

    # Replay the conversation so far; history shows the same number of
    # sources per answer as the live response below.
    for msg in st.session_state.yt_messages:
        with st.chat_message(msg["role"]):
            st.markdown(msg["content"])
            for src in msg.get("sources", [])[:MAX_SOURCES_SHOWN]:
                st.markdown(f"▶️ [{src['title']} @ {src['timestamp']}]({src['url']})")

    if prompt := st.chat_input("Ask anything about the indexed videos..."):
        st.session_state.yt_messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.markdown(prompt)

        with st.chat_message("assistant"):
            with st.spinner("Searching transcripts..."):
                result = search_and_answer(prompt)
            st.markdown(result["answer"])
            for src in result["sources"][:MAX_SOURCES_SHOWN]:
                st.markdown(f"▶️ [{src['title']} @ {src['timestamp']}]({src['url']})")

        st.session_state.yt_messages.append({
            "role": "assistant",
            "content": result["answer"],
            "sources": result["sources"],
        })