
Building an Incident Response Agent That Fixes Production Issues

Oliver Schmidt

DevOps engineer covering AI agents for operations and deployment.

April 28, 2026 · 17 min read


Building an AI Agent for Automated Incident Response: A Production-Grade Tutorial

Incident response is where AI agents can deliver immediate, measurable ROI. Not the vague "AI-powered" marketing fluff — actual agents that triage alerts, correlate signals across systems, identify root causes, and either fix problems or escalate them intelligently.

This tutorial walks through building a production-grade incident response agent using LangGraph for orchestration, real observability tools for signal gathering, and concrete remediation strategies. Every code snippet here runs against actual infrastructure APIs.

Architecture Overview

The incident response agent follows a state machine pattern, not a simple prompt chain. This matters because incident response has branching logic, retry loops, and human-in-the-loop checkpoints that naive sequential chains handle poorly.

┌──────────────┐     ┌────────────────┐     ┌──────────────────┐
│ Alert        │────▶│ Triage &       │────▶│ Root Cause       │
│ Ingestion    │     │ Classification │     │ Analysis         │
└──────────────┘     └────────────────┘     └────────┬─────────┘
                                                     │
                     ┌────────────────┐     ┌────────▼─────────┐
                     │ Escalation     │◀────│ Remediation      │
                     │ (if needed)    │     │ Decision Engine  │
                     └────────────────┘     └────────┬─────────┘
                                                     │
                                            ┌────────▼─────────┐
                                            │ Execute & Verify │
                                            └──────────────────┘
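
Before diving into the nodes, here's roughly how the finished graph wires together in LangGraph. This is a sketch using the IncidentState model and the node functions built over the rest of this tutorial; the routing functions just read the current_step field each node writes back into state.

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver

builder = StateGraph(IncidentState)
builder.add_node("triage", triage_alert)
builder.add_node("gather_evidence", gather_evidence)
builder.add_node("root_cause_analysis", analyze_root_cause)
builder.add_node("decide_action", decide_action)
builder.add_node("request_approval", request_human_approval)
builder.add_node("remediate", execute_remediation)
builder.add_node("verify", verify_remediation)
builder.add_node("escalate", escalate_incident)

builder.set_entry_point("triage")
builder.add_edge("triage", "gather_evidence")
builder.add_edge("gather_evidence", "root_cause_analysis")
builder.add_edge("root_cause_analysis", "decide_action")

# Route on the current_step value each node returns
builder.add_conditional_edges(
    "decide_action",
    lambda s: s.current_step,
    {"remediate": "remediate", "request_approval": "request_approval",
     "escalate": "escalate"},
)
builder.add_conditional_edges(
    "request_approval",
    lambda s: s.current_step,
    {"remediate": "remediate", "escalate": "escalate"},
)
builder.add_edge("remediate", "verify")
builder.add_conditional_edges(
    "verify",
    lambda s: s.current_step,
    {"complete": END, "escalate": "escalate",
     "root_cause_analysis": "root_cause_analysis"},
)
builder.add_edge("escalate", END)

# A checkpointer is required for interrupt()-based approval to pause and resume
graph = builder.compile(checkpointer=MemorySaver())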

Tech stack:

  • LangGraph — agent orchestration with state management and conditional routing
  • Prometheus + Grafana — metrics collection and querying
  • Loki — log aggregation
  • PagerDuty / Opsgenie — escalation
  • Kubernetes client — infrastructure remediation
  • OpenAI GPT-4 / Claude — reasoning backbone

Setting Up the Project

mkdir incident-response-agent && cd incident-response-agent
python -m venv venv && source venv/bin/activate

pip install langgraph langchain-openai langchain-anthropic \
    prometheus-api-client aiohttp kubernetes \
    redis structlog pydantic

Defining the Agent State

LangGraph agents are stateful. The state object carries all context through the workflow. Get this right and the rest falls into place.

from typing import Literal, Optional
from pydantic import BaseModel, Field
from datetime import datetime
from enum import Enum


class Severity(str, Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"


class Alert(BaseModel):
    alert_id: str
    source: str  # prometheus, datadog, cloudwatch, etc.
    name: str
    description: str
    labels: dict[str, str]
    annotations: dict[str, str]
    fired_at: datetime
    raw_payload: dict


class MetricSnapshot(BaseModel):
    metric_name: str
    query: str
    values: list[dict]  # timestamp, value pairs
    unit: str
    anomaly_detected: bool = False
    anomaly_description: str = ""


class LogEntry(BaseModel):
    timestamp: datetime
    source: str
    level: str
    message: str
    structured_fields: dict = {}


class RootCauseHypothesis(BaseModel):
    cause: str
    confidence: float = Field(ge=0.0, le=1.0)
    evidence: list[str]
    affected_components: list[str]
    suggested_remediation: str


class RemediationAction(BaseModel):
    action_type: Literal[
        "kubectl_restart", "scale_deployment", "rollback_deployment",
        "clear_cache", "rotate_certificate", "failover_database",
        "custom_script", "manual_only"
    ]
    target: str
    parameters: dict = {}
    requires_approval: bool = True
    estimated_risk: Literal["low", "medium", "high"] = "medium"


class IncidentState(BaseModel):
    # Input
    alert: Alert

    # Triage output
    severity: Optional[Severity] = None
    triage_summary: str = ""
    is_known_pattern: bool = False
    known_pattern_id: Optional[str] = None
    metrics_to_query: list[str] = []
    log_queries: list[str] = []
    check_deployments: bool = True

    # Investigation data
    metrics: list[MetricSnapshot] = []
    logs: list[LogEntry] = []
    recent_deployments: list[dict] = []
    related_alerts: list[Alert] = []

    # Root cause analysis
    hypotheses: list[RootCauseHypothesis] = []
    selected_hypothesis: Optional[RootCauseHypothesis] = None

    # Remediation
    remediation_plan: list[RemediationAction] = []
    remediation_executed: bool = False
    remediation_result: str = ""
    remediation_verified: bool = False

    # Escalation
    escalated: bool = False
    escalation_reason: str = ""
    escalation_target: str = ""

    # Control flow
    current_step: str = "triage"
    errors: list[str] = []
    human_approval_needed: bool = False
    human_decision: Optional[str] = None

This state object is the single source of truth. Every node reads from it and writes to it. No hidden side effects.
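
For completeness, here is the ingestion side, the "Alert Ingestion" box in the diagram. It can be as simple as translating Alertmanager's webhook payload into the Alert model above; this sketch follows Alertmanager's standard webhook field names, and the function name is ours.

from datetime import datetime


def parse_alertmanager_webhook(payload: dict) -> list[Alert]:
    """Translate an Alertmanager webhook payload into Alert objects."""
    alerts = []
    for item in payload.get("alerts", []):
        if item.get("status") != "firing":
            continue  # Ignore "resolved" notifications
        alerts.append(Alert(
            alert_id=item.get("fingerprint", ""),
            source="prometheus",
            name=item.get("labels", {}).get("alertname", "unknown"),
            description=item.get("annotations", {}).get("description", ""),
            labels=item.get("labels", {}),
            annotations=item.get("annotations", {}),
            # Alertmanager emits RFC3339; trim "Z" for pre-3.11 Pythons
            fired_at=datetime.fromisoformat(
                item["startsAt"].replace("Z", "+00:00")
            ),
            raw_payload=item,
        ))
    return alerts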

Node 1: Alert Triage and Classification

Triage is where most automation attempts fail because they try to be too clever. The goal isn't to solve the problem — it's to answer three questions fast: How bad is this? Have we seen it before? What data do we need?

import structlog
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage

logger = structlog.get_logger()

TRIAGE_PROMPT = """You are an SRE triage specialist. Given an incoming alert,
classify it and determine the investigation strategy.

Respond in JSON with these fields:
- severity: critical|high|medium|low
- summary: one-paragraph description of what's happening
- is_known_pattern: boolean
- known_pattern_id: if known pattern, identify it (e.g., "connection_pool_exhaustion",
  "oom_kill_loop", "cert_expiration", "deploy_latency_regression")
- metrics_to_query: list of Prometheus queries needed for investigation
- log_queries: list of log filter expressions
- check_deployments: boolean — should we check recent deployments?

Classification guidelines:
- CRITICAL: Customer-facing outage, data loss risk, security breach
- HIGH: Degraded service affecting >10% of users, approaching SLA breach
- MEDIUM: Performance degradation, non-critical component failure
- LOW: Warning conditions, capacity planning alerts, non-urgent anomalies

Known patterns you should recognize:
1. Connection pool exhaustion: usually shows as DB connection timeout alerts
   combined with increasing connection count metrics
2. OOM kill loops: container restarts + memory approaching limits
3. Certificate expiration: TLS errors with expiry within 7 days
4. Deploy-triggered latency: latency spike within 30min of deployment
5. DNS resolution failures: sudden spike in DNS timeout errors
6. Disk pressure: inode or volume usage approaching 100%
"""


async def triage_alert(state: IncidentState) -> dict:
    """Classify the alert and determine investigation strategy."""
    alert = state.alert

    llm = ChatOpenAI(model="gpt-4o", temperature=0)

    alert_context = f"""
Alert ID: {alert.alert_id}
Source: {alert.source}
Name: {alert.name}
Description: {alert.description}
Labels: {alert.labels}
Annotations: {alert.annotations}
Fired at: {alert.fired_at.isoformat()}
"""

    messages = [
        SystemMessage(content=TRIAGE_PROMPT),
        HumanMessage(content=alert_context)
    ]

    response = await llm.ainvoke(messages)

    import json
    try:
        result = json.loads(response.content)
    except json.JSONDecodeError:
        # Extract JSON from markdown code blocks if wrapped
        content = response.content
        if "```json" in content:
            content = content.split("```json")[1].split("```")[0]
        result = json.loads(content.strip())

    severity = Severity(result["severity"])

    logger.info("alert_triaged",
                alert_id=alert.alert_id,
                severity=severity.value,
                is_known_pattern=result["is_known_pattern"])

    return {
        "severity": severity,
        "triage_summary": result["summary"],
        "is_known_pattern": result["is_known_pattern"],
        "known_pattern_id": result.get("known_pattern_id"),
        "metrics_to_query": result.get("metrics_to_query", []),
        "log_queries": result.get("log_queries", []),
        "check_deployments": result.get("check_deployments", True),
        "current_step": "gather_evidence"
    }

Why this matters: The triage step does double duty. It classifies severity (which gates escalation later) and it generates the investigation queries. Instead of hardcoding "if database alert, query these metrics," the LLM generates context-appropriate Prometheus queries based on the specific alert content.
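
For concreteness, this is the shape of parsed triage output you might get back for a hypothetical database connection alert. All values here are invented for illustration:

# Hypothetical parsed triage output for a "DBConnectionTimeout" alert
triage_result = {
    "severity": "high",
    "summary": "API pods are timing out while acquiring database "
               "connections; connection counts are climbing toward "
               "the pool limit.",
    "is_known_pattern": True,
    "known_pattern_id": "connection_pool_exhaustion",
    "metrics_to_query": [
        'pg_stat_activity_count{datname="api"}',
        'rate(http_request_duration_seconds_sum{job="api"}[5m])'
    ],
    "log_queries": ['{app="api"} |= "connection timeout"'],
    "check_deployments": True
}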

Node 2: Evidence Gathering

This is the most engineering-heavy node. It executes the queries generated during triage and structures the results.

from prometheus_api_client import PrometheusConnect
from datetime import timedelta
import aiohttp

# Initialize connections
prom = PrometheusConnect(url="http://prometheus:9090", disable_ssl=True)


async def gather_evidence(state: IncidentState) -> dict:
    """Collect metrics, logs, and deployment data for analysis."""
    alert = state.alert

    metrics = []
    logs = []
    recent_deployments = []
    related_alerts = []

    # Execute the metric queries the triage step stored in state
    for query_info in state.metrics_to_query:
        try:
            metric_data = await _query_prometheus(
                query=query_info,
                start=alert.fired_at - timedelta(hours=1),
                end=alert.fired_at + timedelta(minutes=5)
            )
            metrics.append(metric_data)
        except Exception as e:
            logger.warning("metric_query_failed", query=query_info, error=str(e))

    # Query logs from Loki
    for log_query in state.log_queries:
        try:
            log_data = await _query_loki(
                query=log_query,
                start=alert.fired_at - timedelta(minutes=30),
                end=alert.fired_at + timedelta(minutes=5)
            )
            logs.extend(log_data)
        except Exception as e:
            logger.warning("log_query_failed", query=log_query, error=str(e))

    # Check recent deployments (Kubernetes) when triage asked for it
    if state.check_deployments:
        namespace = alert.labels.get("namespace", "default")
        recent_deployments = await _get_recent_deployments(
            namespace=namespace,
            since=alert.fired_at - timedelta(hours=2)
        )

    # Find related alerts
    related_alerts = await _find_related_alerts(alert)

    return {
        "metrics": metrics,
        "logs": logs[:200],  # Cap logs to prevent context overflow
        "recent_deployments": recent_deployments,
        "related_alerts": related_alerts,
        "current_step": "root_cause_analysis"
    }


async def _query_prometheus(query: str, start: datetime, end: datetime) -> MetricSnapshot:
    """Execute a Prometheus range query."""
    result = prom.custom_query_range(
        query=query,
        start_time=start,
        end_time=end,
        step="15s"
    )

    if not result:
        return MetricSnapshot(
            metric_name=query,
            query=query,
            values=[],
            unit="unknown"
        )

    values = [
        {"timestamp": float(v[0]), "value": float(v[1])}
        for v in result[0]["values"]
    ]

    # Simple anomaly detection: compare the last 5 minutes to the rest
    # of the window (roughly the preceding hour at a 15s step)
    recent = [v["value"] for v in values[-20:]]  # last 20 points ≈ 5 min
    baseline = [v["value"] for v in values[:-20]]  # everything earlier

    anomaly_detected = False
    anomaly_desc = ""

    if baseline and recent:
        baseline_avg = sum(baseline) / len(baseline)
        recent_avg = sum(recent) / len(recent)
        if baseline_avg > 0:
            change_pct = abs(recent_avg - baseline_avg) / baseline_avg
            if change_pct > 0.5:  # 50% deviation
                anomaly_detected = True
                direction = "increased" if recent_avg > baseline_avg else "decreased"
                anomaly_desc = (
                    f"Metric {direction} by {change_pct:.0%} "
                    f"(baseline: {baseline_avg:.2f}, recent: {recent_avg:.2f})"
                )

    return MetricSnapshot(
        metric_name=query.split("{")[0] if "{" in query else query,
        query=query,
        values=values,
        unit="",
        anomaly_detected=anomaly_detected,
        anomaly_description=anomaly_desc
    )


async def _query_loki(query: str, start: datetime, end: datetime) -> list[LogEntry]:
    """Query Loki for relevant logs."""
    loki_url = "http://loki:3100/loki/api/v1/query_range"

    async with aiohttp.ClientSession() as session:
        params = {
            "query": query,
            "start": str(int(start.timestamp())),
            "end": str(int(end.timestamp())),
            "limit": 100,
            "direction": "backward"
        }
        async with session.get(loki_url, params=params) as resp:
            data = await resp.json()

    entries = []
    for stream in data.get("data", {}).get("result", []):
        for value in stream.get("values", []):
            entries.append(LogEntry(
                timestamp=datetime.fromtimestamp(float(value[0]) / 1e9),
                source=stream.get("stream", {}).get("job", "unknown"),
                level=stream.get("stream", {}).get("level", "unknown"),
                message=value[1],
                structured_fields=stream.get("stream", {})
            ))

    return sorted(entries, key=lambda x: x.timestamp, reverse=True)


async def _get_recent_deployments(namespace: str, since: datetime) -> list[dict]:
    """Find Kubernetes deployments that rolled out after `since`."""
    from kubernetes import client, config

    try:
        config.load_incluster_config()
    except config.ConfigException:
        config.load_kube_config()

    apps_v1 = client.AppsV1Api()
    deployments = apps_v1.list_namespaced_deployment(namespace=namespace)

    recent = []
    for deploy in deployments.items:
        annotations = deploy.metadata.annotations or {}
        change_cause = annotations.get("kubernetes.io/change-cause", "")

        for condition in (deploy.status.conditions or []):
            if condition.type != "Progressing":
                continue
            # A rollout counts as recent if its Progressing condition was
            # updated within the window, whether it succeeded or stalled
            last_update = condition.last_update_time
            if last_update is None:
                continue
            if last_update.replace(tzinfo=None) >= since.replace(tzinfo=None):
                recent.append({
                    "name": deploy.metadata.name,
                    "namespace": namespace,
                    "replicas": deploy.spec.replicas,
                    "ready_replicas": deploy.status.ready_replicas,
                    "change_cause": change_cause,
                    "status": ("failing" if condition.status == "False"
                               else "progressed"),
                })

    return recent

The key insight: Evidence gathering isn't just data collection. The _query_prometheus function includes lightweight anomaly detection that flags deviations before the LLM even sees the data. This gives the reasoning step concrete signals to work with rather than raw time series.
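
If the fixed 50% threshold proves too blunt for noisy metrics, a z-score check is a drop-in replacement. A sketch over the same values structure, with the window and threshold defaults as illustrative choices:

import statistics


def zscore_anomaly(values: list[dict], window: int = 20,
                   threshold: float = 3.0) -> tuple[bool, str]:
    """Flag an anomaly when the recent mean sits more than `threshold`
    standard deviations away from the baseline mean."""
    if len(values) < window * 2:
        return False, ""
    baseline = [v["value"] for v in values[:-window]]
    recent = [v["value"] for v in values[-window:]]
    mu = statistics.fmean(baseline)
    sigma = statistics.pstdev(baseline)
    if sigma == 0:
        return False, ""  # Flat baseline: no meaningful deviation scale
    z = (statistics.fmean(recent) - mu) / sigma
    if abs(z) > threshold:
        return True, f"Recent mean is {z:+.1f} standard deviations from baseline"
    return False, ""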

Node 3: Root Cause Analysis

This is where the LLM earns its keep. The prompt engineering here is critical — you need the model to reason about evidence, not hallucinate causes.

RCA_PROMPT = """You are a senior SRE performing root cause analysis. You have been
provided with alert data, metric snapshots, log entries, and deployment history.

Your job:
1. Analyze the evidence objectively — don't jump to conclusions
2. Generate 2-3 hypotheses ranked by likelihood
3. For each hypothesis, cite specific evidence (metric values, log lines)
4. Identify the most likely root cause
5. Recommend a specific remediation

Rules:
- If metrics show no anomaly, say so. Not every alert is a real incident.
- If a recent deployment correlates with the issue, weigh that heavily.
- If you see connection pool exhaustion, check for: slow queries, connection leaks,
  upstream service failures, recent config changes.
- If you see OOM patterns, check for: memory leaks (gradual increase), sudden traffic
  spikes, new code paths, missing resource limits.
- Confidence below 0.5 means you should recommend escalation, not automated remediation.

Respond in JSON:
{
  "hypotheses": [
    {
      "cause": "description",
      "confidence": 0.0-1.0,
      "evidence": ["specific evidence items"],
      "affected_components": ["component names"],
      "suggested_remediation": "specific action"
    }
  ],
  "selected_hypothesis_index": 0,
  "additional_investigation_needed": false,
  "recommended_actions": [
    {
      "action_type": "kubectl_restart|scale_deployment|rollback_deployment|...",
      "target": "deployment/service name",
      "parameters": {},
      "requires_approval": true,
      "estimated_risk": "low|medium|high"
    }
  ]
}
"""


async def analyze_root_cause(state: IncidentState) -> dict:
    """Determine root cause from gathered evidence."""
    llm = ChatOpenAI(model="gpt-4o", temperature=0)

    # Build evidence summary
    evidence_sections = []

    # Metrics
    for metric in state.metrics:
        section = f"### Metric: {metric.metric_name}\nQuery: `{metric.query}`\n"
        if metric.anomaly_detected:
            section += f"**ANOMALY**: {metric.anomaly_description}\n"
        if metric.values:
            recent_values = metric.values[-10:]
            section += "Recent values:\n"
            for v in recent_values:
                section += f"  - {v['timestamp']}: {v['value']:.2f}\n"
        evidence_sections.append(section)

    # Logs
    if state.logs:
        log_section = "### Relevant Logs\n"
        for log in state.logs[:50]:
            log_section += f"[{log.level}] {log.message}\n"
        evidence_sections.append(log_section)

    # Deployments
    if state.recent_deployments:
        deploy_section = "### Recent Deployments\n"
        for deploy in state.recent_deployments:
            deploy_section += f"- {deploy['name']}: {deploy.get('change_cause', 'no change cause recorded')}\n"
            deploy_section += f"  Status: {deploy.get('status', 'unknown')}\n"
        evidence_sections.append(deploy_section)

    # Related alerts
    if state.related_alerts:
        related_section = "### Related Alerts\n"
        for related in state.related_alerts:
            related_section += f"- {related.name}: {related.description}\n"
        evidence_sections.append(related_section)

    evidence_text = "\n".join(evidence_sections)

    messages = [
        SystemMessage(content=RCA_PROMPT),
        HumanMessage(content=f"""
## Alert
{state.alert.name}: {state.alert.description}
Labels: {state.alert.labels}

## Triage Summary
{state.triage_summary}
Known pattern: {state.is_known_pattern} ({state.known_pattern_id or 'N/A'})

## Evidence
{evidence_text}
""")
    ]

    response = await llm.ainvoke(messages)

    import json
    result = json.loads(_extract_json(response.content))

    hypotheses = [
        RootCauseHypothesis(
            cause=h["cause"],
            confidence=h["confidence"],
            evidence=h["evidence"],
            affected_components=h["affected_components"],
            suggested_remediation=h["suggested_remediation"]
        )
        for h in result["hypotheses"]
    ]

    selected_idx = result.get("selected_hypothesis_index", 0)
    selected = hypotheses[selected_idx] if hypotheses else None

    remediation_plan = [
        RemediationAction(**action)
        for action in result.get("recommended_actions", [])
    ]

    logger.info("rca_complete",
                num_hypotheses=len(hypotheses),
                top_confidence=hypotheses[0].confidence if hypotheses else 0,
                selected_cause=selected.cause if selected else "unknown")

    return {
        "hypotheses": hypotheses,
        "selected_hypothesis": selected,
        "remediation_plan": remediation_plan,
        "current_step": "decide_action"
    }


def _extract_json(text: str) -> str:
    """Extract JSON from LLM response, handling markdown wrappers."""
    if "```json" in text:
        return text.split("```json")[1].split("```")[0].strip()
    if "```" in text:
        return text.split("```")[1].split("```")[0].strip()
    # Find first { or [
    for i, char in enumerate(text):
        if char in "{[":
            return text[i:]
    return text
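
If hand-rolled JSON extraction feels fragile, langchain's with_structured_output can bind a Pydantic schema to the model call, so the RCA node gets validated objects instead of raw text. A sketch; RCAResult is a hypothetical wrapper around the models defined earlier:

from pydantic import BaseModel


class RCAResult(BaseModel):
    hypotheses: list[RootCauseHypothesis]
    selected_hypothesis_index: int = 0
    additional_investigation_needed: bool = False
    recommended_actions: list[RemediationAction] = []


# Returns a validated RCAResult instead of free text, making
# _extract_json and json.loads unnecessary
structured_llm = ChatOpenAI(
    model="gpt-4o", temperature=0
).with_structured_output(RCAResult)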

Node 4: Decision Engine — To Remediate or Escalate

This is the most important node architecturally. It's the decision gate that determines whether the agent acts autonomously or hands off to humans.

from langgraph.types import interrupt


async def decide_action(state: IncidentState) -> dict:
    """Decide whether to remediate automatically or escalate."""
    hypothesis = state.selected_hypothesis

    if not hypothesis:
        return {
            "current_step": "escalate",
            "escalation_reason": "No viable hypothesis identified",
            "escalated": True
        }

    # Decision matrix
    should_escalate = False
    escalation_reason = ""

    # Rule 1: Low confidence always escalates
    if hypothesis.confidence < 0.6:
        should_escalate = True
        escalation_reason = (
            f"Low confidence ({hypothesis.confidence:.0%}) in root cause: "
            f"{hypothesis.cause}"
        )

    # Rule 2: High-risk remediations require human approval
    high_risk_actions = [
        a for a in state.remediation_plan if a.estimated_risk == "high"
    ]
    if high_risk_actions:
        should_escalate = True
        escalation_reason = (
            f"Remediation involves high-risk actions: "
            f"{', '.join(a.action_type for a in high_risk_actions)}"
        )

    # Rule 3: Critical severity with data loss risk
    if (state.severity == Severity.CRITICAL and
            any(kw in hypothesis.cause.lower()
                for kw in ["data loss", "corruption", "security", "breach"])):
        should_escalate = True
        escalation_reason = "Critical incident with data/security risk requires human oversight"

    # Rule 4: Database and stateful service operations
    stateful_targets = ["database", "redis", "elasticsearch", "kafka", "zookeeper"]
    for action in state.remediation_plan:
        if any(s in action.target.lower() for s in stateful_targets):
            if action.action_type in ["failover_database", "custom_script"]:
                should_escalate = True
                escalation_reason = f"Stateful service operation on {action.target}"

    if should_escalate:
        logger.info("escalating_incident",
                     alert_id=state.alert.alert_id,
                     reason=escalation_reason)
        return {
            "current_step": "escalate",
            "escalation_reason": escalation_reason,
            "escalated": True
        }

    # For medium-risk actions, request human approval via interrupt
    medium_risk = [
        a for a in state.remediation_plan if a.estimated_risk == "medium"
    ]
    if medium_risk:
        return {
            "current_step": "request_approval",
            "human_approval_needed": True
        }

    # Low-risk actions proceed automatically
    return {"current_step": "remediate"}

The interrupt pattern in LangGraph is what makes this production-safe. When the agent hits request_approval, it pauses execution and surfaces the remediation plan to a human via Slack/PagerDuty. The workflow resumes only after the human responds. This is fundamentally different from "the LLM decides everything" — it's a structured handoff.

async def request_human_approval(state: IncidentState) -> dict:
    """Pause workflow and request human approval for medium-risk actions."""

    # This interrupt surfaces to the human operator
    approval_request = interrupt({
        "type": "remediation_approval",
        "alert_id": state.alert.alert_id,
        "severity": state.severity.value,
        "hypothesis": state.selected_hypothesis.cause,
        "confidence": state.selected_hypothesis.confidence,
        "evidence": state.selected_hypothesis.evidence,
        "proposed_actions": [
            {
                "action": a.action_type,
                "target": a.target,
                "parameters": a.parameters,
                "risk": a.estimated_risk
            }
            for a in state.remediation_plan
        ],
        "message": (
            f"Remediation proposed for {state.alert.name}. "
            f"Root cause: {state.selected_hypothesis.cause} "
            f"(confidence: {state.selected_hypothesis.confidence:.0%}). "
            f"Approve automated remediation?"
        )
    })

    # When the workflow resumes, approval_request contains the human's response
    if approval_request.get("approved"):
        return {"current_step": "remediate", "human_decision": "approved"}
    else:
        return {
            "current_step": "escalate",
            "human_decision": "rejected",
            "escalation_reason": "Human rejected automated remediation",
            "escalated": True
        }
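
The other half of the handoff lives wherever the human's response comes back, a Slack action handler for example. Resuming is a Command carrying the decision. A sketch, assuming the compiled graph from the architecture section and the thread_id the run was started with:

from langgraph.types import Command


async def handle_approval_response(thread_id: str, approved: bool) -> dict:
    """Resume the paused incident workflow with the operator's decision."""
    config = {"configurable": {"thread_id": thread_id}}
    # The resume payload becomes the return value of interrupt() above
    return await graph.ainvoke(
        Command(resume={"approved": approved}),
        config=config,
    )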

Node 5: Automated Remediation

from kubernetes import client, config

# Map action types to handler functions
REMEDIATION_HANDLERS = {}


def register_handler(action_type: str):
    def decorator(func):
        REMEDIATION_HANDLERS[action_type] = func
        return func
    return decorator


@register_handler("kubectl_restart")
async def restart_deployment(action: RemediationAction) -> str:
    """Rolling restart of a Kubernetes deployment."""
    try:
        config.load_incluster_config()
    except config.ConfigException:
        config.load_kube_config()

    apps_v1 = client.AppsV1Api()
    namespace = action.parameters.get("namespace", "default")

    # Trigger rolling restart by patching annotation
    now = datetime.utcnow().isoformat()
    body = {
        "spec": {
            "template": {
                "metadata": {
                    "annotations": {"kubectl.kubernetes.io/restartedAt": now}
                }
            }
        }
    }

    apps_v1.patch_namespaced_deployment(
        name=action.target,
        namespace=namespace,
        body=body
    )

    return f"Initiated rolling restart of {action.target} in {namespace}"


@register_handler("scale_deployment")
async def scale_deployment(action: RemediationAction) -> str:
    """Scale a deployment to specified replica count."""
    try:
        config.load_incluster_config()
    except config.ConfigException:
        config.load_kube_config()

    apps_v1 = client.AppsV1Api()
    namespace = action.parameters.get("namespace", "default")
    replicas = action.parameters.get("replicas", 3)

    body = {"spec": {"replicas": replicas}}
    apps_v1.patch_namespaced_deployment_scale(
        name=action.target,
        namespace=namespace,
        body=body
    )

    return f"Scaled {action.target} to {replicas} replicas"


@register_handler("rollback_deployment")
async def rollback_deployment(action: RemediationAction) -> str:
    """Rollback to the previous revision."""
    try:
        config.load_incluster_config()
    except config.ConfigException:
        config.load_kube_config()

    apps_v1 = client.AppsV1Api()
    namespace = action.parameters.get("namespace", "default")

    # Get current deployment to find previous revision
    deploy = apps_v1.read_namespaced_deployment(
        name=action.target, namespace=namespace
    )

    current_revision = int(
        deploy.metadata.annotations.get(
            "deployment.kubernetes.io/revision", "1"
        )
    )

    if current_revision <= 1:
        return f"No previous revision available for {action.target}"

    # Use the Kubernetes API to rollback
    # (In practice, you'd use the rollout undo command equivalent)
    body = {
        "kind": "DeploymentRollback",
        "apiVersion": "extensions/v1beta1",
        "name": action.target,
        "rollbackTo": {"revision": current_revision - 1}
    }

    # Alternative: patch to previous revision's spec
    # This is simplified — production code should store revision history
    return f"Rollback initiated for {action.target} to revision {current_revision - 1}"


@register_handler("clear_cache")
async def clear_cache(action: RemediationAction) -> str:
    """Clear application cache via exec or API call."""
    cache_type = action.parameters.get("cache_type", "redis")

    if cache_type == "redis":
        import redis.asyncio as aioredis
        redis_url = action.parameters.get(
            "redis_url", "redis://redis:6379/0"
        )
        r = aioredis.from_url(redis_url)
        pattern = action.parameters.get("pattern", "*")
        await r.flushdb()
        await r.aclose()
        return f"Cleared Redis cache (pattern: {pattern})"
    elif cache_type == "api":
        endpoint = action.parameters.get("cache_clear_endpoint")
        async with aiohttp.ClientSession() as session:
            async with session.post(endpoint) as resp:
                return f"Cache clear API call: {resp.status}"
    return "Unknown cache type"


async def execute_remediation(state: IncidentState) -> dict:
    """Execute the remediation plan and verify results."""
    results = []

    for action in state.remediation_plan:
        handler = REMEDIATION_HANDLERS.get(action.action_type)
        if not handler:
            results.append(
                f"SKIP: No handler for action type {action.action_type}"
            )
            continue

        try:
            result = await handler(action)
            results.append(f"SUCCESS: {result}")
            logger.info("remediation_executed",
                        action=action.action_type,
                        target=action.target,
                        result=result)
        except Exception as e:
            results.append(f"FAILED: {action.action_type} - {str(e)}")
            logger.error("remediation_failed",
                         action=action.action_type,
                         error=str(e))

    return {
        "remediation_executed": True,
        "remediation_result": "\n".join(results),
        "current_step": "verify"
    }
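
One safety valve worth adding before pointing this at production: a global dry-run switch, so you can watch what the agent would do without letting it touch the cluster. A sketch; the environment variable name is illustrative:

import os

# Illustrative kill switch: with AGENT_DRY_RUN set, report what would
# happen instead of touching the cluster
DRY_RUN = os.environ.get("AGENT_DRY_RUN", "").lower() in ("1", "true")


async def safe_execute(action: RemediationAction) -> str:
    """Wrap handler dispatch so dry-run mode only reports intent."""
    handler = REMEDIATION_HANDLERS.get(action.action_type)
    if handler is None:
        return f"SKIP: no handler for {action.action_type}"
    if DRY_RUN:
        return f"DRY-RUN: would run {action.action_type} on {action.target}"
    return await handler(action)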

Node 6: Verification

Remediation without verification is just hoping. This node checks whether the fix actually worked.

import asyncio


async def verify_remediation(state: IncidentState) -> dict:
    """Check if the remediation resolved the issue."""
    await asyncio.sleep(120)  # Wait 2 minutes for effects to propagate

    alert = state.alert
    hypothesis = state.selected_hypothesis

    # Re-query the key metrics from the original investigation
    verification_metrics = []
    for metric in state.metrics:
        if metric.anomaly_detected:
            try:
                current = await _query_prometheus(
                    query=metric.query,
                    start=datetime.utcnow() - timedelta(minutes=5),
                    end=datetime.utcnow()
                )
                verification_metrics.append(current)
            except Exception:
                pass

    # Check if the original alert is still firing (a sketch of
    # _check_alert_status appears at the end of this section)
    alert_still_active = await _check_alert_status(alert.alert_id)

    # Use LLM to interpret verification results
    llm = ChatOpenAI(model="gpt-4o", temperature=0)

    verification_context = f"""
Original issue: {hypothesis.cause}
Remediation taken: {state.remediation_result}
Alert still active: {alert_still_active}

Post-remediation metrics:
"""
    for vm in verification_metrics:
        verification_context += (
            f"- {vm.metric_name}: anomaly={vm.anomaly_detected}, "
            f"desc={vm.anomaly_description}\n"
        )

    response = await llm.ainvoke([
        SystemMessage(content=(
            "Determine if the remediation was successful. "
            "Respond with JSON: {\"verified\": bool, \"reason\": str, "
            "\"needs_escalation\": bool}"
        )),
        HumanMessage(content=verification_context)
    ])

    import json
    result = json.loads(_extract_json(response.content))

    if result["verified"]:
        return {
            "remediation_verified": True,
            "current_step": "complete"
        }
    elif result.get("needs_escalation"):
        return {
            "remediation_verified": False,
            "current_step": "escalate",
            "escalation_reason": (
                f"Remediation did not resolve the issue: {result['reason']}"
            ),
            "escalated": True
        }
    else:
        # Retry with different approach — loop back
        return {
            "remediation_verified": False,
            "current_step": "root_cause_analysis"
        }
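
The _check_alert_status helper used above isn't shown in full. A minimal sketch against the Alertmanager v2 API might look like this; the alertmanager URL and the assumption that alert IDs are Alertmanager fingerprints are both ours:

import aiohttp


async def _check_alert_status(alert_id: str) -> bool:
    """Return True if the alert is still firing in Alertmanager."""
    url = "http://alertmanager:9093/api/v2/alerts"
    async with aiohttp.ClientSession() as session:
        async with session.get(url, params={"active": "true"}) as resp:
            alerts = await resp.json()
    # Assumes alert_id was derived from the Alertmanager fingerprint;
    # adapt this to however your ingestion layer keys alerts
    return any(a.get("fingerprint") == alert_id for a in alerts)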

Node 7: Escalation

async def escalate_incident(state: IncidentState) -> dict:
    """Escalate to human responders with full context."""
    alert = state.alert

    # Build escalation payload
    escalation_context = {
        "alert": {
            "id": alert.alert_id,
            "name": alert.name,
            "severity": state.severity.value,
            "description": alert.description,
        },
        "triage_summary": state.triage_summary,
        "investigation_findings": {
            "hypotheses": [
                {
                    "cause": h.cause,
                    "confidence": h.confidence,
                    "evidence": h.evidence
                }
                for h in state.hypotheses
            ],
            "metrics_analyzed": len(state.metrics),
            "anomalies_found": sum(
                1 for m in state.metrics if m.anomaly_detected
            ),
            "recent_deployments": state.recent_deployments,
        },
        "remediation_attempted": state.remediation_executed,
        "remediation_result": state.remediation_result,
        "escalation_reason": state.escalation_reason,
    }

    # Send to PagerDuty
    await _create_pagerduty_incident(escalation_context)

    # Post to Slack
    await _post_slack_escalation(escalation_context)

    logger.info("incident_escalated",
                alert_id=alert.alert_id,
                reason=state.escalation_reason)

    return {"escalated": True, "current_step": "complete"}


async def _create_pagerduty_incident(context: dict) -> str:
    """Create a PagerDuty incident with investigation context."""
    import os

    pagerduty_key = os.environ["PAGERDUTY_ROUTING_KEY"]
    url = "https://events.pagerduty.com/v2/enqueue"

    severity_map = {
        "critical": "critical",
        "high": "error",
        "medium": "warning",
        "low": "info"
    }

    payload = {
        "routing_key": pagerduty_key,
        "event_action": "trigger",
        "dedup_key": context["alert"]["id"],
        "payload": {
            "summary": (
                f"[{context['alert']['severity'].upper()}] "
                f"{context['alert']['name']}: "
                f"{context['escalation_reason']}"
            ),
            "severity": severity_map.get(
                context["alert"]["severity"], "warning"
            ),
            "source": "incident-response-agent",
            "custom_details": context
        }
    }

    async with aiohttp.ClientSession() as session:
        async with session.post(url, json=payload) as resp:
            result = await resp.json()
            return result.get("dedup_key", "")


async def _post_slack_escalation(context: dict) -> None:
    """Post escalation summary to Slack."""
    import os

    webhook_url = os.environ["SLACK_WEBHOOK_URL"]

    hypotheses_text = "\n".join(
        f"• *{h['cause']}* (confidence: {h['confidence']:.0%})"
        for h in context["investigation_findings"]["hypotheses"]
    )

    blocks = [
        {
            "type": "header",
            "text": {
                "type": "plain_text",
                "text": f"🚨 Incident Escalated: {context['alert']['name']}"
            }
        },
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": (
                    f"*Severity:* {context['alert']['severity']}\n"
                    f"*Reason:* {context['escalation_reason']}\n\n"
                    f"*Hypotheses considered:*\n{hypotheses_text}"
                )
            }
        }
    ]

    async with aiohttp.ClientSession() as session:
        await session.post(webhook_url, json={"blocks": blocks})