Skip to content

Commit

Permalink
chore: Gracefully handle errors on RPC (#87)
Browse files Browse the repository at this point in the history
  • Loading branch information
gonzalezzfelipe authored Aug 20, 2024
1 parent c6ac5e7 commit d741ef2
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 27 deletions.
3 changes: 2 additions & 1 deletion bootstrap/rpc/config.tf
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ resource "kubernetes_config_map_v1" "fabric_rpc_config" {
"${path.module}/rpc.toml.tftpl",
{
port = local.port,
db_path = "/var/cache/cache.db",
// If we change the consumer, we must rebuild the cache.
db_path = "/var/cache/${var.consumer_name}.db",
broker_urls = var.broker_urls
consumer_name = var.consumer_name
kafka_username = var.kafka_username
Expand Down
9 changes: 8 additions & 1 deletion src/domain/project/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,21 @@ impl TryFrom<ProjectCreated> for Project {
pub enum ProjectStatus {
Active,
Deleted,
PaymentMethodFailed,
}
impl FromStr for ProjectStatus {
type Err = Error;

fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
match s {
"active" => Ok(ProjectStatus::Active),
"dcu-consumed" => Ok(ProjectStatus::Active),
"pm-failed" => Ok(ProjectStatus::PaymentMethodFailed),
"deleted" => Ok(ProjectStatus::Deleted),
_ => Err(Error::Unexpected("project status not supported".into())),
_ => Err(Error::Unexpected(format!(
"project status not supported: {}",
s
))),
}
}
}
Expand All @@ -58,6 +64,7 @@ impl Display for ProjectStatus {
match self {
ProjectStatus::Active => write!(f, "active"),
ProjectStatus::Deleted => write!(f, "deleted"),
ProjectStatus::PaymentMethodFailed => write!(f, "pm-failed"),
}
}
}
Expand Down
62 changes: 37 additions & 25 deletions src/drivers/cache/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::Result;
use rdkafka::{
consumer::{CommitMode, Consumer, StreamConsumer},
ClientConfig,
ClientConfig, Message,
};
use std::{borrow::Borrow, collections::HashMap, path::Path, sync::Arc};
use tracing::{error, info};
Expand Down Expand Up @@ -29,31 +29,43 @@ pub async fn subscribe(config: CacheConfig) -> Result<()> {

info!("Subscriber running");
loop {
match consumer.recv().await {
Err(error) => error!(?error, "kafka subscribe error"),
Ok(message) => match message.borrow().try_into() {
Ok(event) => {
match event {
Event::ProjectCreated(evt) => {
project::cache::create(project_cache.clone(), evt).await?;
}
Event::ProjectSecretCreated(evt) => {
project::cache::create_secret(project_cache.clone(), evt).await?;
}
Event::ResourceCreated(evt) => {
resource::cache::create(resource_cache.clone(), evt).await?
}
Event::ResourceDeleted(evt) => {
resource::cache::delete(resource_cache.clone(), evt).await?
}
};
consumer.commit_message(&message, CommitMode::Async)?;
}
Err(error) => {
error!(?error, "fail to convert message to event");
consumer.commit_message(&message, CommitMode::Async)?;
// If we fail to consume from Kafka, we need a restart.
let message = consumer
.recv()
.await
.expect("Failed to consume from Kafka, restarting");

info!("Consuming from kafka, current offset: {}", message.offset());
match message.borrow().try_into() {
Ok(event) => {
let event_application = match &event {
Event::ProjectCreated(evt) => {
project::cache::create(project_cache.clone(), evt.clone()).await
}
Event::ProjectSecretCreated(evt) => {
project::cache::create_secret(project_cache.clone(), evt.clone()).await
}
Event::ResourceCreated(evt) => {
resource::cache::create(resource_cache.clone(), evt.clone()).await
}
Event::ResourceDeleted(evt) => {
resource::cache::delete(resource_cache.clone(), evt.clone()).await
}
};

match event_application {
Ok(_) => info!("Succesfully handled event {:?}", event),
Err(err) => error!(
error = err.to_string(),
"Failed to handle event: {:?}", event
),
}
},
consumer.commit_message(&message, CommitMode::Async)?;
}
Err(error) => {
error!(?error, "fail to convert message to event");
consumer.commit_message(&message, CommitMode::Async)?;
}
};
}
}
Expand Down

0 comments on commit d741ef2

Please sign in to comment.