What it does
OAuth Gmail connection
Incremental sync
Thread-aware answers
Privacy-first local mode
Stack
Gmail API · OpenAI · ChromaDB · FastAPI · React
Deploy on
✓ Local · ✓ Any VPS
Full source code
Install commands are in the top comments. Copy and run.
# Email Inbox RAG Agent
# Indexes your Gmail and lets you query it with AI
# pip install google-auth google-auth-oauthlib google-auth-httplib2
# google-api-python-client openai chromadb fastapi uvicorn
import os, json, base64, email
from datetime import datetime, timedelta
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
import chromadb
from chromadb.utils.embedding_functions import OpenAIEmbeddingFunction
from openai import OpenAI
# OAuth scopes requested from Google; passed both when loading the cached
# token and when running the interactive consent flow in authenticate_gmail().
SCOPES = ["https://www.googleapis.com/auth/gmail.readonly"]
# OpenAI API key read from the environment. os.environ.get() yields None when
# OPENAI_API_KEY is unset; the value is handed to the chat client below and
# to the embedding functions created in index_emails() / query_emails().
OPENAI_KEY = os.environ.get("OPENAI_API_KEY")
# Shared OpenAI client, created at import time and used by query_emails()
# for the chat completion call.
client = OpenAI(api_key=OPENAI_KEY)
# ── GMAIL AUTH ────────────────────────────────────────────────────
def authenticate_gmail():
    """Authenticate with Gmail via OAuth2 and return a Gmail API service.

    Credentials are loaded from ``token.json`` in the working directory when
    that file exists. If they are missing or no longer valid, the function
    first tries to refresh them silently (when they are expired and carry a
    refresh token) and only falls back to the interactive browser flow, which
    needs ``credentials.json`` from the Google Cloud Console, when a refresh
    is not possible. Newly obtained or refreshed credentials are written back
    to ``token.json``.

    Returns:
        The object returned by ``build("gmail", "v1", credentials=creds)``.
    """
    # Function-scope imports: both modules ship with google-auth, which the
    # file already depends on through google.oauth2.credentials.
    from google.auth.exceptions import RefreshError
    from google.auth.transport.requests import Request

    creds = None
    if os.path.exists("token.json"):
        creds = Credentials.from_authorized_user_file("token.json", SCOPES)

    if not creds or not creds.valid:
        refreshed = False
        if creds and creds.expired and creds.refresh_token:
            # Avoid a full browser consent every time the access token expires.
            try:
                creds.refresh(Request())
                refreshed = True
            except RefreshError:
                # Refresh token revoked or otherwise unusable: re-consent below.
                refreshed = False
        if not refreshed:
            # Need credentials.json from Google Cloud Console
            flow = InstalledAppFlow.from_client_secrets_file("credentials.json", SCOPES)
            creds = flow.run_local_server(port=0)
        with open("token.json", "w") as f:
            f.write(creds.to_json())

    return build("gmail", "v1", credentials=creds)
# ── EMAIL FETCHING ────────────────────────────────────────────────
def fetch_emails(service, days_back: int = 30, max_results: int = 500) -> list[dict]:
    """Fetch recent emails from Gmail.

    Args:
        service: Gmail API service object (see authenticate_gmail()).
        days_back: Messages are listed with the Gmail query
            ``after:<YYYY/MM/DD>`` for the local date this many days ago.
        max_results: Upper bound on the number of messages listed. Result
            pages are followed via ``nextPageToken`` until this many message
            references have been collected or no pages remain.

    Returns:
        One dict per kept message with the keys ``id``, ``thread_id``,
        ``subject``, ``from``, ``to``, ``date``, ``body`` and ``labels``.
        Messages whose extracted body is shorter than 20 characters are
        skipped, and bodies are truncated to 3000 characters.
    """
    after_date = (datetime.now() - timedelta(days=days_back)).strftime("%Y/%m/%d")

    # messages.list returns one page per call (Gmail documents a per-page
    # maximum of 500), so keep following nextPageToken until enough
    # references are collected.
    messages = []
    page_token = None
    while len(messages) < max_results:
        list_args = {
            "userId": "me",
            "q": f"after:{after_date}",
            "maxResults": min(max_results - len(messages), 500),
        }
        if page_token:
            list_args["pageToken"] = page_token
        results = service.users().messages().list(**list_args).execute()
        messages.extend(results.get("messages", []))
        page_token = results.get("nextPageToken")
        if not page_token:
            break
    messages = messages[:max_results]

    emails = []
    print(f"Fetching {len(messages)} emails...")
    for i, msg_ref in enumerate(messages):
        if i % 50 == 0:
            print(f" Progress: {i}/{len(messages)}")
        msg = service.users().messages().get(
            userId="me",
            id=msg_ref["id"],
            format="full",
        ).execute()
        payload = msg.get("payload", {})

        # Header names are case-insensitive, so normalise them before lookup;
        # otherwise "subject:" or "SUBJECT:" would be reported as missing.
        headers = {h["name"].lower(): h["value"] for h in payload.get("headers", [])}

        body = extract_body(payload)
        if len(body) < 20:  # Skip very short emails
            continue

        emails.append({
            "id": msg["id"],
            "thread_id": msg["threadId"],
            "subject": headers.get("subject", "(no subject)"),
            "from": headers.get("from", ""),
            "to": headers.get("to", ""),
            "date": headers.get("date", ""),
            "body": body[:3000],  # Cap at 3000 chars per email
            "labels": msg.get("labelIds", []),
        })
    return emails
