Skip to content

Commit eaee98b

Browse files
authored
[example][java] Add multiple agent integration example (#239)
1 parent ed9cad1 commit eaee98b

File tree

3 files changed

+418
-0
lines changed

3 files changed

+418
-0
lines changed
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.agents.examples;
20+
21+
import com.fasterxml.jackson.core.JsonProcessingException;
22+
import com.fasterxml.jackson.databind.ObjectMapper;
23+
import org.apache.flink.agents.api.AgentsExecutionEnvironment;
24+
import org.apache.flink.agents.api.resource.ResourceType;
25+
import org.apache.flink.agents.examples.agents.CustomTypesAndResources;
26+
import org.apache.flink.agents.examples.agents.ProductSuggestionAgent;
27+
import org.apache.flink.agents.examples.agents.ReviewAnalysisAgent;
28+
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
29+
import org.apache.flink.connector.file.src.FileSource;
30+
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
31+
import org.apache.flink.core.fs.Path;
32+
import org.apache.flink.streaming.api.datastream.DataStream;
33+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
34+
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
35+
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
36+
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
37+
import org.apache.flink.util.Collector;
38+
39+
import java.util.ArrayList;
40+
import java.util.List;
41+
import java.util.Objects;
42+
43+
import static org.apache.flink.agents.examples.agents.CustomTypesAndResources.ProductReviewAnalysisRes;
44+
import static org.apache.flink.agents.examples.agents.CustomTypesAndResources.ProductReviewSummary;
45+
import static org.apache.flink.streaming.api.windowing.time.Time.minutes;
46+
47+
/**
48+
* Java example demonstrating multiple workflow agents for product improvement suggestion.
49+
*
50+
* <p>This example demonstrates a multi-stage streaming pipeline using Flink Agents:
51+
*
52+
* <ol>
53+
* <li>Reads product reviews from a source as a streaming source.
54+
* <li>Uses an LLM agent to analyze each review and extract score and unsatisfied reasons.
55+
* <li>Aggregates the analysis results in 1-minute tumbling windows, producing score distributions
56+
* and collecting all unsatisfied reasons.
57+
* <li>Uses another LLM agent to generate product improvement suggestions based on the aggregated
58+
* analysis.
59+
* <li>Prints the final suggestions to stdout.
60+
* </ol>
61+
*/
62+
public class WorkflowMultipleAgentExample {
63+
64+
private static final ObjectMapper MAPPER = new ObjectMapper();
65+
66+
/**
67+
* ProcessWindowFunction to aggregate score distribution and dislike reasons.
68+
*
69+
* <p>This class aggregates multiple ProductReviewAnalysisRes elements within a window,
70+
* calculating score distribution percentages and collecting all unsatisfied reasons.
71+
*/
72+
static class AggregateScoreDistributionAndDislikeReasons
73+
extends ProcessWindowFunction<ProductReviewAnalysisRes, String, String, TimeWindow> {
74+
75+
@Override
76+
public void process(
77+
String key,
78+
Context context,
79+
Iterable<ProductReviewAnalysisRes> elements,
80+
Collector<String> out)
81+
throws JsonProcessingException {
82+
83+
// Initialize rating counts for scores 1-5
84+
int[] ratingCounts = new int[5];
85+
List<String> reasonList = new ArrayList<>();
86+
87+
// Process each element in the window
88+
for (CustomTypesAndResources.ProductReviewAnalysisRes element : elements) {
89+
int rating = element.getScore();
90+
if (rating >= 1 && rating <= 5) {
91+
ratingCounts[rating - 1]++;
92+
}
93+
reasonList.addAll(element.getReasons());
94+
}
95+
96+
// Calculate total and percentages
97+
int total = 0;
98+
for (int count : ratingCounts) {
99+
total += count;
100+
}
101+
102+
// Calculate percentages and format them
103+
List<String> formattedPercentages = new ArrayList<>();
104+
if (total > 0) {
105+
for (int count : ratingCounts) {
106+
double percentage = Math.round((count * 100.0 / total) * 10.0) / 10.0;
107+
formattedPercentages.add(String.format("%.1f%%", percentage));
108+
}
109+
} else {
110+
// If no ratings, set all to 0%
111+
for (int i = 0; i < 5; i++) {
112+
formattedPercentages.add("0.0%");
113+
}
114+
}
115+
116+
// Create and emit the aggregated result
117+
ProductReviewSummary summary =
118+
new ProductReviewSummary(key, formattedPercentages, reasonList);
119+
out.collect(MAPPER.writeValueAsString(summary));
120+
}
121+
}
122+
123+
/** Runs the example pipeline. */
124+
public static void main(String[] args) throws Exception {
125+
// Set up the Flink streaming environment and the Agents execution environment.
126+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
127+
env.setParallelism(1);
128+
AgentsExecutionEnvironment agentsEnv =
129+
AgentsExecutionEnvironment.getExecutionEnvironment(env);
130+
131+
// Add Ollama chat model connection to be used by the ReviewAnalysisAgent
132+
// and ProductSuggestionAgent.
133+
agentsEnv.addResource(
134+
"ollama_server",
135+
ResourceType.CHAT_MODEL,
136+
CustomTypesAndResources.OLLAMA_SERVER_DESCRIPTOR);
137+
138+
// Read product reviews from input_data.txt file as a streaming source.
139+
// Each element represents a ProductReview.
140+
DataStream<String> productReviewStream =
141+
env.fromSource(
142+
FileSource.forRecordStreamFormat(
143+
new TextLineInputFormat(),
144+
new Path(
145+
Objects.requireNonNull(
146+
WorkflowSingleAgentExample.class
147+
.getClassLoader()
148+
.getResource(
149+
"input_data.txt"))
150+
.getPath()))
151+
.build(),
152+
WatermarkStrategy.noWatermarks(),
153+
"streaming-agent-example");
154+
155+
// Use the ReviewAnalysisAgent (LLM) to analyze each review.
156+
// The agent extracts the review score and unsatisfied reasons.
157+
DataStream<Object> reviewAnalysisResStream =
158+
agentsEnv
159+
.fromDataStream(productReviewStream)
160+
.apply(new ReviewAnalysisAgent())
161+
.toDataStream();
162+
163+
// Aggregate the analysis results in 1-minute tumbling windows.
164+
// This produces a score distribution and collects all unsatisfied reasons for
165+
// each
166+
// product.
167+
DataStream<String> aggregatedAnalysisResStream =
168+
reviewAnalysisResStream
169+
.map(element -> (ProductReviewAnalysisRes) element)
170+
.keyBy(ProductReviewAnalysisRes::getId)
171+
.window(TumblingProcessingTimeWindows.of(minutes(1)))
172+
.process(new AggregateScoreDistributionAndDislikeReasons());
173+
174+
// Use the ProductSuggestionAgent (LLM) to generate product improvement
175+
// suggestions
176+
// based on the aggregated analysis results.
177+
DataStream<Object> productSuggestionResStream =
178+
agentsEnv
179+
.fromDataStream(aggregatedAnalysisResStream)
180+
.apply(new ProductSuggestionAgent())
181+
.toDataStream();
182+
183+
// Print the final product improvement suggestions to stdout.
184+
productSuggestionResStream.print();
185+
186+
// Execute the pipeline.
187+
agentsEnv.execute();
188+
}
189+
}

examples/src/main/java/org/apache/flink/agents/examples/agents/CustomTypesAndResources.java

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,37 @@ public class CustomTypesAndResources {
6161
new ChatMessage(MessageRole.SYSTEM, REVIEW_ANALYSIS_SYSTEM_PROMPT_STR),
6262
new ChatMessage(MessageRole.USER, "\"input\":\n" + "{input}")));
6363

