Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue 173 relating to zombie connections from Socket #176

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 49 additions & 33 deletions lib/Socket.ts
Original file line number Diff line number Diff line change
@@ -1,54 +1,70 @@
import { BITSOCKET_URL, WS_URL } from "./BITBOX"
import { SocketConfig } from "./interfaces/BITBOXInterfaces"
import io from "socket.io-client"
import EventSource from "eventsource"

enum SocketType {
Uninitialized,
SocketIO,
BitSocket
}

const io: any = require("socket.io-client")
export class Socket {
socket: any
websocketURL: string
bitsocketURL: string
constructor(config: SocketConfig = {}) {
if (config.wsURL) {
// default to passed in wsURL
this.websocketURL = config.wsURL
} else if (config.restURL) {
// 2nd option deprecated restURL
this.websocketURL = config.restURL
} else {
// fallback to WS_URL
this.websocketURL = WS_URL
}

if (config.bitsocketURL) {
this.bitsocketURL = config.bitsocketURL
} else {
this.bitsocketURL = BITSOCKET_URL
}
socketType: SocketType = SocketType.Uninitialized

constructor(config: SocketConfig = {}) {
// Order of preference: passed in wsURL, deprecated restURL, fallback WS_URL
this.websocketURL = config.wsURL || config.restURL || WS_URL
// Similar for BitSocket case
this.bitsocketURL = config.bitsocketURL || BITSOCKET_URL
// Execute callback (immediate, synchronous and unconditional)
if (config.callback) config.callback()
// Note that we can't set socketType in constructor as config may contain
// both socket.io and BitSocket URLs, so we need to wait for listen() before
// we know which type it will be.
}

public listen(query: string, cb: Function): void {
if (query === "blocks" || query === "transactions") {
this.socket = io(this.websocketURL, { transports: ["websocket"] })
this.socket.emit(query)

if (query === "blocks") this.socket.on("blocks", (msg: any) => cb(msg))
else if (query === "transactions")
this.socket.on("transactions", (msg: any) => cb(msg))
} else {
let EventSource = require("eventsource")
let b64 = Buffer.from(JSON.stringify(query)).toString("base64")
this.socket = new EventSource(`${this.bitsocketURL}/s/${b64}`)
this.socket.onmessage = (msg: any) => {
cb(msg.data)
// socket.io case
switch (this.socketType) {
case SocketType.Uninitialized:
this.socketType = SocketType.SocketIO
this.socket = io(this.websocketURL, { transports: ["websocket"] })
case SocketType.SocketIO:
// Send server the event name of interest. At time of writing this is
// not used by the server but is left in so that server-side filtering
// is an option in the future.
this.socket.emit(query)
this.socket.on(query, (msg: any) => cb(msg))
break;
case SocketType.BitSocket:
throw new Error("Query type not possible on a BitSocket connection.")
}
} else {
// BitSocket case
switch (this.socketType) {
case SocketType.Uninitialized:
this.socketType = SocketType.BitSocket
let b64 = Buffer.from(JSON.stringify(query)).toString("base64")
this.socket = new EventSource(`${this.bitsocketURL}/s/${b64}`)
this.socket.onmessage = (msg: any) => {
cb(msg.data)
}
break;
case SocketType.BitSocket:
throw new Error("Only one BitSocket query can be run at a time.")
case SocketType.SocketIO:
throw new Error("Query type not possible on a SocketIO connection.")
}
}
}

public close(cb?: Function): void {
this.socket.close()
if (cb) {
cb()
}
if (cb) cb()
}
}
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
"@bitcoin-dot-com/bitcoincashjs2-lib": "^4.1.0",
"@types/bigi": "^1.4.2",
"@types/bip39": "^2.4.2",
"@types/eventsource": "^1.1.2",
"@types/randombytes": "^2.0.0",
"@types/socket.io": "^2.1.2",
"@types/socket.io-client": "^1.4.32",
Expand Down
62 changes: 62 additions & 0 deletions test/e2e/count-connections-issue-173/countConnectionsBitSocket.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
const BITBOX = require("../../../lib/BITBOX").BITBOX;
const { exec } = require('child_process');

const bitbox = new BITBOX();
const socket = new bitbox.Socket();

function countSockets(stage) {
return new Promise((resolve, reject) => {
// Call the lsof system command for outgoing internet connections.
exec(`lsof -i -n -P | grep ${process.pid}`, (err, stdout, stderr) => {
// Print list of open connections allowing a visual count to be done.
console.log(`Outbound connections from this node process ${stage}:\n${stdout}`);
resolve();
});
});
}

function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}

(async () => {

await countSockets("before calling listen");

// First call to listen() which should create new connection.
socket.listen({"v": 3, "q": {"find": {}}},
(message) => {
console.log("Callback from first query invoked.");
});

// Second call to listen() which should share connection with first call.
// Use try catch in case this throws one of our new errors.
try {
socket.listen({"v": 3, "q": {"find": {}}},
(message) => {
console.log("Callback from first query invoked.");
});
} catch(error) {
console.log(`ERROR: ${error.message}`);
}

// listen doesn't return a promise so wait 100ms for connections to establish.
await sleep(100);

await countSockets("after calling listen twice");

// now close the socket
socket.close();

// callback from close() is short-circuited so give it 100ms to clean up.
await sleep(100);

// check if any zombie connections remaining
await countSockets("after calling close (zombie connections)");

// exit process
process.exit();

})().catch((error)=> {console.log(`ERROR: ${error.message}`)});


Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
const BITBOX = require("../../../lib/BITBOX").BITBOX;
const { exec } = require('child_process');

const bitbox = new BITBOX();
const socket = new bitbox.Socket();

function countSockets(stage) {
return new Promise((resolve, reject) => {
// Call the lsof system command for outgoing internet connections.
exec(`lsof -i -n -P | grep ${process.pid}`, (err, stdout, stderr) => {
// Print list of open connections allowing a visual count to be done.
console.log(`Outbound connections from this node process ${stage}:\n${stdout}`);
resolve();
});
});
}

function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}

