-
Notifications
You must be signed in to change notification settings - Fork 0
/
reposter.py
executable file
·138 lines (102 loc) · 4.06 KB
/
reposter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#!/usr/bin/python
"""Reposter/splitter service for SendGrid Event API.
Usage:
1. Copy sample_settings.py to settings.py.
2. Customize REPOSTER_PORT, make sure that port is reachable from the
outside world, and set the reposter's address (host and port) as the
Event webhook URL in the SendGrid panel.
3. Put the list of your sites in SITE_URLS. Example:
@@@
SITE_URLS = {
'ci-server': 'http://buildbot.marcinkaszynski.com:1234',
'devel': 'http://devel.marcinkaszynski.com:10001',
}
@@@
4. Modify your mail-sending code to add the site name to each
message you send, using a unique argument named 'site'. For
details on how to use unique arguments in general, see:
http://sendgrid.com/docs/API_Reference/SMTP_API/unique_arguments.html
http://sendgrid.com/docs/API_Reference/Web_API/mail.html
Look for x-smtpapi header/parameter.
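5. Start reposter. It will receive events from SendGrid and repost
them to your sites based on each event's 'site' unique argument.
Events without one go to the site named 'default' if it is listed in
SITE_URLS; events for unknown sites are ignored.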
"""
import json
import logging
from twisted.internet import reactor, task
from twisted.web.client import getPage
from twisted.web.server import Site
from twisted.web.resource import Resource
import settings
class Uploader(object):
    """Queue of events bound for one site, POSTed to that site's URL as JSON.

    Events are collected with append() and shipped in batches by run().
    At most one request is in flight at a time; a batch whose upload fails
    is put back at the front of the queue so a later run() retries it.
    """

    def __init__(self, name, url):
        """Create an idle uploader.

        name -- site name, used in log messages.
        url -- address the batches are POSTed to.
        """
        self.name = name
        self.url = url
        # Deferred of the request currently in flight, or None when idle.
        self.deferred = None
        # Events waiting to be sent.
        self.queue = []
        # Batch handed to the request in flight.  Initialised here so the
        # attribute exists even before the first run().
        self.events_being_sent = []

    def append(self, event):
        """Add one event to the end of the pending queue."""
        self.queue.append(event)

    def run(self):
        """Start uploading the pending events as one JSON-encoded POST.

        Does nothing when a request is already in flight or when there is
        nothing queued.
        """
        if self.deferred or not self.queue:
            return
        self.events_being_sent = self.queue
        self.queue = []
        try:
            deferred = getPage(self.url,
                               headers={},
                               method="POST",
                               postdata=json.dumps(self.events_being_sent))
        except Exception:
            # The request could not even be started.  Put the batch back so
            # it is not lost, and stay idle so the next run() retries it.
            logging.exception("Could not start upload for %s", self.name)
            self._requeue_batch()
            return
        self.deferred = deferred
        self.deferred.addCallback(self.deferred_callback)
        self.deferred.addErrback(self.deferred_errback)

    def deferred_callback(self, response):
        """Handle a finished upload: log it and become idle again."""
        logging.info("Uploaded %d events to %s", len(self.events_being_sent), self.name)
        self.events_being_sent = []
        self.deferred = None

    def deferred_errback(self, error):
        """Handle a failed upload: log it, requeue the batch, become idle."""
        logging.error("Could not upload events for %s: %s", self.name, repr(error))
        self._requeue_batch()
        self.deferred = None

    def _requeue_batch(self):
        """Move the current batch back to the front of the pending queue."""
        # The batch goes first so the original event order is kept.
        self.queue = self.events_being_sent + self.queue
        self.events_being_sent = []
class EventDispatcher(object):
    """Routes incoming events to one Uploader per configured site."""

    def __init__(self):
        """Build an Uploader for every entry of settings.SITE_URLS."""
        self.uploaders = dict(
            (site, Uploader(site, address))
            for site, address in settings.SITE_URLS.items()
        )
        site_names = self.uploaders.keys()
        logging.info("%d uploaders: %r", len(site_names), site_names)

    def get_uploader_name(self, event):
        """Return the event's 'site' value, or 'default' when it has none."""
        return event.get('site', 'default')

    def add_event(self, event):
        """Queue the event on the uploader matching its site name.

        Returns True when the event was queued and False when no uploader
        is registered under that name.
        """
        target = self.uploaders.get(self.get_uploader_name(event))
        if not target:
            return False
        target.append(event)
        return True

    def dispatch(self):
        """Log every uploader's queue length, then run each uploader."""
        backlog = []
        for site, uploader in self.uploaders.items():
            backlog.append((site, len(uploader.queue)))
        logging.info("DISPATCH: %s", backlog)
        for uploader in self.uploaders.values():
            uploader.run()
class EventHandler(Resource):
    """HTTP resource that accepts event batches and feeds a dispatcher.

    The POST body is expected to be a JSON list of event objects.  Each
    event is offered to the dispatcher; events it does not accept are
    counted and logged as ignored.
    """

    isLeaf = True

    def __init__(self, dispatcher, *args, **kwargs):
        """Store the dispatcher; other arguments are passed to Resource."""
        Resource.__init__(self, *args, **kwargs)
        self.dispatcher = dispatcher

    def render_POST(self, request):
        """Queue the posted events and trigger a dispatch.

        Returns the response body: "OK" on success.  A body that is not
        valid JSON, or is not a JSON list, is answered with status 400
        instead of raising from the handler.
        """
        try:
            events = json.loads(request.content.read())
        except ValueError:
            logging.warning("Rejected POST: body is not valid JSON.")
            request.setResponseCode(400)
            return b"Bad Request"
        if not isinstance(events, list):
            logging.warning("Rejected POST: body is not a JSON list.")
            request.setResponseCode(400)
            return b"Bad Request"

        ignored_cnt = 0
        for event in events:
            # Entries that are not JSON objects cannot carry a site name,
            # so they are treated like events for an unknown site.
            if not isinstance(event, dict) or not self.dispatcher.add_event(event):
                logging.info("Ignored event: %r", event)
                ignored_cnt += 1
        if ignored_cnt:
            logging.info("Ignored %d events.", ignored_cnt)
        self.dispatcher.dispatch()
        # A bytes literal equals the plain "OK" string on Python 2 and is
        # also a valid response body on Python 3.
        return b"OK"
def main():
    """Wire up the dispatcher and the HTTP endpoint, then run the reactor.

    A dispatch is scheduled every 5 seconds, and the event handler is
    served on settings.REPOSTER_PORT.
    """
    event_dispatcher = EventDispatcher()
    periodic_dispatch = task.LoopingCall(event_dispatcher.dispatch)
    periodic_dispatch.start(5.0)

    site = Site(EventHandler(event_dispatcher))
    port = settings.REPOSTER_PORT
    logging.info("Starting server on port %r", port)
    reactor.listenTCP(port, site)
    reactor.run()
# Script entry: configure root logging at DEBUG level, then start the service.
# NOTE: these statements are at module level, so they also run on import.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s %(levelname)s:%(message)s',
)
main()