PIP 79: Reduce redundant producers from partitioned producer
- Status: Under Discussion
- Author: Yuri Mizushima
- Pull Request:
- Mailing List discussion:
- Release:
When a producer connects to a partitioned topic, it sometimes doesn't need to connect to all of the partitions, depending on rate, routing mode, etc. Some cases are listed below.
- Producers with different rates
  - the number of partitions should be set according to the high-rate producer
  - a low-rate producer sometimes doesn't need to connect to all of the partitions
- SinglePartition routing mode
  - each producer uses only one partition
In this PIP, we reduce the number of producers so that system resources (e.g. connections between client and broker, and memory usage on both client and broker) are used more efficiently. Here is an image of the concept (the second one is the new concept). As you can see, each partitioned producer connects to only some of the partitions, and the number of producers is less than or equal to 24.
Also, this change allows us to scale producers by increasing partitions even where maxProducersPerTopic is set.
For example, suppose the system has a producer limit. Currently, if a topic reaches its limit, we have to create new topics or increase the limit config. But if all producers connect using the features above, we can keep producers from connecting to all partitions and thus scale producers just by increasing partitions (without increasing the max producer limit or creating new topics).
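The scaling argument above can be made concrete with a rough upper bound. The sketch below assumes that maxProducersPerTopic is enforced on each partition separately and that producers spread perfectly evenly across partitions; both are assumptions for illustration, and the class and method names are hypothetical.

```java
final class ProducerCapacitySketch {
    /**
     * Rough upper bound on how many partitioned producers fit on a topic.
     * Assumes the limit applies per partition and that load is spread evenly.
     */
    static long maxPartitionedProducers(int partitions, int maxProducersPerTopic, int numPartitionsLimit) {
        // A producer can never connect to more partitions than exist.
        int partitionsPerProducer = Math.min(numPartitionsLimit, partitions);
        // Total producer slots across all partitions, divided by slots used per producer.
        return (long) partitions * maxProducersPerTopic / partitionsPerProducer;
    }
}
```

Under these assumptions, a producer that connects to every partition is capped at the per-partition limit no matter how many partitions exist, whereas with a fixed numPartitionsLimit the bound grows as partitions are added.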
The following changes will be introduced to a public interface:
- Add a "limit number of partitions" config to the org.apache.pulsar.client.api.ProducerBuilder interface
Add a "limit number of partitions" feature to the producer.
We would like to implement a producer that can limit the number of partitions it connects to. When the setting is enabled, numPartitionsLimit partitions are selected randomly from all partitions, and each partitioned producer then creates only numPartitionsLimit producer instances. If the producer can't connect to a selected partition, it tries to connect to another partition instead. A sample of the interface is shown below.
Producer<byte[]> producer = pulsarClient.newProducer()
        .topic("persistent://public/default/pt")
        .numPartitionsLimit(10) // added
        .create();
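The selection behavior described above (pick numPartitionsLimit partitions at random, and move on to another partition when a connection fails) could be sketched roughly as follows. This is only an illustration under assumptions, not the proposed implementation: the class name, the method name, and the tryConnect callback standing in for a real connection attempt are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.function.IntPredicate;

final class PartitionSubsetSketch {
    /**
     * Returns up to `limit` partition indexes chosen in random order.
     * `tryConnect` is a hypothetical stand-in for creating a producer on a partition;
     * partitions for which it returns false are skipped in favor of the next candidate.
     */
    static List<Integer> choosePartitions(int numPartitions, int limit,
                                          Random random, IntPredicate tryConnect) {
        List<Integer> candidates = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            candidates.add(i);
        }
        // Shuffling once gives a random order with no duplicate partitions.
        Collections.shuffle(candidates, random);

        List<Integer> connected = new ArrayList<>();
        for (int partition : candidates) {
            if (connected.size() >= limit) {
                break; // enough partitions are connected
            }
            if (tryConnect.test(partition)) {
                connected.add(partition);
            }
        }
        return connected;
    }
}
```

Walking a shuffled list keeps the fallback simple: a failed partition is never retried, and the result may hold fewer than `limit` entries if too few partitions are reachable.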
Because of message-ordering concerns, this feature will be allowed only in RoundRobin routing mode. (Some custom routing modes could use the feature, but that is out of this PR's scope.)
In SinglePartition routing mode, each partitioned producer uses only ONE partition to send messages. However, each partitioned producer currently creates producers for ALL partitions. We would like to reduce such "redundant" producers.
To enable this feature, set the config on the producer side as shown below.
Producer<byte[]> producer = pulsarClient.newProducer()
        .topic("persistent://public/default/pt")
        .messageRoutingMode(MessageRoutingMode.SinglePartition)
        .numPartitionsLimit(1) // added
        .create();
After the numPartitionsLimit feature becomes stable, we plan to change the producer to connect to only one partition in SinglePartition mode without requiring the numPartitionsLimit(1) setting.
Currently, if each partition has different producers as described above, PartitionedTopicStats will be incorrect. We would like to change the logic that collects PartitionedTopicStats.
This change allows us to get PartitionedTopicStats correctly not only when producers connect to all partitions, but also when they connect to only some of the partitions.
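One way the collection logic could cope with partitions that have different producers is to merge per-partition entries by producer name rather than by position. The sketch below illustrates that idea only; it is an assumption about a possible approach, and the class name, method name, and the plain name-to-rate maps are hypothetical simplifications, not the real stats types.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PartitionedStatsSketch {
    /**
     * Merges per-partition publish rates into one total per producer name.
     * Each input map is a hypothetical stand-in for one partition's publisher stats
     * (producer name -> message rate). A producer missing from a partition simply
     * contributes nothing for that partition.
     */
    static Map<String, Double> mergeByProducerName(List<Map<String, Double>> perPartitionRates) {
        Map<String, Double> totals = new LinkedHashMap<>();
        for (Map<String, Double> partitionRates : perPartitionRates) {
            for (Map.Entry<String, Double> entry : partitionRates.entrySet()) {
                // Sum rates for the same producer across the partitions it uses.
                totals.merge(entry.getKey(), entry.getValue(), Double::sum);
            }
        }
        return totals;
    }
}
```

Keying on the producer name means the result does not depend on every partition listing the same producers in the same order.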
- What impact (if any) will there be on existing users?
  - PartitionedTopicStats can be obtained correctly not only when producers connect to all partitions, but also when they connect to only some of the partitions
- If we are changing behavior how will we phase out the older behavior?
  - After the numPartitionsLimit feature becomes stable, we plan to change the producer to connect to only one partition in SinglePartition mode without requiring the numPartitionsLimit(1) setting
  - We would like to change the logic that collects PartitionedTopicStats, but this change is a fix for an existing issue
- If we need special migration tools, describe them here.
  - None are needed
- When will we remove the existing behavior?
  - No behavior will be removed
We would like to validate these features with end-to-end (E2E) tests.