# sniffer_3.py
#
# UDP host-discovery scanner: sends a UDP datagram carrying a magic payload to
# every address in a subnet, then sniffs for ICMP type 3 / code 3 replies that
# echo that payload and reports each replying host as up.
import socket
import os
import struct
from ctypes import *
import threading
import time
from netaddr import IPNetwork, IPAddress
# Local address the raw sniffing socket is bound to.
# NOTE(review): this address is not inside `subnet` below -- confirm that both
# values match the network that is actually being scanned.
host = "192.168.148.135"
# Network whose addresses are probed by udp_sender and used to filter replies.
subnet = "192.168.1.0/24"
# Payload sent in every probe datagram; replies are matched on it.
magic_message = "PythonRules"
def udp_sender(subnet, magic_message):
    """Send one UDP datagram carrying *magic_message* to every address in *subnet*.

    The function sleeps for five seconds (presumably to let the sniffer start
    listening first), then sends the payload to UDP port 65212 on each address
    yielded by ``IPNetwork(subnet)``. A send failure for one address is printed
    and the sweep continues with the next address.

    Args:
        subnet: Network in any form accepted by ``netaddr.IPNetwork``,
            for example ``"192.168.1.0/24"``.
        magic_message: Payload to send. Text strings are encoded as UTF-8 so
            the call also works where ``sendto`` only accepts bytes.
    """
    time.sleep(5)

    # sendto() needs a byte string; on Python 2 a plain ``str`` already is one,
    # so only genuine text (unicode) values are encoded.
    payload = magic_message
    if isinstance(payload, type(u"")) and not isinstance(payload, bytes):
        payload = payload.encode("utf-8")

    sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        for ip in IPNetwork(subnet):
            target = str(ip)
            try:
                sender.sendto(payload, (target, 65212))
            except socket.error as err:
                # Best effort: report the failure and keep sweeping.
                print("%s %s" % (target, err))
    finally:
        # Release the socket even if the sweep is interrupted part-way.
        sender.close()
class IP(Structure):
    """ctypes view of the fixed 20-byte IPv4 header at the start of a packet.

    Construct it with the raw bytes of a packet: ``IP(raw_buffer)``. The first
    ``sizeof(IP)`` bytes are copied into the structure; any extra bytes are
    ignored.

    Besides the raw header fields, each instance carries:

    * ``protocol_map`` -- mapping of protocol numbers to names.
    * ``src_address`` / ``dst_address`` -- dotted-quad strings.
    * ``protocol`` -- protocol name from ``protocol_map``, or the protocol
      number as a string when it is not in the map.

    Fields are read in the host's native byte order, so 16-bit fields such as
    ``len``, ``id`` and ``sum`` are not converted from network order.

    Raises:
        ValueError: if the buffer holds fewer than ``sizeof(IP)`` bytes.
        TypeError: if no bytes-like buffer is supplied (the ``None`` default
            is not a usable value).
    """

    # NOTE: bit-field placement is platform dependent; listing ``ihl`` before
    # ``version`` matches little-endian hosts, where the first bit field
    # occupies the low nibble of the first header byte.
    _fields_ = [
        ("ihl", c_ubyte, 4),
        ("version", c_ubyte, 4),
        ("tos", c_ubyte),
        ("len", c_ushort),
        ("id", c_ushort),
        ("offset", c_short),
        ("ttl", c_ubyte),
        ("protocol_num", c_ubyte),
        ("sum", c_ushort),
        ("src", c_uint32),
        ("dst", c_uint32),
    ]

    def __new__(cls, socket_buffer=None):
        """Build the structure by copying the header bytes out of *socket_buffer*."""
        return cls.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer=None):
        """Derive the human-readable attributes from the already-filled fields.

        *socket_buffer* is accepted only so the signature matches ``__new__``;
        the fields were populated there.
        """
        # Map protocol constants to names.
        self.protocol_map = {1: "ICMP", 6: "TCP", 17: "UDP"}

        # Human-readable IP addresses. Packing with the native format restores
        # the exact four wire bytes that the native-order field was read from.
        self.src_address = socket.inet_ntoa(struct.pack("@I", self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("@I", self.dst))

        # Human-readable protocol; fall back to the number for unknown ones.
        self.protocol = self.protocol_map.get(
            self.protocol_num, str(self.protocol_num)
        )
class ICMP(Structure):
    """Eight-byte ICMP header laid out as a ctypes structure.

    Instantiate with the raw bytes that start at the ICMP header,
    ``ICMP(buf)``; the leading ``sizeof(ICMP)`` bytes are copied into the
    fields. Multi-byte fields are read in the host's native byte order.
    """

    _fields_ = [
        ("type", c_ubyte),
        ("code", c_ubyte),
        ("checksum", c_ushort),
        ("unused", c_ushort),
        ("next_hop_mtu", c_ushort),
    ]

    def __new__(cls, socket_buffer=None):
        """Create the instance by copying the header out of *socket_buffer*."""
        header = cls.from_buffer_copy(socket_buffer)
        return header

    def __init__(self, socket_buffer=None):
        """Accept the buffer argument; every field was already set in ``__new__``."""
# On Windows the raw socket is opened with IPPROTO_IP so that it can be
# switched to receive-all mode below; elsewhere it is restricted to ICMP.
if os.name == "nt":
    socket_protocol = socket.IPPROTO_IP
else:
    socket_protocol = socket.IPPROTO_ICMP

# Replies are compared against raw packet bytes, so keep a bytes copy of the
# payload (on Python 2 a plain ``str`` already is a byte string).
_magic_bytes = magic_message
if isinstance(_magic_bytes, type(u"")) and not isinstance(_magic_bytes, bytes):
    _magic_bytes = _magic_bytes.encode("utf-8")

# Parse the target network once instead of once per received packet.
_target_network = IPNetwork(subnet)

sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)
_receive_all_enabled = False
try:
    sniffer.bind((host, 0))
    # include ip header in capture
    sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
    if os.name == "nt":
        sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
        _receive_all_enabled = True

    # Send the probes from a background thread while this thread sniffs.
    t = threading.Thread(target=udp_sender, args=(subnet, magic_message))
    t.start()

    while True:
        # read in a packet
        raw_buffer = sniffer.recvfrom(65535)[0]
        if len(raw_buffer) < sizeof(IP):
            # Too short to hold an IP header; skip instead of crashing.
            continue

        # create an IP header from first 20 bytes of buffer
        ip_header = IP(raw_buffer)
        if ip_header.protocol != "ICMP":
            continue

        # calculate where our ICMP packet starts (ihl counts 32-bit words)
        offset = ip_header.ihl * 4
        buf = raw_buffer[offset:offset + sizeof(ICMP)]
        if len(buf) < sizeof(ICMP):
            # Truncated ICMP header; nothing useful to parse.
            continue

        # create our ICMP Structure
        icmp_header = ICMP(buf)
        if icmp_header.type != 3 or icmp_header.code != 3:
            continue
        if IPAddress(ip_header.src_address) not in _target_network:
            continue

        # make sure it has our magic message at the very end of the packet
        if raw_buffer.endswith(_magic_bytes):
            print("Host up: %s" % ip_header.src_address)
except KeyboardInterrupt:
    # Ctrl-C is the normal way to stop the sniffer.
    pass
finally:
    # if windows, turn receive-all (promiscuous) mode back off
    if _receive_all_enabled:
        sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
    sniffer.close()