Building an AI Trading Bot with LangChain: A Step-by-Step Tutorial
Alex Chen
AI engineer and open-source contributor. Writes about agent architectures and LLM tooling.
Most tutorials on "AI trading bots" are either vaporware or dangerously naive. They show you a LangChain agent calling a single indicator function, then skip straight to "now deploy it and make money." …
Building a Quantitative Trading Bot with LangChain Agents
A Realistic Architecture for LLM-Augmented Trading Systems
Most tutorials on "AI trading bots" are either vaporware or dangerously naive. They show you a LangChain agent calling a single indicator function, then skip straight to "now deploy it and make money." This isn't that tutorial.
We're going to build something architecturally honest: a trading system where LangChain agents handle the parts LLMs are actually good at (unstructured data analysis, reasoning over multiple information sources, qualitative assessment) while traditional quantitative methods handle numerical signal generation. The LLM isn't the strategy — it's one component in a pipeline that also includes hard risk controls that no agent can override.
The full system will cover market data ingestion, a hybrid signal generation pipeline, layered risk management, backtesting with realistic assumptions, and paper trading through Alpaca.
Why LLMs in Trading (and Why Not)
Before writing a line of code, let's be clear about what LLMs bring to trading and where they fail:
LLMs are useful for:
- Parsing and synthesizing news, earnings calls, SEC filings
- Interpreting macroeconomic context
- Reasoning across multiple qualitative signals
- Explaining and justifying trade decisions (auditability)
LLMs are terrible for:
- Pure numerical pattern recognition (traditional ML is better)
- High-frequency anything (latency is measured in seconds, not microseconds)
- Guaranteed consistency (same input can yield different outputs)
- Mathematical precision
The architecture below reflects this reality.
Project Structure
trading-bot/
├── config.py
├── data/
│ ├── ingestion.py
│ └── store.py
├── agents/
│ ├── market_analyst.py
│ ├── risk_assessor.py
│ └── orchestrator.py
├── signals/
│ ├── technical.py
│ ├── sentiment.py
│ └── composite.py
├── risk/
│ ├── position_sizing.py
│ ├── limits.py
│ └── stops.py
├── execution/
│ ├── paper_trader.py
│ └── order_manager.py
├── backtest/
│ ├── engine.py
│ └── metrics.py
├── main.py
└── requirements.txt
1. Market Data Ingestion
We need two types of data: structured (OHLCV prices, fundamentals) and unstructured (news, filings). We'll use yfinance for price data and a news API for the unstructured side.
# config.py
from dataclasses import dataclass, field
from typing import List
@dataclass
class TradingConfig:
    """Central configuration for the trading system.

    All risk limits are expressed as fractions of portfolio value
    (0.10 == 10%). Defaults are conservative paper-trading settings.
    """

    # Universe of symbols the bot analyzes and trades each cycle.
    symbols: List[str] = field(default_factory=lambda: [
        "AAPL", "MSFT", "GOOGL", "AMZN", "NVDA",
        "META", "TSLA", "JPM", "V", "UNH"
    ])
    lookback_days: int = 252  # 1 year of trading days
    initial_capital: float = 100_000.0
    max_position_pct: float = 0.10  # 10% max per position
    max_portfolio_risk: float = 0.02  # 2% max daily portfolio risk
    max_drawdown_pct: float = 0.15  # 15% max drawdown before halt
    stop_loss_pct: float = 0.05  # 5% trailing stop
    paper_trading: bool = True  # simulated execution; never flip to live lightly
    # Credentials default to empty — populate from env/secret store,
    # never hard-code them in source control.
    alpaca_api_key: str = ""
    alpaca_secret_key: str = ""
    openai_api_key: str = ""
    news_api_key: str = ""
