Skip to content

Commit

Permalink
DOES NOT COMPILE
Browse files Browse the repository at this point in the history
  • Loading branch information
SamirTalwar committed Aug 22, 2024
1 parent c48e22c commit cf32100
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 32 deletions.
31 changes: 24 additions & 7 deletions crates/sdk/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ pub use error::*;
#[async_trait]
pub trait Connector: Send {
/// The type of validated configuration
type Configuration: Sync + Send;
type Configuration: Send + Sync;
/// The type of unserializable state
type State: Sync + Send;
type State: Send + Sync;

/// Update any metrics from the state
///
Expand Down Expand Up @@ -140,8 +140,19 @@ pub trait Connector: Send {
///
/// See [`Connector`] for further details.
#[async_trait]
pub trait ConnectorSetup {
type Connector: Connector;
pub trait ConnectorSetup:
ParseConfiguration<Configuration = <Self::Connector as Connector>::Configuration>
+ InitState<
Configuration = <Self::Connector as Connector>::Configuration,
State = <Self::Connector as Connector>::State,
> + 'static
{
type Connector: Connector<Configuration: Clone, State: Clone> + 'static;
}

#[async_trait]
pub trait ParseConfiguration {
type Configuration;

/// Validate the configuration provided by the user, returning a configuration error or a
/// validated [`Connector::Configuration`].
Expand All @@ -151,7 +162,13 @@ pub trait ConnectorSetup {
async fn parse_configuration(
&self,
configuration_dir: impl AsRef<Path> + Send,
) -> Result<<Self::Connector as Connector>::Configuration>;
) -> Result<Self::Configuration>;
}

#[async_trait]
pub trait InitState: Send + Sync {
type Configuration;
type State;

/// Initialize the connector's in-memory state.
///
Expand All @@ -162,7 +179,7 @@ pub trait ConnectorSetup {
/// registry.
async fn try_init_state(
&self,
configuration: &<Self::Connector as Connector>::Configuration,
configuration: &Self::Configuration,
metrics: &mut prometheus::Registry,
) -> Result<<Self::Connector as Connector>::State>;
) -> Result<Self::State>;
}
15 changes: 13 additions & 2 deletions crates/sdk/src/connector/example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,21 @@ use super::*;
pub struct Example {}

#[async_trait]
impl ConnectorSetup for Example {
type Connector = Self;
impl ParseConfiguration for Example {
type Configuration = ();

async fn parse_configuration(
&self,
_configuration_dir: impl AsRef<Path> + Send,
) -> Result<<Self as Connector>::Configuration> {
Ok(())
}
}

#[async_trait]
impl InitState for Example {
type Configuration = ();
type State = ();

async fn try_init_state(
&self,
Expand All @@ -29,6 +35,11 @@ impl ConnectorSetup for Example {
}
}

#[async_trait]
impl ConnectorSetup for Example {
type Connector = Self;
}

#[async_trait]
impl Connector for Example {
type Configuration = ();
Expand Down
47 changes: 32 additions & 15 deletions crates/sdk/src/default_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,9 @@ pub async fn init_server_state<Setup: ConnectorSetup>(
setup: Setup,
config_directory: impl AsRef<Path> + Send,
) -> Result<ServerState<Setup::Connector>> {
let mut metrics = Registry::new();
let metrics = Registry::new();
let configuration = setup.parse_configuration(config_directory).await?;
let state = setup.try_init_state(&configuration, &mut metrics).await?;
Ok(ServerState::new(configuration, state, metrics))
Ok(ServerState::new(configuration, setup, metrics))
}

pub fn create_router<C>(
Expand Down Expand Up @@ -354,8 +353,11 @@ fn auth_handler(
}
}

async fn get_metrics<C: Connector>(State(state): State<ServerState<C>>) -> Result<String> {
fetch_metrics::<C>(state.configuration(), state.state(), state.metrics())
async fn get_metrics<C: Connector>(State(state): State<ServerState<C>>) -> Result<String>
where
C::State: Clone,
{
fetch_metrics::<C>(state.configuration(), state.state().await?, state.metrics())
}

async fn get_capabilities<C: Connector>() -> JsonResponse<CapabilitiesResponse> {
Expand All @@ -367,8 +369,11 @@ async fn get_capabilities<C: Connector>() -> JsonResponse<CapabilitiesResponse>
.into()
}

async fn get_health_readiness<C: Connector>(State(state): State<ServerState<C>>) -> Result<()> {
C::get_health_readiness(state.configuration(), state.state()).await
async fn get_health_readiness<C: Connector>(State(state): State<ServerState<C>>) -> Result<()>
where
C::State: Clone,
{
C::get_health_readiness(state.configuration(), state.state().await?).await
}

async fn get_schema<C: Connector>(
Expand All @@ -380,29 +385,41 @@ async fn get_schema<C: Connector>(
async fn post_query_explain<C: Connector>(
State(state): State<ServerState<C>>,
WithRejection(Json(request), _): WithRejection<Json<QueryRequest>, JsonRejection>,
) -> Result<JsonResponse<ExplainResponse>> {
C::query_explain(state.configuration(), state.state(), request).await
) -> Result<JsonResponse<ExplainResponse>>
where
C::State: Clone,
{
C::query_explain(state.configuration(), state.state().await?, request).await
}

async fn post_mutation_explain<C: Connector>(
State(state): State<ServerState<C>>,
WithRejection(Json(request), _): WithRejection<Json<MutationRequest>, JsonRejection>,
) -> Result<JsonResponse<ExplainResponse>> {
C::mutation_explain(state.configuration(), state.state(), request).await
) -> Result<JsonResponse<ExplainResponse>>
where
C::State: Clone,
{
C::mutation_explain(state.configuration(), state.state().await?, request).await
}

async fn post_mutation<C: Connector>(
State(state): State<ServerState<C>>,
WithRejection(Json(request), _): WithRejection<Json<MutationRequest>, JsonRejection>,
) -> Result<JsonResponse<MutationResponse>> {
C::mutation(state.configuration(), state.state(), request).await
) -> Result<JsonResponse<MutationResponse>>
where
C::State: Clone,
{
C::mutation(state.configuration(), state.state().await?, request).await
}

async fn post_query<C: Connector>(
State(state): State<ServerState<C>>,
WithRejection(Json(request), _): WithRejection<Json<QueryRequest>, JsonRejection>,
) -> Result<JsonResponse<QueryResponse>> {
C::query(state.configuration(), state.state(), request).await
) -> Result<JsonResponse<QueryResponse>>
where
C::State: Clone,
{
C::query(state.configuration(), state.state().await?, request).await
}

#[cfg(feature = "ndc-test")]
Expand Down
67 changes: 59 additions & 8 deletions crates/sdk/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
use crate::connector::Connector;
use std::sync::{Arc, Mutex, OnceLock};

#[derive(Debug)]
use crate::connector::error::*;
use crate::connector::{Connector, InitState};

/// Everything we need to keep in memory.
pub struct ServerState<C: Connector> {
configuration: C::Configuration,
state: C::State,
state: Arc<(
OnceLock<C::State>,
Mutex<Box<dyn InitState<Configuration = C::Configuration, State = C::State>>>,
)>,
metrics: prometheus::Registry,
}

// Server state must be cloneable even if the underlying connector is not.
// We only require `Connector::Configuration` to be cloneable.
impl<C: Connector> Clone for ServerState<C>
where
C::Configuration: Clone,
C::State: Clone,
{
fn clone(&self) -> Self {
Self {
Expand All @@ -22,27 +29,71 @@ where
}

impl<C: Connector> ServerState<C> {
/// Construct a new server state.
pub fn new(
configuration: C::Configuration,
state: C::State,
init_state: impl InitState<Configuration = C::Configuration, State = C::State> + 'static,
metrics: prometheus::Registry,
) -> Self {
Self {
configuration,
state,
state: Arc::new((OnceLock::new(), Mutex::new(Box::new(init_state)))),
metrics,
}
}

/// The server configuration.
pub fn configuration(&self) -> &C::Configuration {
&self.configuration
}

pub fn state(&self) -> &C::State {
&self.state
/// The server state.
///
/// If the state has not yet been initialized, this initializes it.
///
/// On initialization failure, this function will also fail, and subsequent calls will retry.
pub async fn state(&self) -> Result<&C::State> {
// If the state is already created, return it.
if let Some(state) = self.state.0.get() {
return Ok(state);
}

{
let init_state = self.state.1.lock().map_err(|_| poisoned())?;
match self.state.0.get() {
// If the state was created before we acquired the lock, return it.
Some(state) => Ok(state),
// If not, let's call `setup.try_init_state` to create it.
// Failures are propagated outwards.
None => {
let new_state = init_state
.try_init_state(&self.configuration, &mut self.metrics.clone())
.await?;
self.state.0.set(new_state);
self.state
.0
.get()
.ok_or_else(|| unreachable!("uninitialized state"))
}
}
}
}

/// The server metrics.
pub fn metrics(&self) -> &prometheus::Registry {
&self.metrics
}
}

enum ServerStateState<Configuration, State> {
Uninitialized(Box<dyn InitState<Configuration = Configuration, State = State>>),
Initialized(Arc<State>),
}

fn poisoned() -> ErrorResponse {
ErrorResponse::new(
http::StatusCode::INTERNAL_SERVER_ERROR,
"The state has become corrupted.".to_string(),
serde_json::Value::Null,
)
}

0 comments on commit cf32100

Please sign in to comment.