diff --git a/Cargo.lock b/Cargo.lock index 71b92e7..28fd6ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -342,15 +342,6 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" -[[package]] -name = "crc32fast" -version = "1.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a97769d94ddab943e4510d138150169a2758b5ef3eb191a9ee688de3e23ef7b3" -dependencies = [ - "cfg-if", -] - [[package]] name = "crossbeam-queue" version = "0.3.11" @@ -498,12 +489,12 @@ dependencies = [ "dotenv", "futures", "k8s-openapi", - "kafka", "kube", "mockall", "prost 0.12.6", "protoc-wkt", "rand", + "rdkafka", "serde", "serde_json", "sqlx", @@ -526,16 +517,6 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" -[[package]] -name = "flate2" -version = "1.0.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f54427cfd1c7829e2a139fcefea601bf088ebca651d2bf53ebc600eac295dae" -dependencies = [ - "crc32fast", - "miniz_oxide", -] - [[package]] name = "flume" version = "0.11.0" @@ -553,21 +534,6 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" -[[package]] -name = "foreign-types" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" -dependencies = [ - "foreign-types-shared", -] - -[[package]] -name = "foreign-types-shared" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" - [[package]] name = "form_urlencoded" version = "1.2.1" @@ -1120,25 +1086,6 @@ dependencies = [ "serde_json", ] -[[package]] -name = "kafka" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2054ba4edcb4dcda4209e138c7e88caf26d4a325b3db76fbdb6ca5eecc23e426" -dependencies = [ - "byteorder", - "crc", - "flate2", - "fnv", - "openssl", - "openssl-sys", - "ref_slice", - "snap", - "thiserror", - "tracing", - "twox-hash", -] - [[package]] name = "kube" version = "0.92.0" @@ -1235,6 +1182,18 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "libz-sys" +version = "1.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c15da26e5af7e25c90b37a2d75cdbf940cf4a55316de9d84c679c9b8bfabf82e" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linked-hash-map" version = "0.5.6" @@ -1437,45 +1396,40 @@ dependencies = [ ] [[package]] -name = "object" -version = "0.36.0" +name = "num_enum" +version = "0.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "576dfe1fc8f9df304abb159d767a29d0476f7750fbf8aa7ad07816004a207434" +checksum = "1f646caf906c20226733ed5b1374287eb97e3c2a5c227ce668c1f2ce20ae57c9" dependencies = [ - "memchr", + "num_enum_derive", ] [[package]] -name = "once_cell" -version = "1.19.0" +name = "num_enum_derive" +version = "0.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +checksum = "dcbff9bc912032c62bf65ef1d5aea88983b420f4f839db1e9b0c281a25c9c799" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 1.0.109", +] [[package]] -name = "openssl" -version = "0.10.64" +name = "object" +version = "0.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95a0481286a310808298130d22dd1fef0fa571e05a8f44ec801801e84b216b1f" +checksum = "576dfe1fc8f9df304abb159d767a29d0476f7750fbf8aa7ad07816004a207434" dependencies = [ - "bitflags 2.5.0", - "cfg-if", - "foreign-types", - "libc", - "once_cell", - "openssl-macros", - "openssl-sys", + "memchr", ] [[package]] -name = "openssl-macros" -version = "0.1.1" +name = "once_cell" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.66", -] +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] name = "openssl-probe" @@ -1483,18 +1437,6 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" -[[package]] -name = "openssl-sys" -version = "0.9.102" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c597637d56fbc83893a35eb0dd04b2b8e7a50c91e64e9493e398b5df4fb45fa2" -dependencies = [ - "cc", - "libc", - "pkg-config", - "vcpkg", -] - [[package]] name = "ordered-float" version = "2.10.1" @@ -1773,6 +1715,16 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "proc-macro-crate" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f4c021e1093a56626774e81216a4ce732a735e5bad4868a03f3ed65ca0c3919" +dependencies = [ + "once_cell", + "toml_edit 0.19.15", +] + [[package]] name = "proc-macro2" version = "1.0.85" @@ -1913,6 +1865,36 @@ dependencies = [ "getrandom", ] +[[package]] +name = "rdkafka" +version = "0.36.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1beea247b9a7600a81d4cc33f659ce1a77e1988323d7d2809c7ed1c21f4c316d" +dependencies = [ + "futures-channel", + "futures-util", + "libc", + "log", + "rdkafka-sys", + "serde", + "serde_derive", + "serde_json", + "slab", + "tokio", +] + +[[package]] +name = "rdkafka-sys" +version = "4.7.0+2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55e0d2f9ba6253f6ec72385e453294f8618e9e15c2c6aba2a5c01ccf9622d615" +dependencies = [ + "libc", + "libz-sys", + "num_enum", + "pkg-config", +] + [[package]] name = "redox_syscall" version = "0.4.1" @@ -1931,12 +1913,6 @@ dependencies = [ "bitflags 2.5.0", ] -[[package]] -name = "ref_slice" -version = "1.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4ed1d73fb92eba9b841ba2aef69533a060ccc0d3ec71c90aeda5996d4afb7a9" - [[package]] name = "regex" version = "1.10.5" @@ -2340,12 +2316,6 @@ version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" -[[package]] -name = "snap" -version = "1.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" - [[package]] name = "socket2" version = "0.5.7" @@ -2587,12 +2557,6 @@ dependencies = [ "urlencoding", ] -[[package]] -name = "static_assertions" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" - [[package]] name = "stringprep" version = "0.1.5" @@ -2793,7 +2757,7 @@ dependencies = [ "serde", "serde_spanned", "toml_datetime", - "toml_edit", + "toml_edit 0.22.14", ] [[package]] @@ -2805,6 +2769,17 @@ dependencies = [ "serde", ] +[[package]] +name = "toml_edit" +version = "0.19.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" +dependencies = [ + "indexmap 2.2.6", + "toml_datetime", + "winnow 0.5.40", +] + [[package]] name = "toml_edit" version = "0.22.14" @@ -2815,7 +2790,7 @@ dependencies = [ "serde", "serde_spanned", "toml_datetime", - "winnow", + "winnow 0.6.13", ] [[package]] @@ -2977,17 +2952,6 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" -[[package]] -name = "twox-hash" -version = "1.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" -dependencies = [ - "cfg-if", - "rand", - "static_assertions", -] - [[package]] name = "typenum" version = "1.17.0" @@ -3296,6 +3260,15 @@ version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0" +[[package]] +name = "winnow" +version = "0.5.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f593a95398737aeed53e489c785df13f3618e41dbcd6718c6addbf1395aa6876" +dependencies = [ + "memchr", +] + [[package]] name = "winnow" version = "0.6.13" diff --git a/Cargo.toml b/Cargo.toml index 2c4a349..aa2745e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,6 @@ async-trait = "0.1.80" sqlx = { version = "0.7.4", features = ["runtime-tokio-rustls", "sqlite"] } tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread", "time"] } tonic = "0.11.0" -kafka = "0.10.0" serde = { version = "1.0.203", features = ["derive"] } serde_json = "1.0.117" tracing = "0.1.40" @@ -26,6 +25,7 @@ k8s-openapi = { version = "0.22.0", features = ["latest"] } kube = { version = "0.92.0", features = ["client"] } protoc-wkt = "1.0.0" config = { version = "0.14.0", features = ["toml"] } +rdkafka = "0.36.2" [dev-dependencies] mockall = "0.12.1" diff --git a/examples/config.toml b/examples/config.toml new file mode 100644 index 0000000..783caaf --- /dev/null +++ b/examples/config.toml @@ -0,0 +1,3 @@ +addr="0.0.0.0:5000" +db_path="dev.db" +brokers="localhost:19092" diff --git a/examples/docker-compose.yaml b/examples/docker-compose.yaml index 5ad6cb0..1e6dfa1 100644 --- a/examples/docker-compose.yaml +++ b/examples/docker-compose.yaml @@ -1,42 +1,60 @@ version: "3.7" +name: redpanda-quickstart-one-broker + networks: - kafka-network: + redpanda_network: driver: bridge + +volumes: + redpanda-0: null services: - zookeeper: - image: confluentinc/cp-zookeeper:latest - networks: - - kafka-network - environment: - ZOOKEEPER_CLIENT_PORT: 2181 - ZOOKEEPER_TICK_TIME: 2000 - - kafka: - image: confluentinc/cp-kafka:latest + redpanda-0: + command: + - redpanda + - start + - --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092 + - --advertise-kafka-addr internal://redpanda-0:9092,external://localhost:19092 + - --pandaproxy-addr internal://0.0.0.0:8082,external://0.0.0.0:18082 + - --advertise-pandaproxy-addr internal://redpanda-0:8082,external://localhost:18082 + - --schema-registry-addr internal://0.0.0.0:8081,external://0.0.0.0:18081 + - --rpc-addr redpanda-0:33145 + - --advertise-rpc-addr redpanda-0:33145 + - --mode dev-container + - --smp 1 + - --default-log-level=debug + image: docker.redpanda.com/redpandadata/redpanda:latest + container_name: redpanda-0 + volumes: + - redpanda-0:/var/lib/redpanda/data networks: - - kafka-network - depends_on: - - zookeeper + - redpanda_network ports: - - 9092:9092 - environment: - KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT - KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - - kafdrop: - image: obsidiandynamics/kafdrop:latest + - 18081:18081 + - 18082:18082 + - 19092:19092 + - 19644:9644 + console: + container_name: redpanda-console + image: docker.redpanda.com/redpandadata/console:latest networks: - - kafka-network - depends_on: - - kafka - ports: - - 19000:9000 + - redpanda_network + entrypoint: /bin/sh + command: -c 'echo "$$CONSOLE_CONFIG_FILE" > /tmp/config.yml; /app/console' environment: - KAFKA_BROKERCONNECT: kafka:29092 - + CONFIG_FILEPATH: /tmp/config.yml + CONSOLE_CONFIG_FILE: | + kafka: + brokers: ["redpanda-0:9092"] + schemaRegistry: + enabled: true + urls: ["http://redpanda-0:8081"] + redpanda: + adminApi: + enabled: true + urls: ["http://redpanda-0:9644"] + ports: + - 8080:8080 + depends_on: + - redpanda-0 diff --git a/src/bin/daemon.rs b/src/bin/daemon.rs index 2365136..5e6a6c0 100644 --- a/src/bin/daemon.rs +++ b/src/bin/daemon.rs @@ -22,12 +22,12 @@ async fn main() -> Result<()> { let config = Config::new()?; - fabric::drivers::monitor::subscribe(&config.kafka_host).await + fabric::drivers::monitor::subscribe(&config.brokers).await } #[derive(Debug, Deserialize)] struct Config { - kafka_host: String, + brokers: String, } impl Config { pub fn new() -> Result { diff --git a/src/bin/rpc.rs b/src/bin/rpc.rs index 3569e06..201c044 100644 --- a/src/bin/rpc.rs +++ b/src/bin/rpc.rs @@ -23,8 +23,8 @@ async fn main() -> Result<()> { let config = Config::new()?; futures::future::try_join( - fabric::drivers::grpc::server(&config.addr, &config.db_path, &config.kafka_host), - fabric::drivers::event::subscribe(&config.kafka_host), + fabric::drivers::grpc::server(&config.addr, &config.db_path, &config.brokers), + fabric::drivers::event::subscribe(&config.brokers), ) .await?; @@ -35,7 +35,7 @@ async fn main() -> Result<()> { struct Config { addr: String, db_path: String, - kafka_host: String, + brokers: String, } impl Config { pub fn new() -> Result { diff --git a/src/driven/kafka/mod.rs b/src/driven/kafka/mod.rs index 8ccd2f2..f5516de 100644 --- a/src/driven/kafka/mod.rs +++ b/src/driven/kafka/mod.rs @@ -1,38 +1,39 @@ -use anyhow::{ensure, Result}; -use kafka::{ - client::KafkaClient, - producer::{Producer, Record}, +use anyhow::{Error, Result}; +use rdkafka::{ + producer::{FutureProducer, FutureRecord}, + ClientConfig, }; +use std::time::Duration; use crate::domain::events::{Event, EventBridge}; -pub struct KafkaEventBridge { - hosts: Vec, +pub struct KafkaProducer { + producer: FutureProducer, topic: String, } -impl KafkaEventBridge { - pub fn new(hosts: &[String], topic: &str) -> Result { - let hosts = hosts.to_vec(); - let mut client = KafkaClient::new(hosts.to_vec()); - client.load_metadata_all()?; +impl KafkaProducer { + pub fn new(brokers: &str, topic: &str) -> Result { + let producer: FutureProducer = ClientConfig::new() + .set("bootstrap.servers", brokers) + .create()?; - let topic = topic.to_string(); - ensure!( - client.topics().contains(&topic), - "topic {topic} does not exist yet", - ); - - Ok(Self { hosts, topic }) + Ok(Self { + producer, + topic: topic.to_string(), + }) } } #[async_trait::async_trait] -impl EventBridge for KafkaEventBridge { +impl EventBridge for KafkaProducer { async fn dispatch(&self, event: Event) -> Result<()> { let data = serde_json::to_vec(&event)?; - let record = Record::from_value(&self.topic, data); - - let mut producer = Producer::from_hosts(self.hosts.clone()).create()?; - producer.send(&record)?; + self.producer + .send( + FutureRecord::to(&self.topic).payload(&data).key(""), + Duration::from_secs(0), + ) + .await + .map_err(|err| Error::msg(err.0.to_string()))?; Ok(()) } diff --git a/src/drivers/event/mod.rs b/src/drivers/event/mod.rs index ac56710..5e87707 100644 --- a/src/drivers/event/mod.rs +++ b/src/drivers/event/mod.rs @@ -1,52 +1,47 @@ use anyhow::Result; -use kafka::{ - client::{FetchOffset, GroupOffsetStorage}, - consumer::Consumer, +use rdkafka::{ + consumer::{CommitMode, Consumer, StreamConsumer}, + ClientConfig, Message, }; use std::{path::Path, sync::Arc}; -use tracing::info; +use tracing::{error, info}; use crate::{ domain::{events::Event, management::project::create_cache}, driven::cache::{project::SqliteProjectCache, SqliteCache}, }; -pub async fn subscribe(kafka_host: &str) -> Result<()> { +pub async fn subscribe(brokers: &str) -> Result<()> { let sqlite_cache = Arc::new(SqliteCache::new(Path::new("dev.db")).await?); sqlite_cache.migrate().await?; let project_cache = Arc::new(SqliteProjectCache::new(sqlite_cache)); - let topic = "events".to_string(); - let hosts = &[kafka_host.into()]; + let topic = String::from("events"); - let mut consumer = Consumer::from_hosts(hosts.to_vec()) - .with_topic(topic.clone()) - .with_group("cache".to_string()) - .with_fallback_offset(FetchOffset::Earliest) - .with_offset_storage(Some(GroupOffsetStorage::Kafka)) + let consumer: StreamConsumer = ClientConfig::new() + .set("bootstrap.servers", brokers) + .set("group.id", "cache") .create()?; - info!("Subscriber running"); + consumer.subscribe(&[&topic])?; + info!("Subscriber running"); loop { - let mss = consumer.poll()?; - if mss.is_empty() { - continue; - } - - for ms in mss.iter() { - for m in ms.messages() { - let event: Event = serde_json::from_slice(m.value)?; - match event { - Event::NamespaceCreation(namespace) => { - create_cache(project_cache.clone(), namespace).await?; - } - Event::AccountCreation(_) => todo!(), - }; + match consumer.recv().await { + Err(err) => error!(error = err.to_string(), "kafka subscribe error"), + Ok(message) => { + if let Some(payload) = message.payload() { + let event: Event = serde_json::from_slice(payload)?; + match event { + Event::NamespaceCreation(namespace) => { + create_cache(project_cache.clone(), namespace).await?; + } + Event::AccountCreation(_) => todo!(), + }; + consumer.commit_message(&message, CommitMode::Async)?; + } } - consumer.consume_messageset(ms)?; - } - consumer.commit_consumed()?; + }; } } diff --git a/src/drivers/grpc/mod.rs b/src/drivers/grpc/mod.rs index c9297c8..b2f6631 100644 --- a/src/drivers/grpc/mod.rs +++ b/src/drivers/grpc/mod.rs @@ -8,16 +8,16 @@ use tracing::info; use dmtri::demeter::ops::v1alpha::project_service_server::ProjectServiceServer; use crate::driven::cache::{project::SqliteProjectCache, SqliteCache}; -use crate::driven::kafka::KafkaEventBridge; +use crate::driven::kafka::KafkaProducer; mod account; mod project; -pub async fn server(addr: &str, db_path: &str, kafka_host: &str) -> Result<()> { +pub async fn server(addr: &str, db_path: &str, brokers: &str) -> Result<()> { let sqlite_cache = Arc::new(SqliteCache::new(Path::new(&db_path)).await?); let project_cache = Arc::new(SqliteProjectCache::new(sqlite_cache)); - let event_bridge = Arc::new(KafkaEventBridge::new(&[kafka_host.into()], "events")?); + let event_bridge = Arc::new(KafkaProducer::new(brokers, "events")?); let reflection = tonic_reflection::server::Builder::configure() .register_encoded_file_descriptor_set(dmtri::demeter::ops::v1alpha::FILE_DESCRIPTOR_SET) diff --git a/src/drivers/monitor/mod.rs b/src/drivers/monitor/mod.rs index 3b7ad99..4a4bd20 100644 --- a/src/drivers/monitor/mod.rs +++ b/src/drivers/monitor/mod.rs @@ -1,49 +1,44 @@ use anyhow::Result; -use kafka::{ - client::{FetchOffset, GroupOffsetStorage}, - consumer::Consumer, +use rdkafka::{ + consumer::{CommitMode, Consumer, StreamConsumer}, + ClientConfig, Message, }; use std::sync::Arc; -use tracing::info; +use tracing::{error, info}; use crate::{ domain::{daemon::namespace::create_namespace, events::Event}, driven::k8s::K8sCluster, }; -pub async fn subscribe(kafka_host: &str) -> Result<()> { +pub async fn subscribe(brokers: &str) -> Result<()> { let k8s_cluster = Arc::new(K8sCluster::new().await?); - let topic = "events".to_string(); - let hosts = &[kafka_host.into()]; + let topic = String::from("events"); - let mut consumer = Consumer::from_hosts(hosts.to_vec()) - .with_topic(topic.clone()) - .with_group("clusters".to_string()) - .with_fallback_offset(FetchOffset::Earliest) - .with_offset_storage(Some(GroupOffsetStorage::Kafka)) + let consumer: StreamConsumer = ClientConfig::new() + .set("bootstrap.servers", brokers) + .set("group.id", "clusters") .create()?; - info!("Subscriber running"); + consumer.subscribe(&[&topic])?; + info!("Subscriber running"); loop { - let mss = consumer.poll()?; - if mss.is_empty() { - continue; - } - - for ms in mss.iter() { - for m in ms.messages() { - let event: Event = serde_json::from_slice(m.value)?; - match event { - Event::NamespaceCreation(namespace) => { - create_namespace(k8s_cluster.clone(), namespace).await?; - } - Event::AccountCreation(_) => todo!(), - }; + match consumer.recv().await { + Err(err) => error!(error = err.to_string(), "kafka subscribe error"), + Ok(message) => { + if let Some(payload) = message.payload() { + let event: Event = serde_json::from_slice(payload)?; + match event { + Event::NamespaceCreation(namespace) => { + create_namespace(k8s_cluster.clone(), namespace).await?; + } + Event::AccountCreation(_) => todo!(), + }; + consumer.commit_message(&message, CommitMode::Async)?; + } } - consumer.consume_messageset(ms)?; - } - consumer.commit_consumed()?; + }; } }