Skip to content

Commit

Permalink
Merge pull request #1202 from stratosphereips/alya/fix_not_reading_al…
Browse files Browse the repository at this point in the history
…l_given_flows

Fix not reading all given flows due to whitelisting all flows and evidence to tranco domains
  • Loading branch information
AlyaGomaa authored Jan 29, 2025
2 parents 15d0aec + 5b4ac49 commit 7179b8b
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 36 deletions.
3 changes: 2 additions & 1 deletion docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,8 @@ Slips uses Tranco whitelist which contains a research-oriented top sites
ranking hardened against manipulation here https://tranco-list.eu/

Slips download the top 10k domains from this list and by default and
whitelists all evidence and alerts from and to these domains. Slips still shows the flows to and from these IoC.
whitelists all evidence and alerts from and to these domains.
Slips still shows the flows to and from these IoC.


The tranco list is updated daily by default in Slips, but you can change how often to update it using the
Expand Down
2 changes: 2 additions & 0 deletions managers/process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,8 @@ def is_stop_msg_received(self) -> bool:
"""
returns true if the control_channel channel received the
'stop_slips' msg
This control channel is used by CYST or the filemanager to tell
slips that zeek terminated (useful when running slips with -g)
"""
message = self.main.c1.get_message(timeout=0.01)
if not message:
Expand Down
38 changes: 25 additions & 13 deletions slips_files/core/helpers/whitelist/domain_whitelist.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,29 +74,41 @@ def is_whitelisted(
if not parent_domain:
return False

if self.is_domain_in_tranco_list(parent_domain):
return True

whitelisted_domains: Dict[str, Dict[str, str]]
whitelisted_domains = self.db.get_whitelist("domains")

# is domain in whitelisted domains?
if parent_domain not in whitelisted_domains:
# if the parent domain not in whitelisted domains, then the
# child definetely isn't
# is the parent domain in any of slips whitelists?? like tranco or
# whitelist.conf?
# if so we need to get extra info about that domain based on the
# whitelist.
# e.g by default slips whitelists all evidence and alerts from and to
# tranco domains.
# but domains taken from whitelist.conf have their own direction
# and type
if self.is_domain_in_tranco_list(parent_domain):
whitelist_should_ignore = "alerts"
dir_from_whitelist = "dst"
elif parent_domain in whitelisted_domains:
# did the user say slips should ignore flows or alerts in the
# config file?
whitelist_should_ignore = whitelisted_domains[parent_domain][
"what_to_ignore"
]
# did the user say slips should ignore flows/alerts TO or from
# that domain in the config file?
dir_from_whitelist: str = whitelisted_domains[parent_domain][
"from"
]
else:
return False

# Ignore flows or alerts?
whitelist_should_ignore = whitelisted_domains[parent_domain][
"what_to_ignore"
]
# match the direction and whitelist_Type of the given domain to the
# ones we have from the whitelist.
if not self.match.what_to_ignore(
should_ignore, whitelist_should_ignore
):
return False

# Ignore src or dst
dir_from_whitelist: str = whitelisted_domains[parent_domain]["from"]
if not self.match.direction(direction, dir_from_whitelist):
return False

Expand Down
4 changes: 2 additions & 2 deletions slips_files/core/helpers/whitelist/matcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ class WhitelistMatcher:
"""
matches ioc properties to whitelist properties
for example if in the config file we have
facebook, alerts, to
this matcher macher when given a fb ip, makes sure we're whitelisting
"facebook, alerts, to"
this matcher maches when given a fb ip, makes sure we're whitelisting
an alert, not a flow
and makes sure we're whitelisting all flows TO fb and not from fb.
its called like this
Expand Down
45 changes: 25 additions & 20 deletions slips_files/core/profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,17 +442,11 @@ def get_input_type(self, line: dict, input_type: str):
# pcap, binetflow, binetflow tabs, nfdump, etc
return input_type

def shutdown_gracefully(self):
def join_profiler_threads(self):
# wait for the profiler threads to complete
for thread in self.profiler_threads:
thread.join()

self.print(
f"Stopping. Total lines read: {self.rec_lines}",
log_to_logfiles_only=True,
)
self.mark_process_as_done_processing()

def mark_process_as_done_processing(self):
"""
is called to mark this process as done processing so
Expand Down Expand Up @@ -546,23 +540,16 @@ def get_msg_from_input_proc(
the profiler threads (e.g pending_flows_queue).
when set to true, this function uses the pending flows queue lock.
"""
if thread_safe:
self.pending_flows_queue_lock.acquire()
try:
# this msg can be a str only when it's a 'stop' msg indicating
# that this module should stop
msg = q.get(timeout=1, block=False)
if thread_safe:
self.pending_flows_queue_lock.release()
return msg
with self.pending_flows_queue_lock:
return q.get(timeout=1, block=False)
else:
return q.get(timeout=1, block=False)
except queue.Empty:
pass
return None
except Exception:
# ValueError is raised when the queue is closed
pass

if thread_safe:
self.pending_flows_queue_lock.release()
return None

def start_profiler_threads(self):
"""starts 3 profiler threads for faster processing of the flows"""
Expand Down Expand Up @@ -592,6 +579,10 @@ def init_input_handlers(self, line, input_type):
self.input_handler_obj = SUPPORTED_INPUT_TYPES[self.input_type]()

def stop_profiler_thread(self) -> bool:
# cant use while self.flows_to_process_q.qsize() != 0 only here
# because when the thread starts, this qsize is 0, so we need
# another indicator that we are at the end of the flows. aka the
# stop_profiler_threads event
return (
self.stop_profiler_threads.is_set()
and not self.flows_to_process_q.qsize()
Expand All @@ -602,6 +593,7 @@ def process_flow(self):
This function runs in 3 parallel threads for faster processing of
the flows
"""

while not self.stop_profiler_thread():
msg = self.get_msg_from_input_proc(
self.flows_to_process_q, thread_safe=True
Expand Down Expand Up @@ -651,6 +643,13 @@ def should_stop(self):
"""
return False

def shutdown_gracefully(self):
self.print(
f"Stopping. Total lines read: {self.rec_lines}",
log_to_logfiles_only=True,
)
self.mark_process_as_done_processing()

def pre_main(self):
utils.drop_root_privs()
client_ips = [str(ip) for ip in self.client_ips]
Expand Down Expand Up @@ -681,7 +680,13 @@ def main(self):
# without it, there's no way this module will know it's
# time to stop and no new flows are coming
if self.is_stop_msg(msg):
# DO NOT return/exit this module before all profilers are
# done. if you do, the profiler threads will shutdown
# before reading all flows as soon as we receive the stop msg.
# signal the threads to stop
self.stop_profiler_threads.set()
# wait for them to finish
self.join_profiler_threads()
return 1

self.pending_flows_queue_lock.acquire()
Expand Down

0 comments on commit 7179b8b

Please sign in to comment.