Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initialize state on first request. #31

Merged
merged 7 commits into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions crates/sdk/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ pub use error::*;
#[async_trait]
pub trait Connector: Send {
/// The type of validated configuration
type Configuration: Sync + Send;
type Configuration: Send + Sync;
/// The type of unserializable state
type State: Sync + Send;
type State: Send + Sync;

/// Update any metrics from the state
///
Expand Down Expand Up @@ -140,8 +140,19 @@ pub trait Connector: Send {
///
/// See [`Connector`] for further details.
#[async_trait]
pub trait ConnectorSetup {
type Connector: Connector;
pub trait ConnectorSetup:
ParseConfiguration<Configuration = <Self::Connector as Connector>::Configuration>
+ InitState<
Configuration = <Self::Connector as Connector>::Configuration,
State = <Self::Connector as Connector>::State,
> + 'static
{
type Connector: Connector<Configuration: Clone, State: Clone> + 'static;
}

#[async_trait]
pub trait ParseConfiguration {
type Configuration;

/// Validate the configuration provided by the user, returning a configuration error or a
/// validated [`Connector::Configuration`].
Expand All @@ -151,7 +162,13 @@ pub trait ConnectorSetup {
async fn parse_configuration(
&self,
configuration_dir: impl AsRef<Path> + Send,
) -> Result<<Self::Connector as Connector>::Configuration>;
) -> Result<Self::Configuration>;
}

#[async_trait]
pub trait InitState: Send + Sync {
type Configuration;
type State;

/// Initialize the connector's in-memory state.
///
Expand All @@ -162,7 +179,7 @@ pub trait ConnectorSetup {
/// registry.
async fn try_init_state(
&self,
configuration: &<Self::Connector as Connector>::Configuration,
configuration: &Self::Configuration,
metrics: &mut prometheus::Registry,
) -> Result<<Self::Connector as Connector>::State>;
) -> Result<Self::State>;
}
15 changes: 13 additions & 2 deletions crates/sdk/src/connector/example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,21 @@ use super::*;
pub struct Example {}

#[async_trait]
impl ConnectorSetup for Example {
type Connector = Self;
impl ParseConfiguration for Example {
type Configuration = ();

async fn parse_configuration(
&self,
_configuration_dir: impl AsRef<Path> + Send,
) -> Result<<Self as Connector>::Configuration> {
Ok(())
}
}

#[async_trait]
impl InitState for Example {
type Configuration = ();
type State = ();

async fn try_init_state(
&self,
Expand All @@ -29,6 +35,11 @@ impl ConnectorSetup for Example {
}
}

#[async_trait]
impl ConnectorSetup for Example {
type Connector = Self;
}

#[async_trait]
impl Connector for Example {
type Configuration = ();
Expand Down
55 changes: 12 additions & 43 deletions crates/sdk/src/default_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::connector::{Connector, ConnectorSetup, ErrorResponse, Result};
use crate::fetch_metrics::fetch_metrics;
use crate::json_rejection::JsonRejection;
use crate::json_response::JsonResponse;
use crate::state::ServerState;
use crate::tracing::{init_tracing, make_span, on_response};

#[derive(Parser)]
Expand Down Expand Up @@ -138,37 +139,6 @@ struct CheckHealthCommand {

type Port = u16;

#[derive(Debug)]
pub struct ServerState<C: Connector> {
configuration: C::Configuration,
state: C::State,
metrics: Registry,
}

impl<C: Connector> Clone for ServerState<C>
where
C::Configuration: Clone,
C::State: Clone,
{
fn clone(&self) -> Self {
Self {
configuration: self.configuration.clone(),
state: self.state.clone(),
metrics: self.metrics.clone(),
}
}
}

impl<C: Connector> ServerState<C> {
pub fn new(configuration: C::Configuration, state: C::State, metrics: Registry) -> Self {
Self {
configuration,
state,
metrics,
}
}
}

/// A default main function for a connector.
///
/// The intent is that this function can replace your `main` function
Expand Down Expand Up @@ -244,8 +214,8 @@ where
let server_state = init_server_state(setup, serve_command.configuration).await?;

let router = create_router::<Setup::Connector>(
server_state.clone(),
serve_command.service_token_secret.clone(),
server_state,
serve_command.service_token_secret,
serve_command.max_request_size,
);

Expand Down Expand Up @@ -292,10 +262,9 @@ pub async fn init_server_state<Setup: ConnectorSetup>(
setup: Setup,
config_directory: impl AsRef<Path> + Send,
) -> Result<ServerState<Setup::Connector>> {
let mut metrics = Registry::new();
let metrics = Registry::new();
let configuration = setup.parse_configuration(config_directory).await?;
let state = setup.try_init_state(&configuration, &mut metrics).await?;
Ok(ServerState::new(configuration, state, metrics))
Ok(ServerState::new(configuration, setup, metrics))
}

pub fn create_router<C>(
Expand Down Expand Up @@ -385,7 +354,7 @@ fn auth_handler(
}

async fn get_metrics<C: Connector>(State(state): State<ServerState<C>>) -> Result<String> {
fetch_metrics::<C>(&state.configuration, &state.state, &state.metrics)
fetch_metrics::<C>(state.configuration(), state.state().await?, state.metrics())
}

async fn get_capabilities<C: Connector>() -> JsonResponse<CapabilitiesResponse> {
Expand All @@ -398,41 +367,41 @@ async fn get_capabilities<C: Connector>() -> JsonResponse<CapabilitiesResponse>
}

async fn get_health_readiness<C: Connector>(State(state): State<ServerState<C>>) -> Result<()> {
C::get_health_readiness(&state.configuration, &state.state).await
C::get_health_readiness(state.configuration(), state.state().await?).await
}

async fn get_schema<C: Connector>(
State(state): State<ServerState<C>>,
) -> Result<JsonResponse<SchemaResponse>> {
C::get_schema(&state.configuration).await
C::get_schema(state.configuration()).await
}

async fn post_query_explain<C: Connector>(
State(state): State<ServerState<C>>,
WithRejection(Json(request), _): WithRejection<Json<QueryRequest>, JsonRejection>,
) -> Result<JsonResponse<ExplainResponse>> {
C::query_explain(&state.configuration, &state.state, request).await
C::query_explain(state.configuration(), state.state().await?, request).await
}

async fn post_mutation_explain<C: Connector>(
State(state): State<ServerState<C>>,
WithRejection(Json(request), _): WithRejection<Json<MutationRequest>, JsonRejection>,
) -> Result<JsonResponse<ExplainResponse>> {
C::mutation_explain(&state.configuration, &state.state, request).await
C::mutation_explain(state.configuration(), state.state().await?, request).await
}

async fn post_mutation<C: Connector>(
State(state): State<ServerState<C>>,
WithRejection(Json(request), _): WithRejection<Json<MutationRequest>, JsonRejection>,
) -> Result<JsonResponse<MutationResponse>> {
C::mutation(&state.configuration, &state.state, request).await
C::mutation(state.configuration(), state.state().await?, request).await
}

async fn post_query<C: Connector>(
State(state): State<ServerState<C>>,
WithRejection(Json(request), _): WithRejection<Json<QueryRequest>, JsonRejection>,
) -> Result<JsonResponse<QueryResponse>> {
C::query(&state.configuration, &state.state, request).await
C::query(state.configuration(), state.state().await?, request).await
}

#[cfg(feature = "ndc-test")]
Expand Down
1 change: 1 addition & 0 deletions crates/sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod default_main;
pub mod fetch_metrics;
pub mod json_rejection;
pub mod json_response;
pub mod state;
pub mod tracing;

pub use ndc_models as models;
81 changes: 81 additions & 0 deletions crates/sdk/src/state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use std::sync::Arc;

use tokio::sync::OnceCell;

use crate::connector::error::*;
use crate::connector::{Connector, InitState};

/// Everything we need to keep in memory.
pub struct ServerState<C: Connector> {
configuration: C::Configuration,
state: Arc<ApplicationState<C>>,
metrics: prometheus::Registry,
}

/// The application state, which may or may not be initialized.
struct ApplicationState<C: Connector> {
cell: OnceCell<C::State>,
init_state: Box<dyn InitState<Configuration = C::Configuration, State = C::State>>,
}

// Server state must be cloneable even if the underlying connector is not.
// We only require `Connector::Configuration` to be cloneable.
//
// Server state is always stored in an `Arc`, so is therefore cloneable.
impl<C: Connector> Clone for ServerState<C>
where
C::Configuration: Clone,
{
fn clone(&self) -> Self {
Self {
configuration: self.configuration.clone(),
state: self.state.clone(),
metrics: self.metrics.clone(),
}
}
}

impl<C: Connector> ServerState<C> {
/// Construct a new server state.
pub fn new(
configuration: C::Configuration,
init_state: impl InitState<Configuration = C::Configuration, State = C::State> + 'static,
metrics: prometheus::Registry,
) -> Self {
Self {
configuration,
state: Arc::new(ApplicationState {
cell: OnceCell::new(),
init_state: Box::new(init_state),
}),
metrics,
}
}

/// The server configuration.
pub fn configuration(&self) -> &C::Configuration {
&self.configuration
}

/// The transient server state.
///
/// If the state has not yet been initialized, this initializes it.
///
/// On initialization failure, this function will also fail, and subsequent calls will retry.
pub async fn state(&self) -> Result<&C::State> {
self.state
.cell
.get_or_try_init(|| async {
self.state
.init_state
.try_init_state(&self.configuration, &mut self.metrics.clone())
.await
})
.await
}

/// The server metrics.
pub fn metrics(&self) -> &prometheus::Registry {
&self.metrics
}
}
Loading