Skip to content

Commit

Permalink
fix: [stix2 import] Avoiding issues with STIX 2.x content coming from…
Browse files Browse the repository at this point in the history
… a TAXII collection or embedded into a single list instead of a Bundle
  • Loading branch information
chrisr3d committed Feb 20, 2024
1 parent 729085c commit d6abd9e
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 23 deletions.
10 changes: 3 additions & 7 deletions misp_stix_converter/misp_stix_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from .misp2stix.stix1_mapping import NS_DICT, SCHEMALOC_DICT
from .stix2misp.external_stix1_to_misp import ExternalSTIX1toMISPParser
from .stix2misp.external_stix2_to_misp import ExternalSTIX2toMISPParser
from .stix2misp.importparser import _load_stix2_content
from .stix2misp.internal_stix1_to_misp import InternalSTIX1toMISPParser
from .stix2misp.internal_stix2_to_misp import InternalSTIX2toMISPParser
from collections import defaultdict
Expand All @@ -27,8 +28,6 @@
Campaigns, CoursesOfAction, Indicators, ThreatActors, STIXPackage)
from stix.core.ttps import TTPs
from stix2.base import STIXJSONEncoder
from stix2.exceptions import InvalidValueError
from stix2.parsing import parse as stix2_parser, ParseError
from stix2.v20 import Bundle as Bundle_v20
from stix2.v21 import Bundle as Bundle_v21
from typing import List, Optional, Union
Expand Down Expand Up @@ -673,11 +672,8 @@ def stix_2_to_misp(filename: _files_type,
if isinstance(filename, str):
filename = Path(filename).resolve()
try:
with open(filename, 'rt', encoding='utf-8') as f:
bundle = stix2_parser(
f.read(), allow_custom=True, interoperability=True
)
except (ParseError, InvalidValueError) as error:
bundle = _load_stix2_content(filename)
except Exception as error:
return {'errors': [f'{filename} - {error.__str__()}']}
parser, args = _get_stix2_parser(
_from_misp(bundle.objects), distribution, sharing_group_id,
Expand Down
1 change: 1 addition & 0 deletions misp_stix_converter/stix2misp/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from .external_stix1_to_misp import ExternalSTIX1toMISPParser # noqa
from .external_stix2_mapping import ExternalSTIX2toMISPMapping # noqa
from .external_stix2_to_misp import ExternalSTIX2toMISPParser # noqa
from .importparser import _load_stix2_content # noqa
from .internal_stix1_to_misp import InternalSTIX1toMISPParser # noqa
from .internal_stix2_mapping import InternalSTIX2toMISPMapping # noqa
from .internal_stix2_to_misp import InternalSTIX2toMISPParser # noqa
Expand Down
41 changes: 41 additions & 0 deletions misp_stix_converter/stix2misp/importparser.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@
from collections import defaultdict
from pathlib import Path
from pymisp import AbstractMISP, MISPEvent, MISPObject
from stix2.exceptions import InvalidValueError
from stix2.parsing import dict_to_stix2, parse as stix2_parser, ParseError
from stix2.v20.bundle import Bundle as Bundle_v20
from stix2.v20.sdo import Indicator as Indicator_v20
from stix2.v21.bundle import Bundle as Bundle_v21
from stix2.v21.sdo import Indicator as Indicator_v21
from types import GeneratorType
from typing import Optional, Union
Expand All @@ -26,6 +30,43 @@
_UUIDv4 = UUID('76beed5f-7251-457e-8c2a-b45f7b589d3d')


def _get_stix2_content_version(stix2_content: dict):
for stix_object in stix2_content['objects']:
if stix_object.get('spec_version'):
return '2.1'
return '2.0'


def _handle_stix2_loading_error(stix2_content: dict):
version = _get_stix2_content_version(stix2_content)
if isinstance(stix2_content, dict):
if version == '2.1' and stix2_content.get('spec_version') == '2.0':
del stix2_content['spec_version']
return dict_to_stix2(
stix2_content, allow_custom=True, interoperability=True
)
if version == '2.0' and stix2_content.get('spec_version') == '2.1':
stix2_content['spec_version'] = '2.0'
return dict_to_stix2(
stix2_content, allow_custom=True, interoperability=True
)
bundle = Bundle_v21 if version == '2.1' else Bundle_v20
if 'objects' in stix2_content:
stix2_content = stix2_content['objects']
return bundle(*stix2_content, allow_custom=True, interoperability=True)


def _load_stix2_content(filename):
with open(filename, 'rt', encoding='utf-8') as f:
stix2_content = f.read()
try:
return stix2_parser(
stix2_content, allow_custom=True, interoperability=True
)
except (InvalidValueError, ParseError):
return _handle_stix2_loading_error(json.loads(stix2_content))


class STIXtoMISPParser(metaclass=ABCMeta):
def __init__(self, distribution: int, sharing_group_id: Union[int, None],
galaxies_as_tags: bool):
Expand Down
30 changes: 14 additions & 16 deletions misp_stix_converter/stix2misp/stix2_to_misp.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,9 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from os import walk
import sys
import time
from .exceptions import (
MarkingDefinitionLoadingError, ObjectRefLoadingError,
ObjectTypeLoadingError, SynonymsResourceJSONError,
UnavailableGalaxyResourcesError, UnavailableSynonymsResourceError,
UndefinedIndicatorError, UndefinedSTIXObjectError, UndefinedObservableError,
UnknownAttributeTypeError, UnknownObjectNameError,
UnknownParsingFunctionError, UnknownPatternTypeError,
UnknownStixObjectTypeError)
from .external_stix2_mapping import ExternalSTIX2toMISPMapping
from .importparser import STIXtoMISPParser, _INDICATOR_TYPING
from .internal_stix2_mapping import InternalSTIX2toMISPMapping
from .converters import (
ExternalSTIX2AttackPatternConverter, ExternalSTIX2MalwareAnalysisConverter,
ExternalSTIX2CampaignConverter, InternalSTIX2CampaignConverter,
Expand All @@ -27,14 +17,25 @@
ExternalSTIX2ThreatActorConverter, InternalSTIX2ThreatActorConverter,
ExternalSTIX2ToolConverter, InternalSTIX2ToolConverter,
ExternalSTIX2VulnerabilityConverter, InternalSTIX2VulnerabilityConverter)
from .exceptions import (
MarkingDefinitionLoadingError, ObjectRefLoadingError,
ObjectTypeLoadingError, SynonymsResourceJSONError,
UnavailableGalaxyResourcesError, UnavailableSynonymsResourceError,
UndefinedIndicatorError, UndefinedSTIXObjectError, UndefinedObservableError,
UnknownAttributeTypeError, UnknownObjectNameError,
UnknownParsingFunctionError, UnknownPatternTypeError,
UnknownStixObjectTypeError)
from .external_stix2_mapping import ExternalSTIX2toMISPMapping
from .importparser import (
STIXtoMISPParser, _INDICATOR_TYPING, _load_stix2_content)
from .internal_stix2_mapping import InternalSTIX2toMISPMapping
from abc import ABCMeta
from collections import defaultdict
from datetime import datetime
from pymisp import (
AbstractMISP, MISPEvent, MISPAttribute, MISPGalaxy, MISPGalaxyCluster,
MISPObject, MISPSighting)
from stix2 import TLP_AMBER, TLP_GREEN, TLP_RED, TLP_WHITE
from stix2.parsing import parse as stix2_parser
from stix2.v20.bundle import Bundle as Bundle_v20
from stix2.v20.common import MarkingDefinition as MarkingDefinition_v20
from stix2.v20.observables import NetworkTraffic as NetworkTraffic_v20
Expand Down Expand Up @@ -283,10 +284,7 @@ def parse_stix_bundle(self, single_event: Optional[bool] = False):
def parse_stix_content(
self, filename: str, single_event: Optional[bool] = False):
try:
with open(filename, 'rt', encoding='utf-8') as f:
bundle = stix2_parser(
f.read(), allow_custom=True, interoperability=True
)
bundle = _load_stix2_content(filename)
except Exception as exception:
sys.exit(exception)
self.load_stix_bundle(bundle)
Expand Down

0 comments on commit d6abd9e

Please sign in to comment.