| 
 | 1 | +use alloy_signer_local::PrivateKeySigner;  | 
 | 2 | +use rdkafka::ClientConfig;  | 
 | 3 | +use rdkafka::producer::{FutureProducer, FutureRecord};  | 
 | 4 | +use std::sync::{Arc, Mutex};  | 
 | 5 | +use std::time::Duration;  | 
 | 6 | +use testcontainers::runners::AsyncRunner;  | 
 | 7 | +use testcontainers_modules::{kafka, kafka::Kafka};  | 
 | 8 | +use tips_audit::BundleEvent;  | 
 | 9 | +use tips_bundle_pool::{BundleStore, InMemoryBundlePool, KafkaBundleSource};  | 
 | 10 | +use tips_core::{  | 
 | 11 | +    BundleWithMetadata,  | 
 | 12 | +    test_utils::{create_test_bundle, create_transaction},  | 
 | 13 | +};  | 
 | 14 | +use tokio::sync::mpsc;  | 
 | 15 | + | 
 | 16 | +#[tokio::test]  | 
 | 17 | +async fn test_kafka_bundle_source_to_pool_integration() -> Result<(), Box<dyn std::error::Error>> {  | 
 | 18 | +    let kafka_container = Kafka::default().start().await?;  | 
 | 19 | +    let bootstrap_servers = format!(  | 
 | 20 | +        "127.0.0.1:{}",  | 
 | 21 | +        kafka_container  | 
 | 22 | +            .get_host_port_ipv4(kafka::KAFKA_PORT)  | 
 | 23 | +            .await?  | 
 | 24 | +    );  | 
 | 25 | + | 
 | 26 | +    let topic = "test-bundles";  | 
 | 27 | + | 
 | 28 | +    let kafka_producer = ClientConfig::new()  | 
 | 29 | +        .set("bootstrap.servers", &bootstrap_servers)  | 
 | 30 | +        .set("message.timeout.ms", "5000")  | 
 | 31 | +        .create::<FutureProducer>()?;  | 
 | 32 | + | 
 | 33 | + | 
 | 34 | +    let (bundle_tx, mut bundle_rx) = mpsc::unbounded_channel::<BundleWithMetadata>();  | 
 | 35 | +    let mut kafka_consumer_config = ClientConfig::new();  | 
 | 36 | +    kafka_consumer_config  | 
 | 37 | +        .set("group.id", "bundle-pool-test-source")  | 
 | 38 | +        .set("bootstrap.servers", &bootstrap_servers)  | 
 | 39 | +        .set("session.timeout.ms", "6000")  | 
 | 40 | +        .set("enable.auto.commit", "false")  | 
 | 41 | +        .set("auto.offset.reset", "earliest");  | 
 | 42 | + | 
 | 43 | +    let kafka_source = KafkaBundleSource::new(kafka_consumer_config, topic.to_string(), bundle_tx)?;  | 
 | 44 | +    tokio::spawn(async move {  | 
 | 45 | +        kafka_source.run().await.expect("Kafka source failed");  | 
 | 46 | +    });  | 
 | 47 | + | 
 | 48 | +    let (audit_tx, _audit_rx) = mpsc::unbounded_channel::<BundleEvent>();  | 
 | 49 | +    let pool = Arc::new(Mutex::new(InMemoryBundlePool::new(  | 
 | 50 | +        audit_tx,  | 
 | 51 | +        "test-builder".to_string(),  | 
 | 52 | +    )));  | 
 | 53 | + | 
 | 54 | +    let pool_clone = pool.clone();  | 
 | 55 | +    tokio::spawn(async move {  | 
 | 56 | +        while let Some(bundle) = bundle_rx.recv().await {  | 
 | 57 | +            pool_clone.lock().unwrap().add_bundle(bundle);  | 
 | 58 | +        }  | 
 | 59 | +    });  | 
 | 60 | + | 
 | 61 | +    let alice = PrivateKeySigner::random();  | 
 | 62 | +    let bob = PrivateKeySigner::random();  | 
 | 63 | +    let tx1 = create_transaction(alice.clone(), 1, bob.address());  | 
 | 64 | +    let test_bundle = create_test_bundle(vec![tx1], Some(100), None, None);  | 
 | 65 | +    let test_bundle_uuid = *test_bundle.uuid();  | 
 | 66 | + | 
 | 67 | +    let bundle_payload = serde_json::to_string(test_bundle.bundle())?;  | 
 | 68 | + | 
 | 69 | +    kafka_producer  | 
 | 70 | +        .send(  | 
 | 71 | +            FutureRecord::to(topic)  | 
 | 72 | +                .payload(&bundle_payload)  | 
 | 73 | +                .key("test-key"),  | 
 | 74 | +            Duration::from_secs(5),  | 
 | 75 | +        )  | 
 | 76 | +        .await  | 
 | 77 | +        .map_err(|(e, _)| e)?;  | 
 | 78 | + | 
 | 79 | +    let mut counter = 0;  | 
 | 80 | +    loop {  | 
 | 81 | +        counter += 1;  | 
 | 82 | +        if counter > 10 {  | 
 | 83 | +            panic!("Bundle was not added to pool within timeout");  | 
 | 84 | +        }  | 
 | 85 | + | 
 | 86 | +        tokio::time::sleep(Duration::from_secs(1)).await;  | 
 | 87 | + | 
 | 88 | +        let bundles = pool.lock().unwrap().get_bundles();  | 
 | 89 | +        if bundles.is_empty() {  | 
 | 90 | +            continue;  | 
 | 91 | +        }  | 
 | 92 | + | 
 | 93 | +        assert_eq!(bundles.len(), 1);  | 
 | 94 | +        assert_eq!(*bundles[0].uuid(), test_bundle_uuid);  | 
 | 95 | +        break;  | 
 | 96 | +    }  | 
 | 97 | + | 
 | 98 | +    Ok(())  | 
 | 99 | +}  | 
0 commit comments