From e0bcf7e6fcec4be18c14ff87ffd212662c092a94 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 30 Jul 2024 10:28:59 +0200 Subject: [PATCH] Check version when version is not (#345) * Check version when version is not set and is less than 3.11 * Reduce the log level for the exchange version --------- Signed-off-by: Gabriele Santomaggio --- pkg/stream/available_features.go | 2 +- pkg/stream/client.go | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/pkg/stream/available_features.go b/pkg/stream/available_features.go index 2f9b787e..22f1a409 100644 --- a/pkg/stream/available_features.go +++ b/pkg/stream/available_features.go @@ -45,7 +45,7 @@ func (a *availableFeatures) BrokerFilterEnabled() bool { func (a *availableFeatures) IsBrokerSingleActiveConsumerEnabled() bool { lock.Lock() defer lock.Unlock() - return a.brokerSingleActiveConsumerEnabled == a.is311OrMore + return a.brokerSingleActiveConsumerEnabled } func (a *availableFeatures) SetVersion(version string) error { diff --git a/pkg/stream/client.go b/pkg/stream/client.go index a5adc857..7a713b1b 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -212,9 +212,13 @@ func (c *Client) connect() error { return err2 } - logs.LogDebug("Server properties: %s", c.serverProperties) - if serverProperties["version"] == "" { - logs.LogInfo( + err = c.availableFeatures.SetVersion(serverProperties["version"]) + if err != nil { + logs.LogWarn("Error checking server version: %s", err) + } + + if serverProperties["version"] == "" || !c.availableFeatures.Is311OrMore() { + logs.LogDebug( "Server version is less than 3.11.0, skipping command version exchange") } else { err := c.exchangeVersion(c.serverProperties["version"])