forked from jancurn/actor-residential-proxy-probe
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.js
290 lines (235 loc) · 10.3 KB
/
main.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
const _ = require('underscore');
const Apify = require('apify');
const moment = require('moment');
const usZipCodeToDma = require('./us_zip_code_to_dma');
const { utils: { log, sleep, requestAsBrowser: request } } = Apify;
// Timing knobs, all expressed in milliseconds.
// TODO: Make some of these input
// Period of the heartbeat that prunes, refreshes and replenishes the session pool.
const HEARTBEAT_INTERVAL_MILLIS = 20000;
// Period at which the global `state` is written to the key-value store.
const STORE_STATE_INTERVAL_MILLIS = 10000;
// A session checked within this window counts towards its region; one unchecked for
// twice this long is dropped as expired (both rules live in heartbeat()).
const MAX_SESSION_AGE_MILLIS = 50000;
// Upper bound of the random delay inserted before each proxied HTTP request.
const RANDOM_WAIT_BEFORE_REQUESTS_MILLIS = 10000;

// Global state of the actor; it is periodically persisted into the key-value store.
let state;

// Session keys whose refresh is currently in flight (sessionKey -> true), used so that
// the same session is never refreshed twice in parallel.
const sessionKeysBeingRefreshed = {};
/**
 * Bumps the counter `state.stats[propName]` by one, treating a missing counter as 0.
 *
 * @param {string} propName - name of the statistic to increment
 */
const statsInc = (propName) => {
    const { stats } = state;
    const current = stats[propName] || 0;
    stats[propName] = current + 1;
};
/**
 * Generates a random proxy session key consisting of exactly 9 lowercase hexadecimal
 * characters (leading zeros included).
 *
 * Math.random() * 16 ** 9 yields a uniform integer in [0, 16 ** 9). That range is far below
 * Number.MAX_SAFE_INTEGER, so every value is represented exactly. padStart() keeps short
 * values at the full 9 characters.
 *
 * @returns {string} 9-character hex session key
 */
const generateRandomSessionKey = () => {
    const SESSION_KEY_HEX_DIGITS = 9;
    return Math.floor(Math.random() * 16 ** SESSION_KEY_HEX_DIGITS)
        .toString(16)
        .padStart(SESSION_KEY_HEX_DIGITS, '0');
};
/**
 * Logs an unrecoverable error via the Apify logger and terminates the process with exit code 1.
 * Used as the rejection handler for fire-and-forget background work.
 *
 * @param {Error} err - the error that caused the failure
 */
const fatalError = (err) => {
    const message = 'Fatal error';
    log.exception(err, message);
    const exitCode = 1;
    process.exit(exitCode);
};
/**
 * Waits a random time in [0, RANDOM_WAIT_BEFORE_REQUESTS_MILLIS) milliseconds, so that
 * HTTP requests launched in the same tick are spread over a longer period instead of
 * being sent in one burst.
 */
const randomSleep = async () => {
    const delayMillis = RANDOM_WAIT_BEFORE_REQUESTS_MILLIS * Math.random();
    await sleep(delayMillis);
};
/**
 * After a random delay, performs one geolocation lookup through the residential proxy
 * session `sessionKey` (restricted to `countryCode`). Returns where its exit IP appears
 * to be located.
 *
 * @param {string} sessionKey - proxy session identifier
 * @param {string} countryCode - country the proxy group is restricted to
 * @returns {Promise<{ipAddress: string, countryCode: string, regionName: string, city: string, postalCode: string}>}
 * @throws {Error} 'Unexpected response body' when the response has no `query` (IP) field;
 *   request errors propagate unchanged.
 */
