Skip to content

Commit

Permalink
Enable batching by default
Browse files Browse the repository at this point in the history
Keep the same behavior with other clients.
  • Loading branch information
BewareMyPower committed Aug 29, 2024
1 parent c3c12c4 commit 63a9907
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 6 deletions.
4 changes: 2 additions & 2 deletions pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ def create_producer(self, topic,
max_pending_messages=1000,
max_pending_messages_across_partitions=50000,
block_if_queue_full=False,
batching_enabled=False,
batching_enabled=True,
batching_max_messages=1000,
batching_max_allowed_size_in_bytes=128*1024,
batching_max_publish_delay_ms=10,
Expand Down Expand Up @@ -670,7 +670,7 @@ def create_producer(self, topic,
SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that release in order to
be able to receive messages compressed with SNAPPY.
batching_enabled: bool, default=False
batching_enabled: bool, default=True
When automatic batching is enabled, multiple calls to `send` can result in a single batch to be sent to the
broker, leading to better throughput, especially when publishing small messages.
All messages in a batch will be published as a single batched message. The consumer will be delivered
Expand Down
11 changes: 7 additions & 4 deletions tests/pulsar_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,18 +146,21 @@ def test_producer_send_async(self):

sent_messages = []

event = threading.Event()
def send_callback(producer, msg):
sent_messages.append(msg)
if len(sent_messages) >= 3:
event.set()

producer.send_async(b"hello", send_callback)
producer.send_async(b"hello", send_callback)
producer.send_async(b"hello", send_callback)

i = 0
while len(sent_messages) < 3 and i < 100:
time.sleep(0.1)
i += 1
event.wait(3000)
self.assertEqual(len(sent_messages), 3)
for i in range(0, len(sent_messages)):
msg_id = sent_messages[i]
self.assertEqual(msg_id.batch_index(), i)
client.close()

def test_producer_send(self):
Expand Down

0 comments on commit 63a9907

Please sign in to comment.