Skip to content

Commit

Permalink
feat: converted elf_analysis plugin to new base class
Browse files: browse the repository at this point in the history
  • Loading branch information
jstucke committed Sep 11, 2024
1 parent a928cdb commit c14b112
Show file tree
Hide file tree
Showing 4 changed files with 314 additions and 261 deletions.
2 changes: 1 addition & 1 deletion src/helperFunctions/hash.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def _is_elf_file(file_object: FileObject) -> bool:
return file_object.processed_analysis['file_type']['result']['mime'] in ELF_MIME_TYPES


def normalize_lief_items(functions):
def normalize_lief_items(functions) -> list[str]:
"""
Shorthand to convert a list of objects to a list of strings
"""
Expand Down
340 changes: 206 additions & 134 deletions src/plugins/analysis/elf_analysis/code/elf_analysis.py
Original file line number Diff line number Diff line change
@@ -1,161 +1,233 @@
from __future__ import annotations

import json
import logging
import re
import string
from difflib import SequenceMatcher
from pathlib import Path
from typing import TYPE_CHECKING, Iterable, List, Optional

import lief
from pydantic import BaseModel
from semver import Version

from analysis.PluginBase import AnalysisBasePlugin
from analysis.plugin import AnalysisPluginV0, Tag
from analysis.plugin.compat import AnalysisBasePluginAdapterMixin
from helperFunctions.hash import normalize_lief_items
from helperFunctions.tag import TagColor

# Keys of lief's JSON output that `get_final_analysis_dict` copies into the
# result dict (only when the corresponding value is non-empty).
LIEF_DATA_ENTRIES = (
'dynamic_entries',
'exported_functions',
'header',
'imported_functions',
'libraries',
'sections',
'segments',
'symbols_version',
)
# Minimum `SequenceMatcher` ratio between an indicator and an imported function
# name for `_behaviour_class_applies` to count the function as a match.
FUNCTION_MATCHING_THRESHOLD = 0.85

if TYPE_CHECKING:
from io import FileIO

# JSON template located in the `internal` directory next to this file's parent directory.
TEMPLATE_FILE_PATH = Path(__file__).parent.parent / 'internal/matching_template.json'
# Loaded once at import time; used as a mapping of behaviour class name -> indicators
# (see `_get_behavior_classes`).
BEHAVIOUR_CLASSES = json.loads(TEMPLATE_FILE_PATH.read_text())
# Byte values of all characters in `string.printable`; used to filter note section contents.
PRINTABLE_BYTES = set(string.printable.encode())
# Bit masks used by `_convert_flags` to translate numerical segment flags into flag names.
ELF_SEGMENT_FLAGS = {
'execute': 0x1, # executable
'write': 0x2, # writable
'read': 0x4, # readable
}


class ElfHeader(BaseModel):
    # ELF header data; `AnalysisPlugin.analyze` validates it from the 'header'
    # entry of lief's JSON output.
    # NOTE(review): 'processornumberof_segments_flag' looks like two fused names;
    # presumably it mirrors the key that lief emits -- confirm against lief's JSON
    # output before renaming it.
    entrypoint: int
    file_type: str
    header_size: int
    identity_abi_version: int
    identity_class: str
    identity_data: str
    identity_os_abi: str
    identity_version: str
    machine_type: str
    numberof_sections: int
    object_file_version: str
    processor_flag: int
    processornumberof_segments_flag: int
    program_header_size: int
    program_headers_offset: int
    section_header_size: int
    section_headers_offset: int
    section_name_table_idx: int


class ElfSection(BaseModel):
    # One entry of the 'sections' list in lief's JSON output
    # (validated in `AnalysisPlugin.analyze`).
    flags: List[str]
    name: str
    size: int
    type: str
    offset: int
    virtual_address: int


class ElfSegment(BaseModel):
    # One entry of the 'segments' list in lief's JSON output; `flags` holds the
    # flag names produced by `_convert_flags` instead of the numerical bit field.
    file_offset: int
    flags: List[str]
    physical_address: int
    physical_size: int
    type: str
    virtual_address: int
    virtual_size: int


class DynamicEntry(BaseModel):
    # One entry of the 'dynamic_entries' list in lief's JSON output;
    # `library` and `flags` are optional and default to None.
    tag: str
    value: int
    library: Optional[str] = None
    flags: Optional[List[str]] = None


class ElfSymbol(BaseModel):
    # Exported function: `AnalysisPlugin.analyze` fills `name` from the lief
    # function's name and `offset` from its address.
    name: str
    offset: int


