Skip to content

Commit

Permalink
dns: handle empty queues
Browse files Browse the repository at this point in the history
  • Loading branch information
AlyaGomaa committed Jan 27, 2025
1 parent 2f214c3 commit ddd15d5
Showing 1 changed file with 31 additions and 6 deletions.
37 changes: 31 additions & 6 deletions modules/flowalerts/dns.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import ipaddress
import json
import math
import queue
from datetime import datetime
from typing import (
List,
Expand Down Expand Up @@ -264,14 +265,28 @@ def check_dns_without_connection_of_all_pending_flows(self):
flows in the pending_dns_without_conn queue before stopping slips,
doesnt matter if the 30 mins passed or not"""
while self.pending_dns_without_conn.qsize() > 0:
profileid, twid, pending_flow = self.pending_dns_without_conn.get()
try:
# flowalerts is closing here. there's no chance that
profileid, twid, pending_flow = (
self.pending_dns_without_conn.get(timeout=5)
)
except queue.Empty:
return

self.check_dns_without_connection(
profileid, twid, pending_flow, waited_for_the_conn=True
)

def get_dns_flow_from_queue(self):
"""Fetch and parse the DNS message from the dns_msgs queue."""
msg: str = self.dns_msgs.get()
"""
Fetch and parse the DNS message from the dns_msgs queue.
Returns None if the queue is empty.
"""
try:
msg: str = self.dns_msgs.get(timeout=4)
except queue.Empty:
return None

msg: dict = json.loads(msg["data"])
flow = self.classifier.convert_to_flow_obj(msg["flow"])
return flow
Expand All @@ -293,9 +308,13 @@ def check_pending_flows_timeout(
back_to_queue: List[Tuple[str, str, Any]] = []

while self.pending_dns_without_conn.qsize() > 0:
profileid, twid, pending_flow = self.pending_dns_without_conn.get(
timeout=4
)
try:
profileid, twid, pending_flow = (
self.pending_dns_without_conn.get(timeout=5)
)
except queue.Empty:
return back_to_queue

diff_in_mins = utils.get_time_diff(
pending_flow.starttime, reference_flow.starttime, "minutes"
)
Expand Down Expand Up @@ -339,6 +358,10 @@ def check_dns_without_connection_timeout(self):
# mins zeek time passed or not. we are not going to
# analyze it.
reference_flow = self.get_dns_flow_from_queue()
if not reference_flow:
# ok wait for more dns flows to be read by slips
continue

# back_to_queue will be used to store the flows we're
# waiting for the conn of temporarily if 30 mins didnt pass
# since we saw them.
Expand Down Expand Up @@ -550,10 +573,12 @@ def shutdown_gracefully(self):
# close the queue
# without this, queues are left in memory and flowalerts keeps
# waiting for them forever
# to exit the process quickly without blocking on the queue's cleanup
self.dns_msgs.cancel_join_thread()
self.dns_msgs.close()

self.pending_dns_without_conn.cancel_join_thread()

self.pending_dns_without_conn.close()

def pre_analyze(self):
Expand Down

0 comments on commit ddd15d5

Please sign in to comment.