From a906f1ecb07b5b269cd7f9740e56b52eff913001 Mon Sep 17 00:00:00 2001 From: Christian Studer Date: Wed, 21 Feb 2024 11:20:12 +0100 Subject: [PATCH] fix: [stix2 import] Fixed MISP Sightings handling --- .../stix2misp/stix2_to_misp.py | 131 ++++++++---------- 1 file changed, 54 insertions(+), 77 deletions(-) diff --git a/misp_stix_converter/stix2misp/stix2_to_misp.py b/misp_stix_converter/stix2misp/stix2_to_misp.py index 78f8b0ab..c54930cf 100644 --- a/misp_stix_converter/stix2misp/stix2_to_misp.py +++ b/misp_stix_converter/stix2misp/stix2_to_misp.py @@ -487,12 +487,27 @@ def _load_observed_data(self, observed_data: _OBSERVED_DATA_TYPING): self._observed_data = {observed_data.id: observed_data} def _load_opinion(self, opinion: Opinion): + misp_sighting = MISPSighting() + sighting_args = { + 'date_sighting': self._timestamp_from_date(opinion.modified), + 'type': '1', + **self._sanitise_attribute_uuid(opinion.id) + } + if hasattr(opinion, 'x_misp_source'): + sighting_args['source'] = opinion.x_misp_source + if hasattr(opinion, 'x_misp_author_ref'): + identity = self._identity[opinion.x_misp_author_ref] + sighting_args['Organisation'] = { + 'uuid': self._sanitise_uuid(identity.id), + 'name': identity.name + } + misp_sighting.from_dict(**sighting_args) opinion_ref = self._sanitise_uuid(opinion.id) try: - self._sighting['opinion'][opinion_ref] = opinion + self._sighting['opinion'][opinion_ref] = misp_sighting except AttributeError: self._sighting = defaultdict(lambda: defaultdict(list)) - self._sighting['opinion'][opinion_ref] = opinion + self._sighting['opinion'][opinion_ref] = misp_sighting for object_ref in opinion.object_refs: sanitised_ref = self._sanitise_uuid(object_ref) self._sighting['opinion_refs'][sanitised_ref].append(opinion_ref) @@ -514,12 +529,27 @@ def _load_report(self, report: _REPORT_TYPING): self._report = {report.id: report} def _load_sighting(self, sighting: _SIGHTING_TYPING): + misp_sighting = MISPSighting() + sighting_args = { + 'date_sighting': self._timestamp_from_date(sighting.modified), + 'type': '0', + **self._sanitise_attribute_uuid(sighting.id) + } + if hasattr(sighting, 'description'): + sighting_args['source'] = sighting.description + if hasattr(sighting, 'where_sighted_refs'): + identity = self._identity[sighting.where_sighted_refs[0]] + sighting_args['Organisation'] = { + 'uuid': self._sanitise_uuid(identity.id), + 'name': identity.name + } + misp_sighting.from_dict(**sighting_args) sighting_of_ref = self._sanitise_uuid(sighting.sighting_of_ref) try: - self._sighting['sighting'][sighting_of_ref].append(sighting) + self._sighting['sighting'][sighting_of_ref].append(misp_sighting) except AttributeError: self._sighting = defaultdict(lambda: defaultdict(list)) - self._sighting['sighting'][sighting_of_ref].append(sighting) + self._sighting['sighting'][sighting_of_ref].append(misp_sighting) def _load_threat_actor(self, threat_actor: _THREAT_ACTOR_TYPING): self._check_uuid(threat_actor.id) @@ -817,21 +847,35 @@ def _handle_meta_fields(self, stix_object: _GALAXY_OBJECTS_TYPING) -> dict: def _handle_attribute_sightings(self, attribute: MISPAttribute): attribute_uuid = attribute.uuid + if attribute_uuid in self.replacement_uuids: + attribute_uuid = self.replacement_uuids[attribute_uuid] if attribute_uuid in self._sighting.get('sighting', {}): - self._parse_attribute_sightings(attribute) + for sighting in self._sighting['sighting'][attribute_uuid]: + attribute.add_sighting(sighting) if attribute_uuid in self._sighting.get('opinion_refs', {}): - self._parse_attribute_opinions(attribute) + for opinion_ref in self._sighting['opinion_refs'][attribute_uuid]: + attribute.add_sighting(self._sighting['opinion'][opinion_ref]) elif attribute_uuid in self._sighting.get('custom_opinion', {}): - self._parse_attribute_custom_opinions(attribute) + for sighting in self._sighting['custom_opinion'][attribute_uuid]: + attribute.add_sighting(sighting) def _handle_object_sightings(self, misp_object: MISPObject): object_uuid = misp_object.uuid + if object_uuid in self.replacement_uuids: + object_uuid = self.replacement_uuids[object_uuid] if object_uuid in self._sighting.get('sighting', {}): - self._parse_object_sightings(misp_object) + for sighting in self._sighting['sighting'][object_uuid]: + for attribute in misp_object.attributes: + attribute.add_sighting(sighting) if object_uuid in self._sighting.get('opinion_refs', {}): - self._parse_object_opinions(misp_object) + for opinion_ref in self._sighting['opinion_refs'][object_uuid]: + sighting = self._sighting['opinion'][opinion_ref] + for attribute in misp_object.attributes: + attribute.add_sighting(sighting) elif misp_object.uuid in self._sighting.get('custom_opinion', {}): - self._parse_object_custom_opinion(misp_object) + for sighting in self._sighting['custom_opinion'][object_uuid]: + for attribute in misp_object.attributes: + attribute.add_sighting(sighting) def _handle_opposite_reference( self, relationship_type: str, source_uuid: str, target_uuid: str): @@ -839,14 +883,6 @@ def _handle_opposite_reference( reference = (source_uuid, self.relationship_types[relationship_type]) self._relationship[sanitised_uuid].add(reference) - def _parse_attribute_custom_opinions(self, attribute: MISPAttribute): - for sighting in self._sighting['custom_opinion'][attribute.uuid]: - attribute.add_sighting(sighting) - - def _parse_attribute_opinions(self, attribute: MISPAttribute): - for opinion_ref in self._sighting['opinion_refs'][attribute.uuid]: - attribute.add_sighting(self._sighting['opinion'][opinion_ref]) - def _parse_attribute_relationships_as_container( self, attribute: MISPAttribute): clusters = defaultdict(list) @@ -880,10 +916,6 @@ def _parse_attribute_relationships_as_tag_names( relationship_type, attribute.uuid, referenced_uuid ) - def _parse_attribute_sightings(self, attribute: MISPAttribute): - for sighting in self._sighting['sighting'][attribute.uuid]: - attribute.add_sighting(self._parse_sighting(sighting)) - def _parse_cluster_relationships(self, cluster: MISPGalaxyCluster): for relationship in self._relationship[cluster.uuid]: referenced_uuid, relationship_type = relationship @@ -899,17 +931,6 @@ def _parse_galaxy_relationships(self): if cluster.uuid in self._relationship: self._parse_cluster_relationships(cluster) - def _parse_object_custom_opinion(self, misp_object: MISPObject): - for sighting in self._sighting['custom_opinion'][misp_object.uuid]: - for attribute in misp_object.attributes: - attribute.add_sighting(sighting) - - def _parse_object_opinions(self, misp_object: MISPObject): - for opinion_ref in self._sighting['opinion_refs'][misp_object.uuid]: - sighting = self._sighting['opinion'][opinion_ref] - for attribute in misp_object.attributes: - attribute.add_sighting(sighting) - def _parse_object_relationships_as_container(self, misp_object: MISPObject): clusters = defaultdict(list) for relationship in self._relationship[misp_object.uuid]: @@ -941,29 +962,6 @@ def _parse_object_relationships_as_tag_names(self, misp_object: MISPObject): self._sanitise_uuid(referenced_uuid), relationship_type ) - def _parse_object_sightings(self, misp_object: MISPObject): - for sighting in self._sighting['sighting'][misp_object.uuid]: - misp_sighting = self._parse_sighting(sighting) - for attribute in misp_object.attributes: - attribute.add_sighting(misp_sighting) - - def _parse_opinion(self, opinion: Opinion) -> MISPSighting: - sighting = MISPSighting() - sighting_args = { - 'date_sighting': self._timestamp_from_date(opinion.modified), - 'type': '1' - } - if hasattr(opinion, 'x_misp_source'): - sighting_args['source'] = opinion.x_misp_source - if hasattr(opinion, 'x_misp_author_ref'): - identity = self._identity[opinion.x_misp_author_ref] - sighting_args['Organisation'] = { - 'uuid': self._sanitise_uuid(identity.id), - 'name': identity.name - } - sighting.from_dict(**sighting_args) - return sighting - def _parse_relationships(self): for attribute in self.misp_event.attributes: if attribute.uuid in self._relationship: @@ -985,8 +983,6 @@ def _parse_relationships(self): self._parse_galaxy_relationships() def _parse_relationships_and_sightings(self): - for opinion_id, opinion in self._sighting['opinion'].items(): - self._sighting['opinion'][opinion_id] = self._parse_opinion(opinion) for attribute in self.misp_event.attributes: if attribute.uuid in self._relationship: getattr( @@ -1008,26 +1004,7 @@ def _parse_relationships_and_sightings(self): if not self.galaxies_as_tags: self._parse_galaxy_relationships() - def _parse_sighting(self, sighting: _SIGHTING_TYPING) -> MISPSighting: - misp_sighting = MISPSighting() - sighting_args = { - 'date_sighting': self._timestamp_from_date(sighting.modified), - 'type': '0' - } - if hasattr(sighting, 'description'): - sighting_args['source'] = sighting.description - if hasattr(sighting, 'where_sighted_refs'): - identity = self._identity[sighting.where_sighted_refs[0]] - sighting_args['Organisation'] = { - 'uuid': self._sanitise_uuid(identity.id), - 'name': identity.name - } - misp_sighting.from_dict(**sighting_args) - return misp_sighting - def _parse_sightings(self): - for opinion_id, opinion in self._sighting['opinion'].items(): - self._sighting['opinion'][opinion_id] = self._parse_opinion(opinion) for attribute in self.misp_event.attributes: self._handle_attribute_sightings(attribute) for misp_object in self.misp_event.objects: