From 7b11e9c9c41d7e30afd0fd6bd7acd7f128772e2d Mon Sep 17 00:00:00 2001 From: Ian Macartney Date: Tue, 5 Aug 2025 14:20:41 -0700 Subject: [PATCH 1/4] fix flatMap indexfields merging --- packages/convex-helpers/server/stream.test.ts | 50 +++++++++++++++++++ packages/convex-helpers/server/stream.ts | 9 +++- 2 files changed, 57 insertions(+), 2 deletions(-) diff --git a/packages/convex-helpers/server/stream.test.ts b/packages/convex-helpers/server/stream.test.ts index 609fea28..62a04be9 100644 --- a/packages/convex-helpers/server/stream.test.ts +++ b/packages/convex-helpers/server/stream.test.ts @@ -22,6 +22,14 @@ const schema = defineSchema({ d: v.number(), e: v.number(), }).index("cde", ["c", "d", "e"]), + channels: defineTable({ + workspaceId: v.string(), + isPublic: v.boolean(), + }).index("by_isPublicWorkspace", ["isPublic", "workspaceId"]), + channelMembers: defineTable({ + channelId: v.id("channels"), + userId: v.string(), + }).index("by_user", ["userId"]), }); function stripSystemFields(doc: GenericDocument) { @@ -505,6 +513,48 @@ describe("stream", () => { ]); }); }); + + test("merge with flatMap and default index fields", async () => { + const t = convexTest(schema, modules); + await t.run(async (ctx) => { + const workspaceId = "w"; + const userId = "u"; + const privateChannelId = await ctx.db.insert("channels", { + workspaceId, + isPublic: false, + }); + await ctx.db.insert("channelMembers", { channelId: privateChannelId, userId }); + await ctx.db.insert("channels", { workspaceId, isPublic: true }); + + const userMemberships = stream(ctx.db, schema) + .query("channelMembers") + .withIndex("by_user", (q) => q.eq("userId", userId)); + + const privateChannels = userMemberships.flatMap( + async (membership) => + stream(ctx.db, schema) + .query("channels") + .withIndex("by_isPublicWorkspace", (q) => + q.eq("isPublic", false).eq("workspaceId", workspaceId), + ) + .filterWith(async (channel) => channel._id === membership.channelId), + [], + ); + + const publicChannels = stream(ctx.db, schema) + .query("channels") + .withIndex("by_isPublicWorkspace", (q) => + q.eq("isPublic", true).eq("workspaceId", workspaceId), + ); + + const merged = mergedStream([privateChannels, publicChannels], []); + const result = await merged.collect(); + expect(result.map(stripSystemFields)).toEqual([ + { workspaceId, isPublic: false }, + { workspaceId, isPublic: true }, + ]); + }); + }); test("streamIndexRange returns correct subset", async () => { const t = convexTest(schema, modules); await t.run(async (ctx) => { diff --git a/packages/convex-helpers/server/stream.ts b/packages/convex-helpers/server/stream.ts index 4f4ebb74..76af195e 100644 --- a/packages/convex-helpers/server/stream.ts +++ b/packages/convex-helpers/server/stream.ts @@ -291,7 +291,9 @@ export abstract class QueryStream mapper: (doc: T) => Promise>, mappedIndexFields: string[], ): QueryStream { - normalizeIndexFields(mappedIndexFields); + if (mappedIndexFields.length > 0) { + normalizeIndexFields(mappedIndexFields); + } return new FlatMapStream(this, mapper, mappedIndexFields); } @@ -1269,10 +1271,13 @@ class FlatMapStreamIterator< } else { innerStream = await this.#mapper(t); if ( + this.#mappedIndexFields.length > 0 && !equalIndexFields(innerStream.getIndexFields(), this.#mappedIndexFields) ) { throw new Error( - `FlatMapStream: inner stream has different index fields than expected: ${JSON.stringify(innerStream.getIndexFields())} vs ${JSON.stringify(this.#mappedIndexFields)}`, + `FlatMapStream: inner stream has different index fields than expected: ${JSON.stringify( + innerStream.getIndexFields(), + )} vs ${JSON.stringify(this.#mappedIndexFields)}`, ); } if (innerStream.getOrder() !== this.#outerStream.getOrder()) { From 43da75d5938f52f14c9d84684b2ecce2cacb9dd4 Mon Sep 17 00:00:00 2001 From: Ian Macartney Date: Tue, 5 Aug 2025 14:23:27 -0700 Subject: [PATCH 2/4] format --- packages/convex-helpers/server/stream.test.ts | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/packages/convex-helpers/server/stream.test.ts b/packages/convex-helpers/server/stream.test.ts index 62a04be9..389e7cab 100644 --- a/packages/convex-helpers/server/stream.test.ts +++ b/packages/convex-helpers/server/stream.test.ts @@ -523,7 +523,10 @@ describe("stream", () => { workspaceId, isPublic: false, }); - await ctx.db.insert("channelMembers", { channelId: privateChannelId, userId }); + await ctx.db.insert("channelMembers", { + channelId: privateChannelId, + userId, + }); await ctx.db.insert("channels", { workspaceId, isPublic: true }); const userMemberships = stream(ctx.db, schema) @@ -537,7 +540,9 @@ describe("stream", () => { .withIndex("by_isPublicWorkspace", (q) => q.eq("isPublic", false).eq("workspaceId", workspaceId), ) - .filterWith(async (channel) => channel._id === membership.channelId), + .filterWith( + async (channel) => channel._id === membership.channelId, + ), [], ); From 06876916bcb1b77e4ccddc7bdcb9f67e4f768b26 Mon Sep 17 00:00:00 2001 From: Ian Macartney Date: Tue, 5 Aug 2025 14:24:12 -0700 Subject: [PATCH 3/4] test-order --- .github/workflows/test.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 7edc8326..d0f9fd6c 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -34,6 +34,7 @@ jobs: - name: Run lint, test, and codegen run: | - npm run lint npm run test + npm run lint + npm run typecheck npx convex codegen && git diff --exit-code From ce0b700fa30441d779b509da58c498741bfdd7cf Mon Sep 17 00:00:00 2001 From: Ian Macartney Date: Tue, 5 Aug 2025 15:42:10 -0700 Subject: [PATCH 4/4] repro --- packages/convex-helpers/server/stream.test.ts | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/packages/convex-helpers/server/stream.test.ts b/packages/convex-helpers/server/stream.test.ts index 389e7cab..2396775f 100644 --- a/packages/convex-helpers/server/stream.test.ts +++ b/packages/convex-helpers/server/stream.test.ts @@ -25,7 +25,8 @@ const schema = defineSchema({ channels: defineTable({ workspaceId: v.string(), isPublic: v.boolean(), - }).index("by_isPublicWorkspace", ["isPublic", "workspaceId"]), + ownerId: v.string(), + }).index("by_isPublicWorkspace", ["isPublic", "workspaceId", "ownerId"]), channelMembers: defineTable({ channelId: v.id("channels"), userId: v.string(), @@ -522,12 +523,17 @@ describe("stream", () => { const privateChannelId = await ctx.db.insert("channels", { workspaceId, isPublic: false, + ownerId: userId, }); await ctx.db.insert("channelMembers", { channelId: privateChannelId, userId, }); - await ctx.db.insert("channels", { workspaceId, isPublic: true }); + await ctx.db.insert("channels", { + workspaceId, + isPublic: true, + ownerId: userId, + }); const userMemberships = stream(ctx.db, schema) .query("channelMembers") @@ -552,7 +558,10 @@ describe("stream", () => { q.eq("isPublic", true).eq("workspaceId", workspaceId), ); - const merged = mergedStream([privateChannels, publicChannels], []); + const merged = mergedStream( + [privateChannels, publicChannels], + ["userId"], + ); const result = await merged.collect(); expect(result.map(stripSystemFields)).toEqual([ { workspaceId, isPublic: false },