# data/ingestion.py
import yfinance as yf
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, Optional
import requests
import logging
logger = logging.getLogger(__name__)
class MarketDataFetcher:
    """Fetches and caches market data from multiple sources.

    Prices come from yfinance, fundamentals from the yfinance ``info`` dict,
    and news from NewsAPI when a key is configured (falling back to yfinance's
    built-in news otherwise). Every fetcher degrades to an empty result on
    error instead of raising, so one bad symbol cannot abort a full scan.
    """

    def __init__(self, config):
        self.config = config
        # BUGFIX: cache is keyed by (symbol, days), not symbol alone — a
        # request with a different lookback must not be served a previously
        # fetched, differently sized frame.
        self._cache: Dict[tuple, pd.DataFrame] = {}

    def get_price_data(
        self, symbol: str, period_days: Optional[int] = None
    ) -> pd.DataFrame:
        """Fetch OHLCV data. Returns a DataFrame with standardized columns
        (open/high/low/close/volume) or an empty frame on failure."""
        days = period_days or self.config.lookback_days
        cache_key = (symbol, days)
        if cache_key in self._cache:
            return self._cache[cache_key]
        end = datetime.now()
        # Request 1.5x the window so weekends/holidays still leave `days` rows.
        start = end - timedelta(days=int(days * 1.5))
        try:
            ticker = yf.Ticker(symbol)
            df = ticker.history(start=start, end=end, auto_adjust=True)
            if df.empty:
                logger.warning(f"No data returned for {symbol}")
                return pd.DataFrame()
            # Standardize columns to lowercase names used by the rest of the system.
            df = df.rename(columns={
                "Open": "open", "High": "high", "Low": "low",
                "Close": "close", "Volume": "volume"
            })
            df = df[["open", "high", "low", "close", "volume"]]
            df.index = pd.to_datetime(df.index)
            df.index = df.index.tz_localize(None)  # Remove timezone
            df = df.tail(days)
            self._cache[cache_key] = df
            return df
        except Exception as e:
            logger.error(f"Failed to fetch data for {symbol}: {e}")
            return pd.DataFrame()

    def get_fundamentals(self, symbol: str) -> dict:
        """Fetch basic fundamental data.

        Values may be None when yfinance has no data for a field;
        returns {} on any fetch error.
        """
        try:
            ticker = yf.Ticker(symbol)
            info = ticker.info
            return {
                "pe_ratio": info.get("trailingPE"),
                "forward_pe": info.get("forwardPE"),
                "peg_ratio": info.get("pegRatio"),
                "market_cap": info.get("marketCap"),
                "profit_margin": info.get("profitMargins"),
                "revenue_growth": info.get("revenueGrowth"),
                "debt_to_equity": info.get("debtToEquity"),
                "free_cash_flow": info.get("freeCashflow"),
                "sector": info.get("sector"),
                "industry": info.get("industry"),
            }
        except Exception as e:
            logger.error(f"Failed to fetch fundamentals for {symbol}: {e}")
            return {}

    def get_news(self, symbol: str, days: int = 7) -> list:
        """Fetch recent news articles for a symbol (NewsAPI, capped at 10)."""
        if not self.config.news_api_key:
            return self._get_yfinance_news(symbol)
        # NewsAPI approach
        end = datetime.now()
        start = end - timedelta(days=days)
        url = "https://newsapi.org/v2/everything"
        params = {
            "q": symbol,
            "from": start.strftime("%Y-%m-%d"),
            "to": end.strftime("%Y-%m-%d"),
            "sortBy": "relevancy",
            "language": "en",
            "pageSize": 10,
            "apiKey": self.config.news_api_key,
        }
        try:
            resp = requests.get(url, params=params, timeout=10)
            resp.raise_for_status()
            articles = resp.json().get("articles", [])
            return [
                {
                    "title": a["title"],
                    # BUGFIX: NewsAPI returns explicit nulls for missing
                    # fields, so .get() defaults are not enough — coerce
                    # None to "" before slicing.
                    "description": a.get("description") or "",
                    "source": a["source"]["name"],
                    "published": a["publishedAt"],
                    "content": (a.get("content") or "")[:500],
                }
                for a in articles
                if a.get("title") and "[Removed]" not in a["title"]
            ]
        except Exception as e:
            logger.error(f"News API error for {symbol}: {e}")
            return []

    def _get_yfinance_news(self, symbol: str) -> list:
        """Fallback: use yfinance's built-in news (no API key required)."""
        try:
            ticker = yf.Ticker(symbol)
            news = ticker.news or []
            return [
                {
                    "title": n.get("title", ""),
                    "description": n.get("summary", ""),
                    "source": n.get("publisher", "Unknown"),
                    # providerPublishTime is a unix timestamp; 0 fallback
                    # yields the epoch rather than crashing.
                    "published": datetime.fromtimestamp(
                        n.get("providerPublishTime", 0)
                    ).isoformat(),
                    "content": n.get("summary", "")[:500],
                }
                for n in news[:10]
            ]
        except Exception:
            # Best-effort fallback: silently degrade to "no news".
            return []

    def clear_cache(self):
        """Drop all cached price frames (call between analysis cycles)."""
        self._cache.clear()
