Node runs into a memory alarm with CQv2 and a fast publisher without publisher confirms #12885
-
Describe the bug
I plan to upgrade RabbitMQ from version 3.13.7 to 4.0.4. For load testing, I publish data without any consumers and compare memory usage between the two versions. I found that version 4.0.4 consumes more memory than version 3.13.7, as you can see in my screenshot. Both runs use the same code and the same message for publishing. FYI, I use classic queues version 2 on a single instance.
Reproduction steps
package main

import (
	"fmt"
	"log"
	"os"
	"time"

	"github.com/sourcegraph/conc"
	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func CreatePublisherConnectionQueueV2(uri, queueName string) (*amqp.Connection, *amqp.Channel, amqp.Queue) {
	conn, err := amqp.Dial(uri)
	failOnError(err, "Failed to connect to RabbitMQ")
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	q, err := ch.QueueDeclare(
		queueName, // name
		true,      // durable
		false,     // delete when unused
		false,     // exclusive
		false,     // no-wait
		map[string]interface{}{
			"x-queue-version": 2,
		}, // arguments version 2
	)
	failOnError(err, "Failed to declare a queue")
	return conn, ch, q
}

func process(inputQueueName string, size int) {
	url := "amqp://user:password@localhost/"
body := []byte(`{"request":{"operation":{"_collection":"project_001","_category":"source_type","_reference":"content_123_reply_456"}},"data":{"script":"ctx._source.primary_tags=primary_tags;ctx._source.secondary_tags=secondary_tags;ctx._source.excluded_tags=excluded_tags;ctx._source.image_tags=image_tags;ctx._source.searchable_text=searchable_text;if(ctx._source.status!=1&&ctx._source.status!=4){ctx._source.status=status};if(ctx._source.emotion_changed==0){ctx._source.emotion=emotion};if(ctx._source.containsKey('monitoring')){ctx._source.monitoring.content=monitoring.content;ctx._source.monitoring.user=monitoring.user;ctx._source.monitoring.category=monitoring.category}else{ctx._source.monitoring=monitoring};if(ctx._source.containsKey('match_source')){ctx._source.match_source.monitor_content=match_source.monitor_content;ctx._source.match_source.monitor_user=match_source.monitor_user;ctx._source.match_source.monitor_category=match_source.monitor_category;ctx._source.match_source.tag_match=match_source.tag_match;ctx._source.match_source.text_scan=match_source.text_scan;ctx._source.match_source.image_scan=match_source.image_scan}else{ctx._source.match_source=match_source}","params":{"modified_at":"2024-06-06T07:37:47+00:00","primary_tags":["shopping_mall","mall"],"secondary_tags":["store"],"excluded_tags":[],"image_tags":[],"searchable_text":["store"],"monitoring":{"content":0,"user":0,"category":0},"status":1,"emotion":0,"match_source":{"monitor_content":0,"monitor_user":0,"monitor_category":0,"tag_match":0,"text_scan":1,"image_scan":0}}},"fallback":{"limit":0,"count":0}}`)
	outputCon, outputCh, _ := CreatePublisherConnectionQueueV2(url, inputQueueName)
	defer func() {
		outputCh.Close()
		outputCon.Close()
	}()

	// Exit the process if the broker closes the connection.
	outputCloseNotify := outputCon.NotifyClose(make(chan *amqp.Error))
	go func() {
		for range outputCloseNotify {
			log.Print("RabbitMQ connection lost")
			os.Exit(1)
		}
	}()

	// Ten goroutines publish on the same channel as fast as they can,
	// without publisher confirms.
	var wg conc.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Go(func() {
			for j := 0; j < size; j++ {
				err := outputCh.Publish(
					"",             // exchange
					inputQueueName, // routing key
					false,          // mandatory
					false,          // immediate
					amqp.Publishing{
						DeliveryMode: amqp.Persistent,
						ContentType:  "text/plain",
						Body:         body,
					})
				failOnError(err, "Failed to publish a message")
			}
		})
	}

	fmt.Println("Waiting for all goroutines to finish")
	wg.Wait()
	fmt.Println("Waiting for 10 seconds before closing the connection")
	// sleep for 10 secs
	time.Sleep(10 * time.Second)
}

func main() {
	for i := 0; i < 2; i++ {
		go process(fmt.Sprintf("test_%d", i), 100000000)
	}

	// Block forever so the publishing goroutines keep running.
	forever := make(chan bool)
	<-forever
}
Expected behavior
Version 4.0.4 should not consume more memory than version 3.13.7.
Additional context
I tested both versions on AWS EC2 m6a.large instances running Ubuntu 24.04 LTS.
Replies: 5 comments 18 replies
-
I'm not sure why the difference is so significant, but the main problem is that this code doesn't use publisher confirms and therefore just overloads RabbitMQ. Unrelated to the question, but you should switch to https://github.com/rabbitmq/amqp091-go, which is a continuation of github.com/streadway/amqp.
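To illustrate the publisher-confirms advice, here is a minimal sketch of the usual pattern: cap the number of unconfirmed messages and stop publishing while the cap is reached. It uses only the standard library and a simulated broker, so `simulatedBroker`, `confirmation`, and `publishWithWindow` are hypothetical names for this sketch and not part of any client. With amqp091-go, the real counterparts would be putting the channel into confirm mode with `Channel.Confirm` and reading acknowledgements from the channel registered via `Channel.NotifyPublish`.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// confirmation stands in for a broker acknowledgement of one message.
// It is a hypothetical type for this sketch, not a client library type.
type confirmation struct {
	seq uint64
}

// simulatedBroker acknowledges every sequence number it receives.
// Assumption: a real broker confirms only after it has taken
// responsibility for the message, which is what slows the publisher down.
func simulatedBroker(published <-chan uint64, confirms chan<- confirmation) {
	for seq := range published {
		confirms <- confirmation{seq: seq}
	}
	close(confirms)
}

// publishWithWindow sends total messages but never allows more than
// window of them to be unconfirmed at once. It returns the highest
// number of unconfirmed messages it observed.
func publishWithWindow(total, window int) int64 {
	published := make(chan uint64)
	confirms := make(chan confirmation)
	go simulatedBroker(published, confirms)

	slots := make(chan struct{}, window) // one token per unconfirmed message
	var inFlight, maxInFlight int64

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for range confirms {
			atomic.AddInt64(&inFlight, -1)
			<-slots // free a slot so the publisher may continue
		}
	}()

	for seq := 1; seq <= total; seq++ {
		slots <- struct{}{} // blocks while the window is full
		if n := atomic.AddInt64(&inFlight, 1); n > maxInFlight {
			maxInFlight = n
		}
		published <- uint64(seq)
	}
	close(published)
	wg.Wait()
	return maxInFlight
}

func main() {
	fmt.Println("max unconfirmed messages:", publishWithWindow(1000, 8))
}
```

The window size trades throughput for memory: a larger window keeps the broker busier, while a smaller one bounds how far the publisher can run ahead.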
-
I will investigate to see why this happens, but as mentioned above, generally speaking, your code should be using confirms, and then this wouldn't have happened. For the record, repro using https://perftest.rabbitmq.com/:
-
@songponlekpetch you haven't provided any evidence of a bug. There is a dedicated documentation guide that explains how to collect data on what uses the memory. @mkuratczyk's hypothesis seems right. A publisher does drastically less work than RabbitMQ, so without publisher confirms it will eventually drive the node above the threshold after which an alarm is triggered and publishers are blocked to protect the node. If you need fire-and-forget publishing with maximum throughput and a mostly flat memory footprint, use streams or superstreams with a dedicated RabbitMQ Stream Protocol client, https://github.com/rabbitmq/rabbitmq-stream-go-client. Quorum queues, as their docs explain, have a certain footprint that is usually stable within a range, so it looks like a zig-zag pattern. Even with tuning this won't change fundamentally. Classic queues, even v2, are less predictable than the other two types, so using publisher confirms is particularly important with them.
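The reasoning about a publisher outrunning the node can be made concrete with a toy model. The sketch below is a deliberate simplification with made-up rates: each tick the publisher offers a fixed number of messages and the broker processes a smaller fixed number. It ignores paging to disk, flow control, and everything else a real node does, so `peakBacklog` and its parameters are hypothetical and only show the shape of the problem.

```go
package main

import "fmt"

// peakBacklog steps a toy model for the given number of ticks. Each tick
// the publisher offers publishPerTick messages and the broker processes
// up to drainPerTick. A window <= 0 models fire-and-forget publishing;
// a positive window models a publisher that stops once that many
// messages are still unprocessed (as it would with publisher confirms).
func peakBacklog(ticks, publishPerTick, drainPerTick, window int) int {
	backlog, peak := 0, 0
	for t := 0; t < ticks; t++ {
		send := publishPerTick
		if window > 0 && backlog+send > window {
			send = window - backlog // only fill the remaining window
		}
		if send < 0 {
			send = 0
		}
		backlog += send
		if backlog > peak {
			peak = backlog
		}
		drained := drainPerTick
		if drained > backlog {
			drained = backlog
		}
		backlog -= drained
	}
	return peak
}

func main() {
	// Hypothetical rates: publisher offers 10 per tick, broker handles 4.
	fmt.Println(peakBacklog(100, 10, 4, 0))  // 604: grows with run length
	fmt.Println(peakBacklog(100, 10, 4, 20)) // 20: capped by the window
}
```

Without a window the backlog keeps growing for as long as the publisher runs, which is how a node eventually reaches its memory alarm; with a window the backlog stays bounded no matter how long the run lasts.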
-
I am sorry, but this proves absolutely nothing about the memory footprint of CQs in either version. A single run of such a publisher proves nothing. Long-running, more realistic workloads with metrics collected over the entire duration of the run do.
-
@songponlekpetch thanks to @gomoripeti, an unintentionally removed portion of classic queue flow control was restored in #12906, #12907.
You can get a new development version to try from https://github.com/rabbitmq/server-packages/releases/tag/alphas.1733843440418. A 4.0.x version will be available in the same repo when we backport the changes. The plan right now is to ship this change in 4.0.5.