class InfoSectionData(BaseModel):
    # Name of a note section plus its printable content
    # (see `_get_note_sections_content`).
    name: str
    contents: str


class AnalysisPlugin(AnalysisPluginV0, AnalysisBasePluginAdapterMixin):
    """
    Analysis plugin 'elf_analysis': parses ELF files with lief, returns the extracted
    data as a `Schema` instance and creates one tag per detected behaviour class.
    """

    class Schema(BaseModel):
        # layout of the result that `analyze` returns
        header: ElfHeader
        sections: List[ElfSection]
        segments: List[ElfSegment]
        dynamic_entries: List[DynamicEntry]
        exported_functions: List[ElfSymbol]
        imported_functions: List[str]
        mod_info: Optional[List[str]]
        note_sections: List[InfoSectionData]
        behavior_classes: List[str]

    def __init__(self):
        super().__init__(
            metadata=self.MetaData(
                name='elf_analysis',
                description='Analyzes and tags ELF executables and libraries',
                version=Version(1, 0, 0),
                Schema=self.Schema,
                mime_whitelist=[
                    'application/x-executable',
                    'application/x-pie-executable',
                    'application/x-object',
                    'application/x-sharedlib',
                ],
            )
        )

    def analyze(self, file_handle: FileIO, virtual_file_path: str, analyses: dict) -> Schema:
        """
        Parse the file behind `file_handle` with lief and assemble the analysis result.

        `virtual_file_path` and `analyses` are part of the signature but are not used.
        """
        del virtual_file_path, analyses
        elf = lief.parse(file_handle.name)
        json_dict = json.loads(lief.to_json(elf))
        # replace the numerical segment flags by lists of flag names before validation
        _convert_flags(json_dict)

        header = ElfHeader.model_validate(json_dict['header'])
        exported = [ElfSymbol(name=function.name, offset=function.address) for function in elf.exported_functions]
        imported = [function.name for function in elf.imported_functions]
        sections = [ElfSection.model_validate(entry) for entry in json_dict['sections']]
        segments = [ElfSegment.model_validate(entry) for entry in json_dict['segments']]
        dynamic_entries = [DynamicEntry.model_validate(entry) for entry in json_dict['dynamic_entries']]
        note_sections = list(filter(None, _get_note_sections_content(elf)))
        mod_info = _get_modinfo(elf)
        behavior_classes = _get_behavior_classes(elf)

        return self.Schema(
            header=header,
            sections=sections,
            segments=segments,
            dynamic_entries=dynamic_entries,
            exported_functions=exported,
            imported_functions=imported,
            mod_info=mod_info,
            note_sections=note_sections,
            behavior_classes=behavior_classes,
        )

    def summarize(self, result: Schema) -> list[str]:
        """Return the names of the summarized result fields that are not empty."""
        summarized_fields = (
            'sections',
            'dynamic_entries',
            'exported_functions',
            'imported_functions',
            'note_sections',
            'mod_info',
        )
        dumped_result = result.model_dump()
        return [field for field, value in dumped_result.items() if field in summarized_fields and value]

    def get_tags(self, result: Schema, summary: list[str]) -> list[Tag]:
        """Create one non-propagating tag for each behaviour class in the result (`summary` is unused)."""
        del summary
        return [
            Tag(
                name=behaviour_class,
                value=behaviour_class,
                color=self._get_color_codes(behaviour_class),
                propagate=False,
            )
            for behaviour_class in result.behavior_classes
        ]

    @staticmethod
    def _get_color_codes(behavior_class: str) -> str:
        """Return the tag color of a behaviour class; classes without an own color are gray."""
        color_by_class = {
            'crypto': TagColor.RED,
            'file_system': TagColor.BLUE,
            'network': TagColor.ORANGE,
            'memory_operations': TagColor.GREEN,
            'randomize': TagColor.LIGHT_BLUE,
        }
        return color_by_class.get(behavior_class, TagColor.GRAY)

class AnalysisPlugin(AnalysisBasePlugin):
NAME = 'elf_analysis'
DESCRIPTION = 'Analyzes and tags ELF executables and libraries'
VERSION = '0.3.4'
MIME_WHITELIST = [ # noqa: RUF012
'application/x-executable',
'application/x-pie-executable',
'application/x-object',
'application/x-sharedlib',
]
FILE = __file__

def process_object(self, file_object):
try:
elf_dict, parsed_binary = self._analyze_elf(file_object)
file_object.processed_analysis[self.NAME] = {'Output': elf_dict}
self.create_tags(parsed_binary, file_object)
file_object.processed_analysis[self.NAME]['summary'] = list(elf_dict.keys())
except (RuntimeError, ValueError):
logging.error(f'lief could not parse {file_object.uid}', exc_info=True)
file_object.processed_analysis[self.NAME] = {'failed': 'lief could not parse the file'}
return file_object

