Skip to content

Commit 39531ad

Browse files
committed
[docs] Update the deployment introduction for Flink agents
1 parent e7f2adb commit 39531ad

File tree

1 file changed

+131
-8
lines changed

1 file changed

+131
-8
lines changed

docs/content/docs/operations/deployment.md

Lines changed: 131 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,141 @@ under the License.
2424

2525
## Local Run with Test Data
2626

27-
{{< hint warning >}}
28-
**TODO**: How to run with test data with LocalExecutorEnvironment.
29-
{{< /hint >}}
27+
After completing the [installation of flink-agents]({{< ref "docs/get-started/installation" >}}) and [building your agent]({{< ref "docs/development/workflow_agent" >}}), you can test and execute your agent locally using a simple Python script. This allows you to validate logic without requiring a Flink cluster.
28+
29+
### Key Features
30+
31+
- **No Flink Required**: Local execution is ideal for development and testing.
32+
- **Test Data Simulation**: Easily inject mock inputs for validation.
33+
- **IDE Compatibility**: Run directly in your preferred development environment.
34+
35+
### Data Format
36+
37+
#### Input Data Format
38+
39+
The input data should be a list of dictionaries (`List[Dict[str, Any]]`) with the following structure:
40+
41+
``````python
42+
[
43+
{
44+
# Optional field: Context key for multi-session management
45+
# Priority order: "key" > "k" > auto-generated UUID
46+
"key": "session_001", # or use shorthand "k": "session_001"
47+
48+
# Required field: Input content (supports text, JSON, or any serializable type)
49+
# This becomes the `input` field in InputEvent
50+
"value": "Calculate the sum of 1 and 2.", # or shorthand "v": "..."
51+
},
52+
...
53+
]
54+
``````
55+
56+
#### Output Data Format
57+
58+
The output data is a list of dictionaries (`List[Dict[str, Any]]`) where each dictionary contains a single key-value pair representing the processed result. The structure is generated from `OutputEvent` objects:
59+
60+
``````python
61+
[
62+
{key_1: output_1}, # From first OutputEvent
63+
{key_2: output_2}, # From second OutputEvent
64+
...
65+
]
66+
``````
67+
68+
Each dictionary in the output list follows this pattern:
69+
70+
``````
71+
{
72+
# Key: Matches the input context key (from "key"/"k" field or auto-generated UUID)
73+
# Value: Result from agent processing (type depends on implementation)
74+
<context_key>: <processed_output>
75+
}
76+
``````
77+
78+
### Example for Local Run with Test Data
79+
80+
``````python
81+
from flink_agents.api.execution_environment import AgentsExecutionEnvironment
82+
from my_module.agents import MyAgent # Replace with your actual agent path
83+
84+
if __name__ == "__main__":
85+
# 1. Initialize environment
86+
env = AgentsExecutionEnvironment.get_execution_environment()
87+
88+
# 2. Prepare test data
89+
input_data = [
90+
{"key": "0001", "value": "Calculate the sum of 1 and 2."},
91+
{"key": "0002", "value": "Tell me a joke about cats."}
92+
]
93+
94+
# 3. Create agent instance
95+
agent = MyAgent()
96+
97+
# 4. Build pipeline
98+
output_data = env.from_list(input_data) \
99+
.apply(agent) \
100+
.to_list()
101+
102+
# 5. Execute and show results
103+
env.execute()
104+
105+
print("\nExecution Results:")
106+
for record in output_data:
107+
for key, value in record.items():
108+
print(f"{key}: {value}")
109+
110+
``````
30111

31112
## Local Run with Flink MiniCluster
32113

33-
{{< hint warning >}}
34-
**TODO**: How to run with Flink MiniCluster locally.
114+
After completing the [installation of flink-agents]({{< ref "docs/get-started/installation" >}}) and [building your agent]({{< ref "docs/development/workflow_agent" >}}), you can test and execute your agent locally using a **Flink MiniCluster embedded in Python**. This allows you to simulate a real Flink streaming environment without deploying to a full cluster. For more details about how to integrate agents with Flink's `DataStream` or `Table`, please refer to the [Integrate with Flink]({{< ref "docs/development/integrate_with_flink" >}}) documentation.
115+
116+
{{< hint info >}}
117+
118+
If you encounter the exception "No module named 'flink_agents'" when running with Flink MiniCluster, you can set the PYTHONPATH by adding os.environ["PYTHONPATH"] = sysconfig.get_paths()["purelib"] at the beginning of your code.
119+
35120
{{< /hint >}}
36121

37122
## Run in Flink Cluster
38123

39-
{{< hint warning >}}
40-
**TODO**: How to run in Flink Cluster.
41-
{{< /hint >}}
124+
Flink Agents jobs are deployed and run on the cluster similarly to Pyflink jobs. You can refer to the instructions for [submit pyflink jobs](https://https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/cli/#submitting-pyflink-jobs) for more detailed steps. The following explains how to submit Flink Agents jobs to the cluster, using a Standalone cluster as an example.
125+
126+
### Prepare Flink Agents
127+
128+
We recommand creating a Python virtual environment to install the Flink Agents Python library.
129+
130+
Follow the [instructions]({{< ref "docs/get-started/installation" >}}) to install the Flink Agents Python and Java libraries.
131+
132+
### Deploy a Standalone Flink Cluster
133+
134+
Download a stable release of Flink 1.20.3, then extract the archive. You can refer to the [local installation](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/try-flink/local_installation/) instructions for more detailed step.
135+
136+
```bash
137+
curl -LO https://archive.apache.org/dist/flink/flink-1.20.3/flink-1.20.3-bin-scala_2.12.tgz
138+
tar -xzf flink-1.20.3-bin-scala_2.12.tgz
139+
```
140+
141+
Deploy a standalone Flink cluster in your local environment with the following command.
142+
143+
```bash
144+
./flink-1.20.3/bin/start-cluster.sh
145+
```
146+
147+
You should be able to navigate to the web UI at [localhost:8081](localhost:8081) to view the Flink dashboard and see that the cluster is up and running.
148+
149+
### Submit to Flink Cluster
150+
```bash
151+
# Set Python environment variable to locate Python libraries , ensuring Flink
152+
# can find Python dependencies
153+
export PYTHONPATH=$(python -c 'import sysconfig; print(sysconfig.get_paths()["purelib"])')
154+
155+
# Run Flink Python Job
156+
# 1. Path note: Replace ./flink-1.20.3 with your actual Flink installation directory
157+
# 2. -py parameter specifies the Python script entry file
158+
# 3. Ensure /path/to/flink_agents_job.py is replaced with your actual file path
159+
./flink-1.20.3/bin/flink run -py /path/to/flink_agents_job.py
160+
```
161+
162+
Now you should see a Flink job submitted to the Flink Cluster in Flink web UI [localhost:8081](localhost:8081)
163+
164+
After a few minutes, you can check for the output in the TaskManager output log.

0 commit comments

Comments
 (0)