Skip to content

(EAI-152) check for change to chunkAlgoHash when updating embeddings #580

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
6816ee7
check for change in chunkAlgoHash
yakubova92 Dec 16, 2024
d688f5c
add pages with changed chunking to arg for next step
yakubova92 Dec 16, 2024
47c3b92
pulling pages to be updated based on embedded content
yakubova92 Dec 17, 2024
11bac3a
update embeddings if chunk algo changes, regardless of page changes
yakubova92 Dec 20, 2024
5425929
update mockEmbeddedContentStore to have newly added method
yakubova92 Dec 20, 2024
e358ca0
teardown in afterEach
yakubova92 Dec 20, 2024
57091b2
chunkAlgoHash creation moved up in function chain, value passed down
yakubova92 Dec 20, 2024
42ab655
move chunkAlgoHashes out of global scope
yakubova92 Dec 20, 2024
5872f56
change implementation - query for data sources that use an old chunk …
yakubova92 Jan 15, 2025
963870a
cleanup and lint
yakubova92 Jan 15, 2025
81bee15
fix tests
yakubova92 Jan 21, 2025
4fdf716
move mongo memory server creation from beforeEach to beforeAll
yakubova92 Jan 21, 2025
8ed50ec
refactor
yakubova92 Jan 22, 2025
743515f
lint
yakubova92 Jan 22, 2025
236c50a
Merge branch 'main' into EAI-152
yakubova92 Jan 22, 2025
dcfff5b
fix beforeAll
yakubova92 Jan 23, 2025
517ac50
Apply suggestions from code review: changes to comment copy
yakubova92 Mar 25, 2025
c9ba2fb
address feedback
yakubova92 Mar 25, 2025
0213ad3
test case descriptions
yakubova92 Mar 25, 2025
ffc47fa
rm unused chunk algo hash caching
yakubova92 Apr 7, 2025
04c526d
Merge branch 'main' into EAI-152
yakubova92 Apr 7, 2025
c83ab36
include JIRA ticket link in todo comment
yakubova92 May 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions packages/mongodb-rag-core/src/contentStore/EmbeddedContent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ export type DeleteEmbeddedContentArgs = {
inverseDataSources?: boolean;
};

/**
  Parameters for matching embedded-content documents by data source.

  - `sourceNames`: optional allow-list of data source names; when omitted,
    all data sources are considered.
  - `chunkAlgoHash`: the chunking-algorithm hash to compare against, with
    `operation` selecting whether to match documents whose stored hash
    equals (`"equals"`) or differs from (`"notEquals"`) `hashValue`.
 */
export interface GetSourcesMatchParams {
  // Optional subset of data source names to restrict the match to.
  sourceNames?: string[];
  // Hash of the chunking algorithm plus the comparison to perform against it.
  chunkAlgoHash: {
    hashValue: string;
    operation: "equals" | "notEquals";
  };
}

/**
Data store of the embedded content.
*/
Expand Down Expand Up @@ -114,4 +122,9 @@ export type EmbeddedContentStore = VectorStore<EmbeddedContent> & {
Initialize the store.
*/
init?: () => Promise<void>;

/**
Get the names of ingested data sources that match the given query.
*/
getDataSources(matchQuery: GetSourcesMatchParams): Promise<string[]>;
};
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import { pageIdentity } from ".";
import { pageIdentity, PersistedPage } from ".";
import { DatabaseConnection } from "../DatabaseConnection";
import { EmbeddedContent, EmbeddedContentStore } from "./EmbeddedContent";
import {
EmbeddedContent,
EmbeddedContentStore,
GetSourcesMatchParams,
} from "./EmbeddedContent";
import { FindNearestNeighborsOptions, WithScore } from "../VectorStore";
import {
MakeMongoDbDatabaseConnectionParams,
Expand Down Expand Up @@ -58,6 +62,21 @@ export type MongoDbEmbeddedContentStore = EmbeddedContentStore &
init(): Promise<void>;
};

/**
  Build the MongoDB filter used to select embedded-content documents.

  Matches on `chunkAlgoHash` with `$eq` (operation `"equals"`) or `$ne`
  (operation `"notEquals"`). When `sourceNames` is provided, the filter is
  additionally restricted to those data sources via `$in`; otherwise it
  applies to all sources.
 */
