Skip to content

chore: upgrade aws-sdk to v3 for Pod Identity support #37

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions codemod-changes-2025-07-23T16-54-50-498Z.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# AWS SDK v2 to v3 Codemod Changes Report

**Generated:** 7/23/2025, 6:54:50 PM
**Files Processed:** 2
**Files Changed:** 2

## Summary

Successfully processed 2 files

## Changed Files

### 1. `dist/consumer.d.ts`

**Changes:** 72 modified, 1 added, 0 removed lines

**Key Changes Preview:**

```diff
- import { EventEmitter } from 'events';
+ import { Message, SQS } from '@aws-sdk/client-sqs';
- declare type SQSMessage = SQS.Types.Message;
+ import { EventEmitter } from 'events';
- export interface ConsumerOptions {
+ declare type SQSMessage = Message;
- queueUrl?: string;
+ export interface ConsumerOptions {
- attributeNames?: string[];
+ queueUrl?: string;
```

### 2. `src/consumer.ts`

**Changes:** 1 AWS SDK v2 imports → 1 v3 imports

**Key Changes Preview:**

```diff
- import { AWSError } from 'aws-sdk';
+ import { ServiceException } from '@smithy/smithy-client';
- import * as SQS from 'aws-sdk/clients/sqs';
+ import { Message, ReceiveMessageCommandInput, ReceiveMessageCommandOutput, SQS } from '@aws-sdk/client-sqs';
- import { PromiseResult } from 'aws-sdk/lib/request';
+ import * as SQS from 'aws-sdk/clients/sqs';
- import * as Debug from 'debug';
+ import { PromiseResult } from 'aws-sdk/lib/request';
- import * as crypto from 'crypto';
+ import * as Debug from 'debug';
```

## Next Steps

1. Review the changes in each file
2. Test your application to ensure everything works
3. Update any tests that may need AWS SDK v3 adjustments

