capa.py
import subprocess
from assemblyline_v4_service.common.base import ServiceBase
from assemblyline_v4_service.common.result import Result, ResultSection
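class Capa(ServiceBase):
    """Assemblyline service that runs the ``capa`` capability scanner on a submitted file.

    The scanner binary and its rule/signature directories live at fixed paths inside
    the service image (see the class constants below); the raw text report produced
    by capa is attached to the request result as a single section.
    """

    # Locations inside the service container. The Dockerfile is expected to
    # provide the capa binary, a checkout of capa-rules and the signature set.
    CAPA_BIN = "/opt/capa"
    CAPA_RULES_DIR = "/opt/al_service/capa-rules"
    CAPA_SIGS_DIR = "/opt/al_service/capa/sigs"

    def __init__(self, config=None):
        """Initialise the service with the optional Assemblyline service config."""
        super().__init__(config)

    def start(self):
        """Lifecycle hook: called once when the service process starts."""
        self.log.debug("Capa service started")

    def stop(self):
        """Lifecycle hook: called once when the service process stops."""
        self.log.debug("Capa service ended")

    def execute(self, request):
        """Scan the file referenced by *request* with capa and attach the report to it.

        Args:
            request: the Assemblyline service request. ``request.file_path`` is the
                sample to analyse; ``request.result`` receives the built ``Result``.

        Returns:
            None. Output is delivered by assigning ``request.result``.
        """
        result = Result()
        sample_path = request.file_path

        # The sample path is passed as a discrete argv element with no shell, so a
        # file name containing shell metacharacters cannot alter the command line.
        # The capa-rules checkout is pinned in the Dockerfile (tag v3.2.0) because a
        # newer rule set raised a rule-parsing error at scan time.
        completed = subprocess.run(
            [self.CAPA_BIN, "-r", self.CAPA_RULES_DIR, "-s", self.CAPA_SIGS_DIR, sample_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        # NOTE(review): no subprocess timeout is set here; a sample that makes capa
        # hang is bounded only by the platform's task timeout — confirm that is acceptable.

        # capa's report can echo strings taken from the sample, so its bytes are
        # attacker-influenced; decode leniently so a stray byte cannot crash the
        # service with UnicodeDecodeError.
        text_section = ResultSection("CAPA Analysis output")
        text_section.add_line(completed.stdout.decode("utf-8", errors="replace"))

        # A non-zero exit used to pass silently and produce an empty section; surface
        # it in the log and the report so analysts can tell "nothing found" apart
        # from "scanner failed".
        if completed.returncode != 0:
            stderr_text = completed.stderr.decode("utf-8", errors="replace")
            self.log.warning("capa exited with status %s: %s", completed.returncode, stderr_text)
            text_section.add_line(f"capa exited with status {completed.returncode}")
            if stderr_text:
                text_section.add_line(stderr_text)

        result.add_section(text_section)
        request.result = result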