Skip to content

Commit

Permalink
Merge pull request #1184 from stratosphereips/alya/fix_fp_dns_without…
Browse files Browse the repository at this point in the history
…_conn

Fix FP dns without connection
  • Loading branch information
AlyaGomaa authored Jan 27, 2025
2 parents c9fa03a + ddd15d5 commit e28d113
Show file tree
Hide file tree
Showing 17 changed files with 903 additions and 334 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ jobs:

- name: Run Integration Tests for ${{ matrix.test_file }}
run: |
python3 -m pytest tests/integration_tests/${{ matrix.test_file }} -p no:warnings -vv -s -n 5
python3 -m pytest tests/integration_tests/${{ matrix.test_file }} -p no:warnings -vv -s -n 3
- name: Upload Artifacts
if: success() || failure()
Expand Down
3 changes: 2 additions & 1 deletion managers/process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,8 @@ def stop_slips(self) -> bool:
"""
determines whether slips should stop
based on the following:
1. is slips still receiving new flows?
1. is slips still receiving new flows? (checks input.py and
profiler.py)
2. did the control channel receive the stop_slips msg?
3. is a debugger present?
"""
Expand Down
14 changes: 5 additions & 9 deletions modules/flowalerts/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ def init(self):
# slips will alert data upload
self.flow_upload_threshold = 100
self.read_configuration()
# Cache list of connections that we already checked in the timer
# thread (we waited for the dns resolution for these connections)
self.connections_checked_in_conn_dns_timer_thread = []
self.whitelist = self.flowalerts.whitelist
# how much time to wait when running on interface before reporting
# connections without DNS? Usually the computer resolved DNS
Expand Down Expand Up @@ -793,13 +790,12 @@ async def analyze(self, msg):
self.check_different_localnet_usage(
twid, flow, what_to_check="srcip"
)
task = asyncio.create_task(
self.check_connection_without_dns_resolution(
profileid, twid, flow
)
self.flowalerts.create_task(
self.check_connection_without_dns_resolution,
profileid,
twid,
flow,
)
# to wait for these functions before flowalerts shuts down
self.flowalerts.tasks.append(task)
self.detect_connection_to_multiple_ports(profileid, twid, flow)
self.check_data_upload(profileid, twid, flow)
self.check_non_http_port_80_conns(twid, flow)
Expand Down
206 changes: 184 additions & 22 deletions modules/flowalerts/dns.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
# SPDX-FileCopyrightText: 2021 Sebastian Garcia <[email protected]>
# SPDX-License-Identifier: GPL-2.0-only
import asyncio
import collections
import ipaddress
import json
import math
import queue
from datetime import datetime
from typing import List
from typing import (
List,
Tuple,
Any,
)
import validators
from multiprocessing import Queue
from threading import Thread

from slips_files.common.abstracts.flowalerts_analyzer import (
IFlowalertsAnalyzer,
Expand All @@ -26,9 +32,6 @@ def init(self):
self.nxdomains = {}
# if nxdomains are >= this threshold, it's probably DGA
self.nxdomains_threshold = 10
# Cache list of connections that we already checked in the timer
# thread (we waited for the connection of these dns resolutions)
self.connections_checked_in_dns_conn_timer_thread = []
# dict to keep track of arpa queries to check for DNS arpa scans later
# format {profileid: [ts,ts,...]}
self.dns_arpa_queries = {}
Expand All @@ -38,7 +41,23 @@ def init(self):
self.classifier = FlowClassifier()
self.our_ips = utils.get_own_ips()
# In mins
self.dns_without_conn_interface_wait_time = 5
self.dns_without_conn_interface_wait_time = 30
# to store dns queries that we should check later. the purpose of
# this is to give the connection some time to arrive
self.pending_dns_without_conn = Queue()
self.dns_without_connection_timeout_checker_thread = Thread(
target=self.check_dns_without_connection_timeout,
daemon=True,
)

# used to pass the msgs this analyzer received to the
# dns_without_connection_timeout_checker_thread.
# the reason why we can't just use .get_msg() there is that once
# the msg is handled here, it won't be passed anymore to the other
# analyzers that should analyze it.
# meaning, only flowalerts.py is allowed to do a get_msg because it
# manages all the analyzers the msg should be passed to
self.dns_msgs = Queue()

def name(self) -> str:
return "DNS_analyzer"
Expand Down Expand Up @@ -193,6 +212,8 @@ def is_any_flow_answer_contacted(self, profileid, twid, flow) -> bool:
# if so, we extend the flow.answers to include
# these IPs. the goal is to avoid FPs
flow.answers.extend(self.get_previous_domain_resolutions(flow.query))
# remove duplicates
flow.answers = list(set(flow.answers))

if flow.answers == ["-"]:
# If no IPs are in the answer, we can not expect
Expand All @@ -203,10 +224,6 @@ def is_any_flow_answer_contacted(self, profileid, twid, flow) -> bool:
contacted_ips = self.db.get_all_contacted_ips_in_profileid_twid(
profileid, twid
)
# If contacted_ips is empty it can be because
# we didnt read yet all the flows.
# This is automatically captured later in the
# for loop and we start a Timer

# every dns answer is a list of ips that correspond to 1 query,
# one of these ips should be present in the contacted ips
Expand Down Expand Up @@ -243,11 +260,132 @@ def is_interface_timeout_reached(self):
# 30 minutes have passed?
return diff >= self.dns_without_conn_interface_wait_time

def check_dns_without_connection_of_all_pending_flows(self):
    """
    Check every flow still waiting in pending_dns_without_conn.

    Should be called before shutting down, to check all the pending
    flows before stopping slips, regardless of whether the 30 mins
    passed or not.

    check_dns_without_connection is a coroutine function, so calling it
    only creates a coroutine object; nothing is checked until that
    coroutine is actually run. This method is synchronous and is reached
    from flowalerts' async shutdown_gracefully, i.e. while an event loop
    is running in the current thread, where asyncio.run() is not
    allowed. So each coroutine is run to completion with asyncio.run()
    in a worker thread, and this method blocks until it finishes.
    """
    # imported locally so this method doesn't depend on file-level imports
    from concurrent.futures import ThreadPoolExecutor

    with ThreadPoolExecutor(max_workers=1) as executor:
        while self.pending_dns_without_conn.qsize() > 0:
            try:
                profileid, twid, pending_flow = (
                    self.pending_dns_without_conn.get(timeout=5)
                )
            except queue.Empty:
                return

            # flowalerts is closing here, so we don't wait any longer
            # for the connection of this dns flow:
            # waited_for_the_conn=True makes the check report the flow
            # instead of putting it back into the pending queue.
            check = self.check_dns_without_connection(
                profileid, twid, pending_flow, waited_for_the_conn=True
            )
            if asyncio.iscoroutine(check):
                # .result() blocks until the check is done and re-raises
                # any exception raised by it
                executor.submit(asyncio.run, check).result()

def get_dns_flow_from_queue(self):
    """
    Take the next DNS message out of the dns_msgs queue and convert the
    flow it carries into a flow object.
    Waits up to 4 seconds for a message. Returns None if none arrives.
    """
    try:
        raw_msg: dict = self.dns_msgs.get(timeout=4)
    except queue.Empty:
        return None

    # the "data" field of the msg is a json-encoded dict holding the flow
    payload: dict = json.loads(raw_msg["data"])
    return self.classifier.convert_to_flow_obj(payload["flow"])

def check_pending_flows_timeout(
    self, reference_flow: Any
) -> List[Tuple[str, str, Any]]:
    """
    Process all pending DNS flows without connections.

    For each pending flow, the time passed (in zeek time) between it and
    reference_flow decides what happens:
    - 30 mins or more: check_dns_without_connection is called with
      waited_for_the_conn=True
    - within the last half minute before the 10 or the 20 mins mark:
      check_dns_without_connection is called with
      waited_for_the_conn=False
    - otherwise: the flow is returned so it can be checked later.

    check_dns_without_connection is a coroutine function: calling it
    only creates a coroutine object, so the due checks are collected and
    then run to completion with asyncio.run(). asyncio.run() can't be
    used while an event loop is running in the current thread, so this
    method must be called from a thread that has no running loop.

    :param reference_flow: the current DNS flow slips is analyzing.
    only used to get the current zeek timestamp, just to know how much
    zeek time passed since each pending flow was seen.
    Returns a list of flows that need to be put back into the queue
    and checked later.
    """
    # Drain the queue into a snapshot before checking anything: a check
    # done with waited_for_the_conn=False may put the flow back into
    # this same queue, and it must not be picked up again in this pass.
    pending: List[Tuple[str, str, Any]] = []
    while self.pending_dns_without_conn.qsize() > 0:
        try:
            pending.append(self.pending_dns_without_conn.get(timeout=5))
        except queue.Empty:
            break

    back_to_queue: List[Tuple[str, str, Any]] = []
    due_checks = []
    for profileid, twid, pending_flow in pending:
        diff_in_mins = utils.get_time_diff(
            pending_flow.starttime, reference_flow.starttime, "minutes"
        )

        if diff_in_mins >= 30:
            waited_for_the_conn = True
        elif 9.5 < diff_in_mins <= 10 or 19.5 < diff_in_mins <= 20:
            waited_for_the_conn = False
        else:
            back_to_queue.append((profileid, twid, pending_flow))
            continue

        check = self.check_dns_without_connection(
            profileid,
            twid,
            pending_flow,
            waited_for_the_conn=waited_for_the_conn,
        )
        if asyncio.iscoroutine(check):
            due_checks.append(check)

    if due_checks:

        async def run_due_checks():
            # one after the other, in the order the flows were queued
            for check in due_checks:
                await check

        asyncio.run(run_due_checks())

    return back_to_queue

def check_dns_without_connection_timeout(self):
    """
    Waits 30 mins in zeek time for the connection of a dns to arrive.

    Does so by receiving every dns msg this analyzer receives, then
    comparing its ts to the ts of the flows we're waiting for.
    Once the diff between them is >= 30 mins, we check for the
    "dns without connection" evidence.
    The whole point is to give the connection 30 mins in zeek time to
    arrive before alerting "dns without conn".
    - To avoid having thousands of flows in memory for 30 mins, we check
    every 10 mins for the connections. If not found we put the flow back
    to the queue, if found we remove that flow from the pending flows.

    This function runs in its own thread, until flowalerts should stop.
    """
    try:
        while not self.flowalerts.should_stop():
            # we just use it to know the zeek current ts to check if 30
            # mins zeek time passed or not. we are not going to
            # analyze it.
            # The msg is consumed even when no flow is pending:
            # get_dns_flow_from_queue() blocks for a few seconds when
            # there's nothing to read, so this loop doesn't spin, and
            # dns_msgs doesn't pile up old msgs that would later be
            # used as an outdated "current" timestamp.
            reference_flow = self.get_dns_flow_from_queue()
            if not reference_flow:
                # ok wait for more dns flows to be read by slips
                continue

            if self.pending_dns_without_conn.empty():
                # no flow is waiting for its connection
                continue

            # back_to_queue is used to temporarily store the flows
            # that are not due for a check yet.
            # the goal of this is to not change the queue size while
            # check_pending_flows_timeout() is going through it
            back_to_queue = self.check_pending_flows_timeout(
                reference_flow
            )
            # put them back to the queue so we can check them later
            for flow in back_to_queue:
                flow: Tuple[str, str, Any]
                self.pending_dns_without_conn.put(flow)
    except KeyboardInterrupt:
        # the rest will be handled in shutdown_gracefully
        return

async def check_dns_without_connection(
self, profileid, twid, flow
self, profileid, twid, flow, waited_for_the_conn=False
) -> bool:
"""
Makes sure all cached DNS answers are there in contacted_ips
:kwarg waited_for_the_conn: if True, it means we already waited 30
mins in zeek time for the conn of this dns to arrive, and it didnt.
if False, we wait 30 mins zeek time for it to arrive
"""
if not self.should_detect_dns_without_conn(flow):
return False
Expand All @@ -258,13 +396,12 @@ async def check_dns_without_connection(
if self.is_any_flow_answer_contacted(profileid, twid, flow):
return False

# Found a DNS query and none of its answers were contacted
await asyncio.sleep(40)

if self.is_any_flow_answer_contacted(profileid, twid, flow):
if not waited_for_the_conn:
# wait 30 mins zeek time for the conn of this dns to arrive
self.pending_dns_without_conn.put((profileid, twid, flow))
return False

# Reaching here means we already waited some time for the connection
# Reaching here means we already waited for the connection
# of this dns to arrive but none was found
self.set_evidence.dns_without_conn(twid, flow)
return True
Expand Down Expand Up @@ -430,20 +567,45 @@ def check_dns_arpa_scan(self, profileid, twid, flow):
self.dns_arpa_queries.pop(profileid)
return True

def shutdown_gracefully(self):
    """
    Stops the timeout checker thread, checks the dns flows that are
    still pending, then closes the queues of this analyzer.
    """
    checker_thread = self.dns_without_connection_timeout_checker_thread
    # Wait for the checker thread before draining the pending queue.
    # Otherwise the thread could still take flows out of the queue, or
    # put flows back into it, after the final check below is done, and
    # those flows would never be checked.
    # is_alive() is False for a thread that was never started, and
    # joining such a thread raises a RuntimeError.
    if checker_thread.is_alive():
        checker_thread.join()

    self.check_dns_without_connection_of_all_pending_flows()

    # close the queues
    # without this, queues are left in memory and flowalerts keeps
    # waiting for them forever.
    # cancel_join_thread() is called to exit the process quickly
    # without blocking on the queue's cleanup
    for pending_queue in (self.dns_msgs, self.pending_dns_without_conn):
        pending_queue.cancel_join_thread()
        pending_queue.close()

def pre_analyze(self):
    """
    One-time setup that must not run in a loop: called only once, from
    flowalerts' pre_main.
    """
    # the thread isn't started in __init__ because it uses
    # self.flowalerts attributes that are not initialized yet in __init__
    checker_thread = self.dns_without_connection_timeout_checker_thread
    checker_thread.start()

async def analyze(self, msg):
"""
is only used by flowalerts.py
runs whenever we get a new_dns message
"""
if not utils.is_msg_intended_for(msg, "new_dns"):
return False

self.dns_msgs.put(msg)
msg = json.loads(msg["data"])
profileid = msg["profileid"]
twid = msg["twid"]
flow = self.classifier.convert_to_flow_obj(msg["flow"])
task = asyncio.create_task(
self.check_dns_without_connection(profileid, twid, flow)

self.flowalerts.create_task(
self.check_dns_without_connection, profileid, twid, flow
)
# Allow the event loop to run the scheduled task
await asyncio.sleep(0)
# to wait for these functions before flowalerts shuts down
self.flowalerts.tasks.append(task)

self.check_high_entropy_dns_answers(twid, flow)
self.check_invalid_dns_answers(twid, flow)
self.detect_dga(profileid, twid, flow)
Expand Down
7 changes: 6 additions & 1 deletion modules/flowalerts/flowalerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,12 @@ def subscribe_to_channels(self):
self.channels.update({channel: channel_obj})

async def shutdown_gracefully(self):
await asyncio.gather(*self.tasks)
self.dns.shutdown_gracefully()
await asyncio.gather(*self.tasks, return_exceptions=True)

def pre_main(self):
utils.drop_root_privs()
self.dns.pre_analyze()
self.analyzers_map = {
"new_downloaded_file": [self.downloaded_file.analyze],
"new_notice": [self.notice.analyze],
Expand All @@ -78,6 +80,7 @@ def pre_main(self):
}

async def main(self):
"""runs in a loop, waiting for messages in subscribed channels"""
for channel, analyzers in self.analyzers_map.items():
msg: dict = self.get_msg(channel)
if not msg:
Expand All @@ -91,6 +94,8 @@ async def main(self):
# and finish whenever they finish, we'll not wait for them
loop = asyncio.get_event_loop()
task = loop.create_task(analyzer(msg))
# because Async Tasks swallow exceptions.
task.add_done_callback(self.handle_exception)
# to wait for these functions before flowalerts shuts down
self.tasks.append(task)
# Allow the event loop to run the scheduled task
Expand Down
13 changes: 12 additions & 1 deletion modules/flowalerts/set_evidence.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,17 @@ def dga(self, twid, flow, nxdomains: int, uids: List[str]) -> None:
self.db.set_evidence(evidence)

def dns_without_conn(self, twid, flow):
# WARNING
# The approach we use to detect "dns without connection" evidence will
# cause the evidence to be set after 30 mins of the dns flow,
# and the timewindow of that evidence may have been closed,
# that would cause us to detect the tw as malicious way after it
# ends.
# but this doesnt matter since the threat level of it is info.
#
# this will be an issue if we ever decide to increase the threat level
# of this evidence

description: str = f"domain {flow.query} resolved with no connection"
twid_number: int = int(twid.replace("timewindow", ""))

Expand All @@ -480,7 +491,7 @@ def dns_without_conn(self, twid, flow):
victim_type=IoCType.DOMAIN,
value=flow.query,
),
threat_level=ThreatLevel.LOW,
threat_level=ThreatLevel.INFO,
description=description,
profile=ProfileID(ip=flow.saddr),
timewindow=TimeWindow(number=twid_number),
Expand Down
Loading

0 comments on commit e28d113

Please sign in to comment.