Key design decisions here: the cache prevents redundant API calls during a single analysis cycle, yfinance is rate-limited but free, and the news fallback means the system degrades gracefully without an API key.
2. Technical Signal Generation
This is where most LLM trading tutorials go wrong. They ask the LLM to calculate RSI. Don't do that. Compute the indicators deterministically in pandas/NumPy (as the code below does — pandas-ta is an optional extra, not a requirement) and have the LLM interpret the results.
# signals/technical.py
import pandas as pd
import numpy as np
from dataclasses import dataclass
from typing import List, Optional
try:
import pandas_ta as ta
except ImportError:
ta = None
@dataclass
class TechnicalSignal:
    """Result of deterministic technical analysis for one symbol."""

    symbol: str
    signal_type: str  # "BUY", "SELL", "HOLD"
    strength: float  # -1.0 to 1.0
    indicators: dict  # Raw indicator values
    reasoning: str  # Human-readable explanation
class TechnicalAnalyzer:
"""Generates trading signals from price data using technical indicators."""
def __init__(self):
self.indicator_weights = {
"trend": 0.35,
"momentum": 0.25,
"volatility": 0.20,
"volume": 0.20,
}
def analyze(self, symbol: str, df: pd.DataFrame) -> TechnicalSignal:
"""Run full technical analysis on price data."""
if df.empty or len(df) < 50:
return TechnicalSignal(
symbol=symbol, signal_type="HOLD",
strength=0.0, indicators={}, reasoning="Insufficient data"
)
indicators = {}
# --- Trend Indicators ---
indicators.update(self._trend_indicators(df))
# --- Momentum Indicators ---
indicators.update(self._momentum_indicators(df))
# --- Volatility Indicators ---
indicators.update(self._volatility_indicators(df))
# --- Volume Indicators ---
indicators.update(self._volume_indicators(df))
# --- Composite Score ---
score = self._compute_composite_score(indicators)
signal_type = "BUY" if score > 0.2 else "SELL" if score < -0.2 else "HOLD"
return TechnicalSignal(
symbol=symbol,
signal_type=signal_type,
strength=round(score, 3),
indicators=indicators,
reasoning=self._generate_reasoning(indicators, signal_type, score),
)
def _trend_indicators(self, df: pd.DataFrame) -> dict:
close = df["close"]
result = {}
# Moving Averages
result["sma_20"] = close.rolling(20).mean().iloc[-1]
result["sma_50"] = close.rolling(50).mean().iloc[-1]
result["ema_12"] = close.ewm(span=12).mean().iloc[-1]
result["ema_26"] = close.ewm(span=26).mean().iloc[-1]
# MACD
macd_line = result["ema_12"] - result["ema_26"]
signal_line = (close.ewm(span=12).mean() - close.ewm(span=26).mean()).ewm(span=9).mean().iloc[-1]
result["macd"] = macd_line
result["macd_signal"] = signal_line
result["macd_histogram"] = macd_line - signal_line
# Price relative to MAs
current_price = close.iloc[-1]
result["price_vs_sma20"] = (current_price - result["sma_20"]) / result["sma_20"]
result["price_vs_sma50"] = (current_price - result["sma_50"]) / result["sma_50"]
# Trend direction (slope of 20-day SMA)
sma20 = close.rolling(20).mean()
result["trend_slope"] = (sma20.iloc[-1] - sma20.iloc[-5]) / sma20.iloc[-5]
return result
def _momentum_indicators(self, df: pd.DataFrame) -> dict:
close = df["close"]
high = df["high"]
low = df["low"]
result = {}
# RSI (14-period)
delta = close.diff()
gain = delta.where(delta > 0, 0).rolling(14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
rs = gain / loss.replace(0, np.nan)
result["rsi_14"] = (100 - (100 / (1 + rs))).iloc[-1]
# Stochastic Oscillator
lowest_low = low.rolling(14).min()
highest_high = high.rolling(14).max()
result["stoch_k"] = (
100 * (close - lowest_low) / (highest_high - lowest_low)
).iloc[-1]
result["stoch_d"] = (
100 * (close - lowest_low) / (highest_high - lowest_low)
).rolling(3).mean().iloc[-1]
# Rate of Change
result["roc_10"] = ((close.iloc[-1] / close.iloc[-10]) - 1) * 100
return result
def _volatility_indicators(self, df: pd.DataFrame) -> dict:
close = df["close"]
high = df["high"]
low = df["low"]
result = {}
# Bollinger Bands
sma20 = close.rolling(20).mean()
std20 = close.rolling(20).std()
result["bb_upper"] = (sma20 + 2 * std20).iloc[-1]
result["bb_lower"] = (sma20 - 2 * std20).iloc[-1]
result["bb_width"] = (
(result["bb_upper"] - result["bb_lower"]) / sma20.iloc[-1]
)
result["bb_position"] = (
(close.iloc[-1] - result["bb_lower"])
/ (result["bb_upper"] - result["bb_lower"])
)
# ATR (Average True Range)
tr = pd.DataFrame({
"hl": high - low,
"hc": abs(high - close.shift(1)),
"lc": abs(low - close.shift(1)),
}).max(axis=1)
result["atr_14"] = tr.rolling(14).mean().iloc[-1]
result["atr_pct"] = result["atr_14"] / close.iloc[-1]
# Historical Volatility (20-day)
returns = close.pct_change().dropna()
result["hist_vol_20"] = returns.tail(20).std() * np.sqrt(252)
return result
def _volume_indicators(self, df: pd.DataFrame) -> dict:
close = df["close"]
volume = df["volume"]
result = {}
# Volume Moving Average
result["vol_sma_20"] = volume.rolling(20).mean().iloc[-1]
result["vol_ratio"] = volume.iloc[-1] / result["vol_sma_20"]
# On-Balance Volume
obv = (np.sign(close.diff()) * volume).cumsum()
obv_sma = obv.rolling(20).mean()
result["obv_trend"] = 1 if obv.iloc[-1] > obv_sma.iloc[-1] else -1
# Volume-Price Trend
vpt = (close.pct_change() * volume).cumsum()
result["vpt_slope"] = (vpt.iloc[-1] - vpt.iloc[-10]) / abs(vpt.iloc[-10]) if vpt.iloc[-10] != 0 else 0
return result
def _compute_composite_score(self, indicators: dict) -> float:
"""Weighted composite score from all indicator categories."""
scores = {}
# Trend score
trend_signals = []
if indicators.get("macd_histogram", 0) > 0:
trend_signals.append(0.5)
else:
trend_signals.append(-0.5)
if indicators.get("price_vs_sma20", 0) > 0:
trend_signals.append(min(indicators["price_vs_sma20"] * 5, 1.0))
else:
trend_signals.append(max(indicators["price_vs_sma20"] * 5, -1.0))
trend_signals.append(np.clip(indicators.get("trend_slope", 0) * 20, -1, 1))
scores["trend"] = np.mean(trend_signals)
# Momentum score
rsi = indicators.get("rsi_14", 50)
if rsi > 70:
mom_signals = [-0.7] # Overbought
elif rsi < 30:
mom_signals = [0.7] # Oversold
else:
mom_signals = [(rsi - 50) / 50 * -0.3] # Mild mean reversion
roc = indicators.get("roc_10", 0)
mom_signals.append(np.clip(roc / 10, -1, 1))
scores["momentum"] = np.mean(mom_signals)
# Volatility score (higher vol = more caution)
bb_pos = indicators.get("bb_position", 0.5)
if bb_pos > 0.9:
scores["volatility"] = -0.5 # Near upper band, caution
elif bb_pos < 0.1:
scores["volatility"] = 0.5 # Near lower band, opportunity
else:
scores["volatility"] = (0.5 - bb_pos) * 0.5
# Volume score
vol_ratio = indicators.get("vol_ratio", 1.0)
obv_trend = indicators.get("obv_trend", 0)
if vol_ratio > 1.5 and obv_trend > 0:
scores["volume"] = 0.6
elif vol_ratio > 1.5 and obv_trend < 0:
scores["volume"] = -0.6
else:
scores["volume"] = obv_trend * 0.2
# Weighted composite
composite = sum(
scores[cat] * weight
for cat, weight in self.indicator_weights.items()
)
return np.clip(composite, -1.0, 1.0)
def _generate_reasoning(
self, indicators: dict, signal: str, score: float
) -> str:
"""Generate human-readable reasoning for the signal."""
parts = []
rsi = indicators.get("rsi_14", 50)
if rsi > 70:
parts.append(f"RSI at {rsi:.1f} indicates overbought conditions")
elif rsi < 30:
parts.append(f"RSI at {rsi:.1f} indicates oversold conditions")
else:
parts.append(f"RSI at {rsi:.1f} is in neutral range")
macd_hist = indicators.get("macd_histogram", 0)
if macd_hist > 0:
parts.append("MACD histogram is positive (bullish momentum)")
else:
parts.append("MACD histogram is negative (bearish momentum)")
atr_pct = indicators.get("atr_pct", 0)
parts.append(f"ATR is {atr_pct*100:.1f}% of price")
vol_ratio = indicators.get("vol_ratio", 1)
if vol_ratio > 1.5:
parts.append(f"Volume is {vol_ratio:.1f}x the 20-day average (elevated)")
elif vol_ratio < 0.5:
parts.append(f"Volume is {vol_ratio:.1f}x average (below normal)")
return ". ".join(parts) + f". Composite score: {score:.3f}."
This is deterministic, fast, and numerically precise. Now let's add the LLM layer that makes this system more than just a traditional quant bot.
3. LangChain Agent for Qualitative Analysis
This is where LangChain earns its place. The agent synthesizes news, fundamentals, and technical signals into a holistic assessment.
# agents/market_analyst.py
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain.tools import tool
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import SystemMessage
from pydantic import BaseModel, Field
from typing import List, Optional
import json
import logging
logger = logging.getLogger(__name__)
class MarketAssessment(BaseModel):
    """Structured output from the market analyst agent."""

    # NOTE: the Field(description=...) strings are emitted into the
    # structured-output schema the LLM sees, so they double as prompt
    # instructions — edit them with care.
    symbol: str
    overall_bias: str = Field(description="BULLISH, BEARISH, or NEUTRAL")
    confidence: float = Field(description="0.0 to 1.0", ge=0, le=1)  # validated to [0, 1]
    time_horizon: str = Field(description="SHORT_TERM, MEDIUM_TERM, LONG_TERM")
    key_factors: List[str] = Field(description="Top 3-5 factors driving the assessment")
    risk_factors: List[str] = Field(description="Key risks to the thesis")
    sentiment_score: float = Field(description="-1.0 (very bearish) to 1.0 (very bullish)")
    recommendation: str = Field(description="BUY, SELL, HOLD with sizing suggestion")
    reasoning: str = Field(description="Detailed reasoning")
def create_market_analyst_agent(config, data_fetcher, technical_analyzer):
    """Create a LangChain agent for market analysis.

    Wires read-only tools over `data_fetcher` and `technical_analyzer` into an
    OpenAI-functions agent. Returns an AgentExecutor capped at 8 tool-call
    iterations. NOTE: the @tool docstrings below are sent to the LLM as tool
    descriptions — they are runtime behavior, not documentation.
    """
    llm = ChatOpenAI(
        model="gpt-4o",
        temperature=0.1,  # Low temperature for consistency
        api_key=config.openai_api_key,
    )

    # Define tools the agent can use. Each closes over the fetcher/analyzer
    # passed in, so the agent never touches the network layer directly.
    @tool
    def get_price_analysis(symbol: str) -> str:
        """Get technical analysis results for a stock symbol.
        Returns indicators, signals, and composite scores."""
        df = data_fetcher.get_price_data(symbol)
        signal = technical_analyzer.analyze(symbol, df)
        return json.dumps({
            "symbol": signal.symbol,
            "signal": signal.signal_type,
            "strength": signal.strength,
            "reasoning": signal.reasoning,
            "key_indicators": {
                # Round floats so the LLM sees compact numbers.
                k: round(v, 4) if isinstance(v, float) else v
                for k, v in signal.indicators.items()
            }
        }, indent=2)

    @tool
    def get_recent_news(symbol: str) -> str:
        """Get recent news articles for a stock symbol.
        Returns titles, descriptions, and sources."""
        news = data_fetcher.get_news(symbol)
        if not news:
            return "No recent news found."
        # Cap at 7 articles to keep the context window small.
        return json.dumps(news[:7], indent=2)

    @tool
    def get_fundamentals(symbol: str) -> str:
        """Get fundamental data for a stock including valuation
        metrics, profitability, and growth."""
        fundamentals = data_fetcher.get_fundamentals(symbol)
        if not fundamentals:
            return "No fundamental data available."
        # Drop null fields so the prompt stays compact.
        return json.dumps({
            k: v for k, v in fundamentals.items() if v is not None
        }, indent=2)

    @tool
    def get_price_summary(symbol: str) -> str:
        """Get a summary of recent price action including
        returns over multiple periods and current price levels."""
        df = data_fetcher.get_price_data(symbol)
        if df.empty:
            return "No price data available."
        close = df["close"]
        current = close.iloc[-1]

        # Return over the last n bars, or None when history is too short.
        def pct_return(n):
            if len(close) >= n:
                return round((current / close.iloc[-n] - 1) * 100, 2)
            return None

        # NOTE(review): "52w" labels assume the default 252-day lookback;
        # shorter frames would mislabel the high/low — confirm upstream.
        return json.dumps({
            "current_price": round(current, 2),
            "5d_return_pct": pct_return(5),
            "20d_return_pct": pct_return(20),
            "60d_return_pct": pct_return(60),
            "252d_return_pct": pct_return(252),
            "52w_high": round(close.max(), 2),
            "52w_low": round(close.min(), 2),
            "pct_from_52w_high": round((current / close.max() - 1) * 100, 2),
            "avg_daily_volume": int(df["volume"].tail(20).mean()),
        }, indent=2)

    tools = [get_price_analysis, get_recent_news, get_fundamentals, get_price_summary]

    # Runtime prompt text — byte-for-byte part of the agent's behavior.
    system_prompt = """You are a senior quantitative market analyst. Your job is to
synthesize technical analysis, news sentiment, and fundamental data into a clear
trading assessment for a given stock.
IMPORTANT GUIDELINES:
- Be specific and data-driven. Cite actual numbers from the tools.
- Distinguish between short-term trading signals and longer-term fundamentals.
- Always identify the strongest counter-arguments to your assessment.
- Be honest about uncertainty. Low confidence is acceptable.
- Consider macro context: is this a stock-specific move or sector-wide?
- News sentiment can be misleading. Cross-reference with price action.
- Never fabricate data. If you don't have information, say so.
Your output must be a structured MarketAssessment with:
- overall_bias: BULLISH, BEARISH, or NEUTRAL
- confidence: 0.0 to 1.0 (be conservative; most assessments should be 0.3-0.7)
- key_factors: the 3-5 most important drivers
- risk_factors: what could go wrong
- sentiment_score: -1.0 to 1.0 based on news/qualitative factors
- recommendation: BUY, SELL, or HOLD with position sizing guidance
- reasoning: detailed explanation"""

    prompt = ChatPromptTemplate.from_messages([
        SystemMessage(content=system_prompt),
        MessagesPlaceholder(variable_name="chat_history", optional=True),
        ("human", "{input}"),
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ])
    agent = create_openai_functions_agent(llm, tools, prompt)
    executor = AgentExecutor(
        agent=agent,
        tools=tools,
        verbose=False,
        max_iterations=8,  # hard cap on tool-call loops
        handle_parsing_errors=True,
        return_intermediate_steps=True,
    )
    return executor
class MarketAnalyst:
    """High-level wrapper that runs the agent and parses output.

    `analyze()` never raises: agent or parsing failures degrade to a neutral,
    low/zero-confidence HOLD so one bad symbol cannot abort a portfolio scan.
    """

    def __init__(self, config, data_fetcher, technical_analyzer):
        self.agent_executor = create_market_analyst_agent(
            config, data_fetcher, technical_analyzer
        )
        self.config = config
        # FIX: build the structured-output parser once here (using the
        # module-level ChatOpenAI import) instead of re-importing and
        # re-constructing it on every _parse_assessment call.
        self._parser_llm = ChatOpenAI(
            model="gpt-4o-mini",  # Cheaper model for parsing
            temperature=0,
            api_key=config.openai_api_key,
        ).with_structured_output(MarketAssessment)

    def analyze(self, symbol: str) -> MarketAssessment:
        """Run full analysis on a symbol and return structured assessment."""
        try:
            result = self.agent_executor.invoke({
                "input": (
                    f"Provide a comprehensive trading analysis for {symbol}. "
                    f"Use all available tools to gather data, then synthesize "
                    f"your findings into a structured assessment."
                )
            })
            # The agent's final free-text output should contain the full
            # assessment; a second deterministic pass extracts structure.
            output = result.get("output", "")
            return self._parse_assessment(symbol, output)
        except Exception as e:
            logger.error(f"Analysis failed for {symbol}: {e}")
            # Fail-safe: neutral, zero-confidence HOLD.
            return MarketAssessment(
                symbol=symbol,
                overall_bias="NEUTRAL",
                confidence=0.0,
                time_horizon="SHORT_TERM",
                key_factors=[f"Analysis failed: {str(e)}"],
                risk_factors=["System error - no analysis available"],
                sentiment_score=0.0,
                recommendation="HOLD",
                reasoning=f"Could not complete analysis: {e}",
            )

    def _parse_assessment(self, symbol: str, output: str) -> MarketAssessment:
        """Parse agent output into structured assessment.

        Uses a separate structured-output LLM call for reliability; on parse
        failure returns a low-confidence HOLD carrying the raw text.
        """
        try:
            assessment = self._parser_llm.invoke(
                f"Parse this market analysis into a structured assessment "
                f"for {symbol}:\n\n{output}"
            )
            return assessment
        except Exception as e:
            logger.error(f"Failed to parse assessment for {symbol}: {e}")
            return MarketAssessment(
                symbol=symbol,
                overall_bias="NEUTRAL",
                confidence=0.1,
                time_horizon="SHORT_TERM",
                key_factors=["Parse failure"],
                risk_factors=["Assessment could not be structured"],
                sentiment_score=0.0,
                recommendation="HOLD",
                reasoning=output[:500],
            )
The two-step pattern (agent generates → structured parser extracts) is more reliable than asking the agent to output JSON directly. The agent gets to reason freely, then a deterministic extraction step ensures we get valid structured data.
4. Composite Signal Generation
Now we combine technical and qualitative signals:
# signals/composite.py
from dataclasses import dataclass
from typing import List
import numpy as np
@dataclass
class CompositeSignal:
    """Final blended recommendation for one symbol (technical + qualitative)."""

    symbol: str
    action: str  # "BUY", "SELL", "HOLD"
    strength: float  # -1.0 to 1.0
    technical_score: float
    sentiment_score: float
    confidence: float
    position_size_hint: float  # 0.0 to 1.0 (fraction of max position)
    reasoning: str
class CompositeSignalGenerator:
    """Combines technical and LLM-based signals into actionable recommendations."""

    def __init__(self, technical_weight: float = 0.6, sentiment_weight: float = 0.4):
        self.technical_weight = technical_weight
        self.sentiment_weight = sentiment_weight
        # Action thresholds on the blended score, plus a confidence floor
        # below which the generator always holds.
        self.buy_threshold = 0.25
        self.sell_threshold = -0.25
        self.min_confidence = 0.3

    def generate(
        self,
        technical_signal,
        market_assessment,
    ) -> CompositeSignal:
        """Blend the technical strength with the agent's sentiment score."""
        tech = technical_signal.strength
        sent = market_assessment.sentiment_score

        # Base blend is a simple weighted average of the two scores.
        blended = self.technical_weight * tech + self.sentiment_weight * sent

        # Disagreement shrinks confidence linearly: identical scores keep
        # full assessment confidence, opposite extremes zero it out.
        agreement = 1.0 - abs(tech - sent) / 2.0
        confidence = market_assessment.confidence * agreement

        # Same-sign scores reinforce each other; mixed signs get dampened.
        same_direction = (tech > 0 and sent > 0) or (tech < 0 and sent < 0)
        blended *= 1.15 if same_direction else 0.7
        blended = np.clip(blended, -1.0, 1.0)

        # Pick the action: confidence floor first, then score thresholds.
        if confidence < self.min_confidence:
            action = "HOLD"
        elif blended > self.buy_threshold:
            action = "BUY"
        elif blended < self.sell_threshold:
            action = "SELL"
        else:
            action = "HOLD"

        # Sizing hint scales with both conviction and strength; zero on HOLD.
        sizing = 0.0 if action == "HOLD" else abs(blended) * confidence

        reasoning = (
            f"Technical: {technical_signal.signal_type} ({tech:.3f}), "
            f"Sentiment: {market_assessment.overall_bias} ({sent:.3f}), "
            f"Composite: {blended:.3f}, Confidence: {confidence:.3f}. "
            f"Technical: {technical_signal.reasoning} "
            f"Qualitative: {'; '.join(market_assessment.key_factors[:3])}"
        )

        return CompositeSignal(
            symbol=technical_signal.symbol,
            action=action,
            strength=round(blended, 3),
            technical_score=round(tech, 3),
            sentiment_score=round(sent, 3),
            confidence=round(confidence, 3),
            position_size_hint=round(sizing, 3),
            reasoning=reasoning,
        )
5. Risk Management (The Non-Negotiable Layer)
This is the most important part of the system. Risk management is a hard constraint layer — it sits between signal generation and execution, and nothing bypasses it.
# risk/limits.py
import logging
from dataclasses import dataclass, field
from typing import Dict, List, Optional
import numpy as np
logger = logging.getLogger(__name__)
@dataclass
class Position:
    """One open holding, tracked for P&L and trailing-stop management."""

    symbol: str
    shares: int
    entry_price: float
    current_price: float
    stop_loss: float
    highest_price: float  # For trailing stop
    entry_date: str

    @property
    def market_value(self):
        """Current dollar value of the holding."""
        return self.current_price * self.shares

    @property
    def pnl(self):
        """Unrealized profit/loss in dollars."""
        return self.shares * (self.current_price - self.entry_price)

    @property
    def pnl_pct(self):
        """Unrealized P&L as a percentage of the entry price (0 if entry is 0)."""
        if not self.entry_price:
            return 0
        return 100 * (self.current_price / self.entry_price - 1)
@dataclass
class Portfolio:
    """Aggregate account state: cash, open positions, and trade bookkeeping."""

    cash: float
    positions: Dict[str, Position] = field(default_factory=dict)
    peak_value: float = 0.0  # High-water mark used for drawdown tracking
    trade_history: List[dict] = field(default_factory=list)
    halted: bool = False  # Set when the max-drawdown circuit breaker trips

    @property
    def total_value(self):
        """Cash plus the market value of every open position."""
        return self.cash + sum(pos.market_value for pos in self.positions.values())

    @property
    def total_pnl(self):
        """Sum of unrealized P&L across all open positions."""
        return sum(pos.pnl for pos in self.positions.values())

    @property
    def drawdown(self):
        """Fractional decline from the peak value (0 before any peak is set)."""
        if not self.peak_value:
            return 0
        return (self.peak_value - self.total_value) / self.peak_value
class RiskManager:
"""Enforces risk constraints on all trades. Cannot be overridden by agents."""
    def __init__(self, config):
        """Snapshot the risk limits from config so they are fixed for this manager."""
        self.config = config
        self.max_position_pct = config.max_position_pct
        self.max_portfolio_risk = config.max_portfolio_risk
        self.max_drawdown_pct = config.max_drawdown_pct
        self.stop_loss_pct = config.stop_loss_pct
def check_trade(
self,
symbol: str,
action: str,
shares: int,
price: float,
portfolio: Portfolio,
signal_confidence: float,
) -> tuple[bool, int, str]:
"""
Validate a trade against all risk constraints.
Returns (approved, adjusted_shares, reason).
"""
if portfolio.halted:
return False, 0, "PORTFOLIO HALTED: Max drawdown exceeded"
if action == "HOLD":
return True, 0, "No trade needed"
trade_value = shares * price
# --- Constraint 1: Cash available ---
if action == "BUY":
if trade_value > portfolio.cash:
max_affordable = int(portfolio.cash / price)
if max_affordable ==