function makeMatchQuery({ sourceNames, chunkAlgoHash }: GetSourcesMatchParams) {
  const hashOperator = chunkAlgoHash.operation === "equals" ? "$eq" : "$ne";
  // run on specific source names if specified, run on all if not
  const sourceNameFilter = sourceNames
    ? { sourceName: { $in: sourceNames } }
    : undefined;
  return {
    chunkAlgoHash: { [hashOperator]: chunkAlgoHash.hashValue },
    ...sourceNameFilter,
  };
}

export function makeMongoDbEmbeddedContentStore({
connectionUri,
databaseName,
Expand Down Expand Up @@ -232,5 +251,22 @@ export function makeMongoDbEmbeddedContentStore({
}
}
},

/**
  Get the names of ingested data sources whose embedded content matches the
  given query (filter built by `makeMatchQuery`). Returns an empty array when
  no documents match.
 */
async getDataSources(matchQuery: GetSourcesMatchParams): Promise<string[]> {
  // Group all matching documents into one result carrying the distinct
  // set of their sourceName values.
  const pipeline = [
    { $match: makeMatchQuery(matchQuery) },
    {
      $group: {
        _id: null,
        uniqueSources: { $addToSet: "$sourceName" },
      },
    },
    { $project: { _id: 0, uniqueSources: 1 } },
  ];
  const [grouped] = await embeddedContentCollection
    .aggregate(pipeline)
    .toArray();
  // $group yields a single document, or none at all when nothing matched.
  return grouped ? grouped.uniqueSources : [];
},
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,27 @@ import {
updateEmbeddedContent,
updateEmbeddedContentForPage,
} from "./updateEmbeddedContent";
import { persistPages } from ".";
import {
makeMongoDbEmbeddedContentStore,
makeMongoDbPageStore,
MongoDbEmbeddedContentStore,
MongoDbPageStore,
persistPages,
updatePages,
} from ".";
import { makeMockPageStore } from "../test/MockPageStore";
import * as chunkPageModule from "../chunk/chunkPage";
import { EmbeddedContentStore, EmbeddedContent } from "./EmbeddedContent";
import {
EmbeddedContentStore,
EmbeddedContent,
GetSourcesMatchParams,
} from "./EmbeddedContent";
import { Embedder } from "../embed";
import { Page, PersistedPage } from ".";
import { strict as assert } from "assert";
import { MongoMemoryReplSet } from "mongodb-memory-server";
import { DataSource } from "../dataSources";
import { MongoClient } from "mongodb";

export const makeMockEmbeddedContentStore = (): EmbeddedContentStore => {
const content: Map<string /* page url */, EmbeddedContent[]> = new Map();
Expand All @@ -29,6 +44,9 @@ export const makeMockEmbeddedContentStore = (): EmbeddedContentStore => {
metadata: {
embeddingName: "test",
},
async getDataSources(matchQuery: GetSourcesMatchParams): Promise<string[]> {
return [];
},
};
};

Expand All @@ -49,6 +67,7 @@ const embedder = {
},
};

