Skip to content

Commit 61e6af2

Browse files
committed
Nick: streaming callback experimental
1 parent 23d3257 commit 61e6af2

File tree

3 files changed

+45
-9
lines changed

3 files changed

+45
-9
lines changed

apps/api/src/lib/extract/extract-redis.ts

+4-1
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,15 @@ import { logger as _logger } from "../logger";
33

44
export enum ExtractStep {
55
INITIAL = "initial",
6+
MAP = "map",
7+
MAP_RERANK = "map-rerank",
68
MULTI_ENTITY = "multi-entity",
79
MULTI_ENTITY_SCRAPE = "multi-entity-scrape",
810
MULTI_ENTITY_EXTRACT = "multi-entity-extract",
911
SCRAPE = "scrape",
10-
MAP = "map",
12+
1113
EXTRACT = "extract",
14+
1215
COMPLETE = "complete",
1316
}
1417

apps/api/src/lib/extract/extraction-service.ts

+38-8
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,8 @@ export async function performExtraction(
170170
],
171171
});
172172

173+
let startMap = Date.now();
174+
let aggMapLinks: string[] = [];
173175
// Process URLs
174176
const urlPromises = request.urls.map((url) =>
175177
processUrl(
@@ -184,9 +186,20 @@ export async function performExtraction(
184186
includeSubdomains: request.includeSubdomains,
185187
schema: request.schema,
186188
},
187-
urlTraces,
188-
),
189-
);
189+
urlTraces,
190+
(links: string[]) => {
191+
aggMapLinks.push(...links);
192+
updateExtract(extractId, {
193+
steps: [
194+
{
195+
step: ExtractStep.MAP,
196+
startedAt: startMap,
197+
finishedAt: Date.now(),
198+
discoveredLinks: aggMapLinks,
199+
},
200+
],
201+
});
202+
}));
190203

191204
const processedUrls = await Promise.all(urlPromises);
192205
const links = processedUrls.flat().filter((url) => url);
@@ -205,8 +218,8 @@ export async function performExtraction(
205218
status: "processing",
206219
steps: [
207220
{
208-
step: ExtractStep.MAP,
209-
startedAt: Date.now(),
221+
step: ExtractStep.MAP_RERANK,
222+
startedAt: startMap,
210223
finishedAt: Date.now(),
211224
discoveredLinks: links,
212225
},
@@ -221,6 +234,7 @@ export async function performExtraction(
221234
// if so, it splits the results into 2 types of completions:
222235
// 1. the first one is a completion that will extract the array of items
223236
// 2. the second one is multiple completions that will extract the items from the array
237+
let startAnalyze = Date.now();
224238
const { isMultiEntity, multiEntityKeys, reasoning, keyIndicators } =
225239
await analyzeSchemaAndPrompt(links, request.schema, request.prompt ?? "");
226240

@@ -239,7 +253,7 @@ export async function performExtraction(
239253
steps: [
240254
{
241255
step: ExtractStep.MULTI_ENTITY,
242-
startedAt: Date.now(),
256+
startedAt: startAnalyze,
243257
finishedAt: Date.now(),
244258
discoveredLinks: [],
245259
},
@@ -254,12 +268,14 @@ export async function performExtraction(
254268
steps: [
255269
{
256270
step: ExtractStep.MULTI_ENTITY_SCRAPE,
257-
startedAt: Date.now(),
271+
startedAt: startAnalyze,
258272
finishedAt: Date.now(),
259273
discoveredLinks: links,
260274
},
261275
],
262276
});
277+
278+
let startScrape = Date.now();
263279
const scrapePromises = links.map((url) => {
264280
if (!docsMap.has(url)) {
265281
return scrapeDocument(
@@ -280,6 +296,20 @@ export async function performExtraction(
280296
(doc): doc is Document => doc !== null,
281297
);
282298

299+
let endScrape = Date.now();
300+
301+
await updateExtract(extractId, {
302+
status: "processing",
303+
steps: [
304+
{
305+
step: ExtractStep.MULTI_ENTITY_SCRAPE,
306+
startedAt: startScrape,
307+
finishedAt: endScrape,
308+
discoveredLinks: links,
309+
},
310+
],
311+
});
312+
283313
for (const doc of multyEntityDocs) {
284314
if (doc?.metadata?.url) {
285315
docsMap.set(doc.metadata.url, doc);
@@ -352,7 +382,7 @@ export async function performExtraction(
352382
steps: [
353383
{
354384
step: ExtractStep.MULTI_ENTITY_EXTRACT,
355-
startedAt: Date.now(),
385+
startedAt: startScrape,
356386
finishedAt: Date.now(),
357387
discoveredLinks: [doc.metadata.url || doc.metadata.sourceURL || ""],
358388
},

apps/api/src/lib/extract/url-processor.ts

+3
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ interface ProcessUrlOptions {
2525
export async function processUrl(
2626
options: ProcessUrlOptions,
2727
urlTraces: URLTrace[],
28+
updateExtractCallback: (links: string[]) => void,
2829
): Promise<string[]> {
2930
const trace: URLTrace = {
3031
url: options.url,
@@ -160,6 +161,8 @@ export async function processUrl(
160161
);
161162

162163

164+
updateExtractCallback(mappedLinks.map((x) => x.url));
165+
163166

164167
// Perform reranking using either prompt or schema
165168
let searchQuery = "";

0 commit comments

Comments
 (0)