diff --git a/README.md b/README.md index 26c818d..e00cda7 100644 --- a/README.md +++ b/README.md @@ -1,48 +1,47 @@ -# qbus-ipc +# crocket -[![Build status](https://travis-ci.org/Hexagon/qbus-ipc.svg)](https://travis-ci.org/Hexagon/qbus-ipc) [![MIT License](https://img.shields.io/badge/license-MIT-blue.svg)](https://img.shields.io/badge/license-MIT-blue.svg) +[![Build status](https://travis-ci.org/Hexagon/crocket.svg)](https://travis-ci.org/Hexagon/crocket) [![MIT License](https://img.shields.io/badge/license-MIT-blue.svg)](https://img.shields.io/badge/license-MIT-blue.svg) Minimal node.js cross platform IPC communication library. * Communcates over TCP, unix sockets or windows pipe. -* [qbus](https://www.npmjs.com/package/qbus) as event mediator -* Works locally OR remotely. -* Works on Linux, Windows AND macOS +* Works both locally and remotely. +* Works on Linux, Windows AND macOS. +* Pluggable event mediator, uses EventEmitter by default. But can be extended with something like [qbus](https://www.npmjs.com/package/qbus) for extended functionality. # Installation -```npm install qbus-ipc``` +```npm install crocket``` # Usage ### Host process ```javascript -var ipc = require("qbus-ipc"), +var ipc = require("crocket"), server = new ipc(); // Start listening, this example communicate by file sockets -server.listen({ "path": "/tmp/qbus-ipc-test.sock" }, (e) => { +server.listen({ "path": "/tmp/crocket-ipc-test.sock" }, (e) => { // Fatal errors are supplied as the first parameter to callback if(e) throw e; // All is well if we got this far - console.log('IPC listening on /tmp/qbus-ipc-test.sock'); + console.log('IPC listening on /tmp/crocket-test.sock'); }); -// Events are handled by qbus -// Documentation: https://github.com/unkelpehr/qbus -// Query tester: http://unkelpehr.github.io/qbus/ -server.on('/request/:what', function (what, payload) { +// Events are handled by EventEmitter by default ... +server.on('/request/food', function (payload) { // Respond to the query - server.emit('/response', 'You asked for ' + what + ' and supplied ' + payload); + server.emit('/response', 'You asked for food and supplied ' + payload); }); + // React to communication errors server.on('error', (e) => { console.error('Communication error occurred: ', e); }); ``` @@ -51,16 +50,16 @@ Output ``` > node test-server.js -IPC listening on /tmp/qbus-ipc-test.sock +IPC listening on /tmp/crocket-test.sock ``` ### Client process ```javascript -var ipc = require("qbus-ipc"), +var ipc = require("crocket"), client = new ipc(); -client.connect({ "path": "/tmp/qbus-ipc-test.sock" }, (e) => { +client.connect({ "path": "/tmp/crocket-test.sock" }, (e) => { // Connection errors are supplied as the first parameter to callback if(e) throw e; @@ -88,6 +87,54 @@ Output > node test-client.js Server said: You asked for food and supplied cash ``` + +### Replacing EventEmitter + +### Host process + +```javascript +var ipc = require("crocket"), + server = new ipc(), + + // Require the alternative event handler + qbus = require("qbus"); + +// Use! +server.use(qbus); + +// Start listening, this example communicate by file sockets +server.listen({ "path": "/tmp/crocket-ipc-test.sock" }, (e) => { + + // Fatal errors are supplied as the first parameter to callback + if(e) throw e; + + // All is well if we got this far + console.log('IPC listening on /tmp/crocket-test.sock'); + +}); + +// Now we're using qbus to handle events +// Documentation: https://github.com/unkelpehr/qbus +// Query tester: http://unkelpehr.github.io/qbus/ +server.on('/request/:what', function (what, payload) { + + // Respond to the query + server.emit('/response', 'You asked for ' + what + ' and supplied ' + payload); + +}); + +// React to communication errors +server.on('error', (e) => { console.error('Communication error occurred: ', e); }); +``` + +Output + +``` +> node test-server.js +IPC listening on /tmp/crocket-test.sock +``` + + ### Options All available options for server.listen @@ -112,6 +159,7 @@ All available options for client.connect "host": null, "port": null, "reconnect": -1, + "timeout": 5000, "encoding": "utf8" } ``` @@ -120,7 +168,7 @@ All available options for client.connect **Port** is specified if you want to use TCP instead of file sockets. -**Host** Only used in TCP mode. For server, ```0.0.0.0``` makes qbus-ipc listen on any IPv4-interface. ```::``` Is the equivalent for IPv6. For client, you specify the host address. +**Host** Only used in TCP mode. For server, ```0.0.0.0``` makes crocket listen on any IPv4-interface. ```::``` Is the equivalent for IPv6. For client, you specify the host address. **Reconnect** is the number of milliseconds to wait before reviving a broken listener/connection, or -1 to disable automtic revive. diff --git a/lib/defaults.json b/lib/defaults.json index f345980..2df28d5 100644 --- a/lib/defaults.json +++ b/lib/defaults.json @@ -1,16 +1,17 @@ { - "serverDefaults": { - "path": "/tmp/node-ipc.sock", + "server": { + "path": "/tmp/crocket-ipc.sock", "host": null, "port": null, "reconnect": 2000, "encoding": "utf8" }, - "clientDefaults": { - "path": "/tmp/node-ipc.sock", + "client": { + "path": "/tmp/crocket-ipc.sock", "host": null, "port": null, "reconnect": -1, + "timeout": 5000, "encoding": "utf8" } } \ No newline at end of file diff --git a/lib/ipc.js b/lib/ipc.js index 6624789..4cb862a 100644 --- a/lib/ipc.js +++ b/lib/ipc.js @@ -27,15 +27,16 @@ THE SOFTWARE. const xpipe = require("xpipe"), net = require("net"), - qbus = require("qbus"), defaults = require("./defaults.json"), + EventEmitter = require('events'), + extend = require("util")._extend; function IPC () { // Private properties - var bus = new qbus(), + var mediator, sockets = []; // Private methods @@ -58,7 +59,7 @@ function IPC () { if (callback) { callback(e); } else { - bus.emit("error", e); + mediator.emit("error", e); } } @@ -79,54 +80,64 @@ function IPC () { try { var incoming = JSON.parse(data); if( incoming && incoming.topic ) { - bus.emit(incoming.topic, incoming.data, socket); + mediator.emit(incoming.topic, incoming.data, socket); } else { - bus.emit("error", new Error("Invalid data received.")); + mediator.emit("error", new Error("Invalid data received.")); } } catch (e) { - bus.emit("error", e); + mediator.emit("error", e); } }, - connect = (obj, fn, callback) => { - if (this.opts.port) obj[fn](this.opts.port, this.opts.host, callback); - else obj[fn](xpipe.eq(this.opts.path), callback); + initializeMediator = () => { + if ( !mediator ) mediator = new EventEmitter(); + // Register bugus error listener + mediator.on( "error" , (e) => {} ); }; // Public methods - this.on = function (event, callback) { bus.on(event, callback); return this; }; + this.use = (o) => { mediator = new o() }; + this.on = function (event, callback) { mediator.on(event, callback); return this; }; this.emit = function (topic, data, socket, callback) { write(topic, data, socket, callback); return this; }; this.close = function (callback) { close(callback); return this; }; this.listen = function (options, callback) { // ToDo, make options optional var server = net.createServer(), - opts = extend(extend({}, defaults.server), options); + opts = extend(extend({}, defaults.server), options), + + connect = () => { + if (opts.port) server.listen(opts.port, opts.host, callback); + else server.listen(xpipe.eq(opts.path), callback); + }; this.isServer = true; this.opts = opts; this.server = server; - - server.on("error", (e) => bus.emit("error", e) ); + + initializeMediator(); + + server.on("error", (e) => mediator.emit("error", e) ); // New connection established server.on("connection", function (socket) { sockets.push(socket); - bus.emit("connect", socket); + mediator.emit("connect", socket); socket.setEncoding(opts.encoding); socket.on("data", (data) => onData(data, socket) ); socket.on("close", (socket) => { - bus.emit("disconnect", socket); + mediator.emit("disconnect", socket); sockets.splice(sockets.indexOf(socket), 1); }); }); server.on("close", function () { if (opts.reconnect > 0) setTimeout(() => connect(server.listen), opts.reconnect); - else bus.emit("close"); + else mediator.emit("close"); }); - connect(server, "listen", callback); + // Start listening + connect(); return this; }; @@ -135,24 +146,49 @@ function IPC () { // ToDo, make options optional var socket = new net.Socket(), - opts = extend(extend({}, defaults.client), options); + opts = extend(extend({}, defaults.client), options), + + flagConnected, + + connected = () => { + flagConnected = true; + callback && callback(); + }, + + connect = (first) => { + flagConnected = false; + if (opts.port) socket.connect(opts.port, opts.host, first ? connected : undefined); + else socket.connect(xpipe.eq(opts.path), first ? connected : undefined); + setTimeout(() => { + if ( !flagConnected ) { + socket.destroy(); + if (opts.reconnect === -1) { + callback(new Error('Connection timeout')); + } + } + }, opts.timeout); + }; this.isServer = false; this.opts = opts; sockets = [socket]; + initializeMediator(); + socket.setEncoding(opts.encoding); - socket.on("error", (e) => bus.emit("error", e) ); + socket.on("error", (e) => { + mediator.emit("error", e) + }); socket.on("data", (data) => onData(data, socket) ); socket.on("close", () => { - if (opts.reconnect > 0) setTimeout(() => connect(socket.connect), opts.reconnect); - else bus.emit("close"); + if (opts.reconnect > 0) setTimeout(() => connect(), opts.reconnect); + else mediator.emit("close"); }); - connect(socket, "connect", callback); + connect(true); return this; }; diff --git a/package.json b/package.json index 0c691bf..e36d046 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { - "name": "qbus-ipc", - "version": "0.9.5", - "description": "Efficient and simple node.js TCP/unix socket/windows socket interprocess communication based upon qbus.", + "name": "crocket", + "version": "0.9.6", + "description": "Efficient and simple interprocess communication for unix/windows/macos over tcp or sockets.", "author": "Hexagon ", "scripts": { "test": "mocha" @@ -14,24 +14,26 @@ "url": "https://github.com/hexagon/qbus-ipc/issues" }, "keywords": [ - "qbus", "ipc", "rpc", "interprocess", "communication", "tcp", + "mediator", + "eventemitter", + "qbus", "unix", "windows", "sockets", "net" ], "dependencies": { - "qbus": "*", "xpipe": "*" }, "devDependencies": { "mocha": "*", - "should": "*" + "should": "*", + "qbus": "*" }, "license": "MIT" } \ No newline at end of file diff --git a/test/test.js b/test/test.js index 6f136e1..9081098 100644 --- a/test/test.js +++ b/test/test.js @@ -93,17 +93,85 @@ describe("Connecting ...", function () { }); -describe("One way communication ...", function () { +describe("Connecting to non existing server ...", function () { + + it("with tcp should throw error", function (done) { + + (function(){ + + var client = new ipc(); + client.connect({ host: "asdf", port: 1234, timeout: 500}, (e) => { + if(e) { + done(); + } + }); + + }).should.not.throw(); + }); + + it("with socket should throw error", function (done) { + + (function(){ + + var client = new ipc(); + client.connect({ path: "/tmp/__lol-asdf-not-existing", timeout: 500}, (e) => { + if(e) { + done(); + } + }); + + }).should.not.throw(); + }); + +}); + + +describe("Reconnecting ...", function () { + + it("with tcp should not throw", function (done) { + this.timeout(15000); + + (function(){ + + var client = new ipc(); + client.connect({ host: "127.0.0.1", port: 51234, timeout: 500, reconnect: 500}, (e) => { + if(!e) { + done(); + } + }); + + // Start server after 5 seconds + var server = new ipc(); + server.listen({ host: "127.0.0.1", port: 51234 } , (e1) => { + if(e1) throw e1; + // Create client + var client = new ipc(); + client.connect(address.socket, (e2) => { + if(e2) throw e2; + client.close(); + server.close(done); + }); + }); + + }).should.not.throw(); + }); + +}); + +describe("One way communication (qbus)...", function () { it("over tcp should complete and not throw", function (done) { (function(){ // Create server - var server = new ipc(); + var server = new ipc(), + qbus = require("qbus"); + server.use(qbus); server.listen(address.tcp, (e1) => { if(e1) throw e1; // Create client var client = new ipc(); + client.use(qbus); client.connect(address.tcp, (e2) => { if(e2) throw e2; client.emit('/test/send', 'I am payload'); @@ -125,11 +193,15 @@ describe("One way communication ...", function () { (function(){ // Create server - var server = new ipc(); + var server = new ipc(), + qbus = require("qbus"); + server.use(qbus); server.listen(address.socket, (e1) => { if(e1) throw e1; // Create client - var client = new ipc(); + var client = new ipc(), + qbus = require("qbus"); + client.use(qbus); client.connect(address.socket, (e2) => { if(e2) throw e2; client.emit('/test/send', 'I am payload'); @@ -150,17 +222,21 @@ describe("One way communication ...", function () { }); -describe("Two way communication ...", function () { +describe("Two way communication (qbus) ...", function () { it("over tcp should complete and not throw", function (done) { (function(){ // Create server - var server = new ipc(); + var server = new ipc(), + qbus = require("qbus"); + server.use(qbus); server.listen(address.tcp, (e1) => { if(e1) throw e1; // Create client - var client = new ipc(); + var client = new ipc(), + qbus = require("qbus"); + client.use(qbus); client.connect(address.tcp, (e2) => { if(e2) throw e2; client.emit('/test/send', 'I am payload'); @@ -187,11 +263,15 @@ describe("Two way communication ...", function () { (function(){ // Create server - var server = new ipc(); + var server = new ipc(), + qbus = require("qbus"); + server.use(qbus); server.listen(address.socket, (e1) => { if(e1) throw e1; // Create client - var client = new ipc(); + var client = new ipc(), + qbus = require("qbus"); + client.use(qbus); client.connect(address.socket, (e2) => { if(e2) throw e2; client.emit('/test/send', 'I am payload'); @@ -214,4 +294,66 @@ describe("Two way communication ...", function () { }).should.not.throw(); }); +}); + +describe("Two way communication (EventEmitter) ...", function () { + + it("over tcp should complete and not throw", function (done) { + (function(){ + + // Create server + var server = new ipc(); + server.listen(address.tcp, (e1) => { + if(e1) throw e1; + // Create client + var client = new ipc(); + client.connect(address.tcp, (e2) => { + if(e2) throw e2; + client.emit('/test/send', 'I am payload'); + client.on('/test/reply', function () { + client.close(); + server.close(done); + }); + }); + client.on('error', (e) => { throw e; }) ; + }); + server.on('/test/send', function (payload) { + if (payload == 'I am payload') { + server.emit('/test/reply'); + } + }); + server.on('error', (e) => { throw e; }) ; + + }).should.not.throw(); + }); + + it("over sockets should complete and not throw", function (done) { + (function(){ + + // Create server + var server = new ipc(); + server.listen(address.socket, (e1) => { + if(e1) throw e1; + // Create client + var client = new ipc(); + client.connect(address.socket, (e2) => { + if(e2) throw e2; + client.emit('/test/send', 'I am payload'); + client.on('/test/reply', function () { + client.close(); + server.close(done); + }); + }); + client.on('error', (e) => { throw e; }) ; + }); + server.on('/test/send', function (payload) { + if ( payload == 'I am payload') { + server.emit('/test/reply'); + } + }); + server.on('error', (e) => { throw e; }) ; + + }).should.not.throw(); + }); + }); \ No newline at end of file