How to Build a Sales Qualification Agent with Multi-Turn Conversation
Sarah Kim
Quantitative researcher turned AI writer. Specializes in financial AI agents.
The gap between "hello" and "meeting booked" is where most sales bots fail. They handle single queries well but crumble during real conversations where prospects raise objections, ask complex question...
Building a Multi-Turn Sales Qualification Agent: A Technical Deep Dive
The gap between "hello" and "meeting booked" is where most sales bots fail. They handle single queries well but crumble during real conversations where prospects raise objections, ask complex questions, and require nuanced qualification. In this tutorial, we'll build a production-grade sales qualification agent that actually navigates multi-turn conversations.
Architecture Overview
Our agent will use a stateful conversation manager with four core modules:
┌─────────────────┐ ┌─────────────────┐
│ Lead Scoring │◄───┤ Conversation │
│ Engine │ │ Manager │
└────────┬────────┘ └────────┬────────┘
│ │
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ CRM Integration│ │ Objection │
│ Layer │◄───┤ Handler │
└─────────────────┘ └─────────────────┘
We'll use LangChain for conversation orchestration, FastAPI for the backend, and integrate with HubSpot CRM (principles apply to Salesforce, Pipedrive, etc.).
Setting Up the Project
mkdir sales-agent && cd sales-agent
python -m venv venv
source venv/bin/activate
pip install langchain openai fastapi uvicorn hubspot-api-client pydantic python-dateutil
Create the project structure:
sales-agent/
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── agent.py
│ ├── scoring.py
│ ├── objections.py
│ ├── crm.py
│ ├── booking.py
│ └── models.py
├── .env
└── requirements.txt
Core Conversation State Management
First, we need to track conversation state across turns. This isn't just message history—it's structured data about where we are in the qualification process.
# app/models.py
from pydantic import BaseModel, Field
from typing import Dict, List, Optional, Any
from datetime import datetime
from enum import Enum
class ConversationStage(str, Enum):
GREETING = "greeting"
DISCOVERY = "discovery"
QUALIFICATION = "qualification"
OBJECTION = "objection"
BOOKING = "booking"
CLOSED = "closed"
class LeadProfile(BaseModel):
email: Optional[str] = None
company: Optional[str] = None
role: Optional[str] = None
company_size: Optional[int] = None
industry: Optional[str] = None
pain_points: List[str] = Field(default_factory=list)
budget_range: Optional[str] = None
timeline: Optional[str] = None
class ConversationState(BaseModel):
lead_id: str
session_id: str
stage: ConversationStage = ConversationStage.GREETING
lead_profile: LeadProfile = Field(default_factory=LeadProfile)
lead_score: float = 0.0
objections_raised: List[str] = Field(default_factory=list)
meeting_proposed: bool = False
messages: List[Dict[str, Any]] = Field(default_factory=list)
context: Dict[str, Any] = Field(default_factory=dict)
last_updated: datetime = Field(default_factory=datetime.now)
Lead Scoring Engine
Real lead scoring isn't just about company size—it's about fit, intent, and timing. We'll implement a weighted scoring system that updates as the conversation progresses.
# app/scoring.py
from typing import Dict, List, Tuple
from dataclasses import dataclass
import re
@dataclass
class ScoringRule:
field: str
weight: float
scoring_function: callable
class LeadScoringEngine:
def __init__(self):
self.rules = self._initialize_rules()
self.thresholds = {
'hot': 80,
'warm': 50,
'cold': 0
}
def _initialize_rules(self) -> List[ScoringRule]:
return [
ScoringRule('company_size', 0.2, self._score_company_size),
ScoringRule('role', 0.25, self._score_role),
ScoringRule('pain_points', 0.3, self._score_pain_points),
ScoringRule('budget_range', 0.15, self._score_budget),
ScoringRule('timeline', 0.1, self._score_timeline),
]
def calculate_score(self, profile: 'LeadProfile') -> Tuple[float, Dict]:
"""Calculate lead score with breakdown"""
total_score = 0
breakdown = {}
for rule in self.rules:
value = getattr(profile, rule.field, None)
if value is not None:
rule_score = rule.scoring_function(value)
weighted_score = rule_score * rule.weight
total_score += weighted_score
breakdown[rule.field] = {
'raw': rule_score,
'weighted': weighted_score,
'weight': rule.weight
}
# Apply conversation engagement multiplier
engagement_bonus = self._calculate_engagement(profile)
total_score *= engagement_bonus
return min(100, total_score), breakdown
def _score_company_size(self, size: int) -> float:
"""Score based on ideal customer profile"""
# Example: Target is 100-1000 employees
if 100 <= size <= 1000:
return 100
elif 50 <= size < 100 or 1000 < size <= 5000:
return 70
elif size < 50:
return 30 # Too small
else:
return 50 # Enterprise, longer sales cycle
def _score_role(self, role: str) -> float:
"""Score based on decision-making authority"""
role_lower = role.lower()
if any(title in role_lower for title in ['ceo', 'cto', 'vp', 'director']):
return 100
elif 'manager' in role_lower:
return 70
elif 'lead' in role_lower:
return 50
else:
return 30
def _score_pain_points(self, pain_points: List[str]) -> float:
"""Score based on problem-product fit"""
high_value_pains = ['scaling', 'efficiency', 'cost reduction', 'automation']
matches = sum(1 for pain in pain_points
if any(keyword in pain.lower()
for keyword in high_value_pains))
return min(100, matches * 25)
def _score_budget(self, budget: str) -> float:
"""Parse and score budget range"""
if not budget:
return 0
# Extract numbers from budget string
numbers = re.findall(r'\d+', budget.replace(',', ''))
if len(numbers) >= 2:
avg_budget = (int(numbers[0]) + int(numbers[1])) / 2
if avg_budget >= 50000:
return 100
elif avg_budget >= 20000:
return 70
else:
return 30
return 50 # Default if can't parse
def _score_timeline(self, timeline: str) -> float:
"""Score urgency"""
timeline_lower = timeline.lower()
if any(word in timeline_lower for word in ['asap', 'urgent', 'this quarter']):
return 100
elif any(word in timeline_lower for word in ['next quarter', '3 months']):
return 70
elif '6 months' in timeline_lower:
return 40
else:
return 20
def _calculate_engagement(self, profile: 'LeadProfile') -> float:
"""Multiplier based on conversation engagement"""
# This would track response times, question quality, etc.
return 1.0 # Placeholder
Objection Handling System
Objections aren't obstacles—they're buying signals. Our handler uses pattern matching combined with contextual responses.
# app/objections.py
from typing import Dict, List, Optional
import re
class ObjectionHandler:
def __init__(self):
self.objection_patterns = self._load_patterns()
self.response_templates = self._load_templates()
def _load_patterns(self) -> Dict[str, List[str]]:
"""Define objection patterns with regex"""
return {
'price': [
r'too expensive',
r'budget.{0,20}tight',
r'cost.{0,20}high',
r'can.{0,10}afford',
r'price.{0,20}concern'
],
'timing': [
r'not.{0,10}ready',
r'bad time',
r'later',
r'next.{0,10}year',
r'no.{0,10}urgency'
],
'competitor': [
r'using.{0,20}competitor',
r'already.{0,10}have.{0,20}solution',
r'happy with',
r'current.{0,10}vendor'
],
'authority': [
r'need.{0,10}check',
r'talk to.{0,10}team',
r'not my decision',
r'approval'
],
'value': [
r'not sure.{0,20}roi',
r'don\'t see.{0,10}value',
r'what.{0,10}difference',
r'why.{0,10}better'
]
}
def detect_objection(self, message: str) -> Optional[str]:
"""Detect objection type from message"""
message_lower = message.lower()
for objection_type, patterns in self.objection_patterns.items():
for pattern in patterns:
if re.search(pattern, message_lower):
return objection_type
return None
def generate_response(self, objection_type: str, context: Dict) -> str:
"""Generate contextual response to objection"""
templates = self.response_templates.get(objection_type, [])
if not templates:
return self._generic_response(objection_type)
# Select template based on context
template = self._select_template(templates, context)
# Fill in context variables
response = template.format(**context)
return response
def _load_templates(self) -> Dict[str, List[str]]:
"""Response templates with variables"""
return {
'price': [
"I understand budget is important. Many of our clients like {company} initially had similar concerns, but found that {value_prop} delivered {roi_metric} within {timeframe}.",
"Let's look at the ROI. Based on what you've shared about {pain_point}, our solution typically saves {percentage}% in {area}. Would it help to see a cost-benefit analysis?",
"We have flexible pricing options. Could you share what budget range you're working with? I want to make sure we find the right fit."
],
'timing': [
"I hear you on timing. What would need to change for this to become a priority? Sometimes starting with a pilot can help build momentum.",
"Many clients start planning now to be ready for {optimal_time}. Would it make sense to at least explore the options so you're prepared?",
"What if we scheduled a brief call to discuss your roadmap? No commitment, just to understand your timeline better."
],
'competitor': [
"That's great you're already investing in this space. What's working well with {competitor}? What would you improve?",
"Many of our clients switched from {competitor} because of {differentiator}. Would it be helpful to see a comparison?",
"We integrate well with {competitor}. Some clients use us alongside their existing solution for {specific_use_case}."
],
'authority': [
"Absolutely, involving the team makes sense. Who else would need to be part of this decision? I can prepare materials for them.",
"Would it help if I shared some case studies from similar {industry} companies you could share with your team?",
"What information would help you make the case internally? I'm happy to provide ROI calculations or reference calls."
],
'value': [
"That's a fair question. Let me share how {similar_company} achieved {specific_result} in {timeframe}.",
"What metrics matter most to your team? I can tailor our discussion around {key_metric} improvements.",
"Would a demo focused on your specific use case with {pain_point} help illustrate the value?"
]
}
def _select_template(self, templates: List[str], context: Dict) -> str:
"""Select most appropriate template based on context"""
# Simple selection - in production, use ML model
return templates[0]
def _generic_response(self, objection_type: str) -> str:
return f"I understand your concern about {objection_type}. Could you tell me more about what specifically worries you?"
Meeting Booking Integration
We'll integrate with Calendly's API for actual booking functionality. This handles timezone conversions and availability checks.
# app/booking.py
import requests
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from pydantic import BaseModel
class TimeSlot(BaseModel):
start: datetime
end: datetime
available: bool
class MeetingBooker:
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.calendly.com"
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
def get_available_slots(self, event_type_uri: str,
start_time: datetime,
days_ahead: int = 7) -> List[TimeSlot]:
"""Get available time slots from Calendly"""
end_time = start_time + timedelta(days=days_ahead)
params = {
"event_type": event_type_uri,
"start_time": start_time.isoformat(),
"end_time": end_time.isoformat()
}
response = requests.get(
f"{self.base_url}/scheduled_events/availability",
headers=self.headers,
params=params
)
if response.status_code == 200:
data = response.json()
return self._parse_availability(data)
else:
# Fallback to default slots
return self._generate_default_slots(start_time, days_ahead)
def book_meeting(self, invitee_email: str,
invitee_name: str,
event_type_uri: str,
start_time: datetime,
questions: Dict = None) -> Dict:
"""Book a meeting via Calendly"""
payload = {
"event_type": event_type_uri,
"invitee": {
"email": invitee_email,
"name": invitee_name,
"timezone": "America/New_York" # Would detect from context
},
"start_time": start_time.isoformat(),
"questions": questions or []
}
response = requests.post(
f"{self.base_url}/scheduled_events",
headers=self.headers,
json=payload
)
if response.status_code == 201:
return {
"success": True,
"meeting_url": response.json()["resource"]["uri"],
"confirmation_id": response.json()["resource"]["id"]
}
else:
return {
"success": False,
"error": response.text
}
def _parse_availability(self, data: Dict) -> List[TimeSlot]:
"""Parse Calendly availability response"""
slots = []
for event in data.get("collection", []):
if event.get("status") == "available":
slots.append(TimeSlot(
start=datetime.fromisoformat(event["start_time"]),
end=datetime.fromisoformat(event["end_time"]),
available=True
))
return slots
def _generate_default_slots(self, start: datetime, days: int) -> List[TimeSlot]:
"""Generate default slots if API fails"""
slots = []
current = start.replace(hour=9, minute=0, second=0, microsecond=0)
for day in range(days):
for hour in [9, 11, 14, 16]: # 9am, 11am, 2pm, 4pm
slot_start = current + timedelta(days=day, hours=hour-9)
slot_end = slot_start + timedelta(minutes=30)
slots.append(TimeSlot(
start=slot_start,
end=slot_end,
available=True
))
return slots
CRM Integration Layer
We'll implement a HubSpot integration that creates/updates contacts and logs conversation activities.
# app/crm.py
from hubspot import HubSpot
from hubspot.crm.contacts import SimplePublicObjectInputForCreate
from typing import Dict, Optional
import json
class CRMIntegration:
def __init__(self, access_token: str):
self.client = HubSpot(access_token=access_token)
def create_or_update_contact(self, profile: 'LeadProfile') -> Dict:
"""Create or update contact in HubSpot"""
if not profile.email:
return {"error": "Email required"}
# Search for existing contact
existing = self._find_contact_by_email(profile.email)
if existing:
return self._update_contact(existing['id'], profile)
else:
return self._create_contact(profile)
def log_conversation(self, contact_id: str,
conversation_summary: str,
lead_score: float,
stage: str) -> Dict:
"""Log conversation activity in CRM"""
activity = {
"properties": {
"hs_note_body": conversation_summary,
"hs_timestamp": str(int(datetime.now().timestamp() * 1000)),
"hs_lead_score": str(lead_score),
"hs_conversation_stage": stage
}
}
try:
response = self.client.crm.objects.notes.basic_api.create(
simple_public_object_input_for_create=activity
)
return {"success": True, "note_id": response.id}
except Exception as e:
return {"success": False, "error": str(e)}
def create_deal(self, contact_id: str,
company: str,
amount: float = 0) -> Dict:
"""Create deal in HubSpot"""
deal = {
"properties": {
"dealname": f"Sales Qualified Lead - {company}",
"amount": str(amount),
"pipeline": "default",
"dealstage": "qualifiedtobuy",
"associated_contact_ids": [contact_id]
}
}
try:
response = self.client.crm.deals.basic_api.create(
simple_public_object_input_for_create=deal
)
return {"success": True, "deal_id": response.id}
except Exception as e:
return {"success": False, "error": str(e)}
def _find_contact_by_email(self, email: str) -> Optional[Dict]:
"""Search for contact by email"""
try:
response = self.client.crm.contacts.search_api.do_search(
public_object_search_request={
"filterGroups": [{
"filters": [{
"propertyName": "email",
"operator": "EQ",
"value": email
}]
}]
}
)
if response.total > 0:
return response.results[0].to_dict()
return None
except:
return None
def _create_contact(self, profile: 'LeadProfile') -> Dict:
"""Create new contact"""
contact_data = SimplePublicObjectInputForCreate(
properties={
"email": profile.email,
"company": profile.company,
"jobtitle": profile.role,
"hs_lead_status": "open",
"numemployees": str(profile.company_size) if profile.company_size else None
}
)
try:
response = self.client.crm.contacts.basic_api.create(
simple_public_object_input_for_create=contact_data
)
return {"success": True, "contact_id": response.id}
except Exception as e:
return {"success": False, "error": str(e)}
def _update_contact(self, contact_id: str, profile: 'LeadProfile') -> Dict:
"""Update existing contact"""
update_data = SimplePublicObjectInputForCreate(
properties={
"company": profile.company,
"jobtitle": profile.role,
"numemployees": str(profile.company_size) if profile.company_size else None,
"hs_lead_status": "in_progress"
}
)
try:
response = self.client.crm.contacts.basic_api.update(
contact_id=contact_id,
simple_public_object_input_for_create=update_data
)
return {"success": True, "contact_id": contact_id}
except Exception as e:
return {"success": False, "error": str(e)}
Main Conversation Agent
Now we'll build the LangChain agent that orchestrates everything.
# app/agent.py
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain.tools import BaseTool
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from typing import Dict, Any, List
import json
class SalesQualificationAgent:
def __init__(self):
self.llm = ChatOpenAI(model="gpt-4-1106-preview", temperature=0.3)
self.conversation_states: Dict[str, ConversationState] = {}
self.scoring_engine = LeadScoringEngine()
self.objection_handler = ObjectionHandler()
self.crm = CRMIntegration(os.getenv("HUBSPOT_TOKEN"))
self.booker = MeetingBooker(os.getenv("CALENDLY_TOKEN"))
# Initialize tools
self.tools = self._initialize_tools()
# Create agent
self.agent = self._create_agent()
def _initialize_tools(self) -> List[BaseTool]:
"""Initialize LangChain tools"""
return [
self._create_update_profile_tool(),
self._create_score_lead_tool(),
self._create_handle_objection_tool(),
self._create_book_meeting_tool(),
self._create_update_crm_tool()
]
def _create_agent(self) -> AgentExecutor:
"""Create the main conversation agent"""
prompt = ChatPromptTemplate.from_messages([
("system", """You are a professional sales qualification agent for DriftSeas AI solutions.
Your goal is to qualify leads through natural conversation. Follow these guidelines:
1. Start with greeting and discovery questions
2. Gradually collect qualification information
3. Address objections empathetically
4. Propose meeting when lead is qualified
5. Always maintain professional, consultative tone
Current conversation state: {conversation_state}
Lead profile: {lead_profile}
Lead score: {lead_score}
Objections raised: {objections}
Use tools when needed. Always explain your reasoning."""),
MessagesPlaceholder(variable_name="chat_history"),
("human", "{input}"),
MessagesPlaceholder(variable_name="agent_scratchpad"),
])
agent = create_openai_tools_agent(self.llm, self.tools, prompt)
return AgentExecutor(agent=agent, tools=self.tools, verbose=True)
async def process_message(self, session_id: str, message: str) -> Dict[str, Any]:
"""Process incoming message and return response"""
# Get or create conversation state
if session_id not in self.conversation_states:
self.conversation_states[session_id] = ConversationState(
lead_id=f"lead_{session_id}",
session_id=session_id
)
state = self.conversation_states[session_id]
# Update message history
state.messages.append({
"role": "user",
"content": message,
"timestamp": datetime.now().isoformat()
})
# Detect objection if in appropriate stage
if state.stage in [ConversationStage.DISCOVERY, ConversationStage.QUALIFICATION]:
objection_type = self.objection_handler.detect_objection(message)
if objection_type:
state.objections_raised.append(objection_type)
state.stage = ConversationStage.OBJECTION
# Prepare agent input
agent_input = {
"input": message,
"conversation_state": state.stage.value,
"lead_profile": state.lead_profile.dict(),
"lead_score": state.lead_score,
"objections": state.objections_raised,
"chat_history": state.messages[-10:] # Last 10 messages for context
}
# Run agent
try:
response = await self.agent.ainvoke(agent_input)
# Update state with agent's actions
self._update_state_from_actions(state, response.get("intermediate_steps", []))
# Add response to history
state.messages.append({
"role": "assistant",
"content": response["output"],
"timestamp": datetime.now().isoformat()
})
# Update last activity
state.last_updated = datetime.now()
return {
"response": response["output"],
"state": {
"stage": state.stage.value,
"lead_score": state.lead_score,
"meeting_proposed": state.meeting_proposed
}
}
except Exception as e:
return {
"response": "I apologize, I encountered an error. Let me try again.",
"error": str(e)
}
def _update_state_from_actions(self, state: ConversationState,
actions: List) -> None:
"""Update conversation state based on agent actions"""
for action in actions:
tool_name = action[0].tool
tool_input = action[0].tool_input
if tool_name == "update_lead_profile":
# Update lead profile with new information
for key, value in tool_input.items():
if hasattr(state.lead_profile, key):
setattr(state.lead_profile, key, value)
# Recalculate score
state.lead_score, _ = self.scoring_engine.calculate_score(
state.lead_profile
)
elif tool_name == "handle_objection":
# Mark objection as addressed
if state.stage == ConversationStage.OBJECTION:
state.stage = ConversationStage.QUALIFICATION
elif tool_name == "book_meeting":
state.meeting_proposed = True
state.stage = ConversationStage.BOOKING
elif tool_name == "update_crm":
# CRM updated, conversation can close
state.stage = ConversationStage.CLOSED
FastAPI Backend
Finally, let's create the API endpoint.
# app/main.py
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Dict, Any
import uuid
app = FastAPI(title="Sales Qualification Agent API")
# CORS for frontend
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Initialize agent
agent = SalesQualificationAgent()
class MessageRequest(BaseModel):
session_id: str = None
message: str
metadata: Dict[str, Any] = {}
class MessageResponse(BaseModel):
response: str
session_id: str
state: Dict[str, Any]
@app.post("/chat", response_model=MessageResponse)
async def chat_endpoint(request: MessageRequest):
"""Main chat endpoint"""
session_id = request.session_id or str(uuid.uuid4())
try:
result = await agent.process_message(session_id, request.message)
return MessageResponse(
response=result["response"],
session_id=session_id,
state=result.get("state", {})
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")
async def health_check():
return {"status": "healthy"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Testing the Agent
Create a test script to simulate conversations:
# test_agent.py
import asyncio
import httpx
async def test_conversation():
async with httpx.AsyncClient() as client:
# Start conversation
response = await client.post("http://localhost:8000/chat", json={
"message": "Hi, I'm interested in your AI solutions."
})
session_id = response.json()["session_id"]
print(f"Bot: {response.json()['response']}")
# Simulate multi-turn conversation
test_messages = [
"We're a mid-sized tech company, about 200 employees.",
"I'm the VP of Engineering, looking to improve our development workflow.",
"Our main pain point is that our current tools don't scale well.",
"Budget is around $50k-$100k for the right solution.",
"We'd like to implement something within the next quarter.",
"What makes you different from CompetitorX?",
"That's interesting. How does pricing work?",
"Let me check with my team first.",
"Actually, can we schedule a demo next week?"
]
for message in test_messages:
print(f"\nUser: {message}")
response = await client.post(
"http://localhost:8000/chat",
json={
"session_id": session_id,
"message": message
}
)
data = response.json()
print(f"Bot: {data['response']}")
print(f"State: {data['state']}")
if __name__ == "__main__":
asyncio.run(test_conversation())
Production Considerations
1. Conversation Memory Management
# Add to agent.py
def _cleanup_old_sessions(self, max_age_hours: int = 24):
"""Remove inactive sessions"""
cutoff = datetime.now() - timedelta(hours=max_age_hours)
inactive_sessions = [
session_id for session_id, state in self.conversation_states.items()
if state.last_updated < cutoff
]
for session_id in inactive_sessions:
del self.conversation_states[session_id]
2. Error Handling and Fallbacks
# Add to main.py
@app.exception_handler(Exception)
async def global_exception_handler(request, exc):
return JSONResponse(
status_code=500,
content={
"response": "I apologize for the technical difficulty. Let me connect you with a human agent.",
"error": str(exc),
"fallback": True
}
)
3. Analytics and Monitoring
# Add analytics tracking
class ConversationAnalytics:
def track_event(self, session_id: str, event_type: str, data: Dict):
"""Track conversation events for analysis"""
# Send to your analytics platform
# (Mixpanel, Amplitude, custom database)
pass
Key Takeaways
- State Management is Critical: Track conversation stage, lead profile, and context separately
- Scoring Should Evolve: Update lead score as new information emerges
- Objections Are Opportunities: Pattern match and respond contextually
- Integration Points Matter: CRM updates should happen at natural conversation breaks
- Test Real Conversations: Multi-turn testing reveals edge cases single-turn testing misses
The complete codebase is available on GitHub at driftseas/sales-agent-tutorial. This implementation handles the messy reality of sales conversations—where prospects change their minds, raise unexpected objections, and require careful nurturing through the qualification process.
Remember: The goal isn't to replace sales reps, but to handle initial qualification so your team can focus on high-value conversations with well-qualified leads.