
Commit 4306216

chore: kafka bundle consumer / clean up (#41)
* chore: kafka bundle consumer / clean up
* Add helper methods to connect queues/publishers

1 parent 6c290d9 · commit 4306216

File tree: 15 files changed, +314 -103 lines

Cargo.lock

Lines changed: 6 additions & 0 deletions
Generated file; diff not rendered by default.

crates/audit/src/archiver.rs

Lines changed: 2 additions & 2 deletions
@@ -5,7 +5,7 @@ use std::time::Duration;
 use tokio::time::sleep;
 use tracing::{error, info};
 
-pub struct KafkaMempoolArchiver<R, W>
+pub struct KafkaAuditArchiver<R, W>
 where
     R: EventReader,
     W: EventWriter,
@@ -14,7 +14,7 @@ where
     writer: W,
 }
 
-impl<R, W> KafkaMempoolArchiver<R, W>
+impl<R, W> KafkaAuditArchiver<R, W>
 where
     R: EventReader,
     W: EventWriter,

crates/audit/src/bin/main.rs

Lines changed: 6 additions & 27 deletions
@@ -5,10 +5,10 @@ use aws_sdk_s3::{Client as S3Client, config::Builder as S3ConfigBuilder};
 use clap::{Parser, ValueEnum};
 use rdkafka::consumer::Consumer;
 use tips_audit::{
-    KafkaMempoolArchiver, KafkaMempoolReader, S3EventReaderWriter, create_kafka_consumer,
+    KafkaAuditArchiver, KafkaAuditLogReader, S3EventReaderWriter, create_kafka_consumer,
 };
-use tracing::{info, warn};
-use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
+use tips_core::logger::init_logger;
+use tracing::info;
 
 #[derive(Debug, Clone, ValueEnum)]
 enum S3ConfigType {
@@ -53,28 +53,7 @@ async fn main() -> Result<()> {
 
     let args = Args::parse();
 
-    let log_level = match args.log_level.to_lowercase().as_str() {
-        "trace" => tracing::Level::TRACE,
-        "debug" => tracing::Level::DEBUG,
-        "info" => tracing::Level::INFO,
-        "warn" => tracing::Level::WARN,
-        "error" => tracing::Level::ERROR,
-        _ => {
-            warn!(
-                "Invalid log level '{}', defaulting to 'info'",
-                args.log_level
-            );
-            tracing::Level::INFO
-        }
-    };
-
-    tracing_subscriber::registry()
-        .with(
-            tracing_subscriber::EnvFilter::try_from_default_env()
-                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(log_level.to_string())),
-        )
-        .with(tracing_subscriber::fmt::layer())
-        .init();
+    init_logger(&args.log_level);
 
     info!(
         kafka_properties_file = %args.kafka_properties_file,
@@ -86,13 +65,13 @@ async fn main() -> Result<()> {
     let consumer = create_kafka_consumer(&args.kafka_properties_file)?;
     consumer.subscribe(&[&args.kafka_topic])?;
 
-    let reader = KafkaMempoolReader::new(consumer, args.kafka_topic.clone())?;
+    let reader = KafkaAuditLogReader::new(consumer, args.kafka_topic.clone())?;
 
     let s3_client = create_s3_client(&args).await?;
     let s3_bucket = args.s3_bucket.clone();
     let writer = S3EventReaderWriter::new(s3_client, s3_bucket);
 
-    let mut archiver = KafkaMempoolArchiver::new(reader, writer);
+    let mut archiver = KafkaAuditArchiver::new(reader, writer);
 
     info!("Audit archiver initialized, starting main loop");
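The inline tracing-subscriber setup removed above has presumably been moved into tips_core::logger::init_logger. A minimal sketch of what that helper could look like, assuming it mirrors the deleted code; the actual tips_core implementation is not shown in this commit:

use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

// Hypothetical tips_core::logger::init_logger, reproducing the behaviour of the
// code deleted from main.rs: parse the CLI level, fall back to INFO, and let
// RUST_LOG override it when set.
pub fn init_logger(log_level: &str) {
    let level = match log_level.to_lowercase().as_str() {
        "trace" => tracing::Level::TRACE,
        "debug" => tracing::Level::DEBUG,
        "info" => tracing::Level::INFO,
        "warn" => tracing::Level::WARN,
        "error" => tracing::Level::ERROR,
        _ => tracing::Level::INFO,
    };

    tracing_subscriber::registry()
        .with(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(level.to_string())),
        )
        .with(tracing_subscriber::fmt::layer())
        .init();
}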

crates/audit/src/lib.rs

Lines changed: 17 additions & 0 deletions
@@ -4,8 +4,25 @@ pub mod reader;
 pub mod storage;
 pub mod types;
 
+use tokio::sync::mpsc;
+use tracing::error;
+
 pub use archiver::*;
 pub use publisher::*;
 pub use reader::*;
 pub use storage::*;
 pub use types::*;
+
+pub fn connect_audit_to_publisher<P>(event_rx: mpsc::UnboundedReceiver<BundleEvent>, publisher: P)
+where
+    P: BundleEventPublisher + 'static,
+{
+    tokio::spawn(async move {
+        let mut event_rx = event_rx;
+        while let Some(event) = event_rx.recv().await {
+            if let Err(e) = publisher.publish(event).await {
+                error!(error = %e, "Failed to publish bundle event");
+            }
+        }
+    });
+}
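The new connect_audit_to_publisher helper spawns a task that drains an unbounded channel into any BundleEventPublisher. One way a caller might wire it up, shown as a sketch; the KafkaBundleEventPublisher constructor is not part of this diff, so the publisher is passed in ready-made here:

use tips_audit::{connect_audit_to_publisher, BundleEvent, KafkaBundleEventPublisher};
use tokio::sync::mpsc;

// Must be called from within a tokio runtime, since the helper uses tokio::spawn.
fn wire_audit_events(publisher: KafkaBundleEventPublisher) -> mpsc::UnboundedSender<BundleEvent> {
    let (event_tx, event_rx) = mpsc::unbounded_channel::<BundleEvent>();

    // Background task: forward every event from the channel to the publisher,
    // logging (but not propagating) publish failures.
    connect_audit_to_publisher(event_rx, publisher);

    // Components that emit audit events keep the sender.
    event_tx
}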

crates/audit/src/reader.rs

Lines changed: 8 additions & 26 deletions
@@ -7,36 +7,18 @@ use rdkafka::{
     consumer::{Consumer, StreamConsumer},
     message::Message,
 };
-use std::fs;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use tips_core::kafka::load_kafka_config_from_file;
 use tokio::time::sleep;
-use tracing::{debug, error, info};
+use tracing::{debug, error};
 
 pub fn create_kafka_consumer(kafka_properties_file: &str) -> Result<StreamConsumer> {
-    let client_config = load_kafka_config_from_file(kafka_properties_file)?;
+    let client_config =
+        ClientConfig::from_iter(load_kafka_config_from_file(kafka_properties_file)?);
     let consumer: StreamConsumer = client_config.create()?;
     Ok(consumer)
 }
 
-fn load_kafka_config_from_file(properties_file_path: &str) -> Result<ClientConfig> {
-    let kafka_properties = fs::read_to_string(properties_file_path)?;
-    info!("Kafka properties:\n{}", kafka_properties);
-
-    let mut client_config = ClientConfig::new();
-
-    for line in kafka_properties.lines() {
-        let line = line.trim();
-        if line.is_empty() || line.starts_with('#') {
-            continue;
-        }
-        if let Some((key, value)) = line.split_once('=') {
-            client_config.set(key.trim(), value.trim());
-        }
-    }
-
-    Ok(client_config)
-}
-
 pub fn assign_topic_partition(consumer: &StreamConsumer, topic: &str) -> Result<()> {
     let mut tpl = TopicPartitionList::new();
     tpl.add_partition(topic, 0);
@@ -57,14 +39,14 @@ pub trait EventReader {
     async fn commit(&mut self) -> Result<()>;
 }
 
-pub struct KafkaMempoolReader {
+pub struct KafkaAuditLogReader {
     consumer: StreamConsumer,
     topic: String,
     last_message_offset: Option<i64>,
     last_message_partition: Option<i32>,
 }
 
-impl KafkaMempoolReader {
+impl KafkaAuditLogReader {
     pub fn new(consumer: StreamConsumer, topic: String) -> Result<Self> {
         consumer.subscribe(&[&topic])?;
         Ok(Self {
@@ -77,7 +59,7 @@ impl KafkaMempoolReader {
 }
 
 #[async_trait]
-impl EventReader for KafkaMempoolReader {
+impl EventReader for KafkaAuditLogReader {
     async fn read_event(&mut self) -> Result<Event> {
         match self.consumer.recv().await {
             Ok(message) => {
@@ -143,7 +125,7 @@ impl EventReader for KafkaMempoolReader {
     }
 }
 
-impl KafkaMempoolReader {
+impl KafkaAuditLogReader {
     pub fn topic(&self) -> &str {
        &self.topic
     }
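The properties-file parser deleted here now lives in tips_core::kafka::load_kafka_config_from_file. Given the new ClientConfig::from_iter call, it apparently returns key/value pairs rather than a ClientConfig; a sketch of what the shared helper might look like, reusing the parsing rules of the deleted function (the real tips_core code is not included in this commit):

use anyhow::Result;
use std::fs;

// Hypothetical shape of the shared helper: read a `key=value` properties file,
// skipping blank lines and `#` comments, and return the pairs for
// ClientConfig::from_iter to consume.
pub fn load_kafka_config_from_file(properties_file_path: &str) -> Result<Vec<(String, String)>> {
    let kafka_properties = fs::read_to_string(properties_file_path)?;

    Ok(kafka_properties
        .lines()
        .map(str::trim)
        .filter(|line| !line.is_empty() && !line.starts_with('#'))
        .filter_map(|line| {
            line.split_once('=')
                .map(|(k, v)| (k.trim().to_string(), v.trim().to_string()))
        })
        .collect())
}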

crates/audit/tests/integration_tests.rs

Lines changed: 3 additions & 3 deletions
@@ -1,6 +1,6 @@
 use std::time::Duration;
 use tips_audit::{
-    KafkaMempoolArchiver, KafkaMempoolReader,
+    KafkaAuditArchiver, KafkaAuditLogReader,
     publisher::{BundleEventPublisher, KafkaBundleEventPublisher},
     storage::{BundleEventS3Reader, S3EventReaderWriter},
     types::{BundleEvent, DropReason},
@@ -37,8 +37,8 @@ async fn test_kafka_publisher_s3_archiver_integration()
         publisher.publish(event.clone()).await?;
     }
 
-    let mut consumer = KafkaMempoolArchiver::new(
-        KafkaMempoolReader::new(harness.kafka_consumer, topic.to_string())?,
+    let mut consumer = KafkaAuditArchiver::new(
+        KafkaAuditLogReader::new(harness.kafka_consumer, topic.to_string())?,
         s3_writer.clone(),
     );
 
crates/bundle-pool/Cargo.toml

Lines changed: 5 additions & 0 deletions
@@ -17,6 +17,8 @@ tracing.workspace = true
 tokio.workspace = true
 anyhow.workspace = true
 async-trait.workspace = true
+rdkafka.workspace = true
+serde_json.workspace = true
 
 [dev-dependencies]
 tips-core = { workspace = true, features = ["test-utils"] }
@@ -26,3 +28,6 @@ alloy-signer = "1.0.41"
 alloy-signer-local = "1.0.41"
 op-alloy-consensus.workspace = true
 op-alloy-rpc-types.workspace = true
+testcontainers.workspace = true
+testcontainers-modules.workspace = true
+serde.workspace = true

crates/bundle-pool/src/lib.rs

Lines changed: 31 additions & 0 deletions
@@ -1,4 +1,35 @@
 pub mod pool;
+pub mod source;
+
+use source::BundleSource;
+use std::sync::{Arc, Mutex};
+use tokio::sync::mpsc;
+use tracing::error;
 
 pub use pool::{BundleStore, InMemoryBundlePool};
+pub use source::KafkaBundleSource;
 pub use tips_core::{Bundle, BundleHash, BundleWithMetadata, CancelBundle};
+
+pub fn connect_sources_to_pool<S, P>(
+    sources: Vec<S>,
+    bundle_rx: mpsc::UnboundedReceiver<BundleWithMetadata>,
+    pool: Arc<Mutex<P>>,
+) where
+    S: BundleSource + Send + 'static,
+    P: BundleStore + Send + 'static,
+{
+    for source in sources {
+        tokio::spawn(async move {
+            if let Err(e) = source.run().await {
+                error!(error = %e, "Bundle source failed");
+            }
+        });
+    }
+
+    tokio::spawn(async move {
+        let mut bundle_rx = bundle_rx;
+        while let Some(bundle) = bundle_rx.recv().await {
+            pool.lock().unwrap().add_bundle(bundle);
+        }
+    });
+}
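connect_sources_to_pool fans each source out onto its own task and drains the shared channel into the pool. A sketch of how a service might use it together with the new KafkaBundleSource; the crate path tips_bundle_pool and the InMemoryBundlePool::default() constructor are assumptions, since neither appears in this diff:

use std::sync::{Arc, Mutex};
use tips_bundle_pool::{connect_sources_to_pool, InMemoryBundlePool, KafkaBundleSource};
use tokio::sync::mpsc;

// `client_config` is an rdkafka::ClientConfig with bootstrap.servers, group.id,
// etc. already populated, e.g. via load_kafka_config_from_file.
async fn start_bundle_ingest(client_config: rdkafka::ClientConfig) -> anyhow::Result<()> {
    let (bundle_tx, bundle_rx) = mpsc::unbounded_channel();

    // The Kafka source deserializes bundles from the topic and sends them into
    // the channel; the helper then moves them from the channel into the pool.
    let source = KafkaBundleSource::new(client_config, "bundles".to_string(), bundle_tx)?;

    let pool = Arc::new(Mutex::new(InMemoryBundlePool::default()));
    connect_sources_to_pool(vec![source], bundle_rx, pool);

    Ok(())
}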

crates/bundle-pool/src/source.rs

Lines changed: 81 additions & 0 deletions
@@ -0,0 +1,81 @@
+use anyhow::Result;
+use async_trait::async_trait;
+use rdkafka::consumer::{Consumer, StreamConsumer};
+use rdkafka::{ClientConfig, Message};
+use tips_core::{Bundle, BundleWithMetadata};
+use tokio::sync::mpsc;
+use tracing::{debug, error};
+
+#[async_trait]
+pub trait BundleSource {
+    async fn run(&self) -> Result<()>;
+}
+
+pub struct KafkaBundleSource {
+    queue_consumer: StreamConsumer,
+    publisher: mpsc::UnboundedSender<BundleWithMetadata>,
+}
+
+impl KafkaBundleSource {
+    pub fn new(
+        client_config: ClientConfig,
+        topic: String,
+        publisher: mpsc::UnboundedSender<BundleWithMetadata>,
+    ) -> Result<Self> {
+        let queue_consumer: StreamConsumer = client_config.create()?;
+        queue_consumer.subscribe(&[topic.as_str()])?;
+        Ok(Self {
+            queue_consumer,
+            publisher,
+        })
+    }
+}
+
+#[async_trait]
+impl BundleSource for KafkaBundleSource {
+    async fn run(&self) -> Result<()> {
+        loop {
+            match self.queue_consumer.recv().await {
+                Ok(message) => {
+                    let payload = match message.payload() {
+                        Some(p) => p,
+                        None => {
+                            error!("Message has no payload");
+                            continue;
+                        }
+                    };
+
+                    let bundle: Bundle = match serde_json::from_slice(payload) {
+                        Ok(b) => b,
+                        Err(e) => {
+                            error!(error = %e, "Failed to deserialize bundle");
+                            continue;
+                        }
+                    };
+
+                    debug!(
+                        bundle = ?bundle,
+                        offset = message.offset(),
+                        partition = message.partition(),
+                        "Received bundle from Kafka"
+                    );
+
+                    let bundle_with_metadata = match BundleWithMetadata::load(bundle) {
+                        Ok(b) => b,
+                        Err(e) => {
+                            error!(error = %e, "Failed to load bundle");
+                            continue;
+                        }
+                    };
+
+                    if let Err(e) = self.publisher.send(bundle_with_metadata) {
+                        error!(error = ?e, "Failed to publish bundle to queue");
+                    }
+                }
+                Err(e) => {
+                    error!(error = %e, "Error receiving message from Kafka");
+                }
+            }
+        }
+    }
+}
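Because BundleSource is just a single async run method, other sources can feed the same channel without touching Kafka. An in-memory source along these lines (purely illustrative, not part of this commit) shows the shape of the trait; it assumes BundleSource is in scope, e.g. via use tips_bundle_pool::source::BundleSource:

use anyhow::Result;
use async_trait::async_trait;
use std::sync::Mutex;
use tips_core::BundleWithMetadata;
use tokio::sync::mpsc;

// Hypothetical test source: drains a preloaded list of bundles into the same
// channel that connect_sources_to_pool forwards into the pool.
struct StaticBundleSource {
    bundles: Mutex<Vec<BundleWithMetadata>>,
    publisher: mpsc::UnboundedSender<BundleWithMetadata>,
}

#[async_trait]
impl BundleSource for StaticBundleSource {
    async fn run(&self) -> Result<()> {
        for bundle in self.bundles.lock().unwrap().drain(..) {
            // send only fails when the pool-side receiver has been dropped.
            if self.publisher.send(bundle).is_err() {
                break;
            }
        }
        Ok(())
    }
}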
