diff --git a/examples/kafkajs/admin/fetch-topic-offsets.js b/examples/kafkajs/admin/fetch-topic-offsets.js
new file mode 100644
index 00000000..38773b17
--- /dev/null
+++ b/examples/kafkajs/admin/fetch-topic-offsets.js
@@ -0,0 +1,77 @@
+const { Kafka, IsolationLevel } = require('@confluentinc/kafka-javascript').KafkaJS;
+const { parseArgs } = require('node:util');
+
+async function fetchOffsets() {
+  // Parse command-line arguments
+  const args = parseArgs({
+    allowPositionals: true,
+    options: {
+      'bootstrap-servers': {
+        type: 'string',
+        short: 'b',
+        default: 'localhost:9092',
+      },
+      'timeout': {
+        type: 'string',
+        short: 't',
+        default: '5000',
+      },
+      'isolation-level': {
+        type: 'string',
+        short: 'i',
+        default: '0', // Default to '0' for read_uncommitted
+      },
+    },
+  });
+
+  const {
+    'bootstrap-servers': bootstrapServers,
+    timeout,
+    'isolation-level': isolationLevel,
+  } = args.values;
+
+  const [topic] = args.positionals;
+
+  if (!topic) {
+    console.error('Topic name is required');
+    process.exit(1);
+  }
+
+  // Determine the isolation level
+  let isolationLevelValue;
+  if (isolationLevel === '0') {
+    isolationLevelValue = IsolationLevel.READ_UNCOMMITTED;
+  } else if (isolationLevel === '1') {
+    isolationLevelValue = IsolationLevel.READ_COMMITTED;
+  } else {
+    console.error('Invalid isolation level. Use 0 for READ_UNCOMMITTED or 1 for READ_COMMITTED.');
+    process.exit(1);
+  }
+
+  const kafka = new Kafka({
+    kafkaJS: {
+      brokers: [bootstrapServers],
+    },
+  });
+
+  const admin = kafka.admin();
+  await admin.connect();
+
+  try {
+    // Fetch offsets for the specified topic
+    const offsets = await admin.fetchTopicOffsets(
+      topic,
+      {
+        isolationLevel: isolationLevelValue, // Use determined isolation level
+        timeout: Number(timeout),            // Convert timeout to a number
+      });
+
+    console.log(`Offsets for topic "${topic}":`, JSON.stringify(offsets, null, 2));
+  } catch (err) {
+    console.error('Error fetching topic offsets:', err);
+  } finally {
+    await admin.disconnect();
+  }
+}
+
+fetchOffsets();
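For reference, a typical run of the example above and the output shape it prints could look like the following; the broker address, topic name, and offset values are illustrative only:

```js
// Hypothetical invocation (assumes a reachable broker and an existing topic):
//   node fetch-topic-offsets.js -b localhost:9092 -i 1 my-topic
//
// Example output for a two-partition topic:
// Offsets for topic "my-topic": [
//   { "partition": 0, "offset": "42", "high": "42", "low": "0" },
//   { "partition": 1, "offset": "17", "high": "17", "low": "3" }
// ]
```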
diff --git a/lib/admin.js b/lib/admin.js
index 24e38f21..3a76ce90 100644
--- a/lib/admin.js
+++ b/lib/admin.js
@@ -38,11 +38,52 @@ const AclOperationTypes = Object.seal({
   IDEMPOTENT_WRITE: 12,
 });
 
+/**
+ * A list of isolation levels.
+ * @enum {number}
+ * @readonly
+ * @memberof RdKafka
+ */
+const IsolationLevel = Object.seal({
+  READ_UNCOMMITTED: 0,
+  READ_COMMITTED: 1,
+});
+
+/**
+ * Define an OffsetSpec to list offsets at.
+ * Either a timestamp can be used, or else, one of the special, pre-defined values
+ * (EARLIEST, LATEST, MAX_TIMESTAMP) can be used while passing an OffsetSpec to listOffsets.
+ * @param {number} timestamp - The timestamp to list offsets at.
+ * @constructor
+ */
+function OffsetSpec(timestamp) {
+  this.timestamp = timestamp;
+}
+
+/**
+ * Special OffsetSpec value used to retrieve the offset with the largest timestamp of a partition.
+ * As message timestamps can be specified client-side, this may not match
+ * the log end offset returned by OffsetSpec.LATEST.
+ */
+OffsetSpec.MAX_TIMESTAMP = new OffsetSpec(-3);
+
+/**
+ * Special OffsetSpec value denoting the earliest offset for a topic partition.
+ */
+OffsetSpec.EARLIEST = new OffsetSpec(-2);
+
+/**
+ * Special OffsetSpec value denoting the latest offset for a topic partition.
+ */
+OffsetSpec.LATEST = new OffsetSpec(-1);
+
 module.exports = {
   create: createAdminClient,
   createFrom: createAdminClientFrom,
   ConsumerGroupStates,
   AclOperationTypes,
+  IsolationLevel: Object.freeze(IsolationLevel),
+  OffsetSpec,
 };
 
 var Client = require('./client');
@@ -50,6 +91,8 @@ var util = require('util');
 var Kafka = require('../librdkafka');
 var LibrdKafkaError = require('./error');
 var { shallowCopy } = require('./util');
+// eslint-disable-next-line no-unused-vars
+const { RdKafka } = require('../');
 
 util.inherits(AdminClient, Client);
@@ -609,3 +652,45 @@ AdminClient.prototype.describeTopics = function (topics, options, cb) {
     }
   });
 };
+
+/**
+ * List offsets for topic partition(s).
+ *
+ * @param {Array<{topic: string, partition: number, offset: OffsetSpec}>} partitions - The list of partitions to fetch offsets for.
+ * @param {object?} options
+ * @param {number?} options.timeout - The request timeout in milliseconds.
+ *                                    May be unset (default: 5000)
+ * @param {RdKafka.IsolationLevel?} options.isolationLevel - The isolation level for reading the offsets.
+ *                                                           (default: READ_UNCOMMITTED)
+ * @param {function} cb - The callback to be executed when finished.
+ */
+AdminClient.prototype.listOffsets = function (partitions, options, cb) {
+  if (!this._isConnected) {
+    throw new Error('Client is disconnected');
+  }
+
+  if (!options) {
+    options = {};
+  }
+
+  if (!Object.hasOwn(options, 'timeout')) {
+    options.timeout = 5000;
+  }
+
+  if (!Object.hasOwn(options, 'isolationLevel')) {
+    options.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
+  }
+
+  this._client.listOffsets(partitions, options, function (err, offsets) {
+    if (err) {
+      if (cb) {
+        cb(LibrdKafkaError.create(err));
+      }
+      return;
+    }
+
+    if (cb) {
+      cb(null, offsets);
+    }
+  });
+};
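A minimal sketch of the new callback-style API added above, combining a special `OffsetSpec` value with a timestamp-based spec. The broker address and topic are placeholders, and it assumes `OffsetSpec`/`IsolationLevel` are reachable on the `AdminClient` module as re-exported in the `module.exports` change above:

```js
const Kafka = require('@confluentinc/kafka-javascript');
const { OffsetSpec, IsolationLevel } = Kafka.AdminClient;

const admin = Kafka.AdminClient.create({ 'bootstrap.servers': 'localhost:9092' });

admin.listOffsets(
  [
    // Log end offset of partition 0.
    { topic: 'my-topic', partition: 0, offset: OffsetSpec.LATEST },
    // First offset with a timestamp at or after one minute ago, for partition 1.
    { topic: 'my-topic', partition: 1, offset: new OffsetSpec(Date.now() - 60 * 1000) },
  ],
  { isolationLevel: IsolationLevel.READ_COMMITTED, timeout: 5000 },
  (err, results) => {
    if (err) {
      console.error(err);
    } else {
      // Each entry carries { topic, partition, offset, timestamp, leaderEpoch?, error? }.
      console.log(results);
    }
    admin.disconnect();
  });
```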
diff --git a/lib/kafkajs/_admin.js b/lib/kafkajs/_admin.js
index 99527c53..ee3d2127 100644
--- a/lib/kafkajs/_admin.js
+++ b/lib/kafkajs/_admin.js
@@ -1,3 +1,4 @@
+const { OffsetSpec } = require('../admin');
 const RdKafka = require('../rdkafka');
 const { kafkaJSToRdKafkaConfig, createKafkaJsErrorFromLibRdKafkaError,
@@ -10,6 +11,7 @@ const { kafkaJSToRdKafkaConfig, createKafkaJsErrorFromLibRdKafkaError,
   severityToLogLevel,
 } = require('./_common');
 const error = require('./_error');
+const { hrtime } = require('process');
 
 /**
  * NOTE: The Admin client is currently in an experimental state with many
@@ -666,10 +668,131 @@ class Admin {
       });
     });
   }
+
+  /**
+   * Fetch the earliest and latest offsets for all partitions of a topic.
+   *
+   * @param {string} topic - The topic to fetch offsets for.
+   * @param {object} options
+   * @param {number?} options.timeout - The request timeout in milliseconds.
+   *                                    May be unset (default: 5000)
+   * @param {kafkaJS.IsolationLevel?} options.isolationLevel - The isolation level for reading the offsets.
+   *                                                           (default: READ_UNCOMMITTED)
+   *
+   * @returns {Promise<Array<{partition: number, offset: string, high: string, low: string}>>}
+   */
+  async fetchTopicOffsets(topic, options = {}) {
+    if (this.#state !== AdminState.CONNECTED) {
+      throw new error.KafkaJSError("Admin client is not connected.", { code: error.ErrorCodes.ERR__STATE });
+    }
+
+    if (!Object.hasOwn(options, 'timeout')) {
+      options.timeout = 5000;
+    }
+
+    let topicData;
+    let startTime, endTime, timeTaken;
+
+    try {
+      // Measure time taken for fetchTopicMetadata
+      startTime = hrtime.bigint();
+      topicData = await this.fetchTopicMetadata({ topics: [topic], timeout: options.timeout });
+      endTime = hrtime.bigint();
+      timeTaken = Number(endTime - startTime) / 1e6; // Convert nanoseconds to milliseconds
+
+      // Adjust timeout for the next request
+      options.timeout -= timeTaken;
+      if (options.timeout <= 0) {
+        throw new error.KafkaJSError("Timeout exceeded while fetching topic metadata.", { code: error.ErrorCodes.ERR__TIMED_OUT });
+      }
+    } catch (err) {
+      throw createKafkaJsErrorFromLibRdKafkaError(err);
+    }
+
+    const partitionIds = topicData.flatMap(topic =>
+      topic.partitions.map(partition => partition.partitionId)
+    );
+
+    const topicPartitionOffsetsLatest = partitionIds.map(partitionId => ({
+      topic,
+      partition: partitionId,
+      offset: OffsetSpec.LATEST
+    }));
+
+    const topicPartitionOffsetsEarliest = partitionIds.map(partitionId => ({
+      topic,
+      partition: partitionId,
+      offset: OffsetSpec.EARLIEST
+    }));
+
+    try {
+      // Measure time taken for listOffsets (latest)
+      startTime = hrtime.bigint();
+      const latestOffsets = await this.#listOffsets(topicPartitionOffsetsLatest, options);
+      endTime = hrtime.bigint();
+      timeTaken = Number(endTime - startTime) / 1e6; // Convert nanoseconds to milliseconds
+
+      // Adjust timeout for the next request
+      options.timeout -= timeTaken;
+      if (options.timeout <= 0) {
+        throw new error.KafkaJSError("Timeout exceeded while fetching latest offsets.", { code: error.ErrorCodes.ERR__TIMED_OUT });
+      }
+
+      // Measure time taken for listOffsets (earliest)
+      startTime = hrtime.bigint();
+      const earliestOffsets = await this.#listOffsets(topicPartitionOffsetsEarliest, options);
+      endTime = hrtime.bigint();
+      timeTaken = Number(endTime - startTime) / 1e6; // Convert nanoseconds to milliseconds
+
+      // Adjust timeout for the next request
+      options.timeout -= timeTaken;
+      if (options.timeout <= 0) {
+        throw new error.KafkaJSError("Timeout exceeded while fetching earliest offsets.", { code: error.ErrorCodes.ERR__TIMED_OUT });
+      }
+
+      const combinedResults = partitionIds.map(partitionId => {
+        const latest = latestOffsets.find(offset => offset.partition === partitionId);
+        const earliest = earliestOffsets.find(offset => offset.partition === partitionId);
+
+        return {
+          partition: partitionId,
+          offset: latest.offset.toString(),
+          high: latest.offset.toString(),
+          low: earliest.offset.toString()
+        };
+      });
+
+      return combinedResults;
+    } catch (err) {
+      throw createKafkaJsErrorFromLibRdKafkaError(err);
+    }
+  }
+
+  #listOffsets(partitionOffsets, options) {
+    return new Promise((resolve, reject) => {
+      this.#internalClient.listOffsets(partitionOffsets, options, (err, offsets) => {
+        if (err) {
+          reject(createKafkaJsErrorFromLibRdKafkaError(err));
+        } else {
+          resolve(offsets);
+        }
+      });
+    });
+  }
 }
 
 module.exports = {
   Admin,
   ConsumerGroupStates: RdKafka.AdminClient.ConsumerGroupStates,
-  AclOperationTypes: RdKafka.AdminClient.AclOperationTypes
+  AclOperationTypes: RdKafka.AdminClient.AclOperationTypes,
+  /**
+   * A list of isolation levels.
+   * @enum {number}
+   * @readonly
+   * @memberof KafkaJS
+   * @see RdKafka.IsolationLevel
+   */
+  IsolationLevel: RdKafka.AdminClient.IsolationLevel
 };
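As a usage sketch of the promisified method added above: `fetchTopicOffsets` resolves to one entry per partition with string-encoded offsets, so the number of retained records per partition can be derived from the `high`/`low` watermarks. The admin client and topic below are assumed to exist:

```js
const { IsolationLevel } = require('@confluentinc/kafka-javascript').KafkaJS;

// `admin` is a connected KafkaJS-style admin client; `topic` is an existing topic.
async function printRetainedCounts(admin, topic) {
  const offsets = await admin.fetchTopicOffsets(topic, {
    isolationLevel: IsolationLevel.READ_COMMITTED,
    timeout: 10000,
  });

  // Each entry is { partition, offset, high, low } with offsets encoded as strings.
  for (const { partition, high, low } of offsets) {
    console.log(`partition ${partition}: ${BigInt(high) - BigInt(low)} records retained`);
  }
}
```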
diff --git a/lib/kafkajs/_kafka.js b/lib/kafkajs/_kafka.js
index 49a97d17..48feed68 100644
--- a/lib/kafkajs/_kafka.js
+++ b/lib/kafkajs/_kafka.js
@@ -1,6 +1,6 @@
 const { Producer, CompressionTypes } = require('./_producer');
 const { Consumer, PartitionAssigners } = require('./_consumer');
-const { Admin, ConsumerGroupStates, AclOperationTypes } = require('./_admin');
+const { Admin, ConsumerGroupStates, AclOperationTypes, IsolationLevel } = require('./_admin');
 const error = require('./_error');
 const { logLevel, checkIfKafkaJsKeysPresent, CompatibilityErrorMessages } = require('./_common');
 
@@ -87,4 +87,5 @@ module.exports = {
   PartitionAssignors: PartitionAssigners,
   CompressionTypes,
   ConsumerGroupStates,
-  AclOperationTypes };
+  AclOperationTypes,
+  IsolationLevel };
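With the re-export above, the enum is reachable from the promisified namespace as well as from the callback-style admin module; a small sketch (assuming the package root exposes `AdminClient` the same way the existing callback API does):

```js
const { IsolationLevel } = require('@confluentinc/kafka-javascript').KafkaJS;
const { AdminClient } = require('@confluentinc/kafka-javascript');

console.log(IsolationLevel.READ_COMMITTED);             // 1
console.log(AdminClient.IsolationLevel.READ_COMMITTED); // 1
```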
diff --git a/src/admin.cc b/src/admin.cc
index 51eadbb2..52a1ce05 100644
--- a/src/admin.cc
+++ b/src/admin.cc
@@ -122,6 +122,7 @@ void AdminClient::Init(v8::Local<v8::Object> exports) {
   Nan::SetPrototypeMethod(tpl, "createPartitions", NodeCreatePartitions);
   Nan::SetPrototypeMethod(tpl, "deleteRecords", NodeDeleteRecords);
   Nan::SetPrototypeMethod(tpl, "describeTopics", NodeDescribeTopics);
+  Nan::SetPrototypeMethod(tpl, "listOffsets", NodeListOffsets);
 
   // Consumer group related operations
   Nan::SetPrototypeMethod(tpl, "listGroups", NodeListGroups);
@@ -902,6 +903,74 @@ Baton AdminClient::DescribeTopics(rd_kafka_TopicCollection_t *topics,
   }
 }
 
+Baton AdminClient::ListOffsets(rd_kafka_topic_partition_list_t *partitions,
+                               int timeout_ms,
+                               rd_kafka_IsolationLevel_t isolation_level,
+                               rd_kafka_event_t **event_response) {
+  if (!IsConnected()) {
+    return Baton(RdKafka::ERR__STATE);
+  }
+
+  {
+    scoped_shared_write_lock lock(m_connection_lock);
+    if (!IsConnected()) {
+      return Baton(RdKafka::ERR__STATE);
+    }
+
+    // Make admin options to establish that we are fetching offsets
+    rd_kafka_AdminOptions_t *options = rd_kafka_AdminOptions_new(
+        m_client->c_ptr(), RD_KAFKA_ADMIN_OP_LISTOFFSETS);
+
+    char errstr[512];
+    rd_kafka_resp_err_t err = rd_kafka_AdminOptions_set_request_timeout(
+        options, timeout_ms, errstr, sizeof(errstr));
+    if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
+      return Baton(static_cast<RdKafka::ErrorCode>(err), errstr);
+    }
+
+    rd_kafka_error_t *error =
+        rd_kafka_AdminOptions_set_isolation_level(options, isolation_level);
+    if (error) {
+      return Baton::BatonFromErrorAndDestroy(error);
+    }
+
+    // Create queue just for this operation.
+    rd_kafka_queue_t *rkqu = rd_kafka_queue_new(m_client->c_ptr());
+
+    rd_kafka_ListOffsets(m_client->c_ptr(), partitions, options, rkqu);
+
+    // Poll for an event by type in that queue
+    // DON'T destroy the event. It is the out parameter, and ownership is
+    // the caller's.
+    *event_response =
+        PollForEvent(rkqu, RD_KAFKA_EVENT_LISTOFFSETS_RESULT, timeout_ms);
+
+    // Destroy the queue since we are done with it.
+    rd_kafka_queue_destroy(rkqu);
+
+    // Destroy the options we just made because we polled already
+    rd_kafka_AdminOptions_destroy(options);
+
+    // If we got no response from that operation, this is a failure
+    // likely due to time out
+    if (*event_response == NULL) {
+      return Baton(RdKafka::ERR__TIMED_OUT);
+    }
+
+    // Now we can get the error code from the event
+    if (rd_kafka_event_error(*event_response)) {
+      // If we had a special error code, get out of here with it
+      const rd_kafka_resp_err_t errcode = rd_kafka_event_error(*event_response);
+      return Baton(static_cast<RdKafka::ErrorCode>(errcode));
+    }
+
+    // At this point, event_response contains the result, which needs
+    // to be parsed/converted by the caller.
+    return Baton(RdKafka::ERR_NO_ERROR);
+  }
+}
+
 void AdminClient::ActivateDispatchers() {
   // Listen to global config
   m_gconfig->listen();
@@ -1443,4 +1512,49 @@ NAN_METHOD(AdminClient::NodeDescribeTopics) {
       include_authorised_operations, timeout_ms));
 }
 
+/**
+ * List Offsets.
+ */
+NAN_METHOD(AdminClient::NodeListOffsets) {
+  Nan::HandleScope scope;
+
+  if (info.Length() < 3 || !info[2]->IsFunction()) {
+    return Nan::ThrowError("Need to specify a callback");
+  }
+
+  if (!info[0]->IsArray()) {
+    return Nan::ThrowError("Must provide an array of 'TopicPartitionOffsets'");
+  }
+
+  v8::Local<v8::Array> listOffsets = info[0].As<v8::Array>();
+
+  /**
+   * The ownership of this is taken by
+   * Workers::AdminClientListOffsets and freeing it is also handled
+   * by that class.
+   */
+  rd_kafka_topic_partition_list_t *partitions = Conversion::TopicPartition::
+      TopicPartitionOffsetSpecv8ArrayToTopicPartitionList(listOffsets);
+
+  // Now process the second argument: options (timeout and isolationLevel)
+  v8::Local<v8::Object> options = info[1].As<v8::Object>();
+
+  rd_kafka_IsolationLevel_t isolation_level =
+      static_cast<rd_kafka_IsolationLevel_t>(GetParameter<int>(
+          options, "isolationLevel",
+          static_cast<int>(RD_KAFKA_ISOLATION_LEVEL_READ_UNCOMMITTED)));
+
+  int timeout_ms = GetParameter<int>(options, "timeout", 5000);
+
+  // Create the final callback object
+  v8::Local<v8::Function> cb = info[2].As<v8::Function>();
+  Nan::Callback *callback = new Nan::Callback(cb);
+  AdminClient *client = ObjectWrap::Unwrap<AdminClient>(info.This());
+
+  // Queue the worker to process the offset fetch request asynchronously
+  Nan::AsyncQueueWorker(new Workers::AdminClientListOffsets(
+      callback, client, partitions, timeout_ms, isolation_level));
+}
+
 }  // namespace NodeKafka
diff --git a/src/admin.h b/src/admin.h
index 12103cdc..9a269134 100644
--- a/src/admin.h
+++ b/src/admin.h
@@ -71,6 +71,9 @@ class AdminClient : public Connection {
   Baton DescribeTopics(rd_kafka_TopicCollection_t* topics,
                        bool include_authorized_operations, int timeout_ms,
                        rd_kafka_event_t** event_response);
+  Baton ListOffsets(rd_kafka_topic_partition_list_t* partitions, int timeout_ms,
+                    rd_kafka_IsolationLevel_t isolation_level,
+                    rd_kafka_event_t** event_response);
 
  protected:
   static Nan::Persistent<v8::Function> constructor;
@@ -96,6 +99,7 @@ class AdminClient : public Connection {
   static NAN_METHOD(NodeListConsumerGroupOffsets);
   static NAN_METHOD(NodeDeleteRecords);
   static NAN_METHOD(NodeDescribeTopics);
+  static NAN_METHOD(NodeListOffsets);
 
   static NAN_METHOD(NodeConnect);
   static NAN_METHOD(NodeDisconnect);
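For clarity, the native `listOffsets` binding registered above expects exactly the shape produced by `lib/admin.js`: an array of `{ topic, partition, offset: { timestamp } }` objects plus an options object. An illustrative (not production) view of the raw call, where `client` stands for the native handle that `AdminClient.prototype.listOffsets` wraps:

```js
// Illustrative only: OffsetSpec instances are plain objects whose `timestamp`
// carries the sentinel values (-2 = EARLIEST, -1 = LATEST, -3 = MAX_TIMESTAMP).
client.listOffsets(
  [
    { topic: 'my-topic', partition: 0, offset: { timestamp: -2 } }, // earliest
    { topic: 'my-topic', partition: 1, offset: { timestamp: -1 } }, // latest
  ],
  { timeout: 5000, isolationLevel: 0 /* READ_UNCOMMITTED */ },
  (err, results) => { /* results are parsed by FromListOffsetsResult */ });
```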
diff --git a/src/common.cc b/src/common.cc
index 6181001e..e488e02e 100644
--- a/src/common.cc
+++ b/src/common.cc
@@ -496,13 +496,54 @@ rd_kafka_topic_partition_list_t* TopicPartitionv8ArrayToTopicPartitionList(
       rd_kafka_topic_partition_list_add(newList, topic.c_str(), partition);
 
     if (include_offset) {
-      int offset = GetParameter<int>(item, "offset", 0);
+      int64_t offset = GetParameter<int64_t>(item, "offset", 0);
       toppar->offset = offset;
     }
   }
   return newList;
 }
 
+/**
+ * @brief v8 Array of Topic Partitions with offsetspec to
+ *        rd_kafka_topic_partition_list_t
+ *
+ * @note Converts a v8 array of type [{topic: string, partition: number,
+ *       offset: {timestamp: number}}] to a rd_kafka_topic_partition_list_t
+ */
+rd_kafka_topic_partition_list_t*
+TopicPartitionOffsetSpecv8ArrayToTopicPartitionList(
+    v8::Local<v8::Array> parameter) {
+  rd_kafka_topic_partition_list_t* newList =
+      rd_kafka_topic_partition_list_new(parameter->Length());
+
+  for (unsigned int i = 0; i < parameter->Length(); i++) {
+    v8::Local<v8::Value> v;
+    if (!Nan::Get(parameter, i).ToLocal(&v)) {
+      continue;
+    }
+
+    if (!v->IsObject()) {
+      return NULL;  // Return NULL to indicate an error
+    }
+
+    v8::Local<v8::Object> item = v.As<v8::Object>();
+
+    std::string topic = GetParameter<std::string>(item, "topic", "");
+    int partition = GetParameter<int>(item, "partition", -1);
+
+    rd_kafka_topic_partition_t* toppar =
+        rd_kafka_topic_partition_list_add(newList, topic.c_str(), partition);
+
+    v8::Local<v8::Value> offsetValue =
+        Nan::Get(item, Nan::New("offset").ToLocalChecked()).ToLocalChecked();
+    v8::Local<v8::Object> offsetObject = offsetValue.As<v8::Object>();
+    int64_t offset = GetParameter<int64_t>(offsetObject, "timestamp", 0);
+
+    toppar->offset = offset;
+  }
+  return newList;
+}
+
 /**
  * @brief v8::Object to RdKafka::TopicPartition
  *
@@ -1431,6 +1472,64 @@ v8::Local<v8::Array> FromDescribeTopicsResult(
   return returnArray;
 }
 
+/**
+ * @brief Converts a rd_kafka_ListOffsets_result_t* into a v8 Array.
+ */
+v8::Local<v8::Array> FromListOffsetsResult(
+    const rd_kafka_ListOffsets_result_t* result) {
+  /* Return object type:
+     [{
+        topic: string,
+        partition: number,
+        offset: number,
+        error: LibrdKafkaError,
+        timestamp: number
+     }]
+  */
+
+  size_t result_cnt, i;
+  const rd_kafka_ListOffsetsResultInfo_t** results =
+      rd_kafka_ListOffsets_result_infos(result, &result_cnt);
+
+  v8::Local<v8::Array> resultArray = Nan::New<v8::Array>();
+  int partitionIndex = 0;
+
+  for (i = 0; i < result_cnt; i++) {
+    const rd_kafka_topic_partition_t* partition =
+        rd_kafka_ListOffsetsResultInfo_topic_partition(results[i]);
+    int64_t timestamp = rd_kafka_ListOffsetsResultInfo_timestamp(results[i]);
+
+    // Create the ListOffsetsResult object
+    v8::Local<v8::Object> partition_object = Nan::New<v8::Object>();
+
+    // Set topic, partition, offset, error and timestamp
+    Nan::Set(partition_object, Nan::New("topic").ToLocalChecked(),
+             Nan::New(partition->topic).ToLocalChecked());
+    Nan::Set(partition_object, Nan::New("partition").ToLocalChecked(),
+             Nan::New(partition->partition));
+    Nan::Set(partition_object, Nan::New("offset").ToLocalChecked(),
+             Nan::New<v8::Number>(partition->offset));
+    if (partition->err != RD_KAFKA_RESP_ERR_NO_ERROR) {
+      RdKafka::ErrorCode code = static_cast<RdKafka::ErrorCode>(partition->err);
+      Nan::Set(partition_object, Nan::New("error").ToLocalChecked(),
+               RdKafkaError(code, rd_kafka_err2str(partition->err)));
+    }
+    // Set leaderEpoch (if available)
+    int32_t leader_epoch =
+        rd_kafka_topic_partition_get_leader_epoch(partition);
+    if (leader_epoch >= 0) {
+      Nan::Set(partition_object, Nan::New("leaderEpoch").ToLocalChecked(),
+               Nan::New(leader_epoch));
+    }
+    Nan::Set(partition_object, Nan::New("timestamp").ToLocalChecked(),
+             Nan::New<v8::Number>(timestamp));
+
+    Nan::Set(resultArray, partitionIndex++, partition_object);
+  }
+
+  return resultArray;
+}
+
 }  // namespace Admin
 
 }  // namespace Conversion
diff --git a/src/common.h b/src/common.h
index 0eef82ab..121a5cda 100644
--- a/src/common.h
+++ b/src/common.h
@@ -143,6 +143,10 @@ v8::Local<v8::Array> FromDeleteRecordsResult(
 // DescribeTopics: Response
 v8::Local<v8::Array> FromDescribeTopicsResult(
     const rd_kafka_DescribeTopics_result_t* result);
+
+// ListOffsets: Response
+v8::Local<v8::Array> FromListOffsetsResult(
+    const rd_kafka_ListOffsets_result_t* result);
 }  // namespace Admin
 
 namespace TopicPartition {
@@ -154,6 +158,9 @@ RdKafka::TopicPartition *FromV8Object(v8::Local<v8::Object>);
 std::vector<RdKafka::TopicPartition*> FromV8Array(const v8::Local<v8::Array> &);  // NOLINT
 rd_kafka_topic_partition_list_t *TopicPartitionv8ArrayToTopicPartitionList(
   v8::Local<v8::Array> parameter, bool include_offset);
+rd_kafka_topic_partition_list_t *
+TopicPartitionOffsetSpecv8ArrayToTopicPartitionList(
+  v8::Local<v8::Array> parameter);
 
 }  // namespace TopicPartition
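The converter above emits one plain object per requested partition. Based on the fields set in `FromListOffsetsResult`, a single entry would look roughly like this (values illustrative; `error` and `leaderEpoch` are only present when reported):

```js
// One element of the array handed to the listOffsets callback:
const exampleEntry = {
  topic: 'my-topic',
  partition: 0,
  offset: 42,               // resolved offset for the requested OffsetSpec
  leaderEpoch: 5,           // set only when the broker reports a non-negative epoch
  timestamp: 1700000000000, // typically -1 for EARLIEST/LATEST queries
};
```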
diff --git a/src/workers.cc b/src/workers.cc
index 1756baad..bb653c7d 100644
--- a/src/workers.cc
+++ b/src/workers.cc
@@ -1661,6 +1661,59 @@ void AdminClientDescribeTopics::HandleErrorCallback() {
   callback->Call(argc, argv);
 }
 
+/**
+ * @brief ListOffsets in an asynchronous worker.
+ *
+ * This worker lists the requested offsets for the specified topic partitions.
+ */
+AdminClientListOffsets::AdminClientListOffsets(
+    Nan::Callback* callback, NodeKafka::AdminClient* client,
+    rd_kafka_topic_partition_list_t* partitions, const int& timeout_ms,
+    rd_kafka_IsolationLevel_t isolation_level)
+    : ErrorAwareWorker(callback),
+      m_client(client),
+      m_partitions(partitions),
+      m_timeout_ms(timeout_ms),
+      m_isolation_level(isolation_level),
+      m_event_response(nullptr) {}
+
+AdminClientListOffsets::~AdminClientListOffsets() {
+  if (m_partitions) {
+    rd_kafka_topic_partition_list_destroy(m_partitions);
+  }
+
+  if (this->m_event_response) {
+    rd_kafka_event_destroy(this->m_event_response);
+  }
+}
+
+void AdminClientListOffsets::Execute() {
+  Baton b = m_client->ListOffsets(m_partitions, m_timeout_ms, m_isolation_level,
+                                  &m_event_response);
+  if (b.err() != RdKafka::ERR_NO_ERROR) {
+    SetErrorBaton(b);
+  }
+}
+
+void AdminClientListOffsets::HandleOKCallback() {
+  Nan::HandleScope scope;
+  const unsigned int argc = 2;
+  v8::Local<v8::Value> argv[argc];
+
+  argv[0] = Nan::Null();
+  argv[1] = Conversion::Admin::FromListOffsetsResult(
+      rd_kafka_event_ListOffsets_result(m_event_response));
+
+  callback->Call(argc, argv);
+}
+
+void AdminClientListOffsets::HandleErrorCallback() {
+  Nan::HandleScope scope;
+
+  const unsigned int argc = 1;
+  v8::Local<v8::Value> argv[argc] = { GetErrorObject() };
+
+  callback->Call(argc, argv);
+}
+
 }  // namespace Workers
 
 }  // namespace NodeKafka
diff --git a/src/workers.h b/src/workers.h
index bac7e018..b9583823 100644
--- a/src/workers.h
+++ b/src/workers.h
@@ -589,6 +589,9 @@ class AdminClientDeleteGroups : public ErrorAwareWorker {
   rd_kafka_event_t *m_event_response;
 };
 
+/**
+ * @brief List consumer group offsets on a remote broker cluster.
+ */
 class AdminClientListConsumerGroupOffsets : public ErrorAwareWorker {
  public:
   AdminClientListConsumerGroupOffsets(Nan::Callback *, NodeKafka::AdminClient *,
@@ -654,6 +657,28 @@ class AdminClientDescribeTopics : public ErrorAwareWorker {
   rd_kafka_event_t *m_event_response;
 };
 
+/**
+ * @brief List Offsets on a remote broker cluster.
+ */
+class AdminClientListOffsets : public ErrorAwareWorker {
+ public:
+  AdminClientListOffsets(Nan::Callback *, NodeKafka::AdminClient *,
+                         rd_kafka_topic_partition_list_t *, const int &,
+                         rd_kafka_IsolationLevel_t);
+  ~AdminClientListOffsets();
+
+  void Execute();
+  void HandleOKCallback();
+  void HandleErrorCallback();
+
+ private:
+  NodeKafka::AdminClient *m_client;
+  rd_kafka_topic_partition_list_t *m_partitions;
+  const int m_timeout_ms;
+  const rd_kafka_IsolationLevel_t m_isolation_level;
+  rd_kafka_event_t *m_event_response;
+};
+
 }  // namespace Workers
 
 }  // namespace NodeKafka
diff --git a/test/promisified/admin/fetch_topic_offsets.spec.js b/test/promisified/admin/fetch_topic_offsets.spec.js
new file mode 100644
index 00000000..6988bfbe
--- /dev/null
+++ b/test/promisified/admin/fetch_topic_offsets.spec.js
@@ -0,0 +1,115 @@
+jest.setTimeout(30000);
+
+const { ErrorCodes, IsolationLevel } = require("../../../lib").KafkaJS;
+const {
+  secureRandom,
+  createTopic,
+  createProducer,
+  createAdmin,
+} = require("../testhelpers");
+
+describe("fetchTopicOffsets function", () => {
+  let topicName, admin, producer;
+
+  beforeEach(async () => {
+    admin = createAdmin({});
+    producer = createProducer({
+      clientId: "test-producer-id",
+    });
+
+    await admin.connect();
+    await producer.connect();
+
+    topicName = `test-topic-${secureRandom()}`;
+  });
+
+  afterEach(async () => {
+    await admin.deleteTopics({
+      topics: [topicName],
+    });
+    await admin.disconnect();
+    producer && (await producer.disconnect());
+  });
+
+  it("should timeout when fetching topic offsets", async () => {
+    await createTopic({ topic: topicName, partitions: 1 });
+
+    await expect(
+      admin.fetchTopicOffsets(topicName, { timeout: 0 })
+    ).rejects.toHaveProperty("code", ErrorCodes.ERR__TIMED_OUT);
+  });
+
+  it("should return result for a topic with a single partition with isolation level READ_UNCOMMITTED", async () => {
+    await createTopic({ topic: topicName, partitions: 1 });
+
+    // Send some messages to reach specific offsets
+    const messages = Array.from({ length: 5 }, (_, i) => ({
+      value: `message${i}`,
+    }));
+    await producer.send({ topic: topicName, messages: messages });
+
+    // Fetch offsets with isolation level READ_UNCOMMITTED
+    const offsets = await admin.fetchTopicOffsets(topicName, {
+      isolationLevel: IsolationLevel.READ_UNCOMMITTED,
+    });
+
+    expect(offsets).toEqual([
+      {
+        partition: 0,
+        offset: "5",
+        low: "0",
+        high: "5",
+      },
+    ]);
+  });
+
+  it("should return result for a topic with a single partition with isolation level READ_COMMITTED", async () => {
+    await createTopic({ topic: topicName, partitions: 1 });
+
+    // Send some messages to reach specific offsets
+    const messages = Array.from({ length: 5 }, (_, i) => ({
+      value: `message${i}`,
+    }));
+    await producer.send({ topic: topicName, messages: messages });
+
+    // Fetch offsets with isolation level READ_COMMITTED
+    const offsets = await admin.fetchTopicOffsets(topicName, {
+      isolationLevel: IsolationLevel.READ_COMMITTED,
+    });
+
+    expect(offsets).toEqual([
+      {
+        partition: 0,
+        offset: "5",
+        low: "0",
+        high: "5",
+      },
+    ]);
+  });
+
+  it("should return result for a topic with multiple partitions", async () => {
+    await createTopic({ topic: topicName, partitions: 2 });
+
+    const messagesPartition0 = Array.from({ length: 5 }, (_, i) => ({
+      value: `message${i}`,
+      partition: 0,
+    }));
+    const messagesPartition1 = Array.from({ length: 5 }, (_, i) => ({
+      value: `message${i}`,
+      partition: 1,
+    }));
+
+    await producer.send({ topic: topicName, messages: messagesPartition0 });
+    await producer.send({ topic: topicName, messages: messagesPartition1 });
+
+    // Fetch offsets with isolation level READ_UNCOMMITTED
+    const offsets = await admin.fetchTopicOffsets(topicName, {
+      isolationLevel: IsolationLevel.READ_UNCOMMITTED,
+    });
+
+    expect(offsets).toEqual([
+      { partition: 0, offset: "5", low: "0", high: "5" },
+      { partition: 1, offset: "5", low: "0", high: "5" },
+    ]);
+  });
+});
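The READ_COMMITTED and READ_UNCOMMITTED tests above currently exercise identical data, since no transactions are involved. A hedged sketch of a follow-up test that would actually distinguish the two levels, assuming the KafkaJS-style transactional producer API (`transaction()`, `send()`, `abort()`) is available and that `createProducer` forwards `transactionalId`:

```js
it("should report a lower high watermark under READ_COMMITTED while a transaction is open", async () => {
  await createTopic({ topic: topicName, partitions: 1 });

  const txProducer = createProducer({
    clientId: "test-tx-producer",
    transactionalId: `txid-${secureRandom()}`,
  });
  await txProducer.connect();

  // Leave a transaction open so the last stable offset lags the log end offset.
  const transaction = await txProducer.transaction();
  await transaction.send({ topic: topicName, messages: [{ value: "pending" }] });

  const committed = await admin.fetchTopicOffsets(topicName, {
    isolationLevel: IsolationLevel.READ_COMMITTED,
  });
  const uncommitted = await admin.fetchTopicOffsets(topicName, {
    isolationLevel: IsolationLevel.READ_UNCOMMITTED,
  });

  expect(Number(committed[0].high)).toBeLessThan(Number(uncommitted[0].high));

  await transaction.abort();
  await txProducer.disconnect();
});
```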
diff --git a/types/kafkajs.d.ts b/types/kafkajs.d.ts
index 4a129926..05ba626e 100644
--- a/types/kafkajs.d.ts
+++ b/types/kafkajs.d.ts
@@ -8,7 +8,8 @@ import {
   DeleteRecordsResult,
   Node,
   AclOperationTypes,
-  Uuid
+  Uuid,
+  IsolationLevel
 } from './rdkafka'
 
 // Admin API related interfaces, types etc; and Error types are common, so
@@ -22,7 +23,8 @@ export {
   DeleteRecordsResult,
   Node,
   AclOperationTypes,
-  Uuid
+  Uuid,
+  IsolationLevel
 } from './rdkafka'
 
 export interface OauthbearerProviderResponse {
@@ -419,4 +421,9 @@ export type Admin = {
       includeAuthorizedOperations?: boolean,
       timeout?: number
     }): Promise<{ topics: Array<TopicDescription> }>
+  fetchTopicOffsets(topic: string,
+    options?: {
+      timeout?: number,
+      isolationLevel?: IsolationLevel
+    }): Promise<Array<{ partition: number, offset: string, high: string, low: string }>>
 }
diff --git a/types/rdkafka.d.ts b/types/rdkafka.d.ts
index 165786ea..097587f9 100644
--- a/types/rdkafka.d.ts
+++ b/types/rdkafka.d.ts
@@ -63,7 +63,7 @@ export interface Metadata {
   brokers: BrokerMetadata[];
 }
 
-export interface WatermarkOffsets{
+export interface WatermarkOffsets {
   lowOffset: number;
   highOffset: number;
 }
@@ -455,6 +455,33 @@ export type TopicDescription = {
   authorizedOperations?: AclOperationTypes[]
 }
 
+export class OffsetSpec {
+  constructor(timestamp: number);
+  static EARLIEST: OffsetSpec;
+  static LATEST: OffsetSpec;
+  static MAX_TIMESTAMP: OffsetSpec;
+}
+
+export type TopicPartitionOffsetSpec = {
+  topic: string
+  partition: number
+  offset: OffsetSpec
+}
+
+export enum IsolationLevel {
+  READ_UNCOMMITTED = 0,
+  READ_COMMITTED = 1,
+}
+
+export interface ListOffsetsResult {
+  topic: string;
+  partition: number;
+  offset: number;
+  error?: LibrdKafkaError;
+  leaderEpoch?: number;
+  timestamp: number;
+}
+
 export interface IAdminClient {
   createTopic(topic: NewTopic, cb?: (err: LibrdKafkaError) => void): void;
   createTopic(topic: NewTopic, timeout?: number, cb?: (err: LibrdKafkaError) => void): void;
@@ -482,18 +509,22 @@ export interface IAdminClient {
     options?: { timeout?: number },
     cb?: (err: LibrdKafkaError, result: DeleteGroupsResult[]) => any): void;
 
-  listConsumerGroupOffsets(listGroupOffsets : ListGroupOffsets[],
+  listConsumerGroupOffsets(listGroupOffsets: ListGroupOffsets[],
     options?: { timeout?: number, requireStableOffsets?: boolean },
    cb?: (err: LibrdKafkaError, result: GroupResults[]) => any): void;
 
   deleteRecords(delRecords: TopicPartitionOffset[],
    options?: { timeout?: number, operationTimeout?: number },
    cb?: (err: LibrdKafkaError, result: DeleteRecordsResult[]) => any): void;
-  
+
   describeTopics(topics: string[],
    options?: { includeAuthorizedOperations?: boolean, timeout?: number },
    cb?: (err: LibrdKafkaError, result: TopicDescription[]) => any): void;
 
+  listOffsets(partitions: TopicPartitionOffsetSpec[],
+    options?: { timeout?: number, isolationLevel?: IsolationLevel },
+    cb?: (err: LibrdKafkaError, result: ListOffsetsResult[]) => any): void;
+
   disconnect(): void;
 }