specific language governing permissions and limitations
under the License.
-->

## Overall

We provide a total of three ways to run the job: Local Run with Test Data, Local Run with Flink MiniCluster, and Run in Flink Cluster. The detailed differences are shown in the table below:

<table class="table table-bordered">
  <thead>
    <tr>
      <th class="text-left" style="width: 25%">Run Mode</th>
      <th class="text-left" style="width: 25%">Required Environment</th>
      <th class="text-left" style="width: 50%">Typical Use Case</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td>Local Run with Test Data</td>
      <td>Only Python</td>
      <td>Validate the internal logic of the Agent.</td>
    </tr>
    <tr>
      <td>Local Run with Flink MiniCluster</td>
      <td>Python & Java</td>
      <td>Verify upstream/downstream connectivity and schema correctness.</td>
    </tr>
    <tr>
      <td>Run in Flink Cluster</td>
      <td>Python & Java</td>
      <td>Large-scale data and AI processing in production environments.</td>
    </tr>
  </tbody>
</table>

## Local Run with Test Data

After completing the [installation of flink-agents]({{< ref "docs/get-started/installation" >}}) and [building your agent]({{< ref "docs/development/workflow_agent" >}}), you can test and execute your agent locally using a simple Python script. This allows you to validate logic without requiring a Flink cluster.

- **Test Data Simulation**: Easily inject mock inputs for validation.
- **IDE Compatibility**: Run directly in your preferred development environment.

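The mock inputs mentioned above are plain Python dictionaries. As a minimal sketch (the field names follow the input format documented on this page; the session keys and payloads are made up for illustration):

```python
from typing import Any, Dict, List

# Mock inputs for a local run: each record carries an optional "key" and a
# required "value" (the payload that becomes the `input` field of InputEvent).
# No agent or Flink runtime is needed to assemble this test data.
mock_inputs: List[Dict[str, Any]] = [
    {"key": "session_001", "value": "Calculate the sum of 1 and 2."},
    {"key": "session_002", "value": {"numbers": [3, 4]}},  # any serializable type
]

print(len(mock_inputs))  # number of records ready to feed into a local run
```

Because the records are ordinary dictionaries, you can generate them programmatically or load them from a fixture file before handing them to your local run.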
### Example for Local Run with Test Data

#### Complete code

```python
from flink_agents.api.execution_environment import AgentsExecutionEnvironment
from my_module.agents import MyAgent  # Replace with your actual agent path

# ...

if __name__ == "__main__":
    # ...
```

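To make the input/output contract concrete, here is a plain-Python sketch of the local-run data flow, with a hypothetical `run_stub_agent` standing in for real agent logic (no Flink or flink-agents installation required):

```python
from typing import Any, Dict, List

# Each input record's "value" is processed by the agent, and each result is
# emitted as a single {key: output} dictionary, one per OutputEvent.
inputs: List[Dict[str, Any]] = [
    {"key": "key_1", "value": "Calculate the sum of 1 and 2."},
    {"key": "key_2", "value": "Calculate the sum of 3 and 4."},
]

def run_stub_agent(value: str) -> str:
    # Stand-in logic; a real agent would react to InputEvent here.
    return value.upper()

outputs: List[Dict[str, Any]] = [
    {record["key"]: run_stub_agent(record["value"])} for record in inputs
]
```

The resulting `outputs` list has exactly the shape described in the format sections on this page: one dictionary per result, keyed by the input's context key.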
#### Input Data Format

The input data should be a list of dictionaries (`List[Dict[str, Any]]`) with the following structure:

```python
[
    {
        # Optional field: Input key
        "key": "key_1",

        # Required field: Input content
        # This becomes the `input` field in InputEvent
        "value": "Calculate the sum of 1 and 2.",
    },
    ...
]
```

#### Output Data Format

The output data is a list of dictionaries (`List[Dict[str, Any]]`) where each dictionary contains a single key-value pair representing the processed result. The structure is generated from `OutputEvent` objects:

```python
[
    {key_1: output_1},  # From first OutputEvent
    {key_2: output_2},  # From second OutputEvent
    ...
]
```

## Local Run with Flink MiniCluster

After completing the [installation of flink-agents]({{< ref "docs/get-started/installation" >}}) and [building your agent]({{< ref "docs/development/workflow_agent" >}}), you can test and execute your agent locally using a **Flink MiniCluster**. This gives you a lightweight Flink streaming environment without deploying to a full cluster. For more details about how to integrate agents with Flink's `DataStream` or `Table`, please refer to the [Integrate with Flink]({{< ref "docs/development/integrate_with_flink" >}}) documentation.

{{< hint info >}}
If you encounter the exception "No module named 'flink_agents'" when running with a Flink MiniCluster, set the `PYTHONPATH` by adding `os.environ["PYTHONPATH"] = sysconfig.get_paths()["purelib"]` at the beginning of your code.
{{< /hint >}}

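The workaround in the hint above can be applied as a short preamble at the very top of your script, before any flink-agents code runs:

```python
import os
import sysconfig

# Make the active environment's site-packages directory visible to the Python
# workers spawned by the MiniCluster, so `flink_agents` resolves there as well.
os.environ["PYTHONPATH"] = sysconfig.get_paths()["purelib"]
```

`sysconfig.get_paths()["purelib"]` is the standard-library way to locate the current interpreter's site-packages directory, so this works unchanged inside a virtual environment.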
## Run in Flink Cluster

Flink Agents jobs are deployed and run on the cluster similarly to PyFlink jobs. You can refer to the instructions for [submitting PyFlink jobs](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/cli/#submitting-pyflink-jobs) for more details. The following explains how to submit a Flink Agents job, using a standalone cluster as an example.

### Prerequisites

- **Operating System**: Unix-like environment (Linux, macOS, Cygwin, or WSL)
- **Python**: Version 3.10 or 3.11
- **Flink**: A running Flink cluster with the Flink Agents dependency installed

### Prepare Flink Agents

We recommend creating a Python virtual environment to install the Flink Agents Python library.

Follow the [instructions]({{< ref "docs/get-started/installation" >}}) to install the Flink Agents Python and Java libraries.


### Prepare Flink

Download a stable release of Flink 1.20.3, then extract the archive. You can refer to the [local installation](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/try-flink/local_installation/) instructions for more detailed steps.

Deploy a standalone Flink cluster in your local environment with the following command.

```bash
./flink-1.20.3/bin/start-cluster.sh
```

You should be able to navigate to the web UI at [localhost:8081](http://localhost:8081) to view the Flink dashboard and see that the cluster is up and running.

### Submit to Flink Cluster

```bash
# Set Python environment variables so that Flink can locate the Python
# libraries, then submit the job.
# ...
```

Now you should see a Flink job submitted to the Flink Cluster in the Flink web UI. After a few minutes, you can check for the output in the TaskManager output log.