This guide covers performance metrics, optimization strategies, and monitoring for retail AI agents.
Agent Response Times
| Agent Type | Avg Response Time | P95 Response Time | P99 Response Time | Success Rate |
|---|---|---|---|---|
| Product | 1.2s | 2.1s | 3.5s | 98.5% |
| Inventory | 0.8s | 1.4s | 2.2s | 99.2% |
| Comparison | 2.1s | 3.8s | 5.2s | 94.1% |
| DIY | 3.2s | 5.1s | 7.8s | 91.3% |
| General | 1.5s | 2.7s | 4.1s | 96.8% |
| Recommendation | 1.8s | 3.2s | 4.9s | 95.7% |
Accuracy Metrics
| Agent Type | Accuracy | Precision | Recall | F1 Score |
|---|---|---|---|---|
| Product | 95.2% | 94.8% | 95.6% | 95.2% |
| Inventory | 98.1% | 98.3% | 97.9% | 98.1% |
| Comparison | 92.4% | 91.8% | 93.1% | 92.4% |
| DIY | 88.7% | 87.9% | 89.5% | 88.7% |
| General | 90.3% | 89.7% | 90.9% | 90.3% |
| Recommendation | 93.1% | 92.5% | 93.7% | 93.1% |
Tool Performance
| Tool Type | Avg Latency | Throughput (req/s) | Error Rate | Cache Hit Rate |
|---|---|---|---|---|
| Unity Catalog | 200ms | 1000 | 0.5% | 85% |
| Vector Search | 300ms | 500 | 1.2% | 70% |
| LangChain | 1.5s | 100 | 2.1% | 45% |
| External APIs | 2.0s | 50 | 3.5% | 20% |
Optimization Strategies
Response Time Optimization
def optimize_tool_selection(query: str, context: dict) -> list:
    """Return the cheapest set of tools able to answer ``query``.

    The checks run from cheapest to most expensive and the first match wins:
    an exact SKU lookup, then a semantic product search, then a comparison
    (search plus comparison tool). Anything else falls back to the semantic
    search tool.

    Args:
        query: Raw user query.
        context: Accepted for interface compatibility; not consulted here.

    Returns:
        A list of tool objects to run for this query.
    """
    # Fast path: the query names at least one SKU (~200ms avg).
    if extract_skus(query):
        return [create_find_product_by_sku_tool()]

    # Slow path: only reached when the query is not a plain product search,
    # so the comparison tool (~1.5s avg) is added on top of the search tool.
    if not is_product_search_query(query) and is_comparison_query(query):
        return [
            find_product_details_by_description_tool(),
            create_product_comparison_tool(),
        ]

    # Medium path and default: semantic search alone (~300ms avg).
    return [find_product_details_by_description_tool()]
import asyncio
async def execute_tools_parallel(tools: list, query: str) -> dict:
    """Run every tool concurrently and collect the successful results.

    All tools are awaited together with ``asyncio.gather``. Splitting them
    into "fast" and "slow" groups gave no scheduling benefit, and it
    reordered the tasks so that ``tool_{i}`` no longer matched the caller's
    index into ``tools``. Results are now keyed by each tool's position in
    the ``tools`` argument.

    Args:
        tools: Objects exposing an awaitable ``ainvoke(query)`` method.
        query: Query passed unchanged to every tool.

    Returns:
        A dict mapping ``"tool_<index>"`` to that tool's result. Tools that
        raised are omitted and reported through the ``logging`` module.
    """
    import logging

    tools = list(tools)
    if not tools:
        return {}

    # return_exceptions=True keeps one failing tool from cancelling the rest.
    outcomes = await asyncio.gather(
        *(tool.ainvoke(query) for tool in tools),
        return_exceptions=True,
    )

    logger = logging.getLogger(__name__)
    combined_results = {}
    for position, outcome in enumerate(outcomes):
        if isinstance(outcome, BaseException):
            # Surface the failure instead of dropping it silently.
            logger.warning(
                "tool_%d (%r) failed: %r", position, tools[position], outcome
            )
            continue
        combined_results[f"tool_{position}"] = outcome
    return combined_results