const probeSession = async (sessionKey, countryCode) => {
    await randomSleep();
    const proxyPassword = process.env.APIFY_PROXY_PASSWORD;
    const response = await request({
        // NOTE: Using HTTP instead of HTTPS because it consumes less residential traffic!
        url: 'http://ip-api.com/json?fields=countryCode,regionName,city,zip,query',
        proxyUrl: `http://groups-RESIDENTIAL,session-${sessionKey},country-${countryCode}:${proxyPassword}@proxy.apify.com:8000`,
        responseType: 'json',
        http2: false,
        useHeaderGenerator: false,
    });

    const geo = response.body;
    if (!geo || !geo.query) throw new Error('Unexpected response body');

    // Rename ip-api.com field names to the ones used in the session pool
    const { query: ipAddress, regionName, city, zip: postalCode } = geo;
    return {
        ipAddress,
        countryCode: geo.countryCode,
        regionName,
        city,
        postalCode,
    };
};
/**
 * Probes one brand-new random residential proxy session and adds it to
 * `state.proxySessions` only when its location passes the region filter:
 * - if `input.dmaCodes` is set, the US DMA code derived from the postal code must be listed there;
 * - otherwise, if `input.postalCodes` is set, the postal code must be listed there.
 * Every outcome increments a counter in `state.stats` (probesTotal, probesFailed,
 * probesNoPostalCode, probesDmaNotFound, probesDmaMismatch, probesPostalCodeMismatch,
 * probesMatched). Probe failures are logged and counted rather than thrown.
 *
 * @param {object} input - pre-processed actor input (countryCode, dmaCodes, postalCodes)
 */
const addNewSession = async (input) => {
    const sessionKey = generateRandomSessionKey();

    // Logs why the session is not kept, removes it from the pool and counts the outcome.
    const reject = (message, statName) => {
        console.log(`Session ${sessionKey}: ${message}`);
        delete state.proxySessions[sessionKey];
        statsInc(statName);
    };

    let sessionInfo;
    try {
        statsInc('probesTotal');
        sessionInfo = await probeSession(sessionKey, input.countryCode);
    } catch (err) {
        console.log(`Session ${sessionKey}: Probe failed "${err}"`);
        statsInc('probesFailed');
        return;
    }

    // Compact location summary used in rejection log messages
    const describe = () => JSON.stringify(_.pick(sessionInfo, 'ipAddress', 'regionName', 'city', 'postalCode'));

    const { postalCode } = sessionInfo;
    if (!postalCode) {
        reject(`Missing postal code ${describe()}`, 'probesNoPostalCode');
        return;
    }

    // DMA codes are only looked up for US sessions; anything else (or an unknown zip) gets null
    const dmaCode = input.countryCode.toLowerCase() === 'us' ? usZipCodeToDma[postalCode] : undefined;
    sessionInfo.dmaCode = dmaCode || null;
    sessionInfo.foundAt = new Date();
    sessionInfo.lastCheckedAt = sessionInfo.foundAt;

    if (input.dmaCodes) {
        if (!sessionInfo.dmaCode) {
            reject(`DMA code not found ${describe()}`, 'probesDmaNotFound');
            return;
        }
        if (!_.contains(input.dmaCodes, sessionInfo.dmaCode)) {
            reject('DMA code not matching', 'probesDmaMismatch');
            return;
        }
        console.log(`Session ${sessionKey}: Matches DMA code ${sessionInfo.dmaCode} !!!`);
    } else if (input.postalCodes) {
        if (!_.contains(input.postalCodes, postalCode)) {
            reject('Postal code not matching', 'probesPostalCodeMismatch');
            return;
        }
        console.log(`Session ${sessionKey}: Matches postal code ${postalCode} !!!`);
    }

    // The session passed the filter, keep it
    state.proxySessions[sessionKey] = sessionInfo;
    statsInc('probesMatched');
};
/**
 * Sends a keep-alive request through an already-known proxy session and checks that
 * the session still exits from the IP address recorded in `sessionInfo.ipAddress`.
 * - same IP: `sessionInfo.lastCheckedAt` is bumped to now (stat refreshesIpSame);
 * - different IP: the session is removed from `state.proxySessions` (stat refreshesIpChanged);
 * - request/response failure: logged and counted (stat refreshesFailed), session left as is.
 * A call for a session whose refresh is still in flight returns immediately.
 *
 * @param {object} input - pre-processed actor input (uses `countryCode`)
 * @param {string} sessionKey - proxy session identifier
 * @param {object} sessionInfo - the session's record from `state.proxySessions`
 */
