-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathtia_example.py
125 lines (100 loc) · 3.14 KB
/
tia_example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#!/usr/bin/env python
#
# Connect to the Quad9 threat-intel api and receive domain block information
# You receive the auth_token from Quad9 and it is specific to a threat feed.
#
# Requires Python version 3.8 or greater.
#
# usage:
# ./tia_example.py --config <YOUR CONFIG FILE>
#
# set verbose: true to see the data being retrieved.
# @author: Emilia Cebrat-Maslowski (Quad9)
import os
import logging
import asyncio
import websockets
import json
import yaml
import sys
import os
import time
import argparse
from aiofile import async_open
from collections import namedtuple
def read_config(config_path):
with open(config_path, 'r') as f:
parsed_file = yaml.safe_load(f)
Config = namedtuple("Config", "ti_url auth_token data_file log_file verbose nolog noack")
config = Config(
parsed_file['ti_url'],
parsed_file['auth_token'],
parsed_file['data_file'],
parsed_file['log_file'],
parsed_file['verbose'],
parsed_file['nolog'],
parsed_file['noack']
)
return config
async def readblockloop(config, events):
async with websockets.connect(config.ti_url,
extra_headers={'Authorization': "Token " + config.auth_token}) as ws:
global websocket
websocket = ws
while True:
try:
message = await websocket.recv()
if config.verbose:
print(f" {message}")
if not config.nolog:
await events.put(message)
except:
logging.debug('Failed to receive message')
await asyncio.sleep(1)
async def process_acks(acks):
while True:
ack = await acks.get()
try:
logging.debug(f"ACKing: {ack}")
await send_data(ack)
except:
logging.debug('Failed to send ack')
break
async def send_data(data):
frame = json.dumps(data)
await websocket.send(frame)
async def process_events(config, events, acks):
while True:
async with async_open(config.data_file, "a") as f:
event = await events.get()
await f.write(event)
if not config.noack:
event_parsed = json.loads(event)
ack = dict(id=event_parsed['id'])
await acks.put(ack)
def main():
parser = argparse.ArgumentParser(description='Read from Quad9 threat-intel api')
parser.add_argument('--config', required=True,
help='Path to the config file.')
args = parser.parse_args()
config = read_config(args.config)
logging.basicConfig(filename=config.log_file, level=logging.INFO, format='%(message)s')
loop = asyncio.get_event_loop()
acks = asyncio.Queue()
events = asyncio.Queue()
try:
loop.create_task(readblockloop(config, events))
loop.create_task(process_events(config, events, acks))
loop.create_task(process_acks(acks))
loop.run_forever()
finally:
loop.close()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)