Skip to content

Commit b2b409a

Browse files
committed
Add helper methods to connect queues/publishers
1 parent f6fe7fc commit b2b409a

File tree

7 files changed

+81
-27
lines changed

7 files changed

+81
-27
lines changed

crates/audit/src/archiver.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::time::Duration;
55
use tokio::time::sleep;
66
use tracing::{error, info};
77

8-
pub struct KafkaMempoolArchiver<R, W>
8+
pub struct KafkaAuditArchiver<R, W>
99
where
1010
R: EventReader,
1111
W: EventWriter,
@@ -14,7 +14,7 @@ where
1414
writer: W,
1515
}
1616

17-
impl<R, W> KafkaMempoolArchiver<R, W>
17+
impl<R, W> KafkaAuditArchiver<R, W>
1818
where
1919
R: EventReader,
2020
W: EventWriter,

crates/audit/src/bin/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use aws_sdk_s3::{Client as S3Client, config::Builder as S3ConfigBuilder};
55
use clap::{Parser, ValueEnum};
66
use rdkafka::consumer::Consumer;
77
use tips_audit::{
8-
KafkaAuditLogReader, KafkaMempoolArchiver, S3EventReaderWriter, create_kafka_consumer,
8+
KafkaAuditArchiver, KafkaAuditLogReader, S3EventReaderWriter, create_kafka_consumer,
99
};
1010
use tips_core::logger::init_logger;
1111
use tracing::info;
@@ -71,7 +71,7 @@ async fn main() -> Result<()> {
7171
let s3_bucket = args.s3_bucket.clone();
7272
let writer = S3EventReaderWriter::new(s3_client, s3_bucket);
7373

74-
let mut archiver = KafkaMempoolArchiver::new(reader, writer);
74+
let mut archiver = KafkaAuditArchiver::new(reader, writer);
7575

7676
info!("Audit archiver initialized, starting main loop");
7777

crates/audit/src/lib.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,25 @@ pub mod reader;
44
pub mod storage;
55
pub mod types;
66

7+
use tokio::sync::mpsc;
8+
use tracing::error;
9+
710
pub use archiver::*;
811
pub use publisher::*;
912
pub use reader::*;
1013
pub use storage::*;
1114
pub use types::*;
15+
16+
pub fn connect_audit_to_publisher<P>(event_rx: mpsc::UnboundedReceiver<BundleEvent>, publisher: P)
17+
where
18+
P: BundleEventPublisher + 'static,
19+
{
20+
tokio::spawn(async move {
21+
let mut event_rx = event_rx;
22+
while let Some(event) = event_rx.recv().await {
23+
if let Err(e) = publisher.publish(event).await {
24+
error!(error = %e, "Failed to publish bundle event");
25+
}
26+
}
27+
});
28+
}

crates/audit/tests/integration_tests.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::time::Duration;
22
use tips_audit::{
3-
KafkaAuditLogReader, KafkaMempoolArchiver,
3+
KafkaAuditArchiver, KafkaAuditLogReader,
44
publisher::{BundleEventPublisher, KafkaBundleEventPublisher},
55
storage::{BundleEventS3Reader, S3EventReaderWriter},
66
types::{BundleEvent, DropReason},
@@ -37,7 +37,7 @@ async fn test_kafka_publisher_s3_archiver_integration()
3737
publisher.publish(event.clone()).await?;
3838
}
3939

40-
let mut consumer = KafkaMempoolArchiver::new(
40+
let mut consumer = KafkaAuditArchiver::new(
4141
KafkaAuditLogReader::new(harness.kafka_consumer, topic.to_string())?,
4242
s3_writer.clone(),
4343
);

crates/bundle-pool/src/lib.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,35 @@
11
pub mod pool;
22
pub mod source;
33

4+
use source::BundleSource;
5+
use std::sync::{Arc, Mutex};
6+
use tokio::sync::mpsc;
7+
use tracing::error;
8+
49
pub use pool::{BundleStore, InMemoryBundlePool};
510
pub use source::KafkaBundleSource;
611
pub use tips_core::{Bundle, BundleHash, BundleWithMetadata, CancelBundle};
12+
13+
pub fn connect_sources_to_pool<S, P>(
14+
sources: Vec<S>,
15+
bundle_rx: mpsc::UnboundedReceiver<BundleWithMetadata>,
16+
pool: Arc<Mutex<P>>,
17+
) where
18+
S: BundleSource + Send + 'static,
19+
P: BundleStore + Send + 'static,
20+
{
21+
for source in sources {
22+
tokio::spawn(async move {
23+
if let Err(e) = source.run().await {
24+
error!(error = %e, "Bundle source failed");
25+
}
26+
});
27+
}
28+
29+
tokio::spawn(async move {
30+
let mut bundle_rx = bundle_rx;
31+
while let Some(bundle) = bundle_rx.recv().await {
32+
pool.lock().unwrap().add_bundle(bundle);
33+
}
34+
});
35+
}

crates/bundle-pool/src/source.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
11
use anyhow::Result;
2+
use async_trait::async_trait;
23
use rdkafka::consumer::{Consumer, StreamConsumer};
34
use rdkafka::{ClientConfig, Message};
45
use tips_core::{Bundle, BundleWithMetadata};
56
use tokio::sync::mpsc;
67
use tracing::{debug, error};
78

9+
#[async_trait]
10+
pub trait BundleSource {
11+
async fn run(&self) -> Result<()>;
12+
}
13+
814
pub struct KafkaBundleSource {
915
queue_consumer: StreamConsumer,
1016
publisher: mpsc::UnboundedSender<BundleWithMetadata>,
@@ -23,8 +29,11 @@ impl KafkaBundleSource {
2329
publisher,
2430
})
2531
}
32+
}
2633

