Skip to content

Commit

Permalink
Check version when version is not (#345)
Browse files Browse the repository at this point in the history
* Check version when version is not set and is less than 3.11
* Reduce the log level for the exchange version 

---------

Signed-off-by: Gabriele Santomaggio <[email protected]>
  • Loading branch information
Gsantomaggio authored Jul 30, 2024
1 parent e4f6b06 commit e0bcf7e
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 4 deletions.
2 changes: 1 addition & 1 deletion pkg/stream/available_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (a *availableFeatures) BrokerFilterEnabled() bool {
func (a *availableFeatures) IsBrokerSingleActiveConsumerEnabled() bool {
lock.Lock()
defer lock.Unlock()
return a.brokerSingleActiveConsumerEnabled == a.is311OrMore
return a.brokerSingleActiveConsumerEnabled
}

func (a *availableFeatures) SetVersion(version string) error {
Expand Down
10 changes: 7 additions & 3 deletions pkg/stream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,13 @@ func (c *Client) connect() error {
return err2
}

logs.LogDebug("Server properties: %s", c.serverProperties)
if serverProperties["version"] == "" {
logs.LogInfo(
err = c.availableFeatures.SetVersion(serverProperties["version"])
if err != nil {
logs.LogWarn("Error checking server version: %s", err)
}

if serverProperties["version"] == "" || !c.availableFeatures.Is311OrMore() {
logs.LogDebug(
"Server version is less than 3.11.0, skipping command version exchange")
} else {
err := c.exchangeVersion(c.serverProperties["version"])
Expand Down

0 comments on commit e0bcf7e

Please sign in to comment.