const refreshExistingSession = async (input, sessionKey, sessionInfo) => {
    // Skip sessions whose previous refresh has not finished yet
    if (sessionKeysBeingRefreshed[sessionKey]) return;
    sessionKeysBeingRefreshed[sessionKey] = true;

    let currentIp;
    try {
        await randomSleep();
        statsInc('refreshesTotal');
        const { body } = await request({
            url: 'https://api.apify.com/v2/browser-info?skipHeaders=1',
            proxyUrl: `http://groups-RESIDENTIAL,session-${sessionKey},country-${input.countryCode}:${process.env.APIFY_PROXY_PASSWORD}@proxy.apify.com:8000`,
            responseType: 'json',
            http2: false,
            useHeaderGenerator: false,
        });
        if (!body || !body.clientIp) throw new Error('Invalid response from Apify API');
        currentIp = body.clientIp;
    } catch (err) {
        console.log(`Session ${sessionKey}: Refresh failed "${err}"`);
        statsInc('refreshesFailed');
        return;
    } finally {
        // Clear the in-progress flag on every exit path, success or failure
        delete sessionKeysBeingRefreshed[sessionKey];
    }

    if (sessionInfo.ipAddress !== currentIp) {
        console.log(`Session ${sessionKey}: IP address changed, forgetting it`);
        delete state.proxySessions[sessionKey];
        statsInc('refreshesIpChanged');
        return;
    }
    sessionInfo.lastCheckedAt = new Date();
    statsInc('refreshesIpSame');
};
/**
 * Periodic maintenance pass over the pool of proxy sessions in `state.proxySessions`:
 * 1. sessions not checked for more than 2 * MAX_SESSION_AGE_MILLIS are forgotten (stat `expired`);
 * 2. sessions checked within MAX_SESSION_AGE_MILLIS are counted towards their region
 *    (DMA code when `input.dmaCodes` is set, postal code otherwise); a session that would push
 *    its region above `input.maxSessionsPerRegion` is forgotten (stat `forgotten`);
 * 3. every session that survives steps 1-2 gets a background refresh (keep-alive + IP check);
 * 4. if any requested region has fewer than `input.minSessionsPerRegion` counted sessions,
 *    ceil(`input.newSessionsPerHeartbeat`) new sessions are probed in the background.
 * Also stores the per-region counts in `state.regionToSessionCount` and stamps
 * `state.lastUpdatedAt`. Background work is not awaited; any rejection goes to fatalError().
 *
 * @param {{ input: object }} params - `input` is the pre-processed actor input
 */
const heartbeat = ({ input }) => {
    const regionToSessionCount = {};

    // First, iterate existing sessions and refresh them in background (send keep alive and validate IP is the same).
    // Object.entries() returns a snapshot, so deleting from state.proxySessions inside the loop is safe.
    for (const [sessionKey, sessionInfo] of Object.entries(state.proxySessions)) {
        const ageMillis = moment().diff(sessionInfo.lastCheckedAt, 'milliseconds');

        // Session has not been confirmed for too long, forget it
        if (ageMillis > 2 * MAX_SESSION_AGE_MILLIS) {
            console.log(`Session ${sessionKey}: Expired, will be forgotten`);
            delete state.proxySessions[sessionKey];
            statsInc('expired');
            continue;
        }

        // If session is not too old, consider it for region matching.
        // Sessions aged between MAX_SESSION_AGE_MILLIS and 2 * MAX_SESSION_AGE_MILLIS are not counted,
        // but they are still refreshed below; a successful refresh resets their age.
        if (ageMillis < MAX_SESSION_AGE_MILLIS) {
            const region = input.dmaCodes ? sessionInfo.dmaCode : sessionInfo.postalCode;
            const newCount = (regionToSessionCount[region] || 0) + 1;
            if (input.maxSessionsPerRegion && newCount > input.maxSessionsPerRegion) {
                console.log(`Session ${sessionKey}: Exceeded max session per region (${region}), will be forgotten `);
                delete state.proxySessions[sessionKey];
                statsInc('forgotten');
                continue;
            }
            regionToSessionCount[region] = newCount;
        }

        // Fire-and-forget; an unexpected rejection is treated as fatal
        refreshExistingSession(input, sessionKey, sessionInfo).catch(fatalError);
    }

    // Check how many live sessions we have per region, and if not enough, then launch new ones
    const regions = input.dmaCodes ? input.dmaCodes : input.postalCodes;
    let minPerRegion = Number.POSITIVE_INFINITY;
    let maxPerRegion = Number.NEGATIVE_INFINITY;
    let minRegion;
    let maxRegion;
    let missingSessions = 0;
    for (const region of regions) {
        // Make sure every requested region appears in the stored counts, even with zero sessions
        if (!regionToSessionCount[region]) regionToSessionCount[region] = 0;
        const count = regionToSessionCount[region];
        if (count < input.minSessionsPerRegion) {
            missingSessions += input.minSessionsPerRegion - count;
        }
        if (count < minPerRegion) {
            minPerRegion = count;
            minRegion = region;
        }
        if (count > maxPerRegion) {
            maxPerRegion = count;
            maxRegion = region;
        }
    }
    // Empty region list: report zeros instead of +/-Infinity
    if (minPerRegion === Number.POSITIVE_INFINITY) minPerRegion = 0;
    if (maxPerRegion === Number.NEGATIVE_INFINITY) maxPerRegion = 0;

    const totalSessions = Object.keys(state.proxySessions).length;
    console.log(`Heartbeat: sessions: ${totalSessions} (${missingSessions} missing), minPerRegion: ${minPerRegion} (e.g. ${minRegion}), maxPerRegion: ${maxPerRegion} (e.g. ${maxRegion}), regionsCount: ${regions.length}`);
    state.regionToSessionCount = regionToSessionCount;

    // Probes land in random locations, so a fixed batch is launched per heartbeat rather than
    // exactly `missingSessions` probes
    if (missingSessions > 0) {
        const newCount = Math.ceil(input.newSessionsPerHeartbeat);
        console.log(`Probing ${newCount} new sessions`);
        for (let i = 0; i < newCount; i++) {
            addNewSession(input).catch(fatalError);
        }
    }
    state.lastUpdatedAt = new Date();
};
/**
 * Starts writing the global `state` to `keyValueStore` under `input.recordKey` without
 * waiting for it to finish; a failed write is treated as fatal.
 *
 * @param {{ input: object, keyValueStore: object }} params
 */
