List offsets API implemented #156

Open · wants to merge 7 commits into base: master
77 changes: 77 additions & 0 deletions examples/kafkajs/admin/fetch-topic-offsets.js
@@ -0,0 +1,77 @@
const { Kafka, IsolationLevel } = require('@confluentinc/kafka-javascript').KafkaJS;
const { parseArgs } = require('node:util');

async function fetchOffsets() {
// Parse command-line arguments
const args = parseArgs({
allowPositionals: true,
options: {
'bootstrap-servers': {
type: 'string',
short: 'b',
default: 'localhost:9092',
},
'timeout': {
type: 'string',
short: 't',
default: '5000',
},
'isolation-level': {
type: 'string',
short: 'i',
default: '0', // Default to '0' for read_uncommitted
},
},
});

const {
'bootstrap-servers': bootstrapServers,
timeout,
'isolation-level': isolationLevel,
} = args.values;

const [topic] = args.positionals;

if (!topic) {
console.error('Topic name is required');
process.exit(1);
}

// Determine the isolation level
let isolationLevelValue;
if (isolationLevel === '0') {
isolationLevelValue = IsolationLevel.READ_UNCOMMITTED;
} else if (isolationLevel === '1') {
isolationLevelValue = IsolationLevel.READ_COMMITTED;
} else {
console.error('Invalid isolation level. Use 0 for READ_UNCOMMITTED or 1 for READ_COMMITTED.');
process.exit(1);
}

const kafka = new Kafka({
kafkaJS: {
brokers: [bootstrapServers],
},
});

const admin = kafka.admin();
await admin.connect();

try {
// Fetch offsets for the specified topic
const offsets = await admin.fetchTopicOffsets(
topic,
{
isolationLevel: isolationLevelValue, // Use determined isolation level
timeout: Number(timeout), // Convert timeout to a number
});

console.log(`Offsets for topic "${topic}":`, JSON.stringify(offsets, null, 2));
} catch (err) {
console.error('Error fetching topic offsets:', err);
} finally {
await admin.disconnect();
}
}

fetchOffsets();
47 changes: 47 additions & 0 deletions lib/admin.js
@@ -38,10 +38,16 @@ const AclOperationTypes = Object.seal({
IDEMPOTENT_WRITE: 12,
});

const IsolationLevel = Object.seal({
READ_UNCOMMITTED: 0,
READ_COMMITTED: 1,
});

module.exports = {
create: createAdminClient,
ConsumerGroupStates,
AclOperationTypes,
IsolationLevel,
};
Comment on lines +41 to 51 (Contributor):

Unfortunately this needs to be changed now to be compliant with jsdoc.
Add a comment above this too, like

/**
 * A list of isolation levels.
 * @enum {number}
 * @readonly
 * @memberof RdKafka
 */

Check this - https://github.com/confluentinc/confluent-kafka-javascript/pull/202/files#diff-7be52b6e88b2f7e0da9ce50e48c14057a4d1bbc2f45038ad223aff01ebef2d0dR38
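For reference, this is what the enum would look like with the requested comment applied (a sketch assembled from the snippet above; the RdKafka memberof target follows the linked PR):

/**
 * A list of isolation levels.
 * @enum {number}
 * @readonly
 * @memberof RdKafka
 */
const IsolationLevel = Object.seal({
  READ_UNCOMMITTED: 0,
  READ_COMMITTED: 1,
});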


var Client = require('./client');
@@ -575,3 +581,44 @@ AdminClient.prototype.describeTopics = function (topics, options, cb) {
}
});
};

/**
* List offsets for topic partition(s).
*
* @param {import("../../types/rdkafka").TopicPartitionOffset} partitions - The list of partitions to fetch offsets for.
Contributor:

Again, a jsdoc-related change.

Suggested change
* @param {import("../../types/rdkafka").TopicPartitionOffset} partitions - The list of partitions to fetch offsets for.
* @param {Array<{topic: string, partition: number, offset: number}>} partitions - The list of partitions to fetch offsets for.

Contributor:

I think it's better if we create a new type of object, and use offset: OffsetSpec.

function OffsetSpec(timestamp) { this.value = timestamp; /* for non-negative cases */ } // creates offset spec for arbitrary timestamp

Then we provide OffsetSpec.EARLIEST, OffsetSpec.LATEST, and OffsetSpec.MAX_TIMESTAMP_SPEC values if someone wants to use one of the special values.

Something of this sort. An undocumented number is not that good to have.
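A minimal sketch of that proposal (hypothetical; the numeric values assume librdkafka's reserved offsets, where -2 is earliest, -1 is latest, and -3 is max timestamp, and the constant names are the reviewer's, with EARLIEST assumed):

// Wraps a timestamp (in ms) to query the first offset at or after it.
function OffsetSpec(timestamp) {
  this.value = timestamp; // for non-negative cases
}

// Special values, mirroring librdkafka's reserved offsets (assumed mapping).
OffsetSpec.EARLIEST = new OffsetSpec(-2);           // earliest available offset
OffsetSpec.LATEST = new OffsetSpec(-1);             // latest (end) offset
OffsetSpec.MAX_TIMESTAMP_SPEC = new OffsetSpec(-3); // offset with the highest timestamp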

* @param {number} options.timeout - The request timeout in milliseconds.
Contributor:

Just above this, add

@param {object} options

Again for jsdoc

Contributor:

Suggested change
* @param {number} options.timeout - The request timeout in milliseconds.
* @param {number?} options.timeout - The request timeout in milliseconds.

* May be unset (default: 5000)
* @param {IsolationLevel} options.isolationLevel - The isolation level for reading the offsets.
Contributor:

Suggested change
* @param {IsolationLevel} options.isolationLevel - The isolation level for reading the offsets.
* @param {RdKafka.IsolationLevel?} options.isolationLevel - The isolation level for reading the offsets.

* (default: READ_UNCOMMITTED)
* @param {function} cb - The callback to be executed when finished.
*/
AdminClient.prototype.listOffsets = function (partitions, options, cb) {
if (!this._isConnected) {
throw new Error('Client is disconnected');
}

if (!options) {
options = {};
}

if (!Object.hasOwn(options, 'timeout')) {
options.timeout = 5000;
}

if (!Object.hasOwn(options, 'isolationLevel')) {
options.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
}

this._client.listOffsets(partitions, options, function (err, offsets) {
if (err) {
if (cb) {
cb(LibrdKafkaError.create(err));
}
return;
}

if (cb) {
cb(null, offsets);
}
});
};
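A hedged usage sketch of this new callback API (assumes an already-created and connected AdminClient instance named admin; the -1 special offset and the result handling follow the conventions above):

const { IsolationLevel } = require('./lib/admin'); // path is illustrative

// Query the latest offset (-1) for partition 0 of a hypothetical topic.
admin.listOffsets(
  [{ topic: 'my-topic', partition: 0, offset: -1 }],
  { timeout: 5000, isolationLevel: IsolationLevel.READ_COMMITTED },
  function (err, offsets) {
    if (err) {
      console.error(err);
      return;
    }
    console.log(offsets); // per-partition offsets as returned by the broker
  }
);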
114 changes: 113 additions & 1 deletion lib/kafkajs/_admin.js
@@ -10,6 +10,7 @@ const { kafkaJSToRdKafkaConfig,
severityToLogLevel,
} = require('./_common');
const error = require('./_error');
const { hrtime } = require('process');

/**
* NOTE: The Admin client is currently in an experimental state with many
@@ -649,10 +650,121 @@ class Admin {
});
});
}

/**
* List offsets for the specified topic partition(s).
*
* @param {string} topic - The topic to fetch offsets for.
* @param {number} options.timeout - The request timeout in milliseconds.
Contributor:

Just above this, add

* @param {object} options
* @param {number?} options.timeout - The request timeout in milliseconds.

* May be unset (default: 5000)
* @param {IsolationLevel} options.isolationLevel - The isolation level for reading the offsets.
Contributor:

Suggested change
* @param {IsolationLevel} options.isolationLevel - The isolation level for reading the offsets.
* @param {KafkaJS.IsolationLevel?} options.isolationLevel - The isolation level for reading the offsets.

* (default: READ_UNCOMMITTED)
*
* @returns {Promise<Array<import("../../types/kafkajs").SeekEntry & {high: string; low: string}>>}
Contributor:

Suggested change
* @returns {Promise<Array<import("../../types/kafkajs").SeekEntry & {high: string; low: string}>>}
* @returns {Promise<Array<{partition: number, offset: string, high: string; low: string}>>}

*/
async fetchTopicOffsets(topic, options = {}) {
if (this.#state !== AdminState.CONNECTED) {
throw new error.KafkaJSError("Admin client is not connected.", { code: error.ErrorCodes.ERR__STATE });
}

if (!Object.hasOwn(options, 'timeout')) {
options.timeout = 5000;
}

let topicData;
let startTime, endTime, timeTaken;

try {
// Measure time taken for fetchTopicMetadata
startTime = hrtime.bigint();
topicData = await this.fetchTopicMetadata({ topics: [topic], timeout: options.timeout });
endTime = hrtime.bigint();
timeTaken = Number(endTime - startTime) / 1e6; // Convert nanoseconds to milliseconds

// Adjust timeout for the next request
options.timeout -= timeTaken;
if (options.timeout <= 0) {
throw new error.KafkaJSError("Timeout exceeded while fetching topic metadata.", { code: error.ErrorCodes.ERR__TIMED_OUT });
}
} catch (err) {
throw createKafkaJsErrorFromLibRdKafkaError(err);
}

const partitionIds = topicData.flatMap(topicEntry =>
topicEntry.partitions.map(partition => partition.partitionId)
);

// Offset -1 asks the broker for the latest (high watermark) offset.
const topicPartitionOffsetsLatest = partitionIds.map(partitionId => ({
topic,
partition: partitionId,
offset: -1
}));

// Offset -2 asks the broker for the earliest (low watermark) offset.
const topicPartitionOffsetsEarliest = partitionIds.map(partitionId => ({
topic,
partition: partitionId,
offset: -2
}));

try {
// Measure time taken for listOffsets (latest)
startTime = hrtime.bigint();
const latestOffsets = await this.listOffsets(topicPartitionOffsetsLatest, options);
endTime = hrtime.bigint();
timeTaken = Number(endTime - startTime) / 1e6; // Convert nanoseconds to milliseconds

// Adjust timeout for the next request
options.timeout -= timeTaken;
if (options.timeout <= 0) {
throw new error.KafkaJSError("Timeout exceeded while fetching latest offsets.", { code: error.ErrorCodes.ERR__TIMED_OUT });
}

// Measure time taken for listOffsets (earliest)
startTime = hrtime.bigint();
const earliestOffsets = await this.listOffsets(topicPartitionOffsetsEarliest, options);
endTime = hrtime.bigint();
timeTaken = Number(endTime - startTime) / 1e6; // Convert nanoseconds to milliseconds

// Adjust timeout for the next request
options.timeout -= timeTaken;
if (options.timeout <= 0) {
throw new error.KafkaJSError("Timeout exceeded while fetching earliest offsets.", { code: error.ErrorCodes.ERR__TIMED_OUT });
}

const combinedResults = partitionIds.map(partitionId => {
const latest = latestOffsets.find(offset => offset.partition === partitionId);
const earliest = earliestOffsets.find(offset => offset.partition === partitionId);

return {
partition: partitionId,
offset: latest.offset.toString(),
high: latest.offset.toString(),
low: earliest.offset.toString()
};
});

return combinedResults;
} catch (err) {
throw createKafkaJsErrorFromLibRdKafkaError(err);
}
}

listOffsets(partitionOffsets, options) {
Contributor:

Make this a private method

Suggested change
listOffsets(partitionOffsets, options) {
#listOffsets(partitionOffsets, options) {

return new Promise((resolve, reject) => {
this.#internalClient.listOffsets(partitionOffsets, options, (err, offsets) => {
if (err) {
reject(createKafkaJsErrorFromLibRdKafkaError(err));
} else {
resolve(offsets);
}
});
});
}
}

module.exports = {
Admin,
ConsumerGroupStates: RdKafka.AdminClient.ConsumerGroupStates,
AclOperationTypes: RdKafka.AdminClient.AclOperationTypes
AclOperationTypes: RdKafka.AdminClient.AclOperationTypes,
IsolationLevel: RdKafka.AdminClient.IsolationLevel
};
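To tie the pieces together, a minimal usage sketch of the promisified fetchTopicOffsets API (mirrors the example file added in this PR; broker address and topic name are placeholders):

const { Kafka, IsolationLevel } = require('@confluentinc/kafka-javascript').KafkaJS;

async function main() {
  const kafka = new Kafka({ kafkaJS: { brokers: ['localhost:9092'] } });
  const admin = kafka.admin();
  await admin.connect();
  try {
    // One entry per partition; offset/high/low are stringified numbers.
    const offsets = await admin.fetchTopicOffsets('my-topic', {
      timeout: 5000,
      isolationLevel: IsolationLevel.READ_COMMITTED,
    });
    console.log(offsets); // e.g. [{ partition: 0, offset: '42', high: '42', low: '0' }]
  } finally {
    await admin.disconnect();
  }
}

main();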
5 changes: 3 additions & 2 deletions lib/kafkajs/_kafka.js
@@ -1,6 +1,6 @@
const { Producer, CompressionTypes } = require('./_producer');
const { Consumer, PartitionAssigners } = require('./_consumer');
const { Admin, ConsumerGroupStates, AclOperationTypes } = require('./_admin');
const { Admin, ConsumerGroupStates, AclOperationTypes, IsolationLevel } = require('./_admin');
const error = require('./_error');
const { logLevel, checkIfKafkaJsKeysPresent, CompatibilityErrorMessages } = require('./_common');

@@ -87,4 +87,5 @@ module.exports = {
PartitionAssignors: PartitionAssigners,
CompressionTypes,
ConsumerGroupStates,
AclOperationTypes };
AclOperationTypes,
IsolationLevel };