Included Prometheus interceptor support for gRPC streaming #1858

Merged
59 changes: 39 additions & 20 deletions docs/examples/streaming/README.ipynb
@@ -42,7 +42,7 @@
 },
 {
 "cell_type": "code",
-"execution_count": 2,
+"execution_count": 1,
 "metadata": {},
 "outputs": [
 {
@@ -121,7 +121,7 @@
 },
 {
 "cell_type": "code",
-"execution_count": 4,
+"execution_count": 2,
 "metadata": {},
 "outputs": [
 {
@@ -138,8 +138,7 @@
 "{\n",
 " \"debug\": false,\n",
 " \"parallel_workers\": 0,\n",
-" \"gzip_enabled\": false,\n",
-" \"metrics_endpoint\": null\n",
+" \"gzip_enabled\": false\n",
 "}\n"
 ]
 },
@@ -150,8 +149,7 @@
 "Note the currently there are three main limitations of the streaming support in MLServer:\n",
 "\n",
 "- distributed workers are not supported (i.e., the `parallel_workers` setting should be set to `0`)\n",
-"- `gzip` middleware is not supported for REST (i.e., `gzip_enabled` setting should be set to `false`)\n",
-"- metrics endpoint is not available (i.e. `metrics_endpoint` is also disabled for streaming for gRPC)"
+"- `gzip` middleware is not supported for REST (i.e., `gzip_enabled` setting should be set to `false`)"
 ]
 },
 {
@@ -163,7 +161,7 @@
 },
 {
 "cell_type": "code",
-"execution_count": 5,
+"execution_count": 3,
 "metadata": {},
 "outputs": [
 {
@@ -227,14 +225,14 @@
 },
 {
 "cell_type": "code",
-"execution_count": 6,
+"execution_count": 4,
 "metadata": {},
 "outputs": [
 {
 "name": "stdout",
 "output_type": "stream",
 "text": [
-"Writing generate-request.json\n"
+"Overwriting generate-request.json\n"
 ]
 }
 ],
@@ -272,9 +270,22 @@
 },
 {
 "cell_type": "code",
-"execution_count": null,
+"execution_count": 5,
 "metadata": {},
-"outputs": [],
+"outputs": [
+{
+"name": "stdout",
+"output_type": "stream",
+"text": [
+"['What']\n",
+"[' is']\n",
+"[' the']\n",
+"[' capital']\n",
+"[' of']\n",
+"[' France?']\n"
+]
+}
+],
 "source": [
 "import httpx\n",
 "from httpx_sse import connect_sse\n",
@@ -301,9 +312,22 @@
 },
 {
 "cell_type": "code",
-"execution_count": null,
+"execution_count": 6,
 "metadata": {},
-"outputs": [],
+"outputs": [
+{
+"name": "stdout",
+"output_type": "stream",
+"text": [
+"['What']\n",
+"[' is']\n",
+"[' the']\n",
+"[' capital']\n",
+"[' of']\n",
+"[' France?']\n"
+]
+}
+],
 "source": [
 "import grpc\n",
 "import mlserver.types as types\n",
@@ -315,7 +339,7 @@
 "inference_request = types.InferenceRequest.parse_file(\"./generate-request.json\")\n",
 "\n",
 "# need to convert from string to bytes for grpc\n",
-"inference_request.inputs[0] = StringCodec.encode_input(\"prompt\", inference_request.inputs[0].data.__root__)\n",
+"inference_request.inputs[0] = StringCodec.encode_input(\"prompt\", inference_request.inputs[0].data.root)\n",
 "inference_request_g = converters.ModelInferRequestConverter.from_types(\n",
 " inference_request, model_name=\"text-model\", model_version=None\n",
 ")\n",
@@ -338,11 +362,6 @@
 "source": [
 "Note that for gRPC, the request is transformed into an async generator which is then passed to the `ModelStreamInfer` method. The response is also an async generator which can be iterated over to get the response."
 ]
-},
-{
-"cell_type": "markdown",
-"metadata": {},
-"source": []
 }
 ],
 "metadata": {
@@ -361,7 +380,7 @@
 "name": "python",
 "nbconvert_exporter": "python",
 "pygments_lexer": "ipython3",
-"version": "3.10.14"
+"version": "3.10.12"
 }
 },
 "nbformat": 4,
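The notebook's REST client cell is collapsed in this diff view (only the `httpx`/`httpx_sse` imports and the streamed output are visible above). For orientation, a minimal sketch of that SSE-based client follows; the endpoint path, port, and the use of `parse_raw` are assumptions here rather than details taken from this PR, so adjust them to your `settings.json` and model name.

```python
import httpx
from httpx_sse import connect_sse

import mlserver.types as types
from mlserver.codecs import StringCodec

inference_request = types.InferenceRequest.parse_file("./generate-request.json")

# Assumed default REST port and streaming endpoint for the example model.
url = "http://localhost:8080/v2/models/text-model/generate_stream"

with httpx.Client() as client:
    # Each server-sent event carries one InferenceResponse with the next chunk.
    with connect_sse(client, "POST", url, json=inference_request.dict()) as event_source:
        for sse in event_source.iter_sse():
            response = types.InferenceResponse.parse_raw(sse.data)
            print(StringCodec.decode_output(response.outputs[0]))
```

Run against the example model, each iteration should print one chunk, matching the `['What']`, `[' is']`, ... output shown in the notebook.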
8 changes: 2 additions & 6 deletions docs/examples/streaming/README.md
@@ -78,8 +78,7 @@ The next step will be to create 2 configuration files:
 {
 "debug": false,
 "parallel_workers": 0,
-"gzip_enabled": false,
-"metrics_endpoint": null
+"gzip_enabled": false
 }
 
 ```
@@ -88,7 +87,6 @@ Note the currently there are three main limitations of the streaming support in
 
 - distributed workers are not supported (i.e., the `parallel_workers` setting should be set to `0`)
 - `gzip` middleware is not supported for REST (i.e., `gzip_enabled` setting should be set to `false`)
-- metrics endpoint is not available (i.e. `metrics_endpoint` is also disabled for streaming for gRPC)
 
 #### model-settings.json
 
@@ -195,7 +193,7 @@ import mlserver.grpc.dataplane_pb2_grpc as dataplane
 inference_request = types.InferenceRequest.parse_file("./generate-request.json")
 
 # need to convert from string to bytes for grpc
-inference_request.inputs[0] = StringCodec.encode_input("prompt", inference_request.inputs[0].data.__root__)
+inference_request.inputs[0] = StringCodec.encode_input("prompt", inference_request.inputs[0].data.root)
 inference_request_g = converters.ModelInferRequestConverter.from_types(
     inference_request, model_name="text-model", model_version=None
 )
@@ -213,5 +211,3 @@ async with grpc.aio.insecure_channel("localhost:8081") as grpc_channel:
 ```
 
Note that for gRPC, the request is transformed into an async generator which is then passed to the `ModelStreamInfer` method. The response is also an async generator which can be iterated over to get the response.
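To make the note above concrete, here is a rough sketch of that pattern. It assumes the `inference_request_g` message built earlier in this README, the `GRPCInferenceServiceStub` class from `dataplane_pb2_grpc`, and a hypothetical `stream_once` wrapper so the snippet can run under `asyncio.run`; in the notebook the same body runs with top-level `await` instead.

```python
import asyncio

import grpc

import mlserver.grpc.converters as converters
import mlserver.grpc.dataplane_pb2_grpc as dataplane
from mlserver.codecs import StringCodec


async def stream_once(inference_request_g):
    async def request_stream():
        # ModelStreamInfer expects a stream of requests, so the single
        # request message is wrapped into an async generator.
        yield inference_request_g

    async with grpc.aio.insecure_channel("localhost:8081") as grpc_channel:
        grpc_stub = dataplane.GRPCInferenceServiceStub(grpc_channel)
        # The call returns an async iterator of ModelInferResponse messages.
        async for response in grpc_stub.ModelStreamInfer(request_stream()):
            response = converters.ModelInferResponseConverter.to_types(response)
            print(StringCodec.decode_output(response.outputs[0]))


# asyncio.run(stream_once(inference_request_g))
```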


3 changes: 1 addition & 2 deletions docs/examples/streaming/settings.json
@@ -2,6 +2,5 @@
 {
 "debug": false,
 "parallel_workers": 0,
-"gzip_enabled": false,
-"metrics_endpoint": null
+"gzip_enabled": false
 }
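With the `metrics_endpoint: null` workaround removed, Prometheus metrics stay enabled while streaming over gRPC, which is what this PR's interceptor support is about. A quick way to sanity-check the exposed metrics after sending a few streaming requests is sketched below; the host, port, and `/metrics` path are assumed defaults rather than values taken from this diff, so check your `settings.json` (newer MLServer versions may serve metrics on a dedicated metrics port).

```python
import httpx

# Assumed default: metrics exposed at /metrics on the REST port.
metrics_text = httpx.get("http://localhost:8080/metrics").text

# Show only the gRPC-related series recorded by the server interceptor.
for line in metrics_text.splitlines():
    if "grpc" in line.lower():
        print(line)
```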
13 changes: 0 additions & 13 deletions docs/examples/streaming/text_model.py
@@ -7,19 +7,6 @@
 
 class TextModel(MLModel):
 
-    async def predict(self, payload: InferenceRequest) -> InferenceResponse:
-        text = StringCodec.decode_input(payload.inputs[0])[0]
-        return InferenceResponse(
-            model_name=self._settings.name,
-            outputs=[
-                StringCodec.encode_output(
-                    name="output",
-                    payload=[text],
-                    use_bytes=True,
-                ),
-            ],
-        )
-
     async def predict_stream(
         self, payloads: AsyncIterator[InferenceRequest]
     ) -> AsyncIterator[InferenceResponse]:
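The remaining `predict_stream` body is collapsed in this view. For orientation, here is a self-contained sketch of what a streaming-only text model along these lines can look like; the word-splitting logic is illustrative only and not the file's actual implementation.

```python
from typing import AsyncIterator

from mlserver import MLModel
from mlserver.codecs import StringCodec
from mlserver.types import InferenceRequest, InferenceResponse


class TextModel(MLModel):

    async def predict_stream(
        self, payloads: AsyncIterator[InferenceRequest]
    ) -> AsyncIterator[InferenceResponse]:
        async for payload in payloads:
            text = StringCodec.decode_input(payload.inputs[0])[0]
            # Illustrative only: emit the prompt back one word at a time,
            # encoding each chunk the same way the removed predict() did.
            for token in text.split(" "):
                yield InferenceResponse(
                    model_name=self._settings.name,
                    outputs=[
                        StringCodec.encode_output(
                            name="output",
                            payload=[token],
                            use_bytes=True,
                        ),
                    ],
                )
```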
1 change: 0 additions & 1 deletion docs/user-guide/streaming.md
@@ -32,4 +32,3 @@ There are three main limitations of the streaming support in MLServer:
 
 - the `parallel_workers` setting should be set to `0` to disable distributed workers (to be addressed in future releases)
 - for REST, the `gzip_enabled` setting should be set to `false` to disable GZIP compression, as streaming is not compatible with GZIP compression (see issue [here]( https://github.com/encode/starlette/issues/20#issuecomment-704106436))
-- `metrics_endpoint` is also disabled for streaming for gRPC (to be addressed in future releases)
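For anyone configuring MLServer programmatically rather than through `settings.json`, the two remaining limitations translate into something like the sketch below (field names as used in the example settings; constructing `Settings` directly is an assumption about your setup, not part of this change).

```python
from mlserver.settings import Settings

# Streaming-compatible server settings: no parallel inference workers and
# no REST gzip middleware, per the limitations listed above.
settings = Settings(
    debug=False,
    parallel_workers=0,
    gzip_enabled=False,
)
```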