// TODO: deprecate mock store and use mongodb-memory-server instead. https://jira.mongodb.org/browse/EAI-935
describe("updateEmbeddedContent", () => {
it("deletes embedded content for deleted page", async () => {
const pageStore = makeMockPageStore();
Expand Down Expand Up @@ -207,6 +226,7 @@ describe("updateEmbeddedContent", () => {
store: embeddedContentStore,
page,
concurrencyOptions: { createChunks: 2 },
chunkAlgoHash: "testchunkalgohash",
});

const embeddedContent = await embeddedContentStore.loadEmbeddedContent({
Expand Down Expand Up @@ -276,3 +296,231 @@ describe("updateEmbeddedContent", () => {
});
});
});

// These tests use "mongodb-memory-server", not mockEmbeddedContentStore
// Suite verifying that updateEmbeddedContent re-chunks pages when either the
// page copy changed after the `since` date or the chunking algorithm
// (tracked via chunkAlgoHash) changed.
describe("updateEmbeddedContent updates chunks based on changes to copy or changes to the chunk algo", () => {
  let mongod: MongoMemoryReplSet | undefined;
  let pageStore: MongoDbPageStore;
  let embedStore: MongoDbEmbeddedContentStore;
  let uri: string;
  let databaseName: string;
  let mongoClient: MongoClient;
  // Baseline embeddings captured in beforeEach, before each test mutates state.
  let page1Embedding: EmbeddedContent[], page2Embedding: EmbeddedContent[];
  let pages: PersistedPage[] = [];

  // Stub embedder returning a fixed vector — embedding values are irrelevant
  // to these tests; only chunking/update behavior is under test.
  const embedder = {
    async embed() {
      return { embedding: [1, 2, 3] };
    },
  };
  // Two single-page data sources so per-source behavior can be distinguished.
  const mockDataSources: DataSource[] = [
    {
      name: "source1",
      fetchPages: async () => [
        {
          url: "test1.com",
          format: "html",
          sourceName: "source1",
          body: "hello source 1",
        },
      ],
    },
    {
      name: "source2",
      fetchPages: async () => [
        {
          url: "test2.com",
          format: "html",
          sourceName: "source2",
          body: "hello source 2",
        },
      ],
    },
  ];
  const mockDataSourceNames = mockDataSources.map(
    (dataSource) => dataSource.name
  );
  // One in-memory replica set and client for the whole suite.
  beforeAll(async () => {
    mongod = await MongoMemoryReplSet.create();
    uri = mongod.getUri();
    mongoClient = new MongoClient(uri);
    await mongoClient.connect();
  });
  beforeEach(async () => {
    // setup mongo client, page store, and embedded content store
    // NOTE(review): the same databaseName is reused across tests; afterEach
    // drops both stores so state does not leak between cases.
    databaseName = "test-all-command";
    embedStore = makeMongoDbEmbeddedContentStore({
      connectionUri: uri,
      databaseName,
      searchIndex: { embeddingName: "test-embedding" },
    });
    pageStore = makeMongoDbPageStore({
      connectionUri: uri,
      databaseName,
    });
    // create pages and verify that they have been created
    await updatePages({ sources: mockDataSources, pageStore });
    pages = await pageStore.loadPages();
    assert(pages.length == 2);
    // create embeddings for the pages and verify that they have been created
    await updateEmbeddedContent({
      since: new Date(0),
      embeddedContentStore: embedStore,
      pageStore,
      sourceNames: mockDataSourceNames,
      embedder,
    });
    page1Embedding = await embedStore.loadEmbeddedContent({
      page: pages[0],
    });
    page2Embedding = await embedStore.loadEmbeddedContent({
      page: pages[1],
    });
    assert(page1Embedding.length);
    assert(page2Embedding.length);
  });

  // Drop per-test data; close connections only once the suite is done.
  afterEach(async () => {
    await pageStore?.drop();
    await embedStore?.drop();
  });
  afterAll(async () => {
    await pageStore?.close();
    await embedStore?.close();
    await mongoClient?.close();
    await mongod?.stop();
  });

  it("should update embedded content only for pages that have been updated (copy change) after the 'since' date provided", async () => {
    // Modify dates of pages and embedded content for testing
    const sinceDate = new Date("2024-01-01");
    const beforeSinceDate = new Date("2023-01-01");
    const afterSinceDate = new Date("2025-01-01");
    // set pages[0] to be last updated before sinceDate (should not be modified)
    await mongoClient
      .db(databaseName)
      .collection("pages")
      .updateOne({ ...pages[0] }, { $set: { updated: beforeSinceDate } });
    await mongoClient
      .db(databaseName)
      .collection("embedded_content")
      .updateOne(
        { sourceName: mockDataSourceNames[0] },
        { $set: { updated: beforeSinceDate } }
      );
    // set pages[1] to be last updated after sinceDate (should be re-chunked)
    await mongoClient
      .db(databaseName)
      .collection("pages")
      .updateOne({ ...pages[1] }, { $set: { updated: afterSinceDate } });
    await mongoClient
      .db(databaseName)
      .collection("embedded_content")
      .updateOne(
        { sourceName: mockDataSourceNames[1] },
        { $set: { updated: afterSinceDate } }
      );
    const originalPage1Embedding = await embedStore.loadEmbeddedContent({
      page: pages[0],
    });
    const originalPage2Embedding = await embedStore.loadEmbeddedContent({
      page: pages[1],
    });
    await updateEmbeddedContent({
      since: sinceDate,
      embeddedContentStore: embedStore,
      pageStore,
      sourceNames: mockDataSourceNames,
      embedder,
    });
    const updatedPage1Embedding = await embedStore.loadEmbeddedContent({
      page: pages[0],
    });
    const updatedPage2Embedding = await embedStore.loadEmbeddedContent({
      page: pages[1],
    });
    assert(updatedPage1Embedding.length);
    assert(updatedPage2Embedding.length);
    // page 0 untouched (same `updated` timestamp); page 1 re-embedded.
    expect(updatedPage1Embedding[0].updated.getTime()).toBe(
      originalPage1Embedding[0].updated.getTime()
    );
    expect(updatedPage2Embedding[0].updated.getTime()).not.toBe(
      originalPage2Embedding[0].updated.getTime()
    );
  });
  it("should update embedded content when only chunk algo has changed", async () => {
    // change the chunking algo for the second page, but not the first
    // (`since: new Date()` means no page qualifies as copy-changed, so any
    // re-embedding must come from the chunkAlgoHash difference).
    await updateEmbeddedContent({
      since: new Date(),
      embeddedContentStore: embedStore,
      pageStore,
      sourceNames: [mockDataSourceNames[0]],
      embedder,
    });
    await updateEmbeddedContent({
      since: new Date(),
      embeddedContentStore: embedStore,
      pageStore,
      sourceNames: [mockDataSourceNames[1]],
      embedder,
      chunkOptions: { chunkOverlap: 2 },
    });
    const updatedPage1Embedding = await embedStore.loadEmbeddedContent({
      page: pages[0],
    });
    const updatedPage2Embedding = await embedStore.loadEmbeddedContent({
      page: pages[1],
    });
    assert(updatedPage1Embedding.length);
    assert(updatedPage2Embedding.length);
    // Unchanged chunk options → same hash; changed options → new hash.
    expect(updatedPage1Embedding[0].chunkAlgoHash).toBe(
      page1Embedding[0].chunkAlgoHash
    );
    expect(updatedPage2Embedding[0].chunkAlgoHash).not.toBe(
      page2Embedding[0].chunkAlgoHash
    );
  });
  it("should update embedded content when either chunk algo has changed or copy has changed", async () => {
    // SETUP: Modify dates of pages and embedded content for this test case
    // Only pages[0] is marked as copy-changed; pages[1] should still be
    // re-embedded because the chunk options (and thus the hash) change below.
    const sinceDate = new Date("2024-01-01");
    const afterSinceDate = new Date("2025-01-01");
    await mongoClient
      .db(databaseName)
      .collection("pages")
      .updateOne({ ...pages[0] }, { $set: { updated: afterSinceDate } });
    await mongoClient
      .db(databaseName)
      .collection("embedded_content")
      .updateOne(
        { sourceName: mockDataSourceNames[0] },
        { $set: { updated: afterSinceDate } }
      );
    const originalPage1Embedding = await embedStore.loadEmbeddedContent({
      page: pages[0],
    });
    // END SETUP
    await updateEmbeddedContent({
      since: sinceDate,
      embeddedContentStore: embedStore,
      pageStore,
      sourceNames: mockDataSourceNames,
      embedder,
      chunkOptions: { chunkOverlap: 2 },
    });
    const updatedPage1Embedding = await embedStore.loadEmbeddedContent({
      page: pages[0],
    });
    const updatedPage2Embedding = await embedStore.loadEmbeddedContent({
      page: pages[1],
    });
    assert(updatedPage1Embedding.length);
    assert(updatedPage2Embedding.length);
    // both pages should be updated
    expect(updatedPage1Embedding[0].chunkAlgoHash).not.toBe(
      originalPage1Embedding[0].chunkAlgoHash
    );
    expect(updatedPage2Embedding[0].chunkAlgoHash).not.toBe(
      page2Embedding[0].chunkAlgoHash
    );
  });
});
Loading