diff --git a/docs/mint.json b/docs/mint.json index 07a6c3ce4c..70dea56dfb 100644 --- a/docs/mint.json +++ b/docs/mint.json @@ -244,6 +244,7 @@ "providers/documentation/site24x7-provider", "providers/documentation/slack-provider", "providers/documentation/smtp-provider", + "providers/documentation/snmp-provider", "providers/documentation/snowflake-provider", "providers/documentation/splunk-provider", "providers/documentation/squadcast-provider", diff --git a/docs/providers/documentation/snmp-provider.mdx b/docs/providers/documentation/snmp-provider.mdx new file mode 100644 index 0000000000..cc8d0f2f1a --- /dev/null +++ b/docs/providers/documentation/snmp-provider.mdx @@ -0,0 +1,118 @@ +--- +title: "SNMP" +sidebarTitle: "SNMP Provider" +description: "SNMP provider allows you to receive SNMP traps from network devices and generate alerts in Keep." +--- +import AutoGeneratedSnippet from '/snippets/providers/snmp-snippet-autogenerated.mdx'; + +## Overview + +The Simple Network Management Protocol (SNMP) is an Internet Standard protocol for collecting and organizing information about managed devices on IP networks and for modifying that information to change device behavior. The SNMP provider for Keep allows you to receive SNMP traps from network devices and generate alerts based on these traps. + +## Inputs + +The SNMP provider receives SNMP trap notifications from network devices. + +## Outputs + +The SNMP provider generates alerts in Keep with the following information: +- Alert title (based on trap OID) +- Alert description (containing trap data details) +- Alert severity (mapped from trap content or default) +- Source information +- Raw trap data + +## Authentication Parameters + +| Parameter | Required | Description | Default | +|-----------|----------|-------------|---------| +| listen_address | No | IP address to listen on for SNMP traps | 0.0.0.0 | +| port | No | UDP port to listen on for SNMP traps | 162 | +| community | No | SNMP community string for authentication | public | +| severity_mapping | No | JSON mapping of OID patterns to Keep severity levels | null | + +## Connecting with the Provider + +### Configuration Example + +Add the SNMP provider to your Keep configuration: + +```yaml +providers: + snmp: + type: snmp + authentication: + listen_address: 0.0.0.0 + port: 1162 + community: public + severity_mapping: '{"1.3.6.1.6.3.1.1.5.3": "WARNING", "1.3.6.1.6.3.1.1.5.5": "CRITICAL"}' +``` + +### Setting Up Network Devices + +Configure your network devices to send SNMP traps to the Keep server IP address and port where the SNMP provider is listening. + +Example configuration for a Cisco device: +``` +snmp-server enable traps +snmp-server host version 2c +``` + +## Testing + +You can test the SNMP provider using tools like `snmptrap` to send test traps: + +```bash +snmptrap -v 2c -c public :162 '' 1.3.6.1.6.3.1.1.5.3 1.3.6.1.2.1.2.2.1.1.2 i 2 +``` + +For testing with Docker containers, you can use the following configuration: + +```yaml +services: + snmp-agent: + image: eclipse-mosquitto:latest + container_name: snmp-agent + ports: + - "1883:1883" + networks: + - keep_default + + snmp-tools: + image: debian:bullseye + container_name: snmp-tools + command: tail -f /dev/null + networks: + - keep_default + volumes: + - ./:/data + +networks: + keep_default: + external: true +``` + +With this setup: +1. The `snmp-agent` container runs the Eclipse Mosquitto image which can be used for message brokering +2. The `snmp-tools` container provides a Debian environment where you can install and run SNMP tools +3. Install SNMP tools in the container with: `docker exec snmp-tools apt-get update && docker exec snmp-tools apt-get install -y snmp snmptrapd` +4. Generate test traps from the tools container: `docker exec snmp-tools snmptrap -v 2c -c public keep-api:162 '' 1.3.6.1.6.3.1.1.5.3 1.3.6.1.2.1.2.2.1.1.2 i 2` + +## Notes + +- The SNMP provider currently supports SNMPv2c only. +- Port 162 is the standard port for SNMP traps and typically requires elevated privileges to bind. +- Custom severity mapping allows you to map specific OIDs to alert severity levels. + +## Troubleshooting + +- **No traps received**: Ensure port 162 is accessible and not blocked by firewalls. +- **Permission issues**: Binding to port 162 typically requires elevated privileges. Consider using a higher port (>1024) for testing. +- **Mapping issues**: Check the syntax of your severity_mapping JSON string. + +## Useful Links + +- [SNMP RFC 3411](https://tools.ietf.org/html/rfc3411) - SNMP Architecture +- [Net-SNMP Documentation](http://www.net-snmp.org/docs/) + + \ No newline at end of file diff --git a/docs/providers/overview.md b/docs/providers/overview.md index 3ef8a0c7c6..136e505437 100644 --- a/docs/providers/overview.md +++ b/docs/providers/overview.md @@ -106,6 +106,7 @@ By leveraging Keep Providers, users are able to deeply integrate Keep with the t - [Site24x7](/providers/documentation/site24x7-provider) - [Slack](/providers/documentation/slack-provider) - [SMTP](/providers/documentation/smtp-provider) +- [SNMP](/providers/documentation/snmp-provider) - [Snowflake](/providers/documentation/snowflake-provider) - [Splunk](/providers/documentation/splunk-provider) - [Squadcast](/providers/documentation/squadcast-provider) diff --git a/docs/providers/overview.mdx b/docs/providers/overview.mdx index dd6ebeef36..ecd54abc00 100644 --- a/docs/providers/overview.mdx +++ b/docs/providers/overview.mdx @@ -796,6 +796,14 @@ By leveraging Keep Providers, users are able to deeply integrate Keep with the t } > + + } +> + AlertSeverity: + """Determine alert severity based on the configured mapping.""" + # Default severity + default_severity = AlertSeverity.WARNING + + if not self._severity_mapping: + return default_severity + + # Check if any OIDs match the patterns in the severity mapping + for pattern, severity_str in self._severity_mapping.items(): + # Check if pattern matches any OID + for oid in oids: + if pattern in oid: + return self._parse_severity(severity_str) + + # Check if pattern matches any value + for value in data.values(): + if pattern in value: + return self._parse_severity(severity_str) + + return default_severity + + def _parse_severity(self, severity_str: str) -> AlertSeverity: + """ + Parse severity string into AlertSeverity enum value. + + Args: + severity_str: Severity string from trap data + + Returns: + AlertSeverity enum value + """ + severity_map = { + "INFO": AlertSeverity.INFO, + "WARNING": AlertSeverity.WARNING, + "ERROR": AlertSeverity.HIGH, # 'ERROR' maps to 'high' in Keep system + "CRITICAL": AlertSeverity.CRITICAL, + } + + return severity_map.get(severity_str, AlertSeverity.WARNING) + + def get_logs(self, limit: int = 100) -> List[Dict[str, Any]]: + """Get logs from the provider.""" + logs = [] + + # Add debugging information + debug_info = self.debug_info() + logs.append({ + "message": "SNMP Provider Debug Information", + "timestamp": datetime.utcnow().isoformat(), + "level": "INFO", + "details": debug_info + }) + + # Add basic status information + status = "Running" if self.running else "Stopped" + logs.append({ + "message": f"SNMP trap receiver status: {status}", + "timestamp": datetime.utcnow().isoformat(), + "level": "INFO", + "details": { + "status": status, + "listen_address": self.authentication_config.listen_address, + "port": self.authentication_config.port + } + }) + + # Add log for when the trap receiver was started + if self.running: + logs.append({ + "message": f"SNMP trap receiver is running on {self.authentication_config.listen_address}:{self.authentication_config.port}", + "timestamp": datetime.utcnow().isoformat(), + "level": "INFO", + "details": { + "community": "***" if self.authentication_config.community else "Not set" + } + }) + + # Check if we have a severity mapping + if self._severity_mapping: + severity_info = {k: v for k, v in self._severity_mapping.items()} + logs.append({ + "message": "SNMP trap severity mapping configured", + "timestamp": datetime.utcnow().isoformat(), + "level": "INFO", + "details": { + "severity_mapping": severity_info + } + }) + else: + logs.append({ + "message": "No SNMP trap severity mapping configured", + "timestamp": datetime.utcnow().isoformat(), + "level": "WARNING", + "details": { + "default_severity": "WARNING" + } + }) + + return logs + + def debug_info(self) -> Dict[str, Any]: + """Get debugging information about the SNMP provider.""" + # Test UDP port binding + port_test = {"status": "Unknown", "message": "", "port": self.authentication_config.port} + try: + test_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + test_socket.bind((self.authentication_config.listen_address, self.authentication_config.port)) + test_socket.close() + port_test = {"status": "Success", "message": "Port is available", "port": self.authentication_config.port} + except Exception as e: + port_test = { + "status": "Failed", + "message": str(e), + "port": self.authentication_config.port, + "reason": f"Port {self.authentication_config.port} might already be in use or requires elevated privileges" + } + + # Get information about the SNMP engine + engine_info = {"status": "Not initialized"} + if self.snmp_engine: + try: + engine_info = { + "status": "Initialized", + "transport_dispatcher_jobs": getattr(self.snmp_engine.transportDispatcher, "jobsAmount", "Unknown"), + "snmp_engine_id": str(getattr(self.snmp_engine, "snmpEngineID", b"Not available")), + } + except Exception as e: + engine_info = {"status": "Error", "message": str(e)} + + return { + "provider_id": self.provider_id, + "running": self.running, + "configuration": { + "listen_address": self.authentication_config.listen_address, + "port": self.authentication_config.port, + "community": "***" if self.authentication_config.community else "Not set", + "has_severity_mapping": bool(self._severity_mapping) + }, + "port_test": port_test, + "snmp_engine": engine_info, + "thread_active": bool(self.trap_thread and self.trap_thread.is_alive()) if self.trap_thread else False, + } + + def validate_scopes(self) -> Dict[str, Union[bool, str]]: + """Validate provider scopes.""" + # Check if we can bind to the specified UDP port + try: + test_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + test_socket.bind((self.authentication_config.listen_address, self.authentication_config.port)) + test_socket.close() + return {"receive_traps": True} + except Exception as e: + return {"receive_traps": f"Failed to bind to {self.authentication_config.listen_address}:{self.authentication_config.port}: {str(e)}"} + + @staticmethod + def get_alert_schema() -> Dict[str, Any]: + """Get the alert schema description for this provider.""" + return { + "type": "object", + "properties": { + "title": {"type": "string", "description": "Alert title"}, + "description": {"type": "string", "description": "Detailed description of the SNMP trap"}, + "severity": {"type": "string", "enum": ["info", "warning", "high", "critical"]}, + "source": {"type": "array", "items": {"type": "string"}, "description": "Sources of the SNMP trap"}, + "raw_data": {"type": "object", "description": "Raw trap data as OID-value pairs"}, + } + } + + def dispose(self): + """Clean up resources and release all ports used by the SNMP trap receiver.""" + if not self.running: + return + + self.logger.info("Stopping SNMP trap receiver") + self.running = False + + if self.snmp_engine: + try: + transport_dispatcher = self.snmp_engine.transportDispatcher + + transport_dispatcher.jobFinished(1) + + transport_dispatcher.closeDispatcher() + + self.logger.info(f"SNMP engine transport dispatcher stopped, port {self.authentication_config.port} released") + + except Exception as e: + self.logger.error(f"Error during SNMP engine cleanup: {e}") + finally: + self.snmp_engine = None + + + if self.trap_thread and self.trap_thread.is_alive(): + try: + self.trap_thread.join(timeout=5.0) + if self.trap_thread.is_alive(): + self.logger.warning("SNMP trap thread did not stop gracefully within timeout") + except Exception as e: + self.logger.error(f"Error joining SNMP trap thread: {e}") + finally: + self.trap_thread = None + + @property + def is_consumer(self) -> bool: + """Mark this provider as a consumer that can be started/stopped.""" + return True + + def status(self) -> bool: + """Check if the SNMP trap receiver is running.""" + return self.running + + @staticmethod + def simulate_alert() -> Dict[str, Any]: + """Simulate an SNMP trap alert for testing purposes.""" + return { + "title": "SNMP Trap: coldStart", + "description": "SNMP Trap received with the following data:\n1.3.6.1.6.3.1.1.5.1: coldStart\n1.3.6.1.2.1.1.1.0: Keep SNMP Test Device", + "severity": "info", + "fingerprint": f"snmp-test-trap-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}", + "source": ["snmp"], + "labels": { + "trap_oid": "1.3.6.1.6.3.1.1.5.1", + "device": "Keep SNMP Test Device", + "trap_type": "coldStart" + }, + "raw_data": json.dumps({ + "1.3.6.1.6.3.1.1.5.1": "coldStart", + "1.3.6.1.2.1.1.1.0": "Keep SNMP Test Device" + }), + "created_at": datetime.utcnow().isoformat(), + } + + @staticmethod + def format_alert(event: Dict[str, Any], provider_instance: "SnmpProvider" = None) -> Dict[str, Any]: + """ + Format an SNMP event into a Keep alert. + + Args: + event: The raw SNMP event data + provider_instance: Optional provider instance for context + + Returns: + Formatted alert dictionary + """ + # The event is already in the correct format from _handle_trap or simulate_alert + # Just ensure all required fields are present + formatted_alert = { + "title": event.get("title", "SNMP Trap Received"), + "description": event.get("description", "SNMP trap received"), + "severity": event.get("severity", "warning"), + "fingerprint": event.get("fingerprint", f"snmp-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}"), + "source": event.get("source", ["snmp"]), + "labels": event.get("labels", {}), + "created_at": event.get("created_at", datetime.utcnow().isoformat()), + } + + # Include raw_data if present + if "raw_data" in event: + formatted_alert["raw_data"] = event["raw_data"] + + return formatted_alert \ No newline at end of file diff --git a/poetry.lock b/poetry.lock index 88f22874df..d65040bad5 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.4 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.2.1 and should not be changed by hand. [[package]] name = "aiofiles" @@ -1931,7 +1931,7 @@ google-api-core = {version = ">=1.34.1,<2.0.dev0 || >=2.11.dev0,<3.0.0dev", extr google-auth = ">=2.14.1,<2.24.0 || >2.24.0,<2.25.0 || >2.25.0,<3.0.0dev" proto-plus = [ {version = ">=1.25.0,<2.0.0dev", markers = "python_version >= \"3.13\""}, - {version = ">=1.22.3,<2.0.0dev", markers = "python_version < \"3.13\""}, + {version = ">=1.22.3,<2.0.0dev"}, ] protobuf = ">=3.20.2,<4.21.0 || >4.21.0,<4.21.1 || >4.21.1,<4.21.2 || >4.21.2,<4.21.3 || >4.21.3,<4.21.4 || >4.21.4,<4.21.5 || >4.21.5,<6.0.0dev" @@ -2611,7 +2611,7 @@ version = "3.1.6" description = "A very fast and expressive template engine." optional = false python-versions = ">=3.7" -groups = ["dev"] +groups = ["main", "dev"] files = [ {file = "jinja2-3.1.6-py3-none-any.whl", hash = "sha256:85ece4451f492d0c13c5dd7c13a64681a86afae63a5f347908daf103ce6d2f67"}, {file = "jinja2-3.1.6.tar.gz", hash = "sha256:0137fb05990d35f1275a587e9aee6d56da821fc83491a0fb838183be43f66d6d"}, @@ -3679,6 +3679,18 @@ files = [ dev = ["pre-commit", "tox"] testing = ["pytest", "pytest-benchmark"] +[[package]] +name = "ply" +version = "3.11" +description = "Python Lex & Yacc" +optional = false +python-versions = "*" +groups = ["main"] +files = [ + {file = "ply-3.11-py2.py3-none-any.whl", hash = "sha256:096f9b8350b65ebd2fd1346b12452efe5b9607f7482813ffca50c22722a807ce"}, + {file = "ply-3.11.tar.gz", hash = "sha256:00c7c1aaa88358b9c765b6d3000c6eec0ba42abca5351b095321aef446081da3"}, +] + [[package]] name = "portalocker" version = "2.10.1" @@ -4140,30 +4152,43 @@ tornado = ["tornado (>=5.0.0)"] [[package]] name = "pyasn1" -version = "0.6.1" -description = "Pure-Python implementation of ASN.1 types and DER/BER/CER codecs (X.208)" +version = "0.4.8" +description = "ASN.1 types and codecs" optional = false -python-versions = ">=3.8" +python-versions = "*" groups = ["main"] files = [ - {file = "pyasn1-0.6.1-py3-none-any.whl", hash = "sha256:0d632f46f2ba09143da3a8afe9e33fb6f92fa2320ab7e886e2d0f7672af84629"}, - {file = "pyasn1-0.6.1.tar.gz", hash = "sha256:6f580d2bdd84365380830acf45550f2511469f673cb4a5ae3857a3170128b034"}, + {file = "pyasn1-0.4.8-py2.py3-none-any.whl", hash = "sha256:39c7e2ec30515947ff4e87fb6f456dfc6e84857d34be479c9d4a4ba4bf46aa5d"}, + {file = "pyasn1-0.4.8.tar.gz", hash = "sha256:aef77c9fb94a3ac588e87841208bdec464471d9871bd5050a287cc9a475cd0ba"}, ] [[package]] name = "pyasn1-modules" -version = "0.4.1" -description = "A collection of ASN.1-based protocols modules" +version = "0.2.8" +description = "A collection of ASN.1-based protocols modules." optional = false -python-versions = ">=3.8" +python-versions = "*" groups = ["main"] files = [ - {file = "pyasn1_modules-0.4.1-py3-none-any.whl", hash = "sha256:49bfa96b45a292b711e986f222502c1c9a5e1f4e568fc30e2574a6c7d07838fd"}, - {file = "pyasn1_modules-0.4.1.tar.gz", hash = "sha256:c28e2dbf9c06ad61c71a075c7e0f9fd0f1b0bb2d2ad4377f240d33ac2ab60a7c"}, + {file = "pyasn1-modules-0.2.8.tar.gz", hash = "sha256:905f84c712230b2c592c19470d3ca8d552de726050d1d1716282a1f6146be65e"}, + {file = "pyasn1_modules-0.2.8-py2.py3-none-any.whl", hash = "sha256:a50b808ffeb97cb3601dd25981f6b016cbb3d31fbf57a8b8a87428e6158d0c74"}, ] [package.dependencies] -pyasn1 = ">=0.4.6,<0.7.0" +pyasn1 = ">=0.4.6,<0.5.0" + +[[package]] +name = "pyasyncore" +version = "1.0.4" +description = "Make asyncore available for Python 3.12 onwards" +optional = false +python-versions = "*" +groups = ["main"] +markers = "python_version >= \"3.12\"" +files = [ + {file = "pyasyncore-1.0.4-py3-none-any.whl", hash = "sha256:9e5f6dc9dc057c56370b7a5cdb4c4670fd4b0556de2913ed1f428cd6a5366895"}, + {file = "pyasyncore-1.0.4.tar.gz", hash = "sha256:2c7a8b9b750ba6260f1e5a061456d61320a80579c6a43d42183417da89c7d5d6"}, +] [[package]] name = "pycodestyle" @@ -4523,6 +4548,56 @@ files = [ {file = "pyproject_hooks-1.2.0.tar.gz", hash = "sha256:1e859bd5c40fae9448642dd871adf459e5e2084186e8d2c2a79a824c970da1f8"}, ] +[[package]] +name = "pysmi-lextudio" +version = "1.4.3" +description = "A pure-Python implementation of SNMP/SMI MIB parsing and conversion library." +optional = false +python-versions = "<4.0,>=3.8" +groups = ["main"] +files = [ + {file = "pysmi_lextudio-1.4.3-py3-none-any.whl", hash = "sha256:cb629c6386a30c976f83c29fc71e53b06d60f15094d0c0114cf8d095351b76e5"}, + {file = "pysmi_lextudio-1.4.3.tar.gz", hash = "sha256:7d255fb38669410835acf6c2e8ab41975a6d8e64593b119552e36ecba004054f"}, +] + +[package.dependencies] +Jinja2 = ">=3.1.3,<4.0.0" +ply = ">=3.11,<4.0" +requests = ">=2.26.0,<3.0.0" + +[[package]] +name = "pysnmp-lextudio" +version = "5.0.34" +description = "" +optional = false +python-versions = ">=3.7,<4.0" +groups = ["main"] +files = [ + {file = "pysnmp_lextudio-5.0.34-py3-none-any.whl", hash = "sha256:c9bc405d9048b6a2017a4db395646c8cab9f2bb4e9a84d08aaf1ad7310ef7a9a"}, + {file = "pysnmp_lextudio-5.0.34.tar.gz", hash = "sha256:cbb8963e4639fb3800fa40cea118dfec234de433509cbb878f7f42bad51b07c3"}, +] + +[package.dependencies] +pyasn1 = ">=0.4.8,<0.5.0 || >0.5.0" +pyasyncore = {version = ">=1.0.0,<2.0.0", markers = "python_version >= \"3.12\""} +pysmi-lextudio = ">=1.0.4,<2.0.0" +pysnmpcrypto = ">=0.0.4,<0.0.5" + +[[package]] +name = "pysnmpcrypto" +version = "0.0.4" +description = "Strong cryptography support for PySNMP (SNMP library for Python)" +optional = false +python-versions = "*" +groups = ["main"] +files = [ + {file = "pysnmpcrypto-0.0.4-py2.py3-none-any.whl", hash = "sha256:5889733caa030f45d9e03ea9d6370fb06426a8cb7f839aabbcdde33c6f634679"}, + {file = "pysnmpcrypto-0.0.4.tar.gz", hash = "sha256:b635fb3b1ec6637b9a0033f50506214e16eb84574b1d25ab027bbde4caa55129"}, +] + +[package.dependencies] +cryptography = {version = "*", markers = "python_version >= \"3.4\""} + [[package]] name = "pytest" version = "8.3.4" @@ -6107,4 +6182,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.1" python-versions = ">=3.11,<3.14" -content-hash = "48583ec01b5b1cde6d02f4c6ea0a2827eab937b14ff21b62abc91602c400efde" +content-hash = "693d765c52b22b17d666a15789be4ddf53959cfc455ddfcc07679af1cf7363aa" diff --git a/pyproject.toml b/pyproject.toml index d68ab96a95..96592027a4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,11 @@ authors = ["Keep Alerting LTD"] packages = [{include = "keep"}] [tool.poetry.dependencies] +pysnmp-lextudio = "5.0.34" +pyasn1 = "0.4.8" +pyasn1-modules = "0.2.8" python = ">=3.11,<3.14" +pyOpenSSL = ">=23.2.0" click = "^8.1.3" pyyaml = "^6.0" requests = "^2.32.4" diff --git a/tests/e2e_tests/test_snmp_provider.py b/tests/e2e_tests/test_snmp_provider.py new file mode 100644 index 0000000000..59cdaca257 --- /dev/null +++ b/tests/e2e_tests/test_snmp_provider.py @@ -0,0 +1,177 @@ +import re +from datetime import datetime + +from playwright.sync_api import Page, expect + +from tests.e2e_tests.utils import ( + assert_connected_provider_count, + assert_scope_text_count, + delete_provider, + init_e2e_test, + open_connected_provider, + save_failure_artifacts, + trigger_alert, +) + + +KEEP_UI_URL = "http://localhost:3000" +DEFAULT_SNMP_PORT = 1162 # Standard SNMP trap port + + +def open_snmp_card(browser): + """Open the SNMP provider card in the UI.""" + browser.get_by_placeholder("Filter providers...").click() + browser.get_by_placeholder("Filter providers...").clear() + browser.get_by_placeholder("Filter providers...").fill("SNMP") + browser.get_by_placeholder("Filter providers...").press("Enter") + browser.get_by_text("Available Providers").hover() + snmp_tile = browser.locator( + "button:has-text('SNMP'):not(:has-text('Connected')):not(:has-text('Linked'))" + ) + snmp_tile.first.hover() + snmp_tile.first.click() + + +def test_snmp_provider(browser: Page, setup_page_logging, failure_artifacts): + """End-to-end test for the SNMP Provider.""" + try: + provider_name = "playwright_test_snmp_" + datetime.now().strftime("%Y%m%d%H%M%S") + + # Initialize the test and navigate to the providers page + max_attempts = 3 + for attempt in range(max_attempts): + try: + init_e2e_test( + browser, + next_url="/signin?callbackUrl=http%3A%2F%2Flocalhost%3A3000%2Fproviders", + ) + # Give the page a moment to process redirects + browser.wait_for_timeout(500) + # Wait for navigation to complete + browser.wait_for_load_state("networkidle") + + base_url = "http://localhost:3000/providers" + url_pattern = re.compile(f"{re.escape(base_url)}(\\?.*)?$") + browser.wait_for_url(url_pattern) + print(f"Providers page loaded successfully. [try: {attempt + 1}]") + break + except Exception as e: + if attempt < max_attempts - 1: + print("Failed to load providers page. Retrying...") + continue + else: + raise e + + browser.get_by_role("link", name="Providers").hover() + browser.get_by_role("link", name="Providers").click() + browser.wait_for_timeout(5000) + + severity_mapping = '{"1.3.6.1.6.3.1.1.5.1": "INFO", "1.3.6.1.6.3.1.1.5.2": "WARNING", "1.3.6.1.6.3.1.1.5.3": "ERROR", "1.3.6.1.6.3.1.1.5.4": "WARNING", "1.3.6.1.6.3.1.1.5.5": "CRITICAL"}' + + # Open the SNMP provider configuration + open_snmp_card(browser) + + # Wait for UI to stabilize + browser.wait_for_timeout(3000) + + # Fill in the provider configuration + browser.get_by_placeholder("Enter provider name").fill(provider_name) + browser.get_by_placeholder("Enter listen_address").fill("0.0.0.0") + browser.get_by_placeholder("Enter port").fill(str(DEFAULT_SNMP_PORT)) + browser.get_by_placeholder("Enter community").fill("public") + browser.get_by_placeholder("Enter severity_mapping").fill(severity_mapping) + + # Wait for UI to stabilize and remove any validation overlays + browser.wait_for_timeout(2000) + + # Remove any overlays that appeared during form filling + browser.evaluate( + """() => { + const overlays = document.querySelectorAll('div[data-enter][data-closed][aria-hidden="true"], div[aria-hidden="true"], nextjs-portal'); + overlays.forEach(overlay => overlay.remove()); + }""" + ) + browser.wait_for_timeout(1000) + + # Connect the provider + browser.get_by_role("button", name="Connect", exact=True).click() + + print("Connected provider") + browser.reload() + + # Wait for the provider to be connected + expect( + browser.locator(f"button:has-text('SNMP'):has-text('Connected'):has-text('{provider_name}')") + ).to_be_visible(timeout=10000) + + # Wait for page to stabilize before proceeding + browser.wait_for_load_state("networkidle") + + # Open the connected provider to check scope validation + open_connected_provider( + browser=browser, + provider_type="SNMP", + provider_name=provider_name, + ) + + # Check that the receive_traps scope is valid + assert_scope_text_count(browser=browser, contains_text="Valid", count=1) + + # Close the provider details + browser.get_by_role("button", name="Cancel", exact=True).click() + + print("Simulating SNMP trap reception...") + + # Use the trigger_alert utility function to simulate an SNMP alert + # This follows the same pattern as other e2e tests + trigger_alert("snmp") + + # Wait for the alert to be processed + browser.wait_for_timeout(3000) + + # Navigate to the Feed page to check for alerts + browser.get_by_role("link", name="Feed").hover() + browser.get_by_role("link", name="Feed").click() + + # Check for the SNMP trap alert + max_attempts = 5 + for attempt in range(max_attempts): + print(f"Attempt {attempt + 1} to load alerts...") + browser.get_by_role("link", name="Feed").click() + + try: + # Wait for SNMP trap alert to appear + browser.wait_for_selector("text=SNMP Trap", timeout=5000) + print("SNMP Trap alert loaded successfully.") + break + except Exception: + if attempt < max_attempts - 1: + print("SNMP alert not loaded yet. Retrying...") + browser.reload() + else: + print("Failed to load SNMP alert after maximum attempts.") + raise Exception("Failed to load SNMP alert after maximum attempts.") + + # Clean up - delete the provider + browser.get_by_role("link", name="Providers").hover() + browser.get_by_role("link", name="Providers").click() + + delete_provider( + browser=browser, + provider_type="SNMP", + provider_name=provider_name, + ) + + # Assert provider was deleted + assert_connected_provider_count( + browser=browser, + provider_type="SNMP", + provider_name=provider_name, + provider_count=0, + ) + + except Exception as e: + print(f"Test failed with error: {str(e)}") + # Save artifacts for debugging + save_failure_artifacts(browser) + raise \ No newline at end of file diff --git a/tests/test_snmp_provider.py b/tests/test_snmp_provider.py new file mode 100644 index 0000000000..da13a2c700 --- /dev/null +++ b/tests/test_snmp_provider.py @@ -0,0 +1,381 @@ +""" +Unit tests for SNMP Provider +""" + +import json +import pytest +from unittest.mock import Mock, patch, MagicMock +from datetime import datetime + +from keep.contextmanager.contextmanager import ContextManager +from keep.providers.snmp_provider.snmp_provider import SnmpProvider, SnmpProviderAuthConfig +from keep.providers.models.provider_config import ProviderConfig +from keep.api.models.alert import AlertSeverity + + +class TestSnmpProvider: + """Test cases for SNMP Provider.""" + + @pytest.fixture + def context_manager(self): + """Create a mock context manager.""" + return ContextManager(tenant_id="test_tenant", workflow_id="test_workflow") + + @pytest.fixture + def snmp_config(self): + """Create a test SNMP configuration.""" + return ProviderConfig( + description="Test SNMP Provider", + authentication={ + "listen_address": "0.0.0.0", + "port": 1162, + "community": "public", + "severity_mapping": json.dumps({ + "1.3.6.1.6.3.1.1.5.1": "INFO", + "1.3.6.1.6.3.1.1.5.2": "WARNING", + "1.3.6.1.6.3.1.1.5.3": "ERROR", + "1.3.6.1.6.3.1.1.5.4": "CRITICAL" + }) + }, + ) + + @pytest.fixture + def snmp_provider(self, context_manager, snmp_config): + """Create an SNMP provider instance.""" + return SnmpProvider( + context_manager=context_manager, + provider_id="test_snmp_provider", + config=snmp_config, + ) + + def test_snmp_provider_initialization(self, snmp_provider): + """Test SNMP provider initialization.""" + assert snmp_provider.provider_id == "test_snmp_provider" + assert snmp_provider.authentication_config.listen_address == "0.0.0.0" + assert snmp_provider.authentication_config.port == 1162 + assert snmp_provider.authentication_config.community == "public" + assert not snmp_provider.running + assert snmp_provider._severity_mapping is not None + + def test_severity_mapping_parsing(self, snmp_provider): + """Test that severity mapping is correctly parsed.""" + expected_mapping = { + "1.3.6.1.6.3.1.1.5.1": "INFO", + "1.3.6.1.6.3.1.1.5.2": "WARNING", + "1.3.6.1.6.3.1.1.5.3": "ERROR", + "1.3.6.1.6.3.1.1.5.4": "CRITICAL" + } + assert snmp_provider._severity_mapping == expected_mapping + + def test_parse_severity(self, snmp_provider): + """Test severity parsing from string to AlertSeverity enum.""" + assert snmp_provider._parse_severity("INFO") == AlertSeverity.INFO + assert snmp_provider._parse_severity("WARNING") == AlertSeverity.WARNING + assert snmp_provider._parse_severity("ERROR") == AlertSeverity.HIGH + assert snmp_provider._parse_severity("CRITICAL") == AlertSeverity.CRITICAL + assert snmp_provider._parse_severity("UNKNOWN") == AlertSeverity.WARNING + + def test_determine_severity_with_mapping(self, snmp_provider): + """Test severity determination based on OID mapping.""" + oids = ["1.3.6.1.6.3.1.1.5.1", "1.3.6.1.2.1.1.1.0"] + data = {"1.3.6.1.6.3.1.1.5.1": "coldStart", "1.3.6.1.2.1.1.1.0": "Test Device"} + + severity = snmp_provider._determine_severity(oids, data) + assert severity == AlertSeverity.INFO + + def test_determine_severity_default(self, snmp_provider): + """Test default severity when no mapping matches.""" + oids = ["1.3.6.1.2.1.1.1.0"] + data = {"1.3.6.1.2.1.1.1.0": "Test Device"} + + severity = snmp_provider._determine_severity(oids, data) + assert severity == AlertSeverity.WARNING + + @patch('keep.providers.snmp_provider.snmp_provider.socket.socket') + def test_validate_scopes_success(self, mock_socket, snmp_provider): + """Test successful scope validation.""" + mock_socket_instance = MagicMock() + mock_socket.return_value = mock_socket_instance + + result = snmp_provider.validate_scopes() + assert result == {"receive_traps": True} + mock_socket_instance.bind.assert_called_once_with(("0.0.0.0", 1162)) + mock_socket_instance.close.assert_called_once() + + @patch('keep.providers.snmp_provider.snmp_provider.socket.socket') + def test_validate_scopes_failure(self, mock_socket, snmp_provider): + """Test scope validation failure when port is unavailable.""" + mock_socket_instance = MagicMock() + mock_socket.return_value = mock_socket_instance + mock_socket_instance.bind.side_effect = OSError("Address already in use") + + result = snmp_provider.validate_scopes() + assert "receive_traps" in result + assert "Failed to bind" in result["receive_traps"] + + @patch('keep.providers.snmp_provider.snmp_provider.engine.SnmpEngine') + @patch('keep.providers.snmp_provider.snmp_provider.config') + @patch('keep.providers.snmp_provider.snmp_provider.ntfrcv.NotificationReceiver') + @patch('keep.providers.snmp_provider.snmp_provider.asyncio.new_event_loop') + def test_handle_trap_processing(self, mock_loop, mock_ntfrcv, mock_config, mock_engine, snmp_provider): + """Test SNMP trap processing and alert creation.""" + # Mock the trap data + mock_oid1 = Mock() + mock_oid1.__str__ = lambda self: "1.3.6.1.6.3.1.1.5.1" + + mock_oid2 = Mock() + mock_oid2.__str__ = lambda self: "1.3.6.1.2.1.1.1.0" + + mock_val1 = Mock() + mock_val1.__class__.__name__ = "OctetString" + mock_val1.__str__ = lambda self: "coldStart" + + mock_val2 = Mock() + mock_val2.__class__.__name__ = "OctetString" + mock_val2.__str__ = lambda self: "Test Device" + + var_binds = [(mock_oid1, mock_val1), (mock_oid2, mock_val2)] + + # Mock _push_alert to capture the alert + with patch.object(snmp_provider, '_push_alert') as mock_push_alert: + snmp_provider._handle_trap( + snmp_engine=Mock(), + state_reference=Mock(), + context_engine_id=Mock(), + context_name=Mock(), + var_binds=var_binds, + cb_ctx=Mock() + ) + + # Verify that _push_alert was called + mock_push_alert.assert_called_once() + + # Get the alert that was pushed + alert = mock_push_alert.call_args[0][0] + + # Verify alert structure + assert alert["title"] == "SNMP Trap Received" + assert "SNMP Trap received with the following data:" in alert["description"] + assert alert["severity"] == AlertSeverity.INFO.value + assert alert["source"] == ["snmp"] + assert "1.3.6.1.6.3.1.1.5.1" in alert["fingerprint"] + assert "1.3.6.1.2.1.1.1.0" in alert["fingerprint"] + + # Verify raw data + raw_data = json.loads(alert["raw_data"]) + assert raw_data["1.3.6.1.6.3.1.1.5.1"] == "coldStart" + assert raw_data["1.3.6.1.2.1.1.1.0"] == "Test Device" + + def test_handle_trap_with_critical_severity(self, snmp_provider): + """Test trap processing with critical severity mapping.""" + # Mock trap data for critical alert + mock_oid1 = Mock() + mock_oid1.__str__ = lambda self: "1.3.6.1.6.3.1.1.5.4" # Maps to CRITICAL + + mock_val1 = Mock() + mock_val1.__class__.__name__ = "OctetString" + mock_val1.__str__ = lambda self: "authenticationFailure" + + var_binds = [(mock_oid1, mock_val1)] + + with patch.object(snmp_provider, '_push_alert') as mock_push_alert: + snmp_provider._handle_trap( + snmp_engine=Mock(), + state_reference=Mock(), + context_engine_id=Mock(), + context_name=Mock(), + var_binds=var_binds, + cb_ctx=Mock() + ) + + alert = mock_push_alert.call_args[0][0] + assert alert["severity"] == AlertSeverity.CRITICAL.value + + def test_handle_trap_error_handling(self, snmp_provider): + """Test error handling in trap processing.""" + # Create a var_binds that will cause an error + mock_oid = Mock() + mock_oid.__str__ = Mock(side_effect=Exception("OID parsing error")) + + mock_val = Mock() + var_binds = [(mock_oid, mock_val)] + + # Should not raise exception, should log error instead + with patch.object(snmp_provider.logger, 'error') as mock_logger: + snmp_provider._handle_trap( + snmp_engine=Mock(), + state_reference=Mock(), + context_engine_id=Mock(), + context_name=Mock(), + var_binds=var_binds, + cb_ctx=Mock() + ) + + # Verify error was logged + mock_logger.assert_called() + + def test_get_logs(self, snmp_provider): + """Test log retrieval functionality.""" + logs = snmp_provider.get_logs(limit=10) + + assert isinstance(logs, list) + assert len(logs) >= 2 # Should have debug info and status logs + + # Check for expected log entries + log_messages = [log["message"] for log in logs] + assert any("SNMP Provider Debug Information" in msg for msg in log_messages) + assert any("SNMP trap receiver status" in msg for msg in log_messages) + + def test_debug_info(self, snmp_provider): + """Test debug information generation.""" + debug_info = snmp_provider.debug_info() + + assert "provider_id" in debug_info + assert "running" in debug_info + assert "configuration" in debug_info + assert "port_test" in debug_info + assert "snmp_engine" in debug_info + + assert debug_info["provider_id"] == "test_snmp_provider" + assert debug_info["configuration"]["listen_address"] == "0.0.0.0" + assert debug_info["configuration"]["port"] == 1162 + + def test_invalid_severity_mapping(self, context_manager): + """Test handling of invalid severity mapping JSON.""" + config = ProviderConfig( + description="Test SNMP Provider", + authentication={ + "listen_address": "0.0.0.0", + "port": 1162, + "community": "public", + "severity_mapping": "invalid json" + }, + ) + + # Create provider with invalid JSON - it should handle the error gracefully + provider = SnmpProvider( + context_manager=context_manager, + provider_id="test_snmp_provider", + config=config, + ) + + # Should have empty severity mapping due to JSON error + assert provider._severity_mapping == {} + + def test_query_method(self, snmp_provider): + """Test that query method returns None and logs warning.""" + with patch.object(snmp_provider.logger, 'warning') as mock_logger: + result = snmp_provider._query() + assert result is None + mock_logger.assert_called_with("SNMP provider does not support querying") + + def test_notify_method(self, snmp_provider): + """Test that notify method returns None and logs warning.""" + with patch.object(snmp_provider.logger, 'warning') as mock_logger: + result = snmp_provider._notify() + assert result is None + mock_logger.assert_called_with("SNMP provider is a receiver and does not support direct notification") + + def test_is_consumer_property(self, snmp_provider): + """Test that provider is marked as a consumer.""" + assert snmp_provider.is_consumer is True + + def test_status_method(self, snmp_provider): + """Test status method returns running state.""" + assert snmp_provider.status() is False + + snmp_provider.running = True + assert snmp_provider.status() is True + + def test_get_alert_schema(self): + """Test alert schema structure.""" + schema = SnmpProvider.get_alert_schema() + + assert schema["type"] == "object" + assert "properties" in schema + assert "title" in schema["properties"] + assert "description" in schema["properties"] + assert "severity" in schema["properties"] + assert "source" in schema["properties"] + assert "raw_data" in schema["properties"] + + # Check severity enum values + severity_enum = schema["properties"]["severity"]["enum"] + assert "info" in severity_enum + assert "warning" in severity_enum + assert "high" in severity_enum # "error" maps to "high" in Keep + assert "critical" in severity_enum + + @patch('keep.providers.snmp_provider.snmp_provider.socket.socket') + def test_dispose_cleanup(self, mock_socket, snmp_provider): + """Test proper cleanup when disposing provider.""" + # Set up mock SNMP engine + mock_engine = Mock() + mock_transport_dispatcher = Mock() + mock_engine.transportDispatcher = mock_transport_dispatcher + snmp_provider.snmp_engine = mock_engine + snmp_provider.running = True + + # Set up mock thread + mock_thread = Mock() + mock_thread.is_alive.return_value = True + snmp_provider.trap_thread = mock_thread + + # Call dispose + snmp_provider.dispose() + + # Verify cleanup + assert snmp_provider.running is False + assert snmp_provider.snmp_engine is None + mock_transport_dispatcher.jobFinished.assert_called_once_with(1) + mock_transport_dispatcher.closeDispatcher.assert_called_once() + mock_thread.join.assert_called_once_with(timeout=5.0) + + def test_dispose_when_not_running(self, snmp_provider): + """Test dispose when provider is not running.""" + snmp_provider.running = False + + # Should return early without doing anything + snmp_provider.dispose() + + # Verify state unchanged + assert snmp_provider.running is False + assert snmp_provider.snmp_engine is None + + def test_format_alert(self): + """Test the format_alert static method.""" + event = { + "title": "SNMP Trap: coldStart", + "description": "Test SNMP trap", + "severity": "warning", + "fingerprint": "test-fingerprint", + "source": ["snmp"], + "labels": {"device": "test-device"}, + "raw_data": '{"test": "data"}', + "created_at": "2025-09-22T16:00:00.000000" + } + + formatted = SnmpProvider.format_alert(event) + + assert formatted["title"] == "SNMP Trap: coldStart" + assert formatted["description"] == "Test SNMP trap" + assert formatted["severity"] == "warning" + assert formatted["fingerprint"] == "test-fingerprint" + assert formatted["source"] == ["snmp"] + assert formatted["labels"] == {"device": "test-device"} + assert formatted["raw_data"] == '{"test": "data"}' + assert formatted["created_at"] == "2025-09-22T16:00:00.000000" + + def test_format_alert_with_defaults(self): + """Test format_alert with minimal event data.""" + event = {} + + formatted = SnmpProvider.format_alert(event) + + assert formatted["title"] == "SNMP Trap Received" + assert formatted["description"] == "SNMP trap received" + assert formatted["severity"] == "warning" + assert formatted["source"] == ["snmp"] + assert formatted["labels"] == {} + assert "snmp-" in formatted["fingerprint"] + assert "created_at" in formatted