From 3e6688a6b81766b1d9c11268ef276025a14ab77a Mon Sep 17 00:00:00 2001 From: rinze Date: Mon, 20 Aug 2018 12:58:34 +0200 Subject: [PATCH 1/5] Make sure that offset is in a valid range before subscribing (closes #5) --- src/example/consumer.ts | 2 +- src/lib/test-bed-adapter.ts | 52 +++++++++++++++++++++---------------- 2 files changed, 31 insertions(+), 23 deletions(-) diff --git a/src/example/consumer.ts b/src/example/consumer.ts index 4d52a61..580fcfa 100644 --- a/src/example/consumer.ts +++ b/src/example/consumer.ts @@ -47,7 +47,7 @@ class Consumer { private subscribe() { this.adapter.on('message', message => this.handleMessage(message)); - // this.adapter.addConsumerTopics({ topic: TestBedAdapter.HeartbeatTopic }).catch(err => { + // this.adapter.addConsumerTopics({ topic: TestBedAdapter.HeartbeatTopic, offset: Number.MAX_SAFE_INTEGER }, true).catch(err => { // if (err) { log.error(`Consumer received an error: ${err}`); } // }); } diff --git a/src/lib/test-bed-adapter.ts b/src/lib/test-bed-adapter.ts index 9ce7db3..a147292 100644 --- a/src/lib/test-bed-adapter.ts +++ b/src/lib/test-bed-adapter.ts @@ -6,7 +6,7 @@ import { FileLogger } from './logger/file-logger'; import { EventEmitter } from 'events'; import { Logger } from './logger/logger'; import { ISendResponse } from './models/adapter-message'; -import { KafkaClient, Producer, Consumer, ProduceRequest, Message, OffsetFetchRequest, Topic } from 'kafka-node'; +import { KafkaClient, Producer, Consumer, ProduceRequest, Message, OffsetFetchRequest, Topic, Offset } from 'kafka-node'; import { ITimeMessage } from './models/time-message'; import { IHeartbeat } from './models/heartbeat'; import { IInitializedTopic } from './models/topic'; @@ -106,29 +106,22 @@ export class TestBedAdapter extends EventEmitter { /** After the Kafka client is connected, initialize the other services too, starting with the schema registry. */ private initialize() { - return new Promise(async (resolve) => { - await this.schemaRegistry.init(); - await this.initProducer(); - await this.addProducerTopics(this.config.produce); - await this.addKafkaLogger(); - await this.startHeartbeat(); - await this.initConsumer(); - await this.addConsumerTopics(this.config.consume, this.config.fromOffset); - await this.configUpdated(); - await this.emit('ready'); + return new Promise(async (resolve, reject) => { + try { + await this.schemaRegistry.init(); + await this.initProducer(); + await this.addProducerTopics(this.config.produce); + await this.addKafkaLogger(); + await this.startHeartbeat(); + await this.initConsumer(); + await this.addConsumerTopics(this.config.consume, this.config.fromOffset); + await this.configUpdated(); + await this.emit('ready'); + } catch (err) { + return this.emitErrorMsg(`Error initializing kafka services: ${err}`, reject); + } resolve(); }); - // this.schemaRegistry - // .init() - // .then(() => this.initProducer()) - // .then(() => this.addKafkaLogger()) - // .then(() => this.startHeartbeat()) - // .then(() => this.addProducerTopics(this.config.produce)) - // .then(() => this.initConsumer()) - // .then(() => this.addConsumerTopics(this.config.consume, this.config.fromOffset)) - // .then(() => this.configUpdated()) - // .then(() => this.emit('ready')) - // .catch((err) => this.emitErrorMsg(err)); } public pause() { @@ -428,6 +421,7 @@ export class TestBedAdapter extends EventEmitter { this.log.error(`initializeConsumerTopics - no schema registered for topic ${t.topic}`); return; } + this.assertOffsetInRange(t); newTopics.push(t); if (this.config.consume && this.config.consume.filter((fr) => fr.topic === t.topic).length === 0) { isConfigUpdated = true; @@ -445,6 +439,20 @@ export class TestBedAdapter extends EventEmitter { return newTopics; } + private assertOffsetInRange(t: OffsetFetchRequest) { + if (!this.client || !t.hasOwnProperty('offset')) return; + const partition = t.partition || 0; + const topic = t.topic; + const offset = new Offset(this.client); + offset.fetchLatestOffsets([topic], (error, offsets) => { + if (error) return this.emitErrorMsg(error); + if (!offsets || !offsets[topic] || !offsets[topic].hasOwnProperty(partition)) + return this.emitErrorMsg(`Could not fetch latest offset for ${topic}`); + t.offset = Math.min(t.offset!, offsets[topic][partition]); + this.log.info(`Set offset of topic ${topic} to ${t.offset}`); + }); + } + /** * Add the topics to the configuration and initialize the encoders/validators. * @param topics topics to add From 7922e9afefb8c0c66aa3e1149e0730c7bbca5984 Mon Sep 17 00:00:00 2001 From: rinze Date: Wed, 29 Aug 2018 18:01:02 +0200 Subject: [PATCH 2/5] Add debug logging to magic byte error --- src/lib/models/magic-byte.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/lib/models/magic-byte.ts b/src/lib/models/magic-byte.ts index c59f34c..4a55f17 100644 --- a/src/lib/models/magic-byte.ts +++ b/src/lib/models/magic-byte.ts @@ -45,7 +45,9 @@ export const toMessageBuffer = (val: any, type: IAvroType, schemaId: number, opt */ export const fromMessageBuffer = (type: IAvroType, encodedMessage: Buffer, sr: SchemaRegistry) => { if (encodedMessage[0] !== MAGIC_BYTE) { - Logger.instance.error('Message not serialized with magic byte!'); + Logger.instance.error(`Message not serialized with magic byte!`); + Logger.instance.debug(`type: ${JSON.stringify(type)}`); + Logger.instance.debug(`encodedMessage: ${encodedMessage.toString()}`); return { value: undefined, schemaId: undefined }; } From f383145f8d2126098c49c1c0973db0cfb1f2f66f Mon Sep 17 00:00:00 2001 From: rinze Date: Wed, 29 Aug 2018 18:03:35 +0200 Subject: [PATCH 3/5] Make several methods async. Add more debug logging. --- src/lib/test-bed-adapter.ts | 89 +++++++++++++++++++++---------------- 1 file changed, 51 insertions(+), 38 deletions(-) diff --git a/src/lib/test-bed-adapter.ts b/src/lib/test-bed-adapter.ts index a147292..f69063a 100644 --- a/src/lib/test-bed-adapter.ts +++ b/src/lib/test-bed-adapter.ts @@ -70,13 +70,21 @@ export class TestBedAdapter extends EventEmitter { this.schemaRegistry = new SchemaRegistry(this.config); } - public connect(): Promise<{}> { + public async connect(): Promise<{}> { return new Promise(async (resolve, reject) => { - await this.initLogger(); - await this.schemaPublisher.init(); + try { + await this.initLogger(); + await this.schemaPublisher.init(); + } catch (e) { + this.emitErrorMsg(`Error before initializing the testbed connection: ${e}`); + } this.client = new KafkaClient(this.config); this.client.on('ready', async () => { - await this.initialize(); + try { + await this.initialize(); + } catch (e) { + this.emitErrorMsg(`Error in initializing the testbed connection: ${e}`, reject); + } resolve(); }); this.client.on('error', (error) => { @@ -112,9 +120,9 @@ export class TestBedAdapter extends EventEmitter { await this.initProducer(); await this.addProducerTopics(this.config.produce); await this.addKafkaLogger(); - await this.startHeartbeat(); await this.initConsumer(); await this.addConsumerTopics(this.config.consume, this.config.fromOffset); + await this.startHeartbeat(); await this.configUpdated(); await this.emit('ready'); } catch (err) { @@ -243,9 +251,9 @@ export class TestBedAdapter extends EventEmitter { newTopics, (error, added) => { if (error) { - return this.emitErrorMsg(`addProducerTopics - Error ${error}`, reject); + return this.emitErrorMsg(`addConsumerTopics - Error ${error}`, reject); } - this.log.info(`Added topics: ${added}`); + this.log.info(`Added topics: ${added} (fromOffset? ${fromOffset})`); registerCallback(added); resolve(newTopics); }, @@ -313,27 +321,24 @@ export class TestBedAdapter extends EventEmitter { }); } - private initLogger() { - return new Promise((resolve) => { - const loggers: ILogger[] = []; - const logOptions = this.config.logging; - if (logOptions) { - if (logOptions.logToConsole) { - loggers.push({ - logger: new ConsoleLogger(), - minLevel: logOptions.logToConsole, - }); - } - if (logOptions.logToFile) { - loggers.push({ - logger: new FileLogger(logOptions.logFile || 'log.txt'), - minLevel: logOptions.logToFile, - }); - } - this.log.initialize(loggers); + private async initLogger() { + const loggers: ILogger[] = []; + const logOptions = this.config.logging; + if (logOptions) { + if (logOptions.logToConsole) { + loggers.push({ + logger: new ConsoleLogger(), + minLevel: logOptions.logToConsole, + }); } - resolve(); - }); + if (logOptions.logToFile) { + loggers.push({ + logger: new FileLogger(logOptions.logFile || 'log.txt'), + minLevel: logOptions.logToFile, + }); + } + this.log.initialize(loggers); + } } /** If required, add the Kafka logger too (after the producer has been initialised). */ @@ -422,6 +427,7 @@ export class TestBedAdapter extends EventEmitter { return; } this.assertOffsetInRange(t); + t = this.removeAdditionalFields(t); newTopics.push(t); if (this.config.consume && this.config.consume.filter((fr) => fr.topic === t.topic).length === 0) { isConfigUpdated = true; @@ -448,11 +454,17 @@ export class TestBedAdapter extends EventEmitter { if (error) return this.emitErrorMsg(error); if (!offsets || !offsets[topic] || !offsets[topic].hasOwnProperty(partition)) return this.emitErrorMsg(`Could not fetch latest offset for ${topic}`); - t.offset = Math.min(t.offset!, offsets[topic][partition]); + this.log.debug(`Latest offsets:\n${JSON.stringify(offsets)}`); + const configuredOffset = (t.offset! === -1) ? Number.MAX_SAFE_INTEGER : t.offset!; + t.offset = Math.min(configuredOffset, offsets[topic][partition]); this.log.info(`Set offset of topic ${topic} to ${t.offset}`); }); } + private removeAdditionalFields(t: OffsetFetchRequest): OffsetFetchRequest { + return {offset: t.offset, partition: t.partition, topic: t.topic}; + } + /** * Add the topics to the configuration and initialize the encoders/validators. * @param topics topics to add @@ -497,7 +509,7 @@ export class TestBedAdapter extends EventEmitter { * Configuration has changed. */ private configUpdated() { - return new Promise((resolve) => { + return new Promise((resolve, reject) => { if (!this.producer) { return; } @@ -517,18 +529,18 @@ export class TestBedAdapter extends EventEmitter { } : undefined, }; - this.send( - [ - { - topic: TestBedAdapter.ConfigurationTopic, - messages: msg, - }, - ], + const configurationMsg = [ + { + topic: TestBedAdapter.ConfigurationTopic, + messages: msg, + }, + ]; + this.send(configurationMsg, (err?: string, result?: ISendResponse) => { if (err) { - this.emitErrorMsg(err); + return this.emitErrorMsg(`Error updating configuration: ${err} (msg: ${JSON.stringify(configurationMsg)})`, reject); } else if (result) { - this.log.info(result); + this.log.info(`Updated configuration: ${JSON.stringify(result)}`); } resolve(); } @@ -562,6 +574,7 @@ export class TestBedAdapter extends EventEmitter { } ); }, this.config.heartbeatInterval || 5000); + this.log.info(`Started heartbeat`); resolve(); }); // }); From 48137f67ae4dddd70ff28dd640e19f1a38a8d0d9 Mon Sep 17 00:00:00 2001 From: rinze Date: Wed, 29 Aug 2018 18:03:58 +0200 Subject: [PATCH 4/5] Update package versions. Bump version. --- package.json | 11 ++++++----- yarn.lock | 34 +++++++++++++++++++--------------- 2 files changed, 25 insertions(+), 20 deletions(-) diff --git a/package.json b/package.json index c240388..ba2e559 100644 --- a/package.json +++ b/package.json @@ -1,11 +1,12 @@ { "name": "node-test-bed-adapter", - "version": "0.4.5", + "version": "0.4.6", "description": "An adapter to connect a node.js application to the test-bed's Common Information Space or Common Simulation Space.", "main": "./dist/lib/index.js", "typings": "./dist/lib/index.d.ts", "scripts": { "build": "tsc -w", + "install": "tsc", "patch-release": "npm version patch && npm publish && git push --follow-tags", "minor-release": "npm version minor && npm publish && git push --follow-tags", "test": "jasmine JASMINE_CONFIG_PATH=src/test/jasmine.json || true", @@ -31,17 +32,17 @@ "homepage": "https://github.com/DRIVER-EU/node-test-bed-adapter#readme", "dependencies": { "@types/kafka-node": "^2.0.6", - "@types/bluebird": "^3.5.20", - "avsc": "^5.3.0", + "@types/bluebird": "^3.5.24", + "avsc": "^5.4.3", "axios": "^0.18.0", "bluebird": "^3.5.1", "kafka-node": "^2.6.1", "url": "^0.11.0", - "hoek": "5.0.3" + "hoek": "5.0.4" }, "devDependencies": { "@types/jasmine": "^2.8.8", - "@types/node": "^10.3.2", + "@types/node": "^10.9.3", "@types/proxyquire": "^1.3.28", "jasmine-ts": "^0.2.1", "proxyquire": "^2.0.1", diff --git a/yarn.lock b/yarn.lock index f54b1e3..eb027f6 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2,13 +2,13 @@ # yarn lockfile v1 -"@types/bluebird@^3.5.20": - version "3.5.20" - resolved "https://registry.yarnpkg.com/@types/bluebird/-/bluebird-3.5.20.tgz#f6363172add6f4eabb8cada53ca9af2781e8d6a1" +"@types/bluebird@^3.5.24": + version "3.5.24" + resolved "https://registry.yarnpkg.com/@types/bluebird/-/bluebird-3.5.24.tgz#11f76812531c14f793b8ecbf1de96f672905de8a" -"@types/jasmine@^2.8.7": - version "2.8.7" - resolved "https://registry.yarnpkg.com/@types/jasmine/-/jasmine-2.8.7.tgz#3fe583928ae0a22cdd34cedf930eeffeda2602fd" +"@types/jasmine@^2.8.8": + version "2.8.8" + resolved "https://registry.yarnpkg.com/@types/jasmine/-/jasmine-2.8.8.tgz#bf53a7d193ea8b03867a38bfdb4fbb0e0bf066c9" "@types/kafka-node@^2.0.6": version "2.0.6" @@ -20,9 +20,9 @@ version "9.4.6" resolved "https://registry.yarnpkg.com/@types/node/-/node-9.4.6.tgz#d8176d864ee48753d053783e4e463aec86b8d82e" -"@types/node@^10.1.2": - version "10.1.2" - resolved "https://registry.yarnpkg.com/@types/node/-/node-10.1.2.tgz#1b928a0baa408fc8ae3ac012cc81375addc147c6" +"@types/node@^10.9.3": + version "10.9.3" + resolved "https://registry.yarnpkg.com/@types/node/-/node-10.9.3.tgz#85f288502503ade0b3bfc049fe1777b05d0327d5" "@types/proxyquire@^1.3.28": version "1.3.28" @@ -92,9 +92,9 @@ asynckit@^0.4.0: version "0.4.0" resolved "https://registry.yarnpkg.com/asynckit/-/asynckit-0.4.0.tgz#c79ed97f7f34cb8f2ba1bc9790bcc366474b4b79" -avsc@^5.2.3: - version "5.2.3" - resolved "https://registry.yarnpkg.com/avsc/-/avsc-5.2.3.tgz#d52c79898f79f032335f5ae3206ff4300ceb881a" +avsc@^5.4.3: + version "5.4.3" + resolved "https://registry.yarnpkg.com/avsc/-/avsc-5.4.3.tgz#56b5d2b188f4cf5c1aa1c204df387582535040f8" aws-sign2@~0.7.0: version "0.7.0" @@ -469,6 +469,10 @@ hoek@4.x.x: version "4.2.1" resolved "https://registry.yarnpkg.com/hoek/-/hoek-4.2.1.tgz#9634502aa12c445dd5a7c5734b572bb8738aacbb" +hoek@5.0.4: + version "5.0.4" + resolved "https://registry.yarnpkg.com/hoek/-/hoek-5.0.4.tgz#0f7fa270a1cafeb364a4b2ddfaa33f864e4157da" + homedir-polyfill@^1.0.1: version "1.0.1" resolved "https://registry.yarnpkg.com/homedir-polyfill/-/homedir-polyfill-1.0.1.tgz#4c2bbc8a758998feebf5ed68580f76d46768b4bc" @@ -1180,9 +1184,9 @@ typescript@^2.4.1: version "2.7.2" resolved "https://registry.yarnpkg.com/typescript/-/typescript-2.7.2.tgz#2d615a1ef4aee4f574425cdff7026edf81919836" -typescript@^2.8.3: - version "2.8.3" - resolved "https://registry.yarnpkg.com/typescript/-/typescript-2.8.3.tgz#5d817f9b6f31bb871835f4edf0089f21abe6c170" +typescript@^2.9.1: + version "2.9.2" + resolved "https://registry.yarnpkg.com/typescript/-/typescript-2.9.2.tgz#1cbf61d05d6b96269244eb6a3bce4bd914e0f00c" underscore@~1.4.4: version "1.4.4" From 7fe5cb0cec6a8787352ed2de7e6ea35321c3ab37 Mon Sep 17 00:00:00 2001 From: rinze Date: Wed, 29 Aug 2018 19:03:35 +0200 Subject: [PATCH 5/5] Add prepublish script --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 4e58c46..36eb25f 100644 --- a/package.json +++ b/package.json @@ -6,7 +6,7 @@ "typings": "./dist/lib/index.d.ts", "scripts": { "build": "tsc -w", - "install": "tsc", + "prepublish": "tsc", "patch-release": "npm version patch && npm publish && git push --follow-tags", "minor-release": "npm version minor && npm publish && git push --follow-tags", "test": "jasmine JASMINE_CONFIG_PATH=src/test/jasmine.json || true",