@staticmethod
def _get_tags_from_library_list(libraries: list, behaviour_class: str, indicators: list, tags: list):
for library, indicator in ((lib, ind) for lib in libraries for ind in indicators):
if re.search(indicator, library):
tags.append(behaviour_class)
def _get_behavior_classes(elf: lief.ELF) -> list[str]:
    """
    Determine which behaviour classes from ``BEHAVIOUR_CLASSES`` apply to a parsed ELF file.

    The libraries that are matched consist of the symbol version entries plus the
    libraries of the binary; the functions are the imported functions whose names
    do not start with ``__``.

    :param elf: The binary as parsed by lief.
    :return: The names of all behaviour classes with at least one matching indicator.
    """
    libraries = _get_symbols_version_entries(normalize_lief_items(elf.symbols_version))
    libraries.extend(normalize_lief_items(elf.libraries))
    functions = _get_relevant_imp_functions(normalize_lief_items(elf.imported_functions))

    return [
        behaviour_class
        for behaviour_class, indicators in BEHAVIOUR_CLASSES.items()
        if _behaviour_class_applies(functions, libraries, indicators)
    ]


def _get_relevant_imp_functions(imp_functions: list[str]) -> list[str]:
    """Return a new list with all imported function names that do not start with ``__``."""
    relevant_functions = []
    for function_name in imp_functions:
        if function_name.startswith('__'):
            continue
        relevant_functions.append(function_name)
    return relevant_functions


def _get_symbols_version_entries(symbol_versions: list[str]) -> list[str]:
    """
    Extract the unique names from a list of symbol version entries.

    The entries ``* Local *`` and ``* Global *`` are skipped. For all other entries,
    everything from the first ``(`` on is cut off. Duplicates are removed and the
    result keeps the order of first occurrence, so the output is deterministic.

    :param symbol_versions: The symbol version entries (converted with ``str`` before use).
    :return: The unique names in order of first occurrence.
    """
    entries = (str(symbol_version) for symbol_version in symbol_versions)
    names = (entry.split('(', maxsplit=1)[0] for entry in entries if entry not in ('* Local *', '* Global *'))
    # dict keys are unique and keep insertion order -> stable de-duplication
    return list(dict.fromkeys(names))


def _behaviour_class_applies(functions: list[str], libraries: list[str], indicators: list[str]) -> bool:
    """
    Check if at least one indicator of a behaviour class matches a function or a library.

    A function matches if the indicator is contained in its name (case-insensitive) and
    the ``SequenceMatcher`` ratio of indicator and function name (compared as given, i.e.
    case-sensitive) is at least ``FUNCTION_MATCHING_THRESHOLD``. A library matches if the
    indicator is contained in its name (case-insensitive).

    :param functions: The names of the imported functions.
    :param libraries: The names of the libraries.
    :param indicators: The indicators of the behaviour class.
    :return: ``True`` if any indicator matches, ``False`` otherwise.
    """
    for function in functions:
        function_lower = function.lower()  # does not depend on the indicator -> convert only once
        for indicator in indicators:
            if (
                indicator.lower() in function_lower
                and SequenceMatcher(None, indicator, function).ratio() >= FUNCTION_MATCHING_THRESHOLD
            ):
                return True
    return any(indicator.lower() in library.lower() for library in libraries for indicator in indicators)

def _get_tags(self, libraries: list, functions: list) -> list:
tags = []
for behaviour_class in BEHAVIOUR_CLASSES:
if behaviour_class not in tags:
behaviour_indicators = BEHAVIOUR_CLASSES[behaviour_class]
self._get_tags_from_function_list(functions, behaviour_class, behaviour_indicators, tags)
self._get_tags_from_library_list(libraries, behaviour_class, behaviour_indicators, tags)
return list(set(tags))

@staticmethod
def _get_symbols_version_entries(symbol_versions):
imported_libs = []
for sv in symbol_versions:
if str(sv) != '* Local *' and str(sv) != '* Global *':
imported_libs.append(str(sv).split('(', maxsplit=1)[0])
return list(set(imported_libs))
def _get_modinfo(elf: lief.ELF) -> list[str] | None:
    """
    Get the entries of the ``.modinfo`` section (the module information of ``*.ko`` files).

    The content of the first section named ``.modinfo`` is split at null bytes and empty
    entries are dropped. Bytes that are not valid UTF-8 are replaced with U+FFFD, so one
    broken entry does not raise a ``UnicodeDecodeError`` and abort the whole analysis.

    :param elf: The binary as parsed by lief.
    :return: The decoded entries, or ``None`` if there is no ``.modinfo`` section.
    """
    for section in elf.sections:
        if section.name == '.modinfo':
            raw_entries = section.content.tobytes().split(b'\x00')
            return [entry.decode(errors='replace') for entry in raw_entries if entry]
    return None

