diff --git a/src/consumer.rs b/src/consumer.rs index 956e7a4..86d42b2 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -132,13 +132,11 @@ impl ConsumerBuilder { let collector = self.environment.options.client_options.collector.clone(); - let client_uwrapped = self + let client = self .environment .create_consumer_client(stream, self.client_provided_name.clone()) .await?; - let client = client_uwrapped.clone(); - let subscription_id = 1; let (tx, rx) = channel(10000); let consumer = Arc::new(ConsumerInternal { diff --git a/src/environment.rs b/src/environment.rs index 132e1ca..b033475 100644 --- a/src/environment.rs +++ b/src/environment.rs @@ -39,7 +39,7 @@ impl Environment { Ok(Environment { options }) } - pub async fn create_producer_client( + pub(crate) async fn create_producer_client( self, stream: &str, client_provided_name: String, @@ -91,7 +91,7 @@ impl Environment { Ok(client) } - pub async fn create_consumer_client( + pub(crate) async fn create_consumer_client( self, stream: &str, client_provided_name: String, diff --git a/src/producer.rs b/src/producer.rs index b41db0e..96c94a0 100644 --- a/src/producer.rs +++ b/src/producer.rs @@ -120,13 +120,11 @@ impl ProducerBuilder { let metrics_collector = self.environment.options.client_options.collector.clone(); - let client_unwrapped = self + let client = self .environment .create_producer_client(stream, self.client_provided_name.clone()) .await?; - let client = client_unwrapped; - let mut publish_version = 1; if self.filter_value_extractor.is_some() {