From 4aed224f361044d4ed7d7fafdd40c49d13579359 Mon Sep 17 00:00:00 2001
From: DeWitt Gibson
Date: Thu, 21 Nov 2024 09:11:45 -0700
Subject: [PATCH 1/8] Create constraints.py

---
 constraints.py | 9 +++++++++
 1 file changed, 9 insertions(+)
 create mode 100644 constraints.py

diff --git a/constraints.py b/constraints.py
new file mode 100644
index 0000000..b52fcb3
--- /dev/null
+++ b/constraints.py
@@ -0,0 +1,9 @@
+monitor = AIModelProtector(
+    model_name="my_model",
+    input_constraints={
+        'max_value': 1.0,
+        'min_value': -1.0,
+        'max_gradient': 50,
+        'min_sparsity': 0.05
+    }
+)

From 222bb8372007218a32f4e493e1c2c1b8254d4251 Mon Sep 17 00:00:00 2001
From: DeWitt Gibson
Date: Thu, 21 Nov 2024 09:12:28 -0700
Subject: [PATCH 2/8] Create validate_inputs.py

---
 validate_inputs.py | 6 ++++++
 1 file changed, 6 insertions(+)
 create mode 100644 validate_inputs.py

diff --git a/validate_inputs.py b/validate_inputs.py
new file mode 100644
index 0000000..17fbe2e
--- /dev/null
+++ b/validate_inputs.py
@@ -0,0 +1,6 @@
+security_report = monitor.protect(input_data)
+if security_report['allow_inference']:
+    # Run your model
+    prediction = model.predict(input_data)
+else:
+    raise ValueError(f"Inference blocked: {security_report['validation_result']['issues']}")  # handle the security violation

From 7409fa112698a8ffce94af34ba41362742082ce3 Mon Sep 17 00:00:00 2001
From: DeWitt Gibson
Date: Thu, 21 Nov 2024 09:13:39 -0700
Subject: [PATCH 3/8] Create ai_security_model_v2.py

---
 ai_security_model_v2.py | 194 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 194 insertions(+)
 create mode 100644 ai_security_model_v2.py

diff --git a/ai_security_model_v2.py b/ai_security_model_v2.py
new file mode 100644
index 0000000..deade26
--- /dev/null
+++ b/ai_security_model_v2.py
@@ -0,0 +1,194 @@
+import numpy as np
+from typing import Any, Dict, List, Optional, Union
+import logging
+import hashlib
+import json
+from datetime import datetime
+
+class AIModelProtector:
+    """Security monitoring and protection for AI model deployments"""
+
+    def __init__(self,
+                 model_name: str,
+                 input_constraints: Optional[Dict[str, Any]] = None,
+                 log_file: Optional[str] = None):
+        """
+        Initialize the security protector
+
+        Args:
+            model_name: Identifier for the protected model
+            input_constraints: Dictionary of input validation rules
+            log_file: Optional custom log file path
+        """
+        self.model_name = model_name
+        self.input_constraints = input_constraints or {}
+        self.request_history = []
+
+        # Configure logging
+        log_file = log_file or f"security_{model_name}_{datetime.now():%Y%m%d}.log"
+        logging.basicConfig(
+            filename=log_file,
+            level=logging.INFO,
+            format='%(asctime)s - %(levelname)s - %(message)s'
+        )
+
+    def validate_input(self, input_data: Union[np.ndarray, List, Dict]) -> Dict[str, Any]:
+        """
+        Validate model input against security constraints
+
+        Returns:
+            Dict containing validation results and any detected issues
+        """
+        validation_result = {
+            "valid": True,
+            "issues": []
+        }
+
+        try:
+            # Check for null/empty inputs
+            if input_data is None or (hasattr(input_data, "__len__") and len(input_data) == 0):
+                validation_result["valid"] = False
+                validation_result["issues"].append("Empty or null input detected")
+                return validation_result
+
+            # Convert to numpy array for numerical checks
+            input_array = np.asarray(input_data)
+
+            # Size limits
+            if "max_size" in self.input_constraints:
+                if input_array.size > self.input_constraints["max_size"]:
+                    validation_result["valid"] = False
+                    validation_result["issues"].append(f"Input size {input_array.size} exceeds maximum {self.input_constraints['max_size']}")
+
+            # Value range checks
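+            # A single out-of-range element is enough to reject the input:
+            # np.any compares elementwise against the configured bounds.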
if "max_value" in self.input_constraints: + if np.any(input_array > self.input_constraints["max_value"]): + validation_result["valid"] = False + validation_result["issues"].append(f"Values exceed maximum {self.input_constraints['max_value']}") + + if "min_value" in self.input_constraints: + if np.any(input_array < self.input_constraints["min_value"]): + validation_result["valid"] = False + validation_result["issues"].append(f"Values below minimum {self.input_constraints['min_value']}") + + # Check for anomalous patterns + anomaly_result = self._check_anomalous_patterns(input_array) + if anomaly_result["anomalies_detected"]: + validation_result["valid"] = False + validation_result["issues"].extend(anomaly_result["details"]) + + except Exception as e: + validation_result["valid"] = False + validation_result["issues"].append(f"Validation error: {str(e)}") + logging.error(f"Input validation failed: {str(e)}") + + return validation_result + + def _check_anomalous_patterns(self, input_array: np.ndarray) -> Dict[str, Any]: + """ + Check for patterns that might indicate attacks + """ + result = { + "anomalies_detected": False, + "details": [] + } + + # Extreme gradients (potential adversarial patterns) + if len(input_array.shape) > 1: + gradients = np.gradient(input_array.astype(float)) + max_gradient = self.input_constraints.get("max_gradient", 100) + if np.any(np.abs(gradients) > max_gradient): + result["anomalies_detected"] = True + result["details"].append(f"Extreme gradients detected (>{max_gradient})") + + # Unusual sparsity + sparsity = np.count_nonzero(input_array) / input_array.size + min_sparsity = self.input_constraints.get("min_sparsity", 0.01) + if sparsity < min_sparsity: + result["anomalies_detected"] = True + result["details"].append(f"Unusually sparse input (sparsity={sparsity:.3f})") + + return result + + def log_request(self, input_data: Any, metadata: Dict[str, Any] = None) -> None: + """ + Log request details for monitoring + """ + try: + request_hash = hashlib.sha256( + json.dumps(str(input_data)).encode() + ).hexdigest() + + log_entry = { + "timestamp": datetime.now().isoformat(), + "input_hash": request_hash, + "input_shape": np.asarray(input_data).shape, + "metadata": metadata or {} + } + + self.request_history.append(log_entry) + logging.info(f"Request logged: {log_entry}") + + except Exception as e: + logging.error(f"Failed to log request: {str(e)}") + + def analyze_requests(self, window_minutes: int = 60) -> Dict[str, Any]: + """ + Analyze recent requests for suspicious patterns + """ + if not self.request_history: + return {} + + cutoff_time = datetime.now().timestamp() - (window_minutes * 60) + recent_requests = [ + req for req in self.request_history + if datetime.fromisoformat(req["timestamp"]).timestamp() > cutoff_time + ] + + analysis = { + "total_requests": len(recent_requests), + "unique_inputs": len(set(req["input_hash"] for req in recent_requests)), + "request_rate": len(recent_requests) / window_minutes, + "suspicious_patterns": [] + } + + # Check for repeated inputs + hash_counts = {} + for req in recent_requests: + hash_counts[req["input_hash"]] = hash_counts.get(req["input_hash"], 0) + 1 + + # Flag suspicious patterns + threshold = max(5, len(recent_requests) * 0.1) # 10% of requests or at least 5 + suspicious = {h: c for h, c in hash_counts.items() if c > threshold} + if suspicious: + analysis["suspicious_patterns"].append({ + "type": "repeated_inputs", + "details": suspicious + }) + + return analysis + + def protect(self, input_data: Any, metadata: 
+    def protect(self, input_data: Any, metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
+        """
+        Main protection function to run before model inference
+
+        Returns:
+            Dictionary with security assessment and recommendations
+        """
+        security_report = {
+            "timestamp": datetime.now().isoformat(),
+            "validation_result": self.validate_input(input_data),
+            "request_analysis": self.analyze_requests(),
+            "allow_inference": False
+        }
+
+        # Log the request
+        self.log_request(input_data, metadata)
+
+        # Determine if inference should be allowed
+        security_report["allow_inference"] = (
+            security_report["validation_result"]["valid"] and
+            not security_report["request_analysis"].get("suspicious_patterns", [])
+        )
+
+        return security_report

From 921c852e74836fffbaa6d4d211ffe2c95683fb9a Mon Sep 17 00:00:00 2001
From: DeWitt Gibson
Date: Thu, 21 Nov 2024 09:14:39 -0700
Subject: [PATCH 4/8] Update README.md

---
 README.md | 29 +++++++++++++++++++++++++++++
 1 file changed, 29 insertions(+)

diff --git a/README.md b/README.md
index 7354826..b700b97 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,31 @@
 # ai-model-security-monitor
 Security monitoring tool that helps protect AI models from common attacks.
+
+The tool provides these key security features:
+
+## Input validation
+
+- Size limits
+- Value range checks
+- Null/empty input detection
+- Format validation
+
+## Attack detection
+
+- Adversarial pattern detection
+- Unusual input structure detection
+- Gradient analysis
+
+## Request monitoring
+
+- Request logging
+- Rate monitoring
+- Repeated input detection
+- Pattern analysis
+
+## Detailed reporting
+
+- Validation results
+- Detected issues
+- Request analysis
+- Security recommendations

From b8d95f2fbb27c25afe3c68ba5f54cefe100c2637 Mon Sep 17 00:00:00 2001
From: DeWitt Gibson
Date: Thu, 21 Nov 2024 09:15:37 -0700
Subject: [PATCH 5/8] Create security_team_alerts.py

---
 security_team_alerts.py | 214 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 214 insertions(+)
 create mode 100644 security_team_alerts.py

diff --git a/security_team_alerts.py b/security_team_alerts.py
new file mode 100644
index 0000000..a42ff58
--- /dev/null
+++ b/security_team_alerts.py
@@ -0,0 +1,214 @@
+import logging
+from datetime import datetime, timedelta
+import numpy as np
+from typing import Dict, Any, List, Optional
+import smtplib
+from email.mime.text import MIMEText
+import json
+from collections import defaultdict
+
+class AISecurityMonitor:
+    def __init__(
+        self,
+        model_name: str,
+        alert_settings: Dict[str, Any],
+        logging_path: Optional[str] = None
+    ):
+        """
+        Initialize security monitor with alert capabilities
+
+        Args:
+            model_name: Name of the model being protected
+            alert_settings: Dictionary containing alert configuration
+                {
+                    "email_recipients": list of security team email addresses,
+                    "smtp_settings": SMTP server configuration,
+                    "alert_thresholds": {
+                        "max_requests_per_minute": int,
+                        "suspicious_pattern_threshold": float,
+                        "failed_attempts_threshold": int
+                    }
+                }
+        """
+        self.model_name = model_name
+        self.alert_settings = alert_settings
+        self.incident_log = []
+        self.request_history = defaultdict(list)  # IP address -> list of timestamps
+
+        # Configure logging
+        logging.basicConfig(
+            filename=logging_path or f"security_{model_name}_{datetime.now():%Y%m%d}.log",
+            level=logging.INFO,
+            format='%(asctime)s - %(levelname)s - %(message)s'
+        )
+
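+    # Severity escalates as checks trip: "low" by default, "medium" once the
+    # per-IP rate limit is exceeded, "high" when suspicious input patterns appear.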
"low", + "threats_detected": [], + "details": {} + } + + # Check request rate + if self._check_rate_limit(request_data["ip_address"]): + threat_assessment["threats_detected"].append("rate_limit_exceeded") + threat_assessment["severity"] = "medium" + + # Check input patterns + pattern_check = self._analyze_input_patterns(request_data["input_data"]) + if pattern_check["suspicious_patterns"]: + threat_assessment["threats_detected"].extend(pattern_check["suspicious_patterns"]) + threat_assessment["severity"] = "high" + threat_assessment["details"]["patterns"] = pattern_check + + # Log the threat assessment + if threat_assessment["threats_detected"]: + self._log_incident(threat_assessment, request_data) + + return threat_assessment + + def _check_rate_limit(self, ip_address: str) -> bool: + """ + Check if request rate from IP exceeds threshold + """ + current_time = datetime.now() + recent_requests = [ + timestamp for timestamp in self.request_history[ip_address] + if current_time - timestamp < timedelta(minutes=1) + ] + + # Update request history + self.request_history[ip_address] = recent_requests + [current_time] + + return len(recent_requests) > self.alert_settings["alert_thresholds"]["max_requests_per_minute"] + + def _analyze_input_patterns(self, input_data: Any) -> Dict[str, Any]: + """ + Analyze input for suspicious patterns + """ + result = { + "suspicious_patterns": [], + "details": {} + } + + try: + input_array = np.asarray(input_data) + + # Check for extreme values + if np.any(np.abs(input_array) > 1e6): + result["suspicious_patterns"].append("extreme_values") + + # Check for unusual sparsity + sparsity = np.count_nonzero(input_array) / input_array.size + if sparsity < 0.01: + result["suspicious_patterns"].append("suspicious_sparsity") + + # Check for repeating patterns + if len(input_array.shape) > 1: + gradient = np.gradient(input_array.astype(float)) + if np.all(np.abs(gradient) > 100): + result["suspicious_patterns"].append("potential_adversarial_pattern") + + except Exception as e: + logging.error(f"Pattern analysis error: {str(e)}") + result["suspicious_patterns"].append("analysis_error") + + return result + + def _log_incident(self, threat_assessment: Dict[str, Any], request_data: Dict[str, Any]) -> None: + """ + Log security incident and send alerts if needed + """ + incident = { + "timestamp": threat_assessment["timestamp"], + "severity": threat_assessment["severity"], + "threats": threat_assessment["threats_detected"], + "ip_address": request_data["ip_address"], + "details": threat_assessment["details"] + } + + # Log to file + logging.warning(f"Security incident detected: {json.dumps(incident)}") + + # Store in incident history + self.incident_log.append(incident) + + # Send alert if severity warrants it + if threat_assessment["severity"] in ["medium", "high"]: + self._send_alert(incident) + + def _send_alert(self, incident: Dict[str, Any]) -> None: + """ + Send alert to security team + """ + try: + subject = f"Security Alert: {self.model_name} - {incident['severity'].upper()} Severity" + body = f""" +Security incident detected: +------------------------- +Timestamp: {incident['timestamp']} +Severity: {incident['severity']} +Threats Detected: {', '.join(incident['threats'])} +IP Address: {incident['ip_address']} + +Details: +{json.dumps(incident['details'], indent=2)} + +Please review the incident and take appropriate action. 
+""" + + msg = MIMEText(body) + msg['Subject'] = subject + msg['From'] = self.alert_settings["smtp_settings"]["sender"] + msg['To'] = ', '.join(self.alert_settings["email_recipients"]) + + # Send email + with smtplib.SMTP( + self.alert_settings["smtp_settings"]["server"], + self.alert_settings["smtp_settings"]["port"] + ) as server: + if self.alert_settings["smtp_settings"].get("use_tls"): + server.starttls() + if "username" in self.alert_settings["smtp_settings"]: + server.login( + self.alert_settings["smtp_settings"]["username"], + self.alert_settings["smtp_settings"]["password"] + ) + server.send_message(msg) + + except Exception as e: + logging.error(f"Failed to send alert: {str(e)}") + + def get_incident_summary(self, hours: int = 24) -> Dict[str, Any]: + """ + Get summary of recent security incidents + """ + cutoff_time = datetime.now() - timedelta(hours=hours) + recent_incidents = [ + incident for incident in self.incident_log + if datetime.fromisoformat(incident["timestamp"]) > cutoff_time + ] + + return { + "total_incidents": len(recent_incidents), + "by_severity": { + severity: len([i for i in recent_incidents if i["severity"] == severity]) + for severity in ["low", "medium", "high"] + }, + "unique_ips": len(set(incident["ip_address"] for incident in recent_incidents)), + "most_common_threats": self._get_common_threats(recent_incidents) + } + + def _get_common_threats(self, incidents: List[Dict[str, Any]]) -> Dict[str, int]: + """ + Get count of most common threat types + """ + threat_counts = defaultdict(int) + for incident in incidents: + for threat in incident["threats"]: + threat_counts[threat] += 1 + return dict(sorted(threat_counts.items(), key=lambda x: x[1], reverse=True)) From 7efec0efd89de0dd94a3e59eaea21ae0d60d455d Mon Sep 17 00:00:00 2001 From: DeWitt Gibson Date: Thu, 21 Nov 2024 09:16:04 -0700 Subject: [PATCH 6/8] Update README.md --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index b700b97..a3129fb 100644 --- a/README.md +++ b/README.md @@ -29,3 +29,5 @@ Validation results Detected issues Request analysis Security recommendations + +## Security Team Alerting From 21876012f0eddb0306bff73b22237ab14f8b03c2 Mon Sep 17 00:00:00 2001 From: DeWitt Gibson Date: Thu, 21 Nov 2024 09:20:35 -0700 Subject: [PATCH 7/8] Update README.md --- README.md | 172 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 172 insertions(+) diff --git a/README.md b/README.md index a3129fb..0b2a56e 100644 --- a/README.md +++ b/README.md @@ -31,3 +31,175 @@ Request analysis Security recommendations ## Security Team Alerting + +# AI Model Security Tools + +A comprehensive security toolkit for protecting AI model deployments, including model protection, threat monitoring, and security assessment capabilities. + +## Components + +1. **AIModelProtector**: Real-time protection and monitoring for AI model endpoints +2. **AISecurityMonitor**: Advanced threat detection and team notification system +3. 
+
+## Installation
+
+```bash
+pip install -r requirements.txt
+```
+
+Required dependencies:
+```
+numpy>=1.21.0
+pandas>=1.3.0
+```
+
+(`typing`, `logging`, `smtplib`, and `email` ship with the Python standard library and need no installation.)
+
+## Quick Start
+
+### Basic Protection Setup
+
+```python
+from ai_model_protector import AIModelProtector
+from ai_security_monitor import AISecurityMonitor
+from chatbot_threat_modeler import ChatbotThreatModeler
+
+# Initialize base protection
+protector = AIModelProtector(
+    model_name="production_model",
+    input_constraints={
+        "max_size": 1000000,
+        "max_value": 1.0,
+        "min_value": -1.0,
+        "max_gradient": 50
+    }
+)
+
+# Set up security monitoring
+monitor = AISecurityMonitor(
+    model_name="production_model",
+    alert_settings={
+        "email_recipients": ["security@company.com"],
+        "smtp_settings": {
+            "server": "smtp.company.com",
+            "port": 587,
+            "sender": "ai-alerts@company.com",
+            "use_tls": True
+        },
+        "alert_thresholds": {
+            "max_requests_per_minute": 100,
+            "suspicious_pattern_threshold": 0.8
+        }
+    }
+)
+
+# Initialize threat modeling
+threat_modeler = ChatbotThreatModeler()
+```
+
+### Deployment Integration
+
+```python
+import logging
+
+def process_model_request(input_data, request_metadata):
+    # 1. Check security protections
+    security_check = protector.protect(input_data)
+    if not security_check["allow_inference"]:
+        return {"error": "Security check failed", "details": security_check}
+
+    # 2. Monitor for threats
+    threat_assessment = monitor.detect_threat({
+        "ip_address": request_metadata["ip"],
+        "input_data": input_data
+    })
+
+    if threat_assessment["severity"] == "high":
+        return {"error": "Request blocked due to security risk"}
+
+    # 3. Process the request if it is safe
+    try:
+        prediction = model.predict(input_data)
+        return {"prediction": prediction}
+    except Exception as e:
+        logging.error(f"Inference error: {e}")
+        return {"error": "Processing failed"}
+```
+
+## Security Features
+
+### Model Protection
+- Input validation and sanitization
+- Pattern analysis for adversarial attacks
+- Request rate limiting
+- Anomaly detection
+
+### Security Monitoring
+- Real-time threat detection
+- Team notifications
+- Incident logging
+- Traffic analysis
+- IP-based monitoring
+
+### Threat Modeling
+- Security control assessment
+- Risk scoring
+- Threat identification
+- Compliance checking
+- Recommendation generation
+
+## Configuration
+
+### Environment Variables
+```bash
+SECURITY_LOG_PATH=/path/to/logs
+ALERT_SMTP_SERVER=smtp.company.com
+ALERT_SMTP_PORT=587
+ALERT_SENDER=ai-security@company.com
+ALERT_RECIPIENTS=security-team@company.com
+```
+
+### Security Thresholds
+```python
+SECURITY_CONFIG = {
+    "max_requests_per_minute": 100,
+    "suspicious_pattern_threshold": 0.8,
+    "max_failed_attempts": 5,
+    "session_timeout": 3600,
+    "min_request_interval": 1.0
+}
+```
+
+## Best Practices
+
+1. **API Key Management**
+   - Rotate keys regularly
+   - Use separate keys for different environments
+   - Monitor key usage
+
+2. **Logging**
+   - Enable comprehensive logging
+   - Store logs securely
+   - Implement log rotation
+
+3. **Monitoring**
+   - Set up alerts for suspicious activities
+   - Monitor resource usage
+   - Track error rates
+
+4. **Regular Assessment**
+   - Run threat modeling weekly
+   - Review security controls
+   - Update security thresholds
+
+## Contributing
+
+Please see CONTRIBUTING.md for guidelines on contributing to this project.
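+
+## Reviewing Incidents
+
+A minimal review loop, shown as an illustrative sketch: it assumes the
+`monitor` and `protector` objects created in the Quick Start above and uses
+only methods defined in this toolkit (`get_incident_summary` and
+`analyze_requests`).
+
+```python
+# Summarize monitor-side incidents from the last 24 hours
+summary = monitor.get_incident_summary(hours=24)
+print(f"Incidents: {summary['total_incidents']}, by severity: {summary['by_severity']}")
+
+# Review protector-side traffic over the last hour
+traffic = protector.analyze_requests(window_minutes=60)
+if traffic.get("suspicious_patterns"):
+    print("Suspicious request patterns detected:", traffic["suspicious_patterns"])
+```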
+
+## License
+
+MIT License - See LICENSE.md for details

From 14179c6d88f6ea73a9b5d0d1e0ed1030e7d2b43f Mon Sep 17 00:00:00 2001
From: DeWitt Gibson
Date: Thu, 21 Nov 2024 09:22:27 -0700
Subject: [PATCH 8/8] Create CONTRIBUTING.md

---
 CONTRIBUTING.md | 67 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 67 insertions(+)
 create mode 100644 CONTRIBUTING.md

diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
new file mode 100644
index 0000000..4b58c3d
--- /dev/null
+++ b/CONTRIBUTING.md
@@ -0,0 +1,67 @@
+# Contributing to AI Model Security Tools
+
+Thank you for your interest in contributing to this security toolkit. We welcome contributions that improve the security, reliability, and usability of these tools.
+
+## Core Principles
+
+1. **Do No Harm**: All contributions must be intended for defensive security purposes only
+2. **Privacy First**: Never collect or expose sensitive user data
+3. **Transparency**: Document all security mechanisms and changes
+4. **Responsibility**: Test thoroughly before submitting changes
+
+## How to Contribute
+
+1. Fork the repository
+2. Create a feature branch (`git checkout -b feature/amazing-security-feature`)
+3. Commit your changes (`git commit -m 'Add new security feature'`)
+4. Push to your branch (`git push origin feature/amazing-security-feature`)
+5. Open a Pull Request
+
+## Development Guidelines
+
+- Add tests for any new features
+- Update documentation for changes
+- Follow PEP 8 style guidelines
+- Use type hints for all new code
+- Add logging for security-relevant events
+- Comment security-critical code sections
+
+## Security Requirements
+
+- No code that could enable attacks or exploitation
+- No weakening of existing security measures
+- No collection of unnecessary user data
+- No hard-coded credentials or secrets
+- No security features disabled by default
+
+## Testing
+
+- Add unit tests for new features
+- Include integration tests where appropriate
+- Test edge cases and error conditions
+- Verify that no security weaknesses are introduced
+
+## Documentation
+
+When adding or modifying features:
+- Update README.md
+- Add docstrings to functions/classes
+- Document security implications
+- Include usage examples
+
+## Need Help?
+
+- Open an issue for bugs or security concerns
+- Use discussions for feature ideas
+- Tag security-critical issues appropriately
+
+## Code of Conduct
+
+- Be respectful and constructive
+- Focus on improving security
+- No malicious or harmful contributions
+- Report security issues responsibly
+
+## License
+
+By contributing, you agree that your contributions will be licensed under the MIT License.