diff --git a/bootstrap/rpc/config.tf b/bootstrap/rpc/config.tf
index 5e798cb..e0ea24e 100644
--- a/bootstrap/rpc/config.tf
+++ b/bootstrap/rpc/config.tf
@@ -9,7 +9,8 @@ resource "kubernetes_config_map_v1" "fabric_rpc_config" {
       "${path.module}/rpc.toml.tftpl",
       {
         port = local.port,
-        db_path = "/var/cache/cache.db",
+        // If we change the consumer, we must rebuild the cache.
+        db_path = "/var/cache/${var.consumer_name}.db",
         broker_urls = var.broker_urls
         consumer_name = var.consumer_name
         kafka_username = var.kafka_username
diff --git a/src/domain/project/mod.rs b/src/domain/project/mod.rs
index bf0c751..0e58789 100644
--- a/src/domain/project/mod.rs
+++ b/src/domain/project/mod.rs
@@ -41,6 +41,7 @@ impl TryFrom for Project {
 pub enum ProjectStatus {
     Active,
     Deleted,
+    PaymentMethodFailed,
 }
 impl FromStr for ProjectStatus {
     type Err = Error;
@@ -48,8 +49,13 @@ impl FromStr for ProjectStatus {
     fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
         match s {
             "active" => Ok(ProjectStatus::Active),
+            "dcu-consumed" => Ok(ProjectStatus::Active),
+            "pm-failed" => Ok(ProjectStatus::PaymentMethodFailed),
             "deleted" => Ok(ProjectStatus::Deleted),
-            _ => Err(Error::Unexpected("project status not supported".into())),
+            _ => Err(Error::Unexpected(format!(
+                "project status not supported: {}",
+                s
+            ))),
         }
     }
 }
@@ -58,6 +64,7 @@ impl Display for ProjectStatus {
         match self {
             ProjectStatus::Active => write!(f, "active"),
             ProjectStatus::Deleted => write!(f, "deleted"),
+            ProjectStatus::PaymentMethodFailed => write!(f, "pm-failed"),
         }
     }
 }
diff --git a/src/drivers/cache/mod.rs b/src/drivers/cache/mod.rs
index 4695e24..e99b543 100644
--- a/src/drivers/cache/mod.rs
+++ b/src/drivers/cache/mod.rs
@@ -1,7 +1,7 @@
 use anyhow::Result;
 use rdkafka::{
     consumer::{CommitMode, Consumer, StreamConsumer},
-    ClientConfig,
+    ClientConfig, Message,
 };
 use std::{borrow::Borrow, collections::HashMap, path::Path, sync::Arc};
 use tracing::{error, info};
@@ -29,31 +29,43 @@ pub async fn subscribe(config: CacheConfig) -> Result<()> {
     info!("Subscriber running");
 
     loop {
-        match consumer.recv().await {
-            Err(error) => error!(?error, "kafka subscribe error"),
-            Ok(message) => match message.borrow().try_into() {
-                Ok(event) => {
-                    match event {
-                        Event::ProjectCreated(evt) => {
-                            project::cache::create(project_cache.clone(), evt).await?;
-                        }
-                        Event::ProjectSecretCreated(evt) => {
-                            project::cache::create_secret(project_cache.clone(), evt).await?;
-                        }
-                        Event::ResourceCreated(evt) => {
-                            resource::cache::create(resource_cache.clone(), evt).await?
-                        }
-                        Event::ResourceDeleted(evt) => {
-                            resource::cache::delete(resource_cache.clone(), evt).await?
-                        }
-                    };
-                    consumer.commit_message(&message, CommitMode::Async)?;
-                }
-                Err(error) => {
-                    error!(?error, "fail to convert message to event");
-                    consumer.commit_message(&message, CommitMode::Async)?;
+        // If we fail to consume from Kafka, we need a restart.
+        let message = consumer
+            .recv()
+            .await
+            .expect("Failed to consume from Kafka, restarting");
+
+        info!("Consuming from Kafka, current offset: {}", message.offset());
+        match message.borrow().try_into() {
+            Ok(event) => {
+                let event_application = match &event {
+                    Event::ProjectCreated(evt) => {
+                        project::cache::create(project_cache.clone(), evt.clone()).await
+                    }
+                    Event::ProjectSecretCreated(evt) => {
+                        project::cache::create_secret(project_cache.clone(), evt.clone()).await
+                    }
+                    Event::ResourceCreated(evt) => {
+                        resource::cache::create(resource_cache.clone(), evt.clone()).await
+                    }
+                    Event::ResourceDeleted(evt) => {
+                        resource::cache::delete(resource_cache.clone(), evt.clone()).await
+                    }
+                };
+
+                match event_application {
+                    Ok(_) => info!("Successfully handled event {:?}", event),
+                    Err(err) => error!(
+                        error = err.to_string(),
+                        "Failed to handle event: {:?}", event
+                    ),
                 }
-            },
+                consumer.commit_message(&message, CommitMode::Async)?;
+            }
+            Err(error) => {
+                error!(?error, "failed to convert message to event");
+                consumer.commit_message(&message, CommitMode::Async)?;
+            }
         };
     }
 }