This repository has been archived by the owner on Jan 7, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3
/
get.js
137 lines (130 loc) · 3.91 KB
/
get.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
var crypto = require('crypto')
var fs = require('fs')
var ndjson = require('ndjson')
var request = require('request')
var transform = require('unordered-parallel-transform')
var through = require('through2')
var mkdirp = require('mkdirp')
var blobStore = require('content-addressable-blob-store')
var pump = require('pump')
// Usage: get.js DATA_DIR [EXCLUDE_FILE] < input.ndjson > output.ndjson
//
// DATA_DIR (required): directory of the content-addressable blob store that
//   downloaded bodies are written to.
// EXCLUDE_FILE (optional): ndjson records with `file` and `url` fields,
//   presumably an earlier output of this script; URLs whose `file` is still
//   present in the blob store are skipped.
var dataDir = process.argv[2]
var excludeFile = process.argv[3]
if (!dataDir) throw new Error('must specify data directory')
var blobs = blobStore(dataDir)

// Concurrency handed to unordered-parallel-transform (items in flight).
var PARALLEL = 2048
// Attempts per URL before an error record is emitted instead.
var RETRIES = 10

// Options for the HTTP(S) agent that `request` creates.
var agentOptions = {
  keepAlive: true,
  maxSockets: 10,
  maxFreeSockets: 10,
  timeout: 60000,
  keepAliveMsecs: 30000,
  // NOTE(security): disables TLS certificate verification so hosts with bad
  // certs can still be fetched; responses are therefore not authenticated.
  rejectUnauthorized: false // ignores bad certs
}

// Browser-like user agent sent with every request.
var reqHeaders = {"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.95 Safari/537.36"}
// Hash a URL into a fixed-length key for the exclude table. Hashing keeps
// memory per entry bounded and means keys can never be "__proto__" & co.
function hashUrl (url) {
  return crypto.createHash('sha256').update(url).digest('hex')
}

// Feed a stream of parsed input records through the downloader and write one
// ndjson result line per record to stdout.
function downloadAll (records) {
  records
    .pipe(transform(PARALLEL, getResponse))
    .pipe(ndjson.serialize())
    .pipe(process.stdout)
}

if (excludeFile) {
  // stdin is piped into a pass-through that is only read once the exclude
  // table is complete; pipe() backpressure pauses stdin in the meantime.
  var buffer = through()
  var excludes = {}
  var excludeCount = 0
  process.stdin
    .pipe(buffer)
  fs.createReadStream(excludeFile)
    .pipe(ndjson.parse())
    .pipe(through.obj(function (obj, enc, next) {
      if (!obj.file) return next()
      // Output records hold the post-redirect URL in `url` and the requested
      // URL in `initialUrl`; input records carry the requested URL, so both
      // must be excluded or redirected downloads would be fetched again.
      var urls = [obj.url, obj.initialUrl].filter(function (u) {
        return typeof u === 'string'
      })
      if (urls.length === 0) return next()
      blobs.exists({key: obj.file}, function (err, exists) {
        if (err || !exists) return next()
        urls.forEach(function (url) {
          var hash = hashUrl(url)
          if (!excludes[hash]) {
            excludes[hash] = true
            excludeCount++
          }
        })
        next()
      })
    }, function end (done) {
      console.error(JSON.stringify({excludeCount: excludeCount}))
      downloadAll(buffer
        .pipe(ndjson.parse())
        .pipe(through.obj(function (obj, enc, next) {
          // Records without a string url are passed on so getResponse can
          // emit an error record for them, as in the no-exclude path.
          if (typeof obj.url === 'string' && excludes[hashUrl(obj.url)]) {
            console.error(JSON.stringify({skipping: obj.url}))
            return next()
          }
          next(null, obj)
        })))
      // through2 flush functions must signal completion.
      done()
    }))
} else {
  downloadAll(process.stdin.pipe(ndjson.parse()))
}
/**
 * Download one input record's `url` into the blob store, retrying on failure.
 *
 * Calls `cb(null, record)` exactly once:
 *  - success: response metadata (status, headers, redirects, timings) plus
 *    `file`, the blob-store key of the body. Any HTTP status is stored;
 *    callers should check `status`.
 *  - failure (request could not be built, or RETRIES attempts failed): a
 *    record with `error`, `errorMessage` and, for timeouts, `timeout: true`.
 * Failures are reported in the record rather than as `cb`'s first argument so
 * that one bad URL does not abort the whole stream.
 *
 * @param {object} item - input record; reads `url`, `package_id` and `id`
 * @param {function} cb - transform callback `(err, record)`
 */