3. Caching Strategy
from functools import lru_cache
import redis
# Process-local memoisation for frequently requested SKUs (up to 1000 entries).
@lru_cache(maxsize=1000)
def cached_product_lookup(sku: str) -> dict:
    """Look up a single SKU, reusing the previous answer for repeated SKUs.

    The SKU is wrapped in a one-element list before being handed to
    ``find_product_by_sku_tool``; whatever that call returns is cached and
    returned as-is.
    """
    product = find_product_by_sku_tool([sku])
    return product
# Redis cache for session data
redis_client = redis.Redis(host='localhost', port=6379, db=0)


def cached_vector_search(query: str, ttl: int = 3600) -> list:
    """Return vector-search results for ``query``, cached in Redis.

    The cache key is derived from a SHA-256 digest of the query text. The
    built-in ``hash()`` is salted per interpreter process for strings, so
    keys built from it differ between workers and restarts and a shared
    Redis cache would almost never hit.

    Args:
        query: Free-text search query.
        ttl: Lifetime of the cache entry in seconds.

    Returns:
        The decoded cached value on a hit, otherwise the fresh result of
        ``find_product_details_by_description_tool(query)``.
    """
    # Imported locally: this snippet uses json/hashlib without importing them.
    import hashlib
    import json

    digest = hashlib.sha256(query.encode("utf-8")).hexdigest()
    cache_key = f"vector_search:{digest}"

    # Try cache first
    cached_result = redis_client.get(cache_key)
    if cached_result:
        return json.loads(cached_result)

    # Execute search and cache result
    result = find_product_details_by_description_tool(query)
    redis_client.setex(cache_key, ttl, json.dumps(result))
    return result
Accuracy Optimization
1. Prompt Engineering
def create_optimized_prompt(agent_type: str, context: dict) -> str:
    """Build the system prompt for ``agent_type`` from ``context``.

    Args:
        agent_type: ``"product"`` or ``"inventory"``; any other value yields
            an empty prompt.
        context: Mapping read for ``"store_num"`` and ``"additional_context"``;
            missing keys are rendered as empty strings.

    Returns:
        The filled-in prompt text, or ``""`` for an unknown agent type.
    """
    product_template = """You are a product specialist at BrickMart store {store_num}.
Focus on providing accurate product information using the available tools.
Guidelines:
- Always use exact SKU lookup when SKUs are mentioned
- Use semantic search for product descriptions
- Provide specific product details including price and availability
- If unsure, ask clarifying questions
Current context: {context}
"""
    inventory_template = """You are an inventory specialist at BrickMart.
Provide accurate, real-time inventory information.
Guidelines:
- Always check both store and warehouse inventory
- Include aisle locations when available
- Mention if items are on order or backordered
- Be specific about quantities and locations
Store: {store_num}
Context: {context}
"""
    templates_by_agent = {
        "product": product_template,
        "inventory": inventory_template,
    }

    # Unknown agent types fall back to an empty template, which formats to "".
    template = templates_by_agent.get(agent_type, "")
    fill_values = {
        "store_num": context.get("store_num", ""),
        "context": context.get("additional_context", ""),
    }
    return template.format(**fill_values)
def validate_tool_results(results: dict, query: str) -> dict:
    """Validate tool results, replacing invalid ones with a fallback.

    Results are classified by substring match on their key:

    * keys containing ``"product"`` are checked with ``validate_product_data``
      and, when invalid, replaced by ``retry_product_search(query)``;
    * keys containing ``"inventory"`` are checked with
      ``validate_inventory_data`` and, when invalid, replaced by
      ``fallback_inventory_check(query)``;
    * every other result is passed through unchanged. Previously these were
      dropped, so generically named results (for example ``"tool_0"``)
      vanished from the output.

    Args:
        results: Mapping of tool name to that tool's raw result.
        query: Original user query, used for retries and fallbacks.

    Returns:
        A new dict with the same keys as ``results``.
    """
    validated_results = {}
    for tool_name, result in results.items():
        if "product" in tool_name:
            if not validate_product_data(result):
                # Retry with different parameters
                result = retry_product_search(query)
        elif "inventory" in tool_name:
            if not validate_inventory_data(result):
                # Use fallback inventory check
                result = fallback_inventory_check(query)
        # No validator applies to other tools: keep their result as-is.
        validated_results[tool_name] = result
    return validated_results
