Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

converted elf_analysis plugin to new base class #1266

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/helperFunctions/hash.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def get_tlsh_comparison(first, second):
return tlsh.diff(first, second)


def normalize_lief_items(functions):
def normalize_lief_items(functions) -> list[str]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't we remove this function completely?
Why is this in helperFunctions.hash anyway?

"""
Shorthand to convert a list of objects to a list of strings
"""
Expand Down
343 changes: 209 additions & 134 deletions src/plugins/analysis/elf_analysis/code/elf_analysis.py
Original file line number Diff line number Diff line change
@@ -1,161 +1,236 @@
from __future__ import annotations

import json
import logging
import re
import string
from difflib import SequenceMatcher
from pathlib import Path
from typing import TYPE_CHECKING, Iterable, List, Optional

import lief
from lief.ELF import Section
from pydantic import BaseModel
from semver import Version

from analysis.PluginBase import AnalysisBasePlugin
from analysis.plugin import AnalysisPluginV0, Tag
from analysis.plugin.compat import AnalysisBasePluginAdapterMixin
from helperFunctions.hash import normalize_lief_items
from helperFunctions.tag import TagColor

LIEF_DATA_ENTRIES = (
'dynamic_entries',
'exported_functions',
'header',
'imported_functions',
'libraries',
'sections',
'segments',
'symbols_version',
)
FUNCTION_MATCHING_THRESHOLD = 0.85

if TYPE_CHECKING:
from io import FileIO

# JSON template that is indexed by behaviour class to obtain that class's indicators; read once at import time
TEMPLATE_FILE_PATH = Path(__file__).parent.parent / 'internal/matching_template.json'
BEHAVIOUR_CLASSES = json.loads(TEMPLATE_FILE_PATH.read_text())
# byte values of all characters in string.printable; used to filter the raw content of note sections
PRINTABLE_BYTES = set(string.printable.encode())
# bit masks that are tested against the numerical "flags" value of a segment
ELF_SEGMENT_FLAGS = {
    'execute': 0x1,  # executable
    'write': 0x2,  # writable
    'read': 0x4,  # readable
}


class ElfHeader(BaseModel):
    """
    ELF header data of the analyzed file.

    Instances are created with ``ElfHeader.model_validate(json_dict['header'])``, where ``json_dict`` is the
    parsed output of ``lief.to_json``; the field names therefore have to match the keys of that JSON object.
    """

    entrypoint: int
    file_type: str
    header_size: int
    identity_abi_version: int
    identity_class: str
    identity_data: str
    identity_os_abi: str
    identity_version: str
    # converted to lower case by the plugin before validation
    machine_type: str
    numberof_sections: int
    object_file_version: str
    processor_flag: int
    # NOTE(review): unusual name -- presumably mirrors the key emitted by lief.to_json; confirm before renaming
    processornumberof_segments_flag: int
    program_header_size: int
    program_headers_offset: int
    section_header_size: int
    section_headers_offset: int
    section_name_table_idx: int


class ElfSection(BaseModel):
    """One ELF section; validated from an entry of ``json_dict['sections']`` (the lief JSON output)."""

    flags: List[str]
    name: str
    size: int
    type: str
    offset: int
    virtual_address: int


class ElfSegment(BaseModel):
    """One ELF segment; validated from an entry of ``json_dict['segments']`` (the lief JSON output)."""

    file_offset: int
    # names of the active flags (keys of ELF_SEGMENT_FLAGS); converted from the numerical value by _convert_flags
    flags: List[str]
    physical_address: int
    physical_size: int
    type: str
    virtual_address: int
    virtual_size: int


class DynamicEntry(BaseModel):
    """One dynamic entry; validated from an entry of ``json_dict['dynamic_entries']`` (the lief JSON output)."""

    tag: str
    value: int
    # optional fields: default to None if the JSON entry does not contain them
    library: Optional[str] = None
    flags: Optional[List[str]] = None


class ElfSymbol(BaseModel):
    """A named symbol; used for exported functions, where ``offset`` is filled with the function's address."""

    name: str
    offset: int


class InfoSectionData(BaseModel):
    """Name and textual contents of a section; produced for note sections by _get_note_sections_content."""

    name: str
    # only the printable bytes of the section, decoded to a string
    contents: str


