From 7bd3db63f3b0f9503aee1f4ab82601c23b996514 Mon Sep 17 00:00:00 2001 From: "Ruokun (Tommy) Niu" Date: Thu, 24 Oct 2024 14:04:44 -0700 Subject: [PATCH] Updated change router to retain subscription after a pod restart (#94) * updated router and added state manager in infra * updated state manager to be internal to router * nit * updated state manager based on PR comments * lint * moved query_state to state manager and configured arugment type for save_state * clippy lint fix --- infrastructure/comms-dapr/src/comms.rs | 2 +- sources/shared/change-router/Cargo.lock | 88 +++++++------- sources/shared/change-router/Cargo.toml | 2 +- sources/shared/change-router/src/main.rs | 108 +++++++++-------- .../shared/change-router/src/state_manager.rs | 114 ++++++++++++++++++ 5 files changed, 221 insertions(+), 93 deletions(-) create mode 100644 sources/shared/change-router/src/state_manager.rs diff --git a/infrastructure/comms-dapr/src/comms.rs b/infrastructure/comms-dapr/src/comms.rs index ab3a2844..a268dbf8 100644 --- a/infrastructure/comms-dapr/src/comms.rs +++ b/infrastructure/comms-dapr/src/comms.rs @@ -153,4 +153,4 @@ impl Invoker for DaprHttpInvoker { Err(e) => Err(Box::new(e)), } } -} +} \ No newline at end of file diff --git a/sources/shared/change-router/Cargo.lock b/sources/shared/change-router/Cargo.lock index d39d078e..25828b21 100644 --- a/sources/shared/change-router/Cargo.lock +++ b/sources/shared/change-router/Cargo.lock @@ -154,7 +154,7 @@ dependencies = [ "futures-util", "http 0.2.12", "http-body 0.4.6", - "hyper 0.14.30", + "hyper 0.14.31", "itoa", "matchit", "memchr", @@ -183,7 +183,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.1", "http-body-util", - "hyper 1.4.1", + "hyper 1.5.0", "hyper-util", "itoa", "matchit", @@ -312,9 +312,9 @@ checksum = "428d9aa8fbc0670b7b8d6030a7fadd0f86151cae55e4dbbece15f3780a3dfaf3" [[package]] name = "cc" -version = "1.1.28" +version = "1.1.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e80e3b6a3ab07840e1cae9b0666a63970dc28e8ed5ffbcdacbfc760c281bfc1" +checksum = "b16803a61b81d9eabb7eae2588776c4c1e584b738ede45fdbb4c972cec1e9945" dependencies = [ "shlex", ] @@ -800,9 +800,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "hyper" -version = "0.14.30" +version = "0.14.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a152ddd61dfaec7273fe8419ab357f33aee0d914c5f4efbf0d96fa749eea5ec9" +checksum = "8c08302e8fa335b151b788c775ff56e7a03ae64ff85c548ee820fecb70356e85" dependencies = [ "bytes", "futures-channel", @@ -824,9 +824,9 @@ dependencies = [ [[package]] name = "hyper" -version = "1.4.1" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" +checksum = "bbbff0a806a4728c99295b254c8838933b5b082d75e3cb70c8dab21fdfbcfa9a" dependencies = [ "bytes", "futures-channel", @@ -851,7 +851,7 @@ checksum = "08afdbb5c31130e3034af566421053ab03787c640246a446327f550d11bcb333" dependencies = [ "futures-util", "http 1.1.0", - "hyper 1.4.1", + "hyper 1.5.0", "hyper-util", "rustls", "rustls-pki-types", @@ -866,7 +866,7 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" dependencies = [ - "hyper 0.14.30", + "hyper 0.14.31", "pin-project-lite", "tokio", "tokio-io-timeout", @@ -880,7 +880,7 @@ checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" dependencies = [ "bytes", "http-body-util", - "hyper 1.4.1", + "hyper 1.5.0", "hyper-util", "native-tls", "tokio", @@ -899,7 +899,7 @@ dependencies = [ "futures-util", "http 1.1.0", "http-body 1.0.1", - "hyper 1.4.1", + "hyper 1.5.0", "pin-project-lite", "socket2", "tokio", @@ -989,9 +989,9 @@ checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" [[package]] name = "js-sys" -version = "0.3.71" +version = "0.3.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0cb94a0ffd3f3ee755c20f7d8752f45cac88605a4dcf808abcff72873296ec7b" +checksum = "6a88f1bda2bd75b0452a14784937d796722fdebfe50df998aeb3f0b7603019a9" dependencies = [ "wasm-bindgen", ] @@ -1137,9 +1137,9 @@ checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" [[package]] name = "openssl" -version = "0.10.66" +version = "0.10.67" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9529f4786b70a3e8c61e11179af17ab6188ad8d0ded78c5529441ed39d4bd9c1" +checksum = "7b8cefcf97f41316955f9294cd61f639bdcfa9f2f230faac6cb896aa8ab64704" dependencies = [ "bitflags 2.6.0", "cfg-if", @@ -1169,9 +1169,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-sys" -version = "0.9.103" +version = "0.9.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f9e8deee91df40a943c71b917e5874b951d32a802526c85721ce3b776c929d6" +checksum = "45abf306cbf99debc8195b66b7346498d7b10c210de50418b5ccd7ceba08c741" dependencies = [ "cc", "libc", @@ -1338,9 +1338,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.87" +version = "1.0.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3e4daa0dcf6feba26f985457cdf104d4b4256fc5a09547140f3631bb076b19a" +checksum = "7c3a7fc5db1e57d5a779a352c8cdb57b29aa4c40cc69c3a68a7fedc815fbf2f9" dependencies = [ "unicode-ident", ] @@ -1506,7 +1506,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.1", "http-body-util", - "hyper 1.4.1", + "hyper 1.5.0", "hyper-rustls", "hyper-tls", "hyper-util", @@ -1570,9 +1570,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.14" +version = "0.23.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "415d9944693cb90382053259f89fbb077ea730ad7273047ec63b19bc9b160ba8" +checksum = "5fbb44d7acc4e873d613422379f69f237a1b141928c02f6bc6ccfddddc2d7993" dependencies = [ "once_cell", "rustls-pki-types", @@ -1592,9 +1592,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e696e35370c65c9c541198af4543ccd580cf17fc25d8e05c5a242b202488c55" +checksum = "16f1201b3c9a7ee8039bcadc17b7e605e2945b27eee7631788c1bd2b0643674b" [[package]] name = "rustls-webpki" @@ -1609,9 +1609,9 @@ dependencies = [ [[package]] name = "rustversion" -version = "1.0.17" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6" +checksum = "0e819f2bc632f285be6d7cd36e25940d45b2391dd6d9b939e79de557f7014248" [[package]] name = "ryu" @@ -1975,7 +1975,7 @@ dependencies = [ "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", - "hyper 0.14.30", + "hyper 0.14.31", "hyper-timeout", "percent-encoding", "pin-project", @@ -2196,9 +2196,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.10.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314" +checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" dependencies = [ "getrandom", ] @@ -2232,9 +2232,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.94" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef073ced962d62984fb38a36e5fdc1a2b23c9e0e1fa0689bb97afa4202ef6887" +checksum = "128d1e363af62632b8eb57219c8fd7877144af57558fb2ef0368d0087bddeb2e" dependencies = [ "cfg-if", "once_cell", @@ -2243,9 +2243,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.94" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4bfab14ef75323f4eb75fa52ee0a3fb59611977fd3240da19b2cf36ff85030e" +checksum = "cb6dd4d3ca0ddffd1dd1c9c04f94b868c37ff5fac97c30b97cff2d74fce3a358" dependencies = [ "bumpalo", "log", @@ -2258,9 +2258,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.44" +version = "0.4.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65471f79c1022ffa5291d33520cbbb53b7687b01c2f8e83b57d102eed7ed479d" +checksum = "cc7ec4f8827a71586374db3e87abdb5a2bb3a15afed140221307c3ec06b1f63b" dependencies = [ "cfg-if", "js-sys", @@ -2270,9 +2270,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.94" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7bec9830f60924d9ceb3ef99d55c155be8afa76954edffbb5936ff4509474e7" +checksum = "e79384be7f8f5a9dd5d7167216f022090cf1f9ec128e6e6a482a2cb5c5422c56" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -2280,9 +2280,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.94" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c74f6e152a76a2ad448e223b0fc0b6b5747649c3d769cc6bf45737bf97d0ed6" +checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" dependencies = [ "proc-macro2", "quote", @@ -2293,15 +2293,15 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.94" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a42f6c679374623f295a8623adfe63d9284091245c3504bde47c17a3ce2777d9" +checksum = "65fc09f10666a9f147042251e0dda9c18f166ff7de300607007e96bdebc1068d" [[package]] name = "web-sys" -version = "0.3.71" +version = "0.3.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44188d185b5bdcae1052d08bcbcf9091a5524038d4572cc4f4f2bb9d5554ddd9" +checksum = "f6488b90108c040df0fe62fa815cbdee25124641df01814dd7282749234c6112" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/sources/shared/change-router/Cargo.toml b/sources/shared/change-router/Cargo.toml index c9390dc4..bec36d97 100644 --- a/sources/shared/change-router/Cargo.toml +++ b/sources/shared/change-router/Cargo.toml @@ -9,7 +9,7 @@ edition = "2021" async-trait = "0.1.80" axum = { version = "0.7.5", features = ["macros", "http1", "tokio"] } chrono = "0.4.33" -dapr = "=0.15.1" +dapr = "0.15.1" dapr-macros = {version = "=0.15.1", package = "dapr-macros"} drasi-comms-abstractions = { path = "../../../infrastructure/comms-abstractions" } drasi-comms-dapr = { path = "../../../infrastructure/comms-dapr" } diff --git a/sources/shared/change-router/src/main.rs b/sources/shared/change-router/src/main.rs index ce8e419b..e0710317 100644 --- a/sources/shared/change-router/src/main.rs +++ b/sources/shared/change-router/src/main.rs @@ -30,8 +30,10 @@ use axum::{ use drasi_comms_abstractions::comms::{Headers, Publisher}; use drasi_comms_dapr::comms::DaprHttpPublisher; +use state_manager::{DaprStateManager, StateEntry}; mod change_router_config; +mod state_manager; mod subscriber_map; mod subscribers; @@ -44,38 +46,42 @@ async fn main() -> Result<(), Box> { let node_subscriber = Subscriber::new(); let rel_subscriber = Subscriber::new(); - let addr = "https://127.0.0.1".to_string(); - let mut dapr_client = dapr::Client::::connect(addr) - .await - .expect("Unable to connect to Dapr"); - let query_condition = json!({ "filter": { - "EQ": { "type": "SourceSubscription"} - }, + "EQ": { "type": "SourceSubscription" } + } }); - let response = match dapr_client - .query_state_alpha1(config.clone().subscriber_store, query_condition, None) + let mut metadata = std::collections::HashMap::new(); + + let topic = format!("{}-dispatch", config.source_id.clone()).to_string(); + let dapr_port = match config.dapr_port.parse::() { + Ok(port) => port, + Err(_e) => { + return Err(Box::::from( + "Error parsing Dapr port", + )); + } + }; + let publisher = DaprHttpPublisher::new( + "127.0.0.1".to_string(), + dapr_port, + config.pubsub_name.clone(), + topic, + ); + let state_manager = DaprStateManager::new("127.0.0.1", dapr_port, &config.subscriber_store); + + metadata.insert("contentType".to_string(), "application/json".to_string()); + let response = match state_manager + .query_state(query_condition, Some(metadata)) .await { - Ok(response) => response.results, + Ok(response) => response, Err(e) => { - log::error!("Error querying state: {:?}", e); + log::error!("Error querying the Dapr state store: {:?}", e); vec![] } }; - - for sub in response { - let data: Value = match serde_json::from_slice(&sub.data) { - Ok(data) => data, - Err(e) => { - log::error!( - "Error parsing the response from the Dapr state query: {:?}", - e - ); - continue; - } - }; + for data in response { // if the data is corrupt for this subscription, this will cause a panic and stop loading all the others... we should probably log an error but continue to load the rest of the subscriptions let node_labels: Vec<&str> = match data["nodeLabels"].as_array() { Some(labels) => labels @@ -149,27 +155,12 @@ async fn main() -> Result<(), Box> { rel_subscriber.get_label_map() ); - let topic = format!("{}-dispatch", config.source_id.clone()).to_string(); - let dapr_port = match config.dapr_port.parse::() { - Ok(port) => port, - Err(_e) => { - return Err(Box::::from( - "Error parsing Dapr port", - )); - } - }; - let publisher = DaprHttpPublisher::new( - "127.0.0.1".to_string(), - dapr_port, - config.pubsub_name.clone(), - topic, - ); - let shared_state = Arc::new(AppState { node_subscriber, rel_subscriber, config: config.clone(), publisher, + state_manager, }); let subscriber_server = Router::new() .route("/dapr/subscribe", get(subscribe)) @@ -196,11 +187,11 @@ struct AppState { rel_subscriber: Subscriber, config: ChangeRouterConfig, publisher: DaprHttpPublisher, + state_manager: DaprStateManager, } async fn subscribe() -> impl IntoResponse { let config = ChangeRouterConfig::from_env(); - // just do a json that is a list of subscriptions let subscriptions = vec![json! { { "pubsubname": config.pubsub_name.clone(), @@ -231,6 +222,7 @@ async fn receive( let json_data = body["data"].clone(); let publisher = &state.publisher; + let state_manager = &state.state_manager; match process_changes( publisher, json_data, @@ -238,6 +230,7 @@ async fn receive( node_subscriber, rel_subscriber, trace_parent, + state_manager, ) .await { @@ -260,6 +253,7 @@ async fn process_changes( node_subscriber: &Subscriber, rel_subscriber: &Subscriber, traceparent: String, + state_manager: &DaprStateManager, ) -> Result<(), Box> { if !changes.is_array() { return Err(Box::::from( @@ -277,6 +271,7 @@ async fn process_changes( ); debug!("ChangeEvent: {}", change); + // Bootstrap if change["payload"]["source"]["db"] == "Drasi" { if change["payload"]["source"]["table"] == "SourceSubscription" { if change["op"] == "i" { @@ -357,7 +352,6 @@ async fn process_changes( } ); - // let mut dapr_client = dapr_client.clone(); let source_subscription_value = json!({ "type": "SourceSubscription", "queryId": change["payload"]["after"]["queryId"], @@ -365,11 +359,31 @@ async fn process_changes( "nodeLabels": change["payload"]["after"]["nodeLabels"], "relLabels": change["payload"]["after"]["relLabels"] }); - let mut headers = std::collections::HashMap::new(); - headers.insert("traceparent".to_string(), traceparent.clone()); - let headers = Headers::new(headers); - match publisher.publish(source_subscription_value, headers).await { - Ok(_) => {} + + let state_key = format!( + "SourceSubscription-{}-{}", + match change["payload"]["after"]["queryNodeId"].as_str() { + Some(query_node_id) => query_node_id, + None => + return Err(Box::::from( + "Error loading queryNodeId from the ChangeEvent" + )), + }, + match change["payload"]["after"]["queryId"].as_str() { + Some(query_id) => query_id, + None => + return Err(Box::::from( + "Error loading queryId from the ChangeEvent" + )), + } + ); + + // combine statekey and source_subscription_value into a json + // where the key is the state key and the value is the source subscription value + let states = vec![StateEntry::new(&state_key, source_subscription_value)]; + + match state_manager.save_state(states).await { + Ok(_) => info!("Saved SourceSubscription to state store"), Err(e) => { return Err(e); } @@ -494,7 +508,7 @@ async fn process_changes( let headers = Headers::new(headers); match publisher.publish(change_dispatch_event, headers).await { Ok(_) => { - log::info!("published event to topic: {}", publish_topic); + info!("published event to topic: {}", publish_topic); } Err(e) => { return Err(e); diff --git a/sources/shared/change-router/src/state_manager.rs b/sources/shared/change-router/src/state_manager.rs new file mode 100644 index 00000000..11e028d9 --- /dev/null +++ b/sources/shared/change-router/src/state_manager.rs @@ -0,0 +1,114 @@ +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::collections::HashMap; + +pub struct DaprStateManager { + client: reqwest::Client, + dapr_host: String, + dapr_port: u16, + store_name: String, +} + +// Used to validate the state entry objects +#[derive(Deserialize, Serialize)] +pub struct StateEntry { + key: String, + value: Value, +} + +impl StateEntry { + pub fn new(key: &str, value: Value) -> Self { + StateEntry { + key: key.to_string(), + value, + } + } +} + +impl DaprStateManager { + pub fn new(dapr_host: &str, dapr_port: u16, store_name: &str) -> Self { + DaprStateManager { + client: reqwest::Client::new(), + dapr_host: dapr_host.to_string(), + dapr_port, + store_name: store_name.to_string(), + } + } + + pub async fn query_state( + &self, + query_condition: Value, + metadata: Option>, + ) -> Result, Box> { + let addr = "https://127.0.0.1".to_string(); + let mut dapr_client = dapr::Client::::connect(addr) + .await + .expect("Unable to connect to Dapr"); + + let response = match dapr_client + .query_state_alpha1(&self.store_name, query_condition, metadata) + .await + { + Ok(response) => response.results, + Err(e) => { + log::error!("Error querying the Dapr state store: {:?}", e); + vec![] + } + }; + + // for each item in response, serialize the data field in json + let mut result = vec![]; + for item in response { + let data: Value = match serde_json::from_slice(&item.data) { + Ok(data) => data, + Err(e) => { + log::error!( + "Error parsing the response from the Dapr state query: {:?}", + e + ); + continue; + } + }; + result.push(data); + } + + Ok(result) + } + + pub async fn save_state( + &self, + state_entry: Vec, + ) -> Result<(), Box> { + let url = format!( + "http://{}:{}/v1.0/state/{}?metadata.contentType=application/json", + self.dapr_host, self.dapr_port, self.store_name + ); + + let mut request_headers = reqwest::header::HeaderMap::new(); + request_headers.insert("Content-Type", "application/json".parse().unwrap()); + + let response = self + .client + .post(url) + .headers(request_headers) + .json(&state_entry) + .send() + .await; + + match response { + Ok(resp) => { + if resp.status().is_success() { + Ok(()) + } else { + let error_message = format!( + "State save request failed with status: {} and body: {}", + resp.status(), + resp.text().await.unwrap_or_default() + ); + Err(Box::from(error_message)) + } + } + Err(e) => Err(Box::new(e)), + } + } +}