Skip to content

Commit

Permalink
feat: implement reusify, rework deduping (#15)
Browse files Browse the repository at this point in the history
* feat: adopt reusify, rework deduping by enabling cache-less dedupe
* reduce GC & implementing dedupe by passing `cache: 0`
* chore: use performance.now instead of date.now
* fix: outstanding sequence set overflow when noAck / qos was disabled
  • Loading branch information
AVVS committed Nov 15, 2023
1 parent 4310ddd commit 2a1cfd3
Show file tree
Hide file tree
Showing 37 changed files with 1,478 additions and 530 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ npm-debug.log
/test/ssl/server
/test/ssl/client
.swc
.env
.env
*.heapsnapshot
5 changes: 4 additions & 1 deletion packages/amqp-codec/.eslintrc.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
{
"extends": "makeomatic/typescript"
"extends": "makeomatic/typescript",
"rules": {
"@typescript-eslint/no-unsafe-declaration-merging": 0
}
}
2 changes: 1 addition & 1 deletion packages/amqp-codec/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
},
"repository": {
"type": "git",
"url": "https://github.com/microfleet/amqp-coffee.git",
"url": "https://github.com/microfleet/amqp.git",
"directory": "packages/amqp-codec"
},
"scripts": {
Expand Down
2 changes: 1 addition & 1 deletion packages/amqp-codec/src/parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ function parseHeaderFrame(parser: Parser, buffer: Buffer): ContentHeader | Error

function parseBodyFrame(parser: Parser, buffer: Buffer, frameSize: number): Content {
const frameEnd = parser.offset + frameSize
const data = buffer.slice(parser.offset, frameEnd)
const data = buffer.subarray(parser.offset, frameEnd)
parser.offset = frameEnd
return { type: FrameType.BODY, data }
}
Expand Down
4 changes: 2 additions & 2 deletions packages/amqp-codec/src/serializer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ function* encodeBody(serializer: Serializer, channel: number, args: Content): Ge
buffer.copy(header, 0, 0, 7)

yield header
yield body.slice(offset, offset + length)
yield body.subarray(offset, offset + length)
yield EndFrame

offset += length
Expand Down Expand Up @@ -495,7 +495,7 @@ export class Serializer {
}

if (frameSize < this.maxFrameSize) {
this.buffer = this.buffer.slice(0, frameSize)
this.buffer = this.buffer.subarray(0, frameSize)
} else {
this.buffer = Buffer.allocUnsafe(frameSize)
}
Expand Down
3 changes: 3 additions & 0 deletions packages/amqp-coffee/.eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,8 @@
"extends": "makeomatic/typescript",
"parserOptions": {
"ecmaVersion": "latest"
},
"rules": {
"@typescript-eslint/no-unsafe-declaration-merging": 0
}
}
4 changes: 2 additions & 2 deletions packages/amqp-coffee/.mdeprc.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const root = `/src/packages/${dir}/node_modules/.bin`

