Skip to content

Commit

Permalink
send
Browse files Browse the repository at this point in the history
Signed-off-by: Gabriele Santomaggio <[email protected]>
  • Loading branch information
Gsantomaggio committed Dec 9, 2024
1 parent fb0a6c9 commit 3930f60
Show file tree
Hide file tree
Showing 14 changed files with 188 additions and 416 deletions.
26 changes: 17 additions & 9 deletions examples/reliable/reliable_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ var consumed int32 = 0
var sent int32
var reSent int32

const enableResend = false

func main() {
// Tune the parameters to test the reliability
const messagesToSend = 10_000_000
const numberOfProducers = 4
const numberOfProducers = 1
const concurrentProducers = 2
const numberOfConsumers = 4
const numberOfConsumers = 1
const sendDelay = 1 * time.Millisecond
const delayEachMessages = 200
const maxProducersPerClient = 4
Expand Down Expand Up @@ -79,8 +81,8 @@ func main() {
go func() {
for isRunning {
totalConfirmed := atomic.LoadInt32(&confirmed) + atomic.LoadInt32(&fail)
expectedMessages := messagesToSend * numberOfProducers * concurrentProducers
fmt.Printf("%s - ToSend: %d - nProducers: %d - concurrentProducers: %d - nConsumers %d \n", time.Now().Format(time.RFC822),
expectedMessages := messagesToSend * numberOfProducers * concurrentProducers * 2
fmt.Printf("%s - ToSend: %d - nProducers: %d - concurrentProducers: %d - nConsumers %d \n", time.Now().Format(time.RFC850),
expectedMessages, numberOfProducers, concurrentProducers, numberOfConsumers)
fmt.Printf("Sent:%d - ReSent %d - Confirmed:%d - Not confirmed:%d - Fail+Confirmed :%d \n",
sent, atomic.LoadInt32(&reSent), atomic.LoadInt32(&confirmed), atomic.LoadInt32(&fail), totalConfirmed)
Expand All @@ -100,7 +102,7 @@ func main() {
rProducer, err := ha.NewReliableProducer(env,
streamName,
stream.NewProducerOptions().
SetConfirmationTimeOut(5*time.Second).
SetConfirmationTimeOut(2*time.Second).
SetClientProvidedName(fmt.Sprintf("producer-%d", i)),
func(messageStatus []*stream.ConfirmationStatus) {
go func() {
Expand All @@ -109,9 +111,11 @@ func main() {
atomic.AddInt32(&confirmed, 1)
} else {
atomic.AddInt32(&fail, 1)
mutex.Lock()
unConfirmedMessages = append(unConfirmedMessages, msgStatus.GetMessage())
mutex.Unlock()
if enableResend {
mutex.Lock()
unConfirmedMessages = append(unConfirmedMessages, msgStatus.GetMessage())
mutex.Unlock()
}
}
}
}()
Expand All @@ -122,7 +126,6 @@ func main() {
for i := 0; i < concurrentProducers; i++ {
go func() {
for i := 0; i < messagesToSend; i++ {
msg := amqp.NewMessage([]byte("ha"))
mutex.Lock()
for _, confirmedMessage := range unConfirmedMessages {
err := rProducer.Send(confirmedMessage)
Expand All @@ -131,13 +134,18 @@ func main() {
}
unConfirmedMessages = []message.StreamMessage{}
mutex.Unlock()
msg := amqp.NewMessage([]byte("ha"))
err := rProducer.Send(msg)
if i%delayEachMessages == 0 {
time.Sleep(sendDelay)
}
atomic.AddInt32(&sent, 1)
CheckErr(err)

errBatch := rProducer.BatchSend([]message.StreamMessage{msg})
CheckErr(errBatch)
atomic.AddInt32(&sent, 1)

}
}()
}
Expand Down
12 changes: 7 additions & 5 deletions perfTest/cmd/silent.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,13 @@ func printStats() {

averageLatency := int64(0)
CMessagesPerSecond := float64(0)
ConfirmedMessagesPerSecond := float64(0)
if atomic.LoadInt32(&consumerMessageCount) > 0 {
CMessagesPerSecond = float64(atomic.LoadInt32(&consumerMessageCount)) / float64(v) * 1000
averageLatency = totalLatency / int64(atomic.LoadInt32(&consumerMessageCount))
}

ConfirmedMessagesPerSecond := float64(0)
if atomic.LoadInt32(&confirmedMessageCount) > 0 {
ConfirmedMessagesPerSecond = float64(atomic.LoadInt32(&confirmedMessageCount)) / float64(v) * 1000
}
p := gomsg.NewPrinter(language.English)
Expand Down Expand Up @@ -277,7 +280,7 @@ func startPublisher(streamName string) error {
logInfo("Enable SubEntrySize: %d, compression: %s", subEntrySize, cp)
}

producerOptions.SetClientProvidedName(clientProvidedName)
producerOptions.SetClientProvidedName(clientProvidedName).SetBatchSize(batchSize)
rPublisher, err := ha.NewReliableProducer(simulEnvironment,
streamName,
producerOptions,
Expand Down Expand Up @@ -319,7 +322,6 @@ func startPublisher(streamName string) error {
err = prod.BatchSend(messages)
checkErr(err)
}

atomic.AddInt32(&publisherMessageCount, int32(len(messages)))

}
Expand All @@ -337,8 +339,8 @@ func buildMessages() []message.StreamMessage {
body = make([]byte, fixedBody)
} else {
if variableBody > 0 {
rand.Seed(time.Now().UnixNano())
body = make([]byte, rand.Intn(variableBody))
r := rand.New(rand.NewSource(time.Now().Unix()))
body = make([]byte, r.Intn(variableBody))
}
}
var buff = make([]byte, 8)
Expand Down
3 changes: 1 addition & 2 deletions pkg/ha/ha_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ func (p *ReliableProducer) Send(message message.StreamMessage) error {
if err := p.isReadyToSend(); err != nil {
return err
}

p.mutex.Lock()
errW := p.producer.Send(message)
p.mutex.Unlock()
Expand All @@ -162,7 +161,7 @@ func (p *ReliableProducer) BatchSend(batchMessages []message.StreamMessage) erro
}

p.mutex.Lock()
_, errW := p.producer.BatchSend(batchMessages)
errW := p.producer.BatchSend(batchMessages)
p.mutex.Unlock()

return p.checkWriteError(errW)
Expand Down
4 changes: 2 additions & 2 deletions pkg/stream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,9 +590,9 @@ func (c *Client) DeclarePublisher(streamName string, options *ProducerOptions) (
}
res := c.internalDeclarePublisher(streamName, producer)
if res.Err == nil {
producer.startUnconfirmedMessagesTimeOutTask()
producer.processSendingMessages()
//producer.startPublishTask()
producer.startUnconfirmedMessagesTimeOutTask()
}
return producer, res.Err
}
Expand Down Expand Up @@ -756,7 +756,7 @@ func (c *Client) BrokerForConsumer(stream string) (*Broker, error) {
brokers = append(brokers, replica)
}

r := rand.New(rand.NewSource(SEED))
r := rand.New(rand.NewSource(time.Now().Unix()))
n := r.Intn(len(brokers))
return brokers[n], nil
}
Expand Down
4 changes: 1 addition & 3 deletions pkg/stream/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,8 @@ var _ = Describe("Streaming testEnvironment", func() {

producer, err := testEnvironment.NewProducer(testStreamName, nil)
Expect(err).NotTo(HaveOccurred())
result, err := producer.BatchSend(CreateArrayMessagesForTesting(1_000))
err = producer.BatchSend(CreateArrayMessagesForTesting(1_000))
Expect(err).NotTo(HaveOccurred())
Expect(result.TotalSent).To(Equal(1_000))
Expect(result.TotalFrames).To(Equal(1))
time.Sleep(time.Millisecond * 800)
Expect(producer.Close()).NotTo(HaveOccurred())

Expand Down
8 changes: 2 additions & 6 deletions pkg/stream/consumer_sac_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@ import (
func SendMessages(testEnvironment *Environment, streamName string) {
producer, err := testEnvironment.NewProducer(streamName, nil)
Expect(err).NotTo(HaveOccurred())
result, err := producer.BatchSend(CreateArrayMessagesForTesting(30))
err = producer.BatchSend(CreateArrayMessagesForTesting(30))
Expect(err).NotTo(HaveOccurred())
Expect(result.TotalSent).To(Equal(30))
Expect(result.TotalFrames).To(Equal(1))

Expect(producer.Close()).NotTo(HaveOccurred())
}
Expand Down Expand Up @@ -214,10 +212,8 @@ var _ = Describe("Streaming Single Active Consumer", func() {
SetAutoCommit(nil))
Expect(err).NotTo(HaveOccurred())

result, err := producer.BatchSend(CreateArrayMessagesForTesting(10))
err = producer.BatchSend(CreateArrayMessagesForTesting(10))
Expect(err).NotTo(HaveOccurred())
Expect(result.TotalSent).To(Equal(10))
Expect(result.TotalFrames).To(Equal(1))

Eventually(func() int32 {
return atomic.LoadInt32(&messagesReceived)
Expand Down
32 changes: 8 additions & 24 deletions pkg/stream/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,8 @@ var _ = Describe("Streaming Consumers", func() {
producer, err := env.NewProducer(streamName, nil)
Expect(err).NotTo(HaveOccurred())

result, err := producer.BatchSend(CreateArrayMessagesForTesting(30)) // batch Send
err = producer.BatchSend(CreateArrayMessagesForTesting(30)) // batch Send
Expect(err).NotTo(HaveOccurred())
Expect(result.TotalSent).To(Equal(30))
Expect(result.TotalFrames).To(Equal(1))

Expect(producer.Close()).NotTo(HaveOccurred())
var messagesReceived int32 = 0
Expand Down Expand Up @@ -190,10 +188,8 @@ var _ = Describe("Streaming Consumers", func() {
}, NewConsumerOptions().
SetOffset(OffsetSpecification{}.First()))
Expect(err).NotTo(HaveOccurred())
result, err := producer.BatchSend(CreateArrayMessagesForTesting(3)) // batch Send
err = producer.BatchSend(CreateArrayMessagesForTesting(3)) // batch Send
Expect(err).NotTo(HaveOccurred())
Expect(result.TotalSent).To(Equal(3))
Expect(result.TotalFrames).To(Equal(1))

Expect(producer.Close()).NotTo(HaveOccurred())
Eventually(func() int32 {
Expand All @@ -209,10 +205,8 @@ var _ = Describe("Streaming Consumers", func() {
Expect(err).NotTo(HaveOccurred())

// Given we have produced 105 messages ...
result, err := producer.BatchSend(CreateArrayMessagesForTesting(105)) // batch Send
err = producer.BatchSend(CreateArrayMessagesForTesting(105)) // batch Send
Expect(err).NotTo(HaveOccurred())
Expect(result.TotalSent).To(Equal(105))
Expect(result.TotalFrames).To(Equal(1))

Expect(producer.Close()).NotTo(HaveOccurred())

Expand Down Expand Up @@ -351,10 +345,8 @@ var _ = Describe("Streaming Consumers", func() {
// same SetPublishingId
// even we publish the same array more times
for i := 0; i < 10; i++ {
result, err := producer.BatchSend(arr)
err = producer.BatchSend(arr)
Expect(err).NotTo(HaveOccurred())
Expect(result.TotalSent).To(Equal(len(arr)))
Expect(result.TotalFrames).To(Equal(1))

}

Expand Down Expand Up @@ -403,10 +395,8 @@ var _ = Describe("Streaming Consumers", func() {
_, err = env.QueryOffset("consumer_test", streamName)
Expect(err).To(HaveOccurred())

result, err := producer.BatchSend(CreateArrayMessagesForTesting(107))
err = producer.BatchSend(CreateArrayMessagesForTesting(107))
Expect(err).NotTo(HaveOccurred())
Expect(result.TotalSent).To(Equal(107))
Expect(result.TotalFrames).To(Equal(1))

Expect(producer.Close()).NotTo(HaveOccurred())
var messagesReceived int32 = 0
Expand Down Expand Up @@ -464,10 +454,8 @@ var _ = Describe("Streaming Consumers", func() {
It("Check already closed", func() {
producer, err := env.NewProducer(streamName, nil)
Expect(err).NotTo(HaveOccurred())
result, err := producer.BatchSend(CreateArrayMessagesForTesting(500))
err = producer.BatchSend(CreateArrayMessagesForTesting(500))
Expect(err).NotTo(HaveOccurred())
Expect(result.TotalSent).To(Equal(500))
Expect(result.TotalFrames).To(Equal(1))

defer func(producer *Producer) {
err := producer.Close()
Expand Down Expand Up @@ -706,10 +694,8 @@ var _ = Describe("Streaming Consumers", func() {
}

for i := 0; i < 50; i++ {
result, err2 := producer6Batch.BatchSend(batchMessages)
err2 := producer6Batch.BatchSend(batchMessages)
Expect(err2).NotTo(HaveOccurred())
Expect(result.TotalSent).To(Equal(len(batchMessages)))
Expect(result.TotalFrames).To(Equal(1))
}

var messagesReceived int32
Expand Down Expand Up @@ -753,10 +739,8 @@ var _ = Describe("Streaming Consumers", func() {
// so, even we set the SetPublishingId
// it will be ignored
for i := 0; i < 10; i++ {
result, err := producer.BatchSend(arr)
err := producer.BatchSend(arr)
Expect(err).NotTo(HaveOccurred())
Expect(result.TotalSent).To(Equal(len(arr)))
Expect(result.TotalFrames).To(Equal(1))
}

var messagesReceived int32 = 0
Expand Down
17 changes: 5 additions & 12 deletions pkg/stream/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"github.com/pkg/errors"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
"strconv"
"sync"
"time"
Expand Down Expand Up @@ -55,11 +54,9 @@ func (coordinator *Coordinator) NewProducer(
parameters *ProducerOptions) (*Producer, error) {
coordinator.mutex.Lock()
defer coordinator.mutex.Unlock()
size := 10000
adativeSize := 10000
dynSize := 10000
if parameters != nil {
size = parameters.QueueSize
adativeSize = parameters.BatchSize
dynSize = parameters.BatchSize
}

var lastId, err = coordinator.getNextProducerItem()
Expand All @@ -69,15 +66,11 @@ func (coordinator *Coordinator) NewProducer(
var producer = &Producer{id: lastId,
options: parameters,
mutex: &sync.RWMutex{},
mutexPending: &sync.Mutex{},
mutexUnconfirmed: &sync.Mutex{},
unConfirmedMessages: map[int64]*ConfirmationStatus{},
status: open,
messageSequenceCh: make(chan messageSequence, size),
dynamicSendCh: make(chan message.StreamMessage, adativeSize),
pendingMessages: pendingMessagesSequence{
messages: make([]*messageSequence, 0),
size: initBufferPublishSize,
}}
dynamicSendCh: make(chan *messageSequence, dynSize),
}
coordinator.producers[lastId] = producer
return producer, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/stream/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (env *Environment) newReconnectClient() (*Client, error) {
logs.LogError("Can't connect the locator client, error:%s, retry in %d milliseconds, broker: ", err, sleepTime, brokerUri)

time.Sleep(time.Duration(sleepTime) * time.Millisecond)
r := rand.New(rand.NewSource(SEED))
r := rand.New(rand.NewSource(time.Now().Unix()))
n := r.Intn(len(env.options.ConnectionParameters))
client = newClient("stream-locator", env.options.ConnectionParameters[n], env.options.TCPParameters,
env.options.SaslConfiguration, env.options.RPCTimeout)
Expand Down
4 changes: 1 addition & 3 deletions pkg/stream/filtering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,7 @@ func send(producer *Producer, state string) {
msg.ApplicationProperties = map[string]interface{}{"state": state}
messages = append(messages, msg)
}
result, err := producer.BatchSend(messages)
err := producer.BatchSend(messages)
Expect(err).NotTo(HaveOccurred())
Expect(result.TotalSent).To(Equal(25))
Expect(result.TotalFrames).To(Equal(1))

}
Loading

0 comments on commit 3930f60

Please sign in to comment.