def _decode_b64url(data: str) -> str:
    """Decode a URL-safe base64 string to text, ignoring undecodable bytes."""
    # "==" supplies padding when the encoded data omits it; surplus padding
    # is tolerated by the decoder.
    return base64.urlsafe_b64decode(data + "==").decode("utf-8", errors="ignore")


def extract_body(payload) -> str:
    """Recursively extract the text body from an email payload.

    Args:
        payload: A message payload (or part) dict. If it carries
            ``body.data`` directly, that data is decoded and returned
            whatever its MIME type. Otherwise its ``parts`` are walked:
            ``text/plain`` parts are decoded and parts that themselves
            contain ``parts`` are processed recursively.

    Returns:
        The decoded text fragments joined with newlines, or ``""`` when no
        text was found. Empty fragments (for example from a nested multipart
        that holds no ``text/plain`` part) are dropped so they do not add
        stray blank lines to the result.
    """
    direct_data = payload.get("body", {}).get("data")
    if direct_data:
        return _decode_b64url(direct_data)

    text_parts = []
    for part in payload.get("parts", []):
        if part.get("mimeType") == "text/plain":
            data = part.get("body", {}).get("data", "")
            fragment = _decode_b64url(data) if data else ""
        elif part.get("parts"):
            fragment = extract_body(part)
        else:
            fragment = ""
        if fragment:
            text_parts.append(fragment)
    return "\n".join(text_parts)
# ── INDEX EMAILS ──────────────────────────────────────────────────
def _to_metadata_value(value):
    """Coerce a value into a scalar that is safe to store as Chroma metadata."""
    if isinstance(value, (str, int, float, bool)):
        return value
    if value is None:
        return ""
    if isinstance(value, (list, tuple, set)):
        # e.g. Gmail label ids: store as one comma-separated string.
        return ", ".join(str(item) for item in value)
    return str(value)


def index_emails(emails: list[dict]) -> int:
    """Index emails into ChromaDB.

    Opens (or creates) the persistent collection ``emails`` under
    ``./email_chroma`` with OpenAI ``text-embedding-3-small`` embeddings and
    adds every email whose ``id`` is not stored yet.

    Args:
        emails: Email dicts as produced by fetch_emails(). Each needs the
            keys ``id``, ``from``, ``subject``, ``date`` and ``body``; every
            key except ``body`` is stored as metadata.

    Returns:
        The number of newly indexed emails (0 when nothing was new).
    """
    chroma = chromadb.PersistentClient(path="./email_chroma")
    ef = OpenAIEmbeddingFunction(api_key=OPENAI_KEY, model_name="text-embedding-3-small")
    collection = chroma.get_or_create_collection("emails", embedding_function=ef)

    batch_size = 100

    # Keep the first occurrence of each id so one batch never repeats an id.
    unique_by_id = {}
    for e in emails:
        unique_by_id.setdefault(e["id"], e)
    candidates = list(unique_by_id.values())

    # Only add new emails. Look up just the candidate ids (include=[] asks
    # for ids only) instead of loading every stored document and metadata.
    existing_ids = set()
    for i in range(0, len(candidates), batch_size):
        chunk_ids = [e["id"] for e in candidates[i:i + batch_size]]
        existing_ids.update(collection.get(ids=chunk_ids, include=[])["ids"])

    new_emails = [e for e in candidates if e["id"] not in existing_ids]
    if not new_emails:
        print("No new emails to index.")
        return 0

    texts = [
        f"From: {e['from']}\nSubject: {e['subject']}\nDate: {e['date']}\n\n{e['body']}"
        for e in new_emails
    ]
    # Metadata values must be scalars, so list-valued fields such as
    # "labels" are flattened to strings.
    metadatas = [
        {k: _to_metadata_value(v) for k, v in e.items() if k != "body"}
        for e in new_emails
    ]
    ids = [e["id"] for e in new_emails]

    # Batch insert
    for i in range(0, len(texts), batch_size):
        collection.add(
            documents=texts[i:i + batch_size],
            metadatas=metadatas[i:i + batch_size],
            ids=ids[i:i + batch_size],
        )
    print(f"✅ Indexed {len(new_emails)} new emails")
    return len(new_emails)
