-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdowntune.js
129 lines (113 loc) · 3.34 KB
/
downtune.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
'use strict';
const Request = require('request-promise-native');
const cheerio = require('cheerio');
const log4js = require('log4js');
const fp = require('./src/fingerprint');
const factory = require('./src/factory');
/**
 * Queue-driven crawler.
 *
 * A rule object describes an `entry` step (one or more request options plus
 * optional `link` / `item` callbacks) and any number of additional named steps.
 * A step's `link($)` callback returns an object whose keys name other steps of
 * the rule and whose values are the request options to fetch with that step;
 * its `item($)` callback consumes the fetched page.
 *
 * Bookkeeping:
 *  - `query`   : FIFO list of tasks `{ reqOpt, cmds, retry }` waiting to run.
 *  - `_more_`  : number of tasks that are queued or running; the crawl ends
 *                when it is no longer positive.
 *  - `set`     : fingerprints of requests that are finished (fetched, or
 *                abandoned after too many retries).
 *  - `pending` : fingerprints of requests that were queued or started, so the
 *                same link is not queued twice while its first copy is still
 *                waiting or in flight.
 */
class downtune {
  /**
   * @param {Object} rule - Crawl description.
   * @param {Object} rule.entry - First step; must carry `reqOpt`.
   * @param {string|Object|Array<string|Object>} rule.entry.reqOpt - Initial request(s).
   * @param {number} [rule.timeout] - Multiplied by 1000 and used as the request
   *   `timeout` option (presumably seconds); falls back to 30000.
   * @param {number} [rule.concurrency=100] - Passed to the `factory` scheduler.
   * @param {number} [rule.retry=100] - Attempts allowed before a request is given up.
   * @param {string} [rule.log_level='info'] - Level assigned to the log4js logger.
   * @throws {Error} When no entry request is supplied.
   */
  constructor(rule) {
    if (!rule || !rule.entry || !rule.entry.reqOpt) {
      throw new Error('no target to fetch');
    }
    this.set = new Set();
    this.pending = new Set();
    this.rule = rule;
    // NaN (missing timeout) and 0 both fall back to the 30 s default.
    this.timeout = rule.timeout * 1000 || 30000;
    this.concurrency = rule.concurrency || 100;
    this.retry = rule.retry || 100;
    this.ft = new factory(this.concurrency);

    const entries = Array.isArray(rule.entry.reqOpt)
      ? rule.entry.reqOpt
      : [rule.entry.reqOpt];
    this._more_ = entries.length;
    this.query = entries.map(opt => ({
      reqOpt: opt,
      cmds: rule.entry,
      retry: 0,
    }));

    this.logger = log4js.getLogger();
    this.logger.level = rule.log_level || 'info';
  }

  /**
   * Perform one HTTP request and wrap the response body.
   *
   * The caller's options are never modified: a shallow copy is sent, with
   * `_meta_` stripped and `timeout` / `resolveWithFullResponse` applied. This
   * keeps `_meta_` (and the request fingerprint) intact when a task is retried.
   *
   * @param {string|Object} opt - URL, or request options. An optional `_meta_`
   *   property is not sent; it is handed back on the result instead.
   * @returns {Promise<Object>} The response body as-is when `opt.json` is
   *   truthy, otherwise `cheerio.load(body)`; in both cases decorated with
   *   `_meta_` (the caller's metadata or `{}`) and `_header_` (response headers).
   * @throws Whatever the underlying request rejects with, unchanged.
   */
  async request(opt) {
    const source = typeof opt === 'string' ? { url: opt } : opt;
    const meta = source._meta_ || {};
    const reqOpt = Object.assign({}, source, {
      resolveWithFullResponse: true,
      timeout: this.timeout,
    });
    delete reqOpt._meta_;

    this.logger.info('Request : ', JSON.stringify(reqOpt));
    const response = await Request(reqOpt);
    const $ = reqOpt.json ? response.body : cheerio.load(response.body);
    $._meta_ = meta;
    $._header_ = response.headers;
    return $;
  }

