diff --git a/.gitignore b/.gitignore index ec531d4..dcc06f8 100644 --- a/.gitignore +++ b/.gitignore @@ -4,9 +4,15 @@ *~ *.swp +# MyPy +.mypy_cache/ + # PyCharm folder /.idea/ +# Log file +mans_to_es.log + # Generic auto-generated build files *.pyc *.pyo diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..80f646f --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,20 @@ +# Changelog + +## [1.5] 2020-05-19 +- Little refactoring to improve usage as imported library +- Add threat info to alerts if present +- Aligned dependencies to Timesketch requirements +- Added support for pip > 20 build + +## [1.4] 2019-10-03 +- Support for extracting multiple fields as comment +- Keep all meta by default +- Check if elastic is up @deralexxx + +## [1.3] 2019-07-29 +- Added process-api to processed items +- Skip if not explicitly selected +- Timestamp parsing improvement + +## [1.0] 2019-07-24 +- First working release \ No newline at end of file diff --git a/README.md b/README.md index c9faa84..f297227 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,7 @@ pip install mans-to-es If you want to develop with the script you can download and place it under /usr/local/bin and make it executable. -#### Usage +#### Usage as script ``` $ mans_to_es.py --help @@ -53,6 +53,14 @@ optional arguments: ``` +#### Usage as lib + +``` +>>> from mans_to_es import MansToEs +>>> a = MansToEs(filename = '', index="", name="", es_host="localhost", es_port=9200) +>>> a.run() +``` + ## Contributing **If you want to contribute to mans_to_es, be sure to review the [contributing guidelines](CONTRIBUTING.md). 
This project adheres to mans_to_es diff --git a/mans_to_es/__init__.py b/mans_to_es/__init__.py index e69de29..527e59e 100644 --- a/mans_to_es/__init__.py +++ b/mans_to_es/__init__.py @@ -0,0 +1,3 @@ +from mans_to_es.mans_to_es import MansToEs + +__all__ = [MansToEs] diff --git a/mans_to_es/mans_to_es.py b/mans_to_es/mans_to_es.py index 829d12c..8ed0f52 100644 --- a/mans_to_es/mans_to_es.py +++ b/mans_to_es/mans_to_es.py @@ -11,15 +11,16 @@ import datetime import ciso8601 -import xmltodict - -import pandas as pd +import xmltodict # type: ignore +import pandas as pd # type: ignore from multiprocessing import Pool, cpu_count -import elasticsearch +import elasticsearch # type: ignore from elasticsearch import helpers, Elasticsearch +from typing import Tuple, Union, BinaryIO, TextIO, Dict, List, Mapping, Any + from glob import glob # hide ES log @@ -32,7 +33,7 @@ FORMAT = "%(asctime)-15s %(message)s" logging.basicConfig(filename="mans_to_es.log", level=logging.DEBUG, format=FORMAT) -MANS_FIELDS = { +MANS_FIELDS: Dict[str, Any] = { "persistence": { "key": "PersistenceItem", "datefields": [ @@ -163,7 +164,7 @@ } -def convert_both_pandas(argument, offset=0): +def convert_both_pandas(argument: str, offset=0) -> pd.Series: """ convert_both_pandas: parse date field and convert to it to proper in: @@ -182,7 +183,7 @@ def convert_both_pandas(argument, offset=0): return pd.Series([None, None]) -def convert_both(argument, offset=0): +def convert_both(argument: str, offset=0) -> Union[Tuple[str, str], Tuple[None, None]]: """ convert_both: parse date field and convert to it to proper in: @@ -199,7 +200,7 @@ def convert_both(argument, offset=0): return None, None -def convert_skew(offset): +def convert_skew(offset: str) -> int: """ convert_skew: return offset for xml file in seconds in: @@ -215,33 +216,42 @@ def convert_skew(offset): class MansToEs: - def __init__(self, args): - self.filename = args.filename - self.index = args.index - self.name = args.name - self.bulk_size = 
args.bulk_size - self.cpu_count = args.cpu_count + def __init__( + self, + filename: str, + index: str, + name: str, + es_host: str, + es_port: int, + bulk_size: int = 1000, + cpu_count: int = cpu_count() - 1, + ): + self.filename = filename + self.index = index + self.name = name + self.bulk_size = bulk_size + self.cpu_count = cpu_count self.folder_path = self.filename + "__tmp" self.offset_stateagentinspector = None - self.es_info = {"host": args.es_host, "port": args.es_port} - self.upload_parts = [] - self.filelist = {} - self.ioc_alerts = [] - self.exd_alerts = [] - self.generic_items = {} + self.es_info = {"host": es_host, "port": es_port} + self.filelist: Dict[str, Tuple[str, int]] = {} + self.ioc_alerts: Dict[str, Any] = {} + self.exd_alerts: List[Mapping[str, str]] = [] + self.generic_items: Dict[str, Any] = {} es = Elasticsearch([self.es_info]) if not es.ping(): raise ValueError("Connection failed") - logging.debug(f"[MAIN] Start parsing {args.filename}.") - logging.debug(f"[MAIN] Pushing on {args.name} index and {args.index} timeline") + logging.debug(f"[MAIN] Start parsing {self.filename}.") + logging.debug(f"[MAIN] Pushing on {self.name} index and {self.index} timeline") - def handle_stateagentinspector(self, path, item_detail): + def handle_stateagentinspector(self, path, item_detail) -> bool: """ handle_item: streaming function for xmltodict (stateagentitem) In: path: xml item path + item_detail: xml item data """ item = {} uid = path[1][1]["uid"] @@ -269,10 +279,13 @@ def handle_stateagentinspector(self, path, item_detail): .get(item_detail["eventType"], {}) .get("hits_key", None), ) + if self.ioc_alerts[uid]: + item["threat_info"] = self.ioc_alerts[uid] + self.generic_items.setdefault(path[1][0], []).append(item) return True - def handle_item(self, path, item_detail): + def handle_item(self, path, item_detail) -> bool: """ handle_item: streaming function for xmltodict In: @@ -283,7 +296,13 @@ def handle_item(self, path, item_detail): 
self.generic_items.setdefault(path[1][0], []).append(item_detail) return True - def generate_df(self, file, offset, filetype, stateagentinspector=False): + def generate_df( + self, + file: TextIO, + offset: int, + filetype: str, + stateagentinspector: bool = False, + ) -> Tuple[pd.DataFrame, bool]: """ Generate dataframe from xml file """ @@ -311,6 +330,9 @@ def extract_mans(self): logging.debug(f"[MAIN] Unzip file in {self.folder_path} [✔]") def delete_temp_folder(self): + """ + Delete temporary folder + """ try: shutil.rmtree(self.folder_path) logging.debug("[MAIN] temporary folder deleted [✔]") @@ -339,8 +361,16 @@ def parse_manifest(self): def parse_hits(self): """ - Get hit and alert from hits.json file + Get hit and alert from hits.json file, threat info from threats.json """ + threats_info = {} + if not os.path.exists(os.path.join(self.folder_path, "threats.json")): + logging.debug("[MAIN] Parsing threats.json [missing]") + else: + with open(os.path.join(self.folder_path, "threats.json"), "r") as f: + for x in json.load(f): + threats_info[x.get("uri_name")] = x + if not os.path.exists(os.path.join(self.folder_path, "hits.json")): logging.debug("[MAIN] Parsing Hits.json [missing]") else: @@ -348,8 +378,12 @@ def parse_hits(self): for x in json.load(f): if x.get("data", {}).get("key", None): event_id = str(x["data"]["key"]["event_id"]) - if event_id not in self.ioc_alerts: - self.ioc_alerts.append(event_id) + threat_id = x.get("threat_id", None) + if event_id not in self.ioc_alerts.keys(): + self.ioc_alerts[event_id] = threats_info.get( + threat_id, None + ) + elif x.get("data", {}).get("documents", None) or x.get( "data", {} ).get("analysis_details", None): @@ -373,10 +407,11 @@ def parse_hits(self): ), } ) + if len(self.exd_alerts) > 0: es = Elasticsearch([self.es_info]) helpers.bulk( - es, self.exd_alerts, index=self.index, doc_type="generic_event" + es, self.exd_alerts, index=self.index, # doc_type="generic_event" ) logging.debug( "[MAIN] Parsing Hits.json 
- %d alerts found [✔]" @@ -408,7 +443,7 @@ def process(self): res = pool.starmap_async(self.process_file, files_list).get() logging.debug("[MAIN] Pre-Processing [✔]") - def process_file(self, filetype, file, offset): + def process_file(self, filetype: str, file: str, offset: int): """ process_file: parse xml to df and clean it In: @@ -541,7 +576,7 @@ def to_elastic(self): es, elk_items, index=self.index, - doc_type="generic_event", + # doc_type="generic_event", chunk_size=self.bulk_size, request_timeout=60, ), @@ -549,6 +584,17 @@ def to_elastic(self): ) logging.debug("[MAIN] Parallel elastic push [✔]") + def run(self): + """ + Main process + """ + self.extract_mans() + self.parse_manifest() + self.parse_hits() + self.process() + self.to_elastic() + self.delete_temp_folder() + def main(): parser = argparse.ArgumentParser( @@ -578,7 +624,7 @@ def main(): ) parser.add_argument( - "--version", dest="version", action="version", version="%(prog)s 1.4" + "--version", dest="version", action="version", version="%(prog)s 1.5" ) args = parser.parse_args() @@ -586,13 +632,16 @@ def main(): parser.print_usage() else: try: - mte = MansToEs(args) - mte.extract_mans() - mte.parse_manifest() - mte.parse_hits() - mte.process() - mte.to_elastic() - mte.delete_temp_folder() + mte = MansToEs( + args.filename, + args.index, + args.name, + args.es_host, + args.es_port, + args.bulk_size, + args.cpu_count, + ) + mte.run() logging.debug("[MAIN] Operation Completed [✔✔✔]") except: logging.exception("Error parsing .mans") diff --git a/requirements.txt b/requirements.txt index d5fb755..d0a38c0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,10 @@ certifi>=2017.7.27.1 -elasticsearch==7.0.2 -numpy==1.16.4 -pandas==0.25.0 -python-dateutil==2.8.0 -pytz==2019.1 +elasticsearch>=7.5.1 +numpy>=1.16.6 +pandas>=0.24.2 +python-dateutil==2.8.1 +pytz==2020.1 six==1.12.0 -urllib3==1.25.3 +urllib3==1.25.9 xmltodict==0.12.0 -ciso8601==2.1.1 +ciso8601>=2.1.1 diff --git a/setup.py b/setup.py 
index 45e1e23..eb3d763 100644 --- a/setup.py +++ b/setup.py @@ -1,35 +1,56 @@ #!/usr/bin/env python3 import setuptools import os +import pip -try: # for pip >= 10 +pip_major_version = int(pip.__version__.split(".")[0]) +pip_minor_version = int(pip.__version__.split(".")[1]) +if pip_major_version >= 20: # for pip >= 20 + from pip._internal.req import parse_requirements + from pip._internal.network.session import PipSession +elif pip_major_version >= 10: from pip._internal.download import PipSession from pip._internal.req import parse_requirements -except ImportError: # for pip <= 9.0.3 +else: # for pip <= 9.0.3 from pip.download import PipSession from pip.req import parse_requirements - + with open("README.md", "r") as fh: long_description = fh.read() +if (pip_major_version == 20 and pip_minor_version >= 1) or pip_major_version > 20: + install_requires = [ + str(req.requirement) + for req in parse_requirements("requirements.txt", session=PipSession(),) + ] +else: + try: + install_requires = [ + str(req.req) + for req in parse_requirements("requirements.txt", session=PipSession(),) + ] + except: + install_requires = [ + str(req) + for req in parse_requirements("requirements.txt", session=PipSession(),) + ] + setuptools.setup( name="mans_to_es", - version="1.4", + version="1.5", author="LDO-CERT", author_email="gcert@leonardocompany.com", description="Send .mans to ElasticSearch", long_description=long_description, long_description_content_type="text/markdown", - license='Apache License, Version 2.0', + license="Apache License, Version 2.0", url="https://github.com/LDO-CERT/mans_to_es", packages=setuptools.find_packages(), classifiers=[ "Programming Language :: Python :: 3", "Operating System :: OS Independent", ], - entry_points={'console_scripts': ['mans_to_es=mans_to_es.mans_to_es:main']}, - install_requires=[str(req.req) for req in parse_requirements( - 'requirements.txt', session=PipSession(), - )], -) \ No newline at end of file + 
entry_points={"console_scripts": ["mans_to_es=mans_to_es.mans_to_es:main"]}, + install_requires=install_requires, +)