Better STIX Parsing #23

Open
alatif113 opened this issue Jun 27, 2017 · 19 comments

Comments

@alatif113

alatif113 commented Jun 27, 2017

Currently every observable is parsed out of a given STIX input and added as an attribute to a MISP event. In many cases this can give undesired results. A better solution would be as follows:

  1. Create an observable map (python dictionary) with the observable id as a key and the observable object as the value, for every observable within the STIX package.
  2. Create a TTP map (python dictionary) with the TTP id as a key and the TTP object as the value, for every TTP within the STIX package.
  3. If there are indicators within the STIX package, parse them. Use the observable map to retrieve the observables that are part of each indicator (typically they'll be represented only by their ID within the indicator). Use the TTP map to retrieve the TTPs that are part of each indicator (again, typically represented only by their ID within the indicator).
  • Parsing indicators, rather than straight observables, gives you the added value of confidence levels, TTPs, campaigns, actors, etc., which can be added as tags to the MISP attributes.
  • When parsing indicators, you can ignore observables that are part of an "AND" composition to prevent unwanted results such as a ".exe" file extension attribute. These observables are part of a group and do not make sense on their own. It's best to ignore them until MISP has functionality for them.
  4. If no indicators exist (some simple feeds will not use them), you can fall back to parsing straight observables. Just loop through the observable map you created before.
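
As a library-free illustration of the idea (separate from the actual implementation further down), here is a minimal sketch of steps 1 and 3: resolve idrefs through an id map and skip anything inside an "AND" composition. The `Obs` class and the `flatten`, `resolve` and `indicator_values` helpers are hypothetical stand-ins made up for this example; they are not python-stix or cybox types.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class Obs:
    """Hypothetical stand-in for a STIX observable (not a python-stix class)."""
    id_: Optional[str] = None
    idref: Optional[str] = None
    value: Optional[str] = None
    operator: Optional[str] = None  # "AND" / "OR" when this is a composition
    children: List["Obs"] = field(default_factory=list)


def flatten(obs: Obs) -> List[Obs]:
    """Return leaf observables, dropping everything under a non-OR composition."""
    if obs.operator is None:
        return [obs]
    if obs.operator != "OR":
        # Members of an AND group make no sense on their own, so skip them
        return []
    leaves: List[Obs] = []
    for child in obs.children:
        leaves.extend(flatten(child))
    return leaves


def resolve(obs: Obs, by_id: Dict[str, Obs]) -> Optional[Obs]:
    """Follow an idref into the id map; return None if the target is missing."""
    if obs.idref:
        return by_id.get(obs.idref)
    return obs


def indicator_values(root: Obs, by_id: Dict[str, Obs]) -> List[str]:
    """Collect unique values for one indicator's observable tree."""
    values: List[str] = []
    for leaf in flatten(root):
        target = resolve(leaf, by_id)
        if target is not None and target.value and target.value not in values:
            values.append(target.value)
    return values


# Step 1: package-level observables keyed by id
by_id = {"obs-1": Obs(id_="obs-1", value="198.51.100.7")}

# Step 3: an indicator whose observable is an OR composition
indicator_root = Obs(operator="OR", children=[
    Obs(idref="obs-1"),                      # resolved through the map
    Obs(value="evil.example.com"),           # inline observable
    Obs(operator="AND", children=[Obs(value=".exe"), Obs(value="C:\\Temp")]),
    Obs(idref="obs-missing"),                # dangling reference, skipped
])

result = indicator_values(indicator_root, by_id)
print(result)
```

With this input only the referenced IP address and the inline domain survive; the ".exe" member of the AND group and the dangling reference are dropped.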

Here's some sample code from my implementation (it was a rush job, so there is no logging or error checking in most of it). Simply import the stix_to_misp.py file and call the build_event function to create a MISP event. I use my own taxonomy for confidence and TTPs, but those can be changed as needed:

stix_parser.py

from cybox.objects import email_message_object, file_object, address_object, socket_address_object
from cybox.objects import domain_name_object, hostname_object, uri_object


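def build_observable_map(pkg):
    # Map observable id -> observable object for every top-level observable
    observable_map = {}
    if pkg.observables:
        for o in pkg.observables.observables:
            try:
                observable_map[o.id_] = o
            except AttributeError:
                continue
    return observable_map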


def build_ttp_map(pkg):
    # Map TTP id -> TTP title (only the title is needed for tagging)
    ttp_map = {}
    if pkg.ttps:
        for t in pkg.ttps.ttps:
            try:
                ttp_map[t.id_] = t.title
            except AttributeError:
                continue
    return ttp_map


def process_indicators(pkg):
    observable_map = build_observable_map(pkg)
    ttp_map = build_ttp_map(pkg)
    indicators = []
    if pkg.indicators:
        for i in pkg.indicators:
            indicator = dict()
            indicator["itypes"] = get_indicator_types(i)
            indicator["confidence"] = get_confidence(i)
            indicator["ttps"] = get_ttps(i, ttp_map)
            indicator["attributes"] = get_indicator_attributes(i, observable_map)

            indicators.append(indicator)

    return indicators


def process_observables(pkg):
    observable_map = build_observable_map(pkg)
    if pkg.observables:
        return get_observable_attributes(pkg.observables.observables, observable_map)
    else:
        return []


def get_indicator_types(indicator):
    itypes = []
    for i in indicator.indicator_types:
        itypes.append(i.value)

    return itypes


def get_confidence(indicator):
    if indicator.confidence:
        return indicator.confidence.value.value
    else:
        return "Unknown"


def get_observable_attributes(observables, observable_map):
    attributes = []
    for o in observables:
        if hasattr(o, "idref") and o.idref:
            try:
                observable = observable_map[o.idref]
            except KeyError:
                continue

            extract_observable(observable, attributes)
        else:
            extract_observable(o, attributes)

    return attributes


def get_indicator_attributes(indicator, observable_map):
    observables = []
    if indicator.observable:
        observables += process_observable(indicator.observable)
    if indicator.observables:
        for o in indicator.observables:
            observables += process_observable(o)

    # Return unconditionally so the caller never receives None
    return get_observable_attributes(observables, observable_map)


def get_ttps(indicator, ttp_map):
    ttps = []
    if indicator.indicated_ttps:
        for ttp in indicator.indicated_ttps:
            if ttp.item.idref:
                try:
                    t = ttp_map[ttp.item.idref]
                except KeyError:
                    continue

                ttps.append(t)
            else:
                ttps.append(ttp.item.title)
    return ttps


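def process_observable(observable):
    # Flatten an observable into its leaf observables.
    # Only "OR" compositions are expanded; members of an "AND" composition
    # are deliberately dropped because they make no sense on their own.
    observables = []

    if observable.observable_composition:
        if observable.observable_composition.operator == "OR":
            for o in observable.observable_composition.observables:
                observables += process_observable(o)
    else:
        observables.append(observable)

    return observables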


def add_attribute(obj, attributes):
    if not any(a["value"] == obj["value"] for a in attributes):
        attributes.append(obj)


# Dedicated to File object
def add_file_attr(obj, attributes):

    if obj.file_name:
        add_attribute({"type": "filename", "value": str(obj.file_name)}, attributes)

    if obj.md5:
        if len(obj.md5) == 32:
            add_attribute({"type": "md5", "value": str(obj.md5)}, attributes)

    if obj.sha1:
        if len(obj.sha1) == 40:
            add_attribute({"type": "sha1", "value": str(obj.sha1)}, attributes)

    if obj.sha256:
        if len(obj.sha256) == 64:
            add_attribute({"type": "sha256", "value": str(obj.sha256)}, attributes)

    if obj.sha512:
        if len(obj.sha512) == 128:
            add_attribute({"type": "sha512", "value": str(obj.sha512)}, attributes)


# Dedicated to Address Object (DB)
def add_addr_attr(obj, attributes):
    if obj.is_source:
        add_attribute({"type": "ip-src", "value": str(obj.address_value)}, attributes)

    elif obj.is_destination:
        add_attribute({"type": "ip-dst", "value": str(obj.address_value)}, attributes)

    else:
        # We don't know the direction; first check if it's an IP range
        if getattr(obj, "condition", None) == "InclusiveBetween":
            add_attribute({"type": "ip-dst", "value": str(obj.address_value[0])}, attributes)
            add_attribute({"type": "ip-dst", "value": str(obj.address_value[1])}, attributes)
        else:
            # Any other condition (or none) is treated as a single address
            add_attribute({"type": "ip-dst", "value": str(obj.address_value)}, attributes)


# Dedicated to EmailMessage (DB)
def add_email_attr(obj, attributes):
    if obj.header:
        # We have a header, can check for to/from etc etc
        if obj.header.from_:
            add_attribute({"type": "email-src", "value": str(obj.header.from_.address_value)}, attributes)
        if obj.header.to:
            for mail in obj.header.to:
                add_attribute({"type": "email-dst", "value": str(mail.address_value)}, attributes)
        if obj.header.subject:
            add_attribute({"type": "email-subject", "value": str(obj.header.subject)}, attributes)


# Dedicated to Domain name (DB)
def add_domain_attr(obj, attributes):
    add_attribute({"type": "domain", "value": str(obj.value)}, attributes)


# Dedicated to Hostname (DB)
def add_hostname_attr(obj, attributes):
    add_attribute({"type": "hostname", "value": str(obj.hostname_value)}, attributes)


# Dedicated to URI (DB)
def add_uri_attr(obj, attributes):
    add_attribute({"type": "url", "value": str(obj.value)}, attributes)


def extract_observable(observable, attributes):

    if hasattr(observable, "object_") and observable.object_:
        prop = observable.object_.properties

        if type(prop) == address_object.Address:
            # Now script uses buildAddressAttribute (DB)
            add_addr_attr(prop, attributes)

        elif type(prop) == domain_name_object.DomainName:
            # Now script uses buildDomainNameAttribute (DB)
            add_domain_attr(prop, attributes)

        elif type(prop) == hostname_object.Hostname:
            # Now script uses buildHostnameAttribute
            add_hostname_attr(prop, attributes)

        elif type(prop) == socket_address_object.SocketAddress:
            if prop.ip_address:
                add_addr_attr(prop.ip_address, attributes)
            if prop.hostname:
                add_hostname_attr(prop.hostname, attributes)

        elif type(prop) == uri_object.URI:
            # Now script uses buildURIAttribute (DB)
            add_uri_attr(prop, attributes)

        elif type(prop) == file_object.File:
            # Now script uses buildFileAttribute (DB)
            add_file_attr(prop, attributes)

        elif type(prop) == email_message_object.EmailMessage:
            # Now script uses buildEmailMessageAttribute (DB)
            add_email_attr(prop, attributes)
        else:
            pass
    else:
        pass

stix_to_misp.py

from stix_parser import process_indicators, process_observables
from stix.core import STIXPackage
from tempfile import SpooledTemporaryFile
import pymisp


def load_stix(stix):
    # Just save the pain and load it if the first character is a <

    if isinstance(stix, STIXPackage):
        # Oh cool we're ok
        # Who tried to load this? Honestly.
        return stix

    elif hasattr(stix, 'read'):
        try:
            stix_package = STIXPackage.from_xml(stix)
        except Exception as ex:
            # Report why parsing failed instead of discarding the exception
            print("Could not load file: {}".format(ex))
            return None

        return stix_package

    elif isinstance(stix, str):
        # It's text, we'll need to use a temporary file
        f = SpooledTemporaryFile(max_size=10 * 1024)
        f.write(stix.encode("utf-8"))
        f.seek(0)

        return load_stix(f)


def build_event(content, **kwargs):
    pkg = load_stix(content)
    if pkg is None:
        # load_stix could not parse the input (or got an unsupported type)
        return None
    event = pymisp.MISPEvent()

    if pkg.stix_header and pkg.stix_header.title:
        event.info = pkg.stix_header.title
    else:
        event.info = "NO_TITLE"

    event.distribution = kwargs.get("distribution", 0)
    event.threat_level_id = kwargs.get("threat_level_id", 3)
    event.analysis = kwargs.get("analysis", 0)

    if pkg.indicators:
        indicators = process_indicators(pkg)
        for i in indicators:
            build_attribute(i, event)
    elif pkg.observables:
        indicator = dict()
        indicator["attributes"] = process_observables(pkg)
        build_attribute(indicator, event)
    else:
        return None

    unique_attr = []
    for attr in event.attributes:
        if not any(attr.value == u.value for u in unique_attr):
            unique_attr.append(attr)

    event.attributes = unique_attr

    return event


def build_attribute(indicator, event):

    tags = []

    if "confidence" in indicator and indicator["confidence"]:
        tags.append({"name": "Confidence:{}".format(indicator["confidence"])})
    else:
        tags.append({"name": "Confidence:Unknown"})
    if "ttps" in indicator and indicator["ttps"]:
        for ttp in indicator["ttps"]:
            tags.append({"name": "TTP:{}".format(ttp)})
    if "itypes" in indicator and indicator["itypes"]:
        for itype in indicator["itypes"]:
            tags.append({"name": "Detail:{}".format(itype)})
    if "attributes" in indicator and indicator["attributes"]:
        for attr in indicator["attributes"]:
            event.add_attribute(attr["type"], attr["value"], Tag=tags)
@alatif113
Author

@FloatingGhost I added sample code but the formatting is not cooperating. Can you help me fix it please?

@iglocska
Member

Sounds like some very sane ideas!

@FloatingGhost
Member

o christ on a quadbike that's a lot of edited code

I might take a look when I'm feeling more masochistic than usual

@alatif113
Author

@FloatingGhost lol most of it is just the parsing of the different observable types taken directly from the existing code.

@FloatingGhost
Member

just

JUST

Nothing with STIX is ever "JUST"

It's always rooted to the hellish floor of the so-called standard and is never as simple as it seems. Inevitably there'll be a tendril of this eldritch horror that extends beyond the realm of human comprehension and into STIX world, in which it terminates in the STIXPackage of unimaginable terror

@iglocska
Member

@alatif113, could you do a pull request for the changes? It does sound like a clean approach indeed.

@alatif113
Author

@iglocska I'm not very git savvy. Don't really know how to do that.

@iglocska
Member

Should be simple enough. Fork the project by clicking the "Fork" button at the top; this will create a copy of the repository under your user name on GitHub, so https://github.com/alatif113/MISP-STIX-Converter

You already have the code-base sitting locally on your machine, which is what you have modified. Create a commit with all of your changes by simply doing the following:

git add /path/to/your/changed/file

Repeat it for all of the files that you have modified (to see a list of all files that you've changed just type git status from within the MISP-STIX-Converter directory)

Once you are done it's time to commit the changes:

git commit -m "My STIX parsing improvements"

Once done, add your own github repository as a remote

git remote add myfork https://github.com/alatif113/MISP-STIX-Converter.git

Then push your committed changes to your fork:

git push myfork master

Once this is done, just go to

https://github.com/alatif113/MISP-STIX-Converter

and open up a pull request by clicking the "New pull request" button (upper left side, next to the branch name)

This should be it!

@alatif113
Author

alatif113 commented Jun 27, 2017

@iglocska Ahh, I see what you mean. I didn't edit any files directly (I didn't need the MISP to STIX part), but rather created my own 2 files (in the code above) solely for parsing and importing STIX to MISP, heavily using code that already existed within the project to parse the actual observable types.

It's just a proof of concept and is missing trivial things such as error checking and logging.

@iglocska
Member

Ah ok, I see. Any chance you could move that to MISP-STIX-Converter and integrate it directly? Or is it too different from how the converter works?

@alatif113
Author

alatif113 commented Jun 27, 2017

@iglocska I think replacing the buildEvent function within the buildMISPAttribute file with the build_event function within the stix_to_misp.py file above should do it, barring the fact there would now be a lot of unused old functions.

There would also need to be some agreement on the taxonomy for tags. I use Confidence:<value> and TTP:<value> for mine, but I don't know if there is already some standard that exists out there.
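
To make the tag scheme easy to swap once a taxonomy is agreed on, the prefixes could live in one place. This is only a sketch: `build_tags` and `DEFAULT_PREFIXES` are hypothetical names, and the defaults simply mirror the Confidence:<value>, TTP:<value> and Detail:<value> tags used in build_attribute above.

```python
from typing import Dict, List, Optional

# Hypothetical defaults mirroring the tag names used in build_attribute
DEFAULT_PREFIXES = {"confidence": "Confidence", "ttps": "TTP", "itypes": "Detail"}


def build_tags(indicator: dict,
               prefixes: Optional[Dict[str, str]] = None) -> List[Dict[str, str]]:
    """Build MISP tag dicts for one parsed indicator using configurable prefixes."""
    prefixes = prefixes or DEFAULT_PREFIXES

    # Confidence is always tagged, falling back to "Unknown"
    confidence = indicator.get("confidence") or "Unknown"
    tags = [{"name": "{}:{}".format(prefixes["confidence"], confidence)}]

    # TTPs and indicator types are optional lists
    for key in ("ttps", "itypes"):
        for value in indicator.get(key) or []:
            tags.append({"name": "{}:{}".format(prefixes[key], value)})
    return tags


example = build_tags({"confidence": "High", "ttps": ["Phishing"], "itypes": ["IP Watchlist"]})
print(example)
```

Passing a different `prefixes` dict would switch every tag to another naming scheme without touching the parsing code.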

Unfortunately I don't have the time to actually go through and do that (not at the moment at least). Just wanted to bring something to the devs' attention with sample code I utilized for my use case.

@iglocska
Member

Thanks a lot for the input, we'll keep this issue open until we can get around to implementing it. It indeed looks very promising!

@alatif113
Author

@iglocska No problem! Got the idea from how many of the SIEMs and commercial threat platforms parse STIX files.

@FloatingGhost
Member

Well if they parsed MISP everyone's life would be a lot easier :<

STIX needs to die.

@iglocska
Member

It looks like it's here to stay though, so we should make sure that the parser makes as much sense as possible - we'll definitely take a look at this too at some point, @FloatingGhost, to preserve some of your sanity ;))

@MISP MISP deleted a comment from safianouyacouba Oct 21, 2017
@ag-michael

Any updates on this? @iglocska, it seems you approve of the general idea and there is demand for this feature. Any chance of accepting PRs related to this soon?

@ag-michael

@alatif113 I have a pending PR #40 that addresses some of what you're wanting. Care to take a look and comment? This is an important subject for me as well.

@adulau
Member

adulau commented Jul 30, 2020

So we don't really maintain this as there is a full-blown STIX 1.x and 2.x import/export in MISP.

As the original maintainer is not maintaining this external package anymore, I can merge those. Just let me know if it works for you and I'll merge it.

@ag-michael

ag-michael commented Jul 30, 2020

@adulau it would be great if you can merge it. But I didn't know about the MISP feature for STIX import. Do you know why MISP-Taxii-Server isn't using that? That's the only way to feed MISP with TAXII that I've found. Can you point me in the direction of the docs that show how to import STIX directly into MISP?

The STIX section here: https://pymisp.readthedocs.io/en/latest/tools.html uses pymisp.tools which in turn uses this project.

Edit:

I figured out the upload_stix() API. It works, but it doesn't parse confidence, title, information source, etc. I don't want to waste any effort, so can you tell me if MISP-Taxii-Server is maintained (I have a pending PR there too)? If it is, I'd like to create a PR/FR to have additional metadata parsed by MISP. In the meantime, it would be great if you can review the existing PR for this project.
