Skip to content

Commit

Permalink
Merge branch 'nmolnar/pluggable-api-client' into pluggable-persistence
Browse files Browse the repository at this point in the history
  • Loading branch information
neekolas committed Aug 23, 2023
2 parents 9b8a616 + e9dc1d3 commit 175d351
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 19 deletions.
19 changes: 14 additions & 5 deletions src/ApiClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ export type SubscribeCallback = NotifyStreamEntityArrival<messageApi.Envelope>

export type UnsubscribeFn = () => Promise<void>

export type UpdateContentTopics = (topics: string[]) => Promise<void>

export type SubscriptionManager = {
unsubscribe: UnsubscribeFn
updateContentTopics?: UpdateContentTopics
}

export type OnConnectionLostCallback = () => void

const isAbortError = (err?: Error): boolean => {
Expand Down Expand Up @@ -132,7 +139,7 @@ export interface ApiClient {
params: SubscribeParams,
callback: SubscribeCallback,
onConnectionLost?: OnConnectionLostCallback
): UnsubscribeFn
): SubscriptionManager
publish(messages: PublishParams[]): ReturnType<typeof MessageApi.Publish>
batchQuery(queries: Query[]): Promise<messageApi.Envelope[][]>
setAuthenticator(
Expand Down Expand Up @@ -256,7 +263,7 @@ export default class HttpApiClient implements ApiClient {
req: messageApi.SubscribeRequest,
cb: NotifyStreamEntityArrival<messageApi.Envelope>,
onConnectionLost?: OnConnectionLostCallback
): UnsubscribeFn {
): SubscriptionManager {
const abortController = new AbortController()

const doSubscribe = async () => {
Expand Down Expand Up @@ -298,8 +305,10 @@ export default class HttpApiClient implements ApiClient {
}
doSubscribe()

return async () => {
abortController?.abort()
return {
unsubscribe: async () => {
abortController?.abort()
},
}
}

Expand Down Expand Up @@ -474,7 +483,7 @@ export default class HttpApiClient implements ApiClient {
params: SubscribeParams,
callback: SubscribeCallback,
onConnectionLost?: OnConnectionLostCallback
): UnsubscribeFn {
): SubscriptionManager {
if (!params.contentTopics.length) {
throw new Error('Must provide list of contentTopics to subscribe to')
}
Expand Down
25 changes: 17 additions & 8 deletions src/Stream.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { OnConnectionLostCallback, UnsubscribeFn } from './ApiClient'
import {
OnConnectionLostCallback,
SubscriptionManager,
UnsubscribeFn,

Check warning on line 4 in src/Stream.ts

View workflow job for this annotation

GitHub Actions / Lint

'UnsubscribeFn' is defined but never used
} from './ApiClient'
import Client from './Client'
import { messageApi } from '@xmtp/proto'

Expand All @@ -23,7 +27,7 @@ export default class Stream<T> {
// if callback is undefined the stream is closed
callback: ((env: messageApi.Envelope) => Promise<void>) | undefined

unsubscribeFn?: UnsubscribeFn
subscriptionManager?: SubscriptionManager

onConnectionLost?: OnConnectionLostCallback

Expand Down Expand Up @@ -84,7 +88,7 @@ export default class Stream<T> {
throw new Error('Missing callback for stream')
}

this.unsubscribeFn = this.client.apiClient.subscribe(
this.subscriptionManager = this.client.apiClient.subscribe(
{
contentTopics: this.topics,
},
Expand Down Expand Up @@ -124,8 +128,8 @@ export default class Stream<T> {
// https://tc39.es/ecma262/#table-iterator-interface-optional-properties
// Note that this means the Stream will be closed after it was used in a for-await-of or yield* or similar.
async return(): Promise<IteratorResult<T>> {
if (this.unsubscribeFn) {
await this.unsubscribeFn()
if (this.subscriptionManager) {
await this.subscriptionManager.unsubscribe()
}
if (!this.callback) {
return { value: undefined, done: true }
Expand Down Expand Up @@ -156,12 +160,17 @@ export default class Stream<T> {

// Unsubscribe from the existing content topics and resubscribe to the given topics.
private async resubscribeToTopics(topics: string[]): Promise<void> {
if (!this.callback || !this.unsubscribeFn) {
if (!this.callback || !this.subscriptionManager) {
throw new Error('Missing callback for stream')
}
await this.unsubscribeFn()

if (typeof this.subscriptionManager?.updateContentTopics === 'function') {
return this.subscriptionManager.updateContentTopics(topics)
}

await this.subscriptionManager.unsubscribe()
this.topics = topics
this.unsubscribeFn = this.client.apiClient.subscribe(
this.subscriptionManager = this.client.apiClient.subscribe(
{
contentTopics: this.topics,
},
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ export {
QueryStreamOptions,
Query,
PublishParams,
SubscriptionManager,
SubscribeParams,
SubscribeCallback,
UnsubscribeFn,
Expand Down
12 changes: 6 additions & 6 deletions test/ApiClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ describe('Subscribe', () => {
numEnvelopes++
}
const req = { contentTopics: [CONTENT_TOPIC] }
const unsubscribeFn = client.subscribe(req, cb)
const subscriptionManager = client.subscribe(req, cb)
await sleep(10)
expect(numEnvelopes).toBe(2)
expect(subscribeMock).toBeCalledWith(req, expect.anything(), {
Expand All @@ -266,7 +266,7 @@ describe('Subscribe', () => {
'X-Client-Version': 'xmtp-js/' + packageJson.version,
}),
})
await unsubscribeFn()
await subscriptionManager.unsubscribe()
})

it('should resubscribe on error', async () => {
Expand Down Expand Up @@ -300,7 +300,7 @@ describe('Subscribe', () => {
numDisconnects++
}
const req = { contentTopics: [CONTENT_TOPIC] }
const unsubscribeFn = client.subscribe(req, cb, onDisconnect)
const subscriptionManager = client.subscribe(req, cb, onDisconnect)
await sleep(1200)
expect(numEnvelopes).toBe(2)
expect(numDisconnects).toBe(1)
Expand All @@ -316,7 +316,7 @@ describe('Subscribe', () => {
}),
})
consoleInfo.mockRestore()
await unsubscribeFn()
await subscriptionManager.unsubscribe()
})

it('should resubscribe on completion', async () => {
Expand Down Expand Up @@ -346,7 +346,7 @@ describe('Subscribe', () => {
numEnvelopes++
}
const req = { contentTopics: [CONTENT_TOPIC] }
const unsubscribeFn = client.subscribe(req, cb)
const subscriptionManager = client.subscribe(req, cb)
await sleep(1200)
expect(numEnvelopes).toBe(2)
// Resubscribing triggers an info log
Expand All @@ -361,7 +361,7 @@ describe('Subscribe', () => {
}),
})
consoleInfo.mockRestore()
await unsubscribeFn()
await subscriptionManager.unsubscribe()
})

it('throws when no content topics returned', async () => {
Expand Down

0 comments on commit 175d351

Please sign in to comment.