Building an Incident Response Agent That Fixes Production Issues
Oliver Schmidt
DevOps engineer covering AI agents for operations and deployment.
Incident response is where AI agents can deliver immediate, measurable ROI. Not the vague "AI-powered" marketing fluff — actual agents that triage alerts, correlate signals across systems, identify root causes, and either fix problems or escalate them intelligently.
This tutorial walks through building a production-grade incident response agent using LangGraph for orchestration, real observability tools for signal gathering, and concrete remediation strategies. Every code snippet here runs against actual infrastructure APIs.
Architecture Overview
The incident response agent follows a state machine pattern, not a simple prompt chain. This matters because incident response has branching logic, retry loops, and human-in-the-loop checkpoints that naive sequential chains handle poorly.
┌─────────────┐      ┌──────────────┐      ┌─────────────────┐
│    Alert    │─────▶│   Triage &   │─────▶│   Root Cause    │
│  Ingestion  │      │Classification│      │    Analysis     │
└─────────────┘      └──────────────┘      └────────┬────────┘
                                                    │
     ┌──────────────┐                      ┌────────▼────────┐
     │  Escalation  │◀─────────────────────│   Remediation   │
     │ (if needed)  │                      │ Decision Engine │
     └──────────────┘                      └────────┬────────┘
                                                    │
                                           ┌────────▼────────┐
                                           │Execute & Verify │
                                           └─────────────────┘
Tech stack:
- LangGraph — agent orchestration with state management and conditional routing
- Prometheus + Grafana — metrics collection and querying
- Loki — log aggregation
- PagerDuty / Opsgenie — escalation
- Kubernetes client — infrastructure remediation
- OpenAI GPT-4 / Claude — reasoning backbone
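Conceptually, the orchestration is just a dispatch loop over named steps. LangGraph adds state persistence, interrupts, and conditional-edge plumbing on top, but the control flow itself can be sketched in a few lines of plain Python (toy handlers stand in for the real nodes built later in this tutorial):

```python
def run_state_machine(handlers: dict, state: dict, max_steps: int = 20) -> dict:
    """Minimal dispatcher: each handler returns an updated state whose
    current_step names the next node, until we reach 'complete'."""
    for _ in range(max_steps):
        step = state["current_step"]
        if step == "complete":
            break
        state = handlers[step](state)
    return state

# Toy handlers tracing the happy path from the diagram above
handlers = {
    "triage": lambda s: {**s, "current_step": "root_cause_analysis"},
    "root_cause_analysis": lambda s: {**s, "current_step": "remediate"},
    "remediate": lambda s: {**s, "current_step": "verify"},
    "verify": lambda s: {**s, "current_step": "complete"},
}
final = run_state_machine(handlers, {"current_step": "triage"})
# final["current_step"] == "complete"
```

The branching (escalation, approval, retry) is the same mechanism: a handler writes a different `current_step`, and the dispatcher routes accordingly.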
Setting Up the Project
mkdir incident-response-agent && cd incident-response-agent
python -m venv venv && source venv/bin/activate
pip install langgraph langchain-openai langchain-anthropic \
    prometheus-api-client requests aiohttp redis kubernetes \
    structlog pydantic
Defining the Agent State
LangGraph agents are stateful. The state object carries all context through the workflow. Get this right and the rest falls into place.
from typing import Literal, Optional
from pydantic import BaseModel, Field
from datetime import datetime
from enum import Enum
class Severity(str, Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"

class Alert(BaseModel):
    alert_id: str
    source: str  # prometheus, datadog, cloudwatch, etc.
    name: str
    description: str
    labels: dict[str, str]
    annotations: dict[str, str]
    fired_at: datetime
    raw_payload: dict

class MetricSnapshot(BaseModel):
    metric_name: str
    query: str
    values: list[dict]  # timestamp, value pairs
    unit: str
    anomaly_detected: bool = False
    anomaly_description: str = ""

class LogEntry(BaseModel):
    timestamp: datetime
    source: str
    level: str
    message: str
    structured_fields: dict = {}

class RootCauseHypothesis(BaseModel):
    cause: str
    confidence: float = Field(ge=0.0, le=1.0)
    evidence: list[str]
    affected_components: list[str]
    suggested_remediation: str

class RemediationAction(BaseModel):
    action_type: Literal[
        "kubectl_restart", "scale_deployment", "rollback_deployment",
        "clear_cache", "rotate_certificate", "failover_database",
        "custom_script", "manual_only"
    ]
    target: str
    parameters: dict = {}
    requires_approval: bool = True
    estimated_risk: Literal["low", "medium", "high"] = "medium"

class IncidentState(BaseModel):
    # Input
    alert: Alert
    # Triage output
    severity: Optional[Severity] = None
    triage_summary: str = ""
    is_known_pattern: bool = False
    known_pattern_id: Optional[str] = None
    # Investigation data
    metrics: list[MetricSnapshot] = []
    logs: list[LogEntry] = []
    recent_deployments: list[dict] = []
    related_alerts: list[Alert] = []
    # Root cause analysis
    hypotheses: list[RootCauseHypothesis] = []
    selected_hypothesis: Optional[RootCauseHypothesis] = None
    # Remediation
    remediation_plan: list[RemediationAction] = []
    remediation_executed: bool = False
    remediation_result: str = ""
    remediation_verified: bool = False
    # Escalation
    escalated: bool = False
    escalation_reason: str = ""
    escalation_target: str = ""
    # Control flow
    current_step: str = "triage"
    errors: list[str] = []
    human_approval_needed: bool = False
    human_decision: Optional[str] = None
This state object is the single source of truth. Every node reads from it and writes to it. No hidden side effects.
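Because each node returns a partial dict of updates, it helps to see the merge semantics: keys in the returned dict overwrite the matching state fields, and everything else carries through untouched. A plain-dict sketch of that behavior (illustrative, not LangGraph internals):

```python
def apply_update(state: dict, update: dict) -> dict:
    """Shallow merge: the node's partial return overwrites matching keys."""
    return {**state, **update}

state = {"current_step": "triage", "severity": None, "errors": []}
update = {"severity": "high", "current_step": "gather_evidence"}
state = apply_update(state, update)
# severity and current_step are replaced; errors carries through unchanged
```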
Node 1: Alert Triage and Classification
Triage is where most automation attempts fail because they try to be too clever. The goal isn't to solve the problem — it's to answer three questions fast: How bad is this? Have we seen it before? What data do we need?
import structlog
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage
logger = structlog.get_logger()
TRIAGE_PROMPT = """You are an SRE triage specialist. Given an incoming alert,
classify it and determine the investigation strategy.
Respond in JSON with these fields:
- severity: critical|high|medium|low
- summary: one-paragraph description of what's happening
- is_known_pattern: boolean
- known_pattern_id: if known pattern, identify it (e.g., "connection_pool_exhaustion",
"oom_kill_loop", "cert_expiration", "deploy_latency_regression")
- metrics_to_query: list of Prometheus queries needed for investigation
- log_queries: list of log filter expressions
- check_deployments: boolean — should we check recent deployments?
Classification guidelines:
- CRITICAL: Customer-facing outage, data loss risk, security breach
- HIGH: Degraded service affecting >10% of users, approaching SLA breach
- MEDIUM: Performance degradation, non-critical component failure
- LOW: Warning conditions, capacity planning alerts, non-urgent anomalies
Known patterns you should recognize:
1. Connection pool exhaustion: usually shows as DB connection timeout alerts
combined with increasing connection count metrics
2. OOM kill loops: container restarts + memory approaching limits
3. Certificate expiration: TLS errors with expiry within 7 days
4. Deploy-triggered latency: latency spike within 30min of deployment
5. DNS resolution failures: sudden spike in DNS timeout errors
6. Disk pressure: inode or volume usage approaching 100%
"""
async def triage_alert(state: IncidentState) -> dict:
    """Classify the alert and determine investigation strategy."""
    alert = state.alert
    llm = ChatOpenAI(model="gpt-4o", temperature=0)

    alert_context = f"""
Alert ID: {alert.alert_id}
Source: {alert.source}
Name: {alert.name}
Description: {alert.description}
Labels: {alert.labels}
Annotations: {alert.annotations}
Fired at: {alert.fired_at.isoformat()}
"""
    messages = [
        SystemMessage(content=TRIAGE_PROMPT),
        HumanMessage(content=alert_context)
    ]
    response = await llm.ainvoke(messages)

    import json
    try:
        result = json.loads(response.content)
    except json.JSONDecodeError:
        # Extract JSON from markdown code blocks if wrapped
        content = response.content
        if "```json" in content:
            content = content.split("```json")[1].split("```")[0]
        result = json.loads(content.strip())

    severity = Severity(result["severity"])
    logger.info("alert_triaged",
                alert_id=alert.alert_id,
                severity=severity.value,
                is_known_pattern=result["is_known_pattern"])
    return {
        "severity": severity,
        "triage_summary": result["summary"],
        "is_known_pattern": result["is_known_pattern"],
        "known_pattern_id": result.get("known_pattern_id"),
        "current_step": "gather_evidence"
    }
Why this matters: The triage step does double duty. It classifies severity (which gates escalation later) and it generates the investigation queries. Instead of hardcoding "if database alert, query these metrics," the LLM generates context-appropriate Prometheus queries based on the specific alert content.
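For a concrete sense of what the triage node produces, here is a plausible output for a database connection alert (hypothetical values; the Prometheus and LogQL queries are illustrative, not prescriptive):

```python
triage_output = {
    "severity": "high",
    "summary": "Connection timeouts to the orders database; "
               "connection count is climbing toward the pool limit.",
    "is_known_pattern": True,
    "known_pattern_id": "connection_pool_exhaustion",
    "metrics_to_query": [
        'pg_stat_activity_count{datname="orders"}',
        'rate(db_connection_errors_total[5m])',
    ],
    "log_queries": ['{app="orders-api"} |= "connection timeout"'],
    "check_deployments": True,
}
```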
Node 2: Evidence Gathering
This is the most engineering-heavy node. It executes the queries generated during triage and structures the results.
from prometheus_api_client import PrometheusConnect
from datetime import timedelta
import aiohttp
# Initialize connections
prom = PrometheusConnect(url="http://prometheus:9090", disable_ssl=True)
async def gather_evidence(state: IncidentState) -> dict:
    """Collect metrics, logs, and deployment data for analysis."""
    alert = state.alert
    # Get the investigation queries generated during triage
    triage_result = await _get_triage_queries(state)

    metrics = []
    logs = []
    recent_deployments = []
    related_alerts = []

    # Execute metric queries
    for query_info in triage_result.get("metrics_to_query", []):
        try:
            metric_data = await _query_prometheus(
                query=query_info,
                start=alert.fired_at - timedelta(hours=1),
                end=alert.fired_at + timedelta(minutes=5)
            )
            metrics.append(metric_data)
        except Exception as e:
            logger.warning("metric_query_failed", query=query_info, error=str(e))

    # Query logs from Loki
    for log_query in triage_result.get("log_queries", []):
        try:
            log_data = await _query_loki(
                query=log_query,
                start=alert.fired_at - timedelta(minutes=30),
                end=alert.fired_at + timedelta(minutes=5)
            )
            logs.extend(log_data)
        except Exception as e:
            logger.warning("log_query_failed", query=log_query, error=str(e))

    # Check recent deployments (Kubernetes)
    namespace = alert.labels.get("namespace", "default")
    recent_deployments = await _get_recent_deployments(
        namespace=namespace,
        since=alert.fired_at - timedelta(hours=2)
    )

    # Find related alerts
    related_alerts = await _find_related_alerts(alert)

    return {
        "metrics": metrics,
        "logs": logs[:200],  # Cap logs to prevent context overflow
        "recent_deployments": recent_deployments,
        "related_alerts": related_alerts,
        "current_step": "root_cause_analysis"
    }
async def _query_prometheus(query: str, start: datetime, end: datetime) -> MetricSnapshot:
    """Execute a Prometheus range query."""
    result = prom.custom_query_range(
        query=query,
        start_time=start,
        end_time=end,
        step="15s"
    )
    if not result:
        return MetricSnapshot(
            metric_name=query,
            query=query,
            values=[],
            unit="unknown"
        )
    values = [
        {"timestamp": float(v[0]), "value": float(v[1])}
        for v in result[0]["values"]
    ]
    # Simple anomaly detection: compare last 5min to previous 30min
    recent = [v["value"] for v in values[-20:]]    # last 5 min (20 x 15s)
    baseline = [v["value"] for v in values[:-20]]  # everything before that
    anomaly_detected = False
    anomaly_desc = ""
    if baseline and recent:
        baseline_avg = sum(baseline) / len(baseline)
        recent_avg = sum(recent) / len(recent)
        if baseline_avg > 0:
            change_pct = abs(recent_avg - baseline_avg) / baseline_avg
            if change_pct > 0.5:  # 50% deviation
                anomaly_detected = True
                direction = "increased" if recent_avg > baseline_avg else "decreased"
                anomaly_desc = (
                    f"Metric {direction} by {change_pct:.0%} "
                    f"(baseline: {baseline_avg:.2f}, recent: {recent_avg:.2f})"
                )
    return MetricSnapshot(
        metric_name=query.split("{")[0] if "{" in query else query,
        query=query,
        values=values,
        unit="",
        anomaly_detected=anomaly_detected,
        anomaly_description=anomaly_desc
    )
async def _query_loki(query: str, start: datetime, end: datetime) -> list[LogEntry]:
    """Query Loki for relevant logs."""
    loki_url = "http://loki:3100/loki/api/v1/query_range"
    async with aiohttp.ClientSession() as session:
        # Loki expects start/end as nanosecond Unix epochs (or RFC3339),
        # and aiohttp query params must be strings
        params = {
            "query": query,
            "start": str(int(start.timestamp() * 1e9)),
            "end": str(int(end.timestamp() * 1e9)),
            "limit": "100",
            "direction": "backward"
        }
        async with session.get(loki_url, params=params) as resp:
            data = await resp.json()
    entries = []
    for stream in data.get("data", {}).get("result", []):
        for value in stream.get("values", []):
            entries.append(LogEntry(
                timestamp=datetime.fromtimestamp(float(value[0]) / 1e9),
                source=stream.get("stream", {}).get("job", "unknown"),
                level=stream.get("stream", {}).get("level", "unknown"),
                message=value[1],
                structured_fields=stream.get("stream", {})
            ))
    return sorted(entries, key=lambda x: x.timestamp, reverse=True)
async def _get_recent_deployments(namespace: str, since: datetime) -> list[dict]:
    """Check for recent or failing Kubernetes deployments."""
    from kubernetes import client, config
    try:
        config.load_incluster_config()
    except config.ConfigException:
        config.load_kube_config()
    apps_v1 = client.AppsV1Api()
    deployments = apps_v1.list_namespaced_deployment(namespace=namespace)
    recent = []
    for deploy in deployments.items:
        change_cause = ""
        if deploy.metadata.annotations:
            change_cause = deploy.metadata.annotations.get(
                "kubernetes.io/change-cause", ""
            )
        # Check rollout status: flag deployments whose rollout is failing,
        # or whose conditions changed after `since`
        if not deploy.status.conditions:
            continue
        for condition in deploy.status.conditions:
            is_failing = (condition.type == "Progressing" and
                          condition.status == "False")
            changed_recently = (
                condition.last_update_time is not None and
                condition.last_update_time.replace(tzinfo=None) >= since
            )
            if is_failing or changed_recently:
                recent.append({
                    "name": deploy.metadata.name,
                    "namespace": namespace,
                    "replicas": deploy.spec.replicas,
                    "ready_replicas": deploy.status.ready_replicas,
                    "change_cause": change_cause,
                    "status": "failing" if is_failing else "recently_updated"
                })
                break
    return recent
The key insight: Evidence gathering isn't just data collection. The _query_prometheus function includes lightweight anomaly detection that flags deviations before the LLM even sees the data. This gives the reasoning step concrete signals to work with rather than raw time series.
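That baseline-versus-recent check distills to a small pure function, which is worth unit-testing on its own (same 50% threshold and 20-point recent window as `_query_prometheus` above):

```python
def detect_anomaly(values: list[float], recent_n: int = 20,
                   threshold: float = 0.5) -> tuple[bool, str]:
    """Compare the mean of the last recent_n points to the mean of
    everything before them; flag deviations above the threshold."""
    recent, baseline = values[-recent_n:], values[:-recent_n]
    if not recent or not baseline:
        return False, ""
    baseline_avg = sum(baseline) / len(baseline)
    recent_avg = sum(recent) / len(recent)
    if baseline_avg <= 0:
        return False, ""
    change_pct = abs(recent_avg - baseline_avg) / baseline_avg
    if change_pct <= threshold:
        return False, ""
    direction = "increased" if recent_avg > baseline_avg else "decreased"
    return True, (f"Metric {direction} by {change_pct:.0%} "
                  f"(baseline: {baseline_avg:.2f}, recent: {recent_avg:.2f})")
```

A doubled metric trips the detector; a flat series does not. Mean-over-mean is crude (it misses slow leaks and is sensitive to spiky baselines), but it is cheap and gives the LLM an explicit signal to reason about.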
Node 3: Root Cause Analysis
This is where the LLM earns its keep. The prompt engineering here is critical — you need the model to reason about evidence, not hallucinate causes.
RCA_PROMPT = """You are a senior SRE performing root cause analysis. You have been
provided with alert data, metric snapshots, log entries, and deployment history.
Your job:
1. Analyze the evidence objectively — don't jump to conclusions
2. Generate 2-3 hypotheses ranked by likelihood
3. For each hypothesis, cite specific evidence (metric values, log lines)
4. Identify the most likely root cause
5. Recommend a specific remediation
Rules:
- If metrics show no anomaly, say so. Not every alert is a real incident.
- If a recent deployment correlates with the issue, weigh that heavily.
- If you see connection pool exhaustion, check for: slow queries, connection leaks,
upstream service failures, recent config changes.
- If you see OOM patterns, check for: memory leaks (gradual increase), sudden traffic
spikes, new code paths, missing resource limits.
- Confidence below 0.5 means you should recommend escalation, not automated remediation.
Respond in JSON:
{
  "hypotheses": [
    {
      "cause": "description",
      "confidence": 0.0-1.0,
      "evidence": ["specific evidence items"],
      "affected_components": ["component names"],
      "suggested_remediation": "specific action"
    }
  ],
  "selected_hypothesis_index": 0,
  "additional_investigation_needed": false,
  "recommended_actions": [
    {
      "action_type": "kubectl_restart|scale_deployment|rollback_deployment|...",
      "target": "deployment/service name",
      "parameters": {},
      "requires_approval": true,
      "estimated_risk": "low|medium|high"
    }
  ]
}
"""
async def analyze_root_cause(state: IncidentState) -> dict:
    """Determine root cause from gathered evidence."""
    llm = ChatOpenAI(model="gpt-4o", temperature=0)

    # Build evidence summary
    evidence_sections = []

    # Metrics
    for metric in state.metrics:
        section = f"### Metric: {metric.metric_name}\nQuery: `{metric.query}`\n"
        if metric.anomaly_detected:
            section += f"**ANOMALY**: {metric.anomaly_description}\n"
        if metric.values:
            recent_values = metric.values[-10:]
            section += "Recent values:\n"
            for v in recent_values:
                section += f"  - {v['timestamp']}: {v['value']:.2f}\n"
        evidence_sections.append(section)

    # Logs
    if state.logs:
        log_section = "### Relevant Logs\n"
        for log in state.logs[:50]:
            log_section += f"[{log.level}] {log.message}\n"
        evidence_sections.append(log_section)

    # Deployments
    if state.recent_deployments:
        deploy_section = "### Recent Deployments\n"
        for deploy in state.recent_deployments:
            deploy_section += f"- {deploy['name']}: {deploy.get('change_cause', 'no change cause recorded')}\n"
            deploy_section += f"  Status: {deploy.get('status', 'unknown')}\n"
        evidence_sections.append(deploy_section)

    # Related alerts
    if state.related_alerts:
        related_section = "### Related Alerts\n"
        for related in state.related_alerts:
            related_section += f"- {related.name}: {related.description}\n"
        evidence_sections.append(related_section)

    evidence_text = "\n".join(evidence_sections)

    messages = [
        SystemMessage(content=RCA_PROMPT),
        HumanMessage(content=f"""
## Alert
{state.alert.name}: {state.alert.description}
Labels: {state.alert.labels}
## Triage Summary
{state.triage_summary}
Known pattern: {state.is_known_pattern} ({state.known_pattern_id or 'N/A'})
## Evidence
{evidence_text}
""")
    ]
    response = await llm.ainvoke(messages)

    import json
    result = json.loads(_extract_json(response.content))

    hypotheses = [
        RootCauseHypothesis(
            cause=h["cause"],
            confidence=h["confidence"],
            evidence=h["evidence"],
            affected_components=h["affected_components"],
            suggested_remediation=h["suggested_remediation"]
        )
        for h in result["hypotheses"]
    ]
    selected_idx = result.get("selected_hypothesis_index", 0)
    selected = hypotheses[selected_idx] if hypotheses else None
    remediation_plan = [
        RemediationAction(**action)
        for action in result.get("recommended_actions", [])
    ]
    logger.info("rca_complete",
                num_hypotheses=len(hypotheses),
                top_confidence=hypotheses[0].confidence if hypotheses else 0,
                selected_cause=selected.cause if selected else "unknown")
    return {
        "hypotheses": hypotheses,
        "selected_hypothesis": selected,
        "remediation_plan": remediation_plan,
        "current_step": "decide_action"
    }
def _extract_json(text: str) -> str:
    """Extract JSON from LLM response, handling markdown wrappers."""
    if "```json" in text:
        return text.split("```json")[1].split("```")[0].strip()
    if "```" in text:
        return text.split("```")[1].split("```")[0].strip()
    # Fall back to everything from the first { or [
    for i, char in enumerate(text):
        if char in "{[":
            return text[i:]
    return text
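Split-based extraction can still hand back invalid JSON, for example when trailing prose follows the closing brace. A slightly hardened variant (an optional upgrade, not part of the tutorial's minimal path) collects every candidate and keeps the first one that actually parses, using `raw_decode` to tolerate trailing text:

```python
import json
import re

def extract_json_strict(text: str):
    """Return the first parseable JSON value in an LLM response, or raise
    ValueError. Fenced blocks are tried first, then raw_decode from the
    first brace/bracket (which ignores trailing prose)."""
    for cand in re.findall(r"```(?:json)?\s*(.*?)```", text, re.DOTALL):
        try:
            return json.loads(cand.strip())
        except json.JSONDecodeError:
            continue
    m = re.search(r"[{\[]", text)
    if m:
        try:
            obj, _ = json.JSONDecoder().raw_decode(text[m.start():])
            return obj
        except json.JSONDecodeError:
            pass
    raise ValueError("No parseable JSON found in LLM response")
```

Raising instead of returning garbage lets the calling node record the failure in `state.errors` and escalate, rather than building a remediation plan from a half-parsed response.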
Node 4: Decision Engine — To Remediate or Escalate
This is the most important node architecturally. It's the decision gate that determines whether the agent acts autonomously or hands off to humans.
from langgraph.types import interrupt
async def decide_action(state: IncidentState) -> dict:
    """Decide whether to remediate automatically or escalate."""
    hypothesis = state.selected_hypothesis
    if not hypothesis:
        return {
            "current_step": "escalate",
            "escalation_reason": "No viable hypothesis identified",
            "escalated": True
        }

    # Decision matrix
    should_escalate = False
    escalation_reason = ""

    # Rule 1: Low confidence always escalates
    if hypothesis.confidence < 0.6:
        should_escalate = True
        escalation_reason = (
            f"Low confidence ({hypothesis.confidence:.0%}) in root cause: "
            f"{hypothesis.cause}"
        )

    # Rule 2: High-risk remediations require human approval
    high_risk_actions = [
        a for a in state.remediation_plan if a.estimated_risk == "high"
    ]
    if high_risk_actions:
        should_escalate = True
        escalation_reason = (
            f"Remediation involves high-risk actions: "
            f"{', '.join(a.action_type for a in high_risk_actions)}"
        )

    # Rule 3: Critical severity with data loss risk
    if (state.severity == Severity.CRITICAL and
            any(kw in hypothesis.cause.lower()
                for kw in ["data loss", "corruption", "security", "breach"])):
        should_escalate = True
        escalation_reason = "Critical incident with data/security risk requires human oversight"

    # Rule 4: Database and stateful service operations
    stateful_targets = ["database", "redis", "elasticsearch", "kafka", "zookeeper"]
    for action in state.remediation_plan:
        if any(s in action.target.lower() for s in stateful_targets):
            if action.action_type in ["failover_database", "custom_script"]:
                should_escalate = True
                escalation_reason = f"Stateful service operation on {action.target}"

    if should_escalate:
        logger.info("escalating_incident",
                    alert_id=state.alert.alert_id,
                    reason=escalation_reason)
        return {
            "current_step": "escalate",
            "escalation_reason": escalation_reason,
            "escalated": True
        }

    # For medium-risk actions, request human approval via interrupt
    medium_risk = [
        a for a in state.remediation_plan if a.estimated_risk == "medium"
    ]
    if medium_risk:
        return {
            "current_step": "request_approval",
            "human_approval_needed": True
        }

    # Low-risk actions proceed automatically
    return {"current_step": "remediate"}
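The decision matrix reduces to a pure predicate over confidence, action risk, and severity, which makes the gate trivial to unit-test without an LLM in the loop. A distilled sketch (the stateful-service rule is omitted for brevity):

```python
def should_escalate(confidence: float, action_risks: list[str],
                    severity: str, cause: str) -> tuple[bool, str]:
    """Distilled decide_action gate: low confidence, high-risk actions,
    or critical incidents touching data/security force a human in."""
    if confidence < 0.6:
        return True, f"Low confidence ({confidence:.0%})"
    if "high" in action_risks:
        return True, "High-risk remediation actions"
    if severity == "critical" and any(
        kw in cause.lower()
        for kw in ("data loss", "corruption", "security", "breach")
    ):
        return True, "Critical incident with data/security risk"
    return False, ""
```

Keeping the gate this small and deterministic is deliberate: the LLM proposes, but plain code decides who is allowed to act.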
The interrupt pattern in LangGraph is what makes this production-safe. When the agent hits request_approval, it pauses execution and surfaces the remediation plan to a human via Slack/PagerDuty. The workflow resumes only after the human responds. This is fundamentally different from "the LLM decides everything" — it's a structured handoff.
async def request_human_approval(state: IncidentState) -> dict:
    """Pause workflow and request human approval for medium-risk actions."""
    # This interrupt surfaces to the human operator
    approval_request = interrupt({
        "type": "remediation_approval",
        "alert_id": state.alert.alert_id,
        "severity": state.severity.value,
        "hypothesis": state.selected_hypothesis.cause,
        "confidence": state.selected_hypothesis.confidence,
        "evidence": state.selected_hypothesis.evidence,
        "proposed_actions": [
            {
                "action": a.action_type,
                "target": a.target,
                "parameters": a.parameters,
                "risk": a.estimated_risk
            }
            for a in state.remediation_plan
        ],
        "message": (
            f"Remediation proposed for {state.alert.name}. "
            f"Root cause: {state.selected_hypothesis.cause} "
            f"(confidence: {state.selected_hypothesis.confidence:.0%}). "
            f"Approve automated remediation?"
        )
    })
    # When the workflow resumes, approval_request contains the human's response
    if approval_request.get("approved"):
        return {"current_step": "remediate", "human_decision": "approved"}
    else:
        return {
            "current_step": "escalate",
            "human_decision": "rejected",
            "escalation_reason": "Human rejected automated remediation",
            "escalated": True
        }
Node 5: Automated Remediation
from kubernetes import client, config
# Map action types to handler functions
REMEDIATION_HANDLERS = {}

def register_handler(action_type: str):
    def decorator(func):
        REMEDIATION_HANDLERS[action_type] = func
        return func
    return decorator

@register_handler("kubectl_restart")
async def restart_deployment(action: RemediationAction) -> str:
    """Rolling restart of a Kubernetes deployment."""
    try:
        config.load_incluster_config()
    except config.ConfigException:
        config.load_kube_config()
    apps_v1 = client.AppsV1Api()
    namespace = action.parameters.get("namespace", "default")
    # Trigger rolling restart by patching the pod template annotation
    now = datetime.utcnow().isoformat()
    body = {
        "spec": {
            "template": {
                "metadata": {
                    "annotations": {"kubectl.kubernetes.io/restartedAt": now}
                }
            }
        }
    }
    apps_v1.patch_namespaced_deployment(
        name=action.target,
        namespace=namespace,
        body=body
    )
    return f"Initiated rolling restart of {action.target} in {namespace}"
@register_handler("scale_deployment")
async def scale_deployment(action: RemediationAction) -> str:
    """Scale a deployment to specified replica count."""
    try:
        config.load_incluster_config()
    except config.ConfigException:
        config.load_kube_config()
    apps_v1 = client.AppsV1Api()
    namespace = action.parameters.get("namespace", "default")
    replicas = action.parameters.get("replicas", 3)
    body = {"spec": {"replicas": replicas}}
    apps_v1.patch_namespaced_deployment_scale(
        name=action.target,
        namespace=namespace,
        body=body
    )
    return f"Scaled {action.target} to {replicas} replicas"
@register_handler("rollback_deployment")
async def rollback_deployment(action: RemediationAction) -> str:
    """Rollback to the previous revision."""
    try:
        config.load_incluster_config()
    except config.ConfigException:
        config.load_kube_config()
    apps_v1 = client.AppsV1Api()
    namespace = action.parameters.get("namespace", "default")
    # Get current deployment to find previous revision
    deploy = apps_v1.read_namespaced_deployment(
        name=action.target, namespace=namespace
    )
    current_revision = int(
        deploy.metadata.annotations.get(
            "deployment.kubernetes.io/revision", "1"
        )
    )
    if current_revision <= 1:
        return f"No previous revision available for {action.target}"
    # The DeploymentRollback subresource (extensions/v1beta1) was removed in
    # Kubernetes 1.16, so roll back by finding the previous revision's
    # ReplicaSet and patching its pod template back onto the deployment.
    # This is simplified: `kubectl rollout undo` does the same thing but
    # also strips the pod-template-hash label from the copied template.
    target_revision = str(current_revision - 1)
    selector = ",".join(
        f"{k}={v}" for k, v in deploy.spec.selector.match_labels.items()
    )
    rs_list = apps_v1.list_namespaced_replica_set(
        namespace=namespace, label_selector=selector
    )
    for rs in rs_list.items:
        rs_revision = (rs.metadata.annotations or {}).get(
            "deployment.kubernetes.io/revision"
        )
        if rs_revision == target_revision:
            apps_v1.patch_namespaced_deployment(
                name=action.target,
                namespace=namespace,
                body={"spec": {"template": rs.spec.template}}
            )
            return f"Rolled back {action.target} to revision {target_revision}"
    return f"Could not find ReplicaSet for revision {target_revision} of {action.target}"
@register_handler("clear_cache")
async def clear_cache(action: RemediationAction) -> str:
    """Clear application cache via Redis or an HTTP API call."""
    cache_type = action.parameters.get("cache_type", "redis")
    if cache_type == "redis":
        import redis.asyncio as aioredis
        redis_url = action.parameters.get(
            "redis_url", "redis://redis:6379/0"
        )
        r = aioredis.from_url(redis_url)
        pattern = action.parameters.get("pattern", "*")
        if pattern == "*":
            await r.flushdb()
        else:
            # Delete only keys matching the pattern, scanning incrementally
            # rather than blocking Redis with a single KEYS call
            async for key in r.scan_iter(match=pattern):
                await r.delete(key)
        await r.aclose()
        return f"Cleared Redis cache (pattern: {pattern})"
    elif cache_type == "api":
        endpoint = action.parameters.get("cache_clear_endpoint")
        async with aiohttp.ClientSession() as session:
            async with session.post(endpoint) as resp:
                return f"Cache clear API call: {resp.status}"
    return "Unknown cache type"
async def execute_remediation(state: IncidentState) -> dict:
    """Execute the remediation plan and record per-action results."""
    results = []
    for action in state.remediation_plan:
        handler = REMEDIATION_HANDLERS.get(action.action_type)
        if not handler:
            results.append(
                f"SKIP: No handler for action type {action.action_type}"
            )
            continue
        try:
            result = await handler(action)
            results.append(f"SUCCESS: {result}")
            logger.info("remediation_executed",
                        action=action.action_type,
                        target=action.target,
                        result=result)
        except Exception as e:
            results.append(f"FAILED: {action.action_type} - {str(e)}")
            logger.error("remediation_failed",
                         action=action.action_type,
                         error=str(e))
    return {
        "remediation_executed": True,
        "remediation_result": "\n".join(results),
        "current_step": "verify"
    }
Node 6: Verification
Remediation without verification is just hoping. This node checks whether the fix actually worked.
import asyncio

async def verify_remediation(state: IncidentState) -> dict:
    """Check if the remediation resolved the issue."""
    await asyncio.sleep(120)  # Wait 2 minutes for effects to propagate
    alert = state.alert
    hypothesis = state.selected_hypothesis

    # Re-query the key metrics from the original investigation
    verification_metrics = []
    for metric in state.metrics:
        if metric.anomaly_detected:
            try:
                current = await _query_prometheus(
                    query=metric.query,
                    start=datetime.utcnow() - timedelta(minutes=5),
                    end=datetime.utcnow()
                )
                verification_metrics.append(current)
            except Exception:
                pass

    # Check if the original alert is still firing
    alert_still_active = await _check_alert_status(alert.alert_id)

    # Use LLM to interpret verification results
    llm = ChatOpenAI(model="gpt-4o", temperature=0)
    verification_context = f"""
Original issue: {hypothesis.cause}
Remediation taken: {state.remediation_result}
Alert still active: {alert_still_active}
Post-remediation metrics:
"""
    for vm in verification_metrics:
        verification_context += (
            f"- {vm.metric_name}: anomaly={vm.anomaly_detected}, "
            f"desc={vm.anomaly_description}\n"
        )
    response = await llm.ainvoke([
        SystemMessage(content=(
            "Determine if the remediation was successful. "
            "Respond with JSON: {\"verified\": bool, \"reason\": str, "
            "\"needs_escalation\": bool}"
        )),
        HumanMessage(content=verification_context)
    ])

    import json
    result = json.loads(_extract_json(response.content))

    if result["verified"]:
        return {
            "remediation_verified": True,
            "current_step": "complete"
        }
    elif result.get("needs_escalation"):
        return {
            "remediation_verified": False,
            "current_step": "escalate",
            "escalation_reason": (
                f"Remediation did not resolve the issue: {result['reason']}"
            ),
            "escalated": True
        }
    else:
        # Retry with a different approach by looping back to RCA.
        # Production code should cap these retries (e.g., a counter in
        # IncidentState) so an unresolved incident escalates instead of
        # cycling indefinitely.
        return {
            "remediation_verified": False,
            "current_step": "root_cause_analysis"
        }
Node 7: Escalation
async def escalate_incident(state: IncidentState) -> dict:
    """Escalate to human responders with full context."""
    alert = state.alert
    # Build escalation payload
    escalation_context = {
        "alert": {
            "id": alert.alert_id,
            "name": alert.name,
            "severity": state.severity.value,
            "description": alert.description,
        },
        "triage_summary": state.triage_summary,
        "investigation_findings": {
            "hypotheses": [
                {
                    "cause": h.cause,
                    "confidence": h.confidence,
                    "evidence": h.evidence
                }
                for h in state.hypotheses
            ],
            "metrics_analyzed": len(state.metrics),
            "anomalies_found": sum(
                1 for m in state.metrics if m.anomaly_detected
            ),
            "recent_deployments": state.recent_deployments,
        },
        "remediation_attempted": state.remediation_executed,
        "remediation_result": state.remediation_result,
        "escalation_reason": state.escalation_reason,
    }
    # Send to PagerDuty
    await _create_pagerduty_incident(escalation_context)
    # Post to Slack
    await _post_slack_escalation(escalation_context)
    logger.info("incident_escalated",
                alert_id=alert.alert_id,
                reason=state.escalation_reason)
    return {"escalated": True, "current_step": "complete"}
async def _create_pagerduty_incident(context: dict) -> str:
    """Create a PagerDuty incident with investigation context."""
    import os
    pagerduty_key = os.environ["PAGERDUTY_ROUTING_KEY"]
    url = "https://events.pagerduty.com/v2/enqueue"
    severity_map = {
        "critical": "critical",
        "high": "error",
        "medium": "warning",
        "low": "info"
    }
    payload = {
        "routing_key": pagerduty_key,
        "event_action": "trigger",
        "dedup_key": context["alert"]["id"],
        "payload": {
            "summary": (
                f"[{context['alert']['severity'].upper()}] "
                f"{context['alert']['name']}: "
                f"{context['escalation_reason']}"
            ),
            "severity": severity_map.get(
                context["alert"]["severity"], "warning"
            ),
            "source": "incident-response-agent",
            "custom_details": context
        }
    }
    async with aiohttp.ClientSession() as session:
        async with session.post(url, json=payload) as resp:
            result = await resp.json()
            return result.get("dedup_key", "")
async def _post_slack_escalation(context: dict) -> None:
    """Post escalation summary to Slack."""
    import os
    webhook_url = os.environ["SLACK_WEBHOOK_URL"]
    hypotheses_text = "\n".join(
        f"• *{h['cause']}* (confidence: {h['confidence']:.0%})"
        for h in context["investigation_findings"]["hypotheses"]
    )
    blocks = [
        {
            "type": "header",
            "text": {
                "type": "plain_text",
                "text": f"🚨 Incident Escalated: {context['alert']['name']}"
            }
        },
{
"type":