Skip to content

Commit

Permalink
Adapt the heartbeat checker to the configuration. (#361)
Browse files Browse the repository at this point in the history
* Adapt the heartbeat checker to 
 the configuration. The client could raise a missing heartbeat if a heartbeat is less than 20 seconds. In some cases, it could disconnect the client
---------

Signed-off-by: Gabriele Santomaggio <[email protected]>
Co-authored-by: Alberto Moretti <[email protected]>
  • Loading branch information
Gsantomaggio and hiimjako authored Nov 22, 2024
1 parent bb5b42a commit 3b0dc4c
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 9 deletions.
2 changes: 1 addition & 1 deletion pkg/stream/brokers.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func newBrokerDefault() *Broker {

func newTCPParameterDefault() *TCPParameters {
return &TCPParameters{
RequestedHeartbeat: 60 * time.Second,
RequestedHeartbeat: defaultHeartbeat,
RequestedMaxFrameSize: 1048576,
WriteBuffer: 8192,
ReadBuffer: 65536,
Expand Down
10 changes: 6 additions & 4 deletions pkg/stream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,13 +413,15 @@ func (c *Client) DeleteStream(streamName string) error {
}

func (c *Client) heartBeat() {
ticker := time.NewTicker(60 * time.Second)
tickerHeatBeat := time.NewTicker(20 * time.Second)
ticker := time.NewTicker(time.Duration(c.tuneState.requestedHeartbeat) * time.Second)
tickerHeartbeat := time.NewTicker(time.Duration(c.tuneState.requestedHeartbeat-2) * time.Second)

resp := c.coordinator.NewResponseWitName("heartbeat")
var heartBeatMissed int32

go func() {
for c.socket.isOpen() {
<-tickerHeatBeat.C
<-tickerHeartbeat.C
if time.Since(c.getLastHeartBeat()) > time.Duration(c.tuneState.requestedHeartbeat)*time.Second {
v := atomic.AddInt32(&heartBeatMissed, 1)
logs.LogWarn("Missing heart beat: %d", v)
Expand All @@ -432,7 +434,7 @@ func (c *Client) heartBeat() {
}

}
tickerHeatBeat.Stop()
tickerHeartbeat.Stop()
}()

go func() {
Expand Down
2 changes: 2 additions & 0 deletions pkg/stream/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ const (
///
defaultSocketCallTimeout = 10 * time.Second

defaultHeartbeat = 60 * time.Second

//
LocalhostUriConnection = "rabbitmq-stream://guest:guest@localhost:5552/%2f"

Expand Down
11 changes: 11 additions & 0 deletions pkg/stream/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ func NewEnvironment(options *EnvironmentOptions) (*Environment, error) {
options.RPCTimeout = defaultSocketCallTimeout
}

if options.TCPParameters == nil {
options.TCPParameters = newTCPParameterDefault()

}

client := newClient("go-stream-locator", nil,
options.TCPParameters, options.SaslConfiguration, options.RPCTimeout)
defer func(client *Client) {
Expand All @@ -39,6 +44,12 @@ func NewEnvironment(options *EnvironmentOptions) (*Environment, error) {
}
}(client)

// we put a limit to the heartbeat.
// it doesn't make sense to have a heartbeat less than 3 seconds
if options.TCPParameters.RequestedHeartbeat < (3 * time.Second) {
return nil, errors.New("RequestedHeartbeat must be greater than 3 seconds")
}

if options.MaxConsumersPerClient <= 0 || options.MaxProducersPerClient <= 0 ||
options.MaxConsumersPerClient > 254 || options.MaxProducersPerClient > 254 {
return nil, fmt.Errorf(" MaxConsumersPerClient and MaxProducersPerClient must be between 1 and 254")
Expand Down
7 changes: 6 additions & 1 deletion pkg/stream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ var _ = Describe("Environment test", func() {
},
TCPParameters: &TCPParameters{
tlsConfig: nil,
RequestedHeartbeat: 60,
RequestedHeartbeat: defaultHeartbeat,
RequestedMaxFrameSize: 1048574,
WriteBuffer: 100,
ReadBuffer: 200,
Expand Down Expand Up @@ -263,6 +263,11 @@ var _ = Describe("Environment test", func() {
Expect(env.options.TCPParameters.RequestedMaxFrameSize).NotTo(BeZero())
})

It("RequestedHeartbeat should be greater then 3 seconds", func() {
_, err := NewEnvironment(NewEnvironmentOptions().SetRequestedHeartbeat(2 * time.Second))
Expect(err).To(HaveOccurred())
})

})

Describe("Validation Query Offset/Sequence", func() {
Expand Down
17 changes: 14 additions & 3 deletions pkg/stream/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@ func NewProducerFilter(filterValue FilterValue) *ProducerFilter {
type ProducerOptions struct {
client *Client
streamName string
Name string // Producer name, it is useful to handle deduplication messages
Name string // Producer name, valid for deduplication
QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server
BatchSize int // It is the batch-unCompressedSize aggregation, low value reduce the latency, high value increase the throughput
BatchPublishingDelay int // Period to Send a batch of messages.
BatchSize int // It is the batch-unCompressedSize aggregation, low value reduce the latency, high value increase the throughput. Valid only for the method Send()
BatchPublishingDelay int // Timout within the aggregation sent a batch of messages. Valid only for the method Send()
SubEntrySize int // Size of sub Entry, to aggregate more subEntry using one publishing id
Compression Compression // Compression type, it is valid only if SubEntrySize > 1
ConfirmationTimeOut time.Duration // Time to wait for the confirmation
Expand All @@ -115,6 +115,8 @@ func (po *ProducerOptions) SetQueueSize(size int) *ProducerOptions {
return po
}

// SetBatchSize sets the batch size for the producer
// Valid only for the method Send()
func (po *ProducerOptions) SetBatchSize(size int) *ProducerOptions {
po.BatchSize = size
return po
Expand Down Expand Up @@ -355,6 +357,9 @@ func (producer *Producer) sendBytes(streamMessage message.StreamMessage, message
return nil
}

// Send sends a message to the stream and returns an error if the message could not be sent.
// Send is asynchronous. The aggregation of the messages is based on the BatchSize and BatchPublishingDelay
// options. The message is sent when the aggregation is reached or the BatchPublishingDelay is reached.
func (producer *Producer) Send(streamMessage message.StreamMessage) error {
messageBytes, err := streamMessage.MarshalBinary()
if err != nil {
Expand All @@ -372,6 +377,12 @@ func (producer *Producer) assignPublishingID(message message.StreamMessage) int6
return sequence
}

// BatchSend sends a batch of messages to the stream and returns an error if the messages could not be sent.
// BatchSend is synchronous. The aggregation is up to the user. The user has to aggregate the messages
// and send them in a batch.
// BatchSend is not affected by the BatchSize and BatchPublishingDelay options.
// BatchSend is the primitive method to send messages to the stream, the method Send prepares the messages and
// calls BatchSend internally.
func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error {
var messagesSequence = make([]messageSequence, len(batchMessages))
totalBufferToSend := 0
Expand Down

0 comments on commit 3b0dc4c

Please sign in to comment.