diff --git a/.changeset/moody-dots-taste.md b/.changeset/moody-dots-taste.md new file mode 100644 index 00000000..cde045ca --- /dev/null +++ b/.changeset/moody-dots-taste.md @@ -0,0 +1,5 @@ +--- +"@xmtp/mls-client": patch +--- + +Add streaming callbacks diff --git a/packages/mls-client/package.json b/packages/mls-client/package.json index c3c178fe..9e8a3880 100644 --- a/packages/mls-client/package.json +++ b/packages/mls-client/package.json @@ -52,9 +52,9 @@ "typecheck": "tsc" }, "dependencies": { + "@xmtp/content-type-primitives": "^1.0.1", "@xmtp/mls-client-bindings-node": "^0.0.4", - "@xmtp/proto": "^3.61.1", - "@xmtp/xmtp-js": "^11.6.2" + "@xmtp/proto": "^3.61.1" }, "devDependencies": { "@ianvs/prettier-plugin-sort-imports": "^4.2.1", @@ -64,6 +64,7 @@ "@typescript-eslint/eslint-plugin": "^7.8.0", "@typescript-eslint/parser": "^7.8.0", "@vitest/coverage-v8": "^1.6.0", + "@xmtp/xmtp-js": "workspace:^", "eslint": "^8.57.0", "eslint-config-prettier": "^9.1.0", "eslint-config-standard": "^17.1.0", diff --git a/packages/mls-client/src/AsyncStream.ts b/packages/mls-client/src/AsyncStream.ts index 490cf135..8f36428d 100644 --- a/packages/mls-client/src/AsyncStream.ts +++ b/packages/mls-client/src/AsyncStream.ts @@ -1,32 +1,30 @@ -type Value = V extends undefined ? T : V - -type ResolveValue = { - value: Value | undefined +type ResolveValue = { + value: T | undefined done: boolean } -type ResolveNext = (resolveValue: ResolveValue) => void +type ResolveNext = (resolveValue: ResolveValue) => void -type TransformValue = (value: T) => Value +export type StreamCallback = (err: Error | null, value: T) => void -export class AsyncStream { +export class AsyncStream { #done = false - #resolveNext: ResolveNext | null - #queue: Value[] - #transformValue?: TransformValue + #resolveNext: ResolveNext | null + #queue: T[] stopCallback: (() => void) | undefined = undefined - constructor( - transformValue: V extends undefined ? undefined : TransformValue - ) { + constructor() { this.#queue = [] this.#resolveNext = null this.#done = false - this.#transformValue = transformValue } - callback = (err: Error | null, value: T) => { + get isDone() { + return this.#done + } + + callback: StreamCallback = (err, value) => { if (err) { console.error('stream error', err) this.stop() @@ -37,17 +35,11 @@ export class AsyncStream { return } - const newValue = this.#transformValue - ? this.#transformValue(value) - : // must assert type because TypeScript can't infer that T is assignable - // to Value when this.#transformValue is undefined - (value as unknown as Value) - if (this.#resolveNext) { - this.#resolveNext({ value: newValue, done: false }) + this.#resolveNext({ value, done: false }) this.#resolveNext = null } else { - this.#queue.push(newValue) + this.#queue.push(value) } } @@ -59,7 +51,7 @@ export class AsyncStream { this.stopCallback?.() } - next = (): Promise> => { + next = (): Promise> => { if (this.#queue.length > 0) { return Promise.resolve({ value: this.#queue.shift(), done: false }) } else if (this.#done) { diff --git a/packages/mls-client/src/Conversation.ts b/packages/mls-client/src/Conversation.ts index 3853b730..8367093d 100644 --- a/packages/mls-client/src/Conversation.ts +++ b/packages/mls-client/src/Conversation.ts @@ -1,10 +1,9 @@ import type { NapiGroup, NapiListMessagesOptions, - NapiMessage, } from '@xmtp/mls-client-bindings-node' import type { ContentTypeId } from '@xmtp/xmtp-js' -import { AsyncStream } from '@/AsyncStream' +import { AsyncStream, type StreamCallback } from '@/AsyncStream' import type { Client } from '@/Client' import { DecodedMessage } from '@/DecodedMessage' import { nsToDate } from '@/helpers/date' @@ -78,12 +77,17 @@ export class Conversation { return this.#group.sync() } - stream() { - const asyncStream = new AsyncStream( - (message) => new DecodedMessage(this.#client, message) - ) - const stream = this.#group.stream(asyncStream.callback) + stream(callback?: StreamCallback) { + const asyncStream = new AsyncStream() + + const stream = this.#group.stream((err, message) => { + const decodedMessage = new DecodedMessage(this.#client, message) + asyncStream.callback(err, decodedMessage) + callback?.(err, decodedMessage) + }) + asyncStream.stopCallback = stream.end.bind(stream) + return asyncStream } diff --git a/packages/mls-client/src/Conversations.ts b/packages/mls-client/src/Conversations.ts index d01d143c..6501a847 100644 --- a/packages/mls-client/src/Conversations.ts +++ b/packages/mls-client/src/Conversations.ts @@ -1,11 +1,9 @@ import type { GroupPermissions, NapiConversations, - NapiGroup, NapiListMessagesOptions, - NapiMessage, } from '@xmtp/mls-client-bindings-node' -import { AsyncStream } from '@/AsyncStream' +import { AsyncStream, type StreamCallback } from '@/AsyncStream' import type { Client } from '@/Client' import { Conversation } from '@/Conversation' import { DecodedMessage } from '@/DecodedMessage' @@ -13,10 +11,16 @@ import { DecodedMessage } from '@/DecodedMessage' export class Conversations { #client: Client #conversations: NapiConversations + #map: Map constructor(client: Client, conversations: NapiConversations) { this.#client = client this.#conversations = conversations + this.#map = new Map() + } + + get(id: string) { + return this.#map.get(id) } async newConversation( @@ -27,35 +31,53 @@ export class Conversations { accountAddresses, permissions ) - return new Conversation(this.#client, group) + const conversation = new Conversation(this.#client, group) + this.#map.set(conversation.id, conversation) + return conversation } async list(options?: NapiListMessagesOptions) { const groups = await this.#conversations.list(options) - return groups.map((group) => new Conversation(this.#client, group)) + return groups.map((group) => { + const conversation = new Conversation(this.#client, group) + this.#map.set(conversation.id, conversation) + return conversation + }) } async sync() { return this.#conversations.sync() } - stream() { - const asyncStream = new AsyncStream( - (group) => new Conversation(this.#client, group) - ) - const stream = this.#conversations.stream(asyncStream.callback) + stream(callback?: StreamCallback) { + const asyncStream = new AsyncStream() + + const stream = this.#conversations.stream((err, group) => { + const conversation = new Conversation(this.#client, group) + this.#map.set(conversation.id, conversation) + asyncStream.callback(err, conversation) + callback?.(err, conversation) + }) + asyncStream.stopCallback = stream.end.bind(stream) + return asyncStream } - async streamAllMessages() { + async streamAllMessages(callback?: StreamCallback) { // sync conversations first await this.sync() - const asyncStream = new AsyncStream( - (message) => new DecodedMessage(this.#client, message) - ) - const stream = this.#conversations.streamAllMessages(asyncStream.callback) + + const asyncStream = new AsyncStream() + + const stream = this.#conversations.streamAllMessages((err, message) => { + const decodedMessage = new DecodedMessage(this.#client, message) + asyncStream.callback(err, decodedMessage) + callback?.(err, decodedMessage) + }) + asyncStream.stopCallback = stream.end.bind(stream) + return asyncStream } } diff --git a/packages/mls-client/src/codecs/GroupUpdatedCodec.ts b/packages/mls-client/src/codecs/GroupUpdatedCodec.ts index fe540fb2..402318ed 100644 --- a/packages/mls-client/src/codecs/GroupUpdatedCodec.ts +++ b/packages/mls-client/src/codecs/GroupUpdatedCodec.ts @@ -1,9 +1,9 @@ -import { mlsTranscriptMessages } from '@xmtp/proto' import { ContentTypeId, type ContentCodec, type EncodedContent, -} from '@xmtp/xmtp-js' +} from '@xmtp/content-type-primitives' +import { mlsTranscriptMessages } from '@xmtp/proto' export const ContentTypeGroupUpdated = new ContentTypeId({ authorityId: 'xmtp.org', diff --git a/packages/mls-client/src/index.ts b/packages/mls-client/src/index.ts index f373bcd0..b538d9af 100644 --- a/packages/mls-client/src/index.ts +++ b/packages/mls-client/src/index.ts @@ -13,3 +13,4 @@ export { ContentTypeGroupUpdated, GroupUpdatedCodec, } from './codecs/GroupUpdatedCodec' +export type { StreamCallback } from './AsyncStream' diff --git a/packages/mls-client/test/Conversations.test.ts b/packages/mls-client/test/Conversations.test.ts index 30721ab6..d569300d 100644 --- a/packages/mls-client/test/Conversations.test.ts +++ b/packages/mls-client/test/Conversations.test.ts @@ -19,6 +19,7 @@ describe('Conversations', () => { user2.account.address, ]) expect(conversation).toBeDefined() + expect(client1.conversations.get(conversation.id)?.id).toBe(conversation.id) expect(conversation.id).toBeDefined() expect(conversation.createdAt).toBeDefined() expect(conversation.createdAtNs).toBeDefined() @@ -75,6 +76,12 @@ describe('Conversations', () => { } } stream.stop() + expect(client3.conversations.get(conversation1.id)?.id).toBe( + conversation1.id + ) + expect(client3.conversations.get(conversation2.id)?.id).toBe( + conversation2.id + ) }) it('should stream all messages', async () => { diff --git a/yarn.lock b/yarn.lock index 637ae849..079bfe16 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2863,6 +2863,15 @@ __metadata: languageName: node linkType: hard +"@xmtp/content-type-primitives@npm:^1.0.1": + version: 1.0.1 + resolution: "@xmtp/content-type-primitives@npm:1.0.1" + dependencies: + "@xmtp/proto": "npm:^3.61.1" + checksum: 10/656826cda74328e3079c7f5937eeb694260bd68a66090303fdf6abf4c54c8bbf924064eb6895b9e66addee1269779dfe1c3f0e836fcd8857f784c5645c7b7bf5 + languageName: node + linkType: hard + "@xmtp/mls-client-bindings-node@npm:^0.0.4": version: 0.0.4 resolution: "@xmtp/mls-client-bindings-node@npm:0.0.4" @@ -2881,9 +2890,10 @@ __metadata: "@typescript-eslint/eslint-plugin": "npm:^7.8.0" "@typescript-eslint/parser": "npm:^7.8.0" "@vitest/coverage-v8": "npm:^1.6.0" + "@xmtp/content-type-primitives": "npm:^1.0.1" "@xmtp/mls-client-bindings-node": "npm:^0.0.4" "@xmtp/proto": "npm:^3.61.1" - "@xmtp/xmtp-js": "npm:^11.6.2" + "@xmtp/xmtp-js": "workspace:^" eslint: "npm:^8.57.0" eslint-config-prettier: "npm:^9.1.0" eslint-config-standard: "npm:^17.1.0" @@ -2963,7 +2973,7 @@ __metadata: languageName: node linkType: hard -"@xmtp/xmtp-js@npm:^11.6.2, @xmtp/xmtp-js@workspace:packages/js-sdk": +"@xmtp/xmtp-js@workspace:^, @xmtp/xmtp-js@workspace:packages/js-sdk": version: 0.0.0-use.local resolution: "@xmtp/xmtp-js@workspace:packages/js-sdk" dependencies: