diff --git a/contentctl/actions/new_content.py b/contentctl/actions/new_content.py index 0a54cf11..0d3ffc4a 100644 --- a/contentctl/actions/new_content.py +++ b/contentctl/actions/new_content.py @@ -1,5 +1,3 @@ - - from dataclasses import dataclass import questionary from typing import Any @@ -11,67 +9,108 @@ import pathlib from contentctl.objects.abstract_security_content_objects.security_content_object_abstract import SecurityContentObject_Abstract from contentctl.output.yml_writer import YmlWriter - +from contentctl.objects.enums import AssetType +from contentctl.objects.constants import SES_OBSERVABLE_TYPE_MAPPING, SES_OBSERVABLE_ROLE_MAPPING class NewContent: + UPDATE_PREFIX = "__UPDATE__" + + DEFAULT_DRILLDOWN_DEF = [ + { + "name": f'View the detection results for - "${UPDATE_PREFIX}FIRST_RISK_OBJECT$" and "${UPDATE_PREFIX}SECOND_RISK_OBJECT$"', + "search": f'%original_detection_search% | search "${UPDATE_PREFIX}FIRST_RISK_OBJECT = "${UPDATE_PREFIX}FIRST_RISK_OBJECT$" second_observable_type_here = "${UPDATE_PREFIX}SECOND_RISK_OBJECT$"', + "earliest_offset": '$info_min_time$', + "latest_offset": '$info_max_time$' + }, + { + "name": f'View risk events for the last 7 days for - "${UPDATE_PREFIX}FIRST_RISK_OBJECT$" and "${UPDATE_PREFIX}SECOND_RISK_OBJECT$"', + "search": f'| from datamodel Risk.All_Risk | search normalized_risk_object IN ("${UPDATE_PREFIX}FIRST_RISK_OBJECT$", "${UPDATE_PREFIX}SECOND_RISK_OBJECT$") starthoursago=168 | stats count min(_time) as firstTime max(_time) as lastTime values(search_name) as "Search Name" values(risk_message) as "Risk Message" values(analyticstories) as "Analytic Stories" values(annotations._all) as "Annotations" values(annotations.mitre_attack.mitre_tactic) as "ATT&CK Tactics" by normalized_risk_object | `security_content_ctime(firstTime)` | `security_content_ctime(lastTime)`', + "earliest_offset": '$info_min_time$', + "latest_offset": '$info_max_time$' + } + ] + - def buildDetection(self)->dict[str,Any]: + def buildDetection(self) -> tuple[dict[str, Any], str]: questions = NewContentQuestions.get_questions_detection() - answers: dict[str,str] = questionary.prompt( - questions, - kbi_msg="User did not answer all of the prompt questions. Exiting...") + answers: dict[str, str] = questionary.prompt( + questions, + kbi_msg="User did not answer all of the prompt questions. Exiting...", + ) if not answers: raise ValueError("User didn't answer one or more questions!") - answers.update(answers) - answers['name'] = answers['detection_name'] - del answers['detection_name'] - answers['id'] = str(uuid.uuid4()) - answers['version'] = 1 - answers['date'] = datetime.today().strftime('%Y-%m-%d') - answers['author'] = answers['detection_author'] - del answers['detection_author'] - answers['data_source'] = answers['data_source'] - answers['type'] = answers['detection_type'] - del answers['detection_type'] - answers['status'] = "production" #start everything as production since that's what we INTEND the content to become - answers['description'] = 'UPDATE_DESCRIPTION' - file_name = answers['name'].replace(' ', '_').replace('-','_').replace('.','_').replace('/','_').lower() - answers['search'] = answers['detection_search'] + ' | `' + file_name + '_filter`' - del answers['detection_search'] - answers['how_to_implement'] = 'UPDATE_HOW_TO_IMPLEMENT' - answers['known_false_positives'] = 'UPDATE_KNOWN_FALSE_POSITIVES' - answers['references'] = ['REFERENCE'] - answers['tags'] = dict() - answers['tags']['analytic_story'] = ['UPDATE_STORY_NAME'] - answers['tags']['asset_type'] = 'UPDATE asset_type' - answers['tags']['confidence'] = 'UPDATE value between 1-100' - answers['tags']['impact'] = 'UPDATE value between 1-100' - answers['tags']['message'] = 'UPDATE message' - answers['tags']['mitre_attack_id'] = [x.strip() for x in answers['mitre_attack_ids'].split(',')] - answers['tags']['observable'] = [{'name': 'UPDATE', 'type': 'UPDATE', 'role': ['UPDATE']}] - answers['tags']['product'] = ['Splunk Enterprise','Splunk Enterprise Security','Splunk Cloud'] - answers['tags']['required_fields'] = ['UPDATE'] - answers['tags']['risk_score'] = 'UPDATE (impact * confidence)/100' - answers['tags']['security_domain'] = answers['security_domain'] - del answers["security_domain"] - answers['tags']['cve'] = ['UPDATE WITH CVE(S) IF APPLICABLE'] - - #generate the tests section - answers['tests'] = [ - { - 'name': "True Positive Test", - 'attack_data': [ - { - 'data': "https://github.com/splunk/contentctl/wiki", - "sourcetype": "UPDATE SOURCETYPE", - "source": "UPDATE SOURCE" - } - ] - } - ] - del answers["mitre_attack_ids"] - return answers - def buildStory(self)->dict[str,Any]: + data_source_field = ( + answers["data_source"] if len(answers["data_source"]) > 0 else [f"{NewContent.UPDATE_PREFIX} zero or more data_sources"] + ) + file_name = ( + answers["detection_name"] + .replace(" ", "_") + .replace("-", "_") + .replace(".", "_") + .replace("/", "_") + .lower() + ) + + #Minimum lenght for a mitre tactic is 5 characters: T1000 + if len(answers["mitre_attack_ids"]) >= 5: + mitre_attack_ids = [x.strip() for x in answers["mitre_attack_ids"].split(",")] + else: + #string was too short, so just put a placeholder + mitre_attack_ids = [f"{NewContent.UPDATE_PREFIX} zero or more mitre_attack_ids"] + + output_file_answers: dict[str, Any] = { + "name": answers["detection_name"], + "id": str(uuid.uuid4()), + "version": 1, + "date": datetime.today().strftime("%Y-%m-%d"), + "author": answers["detection_author"], + "status": "production", # start everything as production since that's what we INTEND the content to become + "type": answers["detection_type"], + "description": f"{NewContent.UPDATE_PREFIX} by providing a description of your search", + "data_source": data_source_field, + "search": f"{answers['detection_search']} | `{file_name}_filter`", + "how_to_implement": f"{NewContent.UPDATE_PREFIX} how to implement your search", + "known_false_positives": f"{NewContent.UPDATE_PREFIX} known false positives for your search", + "references": [f"{NewContent.UPDATE_PREFIX} zero or more http references to provide more information about your search"], + "drilldown_searches": NewContent.DEFAULT_DRILLDOWN_DEF, + "tags": { + "analytic_story": [f"{NewContent.UPDATE_PREFIX} by providing zero or more analytic stories"], + "asset_type": f"{NewContent.UPDATE_PREFIX} by providing and asset type from {list(AssetType._value2member_map_)}", + "confidence": f"{NewContent.UPDATE_PREFIX} by providing a value between 1-100", + "impact": f"{NewContent.UPDATE_PREFIX} by providing a value between 1-100", + "message": f"{NewContent.UPDATE_PREFIX} by providing a risk message. Fields in your search results can be referenced using $fieldName$", + "mitre_attack_id": mitre_attack_ids, + "observable": [ + {"name": f"{NewContent.UPDATE_PREFIX} the field name of the observable. This is a field that exists in your search results.", "type": f"{NewContent.UPDATE_PREFIX} the type of your observable from the list {list(SES_OBSERVABLE_TYPE_MAPPING.keys())}.", "role": [f"{NewContent.UPDATE_PREFIX} the role from the list {list(SES_OBSERVABLE_ROLE_MAPPING.keys())}"]} + ], + "product": [ + "Splunk Enterprise", + "Splunk Enterprise Security", + "Splunk Cloud", + ], + "security_domain": answers["security_domain"], + "cve": [f"{NewContent.UPDATE_PREFIX} with CVE(s) if applicable"], + }, + "tests": [ + { + "name": "True Positive Test", + "attack_data": [ + { + "data": f"{NewContent.UPDATE_PREFIX} the data file to replay. Go to https://github.com/splunk/contentctl/wiki for information about the format of this field", + "sourcetype": f"{NewContent.UPDATE_PREFIX} the sourcetype of your data file.", + "source": f"{NewContent.UPDATE_PREFIX} the source of your datafile", + } + ], + } + ], + } + + if answers["detection_type"] not in ["TTP", "Anomaly", "Correlation"]: + del output_file_answers["drilldown_searches"] + + return output_file_answers, answers['detection_kind'] + + def buildStory(self) -> dict[str, Any]: questions = NewContentQuestions.get_questions_story() answers = questionary.prompt( questions, @@ -96,12 +135,11 @@ def buildStory(self)->dict[str,Any]: del answers['usecase'] answers['tags']['cve'] = ['UPDATE WITH CVE(S) IF APPLICABLE'] return answers - def execute(self, input_dto: new) -> None: if input_dto.type == NewContentType.detection: - content_dict = self.buildDetection() - subdirectory = pathlib.Path('detections') / content_dict.pop('detection_kind') + content_dict, detection_kind = self.buildDetection() + subdirectory = pathlib.Path('detections') / detection_kind elif input_dto.type == NewContentType.story: content_dict = self.buildStory() subdirectory = pathlib.Path('stories') @@ -111,23 +149,20 @@ def execute(self, input_dto: new) -> None: full_output_path = input_dto.path / subdirectory / SecurityContentObject_Abstract.contentNameToFileName(content_dict.get('name')) YmlWriter.writeYmlFile(str(full_output_path), content_dict) - - def writeObjectNewContent(self, object: dict, subdirectory_name: str, type: NewContentType) -> None: if type == NewContentType.detection: file_path = os.path.join(self.output_path, 'detections', subdirectory_name, self.convertNameToFileName(object['name'], object['tags']['product'])) output_folder = pathlib.Path(self.output_path)/'detections'/subdirectory_name - #make sure the output folder exists for this detection + # make sure the output folder exists for this detection output_folder.mkdir(exist_ok=True) YmlWriter.writeDetection(file_path, object) print("Successfully created detection " + file_path) - + elif type == NewContentType.story: file_path = os.path.join(self.output_path, 'stories', self.convertNameToFileName(object['name'], object['tags']['product'])) YmlWriter.writeStory(file_path, object) print("Successfully created story " + file_path) - + else: raise(Exception(f"Object Must be Story or Detection, but is not: {object}")) - diff --git a/contentctl/contentctl.py b/contentctl/contentctl.py index efef5853..1bb21174 100644 --- a/contentctl/contentctl.py +++ b/contentctl/contentctl.py @@ -154,7 +154,7 @@ def main(): else: #The file exists, so load it up! - config_obj = YmlReader().load_file(configFile) + config_obj = YmlReader().load_file(configFile,add_fields=False) t = test.model_validate(config_obj) except Exception as e: print(f"Error validating 'contentctl.yml':\n{str(e)}") diff --git a/contentctl/helper/utils.py b/contentctl/helper/utils.py index 261ecb64..e0649f2d 100644 --- a/contentctl/helper/utils.py +++ b/contentctl/helper/utils.py @@ -247,20 +247,6 @@ def validate_git_pull_request(repo_path: str, pr_number: int) -> str: return hash - # @staticmethod - # def check_required_fields( - # thisField: str, definedFields: dict, requiredFields: list[str] - # ): - # missing_fields = [ - # field for field in requiredFields if field not in definedFields - # ] - # if len(missing_fields) > 0: - # raise ( - # ValueError( - # f"Could not validate - please resolve other errors resulting in missing fields {missing_fields}" - # ) - # ) - @staticmethod def verify_file_exists( file_path: str, verbose_print=False, timeout_seconds: int = 10 diff --git a/contentctl/input/new_content_questions.py b/contentctl/input/new_content_questions.py index 02b20f46..dbc47cdd 100644 --- a/contentctl/input/new_content_questions.py +++ b/contentctl/input/new_content_questions.py @@ -57,7 +57,7 @@ def get_questions_detection(cls) -> list[dict[str,Any]]: "type": "text", "message": "enter search (spl)", "name": "detection_search", - "default": "| UPDATE_SPL", + "default": "| __UPDATE__ SPL", }, { "type": "text", diff --git a/contentctl/input/yml_reader.py b/contentctl/input/yml_reader.py index 11bea479..49dfb812 100644 --- a/contentctl/input/yml_reader.py +++ b/contentctl/input/yml_reader.py @@ -1,15 +1,12 @@ from typing import Dict, Any - import yaml - - import sys import pathlib class YmlReader(): @staticmethod - def load_file(file_path: pathlib.Path, add_fields=True, STRICT_YML_CHECKING=False) -> Dict[str,Any]: + def load_file(file_path: pathlib.Path, add_fields:bool=True, STRICT_YML_CHECKING:bool=False) -> Dict[str,Any]: try: file_handler = open(file_path, 'r', encoding="utf-8") @@ -27,8 +24,16 @@ def load_file(file_path: pathlib.Path, add_fields=True, STRICT_YML_CHECKING=Fals print(f"Error loading YML file {file_path}: {str(e)}") sys.exit(1) try: - #yml_obj = list(yaml.safe_load_all(file_handler))[0] - yml_obj = yaml.load(file_handler, Loader=yaml.CSafeLoader) + #Ideally we should use + # from contentctl.actions.new_content import NewContent + # and use NewContent.UPDATE_PREFIX, + # but there is a circular dependency right now which makes that difficult. + # We have instead hardcoded UPDATE_PREFIX + UPDATE_PREFIX = "__UPDATE__" + data = file_handler.read() + if UPDATE_PREFIX in data: + raise Exception(f"The file {file_path} contains the value '{UPDATE_PREFIX}'. Please fill out any unpopulated fields as required.") + yml_obj = yaml.load(data, Loader=yaml.CSafeLoader) except yaml.YAMLError as exc: print(exc) sys.exit(1) diff --git a/contentctl/objects/abstract_security_content_objects/security_content_object_abstract.py b/contentctl/objects/abstract_security_content_objects/security_content_object_abstract.py index f93602f1..af1b3674 100644 --- a/contentctl/objects/abstract_security_content_objects/security_content_object_abstract.py +++ b/contentctl/objects/abstract_security_content_objects/security_content_object_abstract.py @@ -33,8 +33,7 @@ # TODO (#266): disable the use_enum_values configuration class SecurityContentObject_Abstract(BaseModel, abc.ABC): - model_config = ConfigDict(use_enum_values=True,validate_default=True) - + model_config = ConfigDict(use_enum_values=True,validate_default=True,extra="forbid") name: str = Field(...,max_length=99) author: str = Field(...,max_length=255) date: datetime.date = Field(...) diff --git a/contentctl/objects/alert_action.py b/contentctl/objects/alert_action.py index f2f745d4..d2855292 100644 --- a/contentctl/objects/alert_action.py +++ b/contentctl/objects/alert_action.py @@ -1,5 +1,5 @@ from __future__ import annotations -from pydantic import BaseModel, model_serializer +from pydantic import BaseModel, model_serializer, ConfigDict from typing import Optional from contentctl.objects.deployment_email import DeploymentEmail @@ -9,6 +9,7 @@ from contentctl.objects.deployment_phantom import DeploymentPhantom class AlertAction(BaseModel): + model_config = ConfigDict(extra="forbid") email: Optional[DeploymentEmail] = None notable: Optional[DeploymentNotable] = None rba: Optional[DeploymentRBA] = DeploymentRBA() diff --git a/contentctl/objects/atomic.py b/contentctl/objects/atomic.py index a723304d..7e79227c 100644 --- a/contentctl/objects/atomic.py +++ b/contentctl/objects/atomic.py @@ -41,6 +41,7 @@ class InputArgumentType(StrEnum): Url = "Url" class AtomicExecutor(BaseModel): + model_config = ConfigDict(extra="forbid") name: str elevation_required: Optional[bool] = False #Appears to be optional command: Optional[str] = None diff --git a/contentctl/objects/base_test.py b/contentctl/objects/base_test.py index 20e681cf..8a377dfc 100644 --- a/contentctl/objects/base_test.py +++ b/contentctl/objects/base_test.py @@ -2,7 +2,7 @@ from typing import Union from abc import ABC, abstractmethod -from pydantic import BaseModel +from pydantic import BaseModel,ConfigDict from contentctl.objects.base_test_result import BaseTestResult @@ -21,6 +21,7 @@ def __str__(self) -> str: # TODO (#224): enforce distinct test names w/in detections class BaseTest(BaseModel, ABC): + model_config = ConfigDict(extra="forbid") """ A test case for a detection """ diff --git a/contentctl/objects/baseline.py b/contentctl/objects/baseline.py index 5dc59d8f..a41acbb4 100644 --- a/contentctl/objects/baseline.py +++ b/contentctl/objects/baseline.py @@ -1,7 +1,7 @@ from __future__ import annotations -from typing import Annotated, Optional, List,Any -from pydantic import field_validator, ValidationInfo, Field, model_serializer +from typing import Annotated, List,Any +from pydantic import field_validator, ValidationInfo, Field, model_serializer, computed_field from contentctl.objects.deployment import Deployment from contentctl.objects.security_content_object import SecurityContentObject from contentctl.objects.enums import DataModel @@ -15,7 +15,6 @@ class Baseline(SecurityContentObject): name:str = Field(...,max_length=CONTENTCTL_MAX_SEARCH_NAME_LENGTH) type: Annotated[str,Field(pattern="^Baseline$")] = Field(...) - datamodel: Optional[List[DataModel]] = None search: str = Field(..., min_length=4) how_to_implement: str = Field(..., min_length=4) known_false_positives: str = Field(..., min_length=4) @@ -34,6 +33,10 @@ def get_conf_stanza_name(self, app:CustomApp)->str: def getDeployment(cls, v:Any, info:ValidationInfo)->Deployment: return Deployment.getDeployment(v,info) + @computed_field + @property + def datamodel(self) -> List[DataModel]: + return [dm for dm in DataModel if dm.value in self.search] @model_serializer def serialize_model(self): diff --git a/contentctl/objects/baseline_tags.py b/contentctl/objects/baseline_tags.py index ea979664..db5f8048 100644 --- a/contentctl/objects/baseline_tags.py +++ b/contentctl/objects/baseline_tags.py @@ -1,5 +1,5 @@ from __future__ import annotations -from pydantic import BaseModel, Field, field_validator, ValidationInfo, model_serializer +from pydantic import BaseModel, Field, field_validator, ValidationInfo, model_serializer, ConfigDict from typing import List, Any, Union from contentctl.objects.story import Story @@ -12,12 +12,12 @@ class BaselineTags(BaseModel): + model_config = ConfigDict(extra="forbid") analytic_story: list[Story] = Field(...) #deployment: Deployment = Field('SET_IN_GET_DEPLOYMENT_FUNCTION') # TODO (#223): can we remove str from the possible types here? detections: List[Union[Detection,str]] = Field(...) product: List[SecurityContentProductName] = Field(...,min_length=1) - required_fields: List[str] = Field(...,min_length=1) security_domain: SecurityDomain = Field(...) @@ -33,7 +33,6 @@ def serialize_model(self): "analytic_story": [story.name for story in self.analytic_story], "detections": [detection.name for detection in self.detections if isinstance(detection,Detection)], "product": self.product, - "required_fields":self.required_fields, "security_domain":self.security_domain, "deployments": None } diff --git a/contentctl/objects/config.py b/contentctl/objects/config.py index 659d1113..c41b93ea 100644 --- a/contentctl/objects/config.py +++ b/contentctl/objects/config.py @@ -35,7 +35,7 @@ # TODO (#266): disable the use_enum_values configuration class App_Base(BaseModel,ABC): - model_config = ConfigDict(use_enum_values=True,validate_default=True, arbitrary_types_allowed=True) + model_config = ConfigDict(use_enum_values=True,validate_default=True, arbitrary_types_allowed=True, extra='forbid') uid: Optional[int] = Field(default=None) title: str = Field(description="Human-readable name used by the app. This can have special characters.") appid: Optional[APPID_TYPE]= Field(default=None,description="Internal name used by your app. " diff --git a/contentctl/objects/data_source.py b/contentctl/objects/data_source.py index 868bdd51..ed8a8f86 100644 --- a/contentctl/objects/data_source.py +++ b/contentctl/objects/data_source.py @@ -1,8 +1,7 @@ from __future__ import annotations from typing import Optional, Any -from pydantic import Field, HttpUrl, model_serializer, BaseModel +from pydantic import Field, HttpUrl, model_serializer, BaseModel, ConfigDict from contentctl.objects.security_content_object import SecurityContentObject -from contentctl.objects.event_source import EventSource class TA(BaseModel): @@ -10,15 +9,16 @@ class TA(BaseModel): url: HttpUrl | None = None version: str class DataSource(SecurityContentObject): + model_config = ConfigDict(extra="forbid") source: str = Field(...) sourcetype: str = Field(...) separator: Optional[str] = None configuration: Optional[str] = None supported_TA: list[TA] = [] - fields: Optional[list] = None - field_mappings: Optional[list] = None - convert_to_log_source: Optional[list] = None - example_log: Optional[str] = None + fields: None | list = None + field_mappings: None | list = None + convert_to_log_source: None | list = None + example_log: None | str = None @model_serializer diff --git a/contentctl/objects/deployment.py b/contentctl/objects/deployment.py index 832c048d..8fd264b6 100644 --- a/contentctl/objects/deployment.py +++ b/contentctl/objects/deployment.py @@ -1,5 +1,5 @@ from __future__ import annotations -from pydantic import Field, computed_field,ValidationInfo, model_serializer, NonNegativeInt +from pydantic import Field, computed_field,ValidationInfo, model_serializer, NonNegativeInt, ConfigDict from typing import Any import uuid import datetime @@ -11,6 +11,7 @@ class Deployment(SecurityContentObject): + model_config = ConfigDict(extra="forbid") #id: str = None #date: str = None #author: str = None @@ -72,7 +73,6 @@ def serialize_model(self): "tags": self.tags } - #Combine fields from this model with fields from parent model.update(super_fields) diff --git a/contentctl/objects/deployment_email.py b/contentctl/objects/deployment_email.py index a607502c..1d1269fe 100644 --- a/contentctl/objects/deployment_email.py +++ b/contentctl/objects/deployment_email.py @@ -1,8 +1,9 @@ from __future__ import annotations -from pydantic import BaseModel +from pydantic import BaseModel, ConfigDict class DeploymentEmail(BaseModel): + model_config = ConfigDict(extra="forbid") message: str subject: str to: str \ No newline at end of file diff --git a/contentctl/objects/deployment_notable.py b/contentctl/objects/deployment_notable.py index b6e2c463..7f064b43 100644 --- a/contentctl/objects/deployment_notable.py +++ b/contentctl/objects/deployment_notable.py @@ -1,8 +1,9 @@ from __future__ import annotations -from pydantic import BaseModel +from pydantic import BaseModel, ConfigDict from typing import List class DeploymentNotable(BaseModel): + model_config = ConfigDict(extra="forbid") rule_description: str rule_title: str nes_fields: List[str] \ No newline at end of file diff --git a/contentctl/objects/deployment_phantom.py b/contentctl/objects/deployment_phantom.py index 11df2feb..1d4a9975 100644 --- a/contentctl/objects/deployment_phantom.py +++ b/contentctl/objects/deployment_phantom.py @@ -1,8 +1,9 @@ from __future__ import annotations -from pydantic import BaseModel +from pydantic import BaseModel, ConfigDict class DeploymentPhantom(BaseModel): + model_config = ConfigDict(extra="forbid") cam_workers : str label : str phantom_server : str diff --git a/contentctl/objects/deployment_rba.py b/contentctl/objects/deployment_rba.py index b3412b3f..58917c70 100644 --- a/contentctl/objects/deployment_rba.py +++ b/contentctl/objects/deployment_rba.py @@ -1,6 +1,7 @@ from __future__ import annotations -from pydantic import BaseModel +from pydantic import BaseModel, ConfigDict class DeploymentRBA(BaseModel): + model_config = ConfigDict(extra="forbid") enabled: bool = False \ No newline at end of file diff --git a/contentctl/objects/deployment_scheduling.py b/contentctl/objects/deployment_scheduling.py index 6c5a75a8..b21673d8 100644 --- a/contentctl/objects/deployment_scheduling.py +++ b/contentctl/objects/deployment_scheduling.py @@ -1,8 +1,9 @@ from __future__ import annotations -from pydantic import BaseModel +from pydantic import BaseModel, ConfigDict class DeploymentScheduling(BaseModel): + model_config = ConfigDict(extra="forbid") cron_schedule: str earliest_time: str latest_time: str diff --git a/contentctl/objects/deployment_slack.py b/contentctl/objects/deployment_slack.py index 294836e2..03cf5ebb 100644 --- a/contentctl/objects/deployment_slack.py +++ b/contentctl/objects/deployment_slack.py @@ -1,7 +1,8 @@ from __future__ import annotations -from pydantic import BaseModel +from pydantic import BaseModel, ConfigDict class DeploymentSlack(BaseModel): + model_config = ConfigDict(extra="forbid") channel: str message: str \ No newline at end of file diff --git a/contentctl/objects/detection_tags.py b/contentctl/objects/detection_tags.py index b1d489f4..185bc190 100644 --- a/contentctl/objects/detection_tags.py +++ b/contentctl/objects/detection_tags.py @@ -38,13 +38,12 @@ # TODO (#266): disable the use_enum_values configuration class DetectionTags(BaseModel): # detection spec - model_config = ConfigDict(use_enum_values=True, validate_default=False) + model_config = ConfigDict(use_enum_values=True,validate_default=False, extra='forbid') analytic_story: list[Story] = Field(...) asset_type: AssetType = Field(...) - - confidence: NonNegativeInt = Field(..., le=100) - impact: NonNegativeInt = Field(..., le=100) - + group: list[str] = [] + confidence: NonNegativeInt = Field(...,le=100) + impact: NonNegativeInt = Field(...,le=100) @computed_field @property def risk_score(self) -> int: @@ -74,7 +73,6 @@ def severity(self)->RiskSeverity: observable: List[Observable] = [] message: str = Field(...) product: list[SecurityContentProductName] = Field(..., min_length=1) - required_fields: list[str] = Field(min_length=1) throttling: Optional[Throttling] = None security_domain: SecurityDomain = Field(...) cve: List[CVE_TYPE] = [] diff --git a/contentctl/objects/event_source.py b/contentctl/objects/event_source.py deleted file mode 100644 index 0ed61979..00000000 --- a/contentctl/objects/event_source.py +++ /dev/null @@ -1,11 +0,0 @@ -from __future__ import annotations -from typing import Union, Optional, List -from pydantic import BaseModel, Field - -from contentctl.objects.security_content_object import SecurityContentObject - -class EventSource(SecurityContentObject): - fields: Optional[list[str]] = None - field_mappings: Optional[list[dict]] = None - convert_to_log_source: Optional[list[dict]] = None - example_log: Optional[str] = None diff --git a/contentctl/objects/investigation.py b/contentctl/objects/investigation.py index 293e3331..6e058783 100644 --- a/contentctl/objects/investigation.py +++ b/contentctl/objects/investigation.py @@ -16,13 +16,10 @@ class Investigation(SecurityContentObject): model_config = ConfigDict(use_enum_values=True,validate_default=False) type: str = Field(...,pattern="^Investigation$") - datamodel: list[DataModel] = Field(...) name:str = Field(...,max_length=CONTENTCTL_MAX_SEARCH_NAME_LENGTH) search: str = Field(...) how_to_implement: str = Field(...) known_false_positives: str = Field(...) - - tags: InvestigationTags # enrichment @@ -38,6 +35,11 @@ def inputs(self)->List[str]: return inputs + @computed_field + @property + def datamodel(self) -> List[DataModel]: + return [dm for dm in DataModel if dm.value in self.search] + @computed_field @property def lowercase_name(self)->str: diff --git a/contentctl/objects/investigation_tags.py b/contentctl/objects/investigation_tags.py index 6db99eff..c4b812e6 100644 --- a/contentctl/objects/investigation_tags.py +++ b/contentctl/objects/investigation_tags.py @@ -1,13 +1,13 @@ from __future__ import annotations from typing import List -from pydantic import BaseModel, Field, field_validator, ValidationInfo, model_serializer +from pydantic import BaseModel, Field, field_validator, ValidationInfo, model_serializer,ConfigDict from contentctl.objects.story import Story from contentctl.objects.enums import SecurityContentInvestigationProductName, SecurityDomain class InvestigationTags(BaseModel): + model_config = ConfigDict(extra="forbid") analytic_story: List[Story] = Field([],min_length=1) product: List[SecurityContentInvestigationProductName] = Field(...,min_length=1) - required_fields: List[str] = Field(min_length=1) security_domain: SecurityDomain = Field(...) @@ -23,7 +23,6 @@ def serialize_model(self): model= { "analytic_story": [story.name for story in self.analytic_story], "product": self.product, - "required_fields": self.required_fields, "security_domain": self.security_domain, } diff --git a/contentctl/objects/lookup.py b/contentctl/objects/lookup.py index e37e60e9..bc699f49 100644 --- a/contentctl/objects/lookup.py +++ b/contentctl/objects/lookup.py @@ -33,6 +33,7 @@ class Lookup(SecurityContentObject): default_match: Optional[bool] = None match_type: Optional[str] = None min_matches: Optional[int] = None + max_matches: Optional[int] = None case_sensitive_match: Optional[bool] = None # TODO: Add id field to all lookup ymls id: uuid.UUID = Field(default_factory=uuid.uuid4) @@ -52,6 +53,7 @@ def serialize_model(self): "default_match": "true" if self.default_match is True else "false", "match_type": self.match_type, "min_matches": self.min_matches, + "max_matches": self.max_matches, "case_sensitive_match": "true" if self.case_sensitive_match is True else "false", "collection": self.collection, "fields_list": self.fields_list diff --git a/contentctl/objects/mitre_attack_enrichment.py b/contentctl/objects/mitre_attack_enrichment.py index 401774e9..85df2c4b 100644 --- a/contentctl/objects/mitre_attack_enrichment.py +++ b/contentctl/objects/mitre_attack_enrichment.py @@ -85,7 +85,7 @@ def standardize_contributors(cls, contributors:list[str] | None) -> list[str]: # TODO (#266): disable the use_enum_values configuration class MitreAttackEnrichment(BaseModel): - ConfigDict(use_enum_values=True) + ConfigDict(use_enum_values=True,extra='forbid') mitre_attack_id: MITRE_ATTACK_ID_TYPE = Field(...) mitre_attack_technique: str = Field(...) mitre_attack_tactics: List[MitreTactics] = Field(...) diff --git a/contentctl/objects/observable.py b/contentctl/objects/observable.py index daf7a70b..81b04922 100644 --- a/contentctl/objects/observable.py +++ b/contentctl/objects/observable.py @@ -1,8 +1,9 @@ -from pydantic import BaseModel, field_validator +from pydantic import BaseModel, field_validator, ConfigDict from contentctl.objects.constants import SES_OBSERVABLE_TYPE_MAPPING, RBA_OBSERVABLE_ROLE_MAPPING class Observable(BaseModel): + model_config = ConfigDict(extra="forbid") name: str type: str role: list[str] diff --git a/contentctl/objects/playbook_tags.py b/contentctl/objects/playbook_tags.py index fd4a21e6..10d90ac1 100644 --- a/contentctl/objects/playbook_tags.py +++ b/contentctl/objects/playbook_tags.py @@ -1,6 +1,6 @@ from __future__ import annotations from typing import TYPE_CHECKING, Optional, List -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field,ConfigDict import enum from contentctl.objects.detection import Detection @@ -36,6 +36,7 @@ class DefendTechnique(str,enum.Enum): D3_SRA = "D3-SRA" D3_RUAA = "D3-RUAA" class PlaybookTag(BaseModel): + model_config = ConfigDict(extra="forbid") analytic_story: Optional[list] = None detections: Optional[list] = None platform_tags: list[str] = Field(...,min_length=0) @@ -46,5 +47,8 @@ class PlaybookTag(BaseModel): use_cases: list[PlaybookUseCase] = Field([],min_length=0) defend_technique_id: Optional[List[DefendTechnique]] = None + labels:list[str] = [] + playbook_outputs:list[str] = [] + detection_objects: list[Detection] = [] \ No newline at end of file diff --git a/contentctl/objects/test_attack_data.py b/contentctl/objects/test_attack_data.py index 2c53df0b..5d5f9c80 100644 --- a/contentctl/objects/test_attack_data.py +++ b/contentctl/objects/test_attack_data.py @@ -1,8 +1,9 @@ from __future__ import annotations -from pydantic import BaseModel, HttpUrl, FilePath, Field +from pydantic import BaseModel, HttpUrl, FilePath, Field, ConfigDict class TestAttackData(BaseModel): + model_config = ConfigDict(extra="forbid") data: HttpUrl | FilePath = Field(...) # TODO - should source and sourcetype should be mapped to a list # of supported source and sourcetypes in a given environment? diff --git a/contentctl/objects/unit_test_baseline.py b/contentctl/objects/unit_test_baseline.py index 9ba49336..66a60594 100644 --- a/contentctl/objects/unit_test_baseline.py +++ b/contentctl/objects/unit_test_baseline.py @@ -1,9 +1,10 @@ -from pydantic import BaseModel +from pydantic import BaseModel,ConfigDict from typing import Union class UnitTestBaseline(BaseModel): + model_config = ConfigDict(extra="forbid") name: str file: str pass_condition: str diff --git a/contentctl/output/data_source_writer.py b/contentctl/output/data_source_writer.py index 97967a72..1a6e4f95 100644 --- a/contentctl/output/data_source_writer.py +++ b/contentctl/output/data_source_writer.py @@ -1,6 +1,5 @@ import csv from contentctl.objects.data_source import DataSource -from contentctl.objects.event_source import EventSource from typing import List import pathlib diff --git a/contentctl/output/templates/doc_detections.j2 b/contentctl/output/templates/doc_detections.j2 index 5430b0ed..60f0282f 100644 --- a/contentctl/output/templates/doc_detections.j2 +++ b/contentctl/output/templates/doc_detections.j2 @@ -162,11 +162,6 @@ The SPL above uses the following Lookups: {% endfor %} {% endif -%} -#### Required field -{% for field in object.tags.required_fields -%} -* {{ field }} -{% endfor %} - #### How To Implement {{ object.how_to_implement}} diff --git a/contentctl/output/yml_output.py b/contentctl/output/yml_output.py index 93eae5dc..b4da5412 100644 --- a/contentctl/output/yml_output.py +++ b/contentctl/output/yml_output.py @@ -43,7 +43,6 @@ def writeDetections(self, objects: list, output_path : str) -> None: "kill_chain_phases:": True, "observable": True, "product": True, - "required_fields": True, "risk_score": True, "security_domain": True }, diff --git a/contentctl/templates/detections/endpoint/anomalous_usage_of_7zip.yml b/contentctl/templates/detections/endpoint/anomalous_usage_of_7zip.yml index a101fd7d..fbf847e1 100644 --- a/contentctl/templates/detections/endpoint/anomalous_usage_of_7zip.yml +++ b/contentctl/templates/detections/endpoint/anomalous_usage_of_7zip.yml @@ -71,18 +71,6 @@ tags: - Splunk Enterprise - Splunk Enterprise Security - Splunk Cloud - required_fields: - - _time - - Processes.process_name - - Processes.process - - Processes.dest - - Processes.user - - Processes.parent_process_name - - Processes.process_name - - Processes.parent_process - - Processes.process_id - - Processes.parent_process_id - risk_score: 64 security_domain: endpoint tests: - name: True Positive Test diff --git a/pyproject.toml b/pyproject.toml index ed1eebd9..4d6172e5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "contentctl" -version = "4.4.6" +version = "4.5.0" description = "Splunk Content Control Tool" authors = ["STRT "] @@ -12,7 +12,7 @@ contentctl = 'contentctl.contentctl:main' [tool.poetry.dependencies] python = "^3.11,<3.13" -pydantic = "^2.8.2" +pydantic = "~2.9.2" PyYAML = "^6.0.2" requests = "~2.32.3" pycvesearch = "^1.2"