Skip to content

Commit

Permalink
improve the message log
Browse files Browse the repository at this point in the history
Signed-off-by: Gabriele Santomaggio <[email protected]>
  • Loading branch information
Gsantomaggio committed Jun 19, 2024
1 parent a2fd590 commit c5ad6ff
Showing 1 changed file with 13 additions and 2 deletions.
15 changes: 13 additions & 2 deletions pkg/stream/server_frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,8 @@ func (c *Client) handleDeliver(r *bufio.Reader) {
if consumer.getStatus() == open {
consumer.response.chunkForConsumer <- chunk
} else {
logs.LogWarn("Consumer %s is closed", consumer.GetStreamName())
logs.LogDebug("The consumer %s for the stream %s is closed during the chunk dispatching. "+
"Messages won't dispatched", consumer.GetName(), consumer.GetStreamName())
}

}
Expand All @@ -439,7 +440,17 @@ func (c *Client) creditNotificationFrameHandler(readProtocol *ReaderProtocol,
r *bufio.Reader) {
readProtocol.ResponseCode = uShortExtractResponseCode(readUShort(r))
subscriptionId := readByte(r)
logs.LogWarn("received a credit for an unknown subscriptionId: %d", subscriptionId)
consumer, err := c.coordinator.GetConsumerById(subscriptionId)
if err != nil {
logs.LogWarn("received a credit for an unknown subscriptionId: %d", subscriptionId)
return
}

if consumer != nil && consumer.getStatus() == closed {
logs.LogDebug("received a credit for a closed consumer %d", subscriptionId)
return
}

}

func (c *Client) queryOffsetFrameHandler(readProtocol *ReaderProtocol,
Expand Down

0 comments on commit c5ad6ff

Please sign in to comment.