Skip to content

Commit

Permalink
Refactor multi agents execution (#376) (#385)
Browse files Browse the repository at this point in the history
* refactor(t2viz): introduced pipeline to run async operations

Signed-off-by: Yulong Ruan <[email protected]>

* add tests

Signed-off-by: Yulong Ruan <[email protected]>

* update changelog entry

Signed-off-by: Yulong Ruan <[email protected]>

* rename Operator -> Task

Signed-off-by: Yulong Ruan <[email protected]>

* cleanup console.log

Signed-off-by: Yulong Ruan <[email protected]>

---------

Signed-off-by: Yulong Ruan <[email protected]>
(cherry picked from commit fb47ce8)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>

# Conflicts:
#	CHANGELOG.md

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent cfc975c commit b0cc6fe
Show file tree
Hide file tree
Showing 8 changed files with 346 additions and 186 deletions.
180 changes: 0 additions & 180 deletions public/components/visualization/text2vega.ts

This file was deleted.

23 changes: 17 additions & 6 deletions public/components/visualization/text2viz.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import { v4 as uuidv4 } from 'uuid';
import { useCallback } from 'react';
import { useObservable } from 'react-use';
import { useLocation, useParams } from 'react-router-dom';
import { Pipeline } from '../../utils/pipeline/pipeline';
import { Text2PPLTask } from '../../utils/pipeline/text_to_ppl_task';
import { PPLSampleTask } from '../../utils/pipeline/ppl_sample_task';
import { SourceSelector } from './source_selector';
import type { IndexPattern } from '../../../../../src/plugins/data/public';
import chatIcon from '../../assets/chat.svg';
Expand All @@ -36,7 +39,6 @@ import { StartServices } from '../../types';
import './text2viz.scss';
import { Text2VizEmpty } from './text2viz_empty';
import { Text2VizLoading } from './text2viz_loading';
import { Text2Vega } from './text2vega';
import {
OnSaveProps,
SavedObjectSaveModalOrigin,
Expand All @@ -52,6 +54,7 @@ import { HeaderVariant } from '../../../../../src/core/public';
import { TEXT2VEGA_INPUT_SIZE_LIMIT } from '../../../common/constants/llm';
import { FeedbackThumbs } from '../feedback_thumbs';
import { VizStyleEditor } from './viz_style_editor';
import { Text2VegaTask } from '../../utils/pipeline/text_to_vega_task';

export const INDEX_PATTERN_URL_SEARCH_KEY = 'indexPatternId';
export const ASSISTANT_INPUT_URL_SEARCH_KEY = 'assistantInput';
Expand Down Expand Up @@ -102,7 +105,15 @@ export const Text2Viz = () => {
);
const [currentInstruction, setCurrentInstruction] = useState('');
const [editorInput, setEditorInput] = useState('');
const text2vegaRef = useRef(new Text2Vega(http, data.search, savedObjects));
const text2vegaRef = useRef<Pipeline | null>(null);

if (text2vegaRef.current === null) {
text2vegaRef.current = new Pipeline([
new Text2PPLTask(http),
new PPLSampleTask(data.search),
new Text2VegaTask(http, savedObjects),
]);
}

const status = useObservable(text2vegaRef.current.status$);

Expand All @@ -129,7 +140,7 @@ export const Text2Viz = () => {
*/
useEffect(() => {
const text2vega = text2vegaRef.current;
const subscription = text2vega.getResult$().subscribe((result) => {
const subscription = text2vega?.getResult$().subscribe((result) => {
if (result) {
if (result.error) {
notifications.toasts.addError(result.error, {
Expand All @@ -138,7 +149,7 @@ export const Text2Viz = () => {
}),
});
} else {
setEditorInput(JSON.stringify(result, undefined, 4));
setEditorInput(JSON.stringify(result.vega, undefined, 4));

// Report metric when visualization generated successfully
if (usageCollection) {
Expand All @@ -153,7 +164,7 @@ export const Text2Viz = () => {
});

return () => {
subscription.unsubscribe();
subscription?.unsubscribe();
};
}, [http, notifications, usageCollection]);

Expand Down Expand Up @@ -232,7 +243,7 @@ export const Text2Viz = () => {
currentUsedIndexPatternRef.current = indexPattern;

const text2vega = text2vegaRef.current;
text2vega.invoke({
text2vega?.run({
index: indexPattern.title,
inputQuestion,
inputInstruction,
Expand Down
76 changes: 76 additions & 0 deletions public/utils/pipeline/pipeline.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

import { Pipeline } from './pipeline';

describe('pipeline', () => {
it('should run pipeline', (done) => {
const fn1 = jest.fn().mockImplementation((input) => {
return Promise.resolve(Array<string>().concat(input).concat('fn1'));
});
const fn2 = jest.fn().mockImplementation((input) => {
return Promise.resolve(Array<string>().concat(input).concat('fn2'));
});
const pipeline = new Pipeline([{ execute: fn1 }, { execute: fn2 }]);
pipeline.getResult$().subscribe((result) => {
expect(result).toEqual(['input', 'fn1', 'fn2']);
expect(pipeline.status$.value).toBe('STOPPED');
done();
});

expect(pipeline.status$.value).toBe('STOPPED');
pipeline.run('input');
expect(pipeline.status$.value).toBe('RUNNING');
});

it('should run pipeline with the latest input', (done) => {
const fn1 = jest.fn().mockImplementation((input) => {
return Promise.resolve(Array<string>().concat(input).concat('fn1'));
});
const fn2 = jest.fn().mockImplementation((input) => {
return Promise.resolve(Array<string>().concat(input).concat('fn2'));
});
const pipeline = new Pipeline([{ execute: fn1 }, { execute: fn2 }]);
pipeline.getResult$().subscribe((result) => {
expect(result).toEqual(['input2', 'fn1', 'fn2']);
expect(fn1).toHaveBeenCalledTimes(2);
// The fn2 should only be called onece because the first pipeline run should already be canceled
expect(fn2).toHaveBeenCalledTimes(1);
done();
});
// the pipeline run twice with different inputs
// the second pipeline.run should be make it to cancel the first pipeline run
pipeline.run('input1');
pipeline.run('input2');
});

it('should run pipeline once synchronously', async () => {
const fn1 = jest.fn().mockImplementation((input) => {
return Promise.resolve(Array<string>().concat(input).concat('fn1'));
});
const fn2 = jest.fn().mockImplementation((input) => {
return Promise.resolve(Array<string>().concat(input).concat('fn2'));
});
const pipeline = new Pipeline([{ execute: fn1 }, { execute: fn2 }]);
const result = await pipeline.runOnce('input');
expect(result).toEqual(['input', 'fn1', 'fn2']);
});

it('should catch error', (done) => {
const fn1 = jest.fn().mockImplementation((input) => {
return Promise.resolve(Array<string>().concat(input).concat('fn1'));
});
const fn2 = jest.fn().mockImplementation(() => {
throw new Error('test');
});

const pipeline = new Pipeline([{ execute: fn1 }, { execute: fn2 }]);
pipeline.getResult$().subscribe((result) => {
expect(result).toEqual({ error: new Error('test') });
done();
});
pipeline.run('input');
});
});
58 changes: 58 additions & 0 deletions public/utils/pipeline/pipeline.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
/* eslint-disable @typescript-eslint/no-explicit-any */

import { BehaviorSubject, Observable, Subject, of } from 'rxjs';
import { switchMap, tap, catchError } from 'rxjs/operators';

import { Task } from './task';

export class Pipeline {
input$ = new Subject<any>();
output$: Observable<any>;
status$ = new BehaviorSubject<'RUNNING' | 'STOPPED'>('STOPPED');

constructor(private readonly tasks: Array<Task<any, any>>) {
this.output$ = this.input$
.pipe(tap(() => this.status$.next('RUNNING')))
.pipe(
switchMap((value) => {
return this.tasks
.reduce((acc$, task) => {
return acc$.pipe(switchMap((result) => task.execute(result)));
}, of(value))
.pipe(catchError((e) => of({ error: e })));
})
)
.pipe(tap(() => this.status$.next('STOPPED')));
}

/**
* Triggers the pipeline execution by emitting a new input value.
* This will start the processing of the provided input value through the pipeline's tasks,
* with each task transforming the input in sequence. The resulting value will be emitted
* through the `output$` observable.
*/
run(input: any) {
this.input$.next(input);
}

/**
* Synchronously processes the provided input value through the pipeline's tasks in sequence.
* This method bypasses the reactive pipeline and executes each task one by one,
* it suitable for use cases where you need a one-time, imperative-style execution.
*/
async runOnce(input: any) {
let nextInput = input;
for (const task of this.tasks) {
nextInput = await task.execute(nextInput);
}
return nextInput;
}

getResult$() {
return this.output$;
}
}
Loading

0 comments on commit b0cc6fe

Please sign in to comment.