
How to Build an AI Email Assistant That Actually Works

Diego Herrera

Creative technologist writing about AI agents in design and content.

April 14, 2026 · 17 min read

Building an AI Email Management Agent: A Practical Guide

Your inbox is a decision engine disguised as a list. Every email demands a micro-judgment: respond now, respond later, delegate, archive, or ignore. An AI email agent doesn't eliminate those decisions — it batches and accelerates them.

This tutorial walks through building a production-grade email management agent that triages incoming mail, generates contextual drafts, tracks follow-ups, and integrates with both Gmail and Microsoft Graph APIs. We'll use Python, LangChain for orchestration, and real OAuth2 flows — not toy examples with hardcoded data.

Architecture Overview

Before writing code, understand what we're building:

┌─────────────────────────────────────────────────────────┐
│                    Email Agent System                     │
│                                                          │
│  ┌──────────┐   ┌──────────┐   ┌───────────────────┐   │
│  │  Gmail    │──▶│          │   │  Draft Generator   │   │
│  │  Watcher  │   │  Triage  │──▶│  (LLM-powered)     │   │
│  └──────────┘   │  Engine   │   └───────────────────┘   │
│  ┌──────────┐   │          │   ┌───────────────────┐   │
│  │  Outlook  │──▶│          │──▶│  Follow-up Tracker │   │
│  │  Poller   │   └──────────┘   │  (SQLite + Cron)   │   │
│  └──────────┘                   └───────────────────┘   │
│                                                          │
│  ┌──────────────────────────────────────────────────┐   │
│  │              Privacy & Storage Layer               │   │
│  └──────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────┘

The agent has four core modules. Each runs independently, communicates through a shared state database, and can be deployed as separate services or a single process.
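
The "shared state database" can be as simple as a SQLite table recording which message IDs have been handled, so modules deployed as separate processes don't double-process mail. A minimal sketch (the table and column names here are illustrative, not part of the later schema):

```python
import sqlite3

def init_state_db(path: str = ":memory:") -> sqlite3.Connection:
    """Create the shared table every module reads and writes."""
    conn = sqlite3.connect(path)
    conn.execute(
        """CREATE TABLE IF NOT EXISTS processed_emails (
               email_id TEXT PRIMARY KEY,
               provider TEXT NOT NULL,
               stage    TEXT NOT NULL
           )"""
    )
    return conn

def claim_email(conn: sqlite3.Connection, email_id: str, provider: str) -> bool:
    """Atomically claim an email; False means another module already has it."""
    try:
        with conn:
            conn.execute(
                "INSERT INTO processed_emails VALUES (?, ?, 'triaged')",
                (email_id, provider),
            )
        return True
    except sqlite3.IntegrityError:
        return False
```

The primary-key constraint does the coordination: whichever module inserts first wins, and everyone else gets a clean `False` instead of a race.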

Prerequisites

pip install langchain langchain-openai google-api-python-client \
  google-auth-oauthlib msal sqlalchemy pydantic python-dateutil \
  ollama  # optional, for local LLM inference

You'll also need:

  • A Google Cloud project with Gmail API enabled and OAuth2 credentials
  • An Azure AD app registration with Mail.ReadWrite and Mail.Send permissions
  • An OpenAI API key (or a local Ollama instance running llama3.1 or mistral)

Part 1: Email Provider Integration

Gmail Integration

Gmail's API is powerful but verbose. We'll wrap it in a clean interface.

import base64
import email
from email.mime.text import MIMEText
from typing import Optional
from dataclasses import dataclass, field
from datetime import datetime

from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from google.auth.transport.requests import Request
import os
import pickle

SCOPES = [
    'https://www.googleapis.com/auth/gmail.readonly',
    'https://www.googleapis.com/auth/gmail.send',
    'https://www.googleapis.com/auth/gmail.modify',
]

@dataclass
class EmailMessage:
    id: str
    thread_id: str
    subject: str
    sender: str
    to: str
    date: datetime
    body: str
    snippet: str
    labels: list[str] = field(default_factory=list)
    is_read: bool = True
    provider: str = "gmail"

class GmailClient:
    def __init__(self, credentials_path: str, token_path: str = "token.pickle"):
        self.credentials_path = credentials_path
        self.token_path = token_path
        self.service = self._authenticate()

    def _authenticate(self):
        creds = None
        if os.path.exists(self.token_path):
            with open(self.token_path, 'rb') as token:
                creds = pickle.load(token)

        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    self.credentials_path, SCOPES
                )
                creds = flow.run_local_server(port=0)

            with open(self.token_path, 'wb') as token:
                pickle.dump(creds, token)

        return build('gmail', 'v1', credentials=creds)

    def fetch_recent_emails(self, max_results: int = 20,
                            query: str = "is:unread") -> list[EmailMessage]:
        results = self.service.users().messages().list(
            userId='me', maxResults=max_results, q=query
        ).execute()

        messages = results.get('messages', [])
        emails = []

        for msg in messages:
            full_msg = self.service.users().messages().get(
                userId='me', id=msg['id'], format='full'
            ).execute()
            emails.append(self._parse_message(full_msg))

        return emails

    def _parse_message(self, raw_message: dict) -> EmailMessage:
        headers = {h['name']: h['value']
                   for h in raw_message['payload']['headers']}

        body = self._extract_body(raw_message['payload'])
        date_str = headers.get('Date', '')

        return EmailMessage(
            id=raw_message['id'],
            thread_id=raw_message['threadId'],
            subject=headers.get('Subject', '(no subject)'),
            sender=headers.get('From', ''),
            to=headers.get('To', ''),
            date=self._parse_date(date_str),
            body=body,
            snippet=raw_message.get('snippet', ''),
            labels=raw_message.get('labelIds', []),
            is_read='UNREAD' not in raw_message.get('labelIds', []),
            provider="gmail"
        )

    def _extract_body(self, payload: dict) -> str:
        """Recursively extract text body from MIME parts."""
        if payload.get('mimeType') == 'text/plain' and payload.get('body', {}).get('data'):
            return base64.urlsafe_b64decode(
                payload['body']['data']
            ).decode('utf-8', errors='replace')

        if 'parts' in payload:
            for part in payload['parts']:
                result = self._extract_body(part)
                if result:
                    return result

        # Fallback: try text/html
        if payload.get('mimeType') == 'text/html' and payload.get('body', {}).get('data'):
            from html.parser import HTMLParser
            class TextExtractor(HTMLParser):
                def __init__(self):
                    super().__init__()
                    self.text = []
                def handle_data(self, data):
                    self.text.append(data)

            html = base64.urlsafe_b64decode(payload['body']['data']).decode('utf-8', errors='replace')
            extractor = TextExtractor()
            extractor.feed(html)
            return ' '.join(extractor.text)

        return ""

    def send_reply(self, thread_id: str, to: str, subject: str, body: str):
        message = MIMEText(body)
        message['to'] = to
        message['subject'] = subject
        # Passing threadId threads the reply in your own Gmail mailbox; for
        # the recipient's client to thread it too, you'd also set In-Reply-To
        # and References from the original message's Message-ID header.

        raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
        self.service.users().messages().send(
            userId='me',
            body={'raw': raw, 'threadId': thread_id}
        ).execute()

    def add_label(self, message_id: str, label_name: str):
        # Get or create label
        labels = self.service.users().labels().list(userId='me').execute()
        label_id = None
        for label in labels.get('labels', []):
            if label['name'] == label_name:
                label_id = label['id']
                break

        if not label_id:
            label = self.service.users().labels().create(
                userId='me',
                body={'name': label_name, 'labelListVisibility': 'labelShow'}
            ).execute()
            label_id = label['id']

        self.service.users().messages().modify(
            userId='me', id=message_id,
            body={'addLabelIds': [label_id]}
        ).execute()

    def mark_as_read(self, message_id: str):
        self.service.users().messages().modify(
            userId='me', id=message_id,
            body={'removeLabelIds': ['UNREAD']}
        ).execute()

    @staticmethod
    def _parse_date(date_str: str) -> datetime:
        from email.utils import parsedate_to_datetime
        try:
            return parsedate_to_datetime(date_str)
        except (ValueError, TypeError):
            return datetime.now()

Outlook/Microsoft Graph Integration

Microsoft Graph uses a different auth model (MSAL) and REST patterns:

import msal
import requests
from datetime import datetime, timedelta, timezone

class OutlookClient:
    GRAPH_ENDPOINT = "https://graph.microsoft.com/v1.0"

    def __init__(self, client_id: str, tenant_id: str,
                 client_secret: str, scopes: list[str] = None):
        self.client_id = client_id
        self.tenant_id = tenant_id
        self.client_secret = client_secret
        # The client-credentials flow requires the .default scope; the
        # effective permissions (Mail.Read, Mail.Send, Mail.ReadWrite) are
        # the application permissions granted on the app registration.
        self.scopes = scopes or ['https://graph.microsoft.com/.default']
        self.access_token = self._acquire_token()

    def _acquire_token(self) -> str:
        app = msal.ConfidentialClientApplication(
            self.client_id,
            authority=f"https://login.microsoftonline.com/{self.tenant_id}",
            client_credential=self.client_secret,
        )

        result = app.acquire_token_silent(self.scopes, account=None)
        if not result:
            result = app.acquire_token_for_client(scopes=self.scopes)

        if "access_token" not in result:
            raise RuntimeError(
                f"Token acquisition failed: {result.get('error_description')}"
            )
        return result["access_token"]

    def fetch_recent_emails(self, max_results: int = 20,
                            filter_unread: bool = True) -> list[EmailMessage]:
        headers = {'Authorization': f'Bearer {self.access_token}'}
        params = {
            '$top': max_results,
            '$orderby': 'receivedDateTime desc',
            '$select': 'id,conversationId,subject,from,toRecipients,'
                       'receivedDateTime,body,bodyPreview,isRead',
        }
        if filter_unread:
            params['$filter'] = 'isRead eq false'

        # Note: /me works only with delegated tokens. With the app-only
        # (client credentials) flow above, target /users/{mailbox}/messages.
        response = requests.get(
            f"{self.GRAPH_ENDPOINT}/me/messages",
            headers=headers, params=params
        )
        response.raise_for_status()

        return [self._parse_message(msg)
                for msg in response.json().get('value', [])]

    def _parse_message(self, msg: dict) -> EmailMessage:
        sender = msg.get('from', {}).get('emailAddress', {})
        to_list = msg.get('toRecipients', [])
        to_str = ', '.join(
            r.get('emailAddress', {}).get('address', '')
            for r in to_list
        )

        body_content = msg.get('body', {}).get('content', '')
        # Graph returns HTML by default; strip tags for plain text
        if msg.get('body', {}).get('contentType') == 'html':
            from html.parser import HTMLParser
            class S(HTMLParser):
                def __init__(self):
                    super().__init__()
                    self.t = []
                def handle_data(self, d):
                    self.t.append(d)
            s = S()
            s.feed(body_content)
            body_content = ' '.join(s.t)

        return EmailMessage(
            id=msg['id'],
            thread_id=msg.get('conversationId', ''),
            subject=msg.get('subject', '(no subject)'),
            sender=sender.get('address', ''),
            to=to_str,
            date=datetime.fromisoformat(
                msg['receivedDateTime'].replace('Z', '+00:00')
            ),
            body=body_content,
            snippet=msg.get('bodyPreview', ''),
            is_read=msg.get('isRead', True),
            provider="outlook"
        )

    def send_reply(self, message_id: str, body: str):
        headers = {
            'Authorization': f'Bearer {self.access_token}',
            'Content-Type': 'application/json'
        }
        payload = {
            "comment": body
        }
        response = requests.post(
            f"{self.GRAPH_ENDPOINT}/me/messages/{message_id}/reply",
            headers=headers, json=payload
        )
        response.raise_for_status()

    def mark_as_read(self, message_id: str):
        headers = {
            'Authorization': f'Bearer {self.access_token}',
            'Content-Type': 'application/json'
        }
        requests.patch(
            f"{self.GRAPH_ENDPOINT}/me/messages/{message_id}",
            headers=headers,
            json={"isRead": True}
        ).raise_for_status()

Key difference: Gmail uses label-based organization; Outlook uses folders and categories. The EmailMessage dataclass normalizes this.
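
To make that normalization concrete: Gmail signals read state via the presence of the `UNREAD` system label, while Graph exposes a plain `isRead` boolean. Both collapse to the same `is_read` field (a minimal restatement of the parsing logic above):

```python
def gmail_is_read(label_ids: list[str]) -> bool:
    # Gmail: a message is unread iff the UNREAD system label is present
    return 'UNREAD' not in label_ids

def outlook_is_read(msg: dict) -> bool:
    # Microsoft Graph: isRead is a boolean property on the message resource
    return msg.get('isRead', True)
```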

Part 2: The Triage Engine

Triage is where the agent earns its keep. We classify each email into categories and assign priority using an LLM, but with structured output to keep results parseable.

from pydantic import BaseModel, Field
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import PydanticOutputParser
from enum import Enum

class EmailCategory(str, Enum):
    ACTION_REQUIRED = "action_required"
    FYI = "fyi"
    MEETING = "meeting"
    NEWSLETTER = "newsletter"
    PROMOTIONAL = "promotional"
    SPAM = "spam"
    FOLLOW_UP_NEEDED = "follow_up_needed"
    PERSONAL = "personal"

class Priority(str, Enum):
    URGENT = "urgent"       # respond within hours
    HIGH = "high"           # respond today
    MEDIUM = "medium"       # respond this week
    LOW = "low"             # when convenient
    NONE = "none"           # no response needed

class TriageResult(BaseModel):
    category: EmailCategory = Field(description="Email category")
    priority: Priority = Field(description="Priority level")
    reasoning: str = Field(description="Brief explanation of classification")
    suggested_action: str = Field(description="What the user should do")
    estimated_response_time: str = Field(
        description="Estimated time to draft a response, e.g., '2 minutes' or '15 minutes'"
    )
    requires_context: bool = Field(
        description="Whether responding requires information the agent doesn't have"
    )

class TriageEngine:
    def __init__(self, model_name: str = "gpt-4o-mini",
                 user_context: str = ""):
        self.llm = ChatOpenAI(model=model_name, temperature=0)
        self.parser = PydanticOutputParser(pydantic_object=TriageResult)
        self.user_context = user_context

        self.prompt = ChatPromptTemplate.from_messages([
            ("system", """You are an email triage assistant. Classify emails
accurately and conservatively. When in doubt, mark as higher priority.

User context: {user_context}

{format_instructions}

Rules:
- "action_required" means the sender explicitly needs something from the user
- "follow_up_needed" means the user previously asked something and this is a reply
- Mark emails from known contacts higher priority than unknowns
- Meeting invites should include the time sensitivity in suggested_action
- If an email looks like it requires domain expertise you lack, set requires_context=true"""),
            ("human", """Subject: {subject}
From: {sender}
To: {to}
Date: {date}

Body:
{body}""")
        ])

    def triage(self, email_msg: EmailMessage) -> TriageResult:
        chain = self.prompt | self.llm | self.parser

        result = chain.invoke({
            "subject": email_msg.subject,
            "sender": email_msg.sender,
            "to": email_msg.to,
            "date": email_msg.date.isoformat(),
            "body": email_msg.body[:3000],  # truncate for token limits
            "user_context": self.user_context,
            "format_instructions": self.parser.get_format_instructions(),
        })
        return result

Batch Triage with Cost Control

Calling the LLM per email gets expensive. Here's a batching strategy:

import asyncio
from collections import defaultdict

class BatchTriageEngine(TriageEngine):
    async def triage_batch(self, emails: list[EmailMessage],
                           batch_size: int = 5) -> dict[str, TriageResult]:
        """Process emails in batches to control rate limits and costs."""
        results = {}

        for i in range(0, len(emails), batch_size):
            batch = emails[i:i + batch_size]
            tasks = [self._triage_async(e) for e in batch]
            batch_results = await asyncio.gather(*tasks, return_exceptions=True)

            for email_msg, result in zip(batch, batch_results):
                if isinstance(result, Exception):
                    # Default to medium priority on failure
                    results[email_msg.id] = TriageResult(
                        category=EmailCategory.FYI,
                        priority=Priority.MEDIUM,
                        reasoning=f"Triage failed: {result}",
                        suggested_action="Review manually",
                        estimated_response_time="unknown",
                        requires_context=True,
                    )
                else:
                    results[email_msg.id] = result

            # Rate limit courtesy pause
            if i + batch_size < len(emails):
                await asyncio.sleep(1)

        return results

    async def _triage_async(self, email_msg: EmailMessage) -> TriageResult:
        # run_in_executor keeps the synchronous LangChain call off the event loop
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self.triage, email_msg)

Real-world cost note: At GPT-4o-mini pricing (~$0.15/1M input tokens), triaging 100 emails costs roughly $0.02-0.05. Using gpt-4o for the same batch runs $0.50-2.00. For most users, gpt-4o-mini is more than sufficient for classification. Reserve larger models for draft generation.
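
You can sanity-check those figures with back-of-envelope arithmetic. A rough estimator, assuming ~1,500 input tokens per email (truncated body plus prompt); both the token count and the prices are assumptions that will drift:

```python
def triage_cost_usd(n_emails: int,
                    tokens_per_email: int = 1500,
                    price_per_million_tokens: float = 0.15) -> float:
    """Approximate input-token cost of triaging a batch of emails."""
    return n_emails * tokens_per_email * price_per_million_tokens / 1_000_000

# 100 emails at the gpt-4o-mini input price above: about $0.02
print(f"${triage_cost_usd(100):.4f}")
```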

Part 3: Draft Generation

Draft generation is the most nuanced part. A bad auto-draft is worse than no draft at all. The key is giving the LLM enough context about the user's communication style and the email thread.

class DraftGenerator:
    def __init__(self, model_name: str = "gpt-4o",
                 user_name: str = "",
                 user_role: str = "",
                 communication_style: str = "professional but friendly"):
        self.llm = ChatOpenAI(model=model_name, temperature=0.3)
        self.user_name = user_name
        self.user_role = user_role
        self.communication_style = communication_style

        self.prompt = ChatPromptTemplate.from_messages([
            ("system", """You draft email replies. Match the user's communication style.

User profile:
- Name: {user_name}
- Role: {user_role}
- Style: {communication_style}

Rules:
- Keep replies concise. Most emails should get 2-5 sentence replies.
- Match the formality level of the sender.
- If the email requires information you don't have, insert [NEEDS INFO: describe what's needed] placeholders.
- Don't commit to deadlines, meetings, or promises unless explicitly instructed.
- If the email is a meeting invite, draft an acceptance or decline based on context.
- Never fabricate facts, dates, or commitments.
- Include a greeting and sign-off appropriate to the relationship."""),
            ("human", """Draft a reply to this email:

From: {sender}
Subject: {subject}
Date: {date}

Their email:
{body}

{additional_context}""")
        ])

    def generate_draft(self, email_msg: EmailMessage,
                       triage: TriageResult,
                       thread_history: list[str] | None = None) -> str:
        additional_parts = []

        if triage.category == EmailCategory.MEETING:
            additional_parts.append(
                "This is a meeting invite. Draft a brief acceptance or "
                "request to reschedule if timing seems tight."
            )
        elif triage.requires_context:
            additional_parts.append(
                "This email likely requires domain knowledge. Draft a "
                "holding response acknowledging receipt and asking for "
                "time to look into it."
            )
        elif triage.category == EmailCategory.FOLLOW_UP_NEEDED:
            additional_parts.append(
                "This is a follow-up to a previous conversation. "
                "The sender is likely waiting for an update."
            )

        if thread_history:
            additional_parts.append(
                "Previous messages in this thread:\n" +
                "\n---\n".join(thread_history[-3:])  # last 3 messages
            )

        chain = self.prompt | self.llm
        result = chain.invoke({
            "user_name": self.user_name,
            "user_role": self.user_role,
            "communication_style": self.communication_style,
            "sender": email_msg.sender,
            "subject": email_msg.subject,
            "date": email_msg.date.isoformat(),
            "body": email_msg.body[:4000],
            "additional_context": "\n".join(additional_parts),
        })

        return result.content
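
The `[NEEDS INFO: …]` placeholder convention from the system prompt gives you a mechanical safety check: never auto-send a draft that still contains one. The agent loop later does this with a plain substring test; a regex version also recovers what the model said was missing, for a review UI:

```python
import re

NEEDS_INFO = re.compile(r"\[NEEDS INFO:[^\]]*\]")

def is_sendable(draft: str) -> bool:
    """A draft is auto-sendable only if it contains no placeholder."""
    return NEEDS_INFO.search(draft) is None

def missing_info(draft: str) -> list[str]:
    """Extract what the model flagged as missing."""
    prefix = len("[NEEDS INFO:")
    return [m[prefix:-1].strip() for m in NEEDS_INFO.findall(draft)]
```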

Thread Context Retrieval

For replies, thread history matters enormously. Here's how to fetch it:

class ThreadManager:
    def __init__(self, gmail_client: GmailClient = None,
                 outlook_client: OutlookClient = None):
        self.gmail = gmail_client
        self.outlook = outlook_client

    def get_gmail_thread(self, thread_id: str) -> list[EmailMessage]:
        if not self.gmail:
            return []
        thread = self.gmail.service.users().threads().get(
            userId='me', id=thread_id
        ).execute()
        return [
            self.gmail._parse_message(msg)
            for msg in thread.get('messages', [])
        ]

    def get_outlook_thread(self, conversation_id: str) -> list[EmailMessage]:
        if not self.outlook:
            return []
        headers = {'Authorization': f'Bearer {self.outlook.access_token}'}
        response = requests.get(
            f"{self.outlook.GRAPH_ENDPOINT}/me/messages",
            headers=headers,
            params={
                '$filter': f"conversationId eq '{conversation_id}'",
                '$orderby': 'receivedDateTime asc',
                '$select': 'id,conversationId,subject,from,toRecipients,'
                           'receivedDateTime,body,bodyPreview,isRead',
            }
        )
        response.raise_for_status()
        return [
            self.outlook._parse_message(msg)
            for msg in response.json().get('value', [])
        ]
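
The agent loop in Part 5 condenses a fetched thread into short context strings for the drafter: drop the newest message (the one being replied to), keep the last three of the remainder, and truncate each body. Isolated as a pure function, with a trimmed stand-in for EmailMessage so the sketch runs on its own:

```python
from dataclasses import dataclass

@dataclass
class Msg:  # trimmed stand-in for EmailMessage
    sender: str
    body: str

def format_thread_history(thread: list[Msg],
                          keep: int = 3, max_chars: int = 500) -> list[str]:
    """Condense a thread into drafter context, excluding the newest message."""
    prior = thread[:-1]  # the last message is the one being replied to
    return [f"From: {m.sender}\n{m.body[:max_chars]}" for m in prior[-keep:]]
```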

Part 4: Follow-Up Tracking

Follow-ups are where most email systems fail. The agent needs a persistent store to track what's pending, who owes whom, and when to remind.

from sqlalchemy import create_engine, Column, String, DateTime, Boolean, Text
from sqlalchemy.orm import declarative_base, sessionmaker
from datetime import datetime, timedelta

Base = declarative_base()

class FollowUp(Base):
    __tablename__ = "follow_ups"

    id = Column(String, primary_key=True)
    email_id = Column(String, nullable=False)
    thread_id = Column(String)
    provider = Column(String, default="gmail")
    subject = Column(String)
    contact_email = Column(String, nullable=False)
    direction = Column(String)  # "inbound" or "outbound"
    status = Column(String, default="pending")  # pending, reminded, resolved, snoozed
    created_at = Column(DateTime, default=datetime.utcnow)
    remind_at = Column(DateTime, nullable=False)
    last_reminded_at = Column(DateTime, nullable=True)
    notes = Column(Text, default="")
    draft_sent = Column(Boolean, default=False)

class FollowUpTracker:
    def __init__(self, db_path: str = "follow_ups.db"):
        self.engine = create_engine(f"sqlite:///{db_path}")
        Base.metadata.create_all(self.engine)
        self.Session = sessionmaker(bind=self.engine)

    def add_follow_up(self, email_msg: EmailMessage,
                      triage: TriageResult,
                      remind_in_days: int = 3) -> FollowUp:
        session = self.Session()
        try:
            follow_up = FollowUp(
                id=f"fu_{email_msg.id}_{int(datetime.utcnow().timestamp())}",
                email_id=email_msg.id,
                thread_id=email_msg.thread_id,
                provider=email_msg.provider,
                subject=email_msg.subject,
                contact_email=email_msg.sender,
                direction="inbound",
                status="pending",
                remind_at=datetime.utcnow() + timedelta(days=remind_in_days),
                notes=f"Category: {triage.category.value}, "
                      f"Priority: {triage.priority.value}",
            )
            session.add(follow_up)
            session.commit()
            return follow_up
        finally:
            session.close()

    def add_outbound_follow_up(self, email_msg: EmailMessage,
                                expected_response_days: int = 5) -> FollowUp:
        """Track emails we sent that we're waiting on a reply for."""
        session = self.Session()
        try:
            follow_up = FollowUp(
                id=f"fu_out_{email_msg.id}_{int(datetime.utcnow().timestamp())}",
                email_id=email_msg.id,
                thread_id=email_msg.thread_id,
                provider=email_msg.provider,
                subject=email_msg.subject,
                contact_email=email_msg.to.split(',')[0].strip(),
                direction="outbound",
                status="pending",
                remind_at=datetime.utcnow() + timedelta(days=expected_response_days),
            )
            session.add(follow_up)
            session.commit()
            return follow_up
        finally:
            session.close()

    def resolve_if_replied(self, incoming_emails: list[EmailMessage]):
        """Check if any pending outbound follow-ups have been answered."""
        session = self.Session()
        try:
            pending = session.query(FollowUp).filter(
                FollowUp.direction == "outbound",
                FollowUp.status == "pending"
            ).all()

            for fu in pending:
                for email_msg in incoming_emails:
                    if (fu.contact_email.lower() in email_msg.sender.lower() and
                            fu.thread_id == email_msg.thread_id):
                        fu.status = "resolved"
                        break

            session.commit()
        finally:
            session.close()

    def get_due_reminders(self) -> list[FollowUp]:
        session = self.Session()
        try:
            return session.query(FollowUp).filter(
                FollowUp.status.in_(["pending", "snoozed"]),
                FollowUp.remind_at <= datetime.utcnow()
            ).all()
        finally:
            session.close()

    def snooze(self, follow_up_id: str, days: int = 3):
        session = self.Session()
        try:
            fu = session.get(FollowUp, follow_up_id)
            if fu:
                fu.status = "snoozed"
                fu.remind_at = datetime.utcnow() + timedelta(days=days)
                session.commit()
        finally:
            session.close()

    def mark_reminded(self, follow_up_id: str):
        session = self.Session()
        try:
            fu = session.get(FollowUp, follow_up_id)
            if fu:
                fu.last_reminded_at = datetime.utcnow()
                fu.status = "reminded"
                session.commit()
        finally:
            session.close()

Part 5: The Agent Loop

Now we tie everything together into the main agent:

import logging
from dataclasses import dataclass

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("email_agent")

@dataclass
class AgentConfig:
    gmail_credentials: str = "credentials.json"
    outlook_client_id: str = ""
    outlook_tenant_id: str = ""
    outlook_client_secret: str = ""
    llm_model: str = "gpt-4o-mini"
    draft_model: str = "gpt-4o"
    user_name: str = ""
    user_role: str = ""
    user_context: str = ""
    check_interval_minutes: int = 5
    auto_send_threshold: Priority | None = None  # None = never auto-send
    db_path: str = "email_agent.db"

class EmailAgent:
    def __init__(self, config: AgentConfig):
        self.config = config
        self.tracker = FollowUpTracker(config.db_path)

        # Initialize providers
        self.gmail = None
        self.outlook = None

        if config.gmail_credentials:
            self.gmail = GmailClient(config.gmail_credentials)

        if config.outlook_client_id:
            self.outlook = OutlookClient(
                config.outlook_client_id,
                config.outlook_tenant_id,
                config.outlook_client_secret,
            )

        self.thread_mgr = ThreadManager(self.gmail, self.outlook)
        self.triage = BatchTriageEngine(
            model_name=config.llm_model,
            user_context=config.user_context,
        )
        self.drafter = DraftGenerator(
            model_name=config.draft_model,
            user_name=config.user_name,
            user_role=config.user_role,
        )

    async def process_inbox(self) -> list[dict]:
        """Main processing loop. Returns a summary of actions taken."""
        all_emails = []
        results_summary = []

        # Fetch from all providers
        if self.gmail:
            gmail_emails = self.gmail.fetch_recent_emails(max_results=20)
            all_emails.extend(gmail_emails)
            logger.info(f"Fetched {len(gmail_emails)} emails from Gmail")

        if self.outlook:
            outlook_emails = self.outlook.fetch_recent_emails(max_results=20)
            all_emails.extend(outlook_emails)
            logger.info(f"Fetched {len(outlook_emails)} emails from Outlook")

        if not all_emails:
            logger.info("No new emails to process")
            return []

        # Check if any pending follow-ups got resolved
        self.tracker.resolve_if_replied(all_emails)

        # Triage all emails
        triage_results = await self.triage.triage_batch(all_emails)

        for email_msg in all_emails:
            triage = triage_results.get(email_msg.id)
            if not triage:
                continue

            action = {
                "email_id": email_msg.id,
                "subject": email_msg.subject,
                "sender": email_msg.sender,
                "category": triage.category.value,
                "priority": triage.priority.value,
                "draft": None,
                "follow_up_created": False,
                "auto_sent": False,
            }

            # Generate draft for action-required emails
            if triage.category in (
                EmailCategory.ACTION_REQUIRED,
                EmailCategory.FOLLOW_UP_NEEDED,
                EmailCategory.MEETING,
            ):
                thread_history = []
                if email_msg.thread_id:
                    if email_msg.provider == "gmail":
                        thread = self.thread_mgr.get_gmail_thread(
                            email_msg.thread_id)
                    else:
                        thread = self.thread_mgr.get_outlook_thread(
                            email_msg.thread_id)
                    thread_history = [f"From: {m.sender}\n{m.body[:500]}"
                                      for m in thread[:-1]]

                draft = self.drafter.generate_draft(
                    email_msg, triage, thread_history
                )
                action["draft"] = draft

                # Auto-send logic (use with extreme caution)
                if (self.config.auto_send_threshold and
                        triage.priority == self.config.auto_send_threshold and
                        not triage.requires_context and
                        '[NEEDS INFO' not in draft):
                    self._send_reply(email_msg, draft)
                    action["auto_sent"] = True

            # Create follow-up if needed
            if triage.priority in (Priority.URGENT, Priority.HIGH):
                remind_days = 1 if triage.priority == Priority.URGENT else 3
                self.tracker.add_follow_up(email_msg, triage, remind_days)
                action["follow_up_created"] = True

            results_summary.append(action)

        # Process due follow-up reminders
        reminders = self.tracker.get_due_reminders()
        for reminder in reminders:
            logger.warning(
                f"FOLLOW-UP DUE: {reminder.subject} | "
                f"Contact: {reminder.contact_email} | "
                f"Due: {reminder.remind_at}"
            )

        return results_summary

    def _send_reply(self, email_msg: EmailMessage, body: str):
        if email_msg.provider == "gmail" and self.gmail:
            self.gmail.send_reply(email_msg.thread_id, email_msg.sender,
                                  f"Re: {email_msg.subject}", body)
            self.gmail.mark_as_read(email_msg.id)
        elif email_msg.provider == "outlook" and self.outlook:
            self.outlook.send_reply(email_msg.id, body)
            self.outlook.mark_as_read(email_msg.id)

    def run_daemon(self):
        """Run as a continuous service."""
        import asyncio
        import time

        interval = self.config.check_interval_minutes * 60
        logger.info(f"Email agent started. Checking every {self.config.check_interval_minutes} minutes.")

        while True:
            try:
                results = asyncio.run(self.process_inbox())
                if results:
                    logger.info(f"Processed {len(results)} emails")
                    for r in results:
                        logger.info(
                            f"  [{r['priority'].upper()}] {r['subject']} "
                            f"-> {r['category']}"
                            f"{' (draft ready)' if r['draft'] else ''}"
                        )
            except Exception as e:
                logger.error(f"Processing failed: {e}", exc_info=True)

            time.sleep(interval)
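The bare time.sleep(interval) retries at a fixed cadence even while the provider or the LLM API is erroring. One refinement worth considering is exponential backoff after consecutive failures. A minimal sketch (the helper name backoff_delay is ours, not part of the agent code above):

```python
def backoff_delay(base_s: int, consecutive_failures: int, cap_s: int = 3600) -> int:
    """Exponential backoff: double the wait for each consecutive failure, capped."""
    return min(base_s * (2 ** consecutive_failures), cap_s)

# In the daemon loop: increment a failure counter in the except branch,
# reset it to 0 after a successful pass, then sleep with
# time.sleep(backoff_delay(interval, failures)) instead of a fixed interval.
```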

Running the Agent

if __name__ == "__main__":
    config = AgentConfig(
        gmail_credentials="credentials.json",
        user_name="Alex Chen",
        user_role="Engineering Manager at Acme Corp",
        user_context="I manage a team of 8 engineers. Key stakeholders: "
                     "Sarah (VP Eng), Mike (PM). We're launching v2.1 on March 15.",
        llm_model="gpt-4o-mini",
        draft_model="gpt-4o",
        check_interval_minutes=5,
        # Uncomment below to auto-send low-risk replies:
        # auto_send_threshold=Priority.LOW,
    )

    agent = EmailAgent(config)
    agent.run_daemon()
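If you run the daemon on a server, a process supervisor keeps it alive across crashes and reboots. A hypothetical systemd unit (the paths, user, and virtualenv location are assumptions — adjust them to your deployment):

```ini
# /etc/systemd/system/email-agent.service
[Unit]
Description=AI email management agent
After=network-online.target

[Service]
Type=simple
User=emailagent
WorkingDirectory=/opt/email-agent
ExecStart=/opt/email-agent/.venv/bin/python agent.py
Restart=on-failure
RestartSec=30

[Install]
WantedBy=multi-user.target
```

Enable it with systemctl enable --now email-agent; Restart=on-failure gives you a second safety net on top of the try/except in run_daemon.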

Part 6: Privacy Considerations

This is the section most tutorials skip. Don't.

Data Flow Audit

Every email your agent processes passes through these stages:

Stage                 Data Location         Risk Level   Mitigation
Fetch from provider   In-memory             Medium       Don't log raw email bodies
LLM classification    OpenAI API / local    High         Use local models for sensitive orgs
Draft generation      OpenAI API / local    High         Same as above
SQLite storage        Local disk            Medium       Encrypt the database file
OAuth tokens          Local disk (pickle)   Critical     Use OS keychain instead

Concrete Privacy Recommendations

1. Use local models for sensitive workloads.

from langchain_ollama import ChatOllama

# Replace OpenAI with a local model
local_llm = ChatOllama(model="llama3.1:8b", temperature=0)

# Triage engine with local model
triage_engine = TriageEngine(model_name="local")
triage_engine.llm = local_llm  # override the LLM instance

Llama 3.1 8B handles classification well. Draft quality drops noticeably, though — for professional drafts you'll want a 70B-class model or a smaller model fine-tuned for the task.

2. Encrypt the SQLite database.

# Use SQLCipher instead of plain SQLite
# pip install pysqlcipher3  (requires the SQLCipher C library)
from sqlalchemy import create_engine
from pysqlcipher3 import dbapi2 as sqlcipher

def create_encrypted_engine(db_path: str, passphrase: str):
    # The pysqlcipher dialect encrypts the database file with the passphrase
    return create_engine(
        f"sqlite+pysqlcipher://:{passphrase}@/{db_path}",
        module=sqlcipher,
    )

3. Never log email bodies.

# BAD
logger.info(f"Processing email: {email_msg.body}")

# GOOD
logger.info(f"Processing email {email_msg.id} from {email_msg.sender}")
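To enforce this rule mechanically rather than by convention, you can attach a logging.Filter that truncates suspiciously long messages before they reach any handler. A sketch — the length threshold is an arbitrary heuristic of ours, and the class name is hypothetical:

```python
import logging

class TruncateLongMessages(logging.Filter):
    """Truncate log messages long enough to plausibly contain an email body."""

    def __init__(self, max_len: int = 300):
        super().__init__()
        self.max_len = max_len

    def filter(self, record: logging.LogRecord) -> bool:
        msg = record.getMessage()
        if len(msg) > self.max_len:
            record.msg = msg[:self.max_len] + "...[truncated]"
            record.args = ()
        return True  # keep the record, just shortened

logging.getLogger("email_agent").addFilter(TruncateLongMessages())
```

This is a backstop, not a substitute for the GOOD pattern above — it only limits the blast radius when a body slips into a log call.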

4. Implement data retention policies.

class DataRetention:
    def __init__(self, tracker: FollowUpTracker, retention_days: int = 30):
        self.tracker = tracker
        self.retention_days = retention_days

    def purge_old_records(self):
        from datetime import datetime, timedelta
        cutoff = datetime.utcnow() - timedelta(days=self.retention_days)

        session = self.tracker.Session()
        try:
            deleted = session.query(FollowUp).filter(
                FollowUp.status == "resolved",
                FollowUp.created_at < cutoff
            ).delete()
            session.commit()
            logger.info(f"Purged {deleted} old follow-up records")
        finally:
            session.close()
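Retention only helps if the purge actually runs. A small helper can gate it to once per day inside the daemon loop (DailyTask is our name, not an API from any library):

```python
import time
from typing import Callable, Optional

class DailyTask:
    """Invoke a callback at most once per interval (default: daily)."""

    def __init__(self, fn: Callable[[], None], interval_s: float = 86400.0):
        self.fn = fn
        self.interval_s = interval_s
        self._last: Optional[float] = None

    def maybe_run(self, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        if self._last is None or now - self._last >= self.interval_s:
            self.fn()
            self._last = now
            return True
        return False
```

In run_daemon you would construct DailyTask(retention.purge_old_records) once, before the while loop, and call maybe_run() at the end of each cycle.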

5. Scope OAuth permissions minimally. Don't request gmail.modify if you only need read access. The Gmail scopes in our code include send and modify — strip what you don't need.
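As a concrete illustration, the scope sets below go from least to most privileged; the URLs are Google's published Gmail API scopes, and the variable names are ours:

```python
# Read-only triage: no mutations possible
SCOPES_READONLY = ["https://www.googleapis.com/auth/gmail.readonly"]

# Send-only; combine with readonly if approved drafts must go out
SCOPES_SEND = ["https://www.googleapis.com/auth/gmail.send"]

# Broad read/write (labels, mark-as-read, and more); request only if needed
SCOPES_MODIFY = ["https://www.googleapis.com/auth/gmail.modify"]
```

Changing scopes invalidates cached tokens, so users re-consent on the next run — a feature, not a bug, when you're tightening permissions.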

6. Handle PII in drafts. The LLM might reproduce sensitive information from the original email in the draft. Add a post-processing filter:

import re

def redact_sensitive_info(text: str) -> str:
    """Redact common PII patterns from generated drafts."""
    # SSN patterns
    text = re.sub(r'\b\d{3}-\d{2}-\d{4}\b', '[SSN REDACTED]', text)
    # Credit card numbers (four groups of four digits, optionally separated)
    text = re.sub(r'\b\d{4}[ -]?\d{4}[ -]?\d{4}[ -]?\d{4}\b',
                  '[CARD REDACTED]', text)
    return text
