Replies: 3 comments 2 replies
-
Adding to the problem section: Model inference / generation is non-deterministic. Repeating model calls with identical inputs may result in different outputs. This creates inconsistency, especially when we rely on the model outputs to decide what to do next (reasoning, tool calls, etc.). What should we do with the half-performed actions when the model makes a different decision after recovery? I think the best way is to never ask the model to make a decision twice.
-
@xintongsong this doc should be in a good state. One question I don't know the answer to: is there any way for the operator, or any notification channel, to know that a checkpoint has been committed? If so, we can use that as a signal to do GC.
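For context, the checkpoint-commit signal asked about here corresponds to Flink's notifyCheckpointComplete callback. A hypothetical GC hook is sketched below; the store layout (key mapped to a checkpoint epoch plus state) is purely illustrative, not the actual Flink Agents API.

```python
# Hypothetical GC hook driven by checkpoint completion. The store maps
# state_key -> (checkpoint_epoch_when_written, state); rows written under
# an earlier epoch are covered by the committed checkpoint and can go.
class ActionStateGc:
    def __init__(self, store: dict):
        self._store = store

    def notify_checkpoint_complete(self, checkpoint_id: int) -> None:
        stale = [k for k, (epoch, _) in self._store.items()
                 if epoch < checkpoint_id]
        for k in stale:
            del self._store[k]
```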
-
@letaoj Thanks for updating the design doc based on our offline discussion. I think the overall design is quite good now. I just have a few more comments on the details.
-
Background
Flink Agents execute various actions during record processing, including model inference and tool invocation. Model inference involves calling LLMs for reasoning, classification, or generation tasks, often through expensive API calls to external providers. Tool invocation allows agents to interact with external systems through UDFs with network access, with native support for Model Context Protocol (MCP). These actions enable agents to perform contextual searches, execute business logic, interact with enterprise systems, and invoke specialized processing services.
Problem
Side Effects and Costs from Action Replay
While Flink provides exactly-once processing guarantees for stream processing on a per-message basis, agent actions create challenges around side effects, costs, and recovery semantics. Both model inference and tool invocation can produce effects that persist beyond the agent's execution context or incur significant costs that should not be duplicated.
The core problem occurs when:
This creates several issues:
Non-Deterministic Model Outputs
A critical additional challenge is that model inference and generation is inherently non-deterministic. Repeating model calls multiple times with identical inputs may result in different outputs due to sampling, temperature settings, or model provider variations. This creates severe consistency problems when model outputs drive downstream decisions such as reasoning chains or tool selection.
Consider this scenario: an agent makes a model call that decides to invoke Tool A, but crashes before completion. Upon recovery, the same model call with identical inputs may decide to invoke Tool B instead. This leaves the system in an inconsistent state: Tool A was already executed based on the first decision, but the agent now wants to execute Tool B based on the second decision. The best approach is to ensure the model never makes the same decision twice: the original model output should be preserved and reused during recovery.
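The "never decide twice" rule can be sketched as a cache keyed on a deterministic action hash: persist the model's response before acting on it, and on replay return the stored response instead of re-invoking the model. All names below are hypothetical, not part of the Flink Agents API.

```python
# Sketch: replay-safe model call. The response is persisted under a
# deterministic key derived from the message and request, so recovery
# reuses the original decision instead of sampling a new one.
import hashlib
import json


def action_key(message_key: str, action_name: str, request: dict) -> str:
    """Deterministic key for one action within a message."""
    payload = json.dumps(request, sort_keys=True)
    digest = hashlib.sha256(f"{action_name}:{payload}".encode()).hexdigest()[:12]
    return f"{message_key}-{digest}"


class ReplaySafeModel:
    def __init__(self, model_fn, state_store: dict):
        self._model_fn = model_fn   # the real (non-deterministic) model call
        self._store = state_store   # durable action-state store

    def call(self, message_key: str, request: dict) -> str:
        key = action_key(message_key, "chat_model", request)
        entry = self._store.get(key)
        if entry and "response" in entry:
            return entry["response"]          # recovery: reuse original output
        self._store[key] = {"request": request}
        response = self._model_fn(request)
        self._store[key]["response"] = response  # persist before acting on it
        return response
```

With this wrapper, a crash between the model call and the tool execution replays the original decision, so the agent still chooses Tool A on recovery.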
Flink's streaming architecture introduces additional complexity through continuous processing on unbounded streams, distributed state management, back-pressure from action failures, and a semantic gap where exactly-once guarantees don't extend to external model providers or tool endpoints.
Goals and Non-Goals
Goals
Non-Goals
High-Level Design
Execution Flow for Static Agent
From the above diagram, the saved state will evolve as shown below:
<message_key>-<action_hash_1>: {"request": request}
<message_key>-<action_hash_1>: {"request": request, "response": response}
<message_key>-<action_hash_1>: {"request": request, "response": response, "short-term-memory-updates": [...]}
<message_key>-<action_hash_1>: {"request": request, "response": response, "short-term-memory-updates": [...], "output_event": [output_event]}
<message_key>-<action_hash_2>: ...
<message_key>-<action_hash_2>: ...
<message_key>-<action_hash_2>: ...
<message_key>-<action_hash_2>: ...
<message_key>-<action_hash_3>: ...
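The same evolution can be sketched in Python as four in-place updates to a single record; the key and field names mirror the listing above, while the concrete values are illustrative.

```python
# Hypothetical walk-through of one action's state record growing through
# the four persistence points listed above.
state = {}
key = "msg-42-a1b2c3"  # stands in for <message_key>-<action_hash_1>

state[key] = {"request": {"prompt": "classify"}}             # 1. before the call
state[key]["response"] = "category-A"                        # 2. after the call
state[key]["short-term-memory-updates"] = [("k", "v")]       # 3. memory deltas
state[key]["output_event"] = [{"type": "ToolRequestEvent"}]  # 4. emitted events
```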
APIs
Task Action State
Memory Updates
Action Result Store
The action result store is an abstraction layer over the external database; it handles serialization and deserialization of the AgentState.
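A minimal sketch of that abstraction, assuming JSON serialization and a dict-backed test double (class and method names are illustrative, not the actual Flink Agents API):

```python
# The store hides the external database and owns (de)serialization of the
# agent state; a real implementation would target Kafka, a KV store, etc.
import abc
import json


class ActionResultStore(abc.ABC):
    @abc.abstractmethod
    def put(self, key: str, state: dict) -> None: ...

    @abc.abstractmethod
    def get(self, key: str) -> "dict | None": ...


class InMemoryActionResultStore(ActionResultStore):
    """Test double backed by a dict."""

    def __init__(self):
        self._rows: dict = {}

    def put(self, key: str, state: dict) -> None:
        self._rows[key] = json.dumps(state)      # serialize AgentState

    def get(self, key: str):
        raw = self._rows.get(key)
        return json.loads(raw) if raw is not None else None
```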
External Database
Database Consideration
Below are some characteristics of the agent state to consider when picking the right external DB:
Data Retention
To prevent unbounded growth in backend storage, we need a data retention policy, since this data is only required for failure recovery. Once a Flink checkpoint is successfully committed, we should automatically delete all data that precedes that checkpoint, keeping storage manageable while preserving the necessary recovery capabilities. This can be achieved by listening for and acting on notifyCheckpointComplete, which Flink sends after each checkpoint.
Viable Solution
A practical choice for the external database is Kafka. Each state update is written to Kafka as a separate message, and the per-partition offset is recorded in Flink state. During recovery, the Flink agent retrieves the offset information from the latest checkpoint and reads from that offset to the end of the partition to rebuild the task action state.
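That recovery path can be sketched as a replay loop, assuming each Kafka message carries a (state_key, partial_update) pair; a plain list stands in for a Kafka partition, and all names are illustrative.

```python
# Replay from the checkpointed offset to the end of the partition to
# rebuild the task action state; later records refine earlier ones.
def recover_action_state(partition, checkpointed_offset: int) -> dict:
    state: dict = {}
    for offset in range(checkpointed_offset, len(partition)):
        key, update = partition[offset]
        state.setdefault(key, {}).update(update)
    return state
```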
There are a couple of drawbacks to using Kafka as the data store: