diff --git a/src/connection.rs b/src/connection.rs index 13eaf08..e2d5aa8 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -1639,6 +1639,7 @@ pub(crate) mod messages { .collect(), read_compacted: Some(options.read_compacted.unwrap_or(false)), initial_position: Some(options.initial_position.into()), + replicate_subscription_state: options.replicate_subscription_state, schema: options.schema, start_message_id: options.start_message_id, ..Default::default() diff --git a/src/consumer/options.rs b/src/consumer/options.rs index ba2af56..1053ce2 100644 --- a/src/consumer/options.rs +++ b/src/consumer/options.rs @@ -31,6 +31,10 @@ pub struct ConsumerOptions { /// } /// ``` pub initial_position: InitialPosition, + /// Mark the subscription as "replicated". Pulsar will make sure + /// to periodically sync the state of replicated subscriptions + /// across different clusters (when using geo-replication). + pub replicate_subscription_state: Option, } impl ConsumerOptions { @@ -76,4 +80,10 @@ impl ConsumerOptions { self.initial_position = initial_position; self } + + #[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))] + pub fn with_replicate_subscription_state(mut self, replicate_subscription_state: bool) -> Self { + self.replicate_subscription_state = Some(replicate_subscription_state); + self + } }