function getResponse (item, cb) {
  var tries = 0
  var retryTime = 0 // first attempt starts immediately, retries wait 5s
  var finished = false
  tryDownload()

  // Deliver the result once; anything reported later by a stale attempt is
  // ignored.
  function finish (record) {
    if (finished) return
    finished = true
    cb(null, record)
  }

  // Schedule the next attempt, or give up after RETRIES attempts.
  // `err` is the previous attempt's failure, if there was one.
  function tryDownload (err) {
    if (finished) return
    if (tries >= RETRIES) {
      var msg = 'Max retries exceeded'
      if (err) msg += ': ' + err.message
      var finalErr = new Error(msg)
      // Keep the cause's code/type so error() can still flag timeouts.
      if (err && err.code) finalErr.code = err.code
      if (err && err.errType) finalErr.errType = err.errType
      return error(finalErr)
    }
    tries++
    var stats = {url: item.url, try: tries}
    if (err) stats.error = err
    console.error(JSON.stringify(stats))
    setTimeout(attempt, retryTime)
    retryTime = 5000 // every attempt after the first waits before starting
  }

  // One HTTP request streamed into a new blob. Each attempt settles at most
  // once: the request 'error' event and the pump failure can both fire for
  // the same broken connection and must not start two retry chains.
  function attempt () {
    var start = Date.now() // taken after the retry delay, not before it
    var settled = false
    function retry (err) {
      if (settled) return
      settled = true
      tryDownload(err)
    }
    var r
    try {
      r = request(item.url, {
        agentOptions: agentOptions,
        headers: reqHeaders,
        time: true,
        // A socket idle timeout on the agent only emits 'timeout'; request's
        // own timeout aborts and errors (ETIMEDOUT / ESOCKETTIMEDOUT), so a
        // stalled server cannot hold this item forever.
        timeout: agentOptions.timeout
      })
    } catch (e) {
      e.errType = 'requestInitError'
      return error(e)
    }
    r.on('error', function (err) {
      err.errType = 'reqStreamErr'
      retry(err)
    })
    r.on('response', function (re) {
      var meta = {
        redirects: r._redirect && r._redirect.redirects,
        initialUrl: item.url,
        url: r.uri.href,
        date: new Date(),
        headersTook: r.elapsedTime,
        package_id: item.package_id,
        id: item.id,
        status: re.statusCode,
        rawHeaders: re.rawHeaders,
        headers: re.headers
      }
      var ws = blobs.createWriteStream()
      pump(re, ws, function (err) {
        if (err) {
          err.errType = 'streamPumpErr'
          return retry(err)
        }
        if (settled) return
        settled = true
        meta.downloadTook = Date.now() - start
        meta.file = ws.key
        finish(meta)
      })
    })
  }

  // Report a permanent failure as a normal output record.
  function error (err) {
    var obj = {url: item.url, date: new Date(), package_id: item.package_id, id: item.id, error: err}
    // Error#message is non-enumerable, so serializing `error` alone drops it.
    obj.errorMessage = err.message
    if (err.code === 'ETIMEDOUT' || err.code === 'ESOCKETTIMEDOUT') {
      obj.timeout = true
    }
    finish(obj)
  }
}