Skip to content

Commit

Permalink
Rename
Browse files Browse the repository at this point in the history
  • Loading branch information
Hexagon committed Nov 21, 2016
1 parent e7d3149 commit d5fda4c
Show file tree
Hide file tree
Showing 5 changed files with 288 additions and 59 deletions.
84 changes: 66 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,48 +1,47 @@
# qbus-ipc
# crocket

[![Build status](https://travis-ci.org/Hexagon/qbus-ipc.svg)](https://travis-ci.org/Hexagon/qbus-ipc) [![MIT License](https://img.shields.io/badge/license-MIT-blue.svg)](https://img.shields.io/badge/license-MIT-blue.svg)
[![Build status](https://travis-ci.org/Hexagon/crocket.svg)](https://travis-ci.org/Hexagon/crocket) [![MIT License](https://img.shields.io/badge/license-MIT-blue.svg)](https://img.shields.io/badge/license-MIT-blue.svg)

Minimal node.js cross platform IPC communication library.

* Communcates over TCP, unix sockets or windows pipe.
* [qbus](https://www.npmjs.com/package/qbus) as event mediator
* Works locally OR remotely.
* Works on Linux, Windows AND macOS
* Works both locally and remotely.
* Works on Linux, Windows AND macOS.
* Pluggable event mediator, uses EventEmitter by default. But can be extended with something like [qbus](https://www.npmjs.com/package/qbus) for extended functionality.


# Installation

```npm install qbus-ipc```
```npm install crocket```

# Usage

### Host process

```javascript
var ipc = require("qbus-ipc"),
var ipc = require("crocket"),
server = new ipc();

// Start listening, this example communicate by file sockets
server.listen({ "path": "/tmp/qbus-ipc-test.sock" }, (e) => {
server.listen({ "path": "/tmp/crocket-ipc-test.sock" }, (e) => {

// Fatal errors are supplied as the first parameter to callback
if(e) throw e;

// All is well if we got this far
console.log('IPC listening on /tmp/qbus-ipc-test.sock');
console.log('IPC listening on /tmp/crocket-test.sock');

});

// Events are handled by qbus
// Documentation: https://github.com/unkelpehr/qbus
// Query tester: http://unkelpehr.github.io/qbus/
server.on('/request/:what', function (what, payload) {
// Events are handled by EventEmitter by default ...
server.on('/request/food', function (payload) {

// Respond to the query
server.emit('/response', 'You asked for ' + what + ' and supplied ' + payload);
server.emit('/response', 'You asked for food and supplied ' + payload);

});


// React to communication errors
server.on('error', (e) => { console.error('Communication error occurred: ', e); });
```
Expand All @@ -51,16 +50,16 @@ Output

```
> node test-server.js
IPC listening on /tmp/qbus-ipc-test.sock
IPC listening on /tmp/crocket-test.sock
```

### Client process

```javascript
var ipc = require("qbus-ipc"),
var ipc = require("crocket"),
client = new ipc();

client.connect({ "path": "/tmp/qbus-ipc-test.sock" }, (e) => {
client.connect({ "path": "/tmp/crocket-test.sock" }, (e) => {

// Connection errors are supplied as the first parameter to callback
if(e) throw e;
Expand Down Expand Up @@ -88,6 +87,54 @@ Output
> node test-client.js
Server said: You asked for food and supplied cash
```

### Replacing EventEmitter

### Host process

```javascript
var ipc = require("crocket"),
server = new ipc(),

// Require the alternative event handler
qbus = require("qbus");

// Use!
server.use(qbus);

// Start listening, this example communicate by file sockets
server.listen({ "path": "/tmp/crocket-ipc-test.sock" }, (e) => {

// Fatal errors are supplied as the first parameter to callback
if(e) throw e;

// All is well if we got this far
console.log('IPC listening on /tmp/crocket-test.sock');

});

// Now we're using qbus to handle events
// Documentation: https://github.com/unkelpehr/qbus
// Query tester: http://unkelpehr.github.io/qbus/
server.on('/request/:what', function (what, payload) {

// Respond to the query
server.emit('/response', 'You asked for ' + what + ' and supplied ' + payload);

});

// React to communication errors
server.on('error', (e) => { console.error('Communication error occurred: ', e); });
```

Output

```
> node test-server.js
IPC listening on /tmp/crocket-test.sock
```


### Options

All available options for server.listen
Expand All @@ -112,6 +159,7 @@ All available options for client.connect
"host": null,
"port": null,
"reconnect": -1,
"timeout": 5000,
"encoding": "utf8"
}
```
Expand All @@ -120,7 +168,7 @@ All available options for client.connect

**Port** is specified if you want to use TCP instead of file sockets.

**Host** Only used in TCP mode. For server, ```0.0.0.0``` makes qbus-ipc listen on any IPv4-interface. ```::``` Is the equivalent for IPv6. For client, you specify the host address.
**Host** Only used in TCP mode. For server, ```0.0.0.0``` makes crocket listen on any IPv4-interface. ```::``` Is the equivalent for IPv6. For client, you specify the host address.

**Reconnect** is the number of milliseconds to wait before reviving a broken listener/connection, or -1 to disable automtic revive.

Expand Down
9 changes: 5 additions & 4 deletions lib/defaults.json
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
{
"serverDefaults": {
"path": "/tmp/node-ipc.sock",
"server": {
"path": "/tmp/crocket-ipc.sock",
"host": null,
"port": null,
"reconnect": 2000,
"encoding": "utf8"
},
"clientDefaults": {
"path": "/tmp/node-ipc.sock",
"client": {
"path": "/tmp/crocket-ipc.sock",
"host": null,
"port": null,
"reconnect": -1,
"timeout": 5000,
"encoding": "utf8"
}
}
80 changes: 58 additions & 22 deletions lib/ipc.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,16 @@ THE SOFTWARE.
const
xpipe = require("xpipe"),
net = require("net"),
qbus = require("qbus"),
defaults = require("./defaults.json"),

EventEmitter = require('events'),

extend = require("util")._extend;

function IPC () {

// Private properties
var bus = new qbus(),
var mediator,
sockets = [];

// Private methods
Expand All @@ -58,7 +59,7 @@ function IPC () {
if (callback) {
callback(e);
} else {
bus.emit("error", e);
mediator.emit("error", e);
}
}

Expand All @@ -79,54 +80,64 @@ function IPC () {
try {
var incoming = JSON.parse(data);
if( incoming && incoming.topic ) {
bus.emit(incoming.topic, incoming.data, socket);
mediator.emit(incoming.topic, incoming.data, socket);
} else {
bus.emit("error", new Error("Invalid data received."));
mediator.emit("error", new Error("Invalid data received."));
}
} catch (e) {
bus.emit("error", e);
mediator.emit("error", e);
}
},

connect = (obj, fn, callback) => {
if (this.opts.port) obj[fn](this.opts.port, this.opts.host, callback);
else obj[fn](xpipe.eq(this.opts.path), callback);
initializeMediator = () => {
if ( !mediator ) mediator = new EventEmitter();
// Register bugus error listener
mediator.on( "error" , (e) => {} );
};

// Public methods
this.on = function (event, callback) { bus.on(event, callback); return this; };
this.use = (o) => { mediator = new o() };
this.on = function (event, callback) { mediator.on(event, callback); return this; };
this.emit = function (topic, data, socket, callback) { write(topic, data, socket, callback); return this; };
this.close = function (callback) { close(callback); return this; };
this.listen = function (options, callback) {

// ToDo, make options optional
var server = net.createServer(),
opts = extend(extend({}, defaults.server), options);
opts = extend(extend({}, defaults.server), options),

connect = () => {
if (opts.port) server.listen(opts.port, opts.host, callback);
else server.listen(xpipe.eq(opts.path), callback);
};

this.isServer = true;
this.opts = opts;
this.server = server;

server.on("error", (e) => bus.emit("error", e) );

initializeMediator();

server.on("error", (e) => mediator.emit("error", e) );

// New connection established
server.on("connection", function (socket) {
sockets.push(socket);
bus.emit("connect", socket);
mediator.emit("connect", socket);
socket.setEncoding(opts.encoding);
socket.on("data", (data) => onData(data, socket) );
socket.on("close", (socket) => {
bus.emit("disconnect", socket);
mediator.emit("disconnect", socket);
sockets.splice(sockets.indexOf(socket), 1);
});
});

server.on("close", function () {
if (opts.reconnect > 0) setTimeout(() => connect(server.listen), opts.reconnect);
else bus.emit("close");
else mediator.emit("close");
});

connect(server, "listen", callback);
// Start listening
connect();

return this;
};
Expand All @@ -135,24 +146,49 @@ function IPC () {

// ToDo, make options optional
var socket = new net.Socket(),
opts = extend(extend({}, defaults.client), options);
opts = extend(extend({}, defaults.client), options),

flagConnected,

connected = () => {
flagConnected = true;
callback && callback();
},

connect = (first) => {
flagConnected = false;
if (opts.port) socket.connect(opts.port, opts.host, first ? connected : undefined);
else socket.connect(xpipe.eq(opts.path), first ? connected : undefined);
setTimeout(() => {
if ( !flagConnected ) {
socket.destroy();
if (opts.reconnect === -1) {
callback(new Error('Connection timeout'));
}
}
}, opts.timeout);
};

this.isServer = false;
this.opts = opts;
sockets = [socket];

initializeMediator();

socket.setEncoding(opts.encoding);

socket.on("error", (e) => bus.emit("error", e) );
socket.on("error", (e) => {
mediator.emit("error", e)
});

socket.on("data", (data) => onData(data, socket) );

socket.on("close", () => {
if (opts.reconnect > 0) setTimeout(() => connect(socket.connect), opts.reconnect);
else bus.emit("close");
if (opts.reconnect > 0) setTimeout(() => connect(), opts.reconnect);
else mediator.emit("close");
});

connect(socket, "connect", callback);
connect(true);

return this;
};
Expand Down
14 changes: 8 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "qbus-ipc",
"version": "0.9.5",
"description": "Efficient and simple node.js TCP/unix socket/windows socket interprocess communication based upon qbus.",
"name": "crocket",
"version": "0.9.6",
"description": "Efficient and simple interprocess communication for unix/windows/macos over tcp or sockets.",
"author": "Hexagon <github.com/hexagon>",
"scripts": {
"test": "mocha"
Expand All @@ -14,24 +14,26 @@
"url": "https://github.com/hexagon/qbus-ipc/issues"
},
"keywords": [
"qbus",
"ipc",
"rpc",
"interprocess",
"communication",
"tcp",
"mediator",
"eventemitter",
"qbus",
"unix",
"windows",
"sockets",
"net"
],
"dependencies": {
"qbus": "*",
"xpipe": "*"
},
"devDependencies": {
"mocha": "*",
"should": "*"
"should": "*",
"qbus": "*"
},
"license": "MIT"
}
Loading

0 comments on commit d5fda4c

Please sign in to comment.