From 7012680a944a34694ca40fd1862f281b2ff0f5c4 Mon Sep 17 00:00:00 2001 From: Vladimir Dementyev Date: Tue, 23 Jul 2024 12:14:43 +0300 Subject: [PATCH] feat: add info event to propagate protocol-level events --- CHANGELOG.md | 4 ++ README.md | 30 ++++++++++ packages/core/CHANGELOG.md | 4 ++ packages/core/action_cable_ext/index.js | 2 + packages/core/action_cable_ext/index.test.ts | 22 +++++++ packages/core/cable/index.d.ts | 9 +++ packages/core/cable/index.js | 19 +++++++ packages/core/cable/index.test.ts | 60 +++++++++++++++++++- packages/core/channel/index.d.ts | 6 ++ packages/core/index.d.ts | 6 +- packages/core/protocol/index.d.ts | 1 + packages/core/protocol/testing.ts | 6 +- 12 files changed, 165 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c7cb110..49d4845 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ ## master +- Add `info` event to Cable and Channel. ([@palkan][]) + + This event can be used to notify of some protocol-level events that happen under the hood and have no representation at the Channel API level. A example of such event is a stream history retrieval failure (`{type: "history_not_found"}`). + ## 0.9.0 (2024-05-21) - Types improvements. ([@cmdoptesc][]) diff --git a/README.md b/README.md index 1afd947..410f28c 100644 --- a/README.md +++ b/README.md @@ -385,6 +385,36 @@ This is a recommended way to use this feature with Hotwire applications, where i You can also disable retrieving history since the specified time completely by setting the `historyTimestamp` option to `false`. +#### Handling history retrieval failures + +AnyCable reliable streams store history for a finite period of time and also have an upper size limit. Thus, in some cases, clients may fail to retrieve the missed messages (e.g., after a long-term disconnect). To gracefully handle this situation, you may decide to fallback to a full state reset (e.g., a browser page reload). You can use the specific "info" event to react on various protocol-level events not exposed to the generic Channel interface: + +```js +import { createCable, Channel } from '@anycable/web' + +const cable = createCable({protocol: 'actioncable-v1-ext-json'}); + +class ChatChannel extends Channel { + static identifier = 'ChatChannel' + + constructor(params) { + super(params) + + this.on("info", (evt) => { + if (evt.type === "history_not_found") { + // Restore state by performing an action + this.perform("resetState") + } + + // Successful history retrieval is also notified + if (evt.type === "history_received") { + // ... + } + }) + } +} +``` + #### PONGs support The extended protocol also support sending `pong` commands in response to `ping` messages. A server (AnyCable-Go) keeps track of pongs and disconnect the client if no pongs received in time. This helps to identify broken connections quicker. diff --git a/packages/core/CHANGELOG.md b/packages/core/CHANGELOG.md index d13964f..3e8df16 100644 --- a/packages/core/CHANGELOG.md +++ b/packages/core/CHANGELOG.md @@ -2,6 +2,10 @@ ## master +- Add `notification` event to Cable and Channel. ([@palkan][]) + + This event can be used to notify of some protocol-level events that happen under the hood and have no representation at the Channel API level. A example of such notification is a stream history retrieval failure. + ## 0.9.0 (2024-05-21) - Types improvements. ([@cmdoptesc][]) diff --git a/packages/core/action_cable_ext/index.js b/packages/core/action_cable_ext/index.js index 9cfac3b..80a99d8 100644 --- a/packages/core/action_cable_ext/index.js +++ b/packages/core/action_cable_ext/index.js @@ -54,11 +54,13 @@ export class ActionCableExtendedProtocol extends ActionCableProtocol { if (type === 'confirm_history') { this.logger.debug('history result received', msg) + this.cable.notify('history_received', identifier) return } if (type === 'reject_history') { this.logger.warn('failed to retrieve history', msg) + this.cable.notify('history_not_found', identifier) return } diff --git a/packages/core/action_cable_ext/index.test.ts b/packages/core/action_cable_ext/index.test.ts index 401cd46..c237927 100644 --- a/packages/core/action_cable_ext/index.test.ts +++ b/packages/core/action_cable_ext/index.test.ts @@ -329,6 +329,17 @@ describe('history', () => { expect(logger.logs[0].message).toEqual('history result received') }) + it('notifies history_received', () => { + protocol.receive({ type: 'confirm_history', identifier }) + + expect(cable.mailbox).toHaveLength(1) + expect(cable.mailbox[0]).toMatchObject({ + type: 'info', + event: 'history_received', + identifier + }) + }) + it('logs reject_history', () => { expect( protocol.receive({ type: 'reject_history', identifier }) @@ -337,4 +348,15 @@ describe('history', () => { expect(logger.warnings).toHaveLength(1) expect(logger.warnings[0].message).toEqual('failed to retrieve history') }) + + it('notifies history_not_found', () => { + protocol.receive({ type: 'reject_history', identifier }) + + expect(cable.mailbox).toHaveLength(1) + expect(cable.mailbox[0]).toMatchObject({ + type: 'info', + event: 'history_not_found', + identifier + }) + }) }) diff --git a/packages/core/cable/index.d.ts b/packages/core/cable/index.d.ts index 8104a59..740239d 100644 --- a/packages/core/cable/index.d.ts +++ b/packages/core/cable/index.d.ts @@ -19,11 +19,18 @@ type ConnectEvent = Partial<{ reconnect: boolean }> +export type InfoEvent = { + type: string + identifier?: Identifier + data?: object +} + export interface CableEvents { connect: (event: ConnectEvent) => void disconnect: (event: ReasonError) => void close: (event?: ReasonError) => void keepalive: (msg?: Message) => void + info: (event: InfoEvent) => void } export type CableOptions = { @@ -108,6 +115,8 @@ export class Cable { restored(remoteIds: string[]): void disconnected(reason?: ReasonError): void closed(reason?: string | ReasonError): void + notify(event: string, data?: object): void + notify(event: string, identifier?: Identifier, data?: object): void setSessionId(sid: string): void } diff --git a/packages/core/cable/index.js b/packages/core/cable/index.js index d05cfd2..b879497 100644 --- a/packages/core/cable/index.js +++ b/packages/core/cable/index.js @@ -171,6 +171,25 @@ export class Cable { this.emit('connect', { reconnect, restored }) } + notify(event, identifier, data) { + if (identifier && typeof identifier !== 'string') { + data = identifier + identifier = undefined + } + + // If identifier is present then it's a channel-level notification + if (!identifier) { + this.emit('info', { type: event, data }) + } else { + let sub = this.hub.subscriptions.get(identifier) + if (sub) { + sub.channels.forEach(channel => + channel.emit('info', { type: event, data }) + ) + } + } + } + handleClose(err) { this.logger.debug('transport closed', { error: err }) diff --git a/packages/core/cable/index.test.ts b/packages/core/cable/index.test.ts index afb0b2d..916a720 100644 --- a/packages/core/cable/index.test.ts +++ b/packages/core/cable/index.test.ts @@ -13,7 +13,8 @@ import { SubscriptionRejectedError, StaleConnectionError, ReasonError, - Message + Message, + InfoEvent } from '../index.js' import { TestTransport } from '../transport/testing' import { TestLogger } from '../logger/testing' @@ -999,6 +1000,63 @@ describe('channels', () => { ).rejects.toEqual(Error('failed')) }) + it('notify w/o identifier', async () => { + let received: InfoEvent[] = [] + let promise = new Promise((resolve, reject) => { + let tid = setTimeout(() => { + reject(Error('Timed out to receive notification event')) + }, 500) + + cable.on('info', evt => { + received.push(evt) + + if (received.length === 2) { + clearTimeout(tid) + resolve() + } + }) + }) + + cable.notify('test_notification') + cable.notify('test_notification', { foo: 'bar' }) + + await promise + + expect(received).toEqual([ + { type: 'test_notification', data: undefined }, + { type: 'test_notification', data: { foo: 'bar' } } + ]) + }) + + it('notify', async () => { + cable.subscribe(channel) + expect(cable.hub.size).toEqual(1) + + await channel.ensureSubscribed() + + let promise = new Promise((resolve, reject) => { + let tid = setTimeout(() => { + reject(Error('Timed out to receive notification event')) + }, 500) + + cable.on('info', () => { + clearTimeout(tid) + reject(Error('Should not receive info event for cable')) + }) + + channel.on('info', evt => { + clearTimeout(tid) + expect(evt.type).toEqual('test_notification') + expect(evt.data).toEqual({ foo: 'bar' }) + resolve() + }) + }) + + cable.notify('test_notification', channel.identifier, { foo: 'bar' }) + + await promise + }) + describe('closure and recovery with channels', () => { let channel2: TestChannel let firstError: Promise diff --git a/packages/core/channel/index.d.ts b/packages/core/channel/index.d.ts index c91c0b5..75de7c5 100644 --- a/packages/core/channel/index.d.ts +++ b/packages/core/channel/index.d.ts @@ -32,11 +32,17 @@ type ConnectEvent = Partial<{ reconnect: boolean }> +export type InfoEvent = { + type: string + data?: object +} + export interface ChannelEvents { connect: (event: ConnectEvent) => void disconnect: (event: ReasonError) => void close: (event?: ReasonError) => void message: (msg: T, meta?: MessageMeta) => void + info: (event: InfoEvent) => void } /* eslint-disable @typescript-eslint/no-explicit-any */ diff --git a/packages/core/index.d.ts b/packages/core/index.d.ts index 4db2986..5ce7395 100644 --- a/packages/core/index.d.ts +++ b/packages/core/index.d.ts @@ -4,7 +4,8 @@ export { ChannelEvents, Message, MessageMeta, - Identifier + Identifier, + InfoEvent as ChannelInfoEvent } from './channel/index.js' export { Transport, FallbackTransport } from './transport/index.js' export { Encoder, JSONEncoder } from './encoder/index.js' @@ -29,7 +30,8 @@ export { CableOptions, Cable, NoConnectionError, - CableEvents + CableEvents, + InfoEvent } from './cable/index.js' export { Monitor, diff --git a/packages/core/protocol/index.d.ts b/packages/core/protocol/index.d.ts index b49d6ca..8143395 100644 --- a/packages/core/protocol/index.d.ts +++ b/packages/core/protocol/index.d.ts @@ -36,6 +36,7 @@ export interface Consumer { closed(reason?: string | ReasonError): void keepalive(msg?: Message): void send(msg: object): void + notify(event: string, identifier?: Identifier, data?: object): void } export type ProcessedMessage = Partial<{ diff --git a/packages/core/protocol/testing.ts b/packages/core/protocol/testing.ts index 2ff5fe1..357eead 100644 --- a/packages/core/protocol/testing.ts +++ b/packages/core/protocol/testing.ts @@ -1,5 +1,5 @@ /*eslint n/no-unsupported-features/es-syntax: ["error", {version: "14.0"}] */ -import { Consumer, ReasonError } from '../index.js' +import { Consumer, Identifier, ReasonError } from '../index.js' type State = 'idle' | 'connected' | 'restored' | 'disconnected' | 'closed' @@ -51,4 +51,8 @@ export class TestConsumer implements Consumer { keepalive(msg: number) { this.lastPingedAt = msg | 0 } + + notify(event: string, identifier?: Identifier, data?: object): void { + this.mailbox.push({ type: 'info', event, identifier, data }) + } }