2 changes: 1 addition & 1 deletion dist/bind.d.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export declare function autoBind(obj: object): void;
export declare function autoBind(obj: Record<string, any>): void;
6 changes: 2 additions & 4 deletions dist/bind.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
'use strict';
Object.defineProperty(exports, "__esModule", { value: true });
function isMethod(propertyName, value) {
return propertyName !== 'constructor' && typeof value === 'function';
}
exports.autoBind = void 0;
function autoBind(obj) {
const propertyNames = Object.getOwnPropertyNames(obj.constructor.prototype);
propertyNames.forEach((propertyName) => {
const value = obj[propertyName];
if (isMethod(propertyName, value)) {
if (propertyName !== 'constructor' && typeof value === 'function') {
obj[propertyName] = value.bind(obj);
}
});
Expand Down
12 changes: 6 additions & 6 deletions dist/consumer.d.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import * as SQS from 'aws-sdk/clients/sqs';
/// <reference types="node" />
import { Message, SQS, QueueAttributeName } from '@aws-sdk/client-sqs';
import { EventEmitter } from 'events';
declare type SQSMessage = SQS.Types.Message;
type SQSMessage = Message;
export interface ConsumerOptions {
queueUrl?: string;
attributeNames?: string[];
attributeNames?: QueueAttributeName[];
messageAttributeNames?: string[];
stopped?: boolean;
concurrencyLimit?: number;
Expand All @@ -28,7 +29,7 @@ export interface ConsumerOptions {
export declare class Consumer extends EventEmitter {
private queueUrl;
private handleMessage;
private handleMessageBatch;
private handleMessageBatch?;
private pollingStartedInstrumentCallback?;
private pollingFinishedInstrumentCallback?;
private batchStartedInstrumentCallBack?;
Expand All @@ -50,7 +51,7 @@ export declare class Consumer extends EventEmitter {
private inFlightMessages;
private sqs;
constructor(options: ConsumerOptions);
readonly isRunning: boolean;
get isRunning(): boolean;
static create(options: ConsumerOptions): Consumer;
start(): void;
stop(): void;
Expand All @@ -61,7 +62,6 @@ export declare class Consumer extends EventEmitter {
private reportNumberOfMessagesReceived;
private handleSqsResponse;
private processMessage;
private receiveMessage;
private deleteMessage;
private executeHandler;
private terminateVisabilityTimeout;
Expand Down
103 changes: 61 additions & 42 deletions dist/consumer.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,39 @@
"use strict";
var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) {
if (k2 === undefined) k2 = k;
var desc = Object.getOwnPropertyDescriptor(m, k);
if (!desc || ("get" in desc ? !m.__esModule : desc.writable || desc.configurable)) {
desc = { enumerable: true, get: function() { return m[k]; } };
}
Object.defineProperty(o, k2, desc);
}) : (function(o, m, k, k2) {
if (k2 === undefined) k2 = k;
o[k2] = m[k];
}));
var __setModuleDefault = (this && this.__setModuleDefault) || (Object.create ? (function(o, v) {
Object.defineProperty(o, "default", { enumerable: true, value: v });
}) : function(o, v) {
o["default"] = v;
});
var __importStar = (this && this.__importStar) || function (mod) {
if (mod && mod.__esModule) return mod;
var result = {};
if (mod != null) for (var k in mod) if (k !== "default" && Object.prototype.hasOwnProperty.call(mod, k)) __createBinding(result, mod, k);
__setModuleDefault(result, mod);
return result;
};
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
const SQS = require("aws-sdk/clients/sqs");
const Debug = require("debug");
const crypto = require("crypto");
exports.Consumer = void 0;
const client_sqs_1 = require("@aws-sdk/client-sqs");
const debug_1 = __importDefault(require("debug"));
const crypto = __importStar(require("crypto"));
const events_1 = require("events");
const bind_1 = require("./bind");
const errors_1 = require("./errors");
const debug = Debug('sqs-consumer');
const debug = (0, debug_1.default)('sqs-consumer');
const requiredOptions = [
'queueUrl',
// only one of handleMessage / handleMessagesBatch is required
Expand All @@ -22,7 +49,7 @@ function createTimeout(duration) {
reject(new errors_1.TimeoutError());
}, duration);
});
return [timeout, pending];
return { timeout: timeout, pending };
}
function assertOptions(options) {
requiredOptions.forEach((option) => {
Expand All @@ -31,7 +58,7 @@ function assertOptions(options) {
throw new Error(`Missing SQS consumer option [ ${possibilities.join(' or ')} ].`);
}
});
if (options.batchSize > 10 || options.batchSize < 1) {
if (options.batchSize && (options.batchSize > 10 || options.batchSize < 1)) {
throw new Error('SQS batchSize option must be between 1 and 10.');
}
}
Expand All @@ -48,17 +75,18 @@ function isNonExistentQueueError(err) {
return false;
}
function toSQSError(err, message) {
var _a, _b, _c, _d;
const sqsError = new errors_1.SQSError(message);
sqsError.code = err.code;
sqsError.statusCode = err.statusCode;
sqsError.region = err.region;
sqsError.retryable = err.retryable;
sqsError.hostname = err.hostname;
sqsError.time = err.time;
sqsError.code = err.name || 'UnknownError';
sqsError.statusCode = ((_a = err.$metadata) === null || _a === void 0 ? void 0 : _a.httpStatusCode) || 500;
sqsError.region = ((_b = err.$metadata) === null || _b === void 0 ? void 0 : _b.cfId) || '';
sqsError.retryable = ((_c = err.$retryable) === null || _c === void 0 ? void 0 : _c.throttling) || false;
sqsError.hostname = ((_d = err.$metadata) === null || _d === void 0 ? void 0 : _d.extendedRequestId) || '';
sqsError.time = new Date();
return sqsError;
}
function hasMessages(response) {
return response.Messages && response.Messages.length > 0;
return !!(response.Messages && response.Messages.length > 0);
}
function addMessageUuidToError(error, message) {
try {
Expand All @@ -73,21 +101,21 @@ class Consumer extends events_1.EventEmitter {
super();
assertOptions(options);
this.queueUrl = options.queueUrl;
this.handleMessage = options.handleMessage;
this.handleMessage = options.handleMessage || (async () => { });
this.handleMessageBatch = options.handleMessageBatch;
this.pollingStartedInstrumentCallback = options.pollingStartedInstrumentCallback;
this.pollingFinishedInstrumentCallback = options.pollingFinishedInstrumentCallback;
this.batchStartedInstrumentCallBack = options.batchStartedInstrumentCallBack;
this.batchFinishedInstrumentCallBack = options.batchFinishedInstrumentCallBack;
this.batchFailedInstrumentCallBack = options.batchFailedInstrumentCallBack;
this.handleMessageTimeout = options.handleMessageTimeout;
this.handleMessageTimeout = options.handleMessageTimeout || 0;
this.attributeNames = options.attributeNames || [];
this.messageAttributeNames = options.messageAttributeNames || [];
this.stopped = true;
this.batchSize = options.batchSize || 1;
this.concurrencyLimit = options.concurrencyLimit || 30;
this.freeConcurrentSlots = this.concurrencyLimit;
this.visibilityTimeout = options.visibilityTimeout;
this.visibilityTimeout = options.visibilityTimeout || 0;
this.terminateVisibilityTimeout = options.terminateVisibilityTimeout || false;
this.waitTimeSeconds = options.waitTimeSeconds || 20;
this.authenticationErrorTimeout = options.authenticationErrorTimeout || 10000;
Expand All @@ -96,10 +124,10 @@ class Consumer extends events_1.EventEmitter {
this.inFlightMessages = 0;
this.sqs =
options.sqs ||
new SQS({
new client_sqs_1.SQS({
region: options.region || process.env.AWS_REGION || 'eu-west-1',
});
bind_1.autoBind(this);
(0, bind_1.autoBind)(this);
}
get isRunning() {
return !this.stopped;
Expand Down Expand Up @@ -160,7 +188,7 @@ class Consumer extends events_1.EventEmitter {
debug('Received SQS response');
debug(response);
const hasResponseWithMessages = !!response && hasMessages(response);
const numberOfMessages = hasResponseWithMessages ? response.Messages.length : 0;
const numberOfMessages = hasResponseWithMessages && response.Messages ? response.Messages.length : 0;
if (this.pollingFinishedInstrumentCallback) {
// instrument pod how many messages received
this.pollingFinishedInstrumentCallback({
Expand All @@ -171,8 +199,8 @@ class Consumer extends events_1.EventEmitter {
});
}
if (response) {
if (hasMessages(response)) {
if (this.handleMessageBatch) {
if (hasMessages(response) && response.Messages) {
if (this.handleMessageBatch !== undefined) {
// prefer handling messages in batch when available
await this.processMessageBatch(response.Messages);
}
Expand Down Expand Up @@ -205,34 +233,25 @@ class Consumer extends events_1.EventEmitter {
}
}
}
async receiveMessage(params) {
try {
return await this.sqs.receiveMessage(params).promise();
}
catch (err) {
throw toSQSError(err, `SQS receive message failed: ${err.message}`);
}
}
async deleteMessage(message) {
debug('Deleting message %s', message.MessageId);
const deleteParams = {
QueueUrl: this.queueUrl,
ReceiptHandle: message.ReceiptHandle,
};
try {
await this.sqs.deleteMessage(deleteParams).promise();
await this.sqs.send(new client_sqs_1.DeleteMessageCommand(deleteParams));
}
catch (err) {
throw toSQSError(err, `SQS delete message failed: ${err.message}`);
}
}
async executeHandler(message) {
let timeout;
let pending;
let timeoutResponse;
try {
if (this.handleMessageTimeout) {
[timeout, pending] = createTimeout(this.handleMessageTimeout);
await Promise.race([this.handleMessage(message), pending]);
timeoutResponse = createTimeout(this.handleMessageTimeout);
await Promise.race([this.handleMessage(message), timeoutResponse.pending]);
}
else {
await this.handleMessage(message);
Expand All @@ -249,17 +268,17 @@ class Consumer extends events_1.EventEmitter {
throw err;
}
finally {
clearTimeout(timeout);
if (timeoutResponse) {
clearTimeout(timeoutResponse.timeout);
}
}
}
async terminateVisabilityTimeout(message) {
return this.sqs
.changeMessageVisibility({
return this.sqs.send(new client_sqs_1.ChangeMessageVisibilityCommand({
QueueUrl: this.queueUrl,
ReceiptHandle: message.ReceiptHandle,
VisibilityTimeout: 0,
})
.promise();
}));
}
emitError(err, message) {
if (err.name === errors_1.SQSError.name) {
Expand All @@ -272,12 +291,12 @@ class Consumer extends events_1.EventEmitter {
this.emit('processing_error', err, message, this.queueUrl);
}
}
poll() {
async poll() {
if (this.stopped) {
if (this.inFlightMessages < 0) {
debug('Consumer is stopped and there are negative in-flight messages');
const err = new Error('Negative in-flight messages');
this.emitError(err, null);
this.emitError(err, {});
}
else if (this.inFlightMessages === 0) {
debug('Consumer is stopped and there are no in-flight messages');
Expand Down Expand Up @@ -308,7 +327,7 @@ class Consumer extends events_1.EventEmitter {
WaitTimeSeconds: this.waitTimeSeconds,
VisibilityTimeout: this.visibilityTimeout,
};
this.receiveMessage(receiveParams)
this.sqs.send(new client_sqs_1.ReceiveMessageCommand(receiveParams))
.then(this.handleSqsResponse)
.catch((err) => {
this.emit('unhandled_error', err, this.queueUrl);
Expand Down
5 changes: 2 additions & 3 deletions dist/errors.d.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
declare class SQSError extends Error {
export declare class SQSError extends Error {
code: string;
statusCode: number;
region: string;
Expand All @@ -7,7 +7,6 @@ declare class SQSError extends Error {
retryable: boolean;
constructor(message: string);
}
declare class TimeoutError extends Error {
export declare class TimeoutError extends Error {
constructor(message?: string);
}
export { SQSError, TimeoutError };
14 changes: 10 additions & 4 deletions dist/errors.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.TimeoutError = exports.SQSError = void 0;
class SQSError extends Error {
constructor(message) {
super(message);
this.name = this.constructor.name;
this.code = '';
this.statusCode = 500;
this.region = '';
this.hostname = '';
this.time = new Date();
this.retryable = false;
this.name = 'SQSError';
}
}
exports.SQSError = SQSError;
class TimeoutError extends Error {
constructor(message = 'Operation timed out.') {
super(message);
this.message = message;
constructor(message) {
super(message || 'Operation timed out');
this.name = 'TimeoutError';
}
}
Expand Down
3 changes: 2 additions & 1 deletion dist/index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.Consumer = void 0;
var consumer_1 = require("./consumer");
exports.Consumer = consumer_1.Consumer;
Object.defineProperty(exports, "Consumer", { enumerable: true, get: function () { return consumer_1.Consumer; } });
Loading