Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(topology): Allow internal topologies to be controlled #20286

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 29 additions & 8 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,12 +257,29 @@ impl Application {
require_healthy: root_opts.require_healthy,
#[cfg(feature = "enterprise")]
enterprise_reporter: config.enterprise,
extra_context: config.extra_context,
extra_context: config.extra_context.clone(),
});

let internal_topologies: Vec<_> = config
.internal_topologies
.into_iter()
.map(|topology| {
SharedTopologyController::new(TopologyController {
#[cfg(feature = "api")]
api_server: None,
topology,
config_paths: Vec::new(),
require_healthy: root_opts.require_healthy,
#[cfg(feature = "enterprise")]
enterprise_reporter: None,
extra_context: config.extra_context.clone(),
})
})
.collect();

Ok(StartedApplication {
config_paths: config.config_paths,
internal_topologies: config.internal_topologies,
internal_topologies,
graceful_crash_receiver: config.graceful_crash_receiver,
signals,
topology_controller,
Expand All @@ -273,7 +290,7 @@ impl Application {

pub struct StartedApplication {
pub config_paths: Vec<ConfigPath>,
pub internal_topologies: Vec<RunningTopology>,
pub internal_topologies: Vec<SharedTopologyController>,
pub graceful_crash_receiver: ShutdownErrorReceiver,
pub signals: SignalPair,
pub topology_controller: SharedTopologyController,
Expand Down Expand Up @@ -391,7 +408,7 @@ pub struct FinishedApplication {
pub signal: SignalTo,
pub signal_rx: SignalRx,
pub topology_controller: SharedTopologyController,
pub internal_topologies: Vec<RunningTopology>,
pub internal_topologies: Vec<SharedTopologyController>,
}

impl FinishedApplication {
Expand All @@ -403,11 +420,11 @@ impl FinishedApplication {
internal_topologies,
} = self;

// At this point, we'll have the only reference to the shared topology controller and can
// safely remove it from the wrapper to shut down the topology.
// At this point, we should have the only reference to the shared topology controllers and
// can safely remove it from the wrapper to shut down the topology.
let topology_controller = topology_controller
.try_into_inner()
.expect("fail to unwrap topology controller")
.expect("Topology controller is still shared, cannot stop")
.into_inner();

let status = match signal {
Expand All @@ -417,7 +434,11 @@ impl FinishedApplication {
};

for topology in internal_topologies {
topology.stop().await;
let Ok(topology) = topology.try_into_inner() else {
warn!("Internal topology controller is still shared, cannot stop.");
continue;
};
topology.into_inner().stop().await;
}

status
Expand Down
Loading