class AnalysisPlugin(AnalysisPluginV0, AnalysisBasePluginAdapterMixin):
class Schema(BaseModel):
header: ElfHeader
sections: List[ElfSection]
segments: List[ElfSegment]
dynamic_entries: List[DynamicEntry]
exported_functions: List[ElfSymbol]
imported_functions: List[str]
mod_info: Optional[List[str]]
note_sections: List[InfoSectionData]
behavior_classes: List[str]
Comment on lines +93 to +101
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we document these?
Imo, we should at least document the non-standard ones like mod_info and behavior_classes.


def __init__(self):
    """Set up the plugin meta data (name, description, version, schema and MIME whitelist)."""
    supported_mime_types = [
        'application/x-executable',
        'application/x-pie-executable',
        'application/x-object',
        'application/x-sharedlib',
    ]
    super().__init__(
        metadata=self.MetaData(
            name='elf_analysis',
            description='Analyzes and tags ELF executables and libraries',
            version=Version(1, 0, 0),
            Schema=self.Schema,
            mime_whitelist=supported_mime_types,
        )
    )

def analyze(self, file_handle: FileIO, virtual_file_path: str, analyses: dict) -> Schema:
del virtual_file_path, analyses
elf = lief.parse(file_handle.name)
json_dict = json.loads(lief.to_json(elf))
# for whatever reason, the machine types are all in caps in the new version of lief
json_dict['header']['machine_type'] = json_dict['header']['machine_type'].lower()
_convert_flags(json_dict)
return self.Schema(
header=ElfHeader.model_validate(json_dict['header']),
exported_functions=[ElfSymbol(name=f.name, offset=f.address) for f in elf.exported_functions],
imported_functions=[f.name for f in elf.imported_functions],
sections=[ElfSection.model_validate(s) for s in json_dict['sections']],
segments=[ElfSegment.model_validate(s) for s in json_dict['segments']],
dynamic_entries=[DynamicEntry.model_validate(e) for e in json_dict['dynamic_entries']],
Comment on lines +129 to +131
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Imo the code gets more readable if we do not serialize the lief elf representation to json.
E.g. like the exported_functions above.

note_sections=[c for c in _get_note_sections_content(elf) if c],
mod_info=_get_modinfo(elf),
behavior_classes=_get_behavior_classes(elf),
)

def summarize(self, result: Schema) -> list[str]:
    """
    Build the summary of an analysis result.

    :param result: The analysis result.
    :return: The names of the summarized result fields that are not empty (in the order of the dumped result).
    """
    summarized_fields = (
        'sections',
        'dynamic_entries',
        'exported_functions',
        'imported_functions',
        'note_sections',
        'mod_info',
    )
    summary = []
    for field_name, field_value in result.model_dump().items():
        if field_name in summarized_fields and field_value:
            summary.append(field_name)
    return summary

def get_tags(self, result: Schema, summary: list[str]) -> list[Tag]:
    """
    Create one (non-propagating) tag for each behavior class of the analysis result.

    :param result: The analysis result.
    :param summary: Unused.
    :return: A list of tags; name and value of each tag are the behavior class.
    """
    del summary
    return [
        Tag(
            name=class_name,
            value=class_name,
            color=self._get_color_codes(class_name),
            propagate=False,
        )
        for class_name in result.behavior_classes
    ]

@staticmethod
def _get_color_codes(behavior_class: str) -> str:
    """Return the tag color of a behavior class; gray is used for all classes without a dedicated color."""
    dedicated_colors = (
        ('crypto', TagColor.RED),
        ('file_system', TagColor.BLUE),
        ('network', TagColor.ORANGE),
        ('memory_operations', TagColor.GREEN),
        ('randomize', TagColor.LIGHT_BLUE),
    )
    for known_class, color in dedicated_colors:
        if behavior_class == known_class:
            return color
    return TagColor.GRAY

class AnalysisPlugin(AnalysisBasePlugin):
NAME = 'elf_analysis'
DESCRIPTION = 'Analyzes and tags ELF executables and libraries'
VERSION = '0.3.4'
MIME_WHITELIST = [ # noqa: RUF012
'application/x-executable',
'application/x-pie-executable',
'application/x-object',
'application/x-sharedlib',
]
FILE = __file__

def process_object(self, file_object):
    """
    Analyze file_object and store the outcome in ``file_object.processed_analysis[self.NAME]``.

    On success, the entry contains the analysis dict as 'Output' and the keys of that dict as 'summary', and
    the tags are created through create_tags. If a RuntimeError or ValueError occurs, it is logged and the
    entry is replaced with a 'failed' message. The file object is returned in both cases.
    """
    try:
        # NOTE(review): the error path of _analyze_elf returns a bare dict instead of a 2-tuple; unpacking it
        # here would presumably raise a ValueError, which is handled below -- confirm that this is intended
        elf_dict, parsed_binary = self._analyze_elf(file_object)
        file_object.processed_analysis[self.NAME] = {'Output': elf_dict}
        self.create_tags(parsed_binary, file_object)
        file_object.processed_analysis[self.NAME]['summary'] = list(elf_dict.keys())
    except (RuntimeError, ValueError):
        logging.error(f'lief could not parse {file_object.uid}', exc_info=True)
        file_object.processed_analysis[self.NAME] = {'failed': 'lief could not parse the file'}
    return file_object