  /**
   * Queue the links one `link` callback returned for a single step name.
   *
   * Links whose fingerprint is already finished or already queued are skipped,
   * as are null/undefined entries.
   *
   * @param {string} key - Name of the rule step that will process the links.
   * @param {*} found - One request option or an array of them.
   */
  enqueueLinks(key, found) {
    const cmds = this.rule[key];
    if (!cmds) {
      this.logger.warn(`No rule named "${key}", its links are fetched without handlers`);
    }
    const links = Array.isArray(found) ? found : [found];
    for (const link of links) {
      if (link == null) {
        continue;
      }
      const id = fp(link);
      if (this.set.has(id) || this.pending.has(id)) {
        continue;
      }
      this.pending.add(id);
      this._more_ += 1;
      this.query.push({ reqOpt: link, cmds, retry: 0 });
    }
  }

  /**
   * Run one task: fetch it, queue the links it yields, process its item.
   *
   * A task that was already fetched, or that used up its retries, is dropped.
   * Any failure while fetching or inside the callbacks re-queues the task with
   * `retry + 1` and is logged; it does not reject.
   *
   * @param {{reqOpt: (string|Object), cmds: (Object|undefined), retry: number}} task
   * @returns {Promise<void>}
   */
  async handle(task) {
    const { reqOpt, retry } = task;
    // A task queued for an unknown rule name has no callbacks: fetch it once.
    const cmds = task.cmds || {};
    // Both computed before any counter changes, so a throw here leaves the
    // bookkeeping untouched for the caller to settle.
    const fpId = fp(reqOpt);
    const label = JSON.stringify(reqOpt);

    if (this.set.has(fpId)) {
      this._more_ -= 1;
      this.logger.debug(`Already request : ${ label }`);
      return;
    }
    if (retry >= this.retry) {
      this._more_ -= 1;
      this.set.add(fpId);
      this.logger.debug(`Max retry, Request : ${ label }`);
      return;
    }
    this.pending.add(fpId);

    try {
      const $ = await this.request(reqOpt);
      if (cmds.link) {
        const nets = (await cmds.link($)) || {};
        for (const key of Object.keys(nets)) {
          this.enqueueLinks(key, nets[key]);
        }
      }
      if (cmds.item) {
        this.logger.info(`Processing item from : ${ label }`);
        await cmds.item($);
      }
      this._more_ -= 1;
      this.set.add(fpId);
    } catch (err) {
      // `_more_` is left alone: the task is still outstanding, just re-queued.
      this.query.push(Object.assign({}, task, { retry: retry + 1 }));
      const reason = err && err.message ? err.message : String(err);
      this.logger.error(`Error from handler ${label} , ${reason}`, err);
    }
  }

  /**
   * Drain the queue through the `factory` scheduler.
   *
   * The callback given to `ft.run` receives a `done` function; it is called
   * once the picked task has settled, or immediately when nothing is queued
   * yet. When no task is outstanding the scheduler is stopped instead.
   * (How `ft.run` repeats the callback is defined in ./src/factory, which is
   * not visible here.)
   *
   * @returns {Promise<void>} Settles when `ft.run` does.
   */
  async start() {
    await this.ft.run(done => {
      if (!(this._more_ > 0)) {
        this.ft.stop();
        this.logger.info('All requests have been finished.');
        return;
      }
      if (this.query.length === 0) {
        done();
        return;
      }
      const task = this.query.shift();
      this.handle(task)
        .catch(err => {
          // handle() only rejects before it touched the counters, so the
          // task is dropped here to let the crawl terminate.
          this._more_ -= 1;
          this.logger.error('Unexpected handler failure, task dropped : ', err);
        })
        .then(() => done());
    });
  }
}
// The crawler class is this module's only export (CommonJS).
module.exports = downtune;