Skip to content
This repository was archived by the owner on Oct 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion clients/rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ anyhow = "1.0"
base64 = "0.22.1"
eventsource-client = "0.14.0"
futures-util = "0.3.31"
reqwest = "0.12.12"
reqwest = { version = "0.12.12", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
serde_cbor = "0.11.2"
serde_json = "1.0"
Expand Down
47 changes: 28 additions & 19 deletions clients/rust/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ use anyhow::Result;
use serde_json::{Value as JsonValue};

use crate::{
common::{resolve_actor_id, ActorKey, EncodingKind, TransportKind},
common::{ActorKey, EncodingKind, TransportKind},
handle::ActorHandle,
protocol::query::*
protocol::query::*,
remote_manager::RemoteManager,
};

#[derive(Default)]
Expand Down Expand Up @@ -35,7 +36,7 @@ pub struct CreateOptions {


pub struct Client {
manager_endpoint: String,
remote_manager: RemoteManager,
encoding_kind: EncodingKind,
transport_kind: TransportKind,
shutdown_tx: Arc<tokio::sync::broadcast::Sender<()>>,
Expand All @@ -48,7 +49,21 @@ impl Client {
encoding_kind: EncodingKind,
) -> Self {
Self {
manager_endpoint: manager_endpoint.to_string(),
remote_manager: RemoteManager::new(manager_endpoint, None),
encoding_kind,
transport_kind,
shutdown_tx: Arc::new(tokio::sync::broadcast::channel(1).0)
}
}

pub fn new_with_token(
manager_endpoint: &str,
token: String,
transport_kind: TransportKind,
encoding_kind: EncodingKind,
) -> Self {
Self {
remote_manager: RemoteManager::new(manager_endpoint, Some(token)),
encoding_kind,
transport_kind,
shutdown_tx: Arc::new(tokio::sync::broadcast::channel(1).0)
Expand All @@ -61,7 +76,7 @@ impl Client {
query: ActorQuery
) -> ActorHandle {
let handle = ActorHandle::new(
&self.manager_endpoint,
self.remote_manager.clone(),
params,
query,
self.shutdown_tx.clone(),
Expand Down Expand Up @@ -95,11 +110,13 @@ impl Client {

pub fn get_for_id(
&self,
name: &str,
actor_id: &str,
opts: GetOptions
) -> Result<ActorHandle> {
let actor_query = ActorQuery::GetForId {
get_for_id: GetForIdRequest {
name: name.to_string(),
actor_id: actor_id.to_string(),
}
};
Expand Down Expand Up @@ -145,25 +162,17 @@ impl Client {
opts: CreateOptions
) -> Result<ActorHandle> {
let input = opts.input;
let region = opts.region;
let _region = opts.region;

let create_query = ActorQuery::Create {
create: CreateRequest {
name: name.to_string(),
key,
input,
region
}
};

let actor_id = resolve_actor_id(
&self.manager_endpoint,
create_query,
self.encoding_kind
let actor_id = self.remote_manager.create_actor(
name,
&key,
input,
).await?;

let get_query = ActorQuery::GetForId {
get_for_id: GetForIdRequest {
name: name.to_string(),
actor_id,
}
};
Expand Down
195 changes: 31 additions & 164 deletions clients/rust/src/common.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,37 @@
use anyhow::Result;
use reqwest::{header::USER_AGENT, RequestBuilder};
use serde::{de::DeserializeOwned, Serialize};
use serde_json::{json, Value as JsonValue};
use tracing::debug;

use crate::protocol::query::ActorQuery;

#[allow(dead_code)]
pub const VERSION: &str = env!("CARGO_PKG_VERSION");
pub const USER_AGENT_VALUE: &str = concat!("ActorClient-Rust/", env!("CARGO_PKG_VERSION"));

pub const HEADER_ACTOR_QUERY: &str = "X-AC-Query";
pub const HEADER_ENCODING: &str = "X-AC-Encoding";
pub const HEADER_CONN_PARAMS: &str = "X-AC-Conn-Params";
pub const HEADER_ACTOR_ID: &str = "X-AC-Actor";
pub const HEADER_CONN_ID: &str = "X-AC-Conn";
pub const HEADER_CONN_TOKEN: &str = "X-AC-Conn-Token";
// Headers
#[allow(dead_code)]
pub const HEADER_ACTOR_QUERY: &str = "x-rivet-query";
pub const HEADER_ENCODING: &str = "x-rivet-encoding";
pub const HEADER_CONN_PARAMS: &str = "x-rivet-conn-params";
#[allow(dead_code)]
pub const HEADER_ACTOR_ID: &str = "x-rivet-actor";
#[allow(dead_code)]
pub const HEADER_CONN_ID: &str = "x-rivet-conn";
#[allow(dead_code)]
pub const HEADER_CONN_TOKEN: &str = "x-rivet-conn-token";

// Gateway headers
pub const HEADER_RIVET_TARGET: &str = "x-rivet-target";
pub const HEADER_RIVET_ACTOR: &str = "x-rivet-actor";
pub const HEADER_RIVET_TOKEN: &str = "x-rivet-token";

// Paths
pub const PATH_CONNECT_WEBSOCKET: &str = "/connect/websocket";

// WebSocket protocol prefixes
pub const WS_PROTOCOL_STANDARD: &str = "rivet";
pub const WS_PROTOCOL_TARGET: &str = "rivet_target.";
pub const WS_PROTOCOL_ACTOR: &str = "rivet_actor.";
pub const WS_PROTOCOL_ENCODING: &str = "rivet_encoding.";
pub const WS_PROTOCOL_CONN_PARAMS: &str = "rivet_conn_params.";
pub const WS_PROTOCOL_CONN_ID: &str = "rivet_conn.";
pub const WS_PROTOCOL_CONN_TOKEN: &str = "rivet_conn_token.";
pub const WS_PROTOCOL_TOKEN: &str = "rivet_token.";

#[derive(Debug, Clone, Copy)]
pub enum TransportKind {
Expand Down Expand Up @@ -46,154 +63,4 @@ impl ToString for EncodingKind {


// Max size of each entry is 128 bytes
pub type ActorKey = Vec<String>;

pub struct HttpRequestOptions<'a, T: Serialize> {
pub method: &'a str,
pub url: &'a str,
pub headers: Vec<(&'a str, String)>,
pub body: Option<T>,
pub encoding_kind: EncodingKind
}

impl<'a, T: Serialize> Default for HttpRequestOptions<'a, T> {
fn default() -> Self {
Self {
method: "GET",
url: "",
headers: Vec::new(),
body: None,
encoding_kind: EncodingKind::Json
}
}
}

fn build_http_request<RQ>(opts: &HttpRequestOptions<RQ>) -> Result<RequestBuilder>
where
RQ: Serialize
{
let client = reqwest::Client::new();
let mut req = client.request(
reqwest::Method::from_bytes(opts.method.as_bytes()).unwrap(),
opts.url,
);

for (key, value) in &opts.headers {
req = req.header(*key, value);
}

if opts.method == "POST" || opts.method == "PUT" {
let Some(body) = &opts.body else {
return Err(anyhow::anyhow!("Body is required for POST/PUT requests"));
};

match opts.encoding_kind {
EncodingKind::Json => {
req = req.header("Content-Type", "application/json");
let body = serde_json::to_string(&body)?;
req = req.body(body);
}
EncodingKind::Cbor => {
req = req.header("Content-Type", "application/octet-stream");
let body =serde_cbor::to_vec(&body)?;
req = req.body(body);
}
}
};

req = req.header(USER_AGENT, USER_AGENT_VALUE);

Ok(req)
}

async fn send_http_request_raw(req: reqwest::RequestBuilder) -> Result<reqwest::Response> {
let res = req.send().await?;

if !res.status().is_success() {
// TODO: Decode
/*
let data: Option<RpcResponseError> = match opts.encoding_kind {
EncodingKind::Json => {
let data = res.text().await?;

serde_json::from_str::<RpcResponseError>(&data).ok()
}
EncodingKind::Cbor => {
let data = res.bytes().await?;

serde_cbor::from_slice(&data).ok()
}
};

match data {
Some(data) => {
return Err(anyhow::anyhow!(
"HTTP request failed with status: {}, error: {}",
res.status(),
data.m
));
},
None => {

}
}
*/
return Err(anyhow::anyhow!(
"HTTP request failed with status: {}",
res.status()
));
}

Ok(res)
}

pub async fn send_http_request<'a, RQ, RS>(opts: HttpRequestOptions<'a, RQ>) -> Result<RS>
where
RQ: Serialize,
RS: DeserializeOwned,
{
let req = build_http_request(&opts)?;
let res = send_http_request_raw(req).await?;

let res: RS = match opts.encoding_kind {
EncodingKind::Json => {
let data = res.text().await?;
serde_json::from_str(&data)?
}
EncodingKind::Cbor => {
let bytes = res.bytes().await?;
serde_cbor::from_slice(&bytes)?
}
};

Ok(res)
}


pub async fn resolve_actor_id(
manager_endpoint: &str,
query: ActorQuery,
encoding_kind: EncodingKind
) -> Result<String> {
#[derive(serde::Serialize, serde::Deserialize)]
struct ResolveResponse {
i: String,
}

let query = serde_json::to_string(&query)?;

let res = send_http_request::<JsonValue, ResolveResponse>(
HttpRequestOptions {
method: "POST",
url: &format!("{}/actors/resolve", manager_endpoint),
headers: vec![
(HEADER_ENCODING, encoding_kind.to_string()),
(HEADER_ACTOR_QUERY, query),
],
body: Some(json!({})),
encoding_kind,
}
).await?;

Ok(res.i)
}
pub type ActorKey = Vec<String>;
Loading
Loading