# ── QUERY EMAILS ──────────────────────────────────────────────────
def query_emails(question: str, n_results: int = 8) -> dict:
    """Search the indexed emails and answer a question using them as context.

    Args:
        question: Natural-language question; used both as the similarity
            query against the ``emails`` collection and in the chat prompt.
        n_results: Maximum number of emails to retrieve. It is clamped to the
            number of emails actually stored in the collection.

    Returns:
        A dict with ``answer`` (the model's reply text) and ``sources`` (one
        dict per retrieved email with ``subject``, ``from``, ``date`` and
        ``gmail_url``). When nothing is indexed or nothing is retrieved, the
        answer is ``"No relevant emails found."`` and ``sources`` is empty.
    """
    chroma = chromadb.PersistentClient(path="./email_chroma")
    ef = OpenAIEmbeddingFunction(api_key=OPENAI_KEY, model_name="text-embedding-3-small")
    collection = chroma.get_or_create_collection("emails", embedding_function=ef)

    # Skip the embedding request entirely when there is nothing to search,
    # and never ask for more neighbours than the collection holds.
    available = collection.count()
    if available == 0:
        return {"answer": "No relevant emails found.", "sources": []}

    results = collection.query(
        query_texts=[question],
        n_results=min(n_results, available),
    )
    if not results["documents"][0]:
        return {"answer": "No relevant emails found.", "sources": []}

    # Build context: each retrieved email contributes its headers plus the
    # first 500 characters of the indexed document.
    context_chunks = []
    sources = []
    for doc, meta in zip(results["documents"][0], results["metadatas"][0]):
        context_chunks.append(
            f"\n---\nFrom: {meta.get('from','')}\nSubject: {meta.get('subject','')}\nDate: {meta.get('date','')}\n{doc[:500]}\n"
        )
        sources.append({
            "subject": meta.get("subject", ""),
            "from": meta.get("from", ""),
            "date": meta.get("date", ""),
            "gmail_url": f"https://mail.google.com/mail/u/0/#inbox/{meta.get('id','')}",
        })
    context = "".join(context_chunks)

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "You are an email assistant. Answer questions about emails based only on the provided email context. Be specific — quote from emails when relevant. Always mention who sent the email and when."},
            {"role": "user", "content": f"Email context:\n{context}\n\nQuestion: {question}"},
        ],
    )
    return {"answer": response.choices[0].message.content, "sources": sources}
# ── MAIN ──────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Script entry point: authenticate, fetch and index the last 30 days of
    # mail, then answer questions interactively until the user leaves.
    print("📧 Email RAG Agent")
    print("1. Authenticating with Gmail...")
    service = authenticate_gmail()
    print("2. Fetching emails from last 30 days...")
    emails = fetch_emails(service, days_back=30)
    print(f" Found {len(emails)} emails")
    print("3. Indexing emails...")
    index_emails(emails)
    print("\n✅ Ready! Ask questions about your emails:")
    while True:
        try:
            question = input("\n> ").strip()
        except (EOFError, KeyboardInterrupt):
            # Ctrl-D / Ctrl-C ends the session cleanly instead of a traceback.
            print()
            break
        if not question:
            # Don't spend an embedding + chat request on an empty prompt.
            continue
        if question.lower() in ("exit", "quit"):
            break
        result = query_emails(question)
        print(f"\n{result['answer']}")
        print("\nSources:")
        # Show at most the three top-ranked sources.
        for s in result["sources"][:3]:
            print(f" 📧 {s['subject']} — from {s['from']} on {s['date'][:16]}")