diff --git a/api-server/README.md b/api-server/README.md index 704b722d3..16cb5e741 100644 --- a/api-server/README.md +++ b/api-server/README.md @@ -15,7 +15,7 @@ To see how to make this your own, look here: [README]((https://openapi-generator.tech)) - API version: 0.9.0 -- Build date: 2024-01-24T14:39:40.257161505-07:00[America/Denver] +- Build date: 2024-01-28T18:51:47.756188-05:00[America/New_York] @@ -62,6 +62,7 @@ cargo run --example server To run a client, follow one of the following simple steps: ``` +cargo run --example client InterestsSortKeySortValuePost cargo run --example client LivenessGet cargo run --example client SubscribeSortKeySortValueGet cargo run --example client VersionPost @@ -99,6 +100,7 @@ All URIs are relative to */ceramic* Method | HTTP request | Description ------------- | ------------- | ------------- [****](docs/default_api.md#) | **POST** /events | Creates a new event +[****](docs/default_api.md#) | **POST** /interests/{sort_key}/{sort_value} | Register interest for a sort key [****](docs/default_api.md#) | **GET** /liveness | Test the liveness of the Ceramic node [****](docs/default_api.md#) | **GET** /subscribe/{sort_key}/{sort_value} | Get events for a stream [****](docs/default_api.md#) | **POST** /version | Get the version of the Ceramic node diff --git a/api-server/api/openapi.yaml b/api-server/api/openapi.yaml index d477b6f84..aa996e12f 100644 --- a/api-server/api/openapi.yaml +++ b/api-server/api/openapi.yaml @@ -96,6 +96,45 @@ paths: "204": description: success summary: Creates a new event + /interests/{sort_key}/{sort_value}: + post: + parameters: + - description: name of the sort_key + explode: false + in: path + name: sort_key + required: true + schema: + type: string + style: simple + - description: value associated with the sort key + explode: false + in: path + name: sort_value + required: true + schema: + type: string + style: simple + - description: the controller to register interest for + explode: true + in: query + name: controller + required: false + schema: + type: string + style: form + - description: the stream to register interest for + explode: true + in: query + name: streamId + required: false + schema: + type: string + style: form + responses: + "204": + description: success + summary: Register interest for a sort key components: requestBodies: Event: diff --git a/api-server/docs/default_api.md b/api-server/docs/default_api.md index 9ac9ca080..528ac0668 100644 --- a/api-server/docs/default_api.md +++ b/api-server/docs/default_api.md @@ -5,6 +5,7 @@ All URIs are relative to */ceramic* Method | HTTP request | Description ------------- | ------------- | ------------- ****](default_api.md#) | **POST** /events | Creates a new event +****](default_api.md#) | **POST** /interests/{sort_key}/{sort_value} | Register interest for a sort key ****](default_api.md#) | **GET** /liveness | Test the liveness of the Ceramic node ****](default_api.md#) | **GET** /subscribe/{sort_key}/{sort_value} | Get events for a stream ****](default_api.md#) | **POST** /version | Get the version of the Ceramic node @@ -35,6 +36,43 @@ No authorization required [[Back to top]](#) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to Model list]](../README.md#documentation-for-models) [[Back to README]](../README.md) +# **** +> (sort_key, sort_value, optional) +Register interest for a sort key + +### Required Parameters + +Name | Type | Description | Notes +------------- | ------------- | ------------- | ------------- + **sort_key** | **String**| name of the sort_key | + **sort_value** | **String**| value associated with the sort key | + **optional** | **map[string]interface{}** | optional parameters | nil if no parameters + +### Optional Parameters +Optional parameters are passed through a map[string]interface{}. + +Name | Type | Description | Notes +------------- | ------------- | ------------- | ------------- + **sort_key** | **String**| name of the sort_key | + **sort_value** | **String**| value associated with the sort key | + **controller** | **String**| the controller to register interest for | + **stream_id** | **String**| the stream to register interest for | + +### Return type + + (empty response body) + +### Authorization + +No authorization required + +### HTTP request headers + + - **Content-Type**: Not defined + - **Accept**: Not defined + +[[Back to top]](#) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to Model list]](../README.md#documentation-for-models) [[Back to README]](../README.md) + # **** > () Test the liveness of the Ceramic node diff --git a/api-server/examples/client/main.rs b/api-server/examples/client/main.rs index b5598ba86..2bbb4f0e1 100644 --- a/api-server/examples/client/main.rs +++ b/api-server/examples/client/main.rs @@ -2,7 +2,8 @@ #[allow(unused_imports)] use ceramic_api_server::{ - models, Api, ApiNoContext, Client, ContextWrapperExt, EventsPostResponse, LivenessGetResponse, + models, Api, ApiNoContext, Client, ContextWrapperExt, EventsPostResponse, + InterestsSortKeySortValuePostResponse, LivenessGetResponse, SubscribeSortKeySortValueGetResponse, VersionPostResponse, }; use clap::{App, Arg}; @@ -32,7 +33,12 @@ fn main() { .arg( Arg::with_name("operation") .help("Sets the operation to run") - .possible_values(&["LivenessGet", "SubscribeSortKeySortValueGet", "VersionPost"]) + .possible_values(&[ + "InterestsSortKeySortValuePost", + "LivenessGet", + "SubscribeSortKeySortValueGet", + "VersionPost", + ]) .required(true) .index(1), ) @@ -95,6 +101,19 @@ fn main() { info!("{:?} (X-Span-ID: {:?})", result, (client.context() as &dyn Has).get().clone()); }, */ + Some("InterestsSortKeySortValuePost") => { + let result = rt.block_on(client.interests_sort_key_sort_value_post( + "sort_key_example".to_string(), + "sort_value_example".to_string(), + Some("controller_example".to_string()), + Some("stream_id_example".to_string()), + )); + info!( + "{:?} (X-Span-ID: {:?})", + result, + (client.context() as &dyn Has).get().clone() + ); + } Some("LivenessGet") => { let result = rt.block_on(client.liveness_get()); info!( diff --git a/api-server/examples/server/server.rs b/api-server/examples/server/server.rs index 8918b4772..476a3e581 100644 --- a/api-server/examples/server/server.rs +++ b/api-server/examples/server/server.rs @@ -101,8 +101,8 @@ impl Server { use ceramic_api_server::server::MakeService; use ceramic_api_server::{ - Api, EventsPostResponse, LivenessGetResponse, SubscribeSortKeySortValueGetResponse, - VersionPostResponse, + Api, EventsPostResponse, InterestsSortKeySortValuePostResponse, LivenessGetResponse, + SubscribeSortKeySortValueGetResponse, VersionPostResponse, }; use std::error::Error; use swagger::ApiError; @@ -126,6 +126,26 @@ where Err(ApiError("Generic failure".into())) } + /// Register interest for a sort key + async fn interests_sort_key_sort_value_post( + &self, + sort_key: String, + sort_value: String, + controller: Option, + stream_id: Option, + context: &C, + ) -> Result { + info!( + "interests_sort_key_sort_value_post(\"{}\", \"{}\", {:?}, {:?}) - X-Span-ID: {:?}", + sort_key, + sort_value, + controller, + stream_id, + context.get().0.clone() + ); + Err(ApiError("Generic failure".into())) + } + /// Test the liveness of the Ceramic node async fn liveness_get(&self, context: &C) -> Result { info!("liveness_get() - X-Span-ID: {:?}", context.get().0.clone()); diff --git a/api-server/src/client/mod.rs b/api-server/src/client/mod.rs index 375e33c3b..2d30226dd 100644 --- a/api-server/src/client/mod.rs +++ b/api-server/src/client/mod.rs @@ -42,8 +42,8 @@ const FRAGMENT_ENCODE_SET: &AsciiSet = &percent_encoding::CONTROLS const ID_ENCODE_SET: &AsciiSet = &FRAGMENT_ENCODE_SET.add(b'|'); use crate::{ - Api, EventsPostResponse, LivenessGetResponse, SubscribeSortKeySortValueGetResponse, - VersionPostResponse, + Api, EventsPostResponse, InterestsSortKeySortValuePostResponse, LivenessGetResponse, + SubscribeSortKeySortValueGetResponse, VersionPostResponse, }; /// Convert input into a base path, e.g. "http://example:123". Also checks the scheme as it goes. @@ -479,6 +479,92 @@ where } } + async fn interests_sort_key_sort_value_post( + &self, + param_sort_key: String, + param_sort_value: String, + param_controller: Option, + param_stream_id: Option, + context: &C, + ) -> Result { + let mut client_service = self.client_service.clone(); + let mut uri = format!( + "{}/ceramic/interests/{sort_key}/{sort_value}", + self.base_path, + sort_key = utf8_percent_encode(¶m_sort_key.to_string(), ID_ENCODE_SET), + sort_value = utf8_percent_encode(¶m_sort_value.to_string(), ID_ENCODE_SET) + ); + + // Query parameters + let query_string = { + let mut query_string = form_urlencoded::Serializer::new("".to_owned()); + if let Some(param_controller) = param_controller { + query_string.append_pair("controller", ¶m_controller); + } + if let Some(param_stream_id) = param_stream_id { + query_string.append_pair("streamId", ¶m_stream_id); + } + query_string.finish() + }; + if !query_string.is_empty() { + uri += "?"; + uri += &query_string; + } + + let uri = match Uri::from_str(&uri) { + Ok(uri) => uri, + Err(err) => return Err(ApiError(format!("Unable to build URI: {}", err))), + }; + + let mut request = match Request::builder() + .method("POST") + .uri(uri) + .body(Body::empty()) + { + Ok(req) => req, + Err(e) => return Err(ApiError(format!("Unable to create request: {}", e))), + }; + + let header = HeaderValue::from_str(Has::::get(context).0.as_str()); + request.headers_mut().insert( + HeaderName::from_static("x-span-id"), + match header { + Ok(h) => h, + Err(e) => { + return Err(ApiError(format!( + "Unable to create X-Span ID header value: {}", + e + ))) + } + }, + ); + + let response = client_service + .call((request, context.clone())) + .map_err(|e| ApiError(format!("No response received: {}", e))) + .await?; + + match response.status().as_u16() { + 204 => Ok(InterestsSortKeySortValuePostResponse::Success), + code => { + let headers = response.headers().clone(); + let body = response.into_body().take(100).into_raw().await; + Err(ApiError(format!( + "Unexpected response code {}:\n{:?}\n\n{}", + code, + headers, + match body { + Ok(body) => match String::from_utf8(body) { + Ok(body) => body, + Err(e) => format!("", e), + }, + Err(e) => format!("", e), + } + ))) + } + } + } + async fn liveness_get(&self, context: &C) -> Result { let mut client_service = self.client_service.clone(); let mut uri = format!("{}/ceramic/liveness", self.base_path); diff --git a/api-server/src/lib.rs b/api-server/src/lib.rs index 1a98b208d..6488ce59c 100644 --- a/api-server/src/lib.rs +++ b/api-server/src/lib.rs @@ -28,6 +28,12 @@ pub enum EventsPostResponse { Success, } +#[derive(Debug, PartialEq, Serialize, Deserialize)] +pub enum InterestsSortKeySortValuePostResponse { + /// success + Success, +} + #[derive(Debug, PartialEq, Serialize, Deserialize)] pub enum LivenessGetResponse { /// success @@ -64,6 +70,16 @@ pub trait Api { context: &C, ) -> Result; + /// Register interest for a sort key + async fn interests_sort_key_sort_value_post( + &self, + sort_key: String, + sort_value: String, + controller: Option, + stream_id: Option, + context: &C, + ) -> Result; + /// Test the liveness of the Ceramic node async fn liveness_get(&self, context: &C) -> Result; @@ -97,6 +113,15 @@ pub trait ApiNoContext { /// Creates a new event async fn events_post(&self, event: models::Event) -> Result; + /// Register interest for a sort key + async fn interests_sort_key_sort_value_post( + &self, + sort_key: String, + sort_value: String, + controller: Option, + stream_id: Option, + ) -> Result; + /// Test the liveness of the Ceramic node async fn liveness_get(&self) -> Result; @@ -146,6 +171,22 @@ impl + Send + Sync, C: Clone + Send + Sync> ApiNoContext for Contex self.api().events_post(event, &context).await } + /// Register interest for a sort key + async fn interests_sort_key_sort_value_post( + &self, + sort_key: String, + sort_value: String, + controller: Option, + stream_id: Option, + ) -> Result { + let context = self.context().clone(); + self.api() + .interests_sort_key_sort_value_post( + sort_key, sort_value, controller, stream_id, &context, + ) + .await + } + /// Test the liveness of the Ceramic node async fn liveness_get(&self) -> Result { let context = self.context().clone(); diff --git a/api-server/src/server/mod.rs b/api-server/src/server/mod.rs index 959c026aa..0f1b9241e 100644 --- a/api-server/src/server/mod.rs +++ b/api-server/src/server/mod.rs @@ -22,8 +22,8 @@ pub use crate::context; type ServiceFuture = BoxFuture<'static, Result, crate::ServiceError>>; use crate::{ - Api, EventsPostResponse, LivenessGetResponse, SubscribeSortKeySortValueGetResponse, - VersionPostResponse, + Api, EventsPostResponse, InterestsSortKeySortValuePostResponse, LivenessGetResponse, + SubscribeSortKeySortValueGetResponse, VersionPostResponse, }; mod paths { @@ -32,6 +32,7 @@ mod paths { lazy_static! { pub static ref GLOBAL_REGEX_SET: regex::RegexSet = regex::RegexSet::new(vec![ r"^/ceramic/events$", + r"^/ceramic/interests/(?P[^/?#]*)/(?P[^/?#]*)$", r"^/ceramic/liveness$", r"^/ceramic/subscribe/(?P[^/?#]*)/(?P[^/?#]*)$", r"^/ceramic/version$" @@ -39,8 +40,17 @@ mod paths { .expect("Unable to create global regex set"); } pub(crate) static ID_EVENTS: usize = 0; - pub(crate) static ID_LIVENESS: usize = 1; - pub(crate) static ID_SUBSCRIBE_SORT_KEY_SORT_VALUE: usize = 2; + pub(crate) static ID_INTERESTS_SORT_KEY_SORT_VALUE: usize = 1; + lazy_static! { + pub static ref REGEX_INTERESTS_SORT_KEY_SORT_VALUE: regex::Regex = + #[allow(clippy::invalid_regex)] + regex::Regex::new( + r"^/ceramic/interests/(?P[^/?#]*)/(?P[^/?#]*)$" + ) + .expect("Unable to create regex for INTERESTS_SORT_KEY_SORT_VALUE"); + } + pub(crate) static ID_LIVENESS: usize = 2; + pub(crate) static ID_SUBSCRIBE_SORT_KEY_SORT_VALUE: usize = 3; lazy_static! { pub static ref REGEX_SUBSCRIBE_SORT_KEY_SORT_VALUE: regex::Regex = #[allow(clippy::invalid_regex)] @@ -49,7 +59,7 @@ mod paths { ) .expect("Unable to create regex for SUBSCRIBE_SORT_KEY_SORT_VALUE"); } - pub(crate) static ID_VERSION: usize = 3; + pub(crate) static ID_VERSION: usize = 4; } pub struct MakeService @@ -235,6 +245,128 @@ where } } + // InterestsSortKeySortValuePost - POST /interests/{sort_key}/{sort_value} + hyper::Method::POST if path.matched(paths::ID_INTERESTS_SORT_KEY_SORT_VALUE) => { + // Path parameters + let path: &str = uri.path(); + let path_params = + paths::REGEX_INTERESTS_SORT_KEY_SORT_VALUE + .captures(path) + .unwrap_or_else(|| + panic!("Path {} matched RE INTERESTS_SORT_KEY_SORT_VALUE in set but failed match against \"{}\"", path, paths::REGEX_INTERESTS_SORT_KEY_SORT_VALUE.as_str()) + ); + + let param_sort_key = match percent_encoding::percent_decode(path_params["sort_key"].as_bytes()).decode_utf8() { + Ok(param_sort_key) => match param_sort_key.parse::() { + Ok(param_sort_key) => param_sort_key, + Err(e) => return Ok(Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Body::from(format!("Couldn't parse path parameter sort_key: {}", e))) + .expect("Unable to create Bad Request response for invalid path parameter")), + }, + Err(_) => return Ok(Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Body::from(format!("Couldn't percent-decode path parameter as UTF-8: {}", &path_params["sort_key"]))) + .expect("Unable to create Bad Request response for invalid percent decode")) + }; + + let param_sort_value = match percent_encoding::percent_decode(path_params["sort_value"].as_bytes()).decode_utf8() { + Ok(param_sort_value) => match param_sort_value.parse::() { + Ok(param_sort_value) => param_sort_value, + Err(e) => return Ok(Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Body::from(format!("Couldn't parse path parameter sort_value: {}", e))) + .expect("Unable to create Bad Request response for invalid path parameter")), + }, + Err(_) => return Ok(Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Body::from(format!("Couldn't percent-decode path parameter as UTF-8: {}", &path_params["sort_value"]))) + .expect("Unable to create Bad Request response for invalid percent decode")) + }; + + // Query parameters (note that non-required or collection query parameters will ignore garbage values, rather than causing a 400 response) + let query_params = + form_urlencoded::parse(uri.query().unwrap_or_default().as_bytes()) + .collect::>(); + let param_controller = query_params + .iter() + .filter(|e| e.0 == "controller") + .map(|e| e.1.clone()) + .next(); + let param_controller = match param_controller { + Some(param_controller) => { + let param_controller = + ::from_str(¶m_controller); + match param_controller { + Ok(param_controller) => Some(param_controller), + Err(e) => return Ok(Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Body::from(format!("Couldn't parse query parameter controller - doesn't match schema: {}", e))) + .expect("Unable to create Bad Request response for invalid query parameter controller")), + } + } + None => None, + }; + let param_stream_id = query_params + .iter() + .filter(|e| e.0 == "streamId") + .map(|e| e.1.clone()) + .next(); + let param_stream_id = match param_stream_id { + Some(param_stream_id) => { + let param_stream_id = + ::from_str(¶m_stream_id); + match param_stream_id { + Ok(param_stream_id) => Some(param_stream_id), + Err(e) => return Ok(Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Body::from(format!("Couldn't parse query parameter streamId - doesn't match schema: {}", e))) + .expect("Unable to create Bad Request response for invalid query parameter streamId")), + } + } + None => None, + }; + + let result = api_impl + .interests_sort_key_sort_value_post( + param_sort_key, + param_sort_value, + param_controller, + param_stream_id, + &context, + ) + .await; + let mut response = Response::new(Body::empty()); + response.headers_mut().insert( + HeaderName::from_static("x-span-id"), + HeaderValue::from_str( + (&context as &dyn Has) + .get() + .0 + .clone() + .as_str(), + ) + .expect("Unable to create X-Span-ID header value"), + ); + + match result { + Ok(rsp) => match rsp { + InterestsSortKeySortValuePostResponse::Success => { + *response.status_mut() = StatusCode::from_u16(204) + .expect("Unable to turn 204 into a StatusCode"); + } + }, + Err(_) => { + // Application code returned an error. This should not happen, as the implementation should + // return a valid response. + *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + *response.body_mut() = Body::from("An internal error occurred"); + } + } + + Ok(response) + } + // LivenessGet - GET /liveness hyper::Method::GET if path.matched(paths::ID_LIVENESS) => { let result = api_impl.liveness_get(&context).await; @@ -478,6 +610,7 @@ where } _ if path.matched(paths::ID_EVENTS) => method_not_allowed(), + _ if path.matched(paths::ID_INTERESTS_SORT_KEY_SORT_VALUE) => method_not_allowed(), _ if path.matched(paths::ID_LIVENESS) => method_not_allowed(), _ if path.matched(paths::ID_SUBSCRIBE_SORT_KEY_SORT_VALUE) => method_not_allowed(), _ if path.matched(paths::ID_VERSION) => method_not_allowed(), @@ -499,6 +632,10 @@ impl RequestParser for ApiRequestParser { match *request.method() { // EventsPost - POST /events hyper::Method::POST if path.matched(paths::ID_EVENTS) => Some("EventsPost"), + // InterestsSortKeySortValuePost - POST /interests/{sort_key}/{sort_value} + hyper::Method::POST if path.matched(paths::ID_INTERESTS_SORT_KEY_SORT_VALUE) => { + Some("InterestsSortKeySortValuePost") + } // LivenessGet - GET /liveness hyper::Method::GET if path.matched(paths::ID_LIVENESS) => Some("LivenessGet"), // SubscribeSortKeySortValueGet - GET /subscribe/{sort_key}/{sort_value} diff --git a/api/ceramic.yaml b/api/ceramic.yaml index e0216cd39..71edaf01d 100644 --- a/api/ceramic.yaml +++ b/api/ceramic.yaml @@ -89,6 +89,38 @@ paths: '204': description: success + '/interests/{sort_key}/{sort_value}': + post: + summary: Register interest for a sort key + parameters: + - name: sort_key + in: path + description: name of the sort_key + schema: + type: string + required: true + - name: sort_value + in: path + description: value associated with the sort key + schema: + type: string + required: true + - name: controller + in: query + description: the controller to register interest for + required: false + schema: + type: string + - name: streamId + in: query + description: the stream to register interest for + required: false + schema: + type: string + responses: + '204': + description: success + components: requestBodies: Event: diff --git a/api/src/metrics/api.rs b/api/src/metrics/api.rs index 0b05616f0..6499aecff 100644 --- a/api/src/metrics/api.rs +++ b/api/src/metrics/api.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use ceramic_api_server::{ - models, Api, EventsPostResponse, LivenessGetResponse, SubscribeSortKeySortValueGetResponse, - VersionPostResponse, + models, Api, EventsPostResponse, InterestsSortKeySortValuePostResponse, LivenessGetResponse, + SubscribeSortKeySortValueGetResponse, VersionPostResponse, }; use ceramic_metrics::Recorder; use futures::Future; @@ -82,4 +82,22 @@ where self.record("/version", self.api.version_post(context)) .await } + + /// Register interest for a sort key + async fn interests_sort_key_sort_value_post( + &self, + sort_key: String, + sort_value: String, + controller: Option, + stream_id: Option, + context: &C, + ) -> Result { + self.record( + "/interests", + self.api.interests_sort_key_sort_value_post( + sort_key, sort_value, controller, stream_id, context, + ), + ) + .await + } } diff --git a/api/src/server.rs b/api/src/server.rs index d66a84941..f5ee08f45 100644 --- a/api/src/server.rs +++ b/api/src/server.rs @@ -27,8 +27,8 @@ use tracing::{debug, info, instrument, Level}; use ceramic_api_server::{ models::{self, Event}, - EventsPostResponse, LivenessGetResponse, SubscribeSortKeySortValueGetResponse, - VersionPostResponse, + EventsPostResponse, InterestsSortKeySortValuePostResponse, LivenessGetResponse, + SubscribeSortKeySortValueGetResponse, VersionPostResponse, }; use ceramic_core::{EventId, Interest, Network, PeerId, StreamId}; @@ -104,6 +104,70 @@ where marker: PhantomData, } } + + async fn store_interest( + &self, + sort_key: String, + sort_value: String, + controller: Option, + stream_id: Option, + ) -> Result<(EventId, EventId), ApiError> { + // Construct start and stop event id based on provided data. + let start_builder = EventId::builder() + .with_network(&self.network) + .with_sort_value(&sort_key, &sort_value); + let stop_builder = EventId::builder() + .with_network(&self.network) + .with_sort_value(&sort_key, &sort_value); + + let (start_builder, stop_builder) = match (controller, stream_id) { + (Some(controller), Some(stream_id)) => { + let stream_id = StreamId::from_str(&stream_id) + .map_err(|err| ApiError(format!("stream_id: {err}")))?; + ( + start_builder + .with_controller(&controller) + .with_init(&stream_id.cid), + stop_builder + .with_controller(&controller) + .with_init(&stream_id.cid), + ) + } + (Some(controller), None) => ( + start_builder.with_controller(&controller).with_min_init(), + stop_builder.with_controller(&controller).with_max_init(), + ), + (None, Some(_)) => { + return Err(ApiError( + "controller is required if stream_id is specified".to_owned(), + )) + } + (None, None) => ( + start_builder.with_min_controller().with_min_init(), + stop_builder.with_max_controller().with_max_init(), + ), + }; + + let start = start_builder.with_min_event_height().build_fencepost(); + let stop = stop_builder.with_max_event_height().build_fencepost(); + + // Update interest ranges to include this new subscription. + let interest = Interest::builder() + .with_sort_key(&sort_key) + .with_peer_id(&self.peer_id) + .with_range((start.as_slice(), stop.as_slice())) + .with_not_after(0) + .build(); + self.interest + // We must store a value for the interest otherwise Recon will try forever to + // synchronize the value. + // In the case of interests an empty value is sufficient. + .insert(interest, Some(vec![])) + .await + .map_err(|err| ApiError(format!("failed to update interest: {err}")))?; + + Ok((start, stop)) + } } use ceramic_api_server::server::MakeService; @@ -187,59 +251,10 @@ where }) .transpose()? .unwrap_or(usize::MAX); - // Construct start and stop event id based on provided data. - let start_builder = EventId::builder() - .with_network(&self.network) - .with_sort_value(&sort_key, &sort_value); - let stop_builder = EventId::builder() - .with_network(&self.network) - .with_sort_value(&sort_key, &sort_value); - - let (start_builder, stop_builder) = match (controller, stream_id) { - (Some(controller), Some(stream_id)) => { - let stream_id = StreamId::from_str(&stream_id) - .map_err(|err| ApiError(format!("stream_id: {err}")))?; - ( - start_builder - .with_controller(&controller) - .with_init(&stream_id.cid), - stop_builder - .with_controller(&controller) - .with_init(&stream_id.cid), - ) - } - (Some(controller), None) => ( - start_builder.with_controller(&controller).with_min_init(), - stop_builder.with_controller(&controller).with_max_init(), - ), - (None, Some(_)) => { - return Err(ApiError( - "controller is required if stream_id is specified".to_owned(), - )) - } - (None, None) => ( - start_builder.with_min_controller().with_min_init(), - stop_builder.with_max_controller().with_max_init(), - ), - }; - let start = start_builder.with_min_event_height().build_fencepost(); - let stop = stop_builder.with_max_event_height().build_fencepost(); - - // Update interest ranges to include this new subscription. - let interest = Interest::builder() - .with_sort_key(&sort_key) - .with_peer_id(&self.peer_id) - .with_range((start.as_slice(), stop.as_slice())) - .with_not_after(0) - .build(); - self.interest - // We must store a value for the interest otherwise Recon will try forever to - // synchronize the value. - // In the case of interests an empty value is sufficient. - .insert(interest, Some(vec![])) - .await - .map_err(|err| ApiError(format!("failed to update interest: {err}")))?; + let (start, stop) = self + .store_interest(sort_key, sort_value, controller, stream_id) + .await?; let events = self .model @@ -254,6 +269,24 @@ where .collect(); Ok(SubscribeSortKeySortValueGetResponse::Success(events)) } + + #[instrument(skip(self, _context), ret(level = Level::DEBUG), err(level = Level::ERROR))] + async fn interests_sort_key_sort_value_post( + &self, + sort_key: String, + sort_value: String, + controller: Option, + stream_id: Option, + _context: &C, + ) -> Result { + debug!( + ?self.network, + sort_key, sort_value, controller, "interest registration params" + ); + self.store_interest(sort_key, sort_value, controller, stream_id) + .await?; + Ok(InterestsSortKeySortValuePostResponse::Success) + } } fn decode_event_id(value: &str) -> Result {