-
Notifications
You must be signed in to change notification settings - Fork 16
/
index.js
160 lines (140 loc) · 5 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
/* Copyright 2017- Paul Brewer, Economic and Financial Technology Consulting LLC and contributors */
/* This file is open source software. The MIT License applies to this software. */
/* jshint esnext:true,eqeqeq:true,undef:true,lastsemic:true,strict:true,unused:true,node:true */
const fs = require('fs');
const archiver = require('archiver');
const promiseRetry = require('promise-retry');
const asyncPool = require('tiny-async-pool');
// Retry policy handed to promiseRetry() as its options argument for the
// storage calls in this file (listing source files, confirming the uploaded zip).
// NOTE(review): timeouts are presumably milliseconds, following the
// promise-retry / retry option conventions — confirm against that library.
const backoffStrategy = {
  retries: 3,
  minTimeout: 1000,
  maxTimeout: 10000,
  factor: 2,
  randomize: true,
};
/**
 * Proposes the path a bucket file should have inside the zip.
 *
 * All but the last non-empty segment of `fromPath` are stripped from the
 * front of `fname`; e.g. fname 'a/b/c.txt' with fromPath 'a/b' gives 'b/c.txt'.
 * `fname` is returned unchanged when `fromPath` is missing, empty, '/',
 * has fewer than two non-empty segments, or when `fname` does not have more
 * non-empty segments than would be stripped.
 *
 * @param {string} fname - full object name in the source bucket
 * @param {string} [fromPath] - prefix the files were listed under
 * @returns {string} suggested name for the zip entry
 */
function suggestedName(fname, fromPath) {
  const hasNoUsablePrefix = !fromPath || fromPath.length === 0 || fromPath === '/';
  if (hasNoUsablePrefix) {
    return fname;
  }
  const isNonEmpty = (segment) => segment.length > 0;
  // Number of leading segments to drop: every prefix segment except the last.
  const dropCount = fromPath.split('/').filter(isNonEmpty).length - 1;
  if (dropCount <= 0) {
    return fname;
  }
  const nameSegments = fname.split('/').filter(isNonEmpty);
  return (nameSegments.length > dropCount)
    ? nameSegments.slice(dropCount).join('/')
    : fname;
}
/**
 * Checks that the required source options are present and are strings.
 *
 * A missing options object (undefined or null) is treated like an empty one,
 * so the caller gets the descriptive validation error below instead of an
 * opaque destructuring TypeError.
 *
 * @param {object} options - options passed to the zip function
 * @param {string} options.fromBucket - name of the source bucket
 * @param {string} options.fromPath - prefix of the files to zip
 * @throws {Error} when fromBucket or fromPath is not a string
 */
const validateOptions = (options) => {
  const {fromBucket, fromPath} = options || {};
  if (typeof(fromBucket) !== 'string') {
    throw new Error(`fromBucket should be of type 'string', got: ${typeof(fromBucket)}`);
  }
  if (typeof(fromPath) !== 'string') {
    throw new Error(`fromPath should be of type 'string', got: ${typeof(fromPath)}`);
  }
};
module.exports = (storage) => (options) => {
validateOptions(options);
const {fromBucket, fromPath, toBucket, toPath, keep, mapper, metadata, progress, downloadValidation} = options;
if ((!keep) && (!toBucket)) {
return Promise.resolve(null);
}
const manifest = [];
const zip = archiver('zip', {zlib: { level: 9 }});
zip.on('error', (e)=>{ throw e; });
let keepPromise;
let bucketPromise;
if (keep) {
const keepOutput = fs.createWriteStream(keep);
zip.pipe(keepOutput);
keepPromise = new Promise((resolve, reject) => {
keepOutput.on('close', resolve);
keepOutput.on('error', reject);
});
}
if (toBucket) {
const uploadOptions = {destination: toPath, validation: 'md5', metadata: {contentType: 'application/zip'}};
if (typeof(metadata) === 'object') {
uploadOptions.metadata.metadata = metadata;
}
logAction(`uploading zip file to gs://${toBucket}/${toPath}`);
const bucketOutput = storage.bucket(toBucket).file(toPath).createWriteStream(uploadOptions);
bucketPromise = new Promise((resolve, reject) => {
bucketOutput.on('finish', resolve);
bucketOutput.on('error', reject);
});
zip.pipe(bucketOutput);
}
function logAction(action) {
if (progress) {
console.log(action);
}
}
function getListOfFromFiles() {
return promiseRetry((retry) => storage.bucket(fromBucket).getFiles({prefix:fromPath}).catch(retry), backoffStrategy)
.then((data)=>(data[0]));
}
function zipFile(f) {
let pathInZip = suggestedName(f.name, fromPath);
if (typeof(mapper) === 'function') {
pathInZip = mapper(f, pathInZip);
if (!pathInZip) {
return false;
}
}
logAction(`adding gs://${fromBucket}/${f.name}`);
return new Promise(function(resolve, reject) {
const reader = storage.bucket(fromBucket).file(f.name).createReadStream({validation: downloadValidation});
reader.on('error', (e)=>{ reject(e); });
reader.on('end',() => {
manifest.push([f.name,pathInZip]);
resolve([f.name,pathInZip]);
});
zip.append(reader, {name: pathInZip});
});
}
async function zipEachFile(filelist) {
const {concurrentLimit = 1} = options;
const results = [];
for await (const result of asyncPool(concurrentLimit, filelist, zipFile)) {
results.push(result);
}
return results;
}
function finalize() {
logAction('finalizing zip file');
zip.finalize();
return Promise.all([keepPromise, bucketPromise]);
}
function checkZipExistsInBucket() {
if (toBucket) {
logAction(`confirming existence of zip file at gs://${toBucket}/${toPath}`);
return promiseRetry((retry)=>(storage
.bucket(toBucket)
.file(toPath)
.exists()
.then((info)=> {if (!(info[0])) throw new Error('checkZipExistsInBucket: zip file not found in storage bucket'); })
.catch(retry)
), backoffStrategy)
}
}
function successfulResult() {
logAction('finished');
return {
keep,
fromBucket,
fromPath,
toBucket,
toPath,
metadata,
manifest
};
}
return getListOfFromFiles()
.then(zipEachFile)
.then(finalize)
.then(checkZipExistsInBucket)
.then(successfulResult);
};