Skip to content

Commit

Permalink
feat: Support batch receive for consumer.
Browse files Browse the repository at this point in the history
  • Loading branch information
shibd committed Dec 5, 2023
1 parent 705c1fe commit b9b165f
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 32 deletions.
8 changes: 8 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,12 @@ export interface ConsumerConfig {
batchIndexAckEnabled?: boolean;
regexSubscriptionMode?: RegexSubscriptionMode;
deadLetterPolicy?: DeadLetterPolicy;
batchReceivePolicy?: ConsumerBatchReceivePolicy;
}

export class Consumer {
receive(timeout?: number): Promise<Message>;
batchReceive(): Promise<Message []>;
acknowledge(message: Message): Promise<null>;
acknowledgeId(messageId: MessageId): Promise<null>;
negativeAcknowledge(message: Message): void;
Expand Down Expand Up @@ -181,6 +183,12 @@ export interface DeadLetterPolicy {
initialSubscriptionName?: string;
}

export interface ConsumerBatchReceivePolicy {
maxNumMessages?: number;
maxNumBytes?: number;
timeoutMs?: number;
}

export class AuthenticationTls {
constructor(params: { certificatePath: string, privateKeyPath: string });
}
Expand Down
65 changes: 40 additions & 25 deletions src/Consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ void Consumer::Init(Napi::Env env, Napi::Object exports) {
DefineClass(env, "Consumer",
{
InstanceMethod("receive", &Consumer::Receive),
InstanceMethod("batchReceive", &Consumer::BatchReceive),
InstanceMethod("acknowledge", &Consumer::Acknowledge),
InstanceMethod("acknowledgeId", &Consumer::AcknowledgeId),
InstanceMethod("negativeAcknowledge", &Consumer::NegativeAcknowledge),
Expand Down Expand Up @@ -192,36 +193,17 @@ struct ConsumerNewInstanceContext {
Napi::Value Consumer::NewInstance(const Napi::CallbackInfo &info, std::shared_ptr<pulsar_client_t> cClient) {
auto deferred = ThreadSafeDeferred::New(info.Env());
auto config = info[0].As<Napi::Object>();
std::shared_ptr<ConsumerConfig> consumerConfig = std::make_shared<ConsumerConfig>(config, &MessageListener);
std::shared_ptr<ConsumerConfig> consumerConfig = std::make_shared<ConsumerConfig>();

consumerConfig->InitConfig(deferred, config, &MessageListener);
if (deferred->IsDone()) {
return deferred->Promise();
}

const std::string &topic = consumerConfig->GetTopic();
const std::vector<std::string> &topics = consumerConfig->GetTopics();
const std::string &topicsPattern = consumerConfig->GetTopicsPattern();
if (topic.empty() && topics.size() == 0 && topicsPattern.empty()) {
deferred->Reject(
std::string("Topic, topics or topicsPattern is required and must be specified as a string when "
"creating consumer"));
return deferred->Promise();
}
const std::string &subscription = consumerConfig->GetSubscription();
if (subscription.empty()) {
deferred->Reject(
std::string("Subscription is required and must be specified as a string when creating consumer"));
return deferred->Promise();
}
int32_t ackTimeoutMs = consumerConfig->GetAckTimeoutMs();
if (ackTimeoutMs != 0 && ackTimeoutMs < MIN_ACK_TIMEOUT_MILLIS) {
std::string msg("Ack timeout should be 0 or greater than or equal to " +
std::to_string(MIN_ACK_TIMEOUT_MILLIS));
deferred->Reject(msg);
return deferred->Promise();
}
int32_t nAckRedeliverTimeoutMs = consumerConfig->GetNAckRedeliverTimeoutMs();
if (nAckRedeliverTimeoutMs < 0) {
std::string msg("NAck timeout should be greater than or equal to zero");
deferred->Reject(msg);
return deferred->Promise();
}

auto ctx = new ConsumerNewInstanceContext(deferred, cClient, consumerConfig);

Expand Down Expand Up @@ -291,6 +273,39 @@ class ConsumerReceiveWorker : public Napi::AsyncWorker {
int64_t timeout;
};

Napi::Value Consumer::BatchReceive(const Napi::CallbackInfo &info) {
auto deferred = ThreadSafeDeferred::New(Env());
auto ctx = new ExtDeferredContext(deferred);
pulsar_consumer_batch_receive_async(
this->cConsumer.get(),
[](pulsar_result result, pulsar_messages_t *rawMessages, void *ctx) {
auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
auto deferred = deferredContext->deferred;
delete deferredContext;

if (result != pulsar_result_Ok) {
deferred->Reject(std::string("Failed to batch receive message: ") + pulsar_result_str(result));
} else {
deferred->Resolve([rawMessages](const Napi::Env env) {
int listSize = pulsar_messages_size(rawMessages);
Napi::Array jsArray = Napi::Array::New(env, listSize);
for (int i = 0; i < listSize; i++) {
pulsar_message_t *rawMessage = pulsar_messages_get(rawMessages, i);
pulsar_message_t *message = pulsar_message_create();
pulsar_message_copy(rawMessage, message);
Napi::Object obj =
Message::NewInstance({}, std::shared_ptr<pulsar_message_t>(message, pulsar_message_free));
jsArray.Set(i, obj);
}
pulsar_messages_free(rawMessages);
return jsArray;
});
}
},
ctx);
return deferred->Promise();
}

Napi::Value Consumer::Receive(const Napi::CallbackInfo &info) {
if (info[0].IsUndefined()) {
auto deferred = ThreadSafeDeferred::New(Env());
Expand Down
1 change: 1 addition & 0 deletions src/Consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class Consumer : public Napi::ObjectWrap<Consumer> {
MessageListenerCallback *listener;

Napi::Value Receive(const Napi::CallbackInfo &info);
Napi::Value BatchReceive(const Napi::CallbackInfo &info);
Napi::Value Acknowledge(const Napi::CallbackInfo &info);
Napi::Value AcknowledgeId(const Napi::CallbackInfo &info);
void NegativeAcknowledge(const Napi::CallbackInfo &info);
Expand Down
71 changes: 65 additions & 6 deletions src/ConsumerConfig.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ static const std::string CFG_DEAD_LETTER_POLICY = "deadLetterPolicy";
static const std::string CFG_DLQ_POLICY_TOPIC = "deadLetterTopic";
static const std::string CFG_DLQ_POLICY_MAX_REDELIVER_COUNT = "maxRedeliverCount";
static const std::string CFG_DLQ_POLICY_INIT_SUB_NAME = "initialSubscriptionName";
static const std::string CFG_BATCH_RECEIVE_POLICY = "batchReceivePolicy";
static const std::string CFG_BATCH_RECEIVE_POLICY_MAX_NUM_MESSAGES = "maxNumMessages";
static const std::string CFG_BATCH_RECEIVE_POLICY_MAX_NUM_BYTES = "maxNumBytes";
static const std::string CFG_BATCH_RECEIVE_POLICY_TIMEOUT_MS = "timeoutMs";

static const std::map<std::string, pulsar_consumer_type> SUBSCRIPTION_TYPE = {
{"Exclusive", pulsar_ConsumerExclusive},
Expand All @@ -74,7 +78,7 @@ static const std::map<std::string, pulsar_consumer_crypto_failure_action> CONSUM

void FinalizeListenerCallback(Napi::Env env, MessageListenerCallback *cb, void *) { delete cb; }

ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig, pulsar_message_listener messageListener)
ConsumerConfig::ConsumerConfig()
: topic(""),
topicsPattern(""),
subscription(""),
Expand All @@ -83,7 +87,10 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig, pulsar_messag
listener(nullptr) {
this->cConsumerConfig = std::shared_ptr<pulsar_consumer_configuration_t>(
pulsar_consumer_configuration_create(), pulsar_consumer_configuration_free);
}

void ConsumerConfig::InitConfig(const std::shared_ptr<ThreadSafeDeferred> deferred,
const Napi::Object &consumerConfig, pulsar_message_listener messageListener) {
if (consumerConfig.Has(CFG_TOPIC) && consumerConfig.Get(CFG_TOPIC).IsString()) {
this->topic = consumerConfig.Get(CFG_TOPIC).ToString().Utf8Value();
}
Expand All @@ -101,9 +108,21 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig, pulsar_messag
this->topicsPattern = consumerConfig.Get(CFG_TOPICS_PATTERN).ToString().Utf8Value();
}

if (this->topic.empty() && this->topics.size() == 0 && this->topicsPattern.empty()) {
deferred->Reject(
std::string("Topic, topics or topicsPattern is required and must be specified as a string when "
"creating consumer"));
return;
}

if (consumerConfig.Has(CFG_SUBSCRIPTION) && consumerConfig.Get(CFG_SUBSCRIPTION).IsString()) {
this->subscription = consumerConfig.Get(CFG_SUBSCRIPTION).ToString().Utf8Value();
}
if (subscription.empty()) {
deferred->Reject(
std::string("Subscription is required and must be specified as a string when creating consumer"));
return;
}

if (consumerConfig.Has(CFG_SUBSCRIPTION_TYPE) && consumerConfig.Get(CFG_SUBSCRIPTION_TYPE).IsString()) {
std::string subscriptionType = consumerConfig.Get(CFG_SUBSCRIPTION_TYPE).ToString().Utf8Value();
Expand Down Expand Up @@ -139,18 +158,25 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig, pulsar_messag

if (consumerConfig.Has(CFG_ACK_TIMEOUT) && consumerConfig.Get(CFG_ACK_TIMEOUT).IsNumber()) {
this->ackTimeoutMs = consumerConfig.Get(CFG_ACK_TIMEOUT).ToNumber().Int64Value();
if (this->ackTimeoutMs == 0 || this->ackTimeoutMs >= MIN_ACK_TIMEOUT_MILLIS) {
pulsar_consumer_set_unacked_messages_timeout_ms(this->cConsumerConfig.get(), this->ackTimeoutMs);
if (this->ackTimeoutMs != 0 && ackTimeoutMs < MIN_ACK_TIMEOUT_MILLIS) {
std::string msg("Ack timeout should be 0 or greater than or equal to " +
std::to_string(MIN_ACK_TIMEOUT_MILLIS));
deferred->Reject(msg);
return;
}
pulsar_consumer_set_unacked_messages_timeout_ms(this->cConsumerConfig.get(), this->ackTimeoutMs);
}

if (consumerConfig.Has(CFG_NACK_REDELIVER_TIMEOUT) &&
consumerConfig.Get(CFG_NACK_REDELIVER_TIMEOUT).IsNumber()) {
this->nAckRedeliverTimeoutMs = consumerConfig.Get(CFG_NACK_REDELIVER_TIMEOUT).ToNumber().Int64Value();
if (this->nAckRedeliverTimeoutMs >= 0) {
pulsar_configure_set_negative_ack_redelivery_delay_ms(this->cConsumerConfig.get(),
this->nAckRedeliverTimeoutMs);
if (nAckRedeliverTimeoutMs < 0) {
std::string msg("NAck timeout should be greater than or equal to zero");
deferred->Reject(msg);
return;
}
pulsar_configure_set_negative_ack_redelivery_delay_ms(this->cConsumerConfig.get(),
this->nAckRedeliverTimeoutMs);
}

if (consumerConfig.Has(CFG_RECV_QUEUE) && consumerConfig.Get(CFG_RECV_QUEUE).IsNumber()) {
Expand Down Expand Up @@ -265,6 +291,39 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig, pulsar_messag
}
pulsar_consumer_configuration_set_dlq_policy(this->cConsumerConfig.get(), &dlq_policy);
}

if (consumerConfig.Has(CFG_BATCH_RECEIVE_POLICY) &&
consumerConfig.Get(CFG_BATCH_RECEIVE_POLICY).IsObject()) {
Napi::Object propObj = consumerConfig.Get(CFG_BATCH_RECEIVE_POLICY).ToObject();
int maxNumMessages = -1;
if (propObj.Has(CFG_BATCH_RECEIVE_POLICY_MAX_NUM_MESSAGES) &&
propObj.Get(CFG_BATCH_RECEIVE_POLICY_MAX_NUM_MESSAGES).IsNumber()) {
maxNumMessages = propObj.Get(CFG_BATCH_RECEIVE_POLICY_MAX_NUM_MESSAGES).ToNumber().Int32Value();
}
int maxNumBytes = 10 * 1024 * 1024;
if (propObj.Has(CFG_BATCH_RECEIVE_POLICY_MAX_NUM_BYTES) &&
propObj.Get(CFG_BATCH_RECEIVE_POLICY_MAX_NUM_BYTES).IsNumber()) {
maxNumBytes = propObj.Get(CFG_BATCH_RECEIVE_POLICY_MAX_NUM_BYTES).ToNumber().Int64Value();
}
int timeoutMs = 100;
if (propObj.Has(CFG_BATCH_RECEIVE_POLICY_TIMEOUT_MS) &&
propObj.Get(CFG_BATCH_RECEIVE_POLICY_TIMEOUT_MS).IsNumber()) {
timeoutMs = propObj.Get(CFG_BATCH_RECEIVE_POLICY_TIMEOUT_MS).ToNumber().Int64Value();
}
if (maxNumMessages <= 0 && maxNumBytes <= 0 && timeoutMs <= 0) {
std::string msg("At least one of maxNumMessages, maxNumBytes and timeoutMs must be specified.");
deferred->Reject(msg);
return;
}
pulsar_consumer_batch_receive_policy_t batch_receive_policy{maxNumMessages, maxNumBytes, timeoutMs};
int result = pulsar_consumer_configuration_set_batch_receive_policy(this->cConsumerConfig.get(),
&batch_receive_policy);
if (result == -1) {
std::string msg("Set batch receive policy failed: C client returned failure");
deferred->Reject(msg);
return;
}
}
}

ConsumerConfig::~ConsumerConfig() {
Expand Down
5 changes: 4 additions & 1 deletion src/ConsumerConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@
#define CONSUMER_CONFIG_H

#include <pulsar/c/consumer_configuration.h>
#include "ThreadSafeDeferred.h"
#include "MessageListener.h"

#define MIN_ACK_TIMEOUT_MILLIS 10000

class ConsumerConfig {
public:
ConsumerConfig(const Napi::Object &consumerConfig, pulsar_message_listener messageListener);
ConsumerConfig();
~ConsumerConfig();
void InitConfig(const std::shared_ptr<ThreadSafeDeferred> deferred, const Napi::Object &consumerConfig,
pulsar_message_listener messageListener);
std::shared_ptr<pulsar_consumer_configuration_t> GetCConsumerConfig();
std::string GetTopic();
std::vector<std::string> GetTopics();
Expand Down
4 changes: 4 additions & 0 deletions src/ThreadSafeDeferred.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,7 @@ void ThreadSafeDeferred::Reject(const std::string &errorMsg) {
this->fate = EFate::REJECTED;
this->tsf.Release();
}

bool ThreadSafeDeferred::IsDone() const {
return this->fate == EFate::RESOLVED || this->fate == EFate::REJECTED;
}
1 change: 1 addition & 0 deletions src/ThreadSafeDeferred.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class ThreadSafeDeferred : public Napi::Promise::Deferred {
inline void Reject() { this->Reject(""); }
void Reject(
const std::string &); // <- if only Reject were virtual... But we can live without polymorphism here
bool IsDone() const;

static std::shared_ptr<ThreadSafeDeferred> New(const Napi::Env env);
};
Expand Down
72 changes: 72 additions & 0 deletions tests/consumer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,37 @@ const Pulsar = require('../index');
nAckRedeliverTimeoutMs: -12,
})).rejects.toThrow('NAck timeout should be greater than or equal to zero');
});

test('Ack timeout less 10000', async () => {
await expect(client.subscribe({
topic: 'test-topic',
subscription: 'sub1',
subscriptionType: 'Shared',
ackTimeoutMs: 100,
})).rejects.toThrow('Ack timeout should be 0 or greater than or equal to 10000');
});

test('NAck timeout less 0', async () => {
await expect(client.subscribe({
topic: 'test-topic',
subscription: 'sub1',
subscriptionType: 'Shared',
nAckRedeliverTimeoutMs: -1,
})).rejects.toThrow('NAck timeout should be greater than or equal to zero');
});

test('Batch Receive Config Error', async () => {
await expect(client.subscribe({
topic: 'test-batch-receive-policy-error',
subscription: 'sub1',
subscriptionType: 'Shared',
batchReceivePolicy: {
maxNumMessages: -1,
maxNumBytes: -1,
timeoutMs: -1,
},
})).rejects.toThrow('At least one of maxNumMessages, maxNumBytes and timeoutMs must be specified.');
});
});

describe('Close', () => {
Expand Down Expand Up @@ -315,6 +346,47 @@ const Pulsar = require('../index');
consumer.close();
dlqConsumer.close();
});

test('Batch Receive', async () => {
const topicName = 'batch-receive-test-topic';
const producer = await client.createProducer({
topic: topicName,
});

const consumer = await client.subscribe({
topic: topicName,
subscription: 'sub1',
subscriptionType: 'Shared',
batchReceivePolicy: {
maxNumMessages: 10,
maxNumBytes: -1,
timeoutMs: 500,
},
});
const num = 10;
const messages = [];
for (let i = 0; i < num; i += 1) {
const msg = `my-message-${i}`;
await producer.send({ data: Buffer.from(msg) });
messages.push(msg);
}

const receiveMessages = await consumer.batchReceive();
expect(receiveMessages.length).toEqual(num);
const results = [];
for (let i = 0; i < receiveMessages.length; i += 1) {
const msg = receiveMessages[i];
console.log(msg.getData().toString());
results.push(msg.getData().toString());
}
expect(results).toEqual(messages);

// assert no more msgs.
expect(await consumer.batchReceive()).toEqual([]);

await producer.close();
await consumer.close();
});
});
});
})();

0 comments on commit b9b165f

Please sign in to comment.