OAuth Security Patterns

Overview: Secure OAuth implementations for MCP tool authorization and invocation.

OAuth provides a robust framework for secure authorization in MCP systems. This guide covers secure OAuth patterns, implementation best practices, and security considerations for MCP deployments.

OAuth Flow Security

Authorization Code Flow with PKCE

The most secure OAuth flow for MCP implementations:

# Secure OAuth implementation with PKCE
import hashlib
import base64
import secrets
from urllib.parse import urlencode

class SecureOAuthClient:
    def __init__(self, client_id, authorization_endpoint, token_endpoint):
        self.client_id = client_id
        self.authorization_endpoint = authorization_endpoint
        self.token_endpoint = token_endpoint
        
    def generate_pkce_challenge(self):
        """Generate PKCE code verifier and challenge"""
        # Generate cryptographically secure random code verifier
        code_verifier = base64.urlsafe_b64encode(
            secrets.token_bytes(32)
        ).decode('utf-8').rstrip('=')
        
        # Generate SHA256 challenge
        challenge = hashlib.sha256(code_verifier.encode('utf-8')).digest()
        code_challenge = base64.urlsafe_b64encode(challenge).decode('utf-8').rstrip('=')
        
        return code_verifier, code_challenge
    
    def build_authorization_url(self, scope, redirect_uri, state=None):
        """Build secure authorization URL"""
        if not state:
            state = secrets.token_urlsafe(32)
        
        code_verifier, code_challenge = self.generate_pkce_challenge()
        
        # Store code_verifier securely for token exchange
        self.store_code_verifier(state, code_verifier)
        
        params = {
            'response_type': 'code',
            'client_id': self.client_id,
            'redirect_uri': redirect_uri,
            'scope': scope,
            'state': state,
            'code_challenge': code_challenge,
            'code_challenge_method': 'S256'
        }
        
        return f"{self.authorization_endpoint}?{urlencode(params)}"
    
    def exchange_code_for_token(self, code, redirect_uri, state):
        """Exchange authorization code for access token"""
        code_verifier = self.retrieve_code_verifier(state)
        
        token_data = {
            'grant_type': 'authorization_code',
            'client_id': self.client_id,
            'code': code,
            'redirect_uri': redirect_uri,
            'code_verifier': code_verifier
        }
        
        # Make secure token request
        response = self.make_token_request(token_data)
        
        # Validate and return token
        return self.validate_token_response(response)

Scope Minimization

Principle of Least Privilege

Implement minimal necessary scopes for MCP tools:

# Secure scope management
class MCPScopeManager:
    def __init__(self):
        self.tool_scopes = {
            'file_reader': ['read:files'],
            'database_connector': ['read:database'],
            'api_client': ['read:api', 'write:api'],
            'admin_tool': ['admin:system']
        }
        
        self.scope_hierarchy = {
            'read:files': ['read:files'],
            'write:files': ['read:files', 'write:files'],
            'admin:system': ['read:files', 'write:files', 'admin:system']
        }
    
    def get_minimal_scopes(self, tool_name, requested_actions):
        """Get minimal scopes for tool actions"""
        tool_scopes = self.tool_scopes.get(tool_name, [])
        minimal_scopes = set()
        
        for action in requested_actions:
            required_scope = self.map_action_to_scope(action)
            if required_scope in tool_scopes:
                minimal_scopes.add(required_scope)
        
        return list(minimal_scopes)
    
    def validate_scope_access(self, token_scopes, required_scope):
        """Validate token has required scope"""
        for token_scope in token_scopes:
            if required_scope in self.scope_hierarchy.get(token_scope, []):
                return True
        return False

Redirect URI Validation

Secure Redirect Handling

# Secure redirect URI validation
import re
from urllib.parse import urlparse

class RedirectURIValidator:
    def __init__(self):
        self.allowed_schemes = ['https']
        self.allowed_hosts = ['localhost', 'mcp-client.example.com']
        self.blocked_patterns = [
            r'.*\.evil\.com',
            r'javascript:',
            r'data:',
            r'file:'
        ]
    
    def validate_redirect_uri(self, redirect_uri, registered_uris):
        """Validate redirect URI security"""
        
        # Check if URI is in registered URIs
        if redirect_uri not in registered_uris:
            return False, "Redirect URI not registered"
        
        # Parse URI
        parsed = urlparse(redirect_uri)
        
        # Validate scheme
        if parsed.scheme not in self.allowed_schemes:
            return False, "Invalid scheme"
        
        # Validate host
        if parsed.hostname not in self.allowed_hosts:
            return False, "Invalid host"
        
        # Check blocked patterns
        for pattern in self.blocked_patterns:
            if re.match(pattern, redirect_uri):
                return False, "Blocked URI pattern"
        
        # Additional security checks
        if self.contains_suspicious_content(redirect_uri):
            return False, "Suspicious content detected"
        
        return True, "Valid redirect URI"
    
    def contains_suspicious_content(self, uri):
        """Check for suspicious content in URI"""
        suspicious_patterns = [
            'javascript:',
            'data:',
            'vbscript:',
            '<script',
            'onload=',
            'onerror='
        ]
        
        return any(pattern in uri.lower() for pattern in suspicious_patterns)

Token Management

Secure Token Storage and Handling

# Secure token management
import jwt
import time
from cryptography.fernet import Fernet

