Skip to content

Commit

Permalink
fix: [stix2 import] Fixed MISP Sightings handling
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisr3d committed Feb 21, 2024
1 parent 993860e commit a906f1e
Showing 1 changed file with 54 additions and 77 deletions.
131 changes: 54 additions & 77 deletions misp_stix_converter/stix2misp/stix2_to_misp.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,12 +487,27 @@ def _load_observed_data(self, observed_data: _OBSERVED_DATA_TYPING):
self._observed_data = {observed_data.id: observed_data}

def _load_opinion(self, opinion: Opinion):
misp_sighting = MISPSighting()
sighting_args = {
'date_sighting': self._timestamp_from_date(opinion.modified),
'type': '1',
**self._sanitise_attribute_uuid(opinion.id)
}
if hasattr(opinion, 'x_misp_source'):
sighting_args['source'] = opinion.x_misp_source
if hasattr(opinion, 'x_misp_author_ref'):
identity = self._identity[opinion.x_misp_author_ref]
sighting_args['Organisation'] = {
'uuid': self._sanitise_uuid(identity.id),
'name': identity.name
}
misp_sighting.from_dict(**sighting_args)
opinion_ref = self._sanitise_uuid(opinion.id)
try:
self._sighting['opinion'][opinion_ref] = opinion
self._sighting['opinion'][opinion_ref] = misp_sighting
except AttributeError:
self._sighting = defaultdict(lambda: defaultdict(list))
self._sighting['opinion'][opinion_ref] = opinion
self._sighting['opinion'][opinion_ref] = misp_sighting
for object_ref in opinion.object_refs:
sanitised_ref = self._sanitise_uuid(object_ref)
self._sighting['opinion_refs'][sanitised_ref].append(opinion_ref)
Expand All @@ -514,12 +529,27 @@ def _load_report(self, report: _REPORT_TYPING):
self._report = {report.id: report}

def _load_sighting(self, sighting: _SIGHTING_TYPING):
misp_sighting = MISPSighting()
sighting_args = {
'date_sighting': self._timestamp_from_date(sighting.modified),
'type': '0',
**self._sanitise_attribute_uuid(sighting.id)
}
if hasattr(sighting, 'description'):
sighting_args['source'] = sighting.description
if hasattr(sighting, 'where_sighted_refs'):
identity = self._identity[sighting.where_sighted_refs[0]]
sighting_args['Organisation'] = {
'uuid': self._sanitise_uuid(identity.id),
'name': identity.name
}
misp_sighting.from_dict(**sighting_args)
sighting_of_ref = self._sanitise_uuid(sighting.sighting_of_ref)
try:
self._sighting['sighting'][sighting_of_ref].append(sighting)
self._sighting['sighting'][sighting_of_ref].append(misp_sighting)
except AttributeError:
self._sighting = defaultdict(lambda: defaultdict(list))
self._sighting['sighting'][sighting_of_ref].append(sighting)
self._sighting['sighting'][sighting_of_ref].append(misp_sighting)

def _load_threat_actor(self, threat_actor: _THREAT_ACTOR_TYPING):
self._check_uuid(threat_actor.id)
Expand Down Expand Up @@ -817,36 +847,42 @@ def _handle_meta_fields(self, stix_object: _GALAXY_OBJECTS_TYPING) -> dict:

def _handle_attribute_sightings(self, attribute: MISPAttribute):
attribute_uuid = attribute.uuid
if attribute_uuid in self.replacement_uuids:
attribute_uuid = self.replacement_uuids[attribute_uuid]
if attribute_uuid in self._sighting.get('sighting', {}):
self._parse_attribute_sightings(attribute)
for sighting in self._sighting['sighting'][attribute_uuid]:
attribute.add_sighting(sighting)
if attribute_uuid in self._sighting.get('opinion_refs', {}):
self._parse_attribute_opinions(attribute)
for opinion_ref in self._sighting['opinion_refs'][attribute_uuid]:
attribute.add_sighting(self._sighting['opinion'][opinion_ref])
elif attribute_uuid in self._sighting.get('custom_opinion', {}):
self._parse_attribute_custom_opinions(attribute)
for sighting in self._sighting['custom_opinion'][attribute_uuid]:
attribute.add_sighting(sighting)

def _handle_object_sightings(self, misp_object: MISPObject):
object_uuid = misp_object.uuid
if object_uuid in self.replacement_uuids:
object_uuid = self.replacement_uuids[object_uuid]
if object_uuid in self._sighting.get('sighting', {}):
self._parse_object_sightings(misp_object)
for sighting in self._sighting['sighting'][object_uuid]:
for attribute in misp_object.attributes:
attribute.add_sighting(sighting)
if object_uuid in self._sighting.get('opinion_refs', {}):
self._parse_object_opinions(misp_object)
for opinion_ref in self._sighting['opinion_refs'][object_uuid]:
sighting = self._sighting['opinion'][opinion_ref]
for attribute in misp_object.attributes:
attribute.add_sighting(sighting)
elif misp_object.uuid in self._sighting.get('custom_opinion', {}):
self._parse_object_custom_opinion(misp_object)
for sighting in self._sighting['custom_opinion'][object_uuid]:
for attribute in misp_object.attributes:
attribute.add_sighting(sighting)