27-
pub async fn run(&self) -> Result<()> {
34+
#[async_trait]
35+
impl BundleSource for KafkaBundleSource {
36+
async fn run(&self) -> Result<()> {
2837
loop {
2938
match self.queue_consumer.recv().await {
3039
Ok(message) => {

crates/bundle-pool/tests/integration_tests.rs

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,20 @@ use rdkafka::producer::{FutureProducer, FutureRecord};
44
use std::sync::{Arc, Mutex};
55
use std::time::Duration;
66
use testcontainers::runners::AsyncRunner;
7+
use testcontainers_modules::testcontainers::ContainerAsync;
78
use testcontainers_modules::{kafka, kafka::Kafka};
89
use tips_audit::BundleEvent;
9-
use tips_bundle_pool::{BundleStore, InMemoryBundlePool, KafkaBundleSource};
10+
use tips_bundle_pool::{
11+
BundleStore, InMemoryBundlePool, KafkaBundleSource, connect_sources_to_pool,
12+
};
1013
use tips_core::{
1114
BundleWithMetadata,
1215
test_utils::{create_test_bundle, create_transaction},
1316
};
1417
use tokio::sync::mpsc;
1518

16-
#[tokio::test]
17-
async fn test_kafka_bundle_source_to_pool_integration() -> Result<(), Box<dyn std::error::Error>> {
19+
async fn setup_kafka()
20+
-> Result<(ContainerAsync<Kafka>, FutureProducer, ClientConfig), Box<dyn std::error::Error>> {
1821
let kafka_container = Kafka::default().start().await?;
1922
let bootstrap_servers = format!(
2023
"127.0.0.1:{}",
@@ -23,15 +26,11 @@ async fn test_kafka_bundle_source_to_pool_integration() -> Result<(), Box<dyn st
2326
.await?
2427
);
2528

26-
let topic = "test-bundles";
27-
2829
let kafka_producer = ClientConfig::new()
2930
.set("bootstrap.servers", &bootstrap_servers)
3031
.set("message.timeout.ms", "5000")
3132
.create::<FutureProducer>()?;
3233

33-
34-
let (bundle_tx, mut bundle_rx) = mpsc::unbounded_channel::<BundleWithMetadata>();
3534
let mut kafka_consumer_config = ClientConfig::new();
3635
kafka_consumer_config
3736
.set("group.id", "bundle-pool-test-source")
@@ -40,23 +39,25 @@ async fn test_kafka_bundle_source_to_pool_integration() -> Result<(), Box<dyn st
4039
.set("enable.auto.commit", "false")
4140
.set("auto.offset.reset", "earliest");
4241

42+
Ok((kafka_container, kafka_producer, kafka_consumer_config))
43+
}
44+
45+
#[tokio::test]
46+
async fn test_kafka_bundle_source_to_pool_integration() -> Result<(), Box<dyn std::error::Error>> {
47+
let topic = "test-bundles";
48+
let (_kafka_container, kafka_producer, kafka_consumer_config) = setup_kafka().await?;
49+
50+
let (bundle_tx, bundle_rx) = mpsc::unbounded_channel::<BundleWithMetadata>();
51+
4352
let kafka_source = KafkaBundleSource::new(kafka_consumer_config, topic.to_string(), bundle_tx)?;
44-
tokio::spawn(async move {
45-
kafka_source.run().await.expect("Kafka source failed");
46-
});
4753

4854
let (audit_tx, _audit_rx) = mpsc::unbounded_channel::<BundleEvent>();
4955
let pool = Arc::new(Mutex::new(InMemoryBundlePool::new(
5056
audit_tx,
5157
"test-builder".to_string(),
5258
)));
5359

54-
let pool_clone = pool.clone();
55-
tokio::spawn(async move {
56-
while let Some(bundle) = bundle_rx.recv().await {
57-
pool_clone.lock().unwrap().add_bundle(bundle);
58-
}
59-
});
60+
connect_sources_to_pool(vec![kafka_source], bundle_rx, pool.clone());
6061

6162
let alice = PrivateKeySigner::random();
6263
let bob = PrivateKeySigner::random();
@@ -79,11 +80,9 @@ async fn test_kafka_bundle_source_to_pool_integration() -> Result<(), Box<dyn st
7980
let mut counter = 0;
8081
loop {
8182
counter += 1;
82-
if counter > 10 {
83-
panic!("Bundle was not added to pool within timeout");
84-
}
83+
assert!(counter < 10);
8584

86-
tokio::time::sleep(Duration::from_secs(1)).await;
85+
tokio::time::sleep(Duration::from_millis(500)).await;
8786

8887
let bundles = pool.lock().unwrap().get_bundles();
8988
if bundles.is_empty() {

0 commit comments

Comments (0)