Skip to content

Commit

Permalink
Merge branch 'main' into dependabot/go_modules/github.com/klauspost/c…
Browse files Browse the repository at this point in the history
…ompress-1.17.10
  • Loading branch information
Gsantomaggio authored Sep 24, 2024
2 parents f9976d4 + d3ecf25 commit bf82779
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 30 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ rabbitmq-ha-proxy:
mv compose/ha_tls/tls-gen/basic/result/server_*_certificate.pem compose/ha_tls/tls-gen/basic/result/server_certificate.pem
mv compose/ha_tls/tls-gen/basic/result/server_*key.pem compose/ha_tls/tls-gen/basic/result/server_key.pem
cd compose/ha_tls; docker build -t haproxy-rabbitmq-cluster .
cd compose/ha_tls; docker-compose down
cd compose/ha_tls; docker-compose up
cd compose/ha_tls; docker compose down
cd compose/ha_tls; docker compose up

rabbitmq-server-tls:
cd compose/tls; rm -rf tls-gen;
Expand Down
18 changes: 13 additions & 5 deletions pkg/stream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,13 +595,26 @@ func (c *Client) DeclarePublisher(streamName string, options *ProducerOptions) (
}

func (c *Client) internalDeclarePublisher(streamName string, producer *Producer) responseError {

publisherReferenceSize := 0
if producer.options != nil {
if producer.options.Name != "" {
publisherReferenceSize = len(producer.options.Name)
}
}

if publisherReferenceSize > 0 {
v, err := c.queryPublisherSequence(producer.options.Name, streamName)
if err != nil {
// if the client can't get the sequence, the function will return an error
// because is not able to set the sequence
// in most of the case the error timeout is during the re-connection
// in this case the producer can't be created and the client will return an error
return responseError{Err: err}
}
producer.sequence = v
}

length := 2 + 2 + 4 + 1 + 2 + publisherReferenceSize + 2 + len(streamName)
resp := c.coordinator.NewResponse(commandDeclarePublisher, streamName)
correlationId := resp.correlationid
Expand All @@ -618,11 +631,6 @@ func (c *Client) internalDeclarePublisher(streamName string, producer *Producer)
writeString(b, streamName)
res := c.handleWrite(b.Bytes(), resp)

if publisherReferenceSize > 0 {
v, _ := c.queryPublisherSequence(producer.options.Name, streamName)
producer.sequence = v
}

return res
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/stream/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ var _ = Describe("Streaming testEnvironment", func() {
cli.socketCallTimeout = time.Millisecond

res, err := cli.queryPublisherSequence("ref", "stream")
Expect(err.Error()).To(ContainSubstring("timeout 1 ms - waiting Code, operation: Command not handled 5"))
Expect(err.Error()).To(ContainSubstring("timeout 1 ms - waiting Code, operation: CommandQueryPublisherSequence"))
Expect(res).To(BeZero())
})

Expand All @@ -208,7 +208,7 @@ var _ = Describe("Streaming testEnvironment", func() {
cli.socketCallTimeout = time.Millisecond

res, err := cli.StreamStats("stream")
Expect(err.Error()).To(ContainSubstring("timeout 1 ms - waiting Code, operation: Command not handled 28"))
Expect(err.Error()).To(ContainSubstring("timeout 1 ms - waiting Code, operation: CommandStreamStatus"))
Expect(res).To(BeNil())
})

Expand Down
52 changes: 31 additions & 21 deletions pkg/stream/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,27 +197,37 @@ func lookErrorCode(errorCode uint16) error {

func lookUpCommand(command uint16) string {
var constLookup = map[uint16]string{
commandPeerProperties: `commandPeerProperties`,
commandSaslHandshake: `commandSaslHandshake`,
commandSaslAuthenticate: `commandSaslAuthenticate`,
commandTune: `commandTune`,
commandOpen: `commandOpen`,
commandHeartbeat: `commandHeartbeat`,
CommandMetadataUpdate: `CommandMetadataUpdate`,
commandMetadata: `CommandMetadata`,
commandDeleteStream: `CommandDeleteStream`,
commandCreateStream: `CommandCreateStream`,
CommandUnsubscribe: `CommandUnsubscribe`,
CommandQueryOffset: `CommandQueryOffset`,
commandCredit: `CommandCredit`,
commandDeliver: `CommandDeliver`,
commandSubscribe: `CommandSubscribe`,
CommandDeletePublisher: `CommandDeletePublisher`,
commandPublishError: `CommandPublishError`,
commandPublishConfirm: `CommandPublishConfirm`,
commandDeclarePublisher: `CommandDeclarePublisher`,
commandUnitTest: `UnitTest`,
CommandClose: `CommandClose`,
commandPeerProperties: `commandPeerProperties`,
commandSaslHandshake: `commandSaslHandshake`,
commandSaslAuthenticate: `commandSaslAuthenticate`,
commandTune: `commandTune`,
commandOpen: `commandOpen`,
commandHeartbeat: `commandHeartbeat`,
CommandMetadataUpdate: `CommandMetadataUpdate`,
commandMetadata: `CommandMetadata`,
commandDeleteStream: `CommandDeleteStream`,
commandCreateStream: `CommandCreateStream`,
CommandUnsubscribe: `CommandUnsubscribe`,
CommandQueryOffset: `CommandQueryOffset`,
commandCredit: `CommandCredit`,
commandDeliver: `CommandDeliver`,
commandSubscribe: `CommandSubscribe`,
CommandDeletePublisher: `CommandDeletePublisher`,
commandPublishError: `CommandPublishError`,
commandPublishConfirm: `CommandPublishConfirm`,
commandDeclarePublisher: `CommandDeclarePublisher`,
commandPublish: `CommandPublish`,
commandQueryPublisherSequence: `CommandQueryPublisherSequence`,
commandQueryRoute: `CommandQueryRoute`,
commandQueryPartition: `CommandQueryPartition`,
commandExchangeVersion: `CommandExchangeVersion`,
commandStreamStatus: `CommandStreamStatus`,
commandCreateSuperStream: `CommandCreateSuperStream`,
commandDeleteSuperStream: `CommandDeleteSuperStream`,
commandConsumerUpdate: `CommandConsumerUpdate`,

commandUnitTest: `UnitTest`,
CommandClose: `CommandClose`,
}
if constLookup[command] == "" {
return fmt.Sprintf("Command not handled %d", command)
Expand Down

0 comments on commit bf82779

Please sign in to comment.