64+
// Prompt for review analysis react agent
65+
public static final Prompt REVIEW_ANALYSIS_REACT_PROMPT =
66+
new Prompt(
67+
Arrays.asList(
68+
new ChatMessage(MessageRole.SYSTEM, REVIEW_ANALYSIS_SYSTEM_PROMPT_STR),
69+
new ChatMessage(
70+
MessageRole.USER, "\"id\": {id},\n" + "\"review\": {review}")));
71+
72+
// Prompt for product suggestion agent
73+
public static final String PRODUCT_SUGGESTION_PROMPT_STR =
74+
"Based on the rating distribution and user dissatisfaction reasons, generate three actionable suggestions for product improvement.\n\n"
75+
+ "Input format:\n"
76+
+ "{\n"
77+
+ " \"id\": \"1\",\n"
78+
+ " \"score_histogram\": [\"10%\", \"20%\", \"10%\", \"15%\", \"45%\"],\n"
79+
+ " \"unsatisfied_reasons\": [\"reason1\", \"reason2\", \"reason3\"]\n"
80+
+ "}\n\n"
81+
+ "Ensure that your response can be parsed by Java JSON, use the following format as an example:\n"
82+
+ "{\n"
83+
+ " \"suggestion_list\": [\n"
84+
+ " \"suggestion1\",\n"
85+
+ " \"suggestion2\",\n"
86+
+ " \"suggestion3\"\n"
87+
+ " ]\n"
88+
+ "}\n\n"
89+
+ "input:\n"
90+
+ "{input}";
91+
92+
public static final Prompt PRODUCT_SUGGESTION_PROMPT =
93+
new Prompt(PRODUCT_SUGGESTION_PROMPT_STR);
94+
6495
/**
6596
* Tool for notifying the shipping manager when product received a negative review due to
6697
* shipping damage.
@@ -146,4 +177,80 @@ public String toString() {
146177
"ProductReviewAnalysisRes{id='%s', score=%d, reasons=%s}", id, score, reasons);
147178
}
148179
}
180+
181+
/** Aggregates multiple reviews and insights using LLM for a product. */
182+
@JsonSerialize
183+
@JsonDeserialize
184+
public static class ProductReviewSummary {
185+
private final String id;
186+
private final List<String> scoreHist;
187+
private final List<String> unsatisfiedReasons;
188+
189+
@JsonCreator
190+
public ProductReviewSummary(
191+
@JsonProperty("id") String id,
192+
@JsonProperty("scoreHist") List<String> scoreHist,
193+
@JsonProperty("unsatisfiedReasons") List<String> unsatisfiedReasons) {
194+
this.id = id;
195+
this.scoreHist = scoreHist;
196+
this.unsatisfiedReasons = unsatisfiedReasons;
197+
}
198+
199+
public String getId() {
200+
return id;
201+
}
202+
203+
public List<String> getScoreHist() {
204+
return scoreHist;
205+
}
206+
207+
public List<String> getUnsatisfiedReasons() {
208+
return unsatisfiedReasons;
209+
}
210+
211+
@Override
212+
public String toString() {
213+
return String.format(
214+
"ProductReviewSummary{id='%s', scoreHist=%s, unsatisfiedReasons=%s}",
215+
id, scoreHist, unsatisfiedReasons);
216+
}
217+
}
218+
219+
/** Provides a summary of review data including suggestions for improvement. */
220+
@JsonSerialize
221+
@JsonDeserialize
222+
public static class ProductSuggestion {
223+
private final String id;
224+
private final List<String> scoreHist;
225+
private final List<String> suggestions;
226+
227+
@JsonCreator
228+
public ProductSuggestion(
229+
@JsonProperty("id") String id,
230+
@JsonProperty("scoreHist") List<String> scoreHist,
231+
@JsonProperty("suggestions") List<String> suggestions) {
232+
this.id = id;
233+
this.scoreHist = scoreHist;
234+
this.suggestions = suggestions;
235+
}
236+
237+
public String getId() {
238+
return id;
239+
}
240+
241+
public List<String> getScoreHist() {
242+
return scoreHist;
243+
}
244+
245+
public List<String> getSuggestions() {
246+
return suggestions;
247+
}
248+
249+
@Override
250+
public String toString() {
251+
return String.format(
252+
"ProductSuggestion{id='%s', scoreHist=%s, suggestions=%s}",
253+
id, scoreHist, suggestions);
254+
}
255+
}
149256
}

0 commit comments

Comments
 (0)