From 91c2bd7e23e32e2656e612146d63bed50d6e381d Mon Sep 17 00:00:00 2001 From: Aaron Arinder Date: Fri, 11 Oct 2024 10:40:01 -0400 Subject: [PATCH] fix(composition): general cleanup --- src/composition/run_composition.rs | 11 +++++++--- src/composition/runner.rs | 34 +++++++++++++++++++----------- 2 files changed, 30 insertions(+), 15 deletions(-) diff --git a/src/composition/run_composition.rs b/src/composition/run_composition.rs index 5899e7a35..15de0fc9b 100644 --- a/src/composition/run_composition.rs +++ b/src/composition/run_composition.rs @@ -143,9 +143,14 @@ mod tests { .read_file(mock_read_file) .build(); - let subgraph_change_events: BoxStream = - once(async { SubgraphEvent::SubgraphChanged(SubgraphSchemaChanged::default()) }) - .boxed(); + let subgraph_change_events: BoxStream = once(async { + SubgraphEvent::SubgraphChanged(SubgraphSchemaChanged::new( + "some-name".to_string(), + "some sdl".to_string(), + )) + }) + .boxed(); + let (mut composition_messages, composition_subtask) = Subtask::new(composition_handler); let abort_handle = composition_subtask.run(subgraph_change_events); diff --git a/src/composition/runner.rs b/src/composition/runner.rs index 59c676c58..66fd31f70 100644 --- a/src/composition/runner.rs +++ b/src/composition/runner.rs @@ -213,7 +213,7 @@ pub enum SubgraphEvent { } /// An event denoting that the subgraph has changed, emitting its name and the SDL reflecting that /// change -#[derive(derive_getters::Getters, Default)] +#[derive(derive_getters::Getters)] pub struct SubgraphSchemaChanged { /// Subgraph name name: String, @@ -221,6 +221,13 @@ pub struct SubgraphSchemaChanged { sdl: String, } +#[cfg(test)] +impl SubgraphSchemaChanged { + pub fn new(name: String, sdl: String) -> Self { + Self { name, sdl } + } +} + /// The subgraph is no longer watched #[derive(derive_getters::Getters, Default)] pub struct SubgraphSchemaRemoved { @@ -309,17 +316,20 @@ impl SubtaskHandleStream for SubgraphWatchers { Subtask::::new(subgraph_watcher); let sender = sender.clone(); - let subgraph_name_c = subgraph_name.clone(); - let messages_abort_handle = tokio::spawn(async move { - while let Some(change) = messages.next().await { - let _ = sender - .send(SubgraphEvent::SubgraphChanged( - SubgraphSchemaChanged { - name: subgraph_name_c.to_string(), - sdl: change.sdl().to_string(), - }, - )) - .tap_err(|err| tracing::error!("{:?}", err)); + let messages_abort_handle = tokio::spawn({ + let subgraph_name = subgraph_name.clone(); + + async move { + while let Some(change) = messages.next().await { + let _ = sender + .send(SubgraphEvent::SubgraphChanged( + SubgraphSchemaChanged { + name: subgraph_name.to_string(), + sdl: change.sdl().to_string(), + }, + )) + .tap_err(|err| tracing::error!("{:?}", err)); + } } }) .abort_handle();