@staticmethod
def _get_tags_from_library_list(libraries: list, behaviour_class: str, indicators: list, tags: list):
for library, indicator in ((lib, ind) for lib in libraries for ind in indicators):
if re.search(indicator, library):
tags.append(behaviour_class)
def _get_behavior_classes(elf: lief.ELF) -> list[str]:
libraries = _get_symbols_version_entries(normalize_lief_items(elf.symbols_version))
libraries.extend(normalize_lief_items(elf.libraries))
functions = _get_relevant_imp_functions(normalize_lief_items(elf.imported_functions))

@staticmethod
def _get_tags_from_function_list(functions: list, behaviour_class: str, indicators: list, tags: list):
for function, indicator in ((f, i) for f in functions for i in indicators):
behaviour_classes = []
for behaviour_class in BEHAVIOUR_CLASSES:
indicators = BEHAVIOUR_CLASSES[behaviour_class]
if _behaviour_class_applies(functions, libraries, indicators):
behaviour_classes.append(behaviour_class)
return behaviour_classes


def _get_relevant_imp_functions(imp_functions: list[str]) -> list[str]:
return [f for f in imp_functions if not f.startswith('__')]


def _get_symbols_version_entries(symbol_versions: list[str]) -> list[str]:
imported_libs = []
for sv in symbol_versions:
if str(sv) != '* Local *' and str(sv) != '* Global *':
imported_libs.append(str(sv).split('(', maxsplit=1)[0])
return list(set(imported_libs))


def _behaviour_class_applies(functions: list[str], libraries: list[str], indicators: list[str]) -> bool:
for function in functions:
for indicator in indicators:
if (
indicator.lower() in function.lower() and SequenceMatcher(None, indicator, function).ratio() >= 0.85 # noqa: PLR2004
indicator.lower() in function.lower()
and SequenceMatcher(None, indicator, function).ratio() >= FUNCTION_MATCHING_THRESHOLD
):
tags.append(behaviour_class)
return True
for library in libraries:
for indicator in indicators:
if indicator.lower() in library.lower():
return True
return False

def _get_tags(self, libraries: list, functions: list) -> list:
    """
    Collect the behaviour classes from BEHAVIOUR_CLASSES whose indicators match the functions or libraries.

    :return: A list of the matching behaviour classes without duplicates (in undefined order).
    """
    collected = []
    for behaviour_class, behaviour_indicators in BEHAVIOUR_CLASSES.items():
        if behaviour_class in collected:
            continue
        self._get_tags_from_function_list(functions, behaviour_class, behaviour_indicators, collected)
        self._get_tags_from_library_list(libraries, behaviour_class, behaviour_indicators, collected)
    return list(set(collected))

@staticmethod
def _get_symbols_version_entries(symbol_versions):
imported_libs = []
for sv in symbol_versions:
if str(sv) != '* Local *' and str(sv) != '* Global *':
imported_libs.append(str(sv).split('(', maxsplit=1)[0])
return list(set(imported_libs))
def _get_modinfo(elf: lief.ELF) -> list[str] | None:
# getting the information from the *.ko files .modinfo section
modinfo = None
for section in elf.sections:
if section.name == '.modinfo':
modinfo = section.content.tobytes()
modinfo = [entry.decode() for entry in modinfo.split(b'\x00') if entry]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we handle decoding errors here?

break
return modinfo

@staticmethod
def _get_relevant_imp_functions(imp_functions):
imp_functions[:] = [x for x in imp_functions if not x.startswith('__')]
return imp_functions

@staticmethod
def _get_color_codes(tag):
    """Return the color for a tag; gray is used for all tags without a dedicated color."""
    dedicated_colors = (
        ('crypto', TagColor.RED),
        ('file_system', TagColor.BLUE),
        ('network', TagColor.ORANGE),
        ('memory_operations', TagColor.GREEN),
        ('randomize', TagColor.LIGHT_BLUE),
    )
    for known_tag, color in dedicated_colors:
        if tag == known_tag:
            return color
    return TagColor.GRAY
