Skip to content

Commit

Permalink
[INF-2535] Update to use AWS SDK v3 for Javascript (#36)
Browse files Browse the repository at this point in the history
* [INF-2535] Update to use AWS SDK v3

* 0.3.0

* Make callback handling smaller.

* Fix initial log stream missing error handling.

* Update lib/log-stream.js

Co-authored-by: sushmithaeshwar <[email protected]>

* Update lib/taskrunner.js

Co-authored-by: sushmithaeshwar <[email protected]>

---------

Co-authored-by: sushmithaeshwar <[email protected]>
  • Loading branch information
ohookins and sushmithaeshwar authored Nov 22, 2023
1 parent 90fa3a4 commit a3e844d
Show file tree
Hide file tree
Showing 9 changed files with 2,642 additions and 453 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ The `AWS_DEFAULT_REGION` environment variable has precendence over this setting.
### Example Module Usage

```
var ecsTaskRunner = require('ecs-task-runner');
const ecsTaskRunner = require('ecs-task-runner');
var options = {
const options = {
clusterArn: 'xxx',
taskDefinitionArn: 'xxx',
containerName: 'xxx',
Expand Down
42 changes: 20 additions & 22 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
'use strict';

const _ = require('lodash');
const async = require('async');
const AWS = require('aws-sdk');
const combiner = require('stream-combiner');
const _ = require('lodash');
const async = require('async');
const { ECS } = require("@aws-sdk/client-ecs");
const combiner = require('stream-combiner');
const FormatStream = require('./lib/format-transform-stream');
const LogStream = require('./lib/log-stream');
const LogStream = require('./lib/log-stream');
const randomstring = require('randomstring');
const taskRunner = require('./lib/taskrunner');
const taskRunner = require('./lib/taskrunner');

module.exports = function(options, cb) {
AWS.config.update({
region: process.env.AWS_DEFAULT_REGION || options.region
});
module.exports = function (options, cb) {
const region = process.env.AWS_DEFAULT_REGION || options.region

var containerDefinition = null;
var loggingDriver = null;
var logOptions = null;
let containerDefinition = null;
let loggingDriver = null;
let logOptions = null;

// Generate a random string we will use to know when
// the log stream is finished.
Expand All @@ -26,9 +24,9 @@ module.exports = function(options, cb) {
});

async.waterfall([
function(next) {
const ecs = new AWS.ECS();
ecs.describeTaskDefinition({ taskDefinition: options.taskDefinitionArn }, function(err, result) {
function (next) {
const ecs = new ECS({ region: region });
ecs.describeTaskDefinition({ taskDefinition: options.taskDefinitionArn }, function (err, result) {
if (err) return next(err);

if (!result.taskDefinition || !result.taskDefinition.taskDefinitionArn) {
Expand All @@ -51,7 +49,7 @@ module.exports = function(options, cb) {
return next(new Error('Task definition networkMode is awsvpc, this requires you to specify subnets and security-groups.'));
}
}
else{
else {
if (options.subnets !== undefined || options.securityGroups !== undefined || options.assignPublicIp) {
return next(new Error('Network options are only allowed when task definition networkMode is awsvpc. You should not specify subnets, security-groups or assign-public-ip'));
}
Expand All @@ -61,8 +59,8 @@ module.exports = function(options, cb) {
next();
});
},
function(next) {
var params = {
function (next) {
const params = {
clusterArn: options.clusterArn,
cmd: options.cmd,
containerName: options.containerName,
Expand All @@ -78,11 +76,11 @@ module.exports = function(options, cb) {

taskRunner.run(params, next);
}
], function(err, taskDefinition) {
], function (err, taskDefinition) {
if (err) return cb(err);

const taskArn = taskDefinition.tasks[0].taskArn;
const taskId = taskArn.substring(taskArn.lastIndexOf('/')+1);
const taskId = taskArn.substring(taskArn.lastIndexOf('/') + 1);
const formatter = new FormatStream();

const logs = new LogStream({
Expand All @@ -91,7 +89,7 @@ module.exports = function(options, cb) {
endOfStreamIdentifier: endOfStreamIdentifier
});

var stream = combiner(logs, formatter);
const stream = combiner(logs, formatter);
stream.logStream = logs;
stream.taskRunner = taskRunner;
stream.taskId = taskId;
Expand Down
10 changes: 0 additions & 10 deletions lib/aws.js

This file was deleted.

101 changes: 52 additions & 49 deletions lib/log-stream.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
'use strict'

const _ = require('lodash');
const AWS = require('aws-sdk');
const moment = require('moment');
const _ = require('lodash');
const { CloudWatchLogs } = require("@aws-sdk/client-cloudwatch-logs");
const moment = require('moment');
const Readable = require('stream').Readable;

const region = process.env.AWS_DEFAULT_REGION || 'us-east-1';

// Take cloudwatchlogs.getLogEvents data and
// return logs events as strings
function renderLogEvents(data) {
var lines = _.map(data.events, function(event) {
return moment(event.timestamp).format('Y-MM-DD H:m:ss ZZ').cyan+' '+event.message;
const lines = _.map(data.events, function (event) {
return moment(event.timestamp).format('Y-MM-DD H:m:ss ZZ').cyan + ' ' + event.message;
});

return lines.join("\n");
Expand All @@ -22,80 +24,81 @@ class LogStream extends Readable {

this.options = _.defaults(options, {
durationBetweenPolls: 1000,
timeoutBeforeFirstLogs: 300*1000
timeoutBeforeFirstLogs: 300 * 1000
});

this.eventBuffer = [];
this.pending = false;
this.logsReceived = false;
this.streamEnded = false;
this.cloudwatchlogs = new AWS.CloudWatchLogs();
this.cloudwatchlogs = new CloudWatchLogs({ region: region });
this.stopRequested = false;
}

fetchLogs(_cb) {
this.pending = true;

var params = {
const params = {
logGroupName: this.options.logGroup,
logStreamName: this.options.logStream,
startFromHead: true,
nextToken: this.nextToken
};

var next = (_err, _data) => {
let next = (_err, _data) => {
setTimeout(this._read.bind(this), this.options.durationBetweenPolls);
};

this.cloudwatchlogs.getLogEvents(params, (err, data) => {
this.pending = false;

// Dismiss log stream not found. Log stream won't exist
// until container starts logging
if (err && 'ResourceNotFoundException' === err.code) return next();
if (err) return process.nextTick(() => this.emit('error', err));

if (data && data.events.length > 0) {
this.nextToken = data.nextForwardToken;
this.logsReceived = true;
data.events.forEach((event) => this.eventBuffer.push(event));
}

// If we haven't recieved any logs at all and timeoutBeforeFirstLogs duration has passed. Fail
if (!this.logsReceived && (Date.now() - this.startTime) > this.options.timeoutBeforeFirstLogs) {
let err = new Error(`No logs recieved before timeoutBeforeFirstLogs option set at '${this.options.timeoutBeforeFirstLogs}'`);
return process.nextTick(() => this.emit('error', err));
}

// Check to see if the log stream has ended. We listen for base64
// version of eof identifiter so that listener doesn't accidentally
// catch the eof identifier if container echo's out running command.
var endOfStreamIdentifierBase64 = new Buffer(this.options.endOfStreamIdentifier).toString('base64');
var endEvent = _.find(data.events, (event) => event.message.includes(endOfStreamIdentifierBase64));

if (this.stopRequested) {
this.exitCode = 130;
}

if (endEvent) {
this.streamEnded = true;

var matches = endEvent.message.match(/EXITCODE: (\d+)/);
if (matches) {
this.exitCode = matches[1] || 0;
this.pending = false;
this.cloudwatchlogs.getLogEvents(params)
.then((data) => {
if (data && data.events.length > 0) {
this.nextToken = data.nextForwardToken;
this.logsReceived = true;
data.events.forEach((event) => this.eventBuffer.push(event));
}
}

next();
});
// If we haven't recieved any logs at all and timeoutBeforeFirstLogs duration has passed. Fail
if (!this.logsReceived && (Date.now() - this.startTime) > this.options.timeoutBeforeFirstLogs) {
const err = new Error(`No logs recieved before timeoutBeforeFirstLogs option set at '${this.options.timeoutBeforeFirstLogs}'`);
return process.nextTick(() => this.emit('error', err));
}

// Check to see if the log stream has ended. We listen for base64
// version of eof identifiter so that listener doesn't accidentally
// catch the eof identifier if container echoes out running command.
const endOfStreamIdentifierBase64 = Buffer.from(this.options.endOfStreamIdentifier).toString('base64');
const endEvent = _.find(data.events, (event) => event.message.includes(endOfStreamIdentifierBase64));

if (this.stopRequested) {
this.exitCode = 130;
}

if (endEvent) {
this.streamEnded = true;

const matches = endEvent.message.match(/EXITCODE: (\d+)/);
if (matches) {
this.exitCode = matches[1] || 0;
}
}

next();
})
.catch((err) => {
// Dismiss log stream not found. Log stream won't exist
// until container starts logging
if (err && 'ResourceNotFoundException' === err.name) return next();
if (err) return process.nextTick(() => this.emit('error', err));
});
}

shutDown() {
this.stopRequested = true;
}

_read() {
var active = true;
let active = true;
while (active && this.eventBuffer.length) active = this.push(this.eventBuffer.shift());

// Downstream buffers are full. Lets give them 100ms to recover
Expand Down
16 changes: 11 additions & 5 deletions lib/taskrunner.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
'use strict'

const AWS = require('aws-sdk');
const { ECS } = require("@aws-sdk/client-ecs");

const region = process.env.AWS_DEFAULT_REGION || 'us-east-1';

module.exports = {
makeCmd: function (options) {
Expand All @@ -11,7 +13,7 @@ module.exports = {
},

run: function (options, cb) {
const ecs = new AWS.ECS();
const ecs = new ECS({ region: region });
const params = {
cluster: options.clusterArn,
taskDefinition: options.taskDefinitionArn,
Expand Down Expand Up @@ -52,17 +54,21 @@ module.exports = {
params.networkConfiguration.awsvpcConfiguration.securityGroups = options.securityGroups;
}

ecs.runTask(params, cb);
ecs.runTask(params)
.then(data => cb(null, data))
.catch(err => cb(err, null));
},

stop: function (options, cb) {
const ecs = new AWS.ECS();
const ecs = new ECS({ region: region });
const params = {
cluster: options.clusterArn,
task: options.taskId,
reason: options.reason
};

ecs.stopTask(params, cb);
ecs.stopTask(params)
.then(data => cb(null, data))
.catch(err => cb(err, null));
}
}
Loading

0 comments on commit a3e844d

Please sign in to comment.