diff --git a/package.json b/package.json index 2ce6815..36eb25f 100644 --- a/package.json +++ b/package.json @@ -1,11 +1,12 @@ { "name": "node-test-bed-adapter", - "version": "0.4.5", + "version": "0.4.6", "description": "An adapter to connect a node.js application to the test-bed's Common Information Space or Common Simulation Space.", "main": "./dist/lib/index.js", "typings": "./dist/lib/index.d.ts", "scripts": { "build": "tsc -w", + "prepublish": "tsc", "patch-release": "npm version patch && npm publish && git push --follow-tags", "minor-release": "npm version minor && npm publish && git push --follow-tags", "test": "jasmine JASMINE_CONFIG_PATH=src/test/jasmine.json || true", @@ -34,8 +35,8 @@ }, "homepage": "https://github.com/DRIVER-EU/node-test-bed-adapter#readme", "dependencies": { + "@types/bluebird": "^3.5.24", "@types/kafka-node": "^2.0.7", - "@types/bluebird": "^3.5.23", "avsc": "^5.4.3", "axios": "^0.18.0", "bluebird": "^3.5.1", @@ -45,7 +46,7 @@ }, "devDependencies": { "@types/jasmine": "^2.8.8", - "@types/node": "^10.9.2", + "@types/node": "^10.9.3", "@types/proxyquire": "^1.3.28", "jasmine-ts": "^0.2.1", "proxyquire": "^2.1.0", diff --git a/src/example/consumer.ts b/src/example/consumer.ts index 4d52a61..580fcfa 100644 --- a/src/example/consumer.ts +++ b/src/example/consumer.ts @@ -47,7 +47,7 @@ class Consumer { private subscribe() { this.adapter.on('message', message => this.handleMessage(message)); - // this.adapter.addConsumerTopics({ topic: TestBedAdapter.HeartbeatTopic }).catch(err => { + // this.adapter.addConsumerTopics({ topic: TestBedAdapter.HeartbeatTopic, offset: Number.MAX_SAFE_INTEGER }, true).catch(err => { // if (err) { log.error(`Consumer received an error: ${err}`); } // }); } diff --git a/src/lib/models/magic-byte.ts b/src/lib/models/magic-byte.ts index 8750c0f..c315a0e 100644 --- a/src/lib/models/magic-byte.ts +++ b/src/lib/models/magic-byte.ts @@ -46,7 +46,9 @@ export const toMessageBuffer = (val: any, type: Type, schemaId: number, optLengt */ export const fromMessageBuffer = (type: Type, encodedMessage: Buffer, sr: SchemaRegistry) => { if (encodedMessage[0] !== MAGIC_BYTE) { - Logger.instance.error('Message not serialized with magic byte!'); + Logger.instance.error(`Message not serialized with magic byte!`); + Logger.instance.debug(`type: ${JSON.stringify(type)}`); + Logger.instance.debug(`encodedMessage: ${encodedMessage.toString()}`); return { value: undefined, schemaId: undefined }; } diff --git a/src/lib/test-bed-adapter.ts b/src/lib/test-bed-adapter.ts index 9552bef..4467244 100644 --- a/src/lib/test-bed-adapter.ts +++ b/src/lib/test-bed-adapter.ts @@ -6,7 +6,7 @@ import { FileLogger } from './logger/file-logger'; import { EventEmitter } from 'events'; import { Logger } from './logger/logger'; import { ISendResponse } from './models/adapter-message'; -import { KafkaClient, Producer, Consumer, ProduceRequest, Message, OffsetFetchRequest, Topic } from 'kafka-node'; +import { KafkaClient, Producer, Consumer, ProduceRequest, Message, OffsetFetchRequest, Topic, Offset } from 'kafka-node'; import { ITimeMessage } from './models/time-message'; import { IHeartbeat } from './models/heartbeat'; import { IInitializedTopic } from './models/topic'; @@ -70,13 +70,21 @@ export class TestBedAdapter extends EventEmitter { this.schemaRegistry = new SchemaRegistry(this.config); } - public connect(): Promise<{}> { + public async connect(): Promise<{}> { return new Promise(async (resolve, reject) => { - await this.initLogger(); - await this.schemaPublisher.init(); + try { + await this.initLogger(); + await this.schemaPublisher.init(); + } catch (e) { + this.emitErrorMsg(`Error before initializing the testbed connection: ${e}`); + } this.client = new KafkaClient(this.config); this.client.on('ready', async () => { - await this.initialize(); + try { + await this.initialize(); + } catch (e) { + this.emitErrorMsg(`Error in initializing the testbed connection: ${e}`, reject); + } resolve(); }); this.client.on('error', (error) => { @@ -106,29 +114,22 @@ export class TestBedAdapter extends EventEmitter { /** After the Kafka client is connected, initialize the other services too, starting with the schema registry. */ private initialize() { - return new Promise(async (resolve) => { - await this.schemaRegistry.init(); - await this.initProducer(); - await this.addProducerTopics(this.config.produce); - await this.addKafkaLogger(); - await this.startHeartbeat(); - await this.initConsumer(); - await this.addConsumerTopics(this.config.consume, this.config.fromOffset); - await this.configUpdated(); - await this.emit('ready'); + return new Promise(async (resolve, reject) => { + try { + await this.schemaRegistry.init(); + await this.initProducer(); + await this.addProducerTopics(this.config.produce); + await this.addKafkaLogger(); + await this.initConsumer(); + await this.addConsumerTopics(this.config.consume, this.config.fromOffset); + await this.startHeartbeat(); + await this.configUpdated(); + await this.emit('ready'); + } catch (err) { + return this.emitErrorMsg(`Error initializing kafka services: ${err}`, reject); + } resolve(); }); - // this.schemaRegistry - // .init() - // .then(() => this.initProducer()) - // .then(() => this.addKafkaLogger()) - // .then(() => this.startHeartbeat()) - // .then(() => this.addProducerTopics(this.config.produce)) - // .then(() => this.initConsumer()) - // .then(() => this.addConsumerTopics(this.config.consume, this.config.fromOffset)) - // .then(() => this.configUpdated()) - // .then(() => this.emit('ready')) - // .catch((err) => this.emitErrorMsg(err)); } public pause() { @@ -250,9 +251,9 @@ export class TestBedAdapter extends EventEmitter { newTopics, (error, added) => { if (error) { - return this.emitErrorMsg(`addProducerTopics - Error ${error}`, reject); + return this.emitErrorMsg(`addConsumerTopics - Error ${error}`, reject); } - this.log.info(`Added topics: ${added}`); + this.log.info(`Added topics: ${added} (fromOffset? ${fromOffset})`); registerCallback(added); resolve(newTopics); }, @@ -320,27 +321,24 @@ export class TestBedAdapter extends EventEmitter { }); } - private initLogger() { - return new Promise((resolve) => { - const loggers: ILogger[] = []; - const logOptions = this.config.logging; - if (logOptions) { - if (logOptions.logToConsole) { - loggers.push({ - logger: new ConsoleLogger(), - minLevel: logOptions.logToConsole, - }); - } - if (logOptions.logToFile) { - loggers.push({ - logger: new FileLogger(logOptions.logFile || 'log.txt'), - minLevel: logOptions.logToFile, - }); - } - this.log.initialize(loggers); + private async initLogger() { + const loggers: ILogger[] = []; + const logOptions = this.config.logging; + if (logOptions) { + if (logOptions.logToConsole) { + loggers.push({ + logger: new ConsoleLogger(), + minLevel: logOptions.logToConsole, + }); } - resolve(); - }); + if (logOptions.logToFile) { + loggers.push({ + logger: new FileLogger(logOptions.logFile || 'log.txt'), + minLevel: logOptions.logToFile, + }); + } + this.log.initialize(loggers); + } } /** If required, add the Kafka logger too (after the producer has been initialised). */ @@ -428,6 +426,8 @@ export class TestBedAdapter extends EventEmitter { this.log.error(`initializeConsumerTopics - no schema registered for topic ${t.topic}`); return; } + this.assertOffsetInRange(t); + t = this.removeAdditionalFields(t); newTopics.push(t); if (this.config.consume && this.config.consume.filter((fr) => fr.topic === t.topic).length === 0) { isConfigUpdated = true; @@ -445,6 +445,26 @@ export class TestBedAdapter extends EventEmitter { return newTopics; } + private assertOffsetInRange(t: OffsetFetchRequest) { + if (!this.client || !t.hasOwnProperty('offset')) return; + const partition = t.partition || 0; + const topic = t.topic; + const offset = new Offset(this.client); + offset.fetchLatestOffsets([topic], (error, offsets) => { + if (error) return this.emitErrorMsg(error); + if (!offsets || !offsets[topic] || !offsets[topic].hasOwnProperty(partition)) + return this.emitErrorMsg(`Could not fetch latest offset for ${topic}`); + this.log.debug(`Latest offsets:\n${JSON.stringify(offsets)}`); + const configuredOffset = (t.offset! === -1) ? Number.MAX_SAFE_INTEGER : t.offset!; + t.offset = Math.min(configuredOffset, offsets[topic][partition]); + this.log.info(`Set offset of topic ${topic} to ${t.offset}`); + }); + } + + private removeAdditionalFields(t: OffsetFetchRequest): OffsetFetchRequest { + return {offset: t.offset, partition: t.partition, topic: t.topic}; + } + /** * Add the topics to the configuration and initialize the encoders/validators. * @param topics topics to add @@ -489,7 +509,7 @@ export class TestBedAdapter extends EventEmitter { * Configuration has changed. */ private configUpdated() { - return new Promise((resolve) => { + return new Promise((resolve, reject) => { if (!this.producer) { return; } @@ -509,18 +529,18 @@ export class TestBedAdapter extends EventEmitter { } : undefined, }; - this.send( - [ - { - topic: TestBedAdapter.ConfigurationTopic, - messages: msg, - }, - ], + const configurationMsg = [ + { + topic: TestBedAdapter.ConfigurationTopic, + messages: msg, + }, + ]; + this.send(configurationMsg, (err?: string, result?: ISendResponse) => { if (err) { - this.emitErrorMsg(err); + return this.emitErrorMsg(`Error updating configuration: ${err} (msg: ${JSON.stringify(configurationMsg)})`, reject); } else if (result) { - this.log.info(result); + this.log.info(`Updated configuration: ${JSON.stringify(result)}`); } resolve(); } @@ -554,6 +574,7 @@ export class TestBedAdapter extends EventEmitter { } ); }, this.config.heartbeatInterval || 5000); + this.log.info(`Started heartbeat`); resolve(); }); // }); diff --git a/yarn.lock b/yarn.lock index f54b1e3..eb027f6 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2,13 +2,13 @@ # yarn lockfile v1 -"@types/bluebird@^3.5.20": - version "3.5.20" - resolved "https://registry.yarnpkg.com/@types/bluebird/-/bluebird-3.5.20.tgz#f6363172add6f4eabb8cada53ca9af2781e8d6a1" +"@types/bluebird@^3.5.24": + version "3.5.24" + resolved "https://registry.yarnpkg.com/@types/bluebird/-/bluebird-3.5.24.tgz#11f76812531c14f793b8ecbf1de96f672905de8a" -"@types/jasmine@^2.8.7": - version "2.8.7" - resolved "https://registry.yarnpkg.com/@types/jasmine/-/jasmine-2.8.7.tgz#3fe583928ae0a22cdd34cedf930eeffeda2602fd" +"@types/jasmine@^2.8.8": + version "2.8.8" + resolved "https://registry.yarnpkg.com/@types/jasmine/-/jasmine-2.8.8.tgz#bf53a7d193ea8b03867a38bfdb4fbb0e0bf066c9" "@types/kafka-node@^2.0.6": version "2.0.6" @@ -20,9 +20,9 @@ version "9.4.6" resolved "https://registry.yarnpkg.com/@types/node/-/node-9.4.6.tgz#d8176d864ee48753d053783e4e463aec86b8d82e" -"@types/node@^10.1.2": - version "10.1.2" - resolved "https://registry.yarnpkg.com/@types/node/-/node-10.1.2.tgz#1b928a0baa408fc8ae3ac012cc81375addc147c6" +"@types/node@^10.9.3": + version "10.9.3" + resolved "https://registry.yarnpkg.com/@types/node/-/node-10.9.3.tgz#85f288502503ade0b3bfc049fe1777b05d0327d5" "@types/proxyquire@^1.3.28": version "1.3.28" @@ -92,9 +92,9 @@ asynckit@^0.4.0: version "0.4.0" resolved "https://registry.yarnpkg.com/asynckit/-/asynckit-0.4.0.tgz#c79ed97f7f34cb8f2ba1bc9790bcc366474b4b79" -avsc@^5.2.3: - version "5.2.3" - resolved "https://registry.yarnpkg.com/avsc/-/avsc-5.2.3.tgz#d52c79898f79f032335f5ae3206ff4300ceb881a" +avsc@^5.4.3: + version "5.4.3" + resolved "https://registry.yarnpkg.com/avsc/-/avsc-5.4.3.tgz#56b5d2b188f4cf5c1aa1c204df387582535040f8" aws-sign2@~0.7.0: version "0.7.0" @@ -469,6 +469,10 @@ hoek@4.x.x: version "4.2.1" resolved "https://registry.yarnpkg.com/hoek/-/hoek-4.2.1.tgz#9634502aa12c445dd5a7c5734b572bb8738aacbb" +hoek@5.0.4: + version "5.0.4" + resolved "https://registry.yarnpkg.com/hoek/-/hoek-5.0.4.tgz#0f7fa270a1cafeb364a4b2ddfaa33f864e4157da" + homedir-polyfill@^1.0.1: version "1.0.1" resolved "https://registry.yarnpkg.com/homedir-polyfill/-/homedir-polyfill-1.0.1.tgz#4c2bbc8a758998feebf5ed68580f76d46768b4bc" @@ -1180,9 +1184,9 @@ typescript@^2.4.1: version "2.7.2" resolved "https://registry.yarnpkg.com/typescript/-/typescript-2.7.2.tgz#2d615a1ef4aee4f574425cdff7026edf81919836" -typescript@^2.8.3: - version "2.8.3" - resolved "https://registry.yarnpkg.com/typescript/-/typescript-2.8.3.tgz#5d817f9b6f31bb871835f4edf0089f21abe6c170" +typescript@^2.9.1: + version "2.9.2" + resolved "https://registry.yarnpkg.com/typescript/-/typescript-2.9.2.tgz#1cbf61d05d6b96269244eb6a3bce4bd914e0f00c" underscore@~1.4.4: version "1.4.4"