def _convert_flags(json_dict: dict):
    """
    Replace the numerical flags of each entry in json_dict['segments'] with a "human-readable" list of
    strings (the names of the active flags from ELF_SEGMENT_FLAGS). The dict is modified in place.
    """
    for segment in json_dict['segments']:
        numerical_flags = segment['flags']
        segment['flags'] = _get_active_flags(numerical_flags, ELF_SEGMENT_FLAGS)

def create_tags(self, parsed_bin, file_object):
    """
    Add an analysis tag to file_object for each behaviour class that is found for the parsed binary.

    The behaviour classes are determined from the symbol version entries, libraries and imported functions
    of parsed_bin. Name and value of each tag are the behaviour class and the tags are not propagated.
    """
    libraries = self._get_symbols_version_entries(normalize_lief_items(parsed_bin.symbols_version))
    libraries.extend(normalize_lief_items(parsed_bin.libraries))
    functions = self._get_relevant_imp_functions(normalize_lief_items(parsed_bin.imported_functions))
    for behaviour_class in self._get_tags(libraries, functions):
        tag_options = {
            'tag_name': behaviour_class,
            'value': behaviour_class,
            'color': self._get_color_codes(behaviour_class),
            'propagate': False,
        }
        self.add_analysis_tag(file_object=file_object, **tag_options)

@staticmethod
def get_final_analysis_dict(binary_json_dict, elf_dict):
    """
    Copy all non-empty entries of binary_json_dict whose key is in LIEF_DATA_ENTRIES to elf_dict.

    elf_dict is modified in place; nothing is returned.
    """
    elf_dict.update(
        {key: value for key, value in binary_json_dict.items() if key in LIEF_DATA_ENTRIES and value}
    )

def _analyze_elf(self, file_object):
    """
    Parse the file at ``file_object.file_path`` with lief and collect the analysis data.

    :return: A tuple of the analysis dict and the parsed binary.
        NOTE(review): if an AttributeError or TypeError occurs, only the analysis dict is returned (not a
        tuple) -- confirm that the callers expect this.
    """
    elf_dict = {}
    try:
        parsed_binary = lief.parse(file_object.file_path)
        binary_json_dict = json.loads(lief.to_json(parsed_binary))
        # overwrite the function and library entries of the JSON data with the normalized lief items
        if parsed_binary.exported_functions:
            binary_json_dict['exported_functions'] = normalize_lief_items(parsed_binary.exported_functions)
        if parsed_binary.imported_functions:
            binary_json_dict['imported_functions'] = normalize_lief_items(parsed_binary.imported_functions)
        if parsed_binary.libraries:
            binary_json_dict['libraries'] = normalize_lief_items(parsed_binary.libraries)
        # the modinfo data is not part of the JSON data and is therefore added to the result directly
        modinfo_data = self.filter_modinfo(parsed_binary)
        if modinfo_data:
            elf_dict['modinfo'] = modinfo_data

    except (AttributeError, TypeError):
        logging.error(f'Bad file for lief/elf analysis {file_object.uid}.', exc_info=True)
        return elf_dict

    # only the non-empty entries from LIEF_DATA_ENTRIES are transferred to the result
    self.get_final_analysis_dict(binary_json_dict, elf_dict)
    self._convert_address_values_to_hex(elf_dict)

    return elf_dict, parsed_binary
def _get_active_flags(flags_value: int, flag_dict: dict[str, int]) -> list[str]:
# get active flags from flags_value as list of strings
return [flag_name for flag_name, flag_mask in flag_dict.items() if flags_value & flag_mask]

@staticmethod
def _convert_address_values_to_hex(elf_dict):
for category in {'sections', 'segments'}.intersection(elf_dict):
for entry in elf_dict[category]:
for key in {'virtual_address', 'offset'}.intersection(entry):
entry[key] = hex(entry[key])

@staticmethod
def filter_modinfo(binary) -> list[str] | None:
# getting the information from the *.ko files .modinfo section
modinfo = None
for section in binary.sections:
if section.name == '.modinfo':
modinfo = bytes(section.content).decode()
modinfo = [entry for entry in modinfo.split('\x00') if entry]
break
return modinfo
def _get_note_sections_content(elf: lief.ELF) -> Iterable[InfoSectionData]:
for section in elf.sections: # type: Section
if section.type == Section.TYPE.NOTE:
readable_content = bytes([c for c in section.content.tobytes() if c in PRINTABLE_BYTES])
yield InfoSectionData(name=section.name, contents=readable_content.decode())
Loading
Loading