-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathengine.py
executable file
·81 lines (68 loc) · 2.51 KB
/
engine.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
import time
from Queue import Queue
from threading import Condition
from threading import Thread

import conf
from request import Request
from downloader import Downloader
from spider import Spider
class Engine(object):
    """Send URLs to a downloader and hand results to a spider in firing order.

    Each call to ``fire`` gives its request a sequential integer key. Results
    may be reported back through the callback in any order. A background
    listener thread buffers them and passes each value to ``spider.parse``
    strictly in the order the requests were fired.
    """

    def __init__(self, downloader, spider=None):
        """Create the engine and start its background listener thread.

        :param downloader: object exposing ``submit(request)``.
        :param spider: object exposing ``parse(value)``. When ``None``, results
            are consumed and discarded instead of crashing the listener.
        """
        self._counter = 0
        # Keys of fired requests, in firing order. Capacity comes from
        # conf.REQUEST_QUEUE_MAXSIZE (a Queue with maxsize <= 0 is unbounded).
        # When the queue is full, ``fire`` blocks.
        self._order_record_queue = Queue(conf.REQUEST_QUEUE_MAXSIZE)
        # key -> value for results that have arrived but are not parsed yet.
        self._result_pool = {}
        # Guards _result_pool and wakes the listener when a result lands.
        self._result_ready = Condition()
        self._downloader = downloader
        self._spider = spider
        listener = Thread(target=self._listen_result_pool)
        # The listener loops forever, so it must not keep the process alive.
        listener.daemon = True
        listener.start()

    def _package_request_obj2queue(self, url, item=None):
        """Wrap *url* in a Request with the next sequential key, and record that key.

        The key is queued before the request is submitted, so the listener
        knows which result it must wait for next.

        :returns: the new ``Request``.
        """
        key = self._counter
        self._counter += 1
        req = Request(key, url, self._put_result2result_pool_callback, item)
        self._order_record_queue.put(key)
        return req

    def _submit_request2downloader(self, req):
        """Hand *req* to the downloader."""
        self._downloader.submit(req)

    def _put_result2result_pool_callback(self, result):
        """Store a finished result and wake the listener.

        :param result: mapping with ``"key"`` (the request key) and ``"value"``
            (the payload later passed to ``spider.parse``).
        """
        with self._result_ready:
            self._result_pool[result["key"]] = result["value"]
            self._result_ready.notify_all()

    def _listen_result_pool(self):
        """Loop forever: take the next key, wait for its result, then parse it.

        Blocks on the order queue and on the condition instead of spinning.
        Each result is removed from the pool as soon as it is handed off, so
        the pool only holds out-of-order arrivals.
        """
        while True:
            cur_key = self._order_record_queue.get()
            with self._result_ready:
                # Loop guards against spurious wake-ups and unrelated arrivals.
                while cur_key not in self._result_pool:
                    self._result_ready.wait()
                value = self._result_pool.pop(cur_key)
            try:
                if self._spider is not None:
                    self._spider.parse(value)
            finally:
                # Mark the key done even if parse raises, so join() is not
                # left waiting on it.
                # NOTE(review): an exception from parse still ends this
                # thread; later requests will then never be parsed.
                self._order_record_queue.task_done()

    def fire(self, url, item=None):
        """Schedule *url* (with optional *item*) for download and ordered parsing."""
        req = self._package_request_obj2queue(url, item)
        self._submit_request2downloader(req)

    def join(self):
        """Block until every request fired so far has been taken and handed off."""
        self._order_record_queue.join()
if __name__ == '__main__':
    # Demo run: fire 30 placeholder requests (url = index, item = "<index>item")
    # through an engine wired to a fresh Downloader and Spider.
    # `xrange` means this script runs under Python 2, despite the python3 shebang.
    # NOTE(review): nothing here waits for the engine's daemon listener thread.
    # If the downloader's worker threads are daemons too, the process may exit
    # before any result is parsed -- confirm.
    engine = Engine(Downloader(), Spider())
    for idx in xrange(30):
        engine.fire(idx, str(idx) + "item")