module.exports = {
...require('../../.mdeprc.js'),
node: "18",
node: "20",
auto_compose: true,
services: ["rabbitmq"],
tests: "./test/**/*.test.{ts,coffee}",
Expand All @@ -23,7 +23,7 @@ module.exports = {
working_dir: `/src/packages/${dir}`,
environment: {
AMQP_TEST: '1', // this must be enable to have low reconnection timer
// AMQP: '8',
// AMQP: '4',
// DEBUG: 'amqp:*',
SWC_NODE_PROJECT: './tsconfig.test.json',
NODE_NO_WARNINGS: '1',
Expand Down
4 changes: 2 additions & 2 deletions packages/amqp-coffee/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@
},
"repository": {
"type": "git",
"url": "https://github.com/microfleet/amqp-coffee.git",
"url": "https://github.com/microfleet/amqp.git",
"directory": "packages/amqp-coffee"
},
"engines": {
"node": ">= 14.17"
"node": ">= 18.17"
},
"files": [
"bin/src/",
Expand Down
13 changes: 7 additions & 6 deletions packages/amqp-coffee/src/lib/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { Connection, ConnectionState } from './connection'
import { ServerClosedError, ConnectionResetError } from './errors'
import { promisify } from 'util'
import { noop } from 'lodash'
import { performance } from 'node:perf_hooks'

const debug = _debug('amqp:Channel')

Expand Down Expand Up @@ -87,7 +88,7 @@ export abstract class Channel extends EventEmitter {

temporaryChannel() {
this.transactional = true // THIS IS NOT AMQP TRANSACTIONS
this.lastChannelAccess = Date.now()
this.lastChannelAccess = performance.now()

if (process.env.AMQP_TEST != null) {
this.connection.connectionOptions.temporaryChannelTimeout = 200
Expand All @@ -97,7 +98,7 @@ export abstract class Channel extends EventEmitter {
if (this.channelTracker == null) {
const { temporaryChannelTimeout, temporaryChannelTimeoutCheck } = this.connection.connectionOptions
this.channelTracker = setInterval(() => {
if (this.lastChannelAccess < (Date.now() - temporaryChannelTimeout)) {
if (this.lastChannelAccess < (performance.now() - temporaryChannelTimeout)) {
debug(4, () => [this.channel, 'Closing channel due to inactivity'])
this.close(true)
}
Expand Down Expand Up @@ -138,7 +139,7 @@ export abstract class Channel extends EventEmitter {
|| this.listeners('open').length > 0
|| (
this.transactional
&& this.lastChannelAccess > (Date.now() - this.connection.connectionOptions.temporaryChannelTimeout)
&& this.lastChannelAccess > (performance.now() - this.connection.connectionOptions.temporaryChannelTimeout)
))
) {
debug(1, () => 'State is closed... reconnecting')
Expand Down Expand Up @@ -279,13 +280,13 @@ export abstract class Channel extends EventEmitter {
this.queue.push({ type: TaskType.method, method, args, okMethod, cb })
}

queuePublish<T extends Methods>(method: T, data: any, options: InferOptions<T>): void {
queuePublish<T extends Methods>(method: T, data: any, options: Partial<InferOptions<T>>): void {
this.queue.push({ type: TaskType.publish, method, data, options })
}

async _taskWorker(task: Task): Promise<void> {
if (this.transactional) {
this.lastChannelAccess = Date.now()
this.lastChannelAccess = performance.now()
}

const { type, method, okMethod, cb, data, preflight } = task
Expand Down Expand Up @@ -357,7 +358,7 @@ export abstract class Channel extends EventEmitter {
// incomming channel messages for us
_onChannelMethod<T extends MethodFrame>(channel: number, frame: T) {
if (this.transactional) {
this.lastChannelAccess = Date.now()
this.lastChannelAccess = performance.now()
}

if (channel !== this.channel) {
Expand Down
2 changes: 1 addition & 1 deletion packages/amqp-coffee/src/lib/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ export class Connection extends EventEmitter {
return consumerChannel
}

public async publish(exchange: string, routingKey: string, data: any, options: Partial<PublishOptions> = {}): Promise<void> {
public async publish(exchange: string, routingKey: string, data: any, options: Partial<PublishOptions> = Object.create(null)): Promise<void> {
const publishChannel = this.channelManager.publisherChannel(options.confirm)
return publishChannel.publishAsync(exchange, routingKey, data, options)
}
Expand Down
38 changes: 28 additions & 10 deletions packages/amqp-coffee/src/lib/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ export class Consumer extends Channel {
this.consumeOptions = consumeOptions
this.qosOptions = qosOptions

// select correct version
this._onMessageCreated = this.qosEnabled()
? this._onMessageCreatedWithQos
: this._onMessageCreatedNoQos

return this._consume()
}

Expand Down Expand Up @@ -306,7 +311,7 @@ export class Consumer extends Channel {
}

qosEnabled(): boolean {
return this.qos && this.consumeOptions?.noAck !== true
return this.qos && this.consumeOptions!.noAck !== true
}

deliveryOutstanding(deliveryTag: number): boolean {
Expand Down Expand Up @@ -395,18 +400,31 @@ export class Consumer extends Channel {
}

if (incomingMessage!.ready()) {
incomingMessage!.create(this).then((message) => {
debug(4, () => ['message ready', message.deliveryTag, message.properties])
incomingMessage!.create(this)
}
}

const { deliveryTag } = message
if (deliveryTag !== undefined) {
this.outstandingDeliveryTags.add(deliveryTag)
debug(4, () => ['outstanding tags', this.outstandingDeliveryTags.size])
}
// eslint-disable-next-line @typescript-eslint/no-unused-vars
_onMessageCreated(message: Message): void {
throw new Error('must be overwritten')
}

this.messageHandler!(message)
})
_onMessageCreatedNoQos(message: Message) {
debug(4, () => ['message ready', message.deliveryTag, message.properties])

this.messageHandler!(message)
}

_onMessageCreatedWithQos(message: Message) {
debug(4, () => ['message ready', message.deliveryTag, message.properties])

const { deliveryTag } = message
if (deliveryTag !== undefined) {
this.outstandingDeliveryTags.add(deliveryTag)
debug(4, () => ['outstanding tags', this.outstandingDeliveryTags.size])
}

this.messageHandler!(message)
}

_onChannelReconnect(cb: (err?: Error | null, result?: any) => void): void {
Expand Down
55 changes: 48 additions & 7 deletions packages/amqp-coffee/src/lib/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,59 @@ import type { Consumer } from './consumer'

export type IncomingMessage = MethodFrameBasicDeliver['args']



// <!-- MIME typing -->
// <field name = "content-type" domain = "shortstr" label = "MIME content type" />
// <!-- MIME typing -->
// <field name = "content-encoding" domain = "shortstr" label = "MIME content encoding" />
// <!-- For applications, and for header exchange routing -->
// <field name = "headers" domain = "table" label = "message header field table" />
// <!-- For queues that implement persistence -->
// <field name = "delivery-mode" domain = "octet" label = "non-persistent (1) or persistent (2)" />
// <!-- For queues that implement priorities -->
// <field name = "priority" domain = "octet" label = "message priority, 0 to 9" />
// <!-- For application use, no formal behaviour -->
// <field name = "correlation-id" domain = "shortstr" label = "application correlation identifier" />
// <!-- For application use, no formal behaviour but may hold the
// name of a private response queue, when used in request messages -->
// <field name = "reply-to" domain = "shortstr" label = "address to reply to" />
// <!-- For implementation use, no formal behaviour -->
// <field name = "expiration" domain = "shortstr" label = "message expiration specification" />
// <!-- For application use, no formal behaviour -->
// <field name = "message-id" domain = "shortstr" label = "application message identifier" />
// <!-- For application use, no formal behaviour -->
// <field name = "timestamp" domain = "timestamp" label = "message timestamp" />
// <!-- For application use, no formal behaviour -->
// <field name = "type" domain = "shortstr" label = "message type name" />
// <!-- For application use, no formal behaviour -->
// <field name = "user-id" domain = "shortstr" label = "creating user id" />
// <!-- For application use, no formal behaviour -->
// <field name = "app-id" domain = "shortstr" label = "creating application id" />
// <!-- Deprecated, was old cluster-id property -->
// <field name = "reserved" domain = "shortstr" label = "reserved, must be empty" />
export interface MessageProperties {
headers?: Record<string, any> // amqp headers
deliveryMode: 1 | 2
confirm: boolean
mandatory: boolean
immediate: boolean
headers: Record<string, any> // amqp headers
appId?: string // sender diagnostics data
replyTo?: string // amqp reply-to field
correlationId?: string // amqp correlation-id
contentType?: string // amqp message content-type
contentEncoding?: string // amqp message content-encoding
userId?: string
// message type, optional use
type?: string
// message id, optional use
messageId?: string
timestamp?: Date
// timestamp, optional use
timestamp?: Date | null
priority?: number
expiration?: string
exchange: string
routingKey: string
}

export type parseFunction<T, R> = {
Expand All @@ -35,7 +75,7 @@ const safeParse = <R>(fn: parseFunction<Buffer, R>, data: Buffer, contentType: s
} catch (e) {
process.emitWarning(`data parse to ${contentType} failed`, {
code: 'E_AMQP_PARSE_FAILED',
detail: data.slice(0, Math.min(50, data.length - 1)).toString() + '...', // print 50 chars at most
detail: data.subarray(0, Math.min(50, data.length - 1)).toString() + '...', // print 50 chars at most
})
return data
}
Expand Down Expand Up @@ -72,7 +112,7 @@ export class Message {
public readonly raw: Buffer | Error
public readonly size: number
public readonly deliveryTag?: number
public readonly routingKey?: string
public readonly routingKey: string = ''
public readonly exchange: string
public readonly redelivered: boolean
public readonly consumerTag: string
Expand Down Expand Up @@ -268,11 +308,12 @@ export class MessageFactory {
* @param {Consumer} subscription
* @returns
*/
async create(subscription: Consumer): Promise<Message> {
create(subscription: Consumer): void {
if (this[kData] === null && this[kDecoder] !== null) {
await this[kFinished]
this[kFinished]!.then(() => this.create(subscription))
return
}

return new Message(this, subscription)
subscription._onMessageCreated(new Message(this, subscription))
}
}
16 changes: 8 additions & 8 deletions packages/amqp-coffee/src/lib/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { MessageProperties} from './message'
const debug = _debug('amqp:Publisher')
const kEvents = Symbol('amqp:kEvents')
const { hasOwnProperty } = Object.prototype
const kBasicPublishDefaults = defaults.basicPublish

export type BasicPublishOptions = InferOptions<typeof methods.basicPublish>

Expand All @@ -23,6 +24,7 @@ export interface PublishOptions extends MessageProperties {

// sets `expiration` property on the message
timeout?: number // optional ttl value for message in the publish/send
reuse?: boolean

// delivery modes
confirm: boolean // require ack from server on publish
Expand All @@ -31,7 +33,7 @@ export interface PublishOptions extends MessageProperties {
deliveryMode: 1 | 2 // transient or persistant, default to 1
}

const transformData = (data: string | Record<string, any> | Buffer | undefined, options: PublishOptions): Buffer => {
const transformData = (data: string | Record<string, any> | Buffer | undefined, options: { contentType?: string }): Buffer => {
// data must be a buffer
if (typeof data === 'string') {
options.contentType = 'string/utf8'
Expand Down Expand Up @@ -148,7 +150,7 @@ export class Publisher extends Channel {
publish(exchange: string,
routingKey: string,
data: any,
_options: Partial<PublishOptions>,
_options: PublishOptions,
_cb?: (err?: Error | null) => void) {

let cb = _cb
Expand All @@ -162,10 +164,8 @@ export class Publisher extends Channel {
.then(() => cb?.(), cb)
}

async publishAsync(exchange: string, routingKey: string, _data: any, _options: Partial<PublishOptions>) {
const options: PublishOptions = _options
? { ..._options }
: Object.create(null)
async publishAsync(exchange: string, routingKey: string, _data: any, _options: Partial<PublishOptions> = Object.create(null)) {
const options = _options.reuse !== true ? { ..._options } : _options

if (this._inoperableState()) {
debug(4, () => ['publish channel in inoperable state'])
Expand All @@ -185,10 +185,10 @@ export class Publisher extends Channel {
const data = transformData(_data, options)

// Apply default options after we deal with potentially converting the data
for (const [key, value] of Object.entries(defaults.basicPublish)) {
for (const key in kBasicPublishDefaults) {
if (!hasOwnProperty.call(options, key)) {
// @ts-expect-error -- invalid error
options[key] = value
options[key] = kBasicPublishDefaults[key]
}
}

Expand Down
Loading

0 comments on commit 2a1cfd3

Please sign in to comment.