def validate_product_data(data: dict) -> bool:
    """Return True when ``data`` has a truthy value for every required field.

    The required fields are ``"sku"``, ``"product_name"`` and ``"price"``.
    A field that is absent or falsy (``None``, ``""``, ``0``) makes the
    record incomplete.

    Anything that cannot be indexed by field name, such as ``None`` or a
    plain string returned by a failed tool call, is reported as invalid
    instead of raising ``TypeError``.
    """
    required_fields = ("sku", "product_name", "price")
    try:
        return all(field in data and data[field] for field in required_fields)
    except TypeError:
        # Not a mapping-like record (None, str, ...), so it cannot be valid.
        return False
Monitoring and Observability
import mlflow
import time
from functools import wraps
def monitor_agent_performance(agent_name: str):
    """Decorator factory that records each agent call as an MLflow run.

    Every call logs the ``agent_name`` param plus the ``execution_time`` and
    ``success`` metrics. Successful calls whose result is a dict with a
    non-empty ``"messages"`` list also log ``response_length`` (length of the
    last message's ``content``). Failed calls log the ``error`` param and
    re-raise the original exception.

    Changes from the previous version:

    * only the agent call sits inside ``try``, so a problem while deriving
      quality metrics (for example an agent returning ``None``) no longer
      marks a successful run as failed after ``success=1`` was logged;
    * ``agent_name`` is logged before the call, so failed runs carry it too;
    * durations use ``time.perf_counter()``, which is monotonic.

    Args:
        agent_name: Label used for the run name and the ``agent_name`` param.

    Returns:
        A decorator preserving the wrapped function's signature and result.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.perf_counter()
            with mlflow.start_run(run_name=f"{agent_name}_execution"):
                mlflow.log_param("agent_name", agent_name)
                try:
                    # Execute agent
                    result = func(*args, **kwargs)
                except Exception as e:
                    # Log failure, then let the caller see the original error.
                    mlflow.log_metric(
                        "execution_time", time.perf_counter() - start_time
                    )
                    mlflow.log_metric("success", 0)
                    mlflow.log_param("error", str(e))
                    raise

                # Log metrics
                mlflow.log_metric(
                    "execution_time", time.perf_counter() - start_time
                )
                mlflow.log_metric("success", 1)

                # Log response quality metrics only when the result has the
                # expected shape; other results are returned untouched.
                messages = (
                    result.get("messages") if isinstance(result, dict) else None
                )
                if messages:
                    content = getattr(messages[-1], "content", None)
                    if content is not None:
                        mlflow.log_metric("response_length", len(content))
                return result
        return wrapper
    return decorator
# Usage: decorating an agent entry point so each call goes through
# monitor_agent_performance under the name "product_agent".
@monitor_agent_performance("product_agent")
def product_agent(state, config):
    """Placeholder agent; the body is intentionally empty and returns None."""
    # Agent implementation
    pass
Real-time Metrics Dashboard
import streamlit as st
import pandas as pd
import plotly.express as px
def create_performance_dashboard():
    """Render the Streamlit dashboard for the last 24 hours of agent metrics.

    Draws, in order: a line chart of execution time per agent, a bar chart
    of success rate per agent, and a table of tool metrics.
    """
    st.title("Agent Performance Dashboard")

    # Fetch recent metrics
    recent_metrics = get_recent_metrics(hours=24)

    # Response time chart
    response_time_options = {
        "x": "timestamp",
        "y": "execution_time",
        "color": "agent_name",
        "title": "Agent Response Times (24h)",
    }
    st.plotly_chart(px.line(recent_metrics, **response_time_options))

    # Success rate chart
    per_agent_rates = calculate_success_rates(recent_metrics)
    success_chart = px.bar(
        per_agent_rates,
        x="agent_name",
        y="success_rate",
        title="Agent Success Rates",
    )
    st.plotly_chart(success_chart)

    # Tool performance (fetched before the subheader is drawn)
    tool_table = get_tool_metrics(hours=24)
    st.subheader("Tool Performance")
    st.dataframe(tool_table)
def get_recent_metrics(hours: int) -> pd.DataFrame:
    """Fetch recent performance metrics from MLflow.

    Placeholder: the body is not implemented and currently returns ``None``
    despite the ``pd.DataFrame`` annotation.

    NOTE(review): the dashboard code in this guide reads the columns
    ``timestamp``, ``execution_time``, ``agent_name`` and ``success`` from
    the returned frame; confirm the real implementation provides them.

    Args:
        hours: Size of the look-back window, in hours.
    """
    # Implementation to fetch from MLflow tracking
    pass
def calculate_success_rates(df: pd.DataFrame) -> pd.DataFrame:
    """Calculate the mean of ``success`` for each agent.

    The result column is named ``success_rate``. It was previously left as
    ``success``, which did not match the ``y="success_rate"`` column the
    dashboard bar chart plots.

    Args:
        df: Frame with at least the columns ``agent_name`` and ``success``
            (1/0 or boolean per run).

    Returns:
        A frame with one row per agent and the columns ``agent_name`` and
        ``success_rate``.
    """
    per_agent = df.groupby("agent_name")["success"].mean()
    return per_agent.reset_index(name="success_rate")
Alerting System
import smtplib
from email.mime.text import MIMEText
class PerformanceAlerting:
    """Compare performance metrics with thresholds and e-mail any breaches."""

    # (metric key, threshold key, breach when metric is above?, message template)
    _CHECKS = (
        ("avg_response_time", "max_response_time", True,
         "High response time: {:.2f}s"),
        ("error_rate", "max_error_rate", True, "High error rate: {:.2%}"),
        ("success_rate", "min_success_rate", False, "Low success rate: {:.2%}"),
    )

    def __init__(
        self,
        thresholds: dict,
        smtp_server: str = "smtp.company.com",
        alert_email: str = "alerts@company.com",
        recipient: str = "team@company.com",
    ):
        """Store the thresholds and mail settings.

        Args:
            thresholds: May contain ``max_response_time``,
                ``max_error_rate`` and ``min_success_rate``.
            smtp_server: SMTP host used to send alerts.
            alert_email: Address placed in the ``From`` header.
            recipient: Address placed in the ``To`` header.
        """
        self.thresholds = thresholds
        self.smtp_server = smtp_server
        self.alert_email = alert_email
        self.recipient = recipient

    def collect_alerts(self, metrics: dict) -> list:
        """Return the alert messages for ``metrics`` without sending anything.

        A check is skipped when its metric or its threshold is missing, so a
        partial metrics dict no longer raises ``KeyError``.
        """
        alerts = []
        for metric_key, threshold_key, upper_bound, template in self._CHECKS:
            value = metrics.get(metric_key)
            limit = self.thresholds.get(threshold_key)
            if value is None or limit is None:
                continue
            breached = value > limit if upper_bound else value < limit
            if breached:
                alerts.append(template.format(value))
        return alerts

    def check_performance_thresholds(self, metrics: dict):
        """Check the thresholds and send one alert e-mail if any is breached.

        Returns:
            The list of alert messages, empty when nothing was breached.
        """
        alerts = self.collect_alerts(metrics)
        if alerts:
            self.send_alert(alerts)
        return alerts

    def send_alert(self, alerts: list):
        """Send a single plain-text e-mail listing every alert in ``alerts``."""
        subject = "Agent Performance Alert"
        body = "Performance issues detected:\n\n" + "\n".join(alerts)
        msg = MIMEText(body)
        msg["Subject"] = subject
        msg["From"] = self.alert_email
        msg["To"] = self.recipient
        # The context manager closes the SMTP session even if sending fails.
        with smtplib.SMTP(self.smtp_server) as server:
            server.send_message(msg)
# Usage: build an alerting instance with one limit per monitored metric.
# These keys are the ones check_performance_thresholds reads from its
# thresholds dict.
alerting = PerformanceAlerting({
    "max_response_time": 5.0,  # 5 seconds; compared with metrics["avg_response_time"]
    "max_error_rate": 0.05,  # 5%; compared with metrics["error_rate"]
    "min_success_rate": 0.95  # 95%; compared with metrics["success_rate"]
})
Model Optimization
def optimize_model_parameters(agent_type: str) -> dict:
    """Return the sampling settings tuned for ``agent_type``.

    Known agent types are ``"product"``, ``"inventory"``, ``"comparison"``
    and ``"diy"``; any other value receives the general-purpose defaults.

    Returns:
        A new dict with the keys ``temperature``, ``max_tokens`` and ``top_p``.
    """
    setting_names = ("temperature", "max_tokens", "top_p")

    # Each preset is (temperature, max_tokens, top_p).
    presets = {
        "product": (0.1, 500, 0.9),       # factual, moderate length
        "inventory": (0.0, 200, 0.8),     # deterministic, short
        "comparison": (0.3, 1000, 0.95),  # some creativity, longer
        "diy": (0.5, 1500, 0.95),         # creative, long instructions
    }
    general_defaults = (0.2, 500, 0.9)

    chosen = presets.get(agent_type, general_defaults)
    return dict(zip(setting_names, chosen))
Resource Management
import threading
from queue import Queue
import time
class AgentResourceManager:
    """Cap the number of agents running at once and queue the overflow."""

    def __init__(self, max_concurrent: int = 10):
        """Create a manager allowing ``max_concurrent`` simultaneous agents."""
        self.max_concurrent = max_concurrent
        self.active_agents = 0
        self.queue = Queue()
        self.lock = threading.Lock()

    def execute_agent(self, agent_func, *args, **kwargs):
        """Run ``agent_func(*args, **kwargs)`` if a slot is free.

        Returns:
            The agent's result, or ``{"status": "queued", "position": n}``
            when every slot is taken. A queued request is started later on a
            new thread by whichever call finishes next; its return value is
            not captured by this code.
        """
        self.lock.acquire()
        try:
            no_free_slot = self.active_agents >= self.max_concurrent
            if no_free_slot:
                # Queue the request and report its position to the caller.
                self.queue.put((agent_func, args, kwargs))
                return {"status": "queued", "position": self.queue.qsize()}
            self.active_agents += 1
        finally:
            self.lock.release()

        try:
            # Execute agent outside the lock so other callers are not blocked.
            return agent_func(*args, **kwargs)
        finally:
            with self.lock:
                self.active_agents -= 1
                # Hand the freed slot to the oldest queued request, if any.
                if not self.queue.empty():
                    next_func, next_args, next_kwargs = self.queue.get()
                    worker = threading.Thread(
                        target=self.execute_agent,
                        args=(next_func, *next_args),
                        kwargs=next_kwargs,
                    )
                    worker.start()
# Usage: run the product agent through a manager limited to 5 concurrent agents.
# NOTE(review): `state` and `config` are not defined in this snippet; they
# presumably come from the calling application.
resource_manager = AgentResourceManager(max_concurrent=5)
# `result` is either the agent's return value or a {"status": "queued", ...}
# dict when all slots are busy.
result = resource_manager.execute_agent(product_agent, state, config)
Benchmarking
import time
import statistics
from concurrent.futures import ThreadPoolExecutor
def benchmark_agent(agent_func, test_cases: list, num_runs: int = 10) -> dict:
    """Benchmark ``agent_func`` over ``test_cases``, ``num_runs`` times each.

    Each run calls ``agent_func(test_case["input"])``. When the test case has
    an ``"expected"`` key, the result is scored with
    ``calculate_accuracy(result, expected)``.

    Fixes over the previous version:

    * a run now counts as exactly one success or one error; before, a
      failure while scoring accuracy incremented both counters;
    * a single timing sample no longer raises ``StatisticsError``
      (``statistics.quantiles`` needs at least two points), so both
      percentiles fall back to that sample;
    * with no runs at all, ``success_rate`` is ``0.0`` instead of raising
      ``ZeroDivisionError``;
    * durations use ``time.perf_counter()``, which is monotonic.

    Returns:
        A dict with ``response_times``, ``success_count``, ``error_count``,
        ``accuracy_scores`` and ``success_rate``. ``avg_response_time``,
        ``p95_response_time`` and ``p99_response_time`` are present only when
        at least one run succeeded; ``avg_accuracy`` only when at least one
        run was scored.
    """
    results = {
        "response_times": [],
        "success_count": 0,
        "error_count": 0,
        "accuracy_scores": [],
    }

    for test_case in test_cases:
        for _ in range(num_runs):
            start_time = time.perf_counter()
            try:
                result = agent_func(test_case["input"])
                execution_time = time.perf_counter() - start_time
                # Calculate accuracy if expected output provided
                scored = "expected" in test_case
                if scored:
                    accuracy = calculate_accuracy(result, test_case["expected"])
            except Exception as e:
                results["error_count"] += 1
                print(f"Error in test case: {e}")
                continue

            # Record the run only after every step succeeded.
            results["response_times"].append(execution_time)
            results["success_count"] += 1
            if scored:
                results["accuracy_scores"].append(accuracy)

    # Calculate summary statistics
    response_times = results["response_times"]
    if response_times:
        results["avg_response_time"] = statistics.mean(response_times)
        if len(response_times) >= 2:
            results["p95_response_time"] = statistics.quantiles(response_times, n=20)[18]
            results["p99_response_time"] = statistics.quantiles(response_times, n=100)[98]
        else:
            # One sample: it is the only available estimate for any percentile.
            results["p95_response_time"] = response_times[0]
            results["p99_response_time"] = response_times[0]

    if results["accuracy_scores"]:
        results["avg_accuracy"] = statistics.mean(results["accuracy_scores"])

    total_runs = results["success_count"] + results["error_count"]
    results["success_rate"] = (
        results["success_count"] / total_runs if total_runs else 0.0
    )
    return results
# Example usage
# Each case supplies the "input" passed to the agent and an "expected" value
# that benchmark_agent forwards to calculate_accuracy.
test_cases = [
    {"input": "Tell me about SKU ABC123", "expected": "product_info"},
    {"input": "Do you have wireless headphones?", "expected": "product_search"},
    {"input": "Compare iPhone vs Samsung", "expected": "comparison"}
]
# NOTE(review): benchmark_agent calls agent_func with a single argument, while
# product_agent as defined in this guide takes (state, config) -- confirm the
# agent passed here accepts one positional input.
benchmark_results = benchmark_agent(product_agent, test_cases)
# NOTE(review): 'avg_response_time' is only set when at least one run
# succeeded, so this lookup raises KeyError if every run failed.
print(f"Average response time: {benchmark_results['avg_response_time']:.2f}s")
print(f"Success rate: {benchmark_results['success_rate']:.2%}")
Load Testing
def load_test_agent(agent_func, concurrent_users: int, duration_seconds: int):
    """Load-test ``agent_func`` with simulated concurrent users.

    Each simulated user runs on its own thread. Until ``duration_seconds``
    have elapsed it repeatedly calls
    ``agent_func(generate_random_test_input())``, records the outcome and
    sleeps one second.

    The summary is now safe to compute when nothing was recorded (for
    example ``duration_seconds <= 0``): the rates and the average are
    ``0.0`` instead of raising ``StatisticsError`` or ``ZeroDivisionError``.
    Elapsed time uses ``time.perf_counter()``; the wall-clock ``timestamp``
    on each record is kept.

    Args:
        agent_func: Callable taking one generated test input.
        concurrent_users: Number of simulated users (worker threads).
        duration_seconds: How long each user keeps issuing requests.

    Returns:
        A dict with ``total_requests``, ``successful_requests``,
        ``success_rate``, ``avg_response_time`` and ``requests_per_second``
        (computed against the nominal ``duration_seconds``).

    Raises:
        ValueError: From ``ThreadPoolExecutor`` when ``concurrent_users``
            is not positive.
    """
    test_start = time.perf_counter()

    def user_simulation():
        """Simulate a single user's requests."""
        user_results = []
        while time.perf_counter() - test_start < duration_seconds:
            request_start = time.perf_counter()
            try:
                # Simulate user request; input generation errors count as
                # failed requests too.
                agent_func(generate_random_test_input())
            except Exception as e:
                outcome = {"success": False, "error": str(e)}
            else:
                outcome = {"success": True}
            user_results.append({
                "timestamp": time.time(),
                "response_time": time.perf_counter() - request_start,
                **outcome,
            })
            # Wait before next request
            time.sleep(1)
        return user_results

    # Run concurrent user simulations
    results = []
    with ThreadPoolExecutor(max_workers=concurrent_users) as executor:
        futures = [executor.submit(user_simulation) for _ in range(concurrent_users)]
        for future in futures:
            results.extend(future.result())

    # Analyze results, guarding every division against an empty run.
    total_requests = len(results)
    successful_requests = sum(1 for r in results if r["success"])
    if results:
        avg_response_time = statistics.mean(r["response_time"] for r in results)
        success_rate = successful_requests / total_requests
    else:
        avg_response_time = 0.0
        success_rate = 0.0
    requests_per_second = (
        total_requests / duration_seconds if duration_seconds > 0 else 0.0
    )

    return {
        "total_requests": total_requests,
        "successful_requests": successful_requests,
        "success_rate": success_rate,
        "avg_response_time": avg_response_time,
        "requests_per_second": requests_per_second,
    }