const storeState = ({ input, keyValueStore }) => {
    const { recordKey } = input;
    const pendingWrite = keyValueStore.setValue(recordKey, state);
    pendingWrite.catch(fatalError);
};
// Actor entry point: validates the input, restores or initializes the persisted state, then
// runs the heartbeat and the state persistence on fixed intervals until the actor is stopped.
Apify.main(async () => {
    // Apify.getInput() resolves to null when the run has no input record; fall back to an empty
    // object so the user gets the validation error below rather than a TypeError.
    const input = (await Apify.getInput()) || {};

    // Turns a whitespace-separated string of codes into an array, or null when it holds no codes.
    // Without the filter, a whitespace-only string would yield [''], which passes validation
    // but is a region no session can ever match.
    const parseCodes = (text) => {
        const codes = text ? text.trim().split(/\s+/g).filter((code) => code !== '') : [];
        return codes.length > 0 ? codes : null;
    };

    // Pre-process and check selected regions
    input.dmaCodes = parseCodes(input.dmaCodes);
    input.postalCodes = parseCodes(input.postalCodes);
    if (!input.dmaCodes && !input.postalCodes) {
        throw new Error('Either "dmaCodes" or "postalCodes" input field must contain some values!');
    }
    if (input.dmaCodes && input.postalCodes) {
        throw new Error('Cannot specify both "dmaCodes" and "postalCodes" together!');
    }

    // Restore the state of a previous run, or start from scratch
    const keyValueStore = await Apify.openKeyValueStore(input.keyValueStoreName);
    state = await keyValueStore.getValue(input.recordKey);
    if (!state || !_.isObject(state) || _.isArray(state)) {
        state = {
            stats: {},
            proxySessions: {},
            regionToSessionCount: null,
        };
    }
    // A stored object lacking these fields would crash statsInc() and heartbeat(),
    // which dereference them unconditionally.
    if (!_.isObject(state.stats) || _.isArray(state.stats)) state.stats = {};
    if (!_.isObject(state.proxySessions) || _.isArray(state.proxySessions)) state.proxySessions = {};

    // Maintain the session pool now and then on every heartbeat
    heartbeat({ input });
    setInterval(() => {
        heartbeat({ input });
    }, HEARTBEAT_INTERVAL_MILLIS);

    // Persist the state now and then on its own (more frequent) interval
    storeState({ input, keyValueStore });
    setInterval(() => {
        storeState({ input, keyValueStore });
    }, STORE_STATE_INTERVAL_MILLIS);

    // Wait forever; the intervals above keep doing the work until the actor is stopped
    return new Promise(() => {});
});