|
| 1 | +""" |
| 2 | +Copyright (c) 2025, Oracle and/or its affiliates. |
| 3 | +Licensed under the Universal Permissive License v1.0 as shown at |
| 4 | +https://oss.oracle.com/licenses/upl. |
| 5 | +""" |
| 6 | + |
| 7 | +import os |
| 8 | +from datetime import datetime, timedelta, timezone |
| 9 | +from logging import Logger |
| 10 | +from typing import Annotated, Any, Dict, List, Optional |
| 11 | + |
| 12 | +import oci |
| 13 | +from fastmcp import FastMCP |
| 14 | +from oci.cloud_guard import CloudGuardClient |
| 15 | + |
| 16 | +from . import __project__, __version__ |
| 17 | + |
| 18 | +logger = Logger(__name__, level="INFO") |
| 19 | + |
| 20 | +mcp = FastMCP(name=__project__) |
| 21 | + |
| 22 | + |
| 23 | +def get_cloud_guard_client(): |
| 24 | + config = oci.config.from_file( |
| 25 | + profile_name=os.getenv("OCI_CONFIG_PROFILE", oci.config.DEFAULT_PROFILE) |
| 26 | + ) |
| 27 | + |
| 28 | + user_agent_name = __project__.split("oracle.", 1)[1].split("-server", 1)[0] |
| 29 | + config["additional_user_agent"] = f"{user_agent_name}/{__version__}" |
| 30 | + |
| 31 | + private_key = oci.signer.load_private_key_from_file(config["key_file"]) |
| 32 | + token_file = config["security_token_file"] |
| 33 | + token = None |
| 34 | + with open(token_file, "r") as f: |
| 35 | + token = f.read() |
| 36 | + signer = oci.auth.signers.SecurityTokenSigner(token, private_key) |
| 37 | + return CloudGuardClient(config, signer=signer) |
| 38 | + |
| 39 | + |
| 40 | +@mcp.tool( |
| 41 | + name="list_problems", |
| 42 | + description="Returns a list of all Problems identified by Cloud Guard.", |
| 43 | +) |
| 44 | +def list_problems( |
| 45 | + compartment_id: Annotated[ |
| 46 | + Optional[str], "The OCID of the compartment in which to list resources." |
| 47 | + ] = None, |
| 48 | + risk_level: Annotated[Optional[str], "Risk level of the problem"] = None, |
| 49 | + lifecycle_state: Annotated[ |
| 50 | + Optional[str], |
| 51 | + "The field lifecycle state. Only one state can be provided. Default value for state is active.", |
| 52 | + ] = "ACTIVE", |
| 53 | + detector_rule_ids: Annotated[ |
| 54 | + Optional[list[str]], |
| 55 | + "Comma seperated list of detector rule IDs to be passed in to match against Problems.", |
| 56 | + ] = None, |
| 57 | + time_range_days: Annotated[ |
| 58 | + Optional[int], "Number of days to look back for problems" |
| 59 | + ] = 30, |
| 60 | + limit: Annotated[int, "The number of problems to return"] = 10, |
| 61 | +) -> List[Dict[str, Any]]: |
| 62 | + time_filter = ( |
| 63 | + datetime.now(timezone.utc) - timedelta(days=time_range_days) |
| 64 | + ).isoformat() |
| 65 | + |
| 66 | + kwargs = { |
| 67 | + "compartment_id": compartment_id, |
| 68 | + "time_last_detected_greater_than_or_equal_to": time_filter, |
| 69 | + "limit": limit, |
| 70 | + } |
| 71 | + |
| 72 | + if risk_level: |
| 73 | + kwargs["risk_level"] = risk_level |
| 74 | + if lifecycle_state: |
| 75 | + kwargs["lifecycle_state"] = lifecycle_state |
| 76 | + if detector_rule_ids: |
| 77 | + kwargs["detector_rule_id_list"] = detector_rule_ids |
| 78 | + |
| 79 | + response = get_cloud_guard_client().list_problems(**kwargs).data.items |
| 80 | + return [ |
| 81 | + { |
| 82 | + "id": problem.id, |
| 83 | + "status": problem.lifecycle_detail, |
| 84 | + "detector_rule_id": problem.detector_rule_id, |
| 85 | + "risk_level": problem.risk_level, |
| 86 | + "risk_score": problem.risk_score, |
| 87 | + "resource_name": problem.resource_name, |
| 88 | + "resource_type": problem.resource_type, |
| 89 | + "lifecycle_state": problem.lifecycle_state, |
| 90 | + "time_first_detected": ( |
| 91 | + problem.time_first_detected.isoformat() |
| 92 | + if problem.time_first_detected |
| 93 | + else None |
| 94 | + ), |
| 95 | + "time_last_detected": ( |
| 96 | + problem.time_last_detected.isoformat() |
| 97 | + if problem.time_last_detected |
| 98 | + else None |
| 99 | + ), |
| 100 | + "region": problem.region, |
| 101 | + "labels": problem.labels, |
| 102 | + "compartment_id": problem.compartment_id, |
| 103 | + } |
| 104 | + for problem in response |
| 105 | + ] |
| 106 | + |
| 107 | + |
| 108 | +@mcp.tool( |
| 109 | + name="get_problem_details", |
| 110 | + description="Get the details for a Problem identified by problemId.", |
| 111 | +) |
| 112 | +def get_problem_details( |
| 113 | + problem_id: Annotated[str, "The OCID of the problem"], |
| 114 | +): |
| 115 | + response = get_cloud_guard_client().get_problem(problem_id=problem_id) |
| 116 | + problem = response.data |
| 117 | + |
| 118 | + return { |
| 119 | + "id": problem.id, |
| 120 | + "detector_rule_id": problem.detector_rule_id, |
| 121 | + "detector_id": problem.detector_id, |
| 122 | + "risk_level": problem.risk_level, |
| 123 | + "risk_score": problem.risk_score, |
| 124 | + "resource_id": problem.resource_id, |
| 125 | + "resource_name": problem.resource_name, |
| 126 | + "resource_type": problem.resource_type, |
| 127 | + "lifecycle_state": problem.lifecycle_state, |
| 128 | + "lifecycle_detail": problem.lifecycle_detail, |
| 129 | + "time_first_detected": ( |
| 130 | + problem.time_first_detected.isoformat() |
| 131 | + if problem.time_first_detected |
| 132 | + else None |
| 133 | + ), |
| 134 | + "time_last_detected": ( |
| 135 | + problem.time_last_detected.isoformat() |
| 136 | + if problem.time_last_detected |
| 137 | + else None |
| 138 | + ), |
| 139 | + "description": problem.description, |
| 140 | + "recommendation": problem.recommendation, |
| 141 | + "additional_details": problem.additional_details, |
| 142 | + "comment": problem.comment, |
| 143 | + } |
| 144 | + |
| 145 | + |
| 146 | +@mcp.tool( |
| 147 | + name="update_problem_status", |
| 148 | + description="Changes the current status of the problem, identified by problemId, to the status " |
| 149 | + "specified in the UpdateProblemStatusDetails resource that you pass.", |
| 150 | +) |
| 151 | +def update_problem_status( |
| 152 | + problem_id: Annotated[str, "OCID of the problem"], |
| 153 | + status: Annotated[ |
| 154 | + str, |
| 155 | + "Action taken by user. Allowed values are: OPEN, RESOLVED, DISMISSED, CLOSED", |
| 156 | + ], |
| 157 | + comment: Annotated[Optional[str], "A comment from the user"] = None, |
| 158 | +): |
| 159 | + updated_problem_status = oci.cloud_guard.models.UpdateProblemStatusDetails( |
| 160 | + status=status, comment=comment |
| 161 | + ) |
| 162 | + response = get_cloud_guard_client().update_problem_status( |
| 163 | + problem_id=problem_id, |
| 164 | + update_problem_status_details=updated_problem_status, |
| 165 | + ) |
| 166 | + problem = response.data |
| 167 | + |
| 168 | + return { |
| 169 | + "id": problem.id, |
| 170 | + "detector_rule_id": problem.detector_rule_id, |
| 171 | + "detector_id": problem.detector_id, |
| 172 | + "risk_level": problem.risk_level, |
| 173 | + "risk_score": problem.risk_score, |
| 174 | + "resource_id": problem.resource_id, |
| 175 | + "resource_name": problem.resource_name, |
| 176 | + "resource_type": problem.resource_type, |
| 177 | + "lifecycle_state": problem.lifecycle_state, |
| 178 | + "lifecycle_detail": problem.lifecycle_detail, |
| 179 | + "time_first_detected": ( |
| 180 | + problem.time_first_detected.isoformat() |
| 181 | + if problem.time_first_detected |
| 182 | + else None |
| 183 | + ), |
| 184 | + "time_last_detected": ( |
| 185 | + problem.time_last_detected.isoformat() |
| 186 | + if problem.time_last_detected |
| 187 | + else None |
| 188 | + ), |
| 189 | + "description": problem.description, |
| 190 | + "recommendation": problem.recommendation, |
| 191 | + "additional_details": problem.additional_details, |
| 192 | + "comment": problem.comment, |
| 193 | + } |
| 194 | + |
| 195 | + |
| 196 | +def main(): |
| 197 | + mcp.run() |
| 198 | + |
| 199 | + |
| 200 | +if __name__ == "__main__": |
| 201 | + main() |
0 commit comments