@staticmethod
def _get_relevant_imp_functions(imp_functions):
imp_functions[:] = [x for x in imp_functions if not x.startswith('__')]
return imp_functions

@staticmethod
def _get_color_codes(tag):
if tag == 'crypto':
return TagColor.RED
if tag == 'file_system':
return TagColor.BLUE
if tag == 'network':
return TagColor.ORANGE
if tag == 'memory_operations':
return TagColor.GREEN
if tag == 'randomize':
return TagColor.LIGHT_BLUE
return TagColor.GRAY
def _convert_flags(json_dict: dict):
    """Replace (in place) the numerical flags of each segment with a "human-readable" list of flag names."""
    for segment in json_dict['segments']:
        numerical_flags = segment['flags']
        segment['flags'] = _get_active_flags(numerical_flags, ELF_SEGMENT_FLAGS)

def create_tags(self, parsed_bin, file_object):
all_libs = self._get_symbols_version_entries(normalize_lief_items(parsed_bin.symbols_version))
all_libs.extend(normalize_lief_items(parsed_bin.libraries))
all_funcs = self._get_relevant_imp_functions(normalize_lief_items(parsed_bin.imported_functions))
for entry in self._get_tags(all_libs, all_funcs):
self.add_analysis_tag(
file_object=file_object,
tag_name=entry,
value=entry,
color=self._get_color_codes(entry),
propagate=False,
)

@staticmethod
def get_final_analysis_dict(binary_json_dict, elf_dict):
for key in binary_json_dict:
if key in LIEF_DATA_ENTRIES and binary_json_dict[key]:
elf_dict[key] = binary_json_dict[key]

def _analyze_elf(self, file_object):
elf_dict = {}
try:
parsed_binary = lief.parse(file_object.file_path)
binary_json_dict = json.loads(lief.to_json(parsed_binary))
if parsed_binary.exported_functions:
binary_json_dict['exported_functions'] = normalize_lief_items(parsed_binary.exported_functions)
if parsed_binary.imported_functions:
binary_json_dict['imported_functions'] = normalize_lief_items(parsed_binary.imported_functions)
if parsed_binary.libraries:
binary_json_dict['libraries'] = normalize_lief_items(parsed_binary.libraries)
modinfo_data = self.filter_modinfo(parsed_binary)
if modinfo_data:
elf_dict['modinfo'] = modinfo_data

except (AttributeError, TypeError):
logging.error(f'Bad file for lief/elf analysis {file_object.uid}.', exc_info=True)
return elf_dict

self.get_final_analysis_dict(binary_json_dict, elf_dict)
self._convert_address_values_to_hex(elf_dict)

return elf_dict, parsed_binary
def _get_active_flags(flags_value: int, flag_dict: dict[str, int]) -> list[str]:
    """Return the names of all flags from `flag_dict` whose mask shares a set bit with `flags_value`."""
    active_flags = []
    for flag_name, flag_mask in flag_dict.items():
        if flags_value & flag_mask:
            active_flags.append(flag_name)
    return active_flags

@staticmethod
def _convert_address_values_to_hex(elf_dict):
for category in {'sections', 'segments'}.intersection(elf_dict):
for entry in elf_dict[category]:
for key in {'virtual_address', 'offset'}.intersection(entry):
entry[key] = hex(entry[key])

@staticmethod
def filter_modinfo(binary) -> list[str] | None:
# getting the information from the *.ko files .modinfo section
modinfo = None
for section in binary.sections:
if section.name == '.modinfo':
modinfo = bytes(section.content).decode()
modinfo = [entry for entry in modinfo.split('\x00') if entry]
break
return modinfo
def _get_note_sections_content(elf: lief.ELF) -> Iterable[InfoSectionData]:
    """Yield name and printable content of each section of type NOTE."""
    for section in elf.sections:  # type: lief.ELF.Section
        if section.type != lief.ELF.SECTION_TYPES.NOTE:
            continue
        # keep only the bytes that belong to a printable character
        printable = bytes(byte for byte in section.content.tobytes() if byte in PRINTABLE_BYTES)
        yield InfoSectionData(name=section.name, contents=printable.decode())
Loading

0 comments on commit c14b112

Please sign in to comment.