|
| 1 | +--- |
| 2 | +layout: default |
| 3 | +title: Execute stream agent |
| 4 | +parent: Agent APIs |
| 5 | +grand_parent: ML Commons APIs |
| 6 | +nav_order: 25 |
| 7 | +--- |
| 8 | + |
| 9 | +# Execute Stream Agent API |
| 10 | +**Introduced 3.3** |
| 11 | +{: .label .label-purple } |
| 12 | + |
| 13 | +This is an experimental feature and is not recommended for use in a production environment. For updates on the progress of the feature or if you want to leave feedback, join the discussion on the [OpenSearch forum](https://forum.opensearch.org/). |
| 14 | +{: .warning} |
| 15 | + |
| 16 | +The Execute Stream Agent API provides the same functionality as the [Execute Agent API]({{site.url}}{{site.baseurl}}/ml-commons-plugin/api/agent-apis/execute-agent/) but returns responses in a streaming format, delivering data in chunks as it becomes available. This streaming approach is particularly beneficial for large language model interactions with lengthy responses, allowing you to see partial results immediately rather than waiting for the complete response. |
| 17 | + |
| 18 | +This API currently supports conversational agents with the following remote model types: |
| 19 | +- [OpenAI Chat Completion](https://platform.openai.com/docs/api-reference/completions) |
| 20 | +- [Amazon Bedrock Converse Stream](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_ConverseStream.html) |
| 21 | + |
| 22 | +## Endpoint |
| 23 | + |
| 24 | +```json |
| 25 | +POST /_plugins/_ml/agents/<agent_id>/_execute/stream |
| 26 | +``` |
| 27 | + |
| 28 | +## Prerequisites |
| 29 | + |
| 30 | +Before using this API, ensure that you have fulfilled the following prerequisites. |
| 31 | + |
| 32 | +### Set up your cluster |
| 33 | + |
| 34 | +Follow these steps to set up your cluster. |
| 35 | + |
| 36 | +#### Step 1: Install the required plugins |
| 37 | + |
| 38 | +The Execute Stream Agent API depends on the following plugins, which are included in the OpenSearch distribution but must be explicitly installed as follows: |
| 39 | + |
| 40 | +```bash |
| 41 | +bin/opensearch-plugin install transport-reactor-netty4 |
| 42 | +bin/opensearch-plugin install arrow-flight-rpc |
| 43 | +``` |
| 44 | + |
| 45 | +For more information, see [Installing plugins]({{site.url}}{{site.baseurl}}/install-and-configure/plugins/). |
| 46 | + |
| 47 | +#### Step 2: Configure OpenSearch settings |
| 48 | + |
| 49 | +Add these settings to your `opensearch.yml` file or Docker Compose configuration: |
| 50 | + |
| 51 | +```yaml |
| 52 | +opensearch.experimental.feature.transport.stream.enabled: true |
| 53 | + |
| 54 | +# Choose one based on your security settings |
| 55 | +http.type: reactor-netty4 # security disabled |
| 56 | +http.type: reactor-netty4-secure # security enabled |
| 57 | + |
| 58 | +# Multi-node cluster settings (if applicable) |
| 59 | +# Use network.host IP for opensearch.yml or node name for Docker |
| 60 | +arrow.flight.publish_host: <ip> |
| 61 | +arrow.flight.bind_host: <ip> |
| 62 | + |
| 63 | +# Security-enabled cluster settings (if applicable) |
| 64 | +transport.stream.type.default: FLIGHT-SECURE |
| 65 | +flight.ssl.enable: true |
| 66 | +transport.ssl.enforce_hostname_verification: false |
| 67 | +``` |
| 68 | +{% include copy.html %} |
| 69 | +
|
| 70 | +If you're using the security demo certificates, change `plugins.security.ssl.transport.enforce_hostname_verification: false` to `transport.ssl.enforce_hostname_verification: false` in your `opensearch.yml` file. |
| 71 | +{: .note} |
| 72 | + |
| 73 | +For more information about enabling experimental features, see [Experimental feature flags]({{site.url}}{{site.baseurl}}/install-and-configure/configuring-opensearch/experimental/). |
| 74 | + |
| 75 | +#### Step 3: Configure JVM options |
| 76 | + |
| 77 | +Add these settings to your `jvm.options` file: |
| 78 | + |
| 79 | +```yaml |
| 80 | +-Dio.netty.allocator.numDirectArenas=1 |
| 81 | +-Dio.netty.noUnsafe=false |
| 82 | +-Dio.netty.tryUnsafe=true |
| 83 | +-Dio.netty.tryReflectionSetAccessible=true |
| 84 | +--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED |
| 85 | +``` |
| 86 | +{% include copy.html %} |
| 87 | + |
| 88 | +### Configure the necessary APIs |
| 89 | + |
| 90 | +Configure the API using the following steps. |
| 91 | + |
| 92 | +#### Step 1: Enable the streaming feature flag |
| 93 | + |
| 94 | +To enable the streaming feature flag, update the cluster settings as follows: |
| 95 | + |
| 96 | +```json |
| 97 | +PUT _cluster/settings |
| 98 | +{ |
| 99 | + "persistent" : { |
| 100 | + "plugins.ml_commons.stream_enabled": true |
| 101 | + } |
| 102 | +} |
| 103 | +``` |
| 104 | +{% include copy-curl.html %} |
| 105 | + |
| 106 | +#### Step 2: Register a compatible externally hosted model |
| 107 | + |
| 108 | +To register an OpenAI Chat Completion model, send the following request: |
| 109 | + |
| 110 | +```json |
| 111 | +POST /_plugins/_ml/models/_register |
| 112 | +{ |
| 113 | + "name": "OpenAI gpt 3.5 turbo", |
| 114 | + "function_name": "remote", |
| 115 | + "description": "OpenAI model", |
| 116 | + "connector": { |
| 117 | + "name": "OpenAI Chat Connector", |
| 118 | + "description": "The connector to OpenAI model service for GPT 3.5", |
| 119 | + "version": 1, |
| 120 | + "protocol": "http", |
| 121 | + "parameters": { |
| 122 | + "endpoint": "api.openai.com", |
| 123 | + "model": "gpt-3.5-turbo" |
| 124 | + }, |
| 125 | + "credential": { |
| 126 | + "openAI_key": "<your_api_key>" |
| 127 | + }, |
| 128 | + "actions": [ |
| 129 | + { |
| 130 | + "action_type": "predict", |
| 131 | + "method": "POST", |
| 132 | + "url": "https://${parameters.endpoint}/v1/chat/completions", |
| 133 | + "headers": { |
| 134 | + "Authorization": "Bearer ${credential.openAI_key}" |
| 135 | + }, |
| 136 | + "request_body": "{ \"model\": \"${parameters.model}\", \"messages\": [{\"role\":\"developer\",\"content\":\"${parameters.system_prompt}\"},${parameters._chat_history:-}{\"role\":\"user\",\"content\":\"${parameters.prompt}\"}${parameters._interactions:-}]${parameters.tool_configs:-} }" |
| 137 | + } |
| 138 | + ] |
| 139 | + } |
| 140 | +} |
| 141 | +``` |
| 142 | +{% include copy-curl.html %} |
| 143 | + |
| 144 | +To register an Amazon Bedrock Converse Stream model, send the following request: |
| 145 | + |
| 146 | +```json |
| 147 | +POST /_plugins/_ml/models/_register |
| 148 | +{ |
| 149 | + "name": "Amazon Bedrock Converse Stream model", |
| 150 | + "function_name": "remote", |
| 151 | + "description": "Amazon Bedrock Claude model", |
| 152 | + "connector": { |
| 153 | + "name": "Amazon Bedrock Converse", |
| 154 | + "description": "The connector to Amazon Bedrock Converse", |
| 155 | + "version": 1, |
| 156 | + "protocol": "aws_sigv4", |
| 157 | + "credential": { |
| 158 | + "access_key": "<your_aws_access_key>", |
| 159 | + "secret_key": "<your_aws_secret_key>", |
| 160 | + "session_token": "<your_aws_session_token>" |
| 161 | + }, |
| 162 | + "parameters": { |
| 163 | + "region": "<your_aws_region>", |
| 164 | + "service_name": "bedrock", |
| 165 | + "model": "us.anthropic.claude-3-7-sonnet-20250219-v1:0" |
| 166 | + }, |
| 167 | + "actions": [ |
| 168 | + { |
| 169 | + "action_type": "predict", |
| 170 | + "method": "POST", |
| 171 | + "headers": { |
| 172 | + "content-type": "application/json" |
| 173 | + }, |
| 174 | + "url": "https://bedrock-runtime.${parameters.region}.amazonaws.com/model/${parameters.model}/converse", |
| 175 | + "request_body": "{ \"system\": [{\"text\": \"${parameters.system_prompt}\"}], \"messages\": [${parameters._chat_history:-}{\"role\":\"user\",\"content\":[{\"text\":\"${parameters.prompt}\"}]}${parameters._interactions:-}]${parameters.tool_configs:-} }" |
| 176 | + } |
| 177 | + ] |
| 178 | + } |
| 179 | +} |
| 180 | +``` |
| 181 | +{% include copy-curl.html %} |
| 182 | + |
| 183 | +#### Step 3: Register a conversational agent |
| 184 | + |
| 185 | +When registering your agent, you must include the `_llm_interface` parameter that corresponds to your model type: |
| 186 | +- OpenAI Chat Completion: `openai/v1/chat/completions` |
| 187 | +- Amazon Bedrock Converse Stream: `bedrock/converse/claude` |
| 188 | + |
| 189 | +To register your agent, send the following request: |
| 190 | + |
| 191 | +```json |
| 192 | +POST /_plugins/_ml/agents/_register |
| 193 | +{ |
| 194 | + "name": "Chat Agent with RAG", |
| 195 | + "type": "conversational", |
| 196 | + "description": "This is a test agent", |
| 197 | + "llm": { |
| 198 | + "model_id": "<model_id_from_step_2>", |
| 199 | + "parameters": { |
| 200 | + "max_iteration": 5, |
| 201 | + "system_prompt": "You are a helpful assistant. You are able to assist with a wide range of tasks, from answering simple questions to providing in-depth explanations and discussions on a wide range of topics.\nIf the question is complex, you will split it into several smaller questions, and solve them one by one. For example, the original question is:\nhow many orders in last three month? Which month has highest?\nYou will spit into several smaller questions:\n1.Calculate total orders of last three month.\n2.Calculate monthly total order of last three month and calculate which months order is highest. You MUST use the available tools everytime to answer the question", |
| 202 | + "prompt": "${parameters.question}" |
| 203 | + } |
| 204 | + }, |
| 205 | + "memory": { |
| 206 | + "type": "conversation_index" |
| 207 | + }, |
| 208 | + "parameters": { |
| 209 | + "_llm_interface": "openai/v1/chat/completions" |
| 210 | + }, |
| 211 | + "tools": [ |
| 212 | + { |
| 213 | + "type": "IndexMappingTool", |
| 214 | + "name": "DemoIndexMappingTool", |
| 215 | + "parameters": { |
| 216 | + "index": "${parameters.index}", |
| 217 | + "input": "${parameters.question}" |
| 218 | + } |
| 219 | + }, |
| 220 | + { |
| 221 | + "type": "ListIndexTool", |
| 222 | + "name": "RetrieveIndexMetaTool", |
| 223 | + "description": "Use this tool to get OpenSearch index information: (health, status, index, uuid, primary count, replica count, docs.count, docs.deleted, store.size, primary.store.size)." |
| 224 | + } |
| 225 | + ], |
| 226 | + "app_type": "my_app" |
| 227 | +} |
| 228 | +``` |
| 229 | +{% include copy-curl.html %} |
| 230 | + |
| 231 | +## Example request |
| 232 | + |
| 233 | +```json |
| 234 | +POST /_plugins/_ml/agents/<agent_id>/_execute/stream |
| 235 | +{ |
| 236 | + "parameters": { |
| 237 | + "question": "How many indices are in my cluster?" |
| 238 | + } |
| 239 | +} |
| 240 | +``` |
| 241 | +{% include copy-curl.html %} |
| 242 | + |
| 243 | +## Example response |
| 244 | + |
| 245 | +```json |
| 246 | +data: {"inference_results":[{"output":[{"name":"memory_id","result":"LvU1iJkBCzHrriq5hXbN"},{"name":"parent_interaction_id","result":"L_U1iJkBCzHrriq5hXbs"},{"name":"response","dataAsMap":{"content":"[{\"index\":0.0,\"id\":\"call_HjpbrbdQFHK0omPYa6m2DCot\",\"type\":\"function\",\"function\":{\"name\":\"RetrieveIndexMetaTool\",\"arguments\":\"\"}}]","is_last":false}}]}]} |
| 247 | +
|
| 248 | +data: {"inference_results":[{"output":[{"name":"memory_id","result":"LvU1iJkBCzHrriq5hXbN"},{"name":"parent_interaction_id","result":"L_U1iJkBCzHrriq5hXbs"},{"name":"response","dataAsMap":{"content":"[{\"index\":0.0,\"function\":{\"arguments\":\"{}\"}}]","is_last":false}}]}]} |
| 249 | +
|
| 250 | +data: {"inference_results":[{"output":[{"name":"memory_id","result":"LvU1iJkBCzHrriq5hXbN"},{"name":"parent_interaction_id","result":"L_U1iJkBCzHrriq5hXbs"},{"name":"response","dataAsMap":{"content":"{\"choices\":[{\"message\":{\"tool_calls\":[{\"type\":\"function\",\"function\":{\"name\":\"RetrieveIndexMetaTool\",\"arguments\":\"{}\"},\"id\":\"call_HjpbrbdQFHK0omPYa6m2DCot\"}]},\"finish_reason\":\"tool_calls\"}]}","is_last":false}}]}]} |
| 251 | +
|
| 252 | +data: {"inference_results":[{"output":[{"name":"memory_id","result":"LvU1iJkBCzHrriq5hXbN"},{"name":"parent_interaction_id","result":"L_U1iJkBCzHrriq5hXbs"},{"name":"response","dataAsMap":{"content":"","is_last":false}}]}]} |
| 253 | +
|
| 254 | +data: {"inference_results":[{"output":[{"name":"memory_id","result":"LvU1iJkBCzHrriq5hXbN"},{"name":"parent_interaction_id","result":"L_U1iJkBCzHrriq5hXbs"},{"name":"response","dataAsMap":{"content":"row,health,status,index,uuid,pri(number of primary shards),rep(number of replica shards),docs.count(number of available documents),docs.deleted(number of deleted documents),store.size(store size of primary and replica shards),pri.store.size(store size of primary shards)\n1,green,open,.plugins-ml-model-group,Msb1Y4W5QeiLs5yUQi-VRg,1,1,2,0,17.1kb,5.9kb\n2,green,open,.plugins-ml-memory-message,1IWd1HPeSWmM29qE6rcj_A,1,1,658,0,636.4kb,313.5kb\n3,green,open,.plugins-ml-memory-meta,OETb21fqQJa3Y2hGQbknCQ,1,1,267,7,188kb,93.9kb\n4,green,open,.plugins-ml-config,0mnOWX5gSX2s-yP27zPFNw,1,1,1,0,8.1kb,4kb\n5,green,open,.plugins-ml-model,evYOOKN4QPqtmUjxsDwJYA,1,1,5,5,421.5kb,210.7kb\n6,green,open,.plugins-ml-agent,I0SpBovjT3C6NABCBzGiiQ,1,1,6,0,205.5kb,111.3kb\n7,green,open,.plugins-ml-task,_Urzn9gdSuCRqUaYAFaD_Q,1,1,100,4,136.1kb,45.3kb\n8,green,open,top_queries-2025.09.26-00444,jb7Q1FiLSl-wTxjdSUKs_w,1,1,1736,126,1.8mb,988kb\n9,green,open,.plugins-ml-connector,YaJORo4jT0Ksp24L5cW1uA,1,1,2,0,97.8kb,48.9kb\n","is_last":false}}]}]} |
| 255 | +
|
| 256 | +data: {"inference_results":[{"output":[{"name":"memory_id","result":"LvU1iJkBCzHrriq5hXbN"},{"name":"parent_interaction_id","result":"L_U1iJkBCzHrriq5hXbs"},{"name":"response","dataAsMap":{"content":"There","is_last":false}}]}]} |
| 257 | +
|
| 258 | +data: {"inference_results":[{"output":[{"name":"memory_id","result":"LvU1iJkBCzHrriq5hXbN"},{"name":"parent_interaction_id","result":"L_U1iJkBCzHrriq5hXbs"},{"name":"response","dataAsMap":{"content":" are","is_last":false}}]}]} |
| 259 | +
|
| 260 | +data: {"inference_results":[{"output":[{"name":"memory_id","result":"LvU1iJkBCzHrriq5hXbN"},{"name":"parent_interaction_id","result":"L_U1iJkBCzHrriq5hXbs"},{"name":"response","dataAsMap":{"content":" ","is_last":false}}]}]} |
| 261 | +
|
| 262 | +data: {"inference_results":[{"output":[{"name":"memory_id","result":"LvU1iJkBCzHrriq5hXbN"},{"name":"parent_interaction_id","result":"L_U1iJkBCzHrriq5hXbs"},{"name":"response","dataAsMap":{"content":"9","is_last":false}}]}]} |
| 263 | +
|
| 264 | +data: {"inference_results":[{"output":[{"name":"memory_id","result":"LvU1iJkBCzHrriq5hXbN"},{"name":"parent_interaction_id","result":"L_U1iJkBCzHrriq5hXbs"},{"name":"response","dataAsMap":{"content":" indices","is_last":false}}]}]} |
| 265 | +
|
| 266 | +data: {"inference_results":[{"output":[{"name":"memory_id","result":"LvU1iJkBCzHrriq5hXbN"},{"name":"parent_interaction_id","result":"L_U1iJkBCzHrriq5hXbs"},{"name":"response","dataAsMap":{"content":" in","is_last":false}}]}]} |
| 267 | +
|
| 268 | +data: {"inference_results":[{"output":[{"name":"memory_id","result":"LvU1iJkBCzHrriq5hXbN"},{"name":"parent_interaction_id","result":"L_U1iJkBCzHrriq5hXbs"},{"name":"response","dataAsMap":{"content":" your","is_last":false}}]}]} |
| 269 | +
|
| 270 | +data: {"inference_results":[{"output":[{"name":"memory_id","result":"LvU1iJkBCzHrriq5hXbN"},{"name":"parent_interaction_id","result":"L_U1iJkBCzHrriq5hXbs"},{"name":"response","dataAsMap":{"content":" cluster","is_last":false}}]}]} |
| 271 | +
|
| 272 | +data: {"inference_results":[{"output":[{"name":"memory_id","result":"LvU1iJkBCzHrriq5hXbN"},{"name":"parent_interaction_id","result":"L_U1iJkBCzHrriq5hXbs"},{"name":"response","dataAsMap":{"content":".","is_last":false}}]}]} |
| 273 | +
|
| 274 | +data: {"inference_results":[{"output":[{"name":"memory_id","result":"LvU1iJkBCzHrriq5hXbN"},{"name":"parent_interaction_id","result":"L_U1iJkBCzHrriq5hXbs"},{"name":"response","dataAsMap":{"content":"","is_last":true}}]}]} |
| 275 | +``` |
| 276 | + |
| 277 | +## Response body fields |
| 278 | + |
| 279 | +The following table lists all response body fields. |
| 280 | + |
| 281 | +| Field | Data type | Description | |
| 282 | +| :--- | :--- |:------------------------------------------------------------------------------------------------------------| |
| 283 | +| `inference_results` | Array | Contains the streaming response data returned by the agent. | |
| 284 | +| `inference_results.output` | Array | Contains output objects for each inference result. | |
| 285 | +| `inference_results.output.name` | String | The name of the output field. Can be `memory_id`, `parent_interaction_id`, or `response`. | |
| 286 | +| `inference_results.output.result` | String | The values of the `memory_id` and `parent_interaction_id` fields. | |
| 287 | +| `inference_results.output.dataAsMap` | Object | Contains the response content and metadata (present only for a `response` output). | |
| 288 | +| `inference_results.output.dataAsMap.content` | String | The agent's response content, which can include tool calls, tool results, or final text output. | |
| 289 | +| `inference_results.output.dataAsMap.is_last` | Boolean | Indicates whether this is the final chunk in the stream: `true` for the last chunk, `false` if there are more chunks. | |
0 commit comments