(async () => {

await countSockets("before calling listen");

// First call to listen() which should create new BitSocket connection.
socket.listen({"v": 3, "q": {"find": {}}},
(message) => {
console.log("Callback from first query invoked.");
});

// Second call to listen() which needs a socket.io connection and so can't
// share the exisiting connection. Use try catch in case this throws one of
// our new errors.
try {
socket.listen("blocks", (message) => {
console.log("Received a block.");
});
} catch(error) {
console.log(`ERROR: ${error.message}`);
}

// listen doesn't return a promise so wait 100ms for connections to establish.
await sleep(100);

await countSockets("after calling listen twice");

// now close the socket
socket.close();

// callback from close() is short-circuited so give it 100ms to clean up.
await sleep(100);

// check if any zombie connections remaining
await countSockets("after calling close (zombie connections)");

// exit process
process.exit();

})().catch((error)=> {console.log(`ERROR: ${error.message}`)});


Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
const BITBOX = require("../../../lib/BITBOX").BITBOX;
const { exec } = require('child_process');

const bitbox = new BITBOX();
const socket = new bitbox.Socket();

function countSockets(stage) {
return new Promise((resolve, reject) => {
// Call the lsof system command for outgoing internet connections.
exec(`lsof -i -n -P | grep ${process.pid}`, (err, stdout, stderr) => {
// Print list of open connections allowing a visual count to be done.
console.log(`Outbound connections from this node process ${stage}:\n${stdout}`);
resolve();
});
});
}

function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}

(async () => {

await countSockets("before calling listen");

// First call to listen() which should create new connection.
socket.listen("transactions", (message) => {
console.log("Received a transaction.");
});

// Second call to listen() which should share connection with first call.
// Use try catch in case this throws one of our new errors.
try {
socket.listen({"v": 3, "q": {"find": {}}},
(message) => {
console.log("Callback from first query invoked.");
});
} catch(error) {
console.log(`ERROR: ${error.message}`);
}

// listen doesn't return a promise so wait 100ms for connections to establish.
await sleep(100);

await countSockets("after calling listen twice");

// now close the socket
socket.close();

// callback from close() is short-circuited so give it 100ms to clean up.
await sleep(100);

// check if any zombie connections remaining
await countSockets("after calling close (zombie connections)");

// exit process
process.exit();

})().catch((error)=> {console.log(`ERROR: ${error.message}`)});


60 changes: 60 additions & 0 deletions test/e2e/count-connections-issue-173/countConnectionsSocketIO.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
const BITBOX = require("../../../lib/BITBOX").BITBOX;
const { exec } = require('child_process');

const bitbox = new BITBOX();
const socket = new bitbox.Socket();

function countSockets(stage) {
return new Promise((resolve, reject) => {
// Call the lsof system command for outgoing internet connections.
exec(`lsof -i -n -P | grep ${process.pid}`, (err, stdout, stderr) => {
// Print list of open connections allowing a visual count to be done.
console.log(`Outbound connections from this node process ${stage}:\n${stdout}`);
resolve();
});
});
}

function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}

(async () => {

await countSockets("before calling listen");

// First call to listen() which should create new connection.
socket.listen("transactions", (message) => {
console.log("Received a transaction.");
});

// Second call to listen() which should share connection with first call.
// Use try catch in case this throws one of our new errors.
try {
socket.listen("blocks", (message) => {
console.log("Received a block.");
});
} catch(error) {
console.log(`ERROR: ${error.message}`);
}

// listen doesn't return a promise so wait 100ms for connections to establish.
await sleep(100);

await countSockets("after calling listen twice");

// now close the socket
socket.close();

// callback from close() is short-circuited so give it 100ms to clean up.
await sleep(100);

// check if any zombie connections remaining
await countSockets("after calling close (zombie connections)");

// exit process
process.exit();

})().catch((error)=> {console.log(`ERROR: ${error.message}`)});


5 changes: 5 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,11 @@
resolved "https://registry.yarnpkg.com/@types/events/-/events-3.0.0.tgz#2862f3f58a9a7f7c3e78d79f130dd4d71c25c2a7"
integrity sha512-EaObqwIvayI5a8dCzhFrjKzVwKLxjoG9T6Ppd5CEo07LRKfQ8Yokw54r5+Wq7FaBQ+yXRvQAYPrHwya1/UFt9g==

"@types/eventsource@^1.1.2":
version "1.1.2"
resolved "https://registry.yarnpkg.com/@types/eventsource/-/eventsource-1.1.2.tgz#079ab4213e844e56f7384aec620e1163dab692b3"
integrity sha512-4AKWJ6tvEU4fk0770oAK4Z0lQUuSnc5ljHTcYZhQtdP7XMDKKvegGUC6xGD8+4+F+svZKAzlxbKnuGWfgMtgVA==

"@types/glob@^7.1.1":
version "7.1.1"
resolved "https://registry.yarnpkg.com/@types/glob/-/glob-7.1.1.tgz#aa59a1c6e3fbc421e07ccd31a944c30eba521575"
Expand Down