Closed
Description
I simply wanted to analyze and test DNS packets and view them on my console. The problem is when I call unbind()
method of netfilterqueue, it hangs on the program. This does not happens every time. Plz help me If anyone of you know how to unbind without hanging. (see line 83)
import time
from netfilterqueue import NetfilterQueue
from scapy.all import *
import os
import sys
import threading
from random import randint
import colored
class DNS_analyze(object):
def __init__(self, net_interface):
self.q = NetfilterQueue()
self.net_interface = net_interface
## We need to identify what kind of packet we receive
def dns_debug_print(self, ip_pkt):
sep_str = colored.attr('bold') + colored.fg('white') + "|" + colored.attr('reset')
try:
dns_id = "{}{}{}".format(colored.fg('cyan'), str(ip_pkt[DNS].id), colored.attr('reset'))
dns_query = "{}{}{}".format(colored.fg('light_green'), str(ip_pkt[DNS].qd.qname.decode("utf-8")), colored.attr('reset'))
dns_qtype = "{}{}{}".format(colored.fg(169), dnsqtypes[ip_pkt[DNS].qd.qtype], colored.attr('reset'))
dns_qclass = "{}{}{}".format(colored.fg(3), dnsclasses[ip_pkt[DNS].qd.qclass], colored.attr('reset'))
except Exception as e:
print("Exception occured.. {}".format(e))
return
fmt_str = "%s:%s -> %s:%s|%s%s%s%s%s%s%s" % (
ip_pkt.src, ip_pkt[UDP].sport,
ip_pkt.dst, ip_pkt[UDP].dport,
dns_id,
sep_str,
dns_qtype,
sep_str,
dns_query,
sep_str,
dns_qclass
)
with open('dns_analyze_log.txt', 'a') as the_file:
the_file.write(fmt_str + '\n')
print(fmt_str)
def callback(self, pkt):
orig_pkt = IP(pkt.get_payload())
self.dns_debug_print(orig_pkt)
pkt.accept()
def _analyze(self):
self.q.bind(1, self.callback)
self.q.run()
def stop(self):
print("[*] Restoring iptables DNS hook.")
os.system('iptables -i '+ self.net_interface +' -t nat -D PREROUTING -p udp --dport 53 -j NFQUEUE --queue-num 1')
self.q.unbind()
print("[*] unbinded netfilter hook.")
def start(self):
os.system('iptables -i '+ self.net_interface +' -t nat -A PREROUTING -p udp --dport 53 -j NFQUEUE --queue-num 1')
t = threading.Thread(name='DNS_analyze', target=self._analyze)
t.setDaemon(True)
t.start()
dns__analyze = DNS_analyze('wlan0')
try:
dns__analyze.start()
while 1:
time.sleep(1)
except KeyboardInterrupt:
print('stopping dns_analyze')
dns__analyze.stop()
Metadata
Metadata
Assignees
Labels
No labels