class SecureTokenManager:
    def __init__(self, encryption_key):
        self.cipher = Fernet(encryption_key)
        self.token_storage = {}
        
    def store_token(self, user_id, token_data):
        """Securely store access token"""
        # Encrypt token data
        encrypted_token = self.cipher.encrypt(
            json.dumps(token_data).encode('utf-8')
        )
        
        # Store with expiration
        self.token_storage[user_id] = {
            'token': encrypted_token,
            'expires_at': time.time() + token_data.get('expires_in', 3600),
            'created_at': time.time()
        }
    
    def retrieve_token(self, user_id):
        """Retrieve and decrypt access token"""
        if user_id not in self.token_storage:
            return None
        
        token_entry = self.token_storage[user_id]
        
        # Check expiration
        if time.time() > token_entry['expires_at']:
            del self.token_storage[user_id]
            return None
        
        # Decrypt token
        encrypted_token = token_entry['token']
        token_data = json.loads(
            self.cipher.decrypt(encrypted_token).decode('utf-8')
        )
        
        return token_data
    
    def refresh_token(self, user_id, refresh_token):
        """Refresh expired access token"""
        refresh_data = {
            'grant_type': 'refresh_token',
            'refresh_token': refresh_token,
            'client_id': self.client_id
        }
        
        response = self.make_token_request(refresh_data)
        
        if response.get('access_token'):
            self.store_token(user_id, response)
            return response
        
        return None

Security Best Practices

OAuth Security Checklist

# OAuth security validation
class OAuthSecurityValidator:
    def __init__(self):
        self.security_checks = [
            'pkce_enabled',
            'https_only',
            'state_validation',
            'redirect_uri_validation',
            'scope_minimization',
            'token_expiration',
            'refresh_token_rotation'
        ]
    
    def validate_oauth_implementation(self, config):
        """Validate OAuth implementation security"""
        results = {}
        
        # Check PKCE support
        results['pkce_enabled'] = self.check_pkce_support(config)
        
        # Check HTTPS enforcement
        results['https_only'] = self.check_https_enforcement(config)
        
        # Check state parameter usage
        results['state_validation'] = self.check_state_validation(config)
        
        # Check redirect URI validation
        results['redirect_uri_validation'] = self.check_redirect_validation(config)
        
        # Check scope minimization
        results['scope_minimization'] = self.check_scope_minimization(config)
        
        # Check token expiration
        results['token_expiration'] = self.check_token_expiration(config)
        
        # Check refresh token rotation
        results['refresh_token_rotation'] = self.check_refresh_rotation(config)
        
        return results
    
    def generate_security_report(self, results):
        """Generate OAuth security assessment report"""
        passed = sum(1 for result in results.values() if result)
        total = len(results)
        
        report = {
            'score': f"{passed}/{total}",
            'percentage': (passed / total) * 100,
            'recommendations': self.generate_recommendations(results)
        }
        
        return report

Common OAuth Vulnerabilities

Vulnerability Prevention

# OAuth vulnerability prevention
class OAuthVulnerabilityPrevention:
    def __init__(self):
        self.common_vulnerabilities = {
            'authorization_code_interception': self.prevent_code_interception,
            'csrf_attacks': self.prevent_csrf_attacks,
            'redirect_uri_manipulation': self.prevent_redirect_manipulation,
            'scope_elevation': self.prevent_scope_elevation,
            'token_leakage': self.prevent_token_leakage
        }
    
    def prevent_code_interception(self, config):
        """Prevent authorization code interception"""
        return {
            'pkce_required': True,
            'https_only': True,
            'short_code_lifetime': 600,  # 10 minutes
            'one_time_use': True
        }
    
    def prevent_csrf_attacks(self, config):
        """Prevent CSRF attacks"""
        return {
            'state_parameter_required': True,
            'state_entropy': 32,  # bytes
            'state_validation': True,
            'state_timeout': 300  # 5 minutes
        }
    
    def prevent_redirect_manipulation(self, config):
        """Prevent redirect URI manipulation"""
        return {
            'exact_uri_matching': True,
            'uri_registration_required': True,
            'localhost_restrictions': True,
            'scheme_validation': True
        }

Integration with MCP

MCP-Specific OAuth Patterns

# MCP OAuth integration
class MCPOAuthIntegration:
    def __init__(self, oauth_client):
        self.oauth_client = oauth_client
        
    def authorize_tool_access(self, tool_name, user_id, requested_scopes):
        """Authorize MCP tool access via OAuth"""
        
        # Validate tool registration
        if not self.is_tool_registered(tool_name):
            raise ValueError("Tool not registered")
        
        # Get minimal required scopes
        minimal_scopes = self.get_minimal_scopes(tool_name, requested_scopes)
        
        # Generate authorization URL
        auth_url = self.oauth_client.build_authorization_url(
            scope=' '.join(minimal_scopes),
            redirect_uri=self.get_tool_redirect_uri(tool_name),
            state=self.generate_tool_state(tool_name, user_id)
        )
        
        return auth_url
    
    def handle_tool_callback(self, code, state):
        """Handle OAuth callback for tool authorization"""
        
        # Validate state
        tool_name, user_id = self.validate_tool_state(state)
        
        # Exchange code for token
        token_data = self.oauth_client.exchange_code_for_token(
            code, 
            self.get_tool_redirect_uri(tool_name),
            state
        )
        
        # Store token for tool usage
        self.store_tool_token(tool_name, user_id, token_data)
        
        return token_data

References and Resources

Standards and Specifications

Security Guidelines


OAuth Security Patterns provide a robust foundation for secure authorization in MCP systems while maintaining usability and interoperability.