def _handle_opposite_reference(
self, relationship_type: str, source_uuid: str, target_uuid: str):
sanitised_uuid = self._sanitise_uuid(target_uuid)
reference = (source_uuid, self.relationship_types[relationship_type])
self._relationship[sanitised_uuid].add(reference)

def _parse_attribute_custom_opinions(self, attribute: MISPAttribute):
for sighting in self._sighting['custom_opinion'][attribute.uuid]:
attribute.add_sighting(sighting)

def _parse_attribute_opinions(self, attribute: MISPAttribute):
for opinion_ref in self._sighting['opinion_refs'][attribute.uuid]:
attribute.add_sighting(self._sighting['opinion'][opinion_ref])

def _parse_attribute_relationships_as_container(
self, attribute: MISPAttribute):
clusters = defaultdict(list)
Expand Down Expand Up @@ -880,10 +916,6 @@ def _parse_attribute_relationships_as_tag_names(
relationship_type, attribute.uuid, referenced_uuid
)

def _parse_attribute_sightings(self, attribute: MISPAttribute):
for sighting in self._sighting['sighting'][attribute.uuid]:
attribute.add_sighting(self._parse_sighting(sighting))

def _parse_cluster_relationships(self, cluster: MISPGalaxyCluster):
for relationship in self._relationship[cluster.uuid]:
referenced_uuid, relationship_type = relationship
Expand All @@ -899,17 +931,6 @@ def _parse_galaxy_relationships(self):
if cluster.uuid in self._relationship:
self._parse_cluster_relationships(cluster)

def _parse_object_custom_opinion(self, misp_object: MISPObject):
for sighting in self._sighting['custom_opinion'][misp_object.uuid]:
for attribute in misp_object.attributes:
attribute.add_sighting(sighting)

def _parse_object_opinions(self, misp_object: MISPObject):
for opinion_ref in self._sighting['opinion_refs'][misp_object.uuid]:
sighting = self._sighting['opinion'][opinion_ref]
for attribute in misp_object.attributes:
attribute.add_sighting(sighting)

def _parse_object_relationships_as_container(self, misp_object: MISPObject):
clusters = defaultdict(list)
for relationship in self._relationship[misp_object.uuid]:
Expand Down Expand Up @@ -941,29 +962,6 @@ def _parse_object_relationships_as_tag_names(self, misp_object: MISPObject):
self._sanitise_uuid(referenced_uuid), relationship_type
)

def _parse_object_sightings(self, misp_object: MISPObject):
for sighting in self._sighting['sighting'][misp_object.uuid]:
misp_sighting = self._parse_sighting(sighting)
for attribute in misp_object.attributes:
attribute.add_sighting(misp_sighting)

def _parse_opinion(self, opinion: Opinion) -> MISPSighting:
sighting = MISPSighting()
sighting_args = {
'date_sighting': self._timestamp_from_date(opinion.modified),
'type': '1'
}
if hasattr(opinion, 'x_misp_source'):
sighting_args['source'] = opinion.x_misp_source
if hasattr(opinion, 'x_misp_author_ref'):
identity = self._identity[opinion.x_misp_author_ref]
sighting_args['Organisation'] = {
'uuid': self._sanitise_uuid(identity.id),
'name': identity.name
}
sighting.from_dict(**sighting_args)
return sighting

def _parse_relationships(self):
for attribute in self.misp_event.attributes:
if attribute.uuid in self._relationship:
Expand All @@ -985,8 +983,6 @@ def _parse_relationships(self):
self._parse_galaxy_relationships()

def _parse_relationships_and_sightings(self):
for opinion_id, opinion in self._sighting['opinion'].items():
self._sighting['opinion'][opinion_id] = self._parse_opinion(opinion)
for attribute in self.misp_event.attributes:
if attribute.uuid in self._relationship:
getattr(
Expand All @@ -1008,26 +1004,7 @@ def _parse_relationships_and_sightings(self):
if not self.galaxies_as_tags:
self._parse_galaxy_relationships()

def _parse_sighting(self, sighting: _SIGHTING_TYPING) -> MISPSighting:
misp_sighting = MISPSighting()
sighting_args = {
'date_sighting': self._timestamp_from_date(sighting.modified),
'type': '0'
}
if hasattr(sighting, 'description'):
sighting_args['source'] = sighting.description
if hasattr(sighting, 'where_sighted_refs'):
identity = self._identity[sighting.where_sighted_refs[0]]
sighting_args['Organisation'] = {
'uuid': self._sanitise_uuid(identity.id),
'name': identity.name
}
misp_sighting.from_dict(**sighting_args)
return misp_sighting

def _parse_sightings(self):
for opinion_id, opinion in self._sighting['opinion'].items():
self._sighting['opinion'][opinion_id] = self._parse_opinion(opinion)
for attribute in self.misp_event.attributes:
self._handle_attribute_sightings(attribute)
for misp_object in self.misp_event.objects:
Expand Down

0 comments on commit a906f1e

Please sign in to comment.