Runtime Monitoring
Overview: Comprehensive monitoring of MCP system operations, tool usage, and security events.
Runtime monitoring provides visibility into MCP system behavior, enabling early detection of security threats, performance issues, and operational anomalies. This guide covers monitoring strategies, implementation approaches, and best practices.
Monitoring Architecture
Comprehensive Monitoring Stack
# Comprehensive monitoring system for MCP
import time
import json
import logging
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from enum import Enum
import asyncio
from datetime import datetime
# Severity levels for monitoring events, ordered least- to most-severe.
# Built with the Enum functional API; members and values are identical to
# the equivalent class-based definition (DEBUG="debug", ..., CRITICAL="critical").
MonitoringLevel = Enum(
    "MonitoringLevel",
    [(name, name.lower()) for name in ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")],
)
@dataclass
class MonitoringEvent:
    """A single monitoring observation emitted by an MCP subsystem.

    Instances are produced by the tool, security and performance monitors
    and consumed by the metrics collector, alert manager and dashboard.
    """

    # Unix timestamp (seconds since epoch) when the event was generated.
    timestamp: float
    # Severity of the event (DEBUG .. CRITICAL).
    level: MonitoringLevel
    # Broad routing group, e.g. "security", "performance", "tool_execution".
    category: str
    # Component that emitted the event, e.g. "tool_monitor", "auth_monitor".
    source: str
    # Fine-grained event name, e.g. "execution_start", "authentication".
    event_type: str
    # Human-readable description of what happened.
    message: str
    # Event-specific structured payload; keys vary by event_type.
    metadata: Dict[str, Any]
    # Free-form labels used for filtering/search.
    tags: List[str]
class MCPRuntimeMonitor:
    """Top-level runtime monitor wiring together metrics collection, log
    aggregation, anomaly detection, alerting and the dashboard.

    NOTE(review): ``collect_events`` and ``handle_anomaly`` are referenced
    below but not defined in this class — presumably provided elsewhere or
    by a subclass; confirm before deploying.
    """

    def __init__(self) -> None:
        self.metrics_collector = MetricsCollector()
        self.log_aggregator = LogAggregator()
        self.alert_manager = AlertManager()
        self.anomaly_detector = AnomalyDetector()
        self.dashboard = MonitoringDashboard()

    def start_monitoring(self) -> None:
        """Start the comprehensive monitoring system.

        Must be called from within a running asyncio event loop because it
        schedules ``analyze_events`` with ``asyncio.create_task``.
        """
        self.metrics_collector.start()
        self.log_aggregator.start()
        self.anomaly_detector.start()
        asyncio.create_task(self.analyze_events())
        self.dashboard.start()

    async def analyze_events(self) -> None:
        """Analyze monitoring events in real time (runs forever)."""
        while True:
            try:
                # Collect events from all sources, then process each one.
                events = await self.collect_events()
                for event in events:
                    await self.process_event(event)
                await asyncio.sleep(1)  # Analysis interval
            except Exception as e:
                logging.error(f"Error in event analysis: {e}")
                # Fix: back off on the failure path as well; without this a
                # persistent error turned the loop into a CPU-bound busy spin.
                await asyncio.sleep(1)

    async def process_event(self, event: "MonitoringEvent") -> None:
        """Route one event through logging, metrics, anomaly detection,
        alerting and the dashboard (in that order)."""
        self.log_aggregator.log_event(event)
        self.metrics_collector.update_metrics(event)
        if self.anomaly_detector.is_anomaly(event):
            await self.handle_anomaly(event)
        if self.should_alert(event):
            await self.alert_manager.send_alert(event)
        self.dashboard.update_display(event)

    def should_alert(self, event: "MonitoringEvent") -> bool:
        """Return True when *event* warrants a notification.

        Alerts fire for: any CRITICAL event, any security event, performance
        WARNINGs, and tool-execution ERRORs.
        """
        if event.level == MonitoringLevel.CRITICAL:
            return True
        if event.category == "security":
            return True
        if event.category == "performance" and event.level == MonitoringLevel.WARNING:
            return True
        if event.category == "tool_execution" and event.level == MonitoringLevel.ERROR:
            return True
        return False
class MetricsCollector:
    """Defines, stores and updates runtime metrics for the MCP system.

    Fixes over the previous revision: ``increment_counter`` and
    ``record_histogram`` were called by ``update_metrics`` but never defined
    (AttributeError at runtime), and the ``tool_error_count`` counter was
    incremented without being declared in ``define_metrics``.
    """

    def __init__(self) -> None:
        # name -> current value: int for counters, list of samples for histograms.
        self.metrics_storage: Dict[str, Any] = {}
        self.metric_definitions = self.define_metrics()

    def define_metrics(self) -> Dict[str, Dict]:
        """Define metrics to collect (name -> type/unit descriptor)."""
        return {
            # System metrics
            "cpu_usage": {"type": "gauge", "unit": "percent"},
            "memory_usage": {"type": "gauge", "unit": "bytes"},
            "disk_usage": {"type": "gauge", "unit": "bytes"},
            "network_throughput": {"type": "gauge", "unit": "bytes/sec"},
            # Tool metrics
            "tool_execution_count": {"type": "counter", "unit": "count"},
            # Added: incremented by update_metrics on tool ERROR events.
            "tool_error_count": {"type": "counter", "unit": "count"},
            "tool_execution_time": {"type": "histogram", "unit": "seconds"},
            "tool_success_rate": {"type": "gauge", "unit": "percent"},
            "tool_error_rate": {"type": "gauge", "unit": "percent"},
            # Security metrics
            "authentication_attempts": {"type": "counter", "unit": "count"},
            "authentication_failures": {"type": "counter", "unit": "count"},
            "authorization_denials": {"type": "counter", "unit": "count"},
            "suspicious_activities": {"type": "counter", "unit": "count"},
            # Performance metrics
            "request_latency": {"type": "histogram", "unit": "seconds"},
            "request_throughput": {"type": "gauge", "unit": "requests/sec"},
            "error_rate": {"type": "gauge", "unit": "percent"},
            "availability": {"type": "gauge", "unit": "percent"}
        }

    def increment_counter(self, name: str, amount: int = 1) -> None:
        """Add *amount* to counter *name*, creating it at 0 if absent."""
        self.metrics_storage[name] = self.metrics_storage.get(name, 0) + amount

    def record_histogram(self, name: str, value: float) -> None:
        """Append one observation to histogram *name*."""
        self.metrics_storage.setdefault(name, []).append(value)

    def get_network_throughput(self) -> float:
        """Return bytes/sec across all interfaces since the previous call.

        First call returns 0.0 (no baseline sample yet).
        """
        import psutil
        counters = psutil.net_io_counters()
        total = counters.bytes_sent + counters.bytes_recv
        now = time.time()
        last_total, last_time = getattr(self, "_last_net_sample", (total, now))
        self._last_net_sample = (total, now)
        elapsed = now - last_time
        return (total - last_total) / elapsed if elapsed > 0 else 0.0

    def collect_system_metrics(self) -> Dict[str, float]:
        """Collect system-level metrics via psutil (gauges in raw units)."""
        import psutil
        return {
            "cpu_usage": psutil.cpu_percent(),
            "memory_usage": psutil.virtual_memory().used,
            "disk_usage": psutil.disk_usage('/').used,
            "network_throughput": self.get_network_throughput()
        }

    def collect_tool_metrics(self) -> Dict[str, float]:
        """Collect tool execution metrics.

        NOTE(review): relies on ``get_tool_statistics``, which is not defined
        in this class — confirm it is provided elsewhere.
        """
        tool_stats = self.get_tool_statistics()
        return {
            "tool_execution_count": tool_stats.get("total_executions", 0),
            "tool_success_rate": tool_stats.get("success_rate", 0),
            "tool_error_rate": tool_stats.get("error_rate", 0),
            "tool_execution_time": tool_stats.get("avg_execution_time", 0)
        }

    def collect_security_metrics(self) -> Dict[str, float]:
        """Collect security-related metrics.

        NOTE(review): relies on ``get_security_statistics``, which is not
        defined in this class — confirm it is provided elsewhere.
        """
        security_stats = self.get_security_statistics()
        return {
            "authentication_attempts": security_stats.get("auth_attempts", 0),
            "authentication_failures": security_stats.get("auth_failures", 0),
            "authorization_denials": security_stats.get("authz_denials", 0),
            "suspicious_activities": security_stats.get("suspicious_count", 0)
        }

    def update_metrics(self, event: "MonitoringEvent") -> None:
        """Update stored metrics in response to one monitoring event."""
        if event.category == "tool_execution":
            self.increment_counter("tool_execution_count")
            if event.level == MonitoringLevel.ERROR:
                self.increment_counter("tool_error_count")
            execution_time = event.metadata.get("execution_time", 0)
            if execution_time > 0:
                self.record_histogram("tool_execution_time", execution_time)
        elif event.category == "security":
            if event.event_type == "authentication_attempt":
                self.increment_counter("authentication_attempts")
                if event.level == MonitoringLevel.ERROR:
                    self.increment_counter("authentication_failures")
            elif event.event_type == "authorization_denial":
                self.increment_counter("authorization_denials")
        elif event.category == "performance":
            if event.event_type == "request_processed":
                latency = event.metadata.get("latency", 0)
                if latency > 0:
                    self.record_histogram("request_latency", latency)
Tool Usage Monitoring
Tool Execution Tracking
# Tool execution monitoring
class ToolExecutionMonitor:
    """Tracks tool executions end-to-end: start/complete events, per-tool
    performance aggregates, per-user usage patterns and anomaly checks.

    Fixes over the previous revision: ``generate_execution_id``,
    ``log_event`` and ``analyze_parameter_patterns`` were called but never
    defined, and ``execution_history`` was never populated.
    """

    def __init__(self) -> None:
        self.execution_history = []      # ExecutionContexts, in start order
        self.performance_metrics = {}    # tool_name -> aggregate stats dict
        self.usage_patterns = {}         # user_id -> tool_name -> pattern dict
        self.anomaly_detector = ToolAnomalyDetector()

    def generate_execution_id(self) -> str:
        """Return a unique id for one tool execution."""
        # time_ns plus a per-monitor sequence component avoids collisions
        # for executions started within the same nanosecond tick.
        return f"exec-{time.time_ns():x}-{len(self.execution_history)}"

    def log_event(self, event: "MonitoringEvent") -> None:
        """Forward a monitoring event to the standard logging subsystem,
        mapping the event's severity onto a logging level."""
        log_level = getattr(
            logging, str(getattr(event.level, "value", "info")).upper(), logging.INFO
        )
        logging.getLogger(__name__).log(log_level, "%s", event.message)

    def monitor_tool_execution(self, tool_name: str, user_id: str, parameters: Dict) -> "ExecutionContext":
        """Begin monitoring one tool execution and return its context.

        The caller is expected to later pass the context to
        ``record_execution_complete`` with the tool's result.
        """
        execution_id = self.generate_execution_id()
        context = ExecutionContext(
            execution_id=execution_id,
            tool_name=tool_name,
            user_id=user_id,
            parameters=parameters,
            start_time=time.time(),
            status="running"
        )
        self.record_execution_start(context)
        return context

    def record_execution_start(self, context: "ExecutionContext") -> None:
        """Record tool execution start and retain the context in history."""
        # Fix: actually keep the context; execution_history was never
        # appended to before.
        self.execution_history.append(context)
        event = MonitoringEvent(
            timestamp=time.time(),
            level=MonitoringLevel.INFO,
            category="tool_execution",
            source="tool_monitor",
            event_type="execution_start",
            message=f"Tool {context.tool_name} execution started",
            metadata={
                "execution_id": context.execution_id,
                "tool_name": context.tool_name,
                "user_id": context.user_id,
                "parameters": context.parameters
            },
            tags=["tool", "execution", "start"]
        )
        self.log_event(event)

    def record_execution_complete(self, context: "ExecutionContext", result: Dict) -> None:
        """Record completion: log the outcome, then update performance
        aggregates, usage patterns and anomaly checks."""
        context.end_time = time.time()
        context.execution_time = context.end_time - context.start_time
        context.result = result
        context.status = "completed"
        # A result without an explicit success flag counts as a failure.
        success = result.get("success", False)
        level = MonitoringLevel.INFO if success else MonitoringLevel.ERROR
        event = MonitoringEvent(
            timestamp=time.time(),
            level=level,
            category="tool_execution",
            source="tool_monitor",
            event_type="execution_complete",
            message=f"Tool {context.tool_name} execution completed",
            metadata={
                "execution_id": context.execution_id,
                "tool_name": context.tool_name,
                "user_id": context.user_id,
                "execution_time": context.execution_time,
                "success": success,
                "result": result
            },
            tags=["tool", "execution", "complete"]
        )
        self.log_event(event)
        self.update_tool_performance_metrics(context)
        self.analyze_usage_patterns(context)
        self.check_execution_anomalies(context)

    def update_tool_performance_metrics(self, context: "ExecutionContext") -> None:
        """Fold one completed execution into the tool's aggregate stats."""
        tool_name = context.tool_name
        if tool_name not in self.performance_metrics:
            self.performance_metrics[tool_name] = {
                "total_executions": 0,
                "successful_executions": 0,
                "failed_executions": 0,
                "total_execution_time": 0,
                "average_execution_time": 0,
                "min_execution_time": float('inf'),
                "max_execution_time": 0
            }
        metrics = self.performance_metrics[tool_name]
        metrics["total_executions"] += 1
        metrics["total_execution_time"] += context.execution_time
        if context.result.get("success", False):
            metrics["successful_executions"] += 1
        else:
            metrics["failed_executions"] += 1
        metrics["average_execution_time"] = metrics["total_execution_time"] / metrics["total_executions"]
        metrics["min_execution_time"] = min(metrics["min_execution_time"], context.execution_time)
        metrics["max_execution_time"] = max(metrics["max_execution_time"], context.execution_time)

    def analyze_usage_patterns(self, context: "ExecutionContext") -> None:
        """Update per-user, per-tool usage statistics for this execution."""
        user_id = context.user_id
        tool_name = context.tool_name
        if user_id not in self.usage_patterns:
            self.usage_patterns[user_id] = {}
        if tool_name not in self.usage_patterns[user_id]:
            self.usage_patterns[user_id][tool_name] = {
                "usage_count": 0,
                "last_used": 0,
                "usage_times": [],
                "parameter_patterns": {}
            }
        user_tool_pattern = self.usage_patterns[user_id][tool_name]
        user_tool_pattern["usage_count"] += 1
        user_tool_pattern["last_used"] = context.end_time
        user_tool_pattern["usage_times"].append(context.start_time)
        self.analyze_parameter_patterns(user_tool_pattern, context.parameters)

    def analyze_parameter_patterns(self, pattern: Dict, parameters: Dict) -> None:
        """Count how often each parameter name/value combination is used.

        Values are keyed by their repr so unhashable values (lists, dicts)
        can still be counted.
        """
        counts = pattern["parameter_patterns"]
        for key, value in parameters.items():
            value_counts = counts.setdefault(key, {})
            value_repr = repr(value)
            value_counts[value_repr] = value_counts.get(value_repr, 0) + 1

    def check_execution_anomalies(self, context: "ExecutionContext") -> None:
        """Run the anomaly detector on a completed execution and log a
        WARNING event per detected anomaly."""
        anomalies = self.anomaly_detector.detect_anomalies(context)
        for anomaly in anomalies:
            event = MonitoringEvent(
                timestamp=time.time(),
                level=MonitoringLevel.WARNING,
                category="anomaly",
                source="tool_monitor",
                event_type="execution_anomaly",
                message=f"Anomaly detected in tool {context.tool_name}: {anomaly.description}",
                metadata={
                    "execution_id": context.execution_id,
                    "tool_name": context.tool_name,
                    "user_id": context.user_id,
                    "anomaly_type": anomaly.type,
                    "anomaly_score": anomaly.score,
                    "description": anomaly.description
                },
                tags=["anomaly", "tool", "execution"]
            )
            self.log_event(event)
Security Event Monitoring
Security Event Detection
# Security event monitoring
class SecurityEventMonitor:
    """Monitors authentication, authorization and data-access activity and
    feeds every observation through rule, behavioral and threat analysis."""

    def __init__(self):
        self.threat_detector = ThreatDetector()
        self.behavioral_analyzer = BehavioralAnalyzer()
        self.security_rules = self.load_security_rules()

    def _build_event(self, level, category, source, event_type, message, metadata, tags):
        """Assemble a MonitoringEvent stamped with the current time."""
        return MonitoringEvent(
            timestamp=time.time(),
            level=level,
            category=category,
            source=source,
            event_type=event_type,
            message=message,
            metadata=metadata,
            tags=tags,
        )

    def monitor_authentication_event(self, user_id: str, event_type: str, result: str, metadata: Dict):
        """Record an authentication attempt and run it through analysis."""
        if result == "success":
            severity = MonitoringLevel.INFO
        else:
            severity = MonitoringLevel.WARNING
        details = {
            "user_id": user_id,
            "event_type": event_type,
            "result": result,
            "ip_address": metadata.get("ip_address"),
            "user_agent": metadata.get("user_agent"),
            "timestamp": metadata.get("timestamp"),
        }
        auth_event = self._build_event(
            severity,
            "security",
            "auth_monitor",
            "authentication",
            f"Authentication {event_type} for user {user_id}: {result}",
            details,
            ["security", "authentication", result],
        )
        self.process_security_event(auth_event)

    def monitor_authorization_event(self, user_id: str, resource: str, action: str, result: str, metadata: Dict):
        """Record an authorization decision and run it through analysis."""
        if result == "allowed":
            severity = MonitoringLevel.INFO
        else:
            severity = MonitoringLevel.WARNING
        details = {
            "user_id": user_id,
            "resource": resource,
            "action": action,
            "result": result,
            "permissions": metadata.get("permissions", []),
            "context": metadata.get("context", {}),
        }
        authz_event = self._build_event(
            severity,
            "security",
            "authz_monitor",
            "authorization",
            f"Authorization {action} on {resource} for user {user_id}: {result}",
            details,
            ["security", "authorization", result],
        )
        self.process_security_event(authz_event)

    def monitor_data_access_event(self, user_id: str, data_type: str, operation: str, metadata: Dict):
        """Record a data-access operation and run it through analysis."""
        details = {
            "user_id": user_id,
            "data_type": data_type,
            "operation": operation,
            "data_classification": metadata.get("data_classification"),
            "access_context": metadata.get("access_context"),
            "data_volume": metadata.get("data_volume"),
        }
        access_event = self._build_event(
            MonitoringLevel.INFO,
            "security",
            "data_monitor",
            "data_access",
            f"Data access: {operation} on {data_type} by user {user_id}",
            details,
            ["security", "data", "access"],
        )
        self.process_security_event(access_event)

    def process_security_event(self, event: "MonitoringEvent"):
        """Run one security event through rules, behavioral analysis and
        threat detection, dispatching handlers for anything that fires."""
        for rule in self.security_rules:
            if not rule.matches(event):
                continue
            outcome = rule.evaluate(event)
            if outcome.triggered:
                self.handle_security_rule_trigger(event, rule, outcome)
        behavior = self.behavioral_analyzer.analyze_event(event)
        if behavior.anomalous:
            self.handle_behavioral_anomaly(event, behavior)
        threat = self.threat_detector.analyze_event(event)
        if threat.threat_detected:
            self.handle_threat_detection(event, threat)

    def handle_security_rule_trigger(self, event: "MonitoringEvent", rule: "SecurityRule", result: "RuleResult"):
        """Escalate a triggered rule as a CRITICAL security alert event."""
        alert_event = self._build_event(
            MonitoringLevel.CRITICAL,
            "security_alert",
            "security_monitor",
            "rule_trigger",
            f"Security rule triggered: {rule.name}",
            {
                "original_event": event.metadata,
                "rule_name": rule.name,
                "rule_description": rule.description,
                "severity": result.severity,
                "recommended_action": result.recommended_action,
            },
            ["security", "alert", "rule", rule.name],
        )
        self.log_event(alert_event)

    def load_security_rules(self) -> List["SecurityRule"]:
        """Build the static set of security detection rules."""
        brute_force = SecurityRule(
            name="brute_force_detection",
            description="Detect brute force authentication attempts",
            condition=lambda event: (
                event.category == "security" and
                event.event_type == "authentication" and
                event.metadata.get("result") == "failure"
            ),
            threshold=5,
            time_window=300,  # 5 minutes
            severity="high",
        )
        escalation = SecurityRule(
            name="privilege_escalation",
            description="Detect privilege escalation attempts",
            condition=lambda event: (
                event.category == "security" and
                event.event_type == "authorization" and
                event.metadata.get("result") == "denied" and
                "admin" in event.metadata.get("resource", "")
            ),
            threshold=3,
            time_window=60,  # 1 minute
            severity="critical",
        )
        odd_access = SecurityRule(
            name="unusual_data_access",
            description="Detect unusual data access patterns",
            condition=lambda event: (
                event.category == "security" and
                event.event_type == "data_access" and
                event.metadata.get("data_classification") == "sensitive"
            ),
            threshold=10,
            time_window=3600,  # 1 hour
            severity="medium",
        )
        return [brute_force, escalation, odd_access]
Performance Monitoring
System Performance Tracking
# Performance monitoring
class PerformanceMonitor:
    """Tracks per-request and system-level performance against thresholds.

    Fix over the previous revision: for "throughput" the thresholds are
    lower-is-worse (warning 100, critical 50), but the old code always
    tested ``value > threshold``, so a perfectly healthy throughput of 200
    would have been flagged CRITICAL. ``_breaches`` now applies the correct
    comparison direction per metric.
    """

    # Metrics where a LOWER value is worse (thresholds are floors).
    LOWER_IS_WORSE = frozenset({"throughput"})

    def __init__(self):
        self.performance_metrics = {}
        self.baseline_metrics = {}
        self.performance_thresholds = self.define_thresholds()

    def define_thresholds(self) -> Dict[str, Dict]:
        """Define per-metric warning/critical thresholds."""
        return {
            "response_time": {"warning": 1.0, "critical": 5.0},
            "cpu_usage": {"warning": 80.0, "critical": 95.0},
            "memory_usage": {"warning": 85.0, "critical": 95.0},
            "disk_usage": {"warning": 80.0, "critical": 90.0},
            "error_rate": {"warning": 5.0, "critical": 10.0},
            "throughput": {"warning": 100, "critical": 50}  # requests/second
        }

    def _breaches(self, metric_name: str, value: float, bound: float) -> bool:
        """Return True when *value* breaches *bound* for *metric_name*,
        honouring the metric's direction (higher- vs lower-is-worse)."""
        if metric_name in self.LOWER_IS_WORSE:
            return value < bound
        return value > bound

    def monitor_request_performance(self, request_id: str, start_time: float, end_time: float, success: bool):
        """Emit a performance event for one request, escalating the level
        when the response time breaches the configured thresholds.

        NOTE(review): ``log_event`` is not defined in this class — confirm
        it is provided elsewhere.
        """
        response_time = end_time - start_time
        level = MonitoringLevel.INFO
        if response_time > self.performance_thresholds["response_time"]["critical"]:
            level = MonitoringLevel.CRITICAL
        elif response_time > self.performance_thresholds["response_time"]["warning"]:
            level = MonitoringLevel.WARNING
        event = MonitoringEvent(
            timestamp=time.time(),
            level=level,
            category="performance",
            source="performance_monitor",
            event_type="request_performance",
            message=f"Request {request_id} completed in {response_time:.2f}s",
            metadata={
                "request_id": request_id,
                "response_time": response_time,
                "success": success,
                "start_time": start_time,
                "end_time": end_time
            },
            tags=["performance", "request", "response_time"]
        )
        self.log_event(event)

    def monitor_system_performance(self):
        """Check current system metrics against thresholds and emit one
        performance event per thresholded metric.

        NOTE(review): ``collect_system_metrics`` and ``log_event`` are not
        defined in this class — confirm they are provided elsewhere.
        """
        system_metrics = self.collect_system_metrics()
        for metric_name, value in system_metrics.items():
            if metric_name in self.performance_thresholds:
                thresholds = self.performance_thresholds[metric_name]
                level = MonitoringLevel.INFO
                # Fix: direction-aware comparison (see _breaches).
                if self._breaches(metric_name, value, thresholds["critical"]):
                    level = MonitoringLevel.CRITICAL
                elif self._breaches(metric_name, value, thresholds["warning"]):
                    level = MonitoringLevel.WARNING
                event = MonitoringEvent(
                    timestamp=time.time(),
                    level=level,
                    category="performance",
                    source="performance_monitor",
                    event_type="system_performance",
                    message=f"System {metric_name}: {value}",
                    metadata={
                        "metric_name": metric_name,
                        "value": value,
                        "threshold_warning": thresholds["warning"],
                        "threshold_critical": thresholds["critical"]
                    },
                    tags=["performance", "system", metric_name]
                )
                self.log_event(event)
Alerting and Notifications
Alert Management System
# Alert management
class AlertManager:
    """Creates alerts from monitoring events and fans them out to
    severity-appropriate notification channels.

    Fixes over the previous revision: ``process_alert`` is now a coroutine
    (ComprehensiveMonitoringSystem ``await``s it — awaiting the old sync
    method raised TypeError); ``send_alert`` (called by MCPRuntimeMonitor),
    ``generate_alert_id`` and ``generate_alert_description`` were referenced
    but never defined.
    """

    def __init__(self):
        self.alert_channels = {}
        self.alert_rules = {}
        self.alert_history = []
        self.notification_service = NotificationService()

    def setup_alert_channels(self):
        """Register the available notification channel backends."""
        self.alert_channels = {
            "email": EmailAlertChannel(),
            "slack": SlackAlertChannel(),
            "pagerduty": PagerDutyAlertChannel(),
            "webhook": WebhookAlertChannel()
        }

    def generate_alert_id(self) -> str:
        """Return a unique, monotonically-sequenced alert id."""
        self._alert_seq = getattr(self, "_alert_seq", 0) + 1
        return f"alert-{int(time.time() * 1000)}-{self._alert_seq}"

    def generate_alert_description(self, event: "MonitoringEvent") -> str:
        """Build a human-readable one-line description from an event."""
        return f"{event.category}/{event.event_type}: {event.message}"

    def create_alert(self, event: "MonitoringEvent", alert_type: str, severity: str) -> "Alert":
        """Create an Alert (status "new") wrapping *event*."""
        alert = Alert(
            id=self.generate_alert_id(),
            timestamp=time.time(),
            event=event,
            alert_type=alert_type,
            severity=severity,
            status="new",
            description=self.generate_alert_description(event),
            metadata=event.metadata
        )
        return alert

    async def send_alert(self, event: "MonitoringEvent") -> None:
        """Create and dispatch an alert for *event*.

        Entry point used by MCPRuntimeMonitor; severity is derived from the
        event's level.
        """
        severity = {"warning": "medium", "error": "high", "critical": "critical"}.get(
            getattr(event.level, "value", ""), "low"
        )
        alert = self.create_alert(event, alert_type=event.category, severity=severity)
        await self.process_alert(alert)

    async def process_alert(self, alert: "Alert") -> None:
        """Send *alert* through all channels mapped to its severity, record
        it in history and mark it sent. Channel failures are logged and do
        not stop delivery to the remaining channels."""
        channels = self.get_notification_channels(alert.severity)
        for channel in channels:
            try:
                self.send_notification(channel, alert)
            except Exception as e:
                logging.error(f"Failed to send alert via {channel}: {e}")
        self.alert_history.append(alert)
        alert.status = "sent"

    def get_notification_channels(self, severity: str) -> List[str]:
        """Map a severity string to its notification channels; unknown
        severities fall back to email only."""
        channel_mapping = {
            "low": ["email"],
            "medium": ["email", "slack"],
            "high": ["email", "slack", "pagerduty"],
            "critical": ["email", "slack", "pagerduty", "webhook"]
        }
        return channel_mapping.get(severity, ["email"])

    def send_notification(self, channel: str, alert: "Alert") -> None:
        """Dispatch *alert* on one named channel, warning on unknown names."""
        if channel in self.alert_channels:
            self.alert_channels[channel].send_alert(alert)
        else:
            logging.warning(f"Unknown alert channel: {channel}")
Dashboard and Visualization
Monitoring Dashboard
# Monitoring dashboard
class MonitoringDashboard:
    """Aggregates monitoring events into a set of dashboard widgets.

    Fix over the previous revision: MCPRuntimeMonitor calls
    ``dashboard.start()`` and ``dashboard.update_display(event)``, neither
    of which existed — both are now provided.
    """

    def __init__(self):
        self.dashboard_data = {}
        self.widgets = {}
        self.update_interval = 30  # seconds

    def setup_dashboard(self):
        """Create the standard widget set."""
        self.widgets["system_overview"] = SystemOverviewWidget()
        self.widgets["tool_performance"] = ToolPerformanceWidget()
        self.widgets["security_events"] = SecurityEventsWidget()
        self.widgets["alert_summary"] = AlertSummaryWidget()
        self.widgets["performance_metrics"] = PerformanceMetricsWidget()

    def start(self):
        """Start the dashboard, creating widgets if not already set up.

        Idempotent: calling start() after setup_dashboard() keeps the
        existing widgets.
        """
        if not self.widgets:
            self.setup_dashboard()

    def update_display(self, event: "MonitoringEvent"):
        """Fold a single event into the dashboard (entry point used by
        MCPRuntimeMonitor.process_event)."""
        self.update_dashboard([event])

    def update_dashboard(self, events: List["MonitoringEvent"]):
        """Refresh every widget with *events*; a widget failure is logged
        and does not stop the other widgets from updating."""
        for widget_name, widget in self.widgets.items():
            try:
                widget.update(events)
            except Exception as e:
                logging.error(f"Failed to update widget {widget_name}: {e}")
        self.dashboard_data = {
            "last_updated": time.time(),
            "widgets": {name: widget.get_data() for name, widget in self.widgets.items()}
        }

    def get_dashboard_data(self) -> Dict:
        """Return the most recently assembled dashboard snapshot."""
        return self.dashboard_data
Integration and Deployment
Monitoring Integration
# Complete monitoring integration
class ComprehensiveMonitoringSystem:
    """Integrates all monitors, alerting and the dashboard behind a single
    initialization point and event-processing loop.

    Fixes over the previous revision: ``self.update_interval`` was read in
    ``monitoring_loop`` but never set (AttributeError); the loop's except
    path had no sleep (busy spin on persistent errors); and
    ``should_create_alert`` / ``determine_severity`` were called but never
    defined.
    """

    def __init__(self):
        self.runtime_monitor = MCPRuntimeMonitor()
        self.tool_monitor = ToolExecutionMonitor()
        self.security_monitor = SecurityEventMonitor()
        self.performance_monitor = PerformanceMonitor()
        self.alert_manager = AlertManager()
        self.dashboard = MonitoringDashboard()
        # Seconds between monitoring-loop iterations (was missing before).
        self.update_interval = 30

    def initialize_monitoring(self):
        """Initialize the complete monitoring system.

        Must run inside an active asyncio event loop because it schedules
        ``monitoring_loop`` with ``asyncio.create_task``.
        """
        self.runtime_monitor.start_monitoring()
        self.alert_manager.setup_alert_channels()
        self.dashboard.setup_dashboard()
        asyncio.create_task(self.monitoring_loop())

    async def monitoring_loop(self):
        """Main monitoring loop (runs forever).

        NOTE(review): ``collect_all_events`` is not defined in this class —
        confirm it is provided elsewhere.
        """
        while True:
            try:
                events = await self.collect_all_events()
                for event in events:
                    await self.process_monitoring_event(event)
                self.dashboard.update_dashboard(events)
                await asyncio.sleep(self.update_interval)
            except Exception as e:
                logging.error(f"Error in monitoring loop: {e}")
                # Fix: back off on failure too, otherwise a persistent error
                # turns this loop into a CPU-bound busy spin.
                await asyncio.sleep(self.update_interval)

    def should_create_alert(self, event: "MonitoringEvent") -> bool:
        """Return True when *event* warrants an alert (was undefined).

        Mirrors MCPRuntimeMonitor.should_alert: any critical event, any
        security event, performance warnings, tool-execution errors.
        Works off the level's string value to stay decoupled from the enum.
        """
        level = getattr(event.level, "value", str(event.level))
        if level == "critical":
            return True
        if event.category == "security":
            return True
        if event.category == "performance" and level == "warning":
            return True
        if event.category == "tool_execution" and level == "error":
            return True
        return False

    def determine_severity(self, event: "MonitoringEvent") -> str:
        """Map an event's level onto an alert severity string
        (was undefined). Unknown levels default to "medium"."""
        mapping = {
            "debug": "low",
            "info": "low",
            "warning": "medium",
            "error": "high",
            "critical": "critical",
        }
        return mapping.get(getattr(event.level, "value", ""), "medium")

    async def process_monitoring_event(self, event: "MonitoringEvent"):
        """Route one event to its category-specific monitor, then create and
        dispatch an alert if warranted.

        NOTE(review): ``process_performance_event`` and
        ``process_tool_event`` are not defined on PerformanceMonitor /
        ToolExecutionMonitor in this file — confirm they exist elsewhere.
        """
        if event.category == "security":
            self.security_monitor.process_security_event(event)
        elif event.category == "performance":
            self.performance_monitor.process_performance_event(event)
        elif event.category == "tool_execution":
            self.tool_monitor.process_tool_event(event)
        if self.should_create_alert(event):
            alert = self.alert_manager.create_alert(
                event,
                alert_type=event.category,
                severity=self.determine_severity(event)
            )
            await self.alert_manager.process_alert(alert)
Runtime Monitoring provides comprehensive visibility into MCP system operations, enabling proactive security management and performance optimization.