feat: add py bindings for http service
biswapanda committed Feb 24, 2025
1 parent 3a3b339 commit c4f4dc9
Showing 4 changed files with 219 additions and 0 deletions.
6 changes: 6 additions & 0 deletions python-wheel/python/triton_distributed_rs/__init__.py
@@ -19,8 +19,14 @@
from typing import Any, AsyncGenerator, Callable, Type

from pydantic import BaseModel, ValidationError

# List all the classes in the _core module for re-export
# import * causes "unable to detect undefined names"
from triton_distributed_rs._core import Backend as Backend
from triton_distributed_rs._core import DistributedRuntime
from triton_distributed_rs._core import HttpAsyncEngine as HttpAsyncEngine
from triton_distributed_rs._core import HttpError as HttpError
from triton_distributed_rs._core import HttpService as HttpService
from triton_distributed_rs._core import KvRouter as KvRouter
from triton_distributed_rs._core import ModelDeploymentCard as ModelDeploymentCard
from triton_distributed_rs._core import OAIChatPreprocessor as OAIChatPreprocessor
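With these explicit "Name as Name" re-exports in place, the new HTTP classes are importable from the package root rather than from the private _core extension module. A minimal, illustrative import (nothing beyond these names is implied by the hunk):

    # Illustrative only: these names are re-exported by __init__.py above.
    from triton_distributed_rs import HttpAsyncEngine, HttpError, HttpService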
21 changes: 21 additions & 0 deletions python-wheel/python/triton_distributed_rs/_core.pyi
@@ -126,6 +126,27 @@ class Client:
"""
...

class HttpService:
    """
    An HTTP service for distributed NOVA applications.
    The primary purpose of the service is to serve OpenAI-compatible HTTP endpoints.
    It is meant to be a gateway/ingress into the Nova LLM Distributed Runtime.
    """
    ...

class HttpError:
    """
    An error that occurred in the HTTP service.
    """
    ...

class HttpAsyncEngine:
    """
    An async engine for a distributed NOVA HTTP service. This is an extension of the
    Python-based AsyncEngine that handles HttpError exceptions raised in Python and
    converts them to the Rust version of HttpError.
    """
    ...

class KvRouter:
"""
    The runtime object for distributed NOVA applications
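Since HttpService is described above as serving OpenAI-compatible endpoints, a client call against a running instance might look like the sketch below. The route, port, and model name are assumptions based on the conventional OpenAI API shape and the default port (8080) used in http.rs further down, not on anything declared in these stubs.

    # Hypothetical client request; assumes the service is running locally on the
    # default port and that "example-model" was registered as a chat model.
    import requests

    response = requests.post(
        "http://localhost:8080/v1/chat/completions",
        json={
            "model": "example-model",
            "messages": [{"role": "user", "content": "Hello"}],
        },
    )
    print(response.status_code)
    print(response.json())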
188 changes: 188 additions & 0 deletions python-wheel/rust/http.rs
@@ -0,0 +1,188 @@
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use pyo3::{exceptions::PyException, prelude::*};

use crate::{engine::*, to_pyerr, CancellationToken};

pub use triton_llm::http::service::{error, service_v2};

pub use triton_distributed::{
// self as rs,
// error,
pipeline::{async_trait, AsyncEngine, Data, ManyOut, SingleIn},
protocols::annotated::Annotated,
Error,
Result,
};

#[pyclass]
pub struct HttpService {
inner: service_v2::HttpService,
}

#[pymethods]
impl HttpService {
#[new]
#[pyo3(signature = (port=None))]
pub fn new(port: Option<u16>) -> PyResult<Self> {
let builder = service_v2::HttpService::builder().port(port.unwrap_or(8080));
let inner = builder.build().map_err(to_pyerr)?;
Ok(Self { inner })
}

pub fn add_completions_model(&self, model: String, engine: HttpAsyncEngine) -> PyResult<()> {
let engine = Arc::new(engine);
self.inner
.model_manager()
.add_completions_model(&model, engine)
.map_err(to_pyerr)
}

pub fn add_chat_completions_model(
&self,
model: String,
engine: HttpAsyncEngine,
) -> PyResult<()> {
let engine = Arc::new(engine);
self.inner
.model_manager()
.add_chat_completions_model(&model, engine)
.map_err(to_pyerr)
}

pub fn remove_completions_model(&self, model: String) -> PyResult<()> {
self.inner
.model_manager()
.remove_completions_model(&model)
.map_err(to_pyerr)
}

pub fn remove_chat_completions_model(&self, model: String) -> PyResult<()> {
self.inner
.model_manager()
.remove_chat_completions_model(&model)
.map_err(to_pyerr)
}

pub fn list_chat_completions_models(&self) -> PyResult<Vec<String>> {
Ok(self.inner.model_manager().list_chat_completions_models())
}

pub fn list_completions_models(&self) -> PyResult<Vec<String>> {
Ok(self.inner.model_manager().list_completions_models())
}

fn run<'p>(&self, py: Python<'p>, token: CancellationToken) -> PyResult<Bound<'p, PyAny>> {
let service = self.inner.clone();
pyo3_async_runtimes::tokio::future_into_py(py, async move {
service.run(token.inner).await.map_err(to_pyerr)?;
Ok(())
})
}
}
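
The methods above are what the Python side calls to register engines and start the server; run(token) launches the service, but the cancellation-token type is defined elsewhere in the wheel and is not shown in this diff, so the sketch below stops short of calling it. The generator, model name, and event-loop handling are illustrative placeholders.

    # Sketch of driving HttpService from Python; echo_generator and
    # "example-model" are placeholders, not part of this diff.
    import asyncio

    from triton_distributed_rs import HttpAsyncEngine, HttpService

    async def echo_generator(request):
        # A real engine would stream model output for the request.
        yield {"text": "hello"}

    loop = asyncio.new_event_loop()  # illustrative; a running loop is more typical
    engine = HttpAsyncEngine(echo_generator, loop)
    service = HttpService(port=8080)
    service.add_chat_completions_model("example-model", engine)
    print(service.list_chat_completions_models())   # ['example-model']
    service.remove_chat_completions_model("example-model")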

/// Python Exception for HTTP errors
#[pyclass(extends=PyException)]
pub struct HttpError {
code: u16,
message: String,
}

#[pymethods]
impl HttpError {
#[new]
pub fn new(code: u16, message: String) -> Self {
HttpError { code, message }
}

#[getter]
fn code(&self) -> u16 {
self.code
}

#[getter]
fn message(&self) -> &str {
&self.message
}
}
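
Because HttpError extends PyException, Python code can construct it, raise it, and read the two getters back:

    # Sketch: HttpError behaves like an ordinary Python exception carrying a
    # status code and a message.
    from triton_distributed_rs import HttpError

    err = HttpError(404, "model not found")
    print(err.code, err.message)    # 404 model not found

    try:
        raise err
    except HttpError as caught:
        assert caught.code == 404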

#[pyclass]
#[derive(Clone)]
pub struct HttpAsyncEngine(pub PythonAsyncEngine);

impl From<PythonAsyncEngine> for HttpAsyncEngine {
fn from(engine: PythonAsyncEngine) -> Self {
Self(engine)
}
}

#[pymethods]
impl HttpAsyncEngine {
/// Create a new instance of the HttpAsyncEngine
/// This is a simple extension of the PythonAsyncEngine that handles HttpError
/// exceptions from Python and converts them to the Rust version of HttpError
///
/// # Arguments
/// - `generator`: a Python async generator that will be used to generate responses
/// - `event_loop`: the Python event loop that will be used to run the generator
///
/// Note: In Rust land, the request and the response are both concrete; however, in
/// Python land, the request and response are not strongly typed, meaning the generator
/// could accept a different type of request or return a different type of response
/// and we would not know until runtime.
#[new]
pub fn new(generator: PyObject, event_loop: PyObject) -> PyResult<Self> {
Ok(PythonAsyncEngine::new(generator, event_loop)?.into())
}
}
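
The generate implementation below inspects errors raised by the generator: an HttpError keeps its code and message across the Python/Rust boundary, while any other exception is wrapped as a generic error. A generator exercising that contract might look like this; the request and response shapes are assumptions, and the generator would be wrapped in HttpAsyncEngine as in the earlier sketch.

    # Sketch: a generator that raises HttpError so the Rust side can surface a
    # proper HTTP status instead of a generic failure. Request shape is assumed.
    from triton_distributed_rs import HttpError

    async def generate(request):
        if request.get("model") != "example-model":
            raise HttpError(404, "unknown model")
        yield {"text": "partial"}
        yield {"text": "done"}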

#[async_trait]
impl<Req, Resp> AsyncEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>, Error> for HttpAsyncEngine
where
Req: Data + Serialize,
Resp: Data + for<'de> Deserialize<'de>,
{
async fn generate(&self, request: SingleIn<Req>) -> Result<ManyOut<Annotated<Resp>>, Error> {
match self.0.generate(request).await {
Ok(res) => Ok(res),

// Inspect the error - if it was an HttpError from Python, extract the code and message
// and return the rust version of HttpError
Err(e) => {
if let Some(py_err) = e.downcast_ref::<PyErr>() {
Python::with_gil(|py| {
if let Ok(http_error_instance) = py_err
.clone_ref(py)
.into_value(py)
.extract::<PyRef<HttpError>>(py)
{
Err(error::HttpError {
code: http_error_instance.code,
message: http_error_instance.message.clone(),
})?
} else {
Err(error!("Python Error: {}", py_err.to_string()))
}
})
} else {
Err(e)
}
}
}
}
}
4 changes: 4 additions & 0 deletions python-wheel/rust/lib.rs
@@ -34,6 +34,7 @@ use triton_distributed::{
use triton_llm::{self as llm_rs};

mod engine;
mod http;
mod llm;

type JsonServerStreamingIngress =
@@ -63,6 +64,9 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<Endpoint>()?;
m.add_class::<Client>()?;
m.add_class::<AsyncResponseStream>()?;
m.add_class::<http::HttpService>()?;
m.add_class::<http::HttpError>()?;
m.add_class::<http::HttpAsyncEngine>()?;
m.add_class::<llm::kv::KvRouter>()?;
m.add_class::<llm::model_card::ModelDeploymentCard>()?;
m.add_class::<llm::preprocessor::OAIChatPreprocessor>()?;
