ali-essam/wtsqs

WTSQS

Simplified SQS Wrapper and Async Worker manager.

Features:

  • Simple interface. ✅
  • Promise based. ✅
  • ES6. ✅
  • Optimized async worker. ✅

Install

# Using npm
$ npm install wtsqs --save

# Or using yarn
$ yarn add wtsqs

Classes

WTSQS

A simplified SQS wrapper with an interface similar to a normal queue data structure.

WTSQSWorker

WTSQS worker job manager.

WTSQSWorker asynchronously fetches jobs from SQS while processing other jobs concurrently. It also deletes each job from the queue after the message has been processed successfully.

Typedefs

Message : Object

Received SQS Message

Job : Object

Worker Job

WTSQS

A simplified SQS wrapper with an interface similar to a normal queue data structure.

Kind: global class


new WTSQS(options)

Constructs WTSQS object.

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| options | `Object` | | Options object. |
| options.url | `String` | | SQS queue URL. |
| [options.accessKeyId] | `String` | | AWS access key id. |
| [options.secretAccessKey] | `String` | | AWS secret access key. |
| [options.region] | `String` | `us-east-1` | AWS region where the queue exists. |
| [options.defaultMessageGroupId] | `String` | | FIFO queues only. Default tag that assigns a message to a specific message group. If not provided, a random UUID is assigned to each message, which does not guarantee order but allows parallelism. |
| [options.defaultVisibilityTimeout] | `Integer` | `60` | Default duration (in seconds) that received messages are hidden from subsequent retrieve requests. |
| [options.defaultPollWaitTime] | `Integer` | `10` | Default duration (in seconds) for which read calls wait for a message to arrive in the queue before returning. |
| [options.sqsOptions] | `Object` | | Additional options to extend/override the underlying SQS object creation. |

Example

const { WTSQS } = require('wtsqs')

// The simplest way to construct a WTSQS object
const wtsqs = new WTSQS({
  url: '//queue-url',
  accessKeyId: 'AWS_ACCESS_KEY_ID',
  secretAccessKey: 'AWS_SECRET_ACCESS_KEY'
})
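
The optional constructor options can also be assembled in one place. The following is a minimal sketch, assuming configuration comes from environment variables; `buildQueueOptions`, `QUEUE_URL` and `QUEUE_GROUP_ID` are hypothetical names chosen for illustration, while the option keys are the ones listed in the table above.

```javascript
// Build a WTSQS options object from environment variables.
// QUEUE_URL and QUEUE_GROUP_ID are hypothetical variable names.
function buildQueueOptions (env) {
  if (!env.QUEUE_URL) {
    throw new Error('QUEUE_URL is required')
  }

  const options = {
    url: env.QUEUE_URL,
    region: env.AWS_REGION || 'us-east-1', // documented default
    defaultVisibilityTimeout: 60, // seconds, documented default
    defaultPollWaitTime: 10 // seconds, documented default
  }

  // Credentials are optional in the options table, so set them only when present.
  if (env.AWS_ACCESS_KEY_ID && env.AWS_SECRET_ACCESS_KEY) {
    options.accessKeyId = env.AWS_ACCESS_KEY_ID
    options.secretAccessKey = env.AWS_SECRET_ACCESS_KEY
  }

  // FIFO queues only: one shared group id keeps strict ordering across messages.
  if (env.QUEUE_GROUP_ID) {
    options.defaultMessageGroupId = env.QUEUE_GROUP_ID
  }

  return options
}

// Usage (requires the wtsqs package):
// const { WTSQS } = require('wtsqs')
// const wtsqs = new WTSQS(buildQueueOptions(process.env))
```

Leaving `defaultMessageGroupId` unset keeps the documented behaviour of a random UUID per message, which trades ordering for parallelism.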

wtsqs.size() ⇒ Promise.<integer>

Get approximate total number of messages in the queue.

Kind: instance method of WTSQS
Example

const size = await wtsqs.size()
console.log(size) // output: 2

wtsqs.enqueueOne(payload, [options], [sqsOptions]) ⇒ Promise

Enqueue single payload in the queue.

Kind: instance method of WTSQS
See: SQS#sendMessage

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| payload | `Object` | | JSON serializable object. |
| [options] | `Object` | | Options. |
| [options.messageGroupId] | `String` | | Message group id to override default id. |
| [sqsOptions] | `Object` | `{}` | Additional options to extend/override the underlying SQS sendMessage request. |

Example

const myObj = { a: 1 }
await wtsqs.enqueueOne(myObj)
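
For FIFO queues, `options.messageGroupId` can be set per call. A small sketch, assuming a hypothetical order payload with a `customerId` field; `enqueueOrder` is an illustrative helper, not part of the library.

```javascript
// Enqueue an order so that all orders of one customer share a message group.
// `queue` is any object exposing enqueueOne(payload, options), e.g. a WTSQS instance.
// The payload shape ({ customerId, ... }) is hypothetical.
async function enqueueOrder (queue, order) {
  const options = { messageGroupId: `customer-${order.customerId}` }
  return queue.enqueueOne(order, options)
}

// Usage:
// await enqueueOrder(wtsqs, { customerId: 42, total: 9.99 })
```

Grouping by customer keeps each customer's messages ordered relative to each other while different customers can still be processed in parallel.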

wtsqs.enqueueMany(payloads, [options], [sqsOptions]) ⇒ Promise

Enqueue batch of payloads in the queue.

Kind: instance method of WTSQS
See: SQS#sendMessageBatch

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| payloads | `Array.<Object>` | | Array of JSON serializable objects. |
| [options] | `Object` | | Options object. |
| [options.messageGroupId] | `String` | | Message group id to override default id. |
| [sqsOptions] | `Object` | `{}` | Additional options to extend/override the underlying SQS sendMessageBatch request. |

Example

const myObjList = [{ a: 1 }, { b: 3 }]
await wtsqs.enqueueMany(myObjList)
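
The underlying SQS `sendMessageBatch` request accepts at most 10 entries. This README does not say whether `enqueueMany` splits larger arrays itself, so the sketch below assumes it does not and chunks the payloads first. `chunk` and `enqueueInBatches` are hypothetical helpers.

```javascript
// SQS accepts at most 10 entries per sendMessageBatch request.
const SQS_MAX_BATCH_SIZE = 10

// Split an array into consecutive slices of at most `size` items.
function chunk (items, size) {
  const chunks = []
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size))
  }
  return chunks
}

// Send payloads sequentially, one enqueueMany call per slice.
// `queue` is any object exposing enqueueMany(payloads), e.g. a WTSQS instance.
async function enqueueInBatches (queue, payloads) {
  for (const batch of chunk(payloads, SQS_MAX_BATCH_SIZE)) {
    await queue.enqueueMany(batch)
  }
}
```

If the library already splits batches internally, the helper is harmless but unnecessary.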

wtsqs.peekOne([options], [sqsOptions]) ⇒ Promise.<(Message|null)>

Retrieve single message without deleting it.

Kind: instance method of WTSQS
Returns: Promise.<(Message|null)> - Message object or null if queue is empty.

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| [options] | `Object` | | Options object. |
| [options.pollWaitTime] | `Integer` | | Duration (in seconds) for which the read call waits for a message to arrive in the queue before returning. If no message is available when the wait time expires, the call resolves with null. |
| [options.visibilityTimeout] | `Integer` | | Duration (in seconds) that the received message is hidden from subsequent retrieve requests. |
| [sqsOptions] | `Object` | `{}` | Additional options to extend/override the underlying SQS receiveMessage request. |

Example

const myMessage = await wtsqs.peekOne()
console.log(myMessage)
// output:
// {
//   id: 'messageId',
//   receiptHandle: 'messageReceiptHandle',
//   md5: 'messageMD5',
//   body: { a: 1 }
// }

wtsqs.peekMany([maxNumberOfMessages], [options], [sqsOptions]) ⇒ Promise.<Array.<Message>>

Retrieve batch of messages without deleting them.

Kind: instance method of WTSQS
Returns: Promise.<Array.<Message>> - Array of retrieved messages.
See: SQS#receiveMessage

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| [maxNumberOfMessages] | `Number` | `10` | Maximum number of messages to retrieve. Must be between 1 and 10. |
| [options] | `Object` | | Options object. |
| [options.pollWaitTime] | `Integer` | | Duration (in seconds) for which the read call waits for a message to arrive in the queue before returning. If no messages are available and the wait time expires, the call returns successfully with an empty list of messages. |
| [options.visibilityTimeout] | `Integer` | | Duration (in seconds) that the received messages are hidden from subsequent retrieve requests. |
| [sqsOptions] | `Object` | `{}` | Additional options to extend/override the underlying SQS receiveMessage request. |

Example

const myMessageList = await wtsqs.peekMany(2)
console.log(myMessageList)
// output:
// [
//   {
//     id: 'messageId',
//     receiptHandle: 'messageReceiptHandle',
//     md5: 'messageMD5',
//     body: { a: 1 }
//   },
//   {
//     id: 'messageId',
//     receiptHandle: 'messageReceiptHandle',
//     md5: 'messageMD5',
//     body: { b: 3 }
//   }
// ]

wtsqs.deleteOne(message) ⇒ Promise

Delete single message from queue.

Kind: instance method of WTSQS
See: SQS#deleteMessage

| Param | Type | Description |
| --- | --- | --- |
| message | `Message` | Message to be deleted. |

Example

const myMessage = await wtsqs.peekOne()
await wtsqs.deleteOne(myMessage)

wtsqs.deleteMany(messages) ⇒ Promise

Delete batch of messages from queue.

Kind: instance method of WTSQS
See: SQS#deleteMessageBatch

| Param | Type | Description |
| --- | --- | --- |
| messages | `Array.<Message>` | Messages to be deleted. |

Example

const myMessageList = await wtsqs.peekMany(2)
await wtsqs.deleteMany(myMessageList)

wtsqs.deleteAll() ⇒ Promise

Delete ALL messages in the queue.

NOTE: Can only be called once every 60 seconds.

Kind: instance method of WTSQS
See: SQS#purgeQueue
Example

await wtsqs.deleteAll()
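
Because `deleteAll()` can only be called once every 60 seconds, callers may want to reject early calls locally instead of sending a request that SQS will refuse. A minimal sketch, assuming a single process owns the purge; `createPurger` is a hypothetical helper and the injectable `now` clock exists only to make it testable.

```javascript
const PURGE_INTERVAL_MS = 60 * 1000

// Wrap queue.deleteAll() so that a second call inside the 60 second window
// is rejected locally. `queue` is any object exposing deleteAll().
function createPurger (queue, now = Date.now) {
  let lastPurgeAt = null

  return async function purge () {
    const current = now()
    if (lastPurgeAt !== null && current - lastPurgeAt < PURGE_INTERVAL_MS) {
      const waitMs = PURGE_INTERVAL_MS - (current - lastPurgeAt)
      throw new Error(`deleteAll was called recently; retry in ${waitMs} ms`)
    }
    // Record the attempt before awaiting so concurrent calls cannot both pass.
    lastPurgeAt = current
    await queue.deleteAll()
  }
}

// Usage:
// const purge = createPurger(wtsqs)
// await purge()
```

The guard only tracks calls made through this wrapper, so purges issued by other processes are not seen.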

wtsqs.popOne([options], [sqsOptions]) ⇒ Promise.<(Message|null)>

Retrieve single message and immediately delete it.

Kind: instance method of WTSQS
Returns: Promise.<(Message|null)> - Message object or null if queue is empty.

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| [options] | `Object` | | Options object. |
| [options.pollWaitTime] | `Integer` | | Duration (in seconds) for which the read call waits for a message to arrive in the queue before returning. If no message is available when the wait time expires, the call resolves with null. |
| [options.visibilityTimeout] | `Integer` | | Duration (in seconds) that the received message is hidden from subsequent retrieve requests. |
| [sqsOptions] | `Object` | `{}` | Additional options to extend/override the underlying SQS receiveMessage request. |

Example

const myMessage = await wtsqs.popOne()
// The message no longer exists in queue
console.log(myMessage)
// output:
// {
//   id: 'messageId',
//   receiptHandle: 'messageReceiptHandle',
//   md5: 'messageMD5',
//   body: { a: 1 }
// }

wtsqs.popMany([maxNumberOfMessages], [options], [sqsOptions]) ⇒ Promise.<Array.<Message>>

Retrieve batch of messages and immediately delete them.

Kind: instance method of WTSQS
Returns: Promise.<Array.<Message>> - Array of retrieved messages.

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| [maxNumberOfMessages] | `Number` | `10` | Maximum number of messages to retrieve. Must be between 1 and 10. |
| [options] | `Object` | | Options object. |
| [options.pollWaitTime] | `Integer` | | Duration (in seconds) for which the read call waits for a message to arrive in the queue before returning. If no messages are available and the wait time expires, the call returns successfully with an empty list of messages. |
| [options.visibilityTimeout] | `Integer` | | Duration (in seconds) that the received messages are hidden from subsequent retrieve requests. |
| [sqsOptions] | `Object` | `{}` | Additional options to extend/override the underlying SQS receiveMessage request. |

Example

const myMessageList = await wtsqs.popMany(2)
// Messages no longer exist in queue
console.log(myMessageList)
// output:
// [
//   {
//     id: 'messageId',
//     receiptHandle: 'messageReceiptHandle',
//     md5: 'messageMD5',
//     body: { a: 1 }
//   },
//   {
//     id: 'messageId',
//     receiptHandle: 'messageReceiptHandle',
//     md5: 'messageMD5',
//     body: { b: 3 }
//   }
// ]
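
Since `popMany` returns at most 10 messages per call and resolves with an empty array when nothing arrives, emptying a queue takes a loop. The sketch below is one way to write it; `drainQueue` is a hypothetical helper, and the 1 second `pollWaitTime` is an arbitrary choice.

```javascript
// Pop messages in batches of up to 10 until a call comes back empty.
// `queue` is any object exposing popMany(max, options), e.g. a WTSQS instance.
// Returns the number of messages handed to `onMessage`.
async function drainQueue (queue, onMessage) {
  let processed = 0

  while (true) {
    const messages = await queue.popMany(10, { pollWaitTime: 1 })
    if (messages.length === 0) {
      return processed
    }

    for (const message of messages) {
      // The message has already been deleted from the queue at this point.
      await onMessage(message.body)
      processed++
    }
  }
}
```

Because popped messages are deleted immediately, a payload is lost if `onMessage` throws. When that matters, `peekMany` followed by `deleteMany` after successful processing is the safer combination.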

WTSQSWorker

WTSQS worker job manager.

WTSQSWorker asynchronously fetches jobs from SQS while processing other jobs concurrently. It also deletes each job from the queue after the message has been processed successfully.

Kind: global class


new WTSQSWorker(options)

Constructs WTSQSWorker object.

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| options | `Object` | | Options object. |
| options.wtsqs | `WTSQS` | | WTSQS instance to use for connecting to SQS. |
| [options.maxConcurrency] | `Integer` | `20` | Maximum number of concurrent jobs. |
| [options.pollWaitTime] | `Integer` | `5` | Duration (in seconds) for which read calls wait for a job to arrive in the queue before returning. |
| [options.visibilityTimeout] | `Integer` | `30` | Duration (in seconds) that the received jobs are hidden from subsequent retrieve requests. |
| [options.logger] | `Object` \| `String` | | Object with debug, info, warn, error methods to use for logging, or a string with the log level to use with the default internal logger. |

Example

const { WTSQS, WTSQSWorker } = require('wtsqs')

const wtsqs = new WTSQS({
  url: '//queue-url',
  accessKeyId: 'AWS_ACCESS_KEY_ID',
  secretAccessKey: 'AWS_SECRET_ACCESS_KEY'
})

const worker = new WTSQSWorker({ wtsqs })

worker.run(async (job) => {
 await someAsyncFunction(job.body)
 console.log(job)
})
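
To make the fetch, process, delete cycle that WTSQSWorker automates more concrete, here is a simplified single-batch version written against the WTSQS methods documented above. It is an illustration only, not the library's implementation; `processBatch` is a hypothetical helper.

```javascript
// Fetch one batch, run the handler on every job concurrently, and delete
// only the jobs whose handler resolved. Failed jobs stay in the queue and
// become visible again once their visibility timeout expires.
// `queue` is any object exposing peekMany(max) and deleteMany(messages).
async function processBatch (queue, handler, maxConcurrency = 10) {
  // A single receive call returns at most 10 messages.
  const batchSize = Math.min(maxConcurrency, 10)
  const jobs = await queue.peekMany(batchSize)

  // Capture failures per job so one rejection does not abort the batch.
  const results = await Promise.all(jobs.map(async (job) => {
    try {
      await handler(job)
      return { job, ok: true }
    } catch (error) {
      return { job, ok: false, error }
    }
  }))

  const succeeded = results.filter((result) => result.ok).map((result) => result.job)
  if (succeeded.length > 0) {
    await queue.deleteMany(succeeded)
  }

  return { succeeded: succeeded.length, failed: results.length - succeeded.length }
}
```

Unlike this sketch, the worker keeps fetching new jobs while earlier ones are still running, which is why `worker.run` is usually the better choice for long-lived consumers.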

worker.run(handler)

Start fetching and processing jobs.

Kind: instance method of WTSQSWorker

| Param | Type | Description |
| --- | --- | --- |
| handler | `runHandler` | Async function to process a single job. |

worker.shutdown() ⇒ Promise

Shuts down the worker and drains active jobs.

Kind: instance method of WTSQSWorker
Returns: Promise - Resolves when all active jobs have been drained.
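
A common use of `shutdown()` is to drain jobs before the process exits. The following is a minimal sketch, assuming the worker runs in a Node.js process that receives `SIGTERM` or `SIGINT`; `installGracefulShutdown` is a hypothetical helper, and the `proc` parameter exists only so the function can be tested without real signals.

```javascript
// Call worker.shutdown() once when a termination signal arrives, then exit.
// `worker` is any object exposing shutdown(), e.g. a WTSQSWorker instance.
function installGracefulShutdown (worker, proc = process, signals = ['SIGTERM', 'SIGINT']) {
  let shuttingDown = false

  const onSignal = async (signal) => {
    if (shuttingDown) return // ignore repeated signals while draining
    shuttingDown = true
    console.log(`Received ${signal}, draining active jobs...`)

    try {
      await worker.shutdown()
      proc.exit(0)
    } catch (error) {
      console.error('Shutdown failed:', error)
      proc.exit(1)
    }
  }

  for (const signal of signals) {
    proc.on(signal, onSignal)
  }
}

// Usage:
// installGracefulShutdown(worker)
```

Jobs that take longer than the hosting platform's termination grace period may still be interrupted, so handlers should tolerate being retried.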


WTSQSWorker~runHandler ⇒ Promise

Async callback function to process single job.

Kind: inner typedef of WTSQSWorker

| Param | Type | Description |
| --- | --- | --- |
| job | `Job` | A single job to process. |

Message : Object

Received SQS Message

Kind: global typedef
Properties

| Name | Type | Description |
| --- | --- | --- |
| id | `String` | Message id. |
| receiptHandle | `String` | Message receipt handle. |
| md5 | `String` | Message body md5 hash sum. |
| body | `Object` | Message body containing original payload. |

Job : Object

Worker Job

Kind: global typedef
Properties

| Name | Type | Description |
| --- | --- | --- |
| id | `String` | Job id. |
| receiptHandle | `String` | Job receipt handle. |
| md5 | `String` | Job body md5 hash sum. |
| body | `Object` | Job body containing original payload. |