From b7b6156c5b2f3253b964a7a75a5c52b6a55c01e1 Mon Sep 17 00:00:00 2001 From: youjin Date: Mon, 29 Sep 2025 18:57:49 +0800 Subject: [PATCH 1/5] [docs] Update the deployment introduction for Flink agents --- docs/content/docs/operations/deployment.md | 139 +++++++++++++++++++-- 1 file changed, 131 insertions(+), 8 deletions(-) diff --git a/docs/content/docs/operations/deployment.md b/docs/content/docs/operations/deployment.md index 18b3c234..2af7f6c2 100644 --- a/docs/content/docs/operations/deployment.md +++ b/docs/content/docs/operations/deployment.md @@ -24,18 +24,141 @@ under the License. ## Local Run with Test Data -{{< hint warning >}} -**TODO**: How to run with test data with LocalExecutorEnvironment. -{{< /hint >}} +After completing the [installation of flink-agents]({{< ref "docs/get-started/installation" >}}) and [building your agent]({{< ref "docs/development/workflow_agent" >}}), you can test and execute your agent locally using a simple Python script. This allows you to validate logic without requiring a Flink cluster. + +### Key Features + +- **No Flink Required**: Local execution is ideal for development and testing. +- **Test Data Simulation**: Easily inject mock inputs for validation. +- **IDE Compatibility**: Run directly in your preferred development environment. + +### Data Format + +#### Input Data Format + +The input data should be a list of dictionaries (`List[Dict[str, Any]]`) with the following structure: + +``````python +[ + { + # Optional field: Context key for multi-session management + # Priority order: "key" > "k" > auto-generated UUID + "key": "session_001", # or use shorthand "k": "session_001" + + # Required field: Input content (supports text, JSON, or any serializable type) + # This becomes the `input` field in InputEvent + "value": "Calculate the sum of 1 and 2.", # or shorthand "v": "..." + }, + ... 
+] +`````` + +#### Output Data Format + +The output data is a list of dictionaries (`List[Dict[str, Any]]`) where each dictionary contains a single key-value pair representing the processed result. The structure is generated from `OutputEvent` objects: + +``````python +[ + {key_1: output_1}, # From first OutputEvent + {key_2: output_2}, # From second OutputEvent + ... +] +`````` + +Each dictionary in the output list follows this pattern: + +`````` +{ + # Key: Matches the input context key (from "key"/"k" field or auto-generated UUID) + # Value: Result from agent processing (type depends on implementation) + : +} +`````` + +### Example for Local Run with Test Data + +``````python +from flink_agents.api.execution_environment import AgentsExecutionEnvironment +from my_module.agents import MyAgent # Replace with your actual agent path + +if __name__ == "__main__": + # 1. Initialize environment + env = AgentsExecutionEnvironment.get_execution_environment() + + # 2. Prepare test data + input_data = [ + {"key": "0001", "value": "Calculate the sum of 1 and 2."}, + {"key": "0002", "value": "Tell me a joke about cats."} + ] + + # 3. Create agent instance + agent = MyAgent() + + # 4. Build pipeline + output_data = env.from_list(input_data) \ + .apply(agent) \ + .to_list() + + # 5. Execute and show results + env.execute() + + print("\nExecution Results:") + for record in output_data: + for key, value in record.items(): + print(f"{key}: {value}") + +`````` ## Local Run with Flink MiniCluster -{{< hint warning >}} -**TODO**: How to run with Flink MiniCluster locally. +After completing the [installation of flink-agents]({{< ref "docs/get-started/installation" >}}) and [building your agent]({{< ref "docs/development/workflow_agent" >}}), you can test and execute your agent locally using a **Flink MiniCluster embedded in Python**. This allows you to simulate a real Flink streaming environment without deploying to a full cluster. 
For more details about how to integrate agents with Flink's `DataStream` or `Table`, please refer to the [Integrate with Flink]({{< ref "docs/development/integrate_with_flink" >}}) documentation.
+
+{{< hint info >}}
+
+If you encounter the exception "No module named 'flink_agents'" when running with Flink MiniCluster, you can set the PYTHONPATH by adding `os.environ["PYTHONPATH"] = sysconfig.get_paths()["purelib"]` at the beginning of your code.
+
{{< /hint >}}

## Run in Flink Cluster

-{{< hint warning >}}
-**TODO**: How to run in Flink Cluster.
-{{< /hint >}}
\ No newline at end of file
+Flink Agents jobs are deployed and run on the cluster similarly to Pyflink jobs. You can refer to the instructions for [submit pyflink jobs](https://https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/cli/#submitting-pyflink-jobs) for more detailed steps. The following explains how to submit Flink Agents jobs to the cluster, using a Standalone cluster as an example.
+
+### Prepare Flink Agents
+
+We recommend creating a Python virtual environment to install the Flink Agents Python library.
+
+Follow the [instructions]({{< ref "docs/get-started/installation" >}}) to install the Flink Agents Python and Java libraries.
+
+### Deploy a Standalone Flink Cluster
+
+Download a stable release of Flink 1.20.3, then extract the archive. You can refer to the [local installation](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/try-flink/local_installation/) instructions for more detailed steps.
+
+```bash
+curl -LO https://archive.apache.org/dist/flink/flink-1.20.3/flink-1.20.3-bin-scala_2.12.tgz
+tar -xzf flink-1.20.3-bin-scala_2.12.tgz
+```
+
+Deploy a standalone Flink cluster in your local environment with the following command.
+
+```bash
+./flink-1.20.3/bin/start-cluster.sh
+```
+
+You should be able to navigate to the web UI at [localhost:8081](http://localhost:8081) to view the Flink dashboard and see that the cluster is up and running.
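Beyond checking the web UI manually, you can confirm the cluster is up from a script before submitting anything. The snippet below is a small sketch, not part of flink-agents: it assumes the default REST port 8081, and `cluster_overview`/`is_cluster_ready` are illustrative helper names. The `taskmanagers` and `slots-available` fields come from the `/overview` endpoint of Flink's monitoring REST API.

```python
import json
from urllib.error import URLError
from urllib.request import urlopen


def cluster_overview(host: str = "localhost", port: int = 8081) -> dict:
    """Fetch the cluster summary from the REST API's /overview endpoint."""
    with urlopen(f"http://{host}:{port}/overview", timeout=5) as resp:
        return json.load(resp)


def is_cluster_ready(overview: dict) -> bool:
    """Treat the cluster as ready once at least one TaskManager is registered."""
    return overview.get("taskmanagers", 0) >= 1


if __name__ == "__main__":
    try:
        overview = cluster_overview()
    except URLError:
        print("Cluster not reachable -- did start-cluster.sh succeed?")
    else:
        print(f"TaskManagers: {overview['taskmanagers']}, "
              f"available slots: {overview['slots-available']}")
```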
+ +### Submit to Flink Cluster +```bash +# Set Python environment variable to locate Python libraries , ensuring Flink +# can find Python dependencies +export PYTHONPATH=$(python -c 'import sysconfig; print(sysconfig.get_paths()["purelib"])') + +# Run Flink Python Job +# 1. Path note: Replace ./flink-1.20.3 with your actual Flink installation directory +# 2. -py parameter specifies the Python script entry file +# 3. Ensure /path/to/flink_agents_job.py is replaced with your actual file path +./flink-1.20.3/bin/flink run -py /path/to/flink_agents_job.py +``` + +Now you should see a Flink job submitted to the Flink Cluster in Flink web UI [localhost:8081](localhost:8081) + +After a few minutes, you can check for the output in the TaskManager output log. From 900561ff16605e0c80fcc0d0c4b5330633b93ee1 Mon Sep 17 00:00:00 2001 From: Eugene Liu Date: Thu, 2 Oct 2025 16:31:43 +0800 Subject: [PATCH 2/5] address comments --- docs/content/docs/operations/deployment.md | 172 +++++++++++++-------- 1 file changed, 105 insertions(+), 67 deletions(-) diff --git a/docs/content/docs/operations/deployment.md b/docs/content/docs/operations/deployment.md index 2af7f6c2..c0cbd703 100644 --- a/docs/content/docs/operations/deployment.md +++ b/docs/content/docs/operations/deployment.md @@ -22,6 +22,41 @@ specific language governing permissions and limitations under the License. --> + + +## Overall + +We provide a total of three ways to run the job: Local Run with Test Data, Local Run with Flink MiniCluster, and Run in Flink Cluster. The detailed differences are shown in the table below: + + + + + + + + + + + + + + + + + + + + + + + + + + +
+  <tr>
+    <th>Deployment Mode</th>
+    <th>Language Support</th>
+    <th>Typical Use Case</th>
+  </tr>
+  <tr>
+    <td>Local Run with Test Data</td>
+    <td>Only Python</td>
+    <td>Validate the internal logic of the Agent.</td>
+  </tr>
+  <tr>
+    <td>Local Run with Flink MiniCluster</td>
+    <td>Python & Java</td>
+    <td>Verify upstream/downstream connectivity and schema correctness.</td>
+  </tr>
+  <tr>
+    <td>Run in Flink Cluster</td>
+    <td>Python & Java</td>
+    <td>Large-scale data and AI processing in production environments.</td>
+  </tr>
+ + + ## Local Run with Test Data After completing the [installation of flink-agents]({{< ref "docs/get-started/installation" >}}) and [building your agent]({{< ref "docs/development/workflow_agent" >}}), you can test and execute your agent locally using a simple Python script. This allows you to validate logic without requiring a Flink cluster. @@ -32,51 +67,10 @@ After completing the [installation of flink-agents]({{< ref "docs/get-started/in - **Test Data Simulation**: Easily inject mock inputs for validation. - **IDE Compatibility**: Run directly in your preferred development environment. -### Data Format - -#### Input Data Format - -The input data should be a list of dictionaries (`List[Dict[str, Any]]`) with the following structure: - -``````python -[ - { - # Optional field: Context key for multi-session management - # Priority order: "key" > "k" > auto-generated UUID - "key": "session_001", # or use shorthand "k": "session_001" - - # Required field: Input content (supports text, JSON, or any serializable type) - # This becomes the `input` field in InputEvent - "value": "Calculate the sum of 1 and 2.", # or shorthand "v": "..." - }, - ... -] -`````` - -#### Output Data Format - -The output data is a list of dictionaries (`List[Dict[str, Any]]`) where each dictionary contains a single key-value pair representing the processed result. The structure is generated from `OutputEvent` objects: - -``````python -[ - {key_1: output_1}, # From first OutputEvent - {key_2: output_2}, # From second OutputEvent - ... 
-] -`````` - -Each dictionary in the output list follows this pattern: - -`````` -{ - # Key: Matches the input context key (from "key"/"k" field or auto-generated UUID) - # Value: Result from agent processing (type depends on implementation) - : -} -`````` - ### Example for Local Run with Test Data +#### Complete code + ``````python from flink_agents.api.execution_environment import AgentsExecutionEnvironment from my_module.agents import MyAgent # Replace with your actual agent path @@ -109,19 +103,57 @@ if __name__ == "__main__": `````` -## Local Run with Flink MiniCluster +#### Input Data Format -After completing the [installation of flink-agents]({{< ref "docs/get-started/installation" >}}) and [building your agent]({{< ref "docs/development/workflow_agent" >}}), you can test and execute your agent locally using a **Flink MiniCluster embedded in Python**. This allows you to simulate a real Flink streaming environment without deploying to a full cluster. For more details about how to integrate agents with Flink's `DataStream` or `Table`, please refer to the [Integrate with Flink]({{< ref "docs/development/integrate_with_flink" >}}) documentation. +The input data should be a list of dictionaries (`List[Dict[str, Any]]`) with the following structure: -{{< hint info >}} +``````python +[ + { + # Optional field: Input key + "key": "key_1", + + # Required field: Input content + # This becomes the `input` field in InputEvent + "value": "Calculate the sum of 1 and 2.", + }, + ... +] +`````` + +#### Output Data Format + +The output data is a list of dictionaries (`List[Dict[str, Any]]`) where each dictionary contains a single key-value pair representing the processed result. The structure is generated from `OutputEvent` objects: + +``````python +[ + {key_1: output_1}, # From first OutputEvent + {key_2: output_2}, # From second OutputEvent + ... 
+] +`````` + + + +## Local Run with Flink MiniCluster + +After completing the [installation of flink-agents]({{< ref "docs/get-started/installation" >}}) and [building your agent]({{< ref "docs/development/workflow_agent" >}}), you can test and execute your agent locally using a **Flink MiniCluster**. This allows you to have a lightweight Flink streaming environment without deploying to a full cluster. For more details about how to integrate agents with Flink's `DataStream` or `Table`, please refer to the [Integrate with Flink]({{< ref "docs/development/integrate_with_flink" >}}) documentation. -If you encounter the exception "No module named 'flink_agents'" when running with Flink MiniCluster, you can set the PYTHONPATH by adding os.environ["PYTHONPATH"] = sysconfig.get_paths()["purelib"] at the beginning of your code. -{{< /hint >}} ## Run in Flink Cluster -Flink Agents jobs are deployed and run on the cluster similarly to Pyflink jobs. You can refer to the instructions for [submit pyflink jobs](https://https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/cli/#submitting-pyflink-jobs) for more detailed steps. The following explains how to submit Flink Agents jobs to the cluster, using a Standalone cluster as an example. +Flink Agents jobs are deployed and run on the cluster similarly to Pyflink jobs. You can refer to the instructions for [submitting PyFlink jobs](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/cli/#submitting-pyflink-jobs) for more details. 
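Because the job is submitted like a PyFlink job, the interpreter used for submission must be a Python version flink-agents supports (3.10 or 3.11, per the prerequisites). A small sanity check you can drop into a job script — `is_supported` and `check_interpreter` are illustrative names, not part of the flink-agents API:

```python
import sys

# Python versions flink-agents currently supports (see the prerequisites)
SUPPORTED_VERSIONS = {(3, 10), (3, 11)}


def is_supported(major: int, minor: int) -> bool:
    """Return True if this Python version can run flink-agents jobs."""
    return (major, minor) in SUPPORTED_VERSIONS


def check_interpreter() -> None:
    """Fail fast with a clear message instead of an obscure submission error."""
    major, minor = sys.version_info[:2]
    if not is_supported(major, minor):
        raise RuntimeError(
            f"Python {major}.{minor} is unsupported; use Python 3.10 or 3.11."
        )
```

Calling `check_interpreter()` at the top of a job script turns a version mismatch into an immediate, readable failure rather than a confusing error at submission time.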
+
+
+### Prerequisites
+
+- **Operating System**: Unix-like environment (Linux, macOS, Cygwin, or WSL)
+- **Python**: Version 3.10 or 3.11
+- **Flink**: A running Flink cluster with the Flink Agents dependency installed
+
+
### Prepare Flink Agents

We recommend creating a Python virtual environment to install the Flink Agents Python library.

Follow the [instructions]({{< ref "docs/get-started/installation" >}}) to install the Flink Agents Python and Java libraries.

-### Deploy a Standalone Flink Cluster
+
+
+### Prepare Flink

Download a stable release of Flink 1.20.3, then extract the archive. You can refer to the [local installation](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/try-flink/local_installation/) instructions for more detailed steps.

```bash
curl -LO https://archive.apache.org/dist/flink/flink-1.20.3/flink-1.20.3-bin-scala_2.12.tgz
tar -xzf flink-1.20.3-bin-scala_2.12.tgz
```

-Deploy a standalone Flink cluster in your local environment with the following command.
-```bash
-./flink-1.20.3/bin/start-cluster.sh
-```
-
-You should be able to navigate to the web UI at [localhost:8081](http://localhost:8081) to view the Flink dashboard and see that the cluster is up and running.

### Submit to Flink Cluster
```bash
-# Set Python environment variable to locate Python libraries , ensuring Flink
-# can find Python dependencies
-export PYTHONPATH=$(python -c 'import sysconfig; print(sysconfig.get_paths()["purelib"])')
-
+# Run Flink Python Job
+# ------------------------------------------------------------------------
+# 1. Path Note:
+#    Replace "./flink-1.20.3" with the actual Flink installation directory.
+#
+# 2. 
Python Entry File: +# The "--python" parameter specifies the Python script to be executed. +# Replace "/path/to/flink_agents_job.py" with the full path to your job file. +# +# 3. JobManager Address: +# Replace "" with the hostname or IP address of the Flink JobManager. +# The default REST port is 8081. +# +# 4. Example: +# ./flink-1.20.3/bin/flink run \ +# --jobmanager localhost:8081 \ +# --python /home/user/flink_jobs/flink_agents_job.py +# ------------------------------------------------------------------------ +./flink-1.20.3/bin/flink run \ + --jobmanager :8081 \ + --python /path/to/flink_agents_job.py ``` -Now you should see a Flink job submitted to the Flink Cluster in Flink web UI [localhost:8081](localhost:8081) - -After a few minutes, you can check for the output in the TaskManager output log. +Now you should see a Flink job submitted to the Flink Cluster in Flink web UI. After a few minutes, you can check for the output in the TaskManager output log. From 43a52f6a8a719907e8b2989fca958c1f87981e76 Mon Sep 17 00:00:00 2001 From: youjin Date: Mon, 6 Oct 2025 15:15:01 +0800 Subject: [PATCH 3/5] address comments address comments --- docs/content/docs/operations/deployment.md | 95 +++++++--------------- 1 file changed, 28 insertions(+), 67 deletions(-) diff --git a/docs/content/docs/operations/deployment.md b/docs/content/docs/operations/deployment.md index c0cbd703..ec9de4ab 100644 --- a/docs/content/docs/operations/deployment.md +++ b/docs/content/docs/operations/deployment.md @@ -22,44 +22,19 @@ specific language governing permissions and limitations under the License. --> - - -## Overall +## Overview We provide a total of three ways to run the job: Local Run with Test Data, Local Run with Flink MiniCluster, and Run in Flink Cluster. The detailed differences are shown in the table below: - - - - - - - - - - - - - - - - - - - - - - - - - -
-  <tr>
-    <th>Deployment Mode</th>
-    <th>Language Support</th>
-    <th>Typical Use Case</th>
-  </tr>
-  <tr>
-    <td>Local Run with Test Data</td>
-    <td>Only Python</td>
-    <td>Validate the internal logic of the Agent.</td>
-  </tr>
-  <tr>
-    <td>Local Run with Flink MiniCluster</td>
-    <td>Python & Java</td>
-    <td>Verify upstream/downstream connectivity and schema correctness.</td>
-  </tr>
-  <tr>
-    <td>Run in Flink Cluster</td>
-    <td>Python & Java</td>
-    <td>Large-scale data and AI processing in production environments.</td>
-  </tr>
- - +| Deployment Mode | Language Support | Typical Use Case | +|-------------------------------------|------------------------|------------------------------------------------------------| +| Local Run with Test Data | Only Python | Validate the internal logic of the Agent. | +| Local Run with Flink MiniCluster | Python & Java | Verify upstream/downstream connectivity and schema correctness. | +| Run in Flink Cluster | Python & Java | Large-scale data and AI processing in production environments. | ## Local Run with Test Data -After completing the [installation of flink-agents]({{< ref "docs/get-started/installation" >}}) and [building your agent]({{< ref "docs/development/workflow_agent" >}}), you can test and execute your agent locally using a simple Python script. This allows you to validate logic without requiring a Flink cluster. +After completing the [installation of flink-agents]({{< ref "docs/get-started/installation" >}}) and building your [ReAct Agent]({{< ref "docs/development/react_agent" >}}) or [workflow agent]({{< ref "docs/development/workflow_agent" >}}), you can test and execute your agent locally using a simple Python script. This allows you to validate logic without requiring a Flink cluster. 
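Instead of writing every test record by hand, you can generate them so that each entry follows the `{"key": ..., "value": ...}` shape accepted by `from_list`. The helper below is an illustrative sketch (`make_records` is not part of the API); the UUID fallback mirrors the documented behaviour of auto-generating a key when none is provided:

```python
import uuid
from typing import Any, Dict, List, Optional


def make_records(values: List[Any],
                 keys: Optional[List[str]] = None) -> List[Dict[str, Any]]:
    """Build input records in the {"key": ..., "value": ...} shape.

    Entries without an explicit key fall back to a random UUID, mirroring
    the auto-generated key used for keyless input.
    """
    keys = keys or []
    records: List[Dict[str, Any]] = []
    for i, value in enumerate(values):
        key = keys[i] if i < len(keys) else str(uuid.uuid4())
        records.append({"key": key, "value": value})
    return records


input_data = make_records(
    ["Calculate the sum of 1 and 2.", "Tell me a joke about cats."],
    keys=["0001"],  # the second record gets an auto-generated key
)
```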
### Key Features @@ -69,9 +44,7 @@ After completing the [installation of flink-agents]({{< ref "docs/get-started/in ### Example for Local Run with Test Data -#### Complete code - -``````python +```python from flink_agents.api.execution_environment import AgentsExecutionEnvironment from my_module.agents import MyAgent # Replace with your actual agent path @@ -101,13 +74,13 @@ if __name__ == "__main__": for key, value in record.items(): print(f"{key}: {value}") -`````` +``` #### Input Data Format -The input data should be a list of dictionaries (`List[Dict[str, Any]]`) with the following structure: +The input data should be a list of dictionaries `List[Dict[str, Any]]` with the following structure: -``````python +```python [ { # Optional field: Input key @@ -119,41 +92,39 @@ The input data should be a list of dictionaries (`List[Dict[str, Any]]`) with th }, ... ] -`````` +``` #### Output Data Format -The output data is a list of dictionaries (`List[Dict[str, Any]]`) where each dictionary contains a single key-value pair representing the processed result. The structure is generated from `OutputEvent` objects: +The output data is a list of dictionaries `List[Dict[str, Any]]` where each dictionary contains a single key-value pair representing the processed result. The structure is generated from `OutputEvent` objects: -``````python +```python [ - {key_1: output_1}, # From first OutputEvent - {key_2: output_2}, # From second OutputEvent + {key_1: output_1}, # From first OutputEvent; key is randomly generated if it is not provided in input + {key_2: output_2}, # From second OutputEvent; key is randomly generated if it is not provided in input ... ] -`````` - - +``` ## Local Run with Flink MiniCluster -After completing the [installation of flink-agents]({{< ref "docs/get-started/installation" >}}) and [building your agent]({{< ref "docs/development/workflow_agent" >}}), you can test and execute your agent locally using a **Flink MiniCluster**. 
This allows you to have a lightweight Flink streaming environment without deploying to a full cluster. For more details about how to integrate agents with Flink's `DataStream` or `Table`, please refer to the [Integrate with Flink]({{< ref "docs/development/integrate_with_flink" >}}) documentation.
+After completing the [installation of flink-agents]({{< ref "docs/get-started/installation" >}}) and [building your agent]({{< ref "docs/development/workflow_agent" >}}), you can test and execute your agent locally using a **Flink MiniCluster**. This allows you to have a lightweight Flink streaming environment without deploying to a full cluster.
+
+To run your job locally with the MiniCluster, use the following command:

-## Run in Flink Cluster
-
-Flink Agents jobs are deployed and run on the cluster similarly to Pyflink jobs. You can refer to the instructions for [submitting PyFlink jobs](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/cli/#submitting-pyflink-jobs) for more details.
+```bash
+python /path/to/flink_agents_job.py
+```
+
+For more details about how to integrate agents with Flink's `DataStream` or `Table`, please refer to the [Integrate with Flink]({{< ref "docs/development/integrate_with_flink" >}}) documentation.
+
+## Run in Flink Cluster

### Prerequisites

- **Operating System**: Unix-like environment (Linux, macOS, Cygwin, or WSL)
- **Python**: Version 3.10 or 3.11
-- **Flink**: A running Flink cluster with the Flink Agents dependency installed
+- **Flink**: A running Flink cluster with version 1.20.3 and the Flink Agents dependency installed

### Prepare Flink Agents

We recommend creating a Python virtual environment to install the Flink Agents Python library.

Follow the [instructions]({{< ref "docs/get-started/installation" >}}) to install the Flink Agents Python and Java libraries.

+### Submit to Flink Cluster
+
+Submitting Flink Agent jobs to the Flink Cluster is the same as submitting PyFlink jobs. 
For more details on all available options, please refer to the [Flink CLI documentation](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/cli/#submitting-pyflink-jobs). -### Prepare Flink - -Download a stable release of Flink 1.20.3, then extract the archive. You can refer to the [local installation](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/try-flink/local_installation/) instructions for more detailed step. - -```bash -curl -LO https://archive.apache.org/dist/flink/flink-1.20.3/flink-1.20.3-bin-scala_2.12.tgz -tar -xzf flink-1.20.3-bin-scala_2.12.tgz -``` - - - -### Submit to Flink Cluster ```bash # Run Flink Python Job # ------------------------------------------------------------------------ @@ -199,4 +160,4 @@ tar -xzf flink-1.20.3-bin-scala_2.12.tgz --python /path/to/flink_agents_job.py ``` -Now you should see a Flink job submitted to the Flink Cluster in Flink web UI. After a few minutes, you can check for the output in the TaskManager output log. +Now you should see a Flink job submitted to the Flink Cluster in Flink web UI (typically accessible at http://<jobmanagerHost>:8081). From 0388d416a28420fe8c80790f1ab1e3675f6b251c Mon Sep 17 00:00:00 2001 From: youjin Date: Tue, 7 Oct 2025 17:44:12 +0800 Subject: [PATCH 4/5] address comments --- docs/content/docs/operations/deployment.md | 67 ++++++---------------- 1 file changed, 18 insertions(+), 49 deletions(-) diff --git a/docs/content/docs/operations/deployment.md b/docs/content/docs/operations/deployment.md index ec9de4ab..5dabc974 100644 --- a/docs/content/docs/operations/deployment.md +++ b/docs/content/docs/operations/deployment.md @@ -24,23 +24,21 @@ under the License. ## Overview -We provide a total of three ways to run the job: Local Run with Test Data, Local Run with Flink MiniCluster, and Run in Flink Cluster. 
The detailed differences are shown in the table below: +We provide two options to run the job: -| Deployment Mode | Language Support | Typical Use Case | -|-------------------------------------|------------------------|------------------------------------------------------------| -| Local Run with Test Data | Only Python | Validate the internal logic of the Agent. | -| Local Run with Flink MiniCluster | Python & Java | Verify upstream/downstream connectivity and schema correctness. | -| Run in Flink Cluster | Python & Java | Large-scale data and AI processing in production environments. | +- **Run without Flink** + - **Language Support**: Only Python + - **Input and Output**: Python List -## Local Run with Test Data +- **Run in Flink** + - **Language Support**: Python & Java + - **Input and Output**: DataStream or Table -After completing the [installation of flink-agents]({{< ref "docs/get-started/installation" >}}) and building your [ReAct Agent]({{< ref "docs/development/react_agent" >}}) or [workflow agent]({{< ref "docs/development/workflow_agent" >}}), you can test and execute your agent locally using a simple Python script. This allows you to validate logic without requiring a Flink cluster. +These deployment modes differ in supported languages and data formats, allowing you to choose the one that best fits your use case. -### Key Features +## Run without Flink -- **No Flink Required**: Local execution is ideal for development and testing. -- **Test Data Simulation**: Easily inject mock inputs for validation. -- **IDE Compatibility**: Run directly in your preferred development environment. +After completing the [installation of flink-agents]({{< ref "docs/get-started/installation" >}}) and building your [ReAct Agent]({{< ref "docs/development/react_agent" >}}) or [workflow agent]({{< ref "docs/development/workflow_agent" >}}), you can test and execute your agent locally using a simple Python script. 
This allows you to validate logic without requiring a Flink cluster. ### Example for Local Run with Test Data @@ -83,7 +81,8 @@ The input data should be a list of dictionaries `List[Dict[str, Any]]` with the ```python [ { - # Optional field: Input key + # Optional field: Input key. + # The key is randomly generated if not provided. "key": "key_1", # Required field: Input content @@ -100,25 +99,13 @@ The output data is a list of dictionaries `List[Dict[str, Any]]` where each dict ```python [ - {key_1: output_1}, # From first OutputEvent; key is randomly generated if it is not provided in input - {key_2: output_2}, # From second OutputEvent; key is randomly generated if it is not provided in input + {key_1: output_1}, # From first OutputEvent + {key_2: output_2}, # From second OutputEvent ... ] ``` -## Local Run with Flink MiniCluster - -After completing the [installation of flink-agents]({{< ref "docs/get-started/installation" >}}) and [building your agent]({{< ref "docs/development/workflow_agent" >}}), you can test and execute your agent locally using a **Flink MiniCluster**. This allows you to have a lightweight Flink streaming environment without deploying to a full cluster. - -To run your job locally with the MiniCluster, use the following command: - -```bash -python /path/to/flink_agents_job.py -``` - -For more details about how to integrate agents with Flink's `DataStream` or `Table`, please refer to the [Integrate with Flink]({{< ref "docs/development/integrate_with_flink" >}}) documentation. - -## Run in Flink Cluster +## Run in Flink ### Prerequisites @@ -137,27 +124,9 @@ Follow the [instructions]({{< ref "docs/get-started/installation" >}}) to instal Submitting Flink Agent jobs to the Flink Cluster is the same as submitting PyFlink jobs. For more details on all available options, please refer to the [Flink CLI documentation](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/cli/#submitting-pyflink-jobs). 
```bash
-# Run Flink Python Job
-# ------------------------------------------------------------------------
-# 1. Path Note:
-#    Replace "./flink-1.20.3" with the actual Flink installation directory.
-#
-# 2. Python Entry File:
-#    The "--python" parameter specifies the Python script to be executed.
-#    Replace "/path/to/flink_agents_job.py" with the full path to your job file.
-#
-# 3. JobManager Address:
-#    Replace "" with the hostname or IP address of the Flink JobManager.
-#    The default REST port is 8081.
-#
-# 4. Example:
-#    ./flink-1.20.3/bin/flink run \
-#      --jobmanager localhost:8081 \
-#      --python /home/user/flink_jobs/flink_agents_job.py
-# ------------------------------------------------------------------------
-./flink-1.20.3/bin/flink run \
-  --jobmanager :8081 \
-  --python /path/to/flink_agents_job.py
+<FLINK_HOME>/bin/flink run \
+  --jobmanager <jobmanagerHost>:8081 \
+  --python <path/to/flink_agents_job.py>
```

Now you should see a Flink job submitted to the Flink Cluster in Flink web UI (typically accessible at http://<jobmanagerHost>:8081).

From 289664ae6f10323d9ece0f9e0625e359ecf746c6 Mon Sep 17 00:00:00 2001
From: Xintong Song
Date: Tue, 7 Oct 2025 19:26:46 +0800
Subject: [PATCH 5/5] minor fix

---
 docs/content/docs/operations/deployment.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/docs/content/docs/operations/deployment.md b/docs/content/docs/operations/deployment.md
index 5dabc974..71c7841b 100644
--- a/docs/content/docs/operations/deployment.md
+++ b/docs/content/docs/operations/deployment.md
@@ -29,10 +29,12 @@ We provide two options to run the job:

 - **Run without Flink**
   - **Language Support**: Only Python
   - **Input and Output**: Python List
+  - **Suitable Use Case**: Local Testing and Debugging

 - **Run in Flink**
   - **Language Support**: Python & Java
   - **Input and Output**: DataStream or Table
+  - **Suitable Use Case**: Production

 These deployment modes differ in supported languages and data formats, allowing you to choose the one that best fits your use case.
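For the "Run without Flink" mode, the list-of-single-entry-dicts output format lends itself to simple assertions in local tests. A sketch of a merging helper — `flatten_outputs` is an illustrative name, not part of the flink-agents API:

```python
from typing import Any, Dict, List


def flatten_outputs(output_data: List[Dict[Any, Any]]) -> Dict[Any, Any]:
    """Collapse the [{key: output}, ...] list returned via to_list()
    into a single key -> output mapping for easy lookups."""
    merged: Dict[Any, Any] = {}
    for record in output_data:
        for key, value in record.items():
            if key in merged:
                raise ValueError(f"duplicate output key: {key!r}")
            merged[key] = value
    return merged


# Example: look up one input's result after env.execute()
results = flatten_outputs([{"0001": "The sum is 3."}, {"0002": "A cat joke."}])
assert results["0001"] == "The sum is 3."
```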