-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.js
369 lines (296 loc) · 11 KB
/
app.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
"use strict";
const AWS = require("aws-sdk");
AWS.config.update({ region: process.env.aws_region });
if (process.env.is_local != null) {
AWS.config.credentials = new AWS.SharedIniFileCredentials({ profile: 'klogger' });
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
}
const s3 = new AWS.S3();
const crypto = require("crypto");
const request = require("request-promise");
const MB_5 = 5242880; // S3상에서 머지(멀티파트업로드)가 가능한 최소 크기(5MB).
const MB_LARGE = MB_5 * 20; // 최종 머지할 크기(100M 이상~).
const SUFFIX_MERGED = "_fin"; // 최종 머지 파일 이름의 마지막에 붙을 문자열.
var context = null;
exports.handler = async (event) => {
var prefix;
if (event && event.prefix) {
prefix = event.prefix;
} else {
// 시간이 넘어가도 이전 시간에 대해 최종 머지할 수 있도록 시간을 뒤로 당긴다(호출 주기보다 커야 한다).
const addMinute = event && event.intervalMin ? -(event.intervalMin + 5) : -10;
prefix = buildPrefixByDate(addMinute);
}
console.log(`prefix: ${prefix}`);
context = {
prefix: prefix,
bucket: process.env.bucket_name
};
try {
await merge();
} catch (exception) {
console.error("=================== exception beg ===================");
console.error(exception);
await sendSlack("EXCEPTION", `\`\`\`${exception.stack}\`\`\``);
console.error("=================== exception end ===================");
}
return "fin.";
};
function buildPrefixByDate(addMinute = 0) {
const date = new Date(new Date().toUTCString()); // firehose가 UTC기반으로 시간까지 prefix를 붙여주는 것을 활용한다.
date.setMinutes(date.getMinutes() + addMinute);
const isoDate = date.toISOString(); // 2019-08-12T13:25:08.000Z
const tokens = isoDate.split("T"); // [2019-08-12, 13:25:08.000Z]
const ymd = tokens[0].replace(/\-/g, "/"); // 2019/08/12
const hour = tokens.pop().split(":")[0]; // 13
return `${ymd}/${hour}`; // "2019/08/05/13"
}
async function merge() {
// 온라인(S3)에서 머지할 수 없는 작은(5MB 이하)파일에 대한 머지.
var success = await mergeSmallFile();
if (success === false) {
console.error("error mergeSmallFile");
return;
}
console.log("ok. small file merge.");
// 멀티파트업로드를 이용해 온라인(S3)에서 머지할 수 있는 파일에 대한 머지.
success = await mergeLargeFile();
if (success === false) {
console.error("error mergeLargeFile");
return;
}
console.log("ok. large file merge.");
}
async function mergeSmallFile() {
// 전체 리스트 읽음.
const s3Objects = await s3.listObjects({
Bucket: context.bucket,
Prefix: context.prefix
}).promise();
// S3 상에서 머지가 불가능한 5MB 미만 파일 선택.
const under5MBs = selectObjectMinMax(s3Objects, 0, MB_5);
// 작은 것부터 머지할 수 있도록 오름차순 정렬.
under5MBs.sort((l, r) => l.Size - r.Size);
// 5MB 이상이 되도록 묶음.
const smallBundles = makeOverNSizeBundles(under5MBs, MB_5);
// 묶은 번들들을 로컬(람다)에서 머지하고 업로드.
const merges = await mergeOnLambda(smallBundles);
// 머지한 파일들 삭제.
const count = await deleteS3Objects(merges);
return count === merges.length;
}
async function mergeLargeFile() {
// 전체 리스트를 읽음.
const s3Objects = await s3.listObjects({
Bucket: context.bucket,
Prefix: context.prefix
}).promise();
// S3 상에서 머지가 가능한 5MB 이상 파일 선택.
const over5MBs = selectObjectMinMax(s3Objects, MB_5, MB_LARGE);
if (over5MBs.length <= 1) {
console.log("There are no files to merge."); // 병합할 파일이 없음.
return true;
}
// 작은 것부터 머지할 수 있도록 오름차순 정렬.
over5MBs.sort((l, r) => l.Size - r.Size);
// MB_LARGE 크기 이상이 되도록 묶음.
const largeBundles = makeOverNSizeBundles(over5MBs, MB_LARGE);
// 다 합해도 MB_LARGE 보다 크지 않으므로 남은 것들을 하나로 머지.
if (largeBundles.length <= 0) {
largeBundles.push(over5MBs);
}
const merges = await mergeOnS3(largeBundles);
const count = await deleteS3Objects(merges);
return count === merges.length;
}
// min 보다 크고(초과), max 보다 작은(이하) S3Object 배열 반환.
function selectObjectMinMax(s3Objects, min, max, skipSuffix = null) {
const selectObjects = [];
s3Objects.Contents.forEach(content => {
if (skipSuffix && content.Key.endsWith(skipSuffix)) {
console.log(`skip: ${content.Key}`);
return;
}
if (content.Size > min && content.Size <= max) {
selectObjects.push(content);
}
});
return selectObjects;
}
// size 보다 큰 번들의 리스트로 묶는다. [[번들], [번들], [번들], ...]
function makeOverNSizeBundles(s3Objects, size, skipSuffix = null) {
const overNSizeBundles = [];
var bundleSize = 0;
var bundle = [];
s3Objects.forEach(content => {
if (skipSuffix && content.Key.endsWith(skipSuffix)) {
console.log(`skip: ${content.Key}`);
return;
}
bundle.push(content);
bundleSize += content.Size;
if (bundleSize > size) {
overNSizeBundles.push(bundle);
bundle = [];
bundleSize = 0;
}
});
// 마지막 번들이 size를 넘지 못해 남은 경우.
if (bundle.length > 0) {
// 이미 묶인 마지막 번들에 추가한다.
if (overNSizeBundles.length > 0) {
overNSizeBundles[overNSizeBundles.length - 1].push(...bundle);
} else {
// 조건을 넘는 번들이 하나도 없다. 즉, 입력을 다 더해도 size가 안된다.
}
}
return overNSizeBundles;
}
async function mergeOnLambda(bundles) {
const tasks = [];
for (const bundle of bundles) {
const task = mergeOnLambdaBundle(bundle);
tasks.push(task);
}
const merges = await Promise.all(tasks);
return [].concat(...merges);
}
// bundle안에 있는 파일들을 받아서 하나의 파일로 합친 뒤 업로드한다.
async function mergeOnLambdaBundle(bundle) {
var mergedBody = null;
const fileName = buildMergedFileName(bundle, 1);
for (const content of bundle) {
const data = await s3.getObject({
Bucket: context.bucket,
Key: content.Key
}).promise();
mergedBody ? (mergedBody += data.Body) : (mergedBody = data.Body);
console.log(`merge on lambda: ${content.Key}`);
}
const putResult = await s3.putObject({
Bucket: context.bucket,
Key: `${context.prefix}/${fileName}`,
Body: mergedBody
}).promise();
if (putResult.ETag) {
console.log(`complete merge on lambda: ${fileName}`);
} else {
console.error(`fail merge on lambda: ${fileName}`);
}
return bundle;
}
function buildMergedFileName(bundle, phase, suffix = null) {
const sha1 = crypto.createHash("sha1");
const nameJoined = bundle.map(content => content.Key).join();
const hash = sha1.update(nameJoined, 'utf-8').digest('hex');
var fileName = `merged_phase${phase}_${hash}_${bundle.length}`;
if (suffix) {
fileName += suffix;
}
return fileName;
}
async function deleteS3Objects(s3Objects) {
if (s3Objects.length <= 0) {
return 0;
}
const params = {
Bucket: context.bucket,
Delete: {
Objects: []
}
};
for (const content of s3Objects) {
params.Delete.Objects.push({
Key: content.Key
});
console.log(`delete: ${content.Key}`);
}
const result = await s3.deleteObjects(params).promise();
return result.Deleted.length;
}
async function mergeOnS3(largeBundles) {
const tasks = [];
for (const bundle of largeBundles) {
const task = mergeOnS3Bundle(bundle);
tasks.push(task);
}
const merges = await Promise.all(tasks);
return [].concat(...merges);
}
// bundle안에 있는 파일들을 멀티파트업로드를 이용하여 온라인(S3) 상에서 머지한다.
// https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/S3.html#createMultipartUpload-property
async function mergeOnS3Bundle(bundle) {
const fileName = buildMergedFileName(bundle, 2, SUFFIX_MERGED);
const createResult = await s3.createMultipartUpload({
Bucket: context.bucket,
Key: `${context.prefix}/${fileName}`
}).promise();
var partNumber = 1;
const partNumberAndTasks = [];
for (const content of bundle) {
const uploadPartCopyTask = s3.uploadPartCopy({
Bucket: context.bucket,
CopySource: `${context.bucket}/${content.Key}`, // 버킷 이름이 필요함에 유의.
Key: createResult.Key,
PartNumber: partNumber,
UploadId: createResult.UploadId
}).promise();
partNumberAndTasks.push({ PartNumber: partNumber++, Task: uploadPartCopyTask });
console.log(`merge on s3: ${content.Key}`);
}
const parts = [];
for (const partNumberAndTask of partNumberAndTasks) {
const uploadResult = await partNumberAndTask.Task;
parts.push({ PartNumber: partNumberAndTask.PartNumber, ETag: uploadResult.CopyPartResult.ETag });
}
const completeResult = await s3.completeMultipartUpload({
Bucket: context.bucket,
Key: createResult.Key,
MultipartUpload: {
Parts: parts
},
UploadId: createResult.UploadId
}).promise();
if (completeResult.ETag) {
console.log(`complete merge on s3: ${fileName}`);
} else {
console.error(`fail merge on s3: ${fileName}`);
}
return bundle;
}
async function sendSlack(title, text, color = "danger") {
function formattedNow() {
const date = new Date();
date.setHours(date.getHours() + 9);
const now = (date).toISOString().replace(/T/, " ").replace(/\..+/, "");
return now;
}
if (process.env.slack_webhook_url == null) {
console.error("there is no slack webhook url.");
return;
}
const options = {
method: "POST",
url: process.env.slack_webhook_url,
headers: {
'Content-Type': "application/json"
},
json: {
channel: process.env.slack_webhook_channel,
username: "KLogger Merge Lambda",
icon_emoji: ":ddo_bug_ne:",
attachments: [
{
"title": title,
"text": text,
"footer": `\`${formattedNow()}\``,
"mrkdwn": true
}
]
}
};
const res = await request.post(options);
if (res !== "ok") {
console.error("Fail Send Slack. Result: ", res);
}
}