-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworker.py
122 lines (103 loc) · 4.02 KB
/
worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
"""
Throttler main worker
"""
from pymongo import MongoClient
from pika import spec
import pika
import json
import logging
import sys
import time
import dateutil.parser
# Module-level handles shared between main() and the queue callback.
# The db_* names stay None until main() has connected to MongoDB.
db_auth_log = None  # Mongo collection holding the historical auth log
db_users = None  # Mongo users DB (not assigned anywhere in the visible code)
db_user_ip = None  # Mongo collection keyed by IP address (db.user_ip)

# Defined here so main() and processUserRegistration() also work when this
# file is imported rather than run as a script. logging.getLogger() returns
# the same object for the same name, so the script entry point configuring
# the 'worker' logger affects this one too.
logger = logging.getLogger('worker')
def processUserRegistration(ch, method, properties, body):
    """
    Queue callback: record one user authentication event in MongoDB.

    ch: channel the message arrived on; used to acknowledge it.
    method: delivery metadata; its delivery_tag identifies the message to ack.
    properties: message properties (unused).
    body: JSON document with the keys username, ip_address, timestamp
        and method.

    The event is inserted into db_auth_log, and the db_user_ip document whose
    _id is the IP address is upserted with the same details. The message is
    acknowledged only after both writes have been issued, so an exception
    raised earlier leaves it unacknowledged.
    """
    # The module-level logger/db_* names are only read here, so no
    # `global` declarations are required.
    event = json.loads(body)
    username = event['username']
    ip_address = event['ip_address']
    logger.debug("Got user %s on address %s", username, ip_address)

    authed_time = dateutil.parser.parse(event['timestamp'])
    auth_method = event['method']

    #A user auth has come in. Log it in our historical records table
    auth_log_entry = {
        'username': username,
        'authed_time': authed_time,
        'method': auth_method,
        'ip_address': ip_address,
    }
    db_auth_log.insert(auth_log_entry)

    #Record this auth against the IP address, creating the document if needed
    ip_selector = {'_id': ip_address}
    ip_record = {
        '_id': ip_address,
        'username': username,
        'authed_time': authed_time,
        'method': auth_method,
    }
    db_user_ip.update(ip_selector, ip_record, upsert=True)

    #Ack the processing of this transaction
    ch.basic_ack(delivery_tag=method.delivery_tag)
def main(settings):
    """
    Connect to MongoDB and the AMQP broker, then start consuming auth events.

    settings: The setting dictionary. The keys read here are
        mongodb_server, mongodb_database, mongodb_username,
        mongodb_password, mongodb_auth_log_collection, amqp_server,
        amqp_username, amqp_password, amqp_exchange and amqp_queue.

    Side effects: assigns the module-level db_auth_log and db_user_ip
    collections that processUserRegistration writes to, ensures the TTL
    index on the auth log, declares and binds the exchange/queue, and then
    hands control to the channel's start_consuming() loop.

    Any exception from the MongoDB or AMQP clients propagates to the caller.
    """
    global db_auth_log
    global db_user_ip

    logger.debug("Starting main function..")
    logger.debug("Seting Herbert User-Auth-Writer")

    #Keep the auth log around for 12 months. Written as a plain int
    #expression (no Python-2-only `L` suffix) so the file parses on
    #Python 2 and Python 3 alike; the value is still 31536000.
    auth_log_ttl_seconds = 365 * 24 * 60 * 60

    #Setup the MongoDB Connection
    mongo_client = MongoClient(settings['mongodb_server'], 27017)
    db = mongo_client[settings['mongodb_database']]
    db.authenticate(settings['mongodb_username'], settings['mongodb_password'])

    db_auth_log = db[settings['mongodb_auth_log_collection']]
    db_auth_log.ensure_index('authed_time', expire_after_seconds=auth_log_ttl_seconds)
    db_user_ip = db.user_ip

    #Setup the message queue
    credentials = pika.PlainCredentials(settings['amqp_username'], settings['amqp_password'])
    connection_parameters = pika.ConnectionParameters(settings['amqp_server'], credentials=credentials)
    amqp_connection = pika.BlockingConnection(connection_parameters)
    amqp_channel = amqp_connection.channel()

    amqp_channel.exchange_declare(exchange=settings['amqp_exchange'], type='fanout')
    amqp_channel.queue_declare(queue=settings['amqp_queue'], durable=True, exclusive=False)
    amqp_channel.queue_bind(exchange=settings['amqp_exchange'], queue=settings['amqp_queue'])

    #Setup the basic consume settings so we don't try and process too much at a time
    amqp_channel.basic_qos(prefetch_count=4)

    #Bind to the queues and start consuming
    amqp_channel.basic_consume(processUserRegistration, queue=settings['amqp_queue'])
    amqp_channel.start_consuming()
def parse_settings(lines):
    """
    Build the settings dictionary from "key: value" lines.

    lines: Any iterable of strings (an open file works), one setting per
        item. Trailing newlines are tolerated.

    Blank lines, lines starting with '!' or '#', and lines without a ':'
    are ignored. Only the first ':' separates the key from the value, so a
    value may itself contain colons (e.g. a URL or host:port). Keys and
    values have surrounding whitespace removed.

    Returns a dict mapping setting names to their string values.
    """
    settings = {}
    for raw_line in lines:
        entry = raw_line.strip()
        if not entry or entry.startswith(('!', '#')) or ':' not in entry:
            continue
        key, _, value = entry.partition(':')
        settings[key.strip()] = value.strip()
    return settings


if __name__ == "__main__":
    logging.basicConfig()

    #Load up the settings from disk; `with` closes the file promptly
    with open('settings.txt', 'r') as settings_file:
        settings = parse_settings(settings_file)

    logger = logging.getLogger('worker')
    mode = settings.get('mode', '')
    if 'debug' in mode:
        logger.setLevel(logging.DEBUG)

    #If we're in testing.. just run once and die
    if 'test' in mode:
        logger.setLevel(logging.INFO)
        main(settings)
        sys.exit(0)

    #If we're in production, log the exception and try and restart the app.
    #Only Exception is caught, so Ctrl-C (KeyboardInterrupt) and sys.exit()
    #still stop the worker instead of being swallowed by the restart loop.
    while True:
        try:
            main(settings)
        except Exception:
            #exc_info=True records the exception type, message and traceback
            logging.critical("-- ERROR Has occured in Herbert Throttler. Sleeping and re-running", exc_info=True)
        #Pause before every restart so a persistent failure cannot spin the CPU
        time.sleep(5)