diff --git a/index.js b/index.js
index 30e7528..2062b30 100644
--- a/index.js
+++ b/index.js
@@ -480,7 +480,7 @@ function handler(event, context) {
     /*
      * process what happened if the iterative request to
      * write to the open pending batch timed out
-     * 
+     *
      * TODO Can we force a rotation of the current batch
      * at this point?
      */
@@ -1490,102 +1490,138 @@ function handler(event, context) {
         });
     }
 
-    /* end of runtime functions */
-
-    try {
-        logger.debug(JSON.stringify(event));
+    exports.processS3EventRecord = function (event) {
+        logger.info("processS3EventRecord " + JSON.stringify(event));
 
         if (!event.Records) {
-            // filter out unsupported events
-            logger.error("Event type unsupported by Lambda Redshift Loader");
-            logger.info(JSON.stringify(event));
-            context.done(null, null);
+            logger.error("The provided s3 event was not wellformed or was generated as a test event, this will be ignored.");
+        } else if (event.Records.length > 1) {
+            context.done(error, "Unable to process multi-record events");
         } else {
-            if (event.Records.length > 1) {
-                context.done(error, "Unable to process multi-record events");
+            var r = event.Records[0];
+
+            // ensure that we can process this event based on a variety
+            // of criteria
+            var noProcessReason;
+            if (r.eventSource !== "aws:s3") {
+                noProcessReason = "Invalid Event Source " + r.eventSource;
+            }
+            if (!(r.eventName === "ObjectCreated:Copy" || r.eventName === "ObjectCreated:Put" || r.eventName === 'ObjectCreated:CompleteMultipartUpload')) {
+                noProcessReason = "Invalid Event Name " + r.eventName;
+            }
+            if (r.s3.s3SchemaVersion !== "1.0") {
+                noProcessReason = "Unknown S3 Schema Version " + r.s3.s3SchemaVersion;
+            }
+
+            if (noProcessReason) {
+                logger.error(noProcessReason);
+                context.done(error, noProcessReason);
             } else {
-                var r = event.Records[0];
+                // extract the s3 details from the event
+                var inputInfo = {
+                    bucket: undefined,
+                    key: undefined,
+                    prefix: undefined,
+                    inputFilename: undefined
+                };
 
-                // ensure that we can process this event based on a variety
-                // of criteria
-                var noProcessReason;
-                if (r.eventSource !== "aws:s3") {
-                    noProcessReason = "Invalid Event Source " + r.eventSource;
-                }
-                if (!(r.eventName === "ObjectCreated:Copy" || r.eventName === "ObjectCreated:Put" || r.eventName === 'ObjectCreated:CompleteMultipartUpload')) {
-                    noProcessReason = "Invalid Event Name " + r.eventName;
-                }
-                if (r.s3.s3SchemaVersion !== "1.0") {
-                    noProcessReason = "Unknown S3 Schema Version " + r.s3.s3SchemaVersion;
-                }
+                inputInfo.bucket = r.s3.bucket.name;
+                inputInfo.key = decodeURIComponent(r.s3.object.key);
 
-                if (noProcessReason) {
-                    logger.error(noProcessReason);
-                    context.done(error, noProcessReason);
-                } else {
-                    // extract the s3 details from the event
-                    var inputInfo = {
-                        bucket: undefined,
-                        key: undefined,
-                        prefix: undefined,
-                        inputFilename: undefined
-                    };
+                // remove the bucket name from the key, if we have
+                // received it - this happens on object copy
+                inputInfo.key = inputInfo.key.replace(inputInfo.bucket + "/", "");
 
-                    inputInfo.bucket = r.s3.bucket.name;
-                    inputInfo.key = decodeURIComponent(r.s3.object.key);
+                var keyComponents = inputInfo.key.split('/');
+                inputInfo.inputFilename = keyComponents[keyComponents.length - 1];
 
-                    // remove the bucket name from the key, if we have
-                    // received it - this happens on object copy
-                    inputInfo.key = inputInfo.key.replace(inputInfo.bucket + "/", "");
+                // remove the filename from the prefix value
+                var searchKey = inputInfo.key.replace(inputInfo.inputFilename, '').replace(/\/$/, '');
 
-                    var keyComponents = inputInfo.key.split('/');
-                    inputInfo.inputFilename = keyComponents[keyComponents.length - 1];
+                // transform hive style dynamic prefixes into static
+                // match prefixes and set the prefix in inputInfo
+                inputInfo.prefix = inputInfo.bucket + '/' + searchKey.transformHiveStylePrefix();
 
-                    // remove the filename from the prefix value
-                    var searchKey = inputInfo.key.replace(inputInfo.inputFilename, '').replace(/\/$/, '');
+                // add the object size to inputInfo
+                inputInfo.size = r.s3.object.size;
 
-                    // transform hive style dynamic prefixes into static
-                    // match prefixes and set the prefix in inputInfo
-                    inputInfo.prefix = inputInfo.bucket + '/' + searchKey.transformHiveStylePrefix();
+                resolveConfig(inputInfo.prefix, function (err, configData) {
+                    /*
+                     * we did get a configuration found by the resolveConfig
+                     * method
+                     */
+                    if (err) {
+                        logger.error(JSON.stringify(err));
+                        context.done(err, JSON.stringify(err));
+                    } else {
+                        // update the inputInfo prefix to match the
+                        // resolved
+                        // config entry
+                        inputInfo.prefix = configData.Item.s3Prefix.S;
 
-                    // add the object size to inputInfo
-                    inputInfo.size = r.s3.object.size;
+                        logger.debug(JSON.stringify(inputInfo));
 
-                    resolveConfig(inputInfo.prefix, function (err, configData) {
-                        /*
-                         * we did get a configuration found by the resolveConfig
-                         * method
-                         */
-                        if (err) {
-                            logger.error(JSON.stringify(err));
-                            context.done(err, JSON.stringify(err));
-                        } else {
-                            // update the inputInfo prefix to match the
-                            // resolved
-                            // config entry
-                            inputInfo.prefix = configData.Item.s3Prefix.S;
+                        // call the foundConfig method with the data
+                        // item
+                        foundConfig(inputInfo, null, configData);
+                    }
+                }, function (err) {
+                    // finish with no exception - where this file sits
+                    // in the S3 structure is not configured for redshift
+                    // loads, or there was an access issue that prevented us
+                    // querying DDB
+                    logger.error("No Configuration Found for " + inputInfo.prefix);
+                    if (err) {
+                        logger.error(err);
+                    }
 
-                            logger.debug(JSON.stringify(inputInfo));
+                    context.done(err, JSON.stringify(err));
+                });
+            }
+        }
+    }
 
-                            // call the foundConfig method with the data
-                            // item
-                            foundConfig(inputInfo, null, configData);
-                        }
-                    }, function (err) {
-                        // finish with no exception - where this file sits
-                        // in the S3 structure is not configured for redshift
-                        // loads, or there was an access issue that prevented us
-                        // querying DDB
-                        logger.error("No Configuration Found for " + inputInfo.prefix);
-                        if (err) {
-                            logger.error(err);
-                        }
+    /* end of runtime functions */
 
-                        context.done(err, JSON.stringify(err));
-                    });
+    try {
+        logger.debug(JSON.stringify(event));
+
+
+        if(!event.Records || event.Records.length == 0) {
+            // filter out unsupported events
+            logger.error("Event type unsupported by Lambda Redshift Loader");
+            logger.info(JSON.stringify(event));
+            return;
+        }
+
+        //obtain the first record in order to establish the eventSource
+        var record = event.Records[0];
+
+        var noProcessReason;
+
+        if (record.eventSource == "aws:s3") { //process the s3 event
+            logger.info("Processing message from S3 event source");
+
+            exports.processS3EventRecord(event);
+        } else if(record.eventSource == "aws:sqs") { //process the sqs message
+            logger.info("Processing " + event.Records.length + " message(s) from SQS event source.");
+
+            //process the s3 event contained in the body of each sqs message.
+            event.Records.forEach(function(record) {
+                if(!record.body) {
+                    noProcessReason = "Unable to process message body for event received via sqs event source, body was not present.";
+                    logger.error(noProcessReason);
+                    context.done(error, noProcessReason);
                 }
-            }
+                var messageBody = JSON.parse(record.body);
+
+                exports.processS3EventRecord(messageBody);
+            });
+        } else {
+            noProcessReason = "Invalid Event Source " + record.eventSource;
+            logger.error(noProcessReason);
+            context.done(error, noProcessReason);
         }
     } catch (e) {
         logger.error("Unhandled Exception");
@@ -1595,4 +1631,4 @@ function handler(event, context) {
     }
 }
 
-exports.handler = handler;
\ No newline at end of file
+exports.handler = handler;
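For reference, a minimal sketch of the event shape the new aws:sqs branch expects, assuming the standard Lambda event formats: each SQS record carries a JSON-serialised S3 notification in its body, which the handler parses and forwards to exports.processS3EventRecord. The bucket, key and size values below are illustrative only and are not taken from this change.

// Hypothetical SQS-wrapped S3 notification (names are illustrative)
var exampleSqsEvent = {
    Records: [ {
        eventSource: "aws:sqs",
        body: JSON.stringify({
            Records: [ {
                eventSource: "aws:s3",
                eventName: "ObjectCreated:Put",
                s3: {
                    s3SchemaVersion: "1.0",
                    bucket: { name: "example-input-bucket" },
                    object: { key: "input/mytable/datafile.csv", size: 1024 }
                }
            } ]
        })
    } ]
};

// Invoking the handler with this event takes the aws:sqs path, parses each
// record body, and hands the embedded S3 event to exports.processS3EventRecord:
// handler(exampleSqsEvent, context);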