diff --git a/source/portal/index.js b/source/portal/index.js index ad32d1e37..cf64acb28 100644 --- a/source/portal/index.js +++ b/source/portal/index.js @@ -62,10 +62,7 @@ global.config = config; var amqper = require('./amqpClient')(); -var rpcClient; -var socketio_server; -var portal; -var worker; + var ip_address; (function getPublicIP() { @@ -103,224 +100,275 @@ var ip_address; } })(); -var dropAll = function() { - socketio_server && socketio_server.drop('all'); -}; - -var getTokenKey = function(id, on_key, on_error) { - var dataAccess = require('./data_access'); - dataAccess.token.key(id).then(function (key) { - on_key(key); - }).catch(function (err) { - log.info('Failed to get token key. err:', (err && err.message) ? err.message : err); - on_error(err); - }); -}; - -var joinCluster = function (on_ok) { - var joinOK = on_ok; - - var joinFailed = function (reason) { - log.error('portal join cluster failed. reason:', reason); - worker && worker.quit(); - process.exit(); - }; +// Mutiple process setup +const { setupMaster, setupWorker } = require("@socket.io/sticky"); +const { createAdapter, setupPrimary } = require("@socket.io/cluster-adapter"); +var cluster = require("cluster"); +var numCPUs = 4; - var loss = function () { - log.info('portal lost.'); - dropAll(); - }; - - var recovery = function () { - log.info('portal recovered.'); - }; +if (cluster.isMaster) { + // Master Process + const httpServer = require('http').createServer(); - var spec = {rpcClient: rpcClient, - purpose: 'portal', - clusterName: config.cluster.name, - joinRetry: config.cluster.join_retry, - info: {ip: ip_address, - hostname: config.portal.hostname, - port: config.portal.port, - via_host: config.portal.via_host, - ssl: config.portal.ssl, - state: 2, - max_load: config.cluster.max_load, - capacity: config.capacity - }, - onJoinOK: joinOK, - onJoinFailed: joinFailed, - onLoss: loss, - onRecovery: recovery, - loadCollection: {period: config.cluster.report_load_interval, - item: {name: 'cpu'}} - }; + setupMaster(httpServer, { + loadBalancingMethod: "least-connection", // either "random", "round-robin" or "least-connection" + }); - worker = require('./clusterWorker')(spec); -}; + setupPrimary(); + + cluster.setupMaster({ + serialization: "advanced", + }); -var refreshTokenKey = function(id, portal, tokenKey) { - var interval = setInterval(function() { - getTokenKey(id, function(newTokenKey) { - (socketio_server === undefined) && clearInterval(interval); - if (newTokenKey !== tokenKey) { - log.info('Token key updated!'); - portal.updateTokenKey(newTokenKey); - tokenKey = newTokenKey; - } - }, function() { - (socketio_server === undefined) && clearInterval(interval); - log.warn('Keep trying...'); - }); - }, 6 * 1000); -}; + httpServer.listen(config.portal.port); -var serviceObserver = { - onJoin: function(tokenCode) { - worker && worker.addTask(tokenCode); - }, - onLeave: function(tokenCode) { - worker && worker.removeTask(tokenCode); + for (var i = 0; i < numCPUs; i++) { + cluster.fork(); } -}; - -var startServers = function(id, tokenKey) { - var rpcChannel = require('./rpcChannel')(rpcClient); - var rpcReq = require('./rpcRequest')(rpcChannel); - portal = require('./portal')({tokenKey: tokenKey, - tokenServer: 'ManagementApi', - clusterName: config.cluster.name, - selfRpcId: id}, - rpcReq); - socketio_server = require('./socketIOServer')({port: config.portal.port, - cors: config.portal.cors, - ssl: config.portal.ssl, - forceTlsv12: config.portal.force_tls_v12, - keystorePath: config.portal.keystorePath, - reconnectionTicketLifetime: config.portal.reconnection_ticket_lifetime, - reconnectionTimeout: config.portal.reconnection_timeout, - pingInterval: config.portal.ping_interval, - pingTimeout: config.portal.ping_timeout}, - portal, - serviceObserver); - return socketio_server.start() - .then(function() { - log.info('start socket.io server ok.'); - refreshTokenKey(id, portal, tokenKey); - }) - .catch(function(err) { - log.error('Failed to start servers, reason:', err && err.message); - throw err; + cluster.on('exit', function(worker, code, signal) { + log.info(`Worker ${worker.process.pid} died`); + }); + + ['SIGINT', 'SIGTERM'].map(function (sig) { + process.on(sig, function () { + log.info('Master exiting on', sig); + process.exit(); + }); }); -}; -var stopServers = function() { - socketio_server && socketio_server.stop(); - socketio_server = undefined; - worker && worker.quit(); - worker = undefined; -}; - -var rpcPublic = { - drop: function(participantId, callback) { - socketio_server && socketio_server.drop(participantId); - callback('callback', 'ok'); - }, - notify: function(participantId, event, data, callback) { - // The "notify" is called on socket.io server, - // but one client ID should not be exists in both servers, - // there must be one failure, ignore this notify error here. - var notifyFail = (err) => {}; - socketio_server && socketio_server.notify(participantId, event, data).catch(notifyFail); - callback('callback', 'ok'); - }, - validateAndDeleteWebTransportToken: (token, callback) => { - if(portal.validateAndDeleteWebTransportToken(token)) { - callback('callback','ok'); - } else { - callback('callback', 'error', 'Invalid token for WebTransport.'); + process.on('SIGUSR2', function() { + logger.reconfigure(); + }); +}else{ + // Worker Process + var rpcClient; + var socketio_server; + var portal; + var worker; + + var dropAll = function() { + socketio_server && socketio_server.drop('all'); + }; + + var getTokenKey = function(id, on_key, on_error) { + var dataAccess = require('./data_access'); + dataAccess.token.key(id).then(function (key) { + on_key(key); + }).catch(function (err) { + log.info('Failed to get token key. err:', (err && err.message) ? err.message : err); + on_error(err); + }); + }; + + var joinCluster = function (on_ok) { + var joinOK = on_ok; + + var joinFailed = function (reason) { + log.error('portal join cluster failed. reason:', reason); + worker && worker.quit(); + process.exit(); + }; + + var loss = function () { + log.info('portal lost.'); + dropAll(); + }; + + var recovery = function () { + log.info('portal recovered.'); + }; + + var spec = {rpcClient: rpcClient, + purpose: 'portal', + clusterName: config.cluster.name, + joinRetry: config.cluster.join_retry, + info: {ip: ip_address, + hostname: config.portal.hostname, + port: config.portal.port, + via_host: config.portal.via_host, + ssl: config.portal.ssl, + state: 2, + max_load: config.cluster.max_load, + capacity: config.capacity + }, + onJoinOK: joinOK, + onJoinFailed: joinFailed, + onLoss: loss, + onRecovery: recovery, + loadCollection: {period: config.cluster.report_load_interval, + item: {name: 'cpu'}} + }; + + worker = require('./clusterWorker')(spec); + }; + + var refreshTokenKey = function(id, portal, tokenKey) { + var interval = setInterval(function() { + getTokenKey(id, function(newTokenKey) { + (socketio_server === undefined) && clearInterval(interval); + if (newTokenKey !== tokenKey) { + log.info('Token key updated!'); + portal.updateTokenKey(newTokenKey); + tokenKey = newTokenKey; + } + }, function() { + (socketio_server === undefined) && clearInterval(interval); + log.warn('Keep trying...'); + }); + }, 6 * 1000); + }; + + var serviceObserver = { + onJoin: function(tokenCode) { + worker && worker.addTask(tokenCode); + }, + onLeave: function(tokenCode) { + worker && worker.removeTask(tokenCode); } - }, - broadcast: function(controller, excludeList, event, data, callback) { - socketio_server && socketio_server.broadcast(controller, excludeList, event, data); - callback('callback', 'ok'); - } -}; + }; + + var startServers = function(id, tokenKey) { + var rpcChannel = require('./rpcChannel')(rpcClient); + var rpcReq = require('./rpcRequest')(rpcChannel); + + portal = require('./portal')({tokenKey: tokenKey, + tokenServer: 'ManagementApi', + clusterName: config.cluster.name, + selfRpcId: id}, + rpcReq); + socketio_server = require('./socketIOServer')({port: config.portal.port, + cors: config.portal.cors, + ssl: config.portal.ssl, + forceTlsv12: config.portal.force_tls_v12, + keystorePath: config.portal.keystorePath, + reconnectionTicketLifetime: config.portal.reconnection_ticket_lifetime, + reconnectionTimeout: config.portal.reconnection_timeout, + pingInterval: config.portal.ping_interval, + pingTimeout: config.portal.ping_timeout}, + portal, + serviceObserver); + return socketio_server.start() + .then(function() { + log.info('start socket.io server ok.'); + socketio_server.setupWorkerAdapter(); + refreshTokenKey(id, portal, tokenKey); + }) + .catch(function(err) { + log.error('Failed to start servers, reason:', err && err.message); + throw err; + }); + }; + + var stopServers = function() { + socketio_server && socketio_server.stop(); + socketio_server = undefined; + worker && worker.quit(); + worker = undefined; + }; + + var rpcPublic = { + drop: function(participantId, callback) { + socketio_server && socketio_server.drop(participantId); + callback('callback', 'ok'); + }, + notify: function(participantId, event, data, callback) { + // The "notify" is called on socket.io server, + // but one client ID should not be exists in both servers, + // there must be one failure, ignore this notify error here. + var notifyFail = (err) => {}; + socketio_server && socketio_server.notify(participantId, event, data).catch(notifyFail); + callback('callback', 'ok'); + }, + validateAndDeleteWebTransportToken: (token, callback) => { + if(portal.validateAndDeleteWebTransportToken(token)) { + callback('callback','ok'); + } else { + callback('callback', 'error', 'Invalid token for WebTransport.'); + } + }, + broadcast: function(controller, excludeList, event, data, callback) { + socketio_server && socketio_server.broadcast(controller, excludeList, event, data); + callback('callback', 'ok'); + } + }; -amqper.connect(config.rabbit, function () { - amqper.asRpcClient(function(rpcClnt) { - rpcClient = rpcClnt; - log.info('portal initializing as rpc client ok'); - joinCluster(function(id) { - log.info('portal join cluster ok, with rpcID:', id); - amqper.asRpcServer(id, rpcPublic, function(rpcSvr) { - log.info('portal initializing as rpc server ok'); - amqper.asMonitor(function (data) { - if (data.reason === 'abnormal' || data.reason === 'error' || data.reason === 'quit') { - if (portal !== undefined) { - if (data.message.purpose === 'conference') { - return portal.getParticipantsByController(data.message.type, data.message.id) - .then(function (impactedParticipants) { - impactedParticipants.forEach(function(participantId) { - log.error('Fault on conference controller(type:', data.message.type, 'id:', data.message.id, ') of participant', participantId, 'was detected, drop it.'); - socketio_server && socketio_server.drop(participantId); + amqper.connect(config.rabbit, function () { + amqper.asRpcClient(function(rpcClnt) { + rpcClient = rpcClnt; + log.info('portal initializing as rpc client ok'); + joinCluster(function(id) { + log.info('portal join cluster ok, with rpcID:', id); + amqper.asRpcServer(id, rpcPublic, function(rpcSvr) { + log.info('portal initializing as rpc server ok'); + amqper.asMonitor(function (data) { + if (data.reason === 'abnormal' || data.reason === 'error' || data.reason === 'quit') { + if (portal !== undefined) { + if (data.message.purpose === 'conference') { + return portal.getParticipantsByController(data.message.type, data.message.id) + .then(function (impactedParticipants) { + impactedParticipants.forEach(function(participantId) { + log.error('Fault on conference controller(type:', data.message.type, 'id:', data.message.id, ') of participant', participantId, 'was detected, drop it.'); + socketio_server && socketio_server.drop(participantId); + }); }); - }); + } } } - } - }, function (monitor) { - log.info(id + ' as monitor ready'); - getTokenKey(id, function(tokenKey) { - startServers(id, tokenKey); - }, function() { - log.error('portal getting token failed.'); + }, function (monitor) { + log.info(id + ' as monitor ready'); + getTokenKey(id, function(tokenKey) { + startServers(id, tokenKey); + }, function() { + log.error('portal getting token failed.'); + stopServers(); + process.exit(); + }); + }, function(reason) { + log.error('portal initializing as monitor failed, reason:', reason); stopServers(); process.exit(); }); - }, function(reason) { - log.error('portal initializing as rpc client failed, reason:', reason); - stopServers(); - process.exit(); - }); - }, function(reason) { - log.error('portal initializing as rpc client failed, reason:', reason); - stopServers(); - process.exit(); + }, function(reason) { + log.error('portal initializing as rpc server failed, reason:', reason); + stopServers(); + process.exit(); + }); }); + }, function(reason) { + log.error('portal initializing as rpc client failed, reason:', reason); + stopServers(); + process.exit(); }); }, function(reason) { - log.error('portal initializing as rpc client failed, reason:', reason); - stopServers(); - process.exit(); + log.error('portal connect to rabbitMQ server failed, reason:', reason); + process.exit(); }); -}, function(reason) { - log.error('portal connect to rabbitMQ server failed, reason:', reason); - process.exit(); -}); -['SIGINT', 'SIGTERM'].map(function (sig) { - process.on(sig, async function () { - log.warn('Exiting on', sig); - stopServers(); - amqper.disconnect(); - process.exit(); + log.info(`Worker ${process.pid} started`); + + ['SIGINT', 'SIGTERM'].map(function (sig) { + process.on(sig, async function () { + log.warn('Worker exiting on', sig); + stopServers(); + amqper.disconnect(); + process.exit(); + }); }); -}); -process.on('SIGPIPE', function () { - log.warn('SIGPIPE!!'); -}); + process.on('SIGPIPE', function () { + log.warn('SIGPIPE!!'); + }); -process.on('exit', function () { - log.info('Process exit'); -}); + process.on('exit', function () { + log.info('Process exit'); + }); -process.on('unhandledRejection', (reason) => { + process.on('unhandledRejection', (reason) => { log.info('Reason: ' + reason); -}); + }); -process.on('SIGUSR2', function () { - logger.reconfigure(); -}); + process.on('SIGUSR2', function() { + logger.reconfigure(); + }); +} diff --git a/source/portal/package.json b/source/portal/package.json index 8898289d7..2ce6feea8 100644 --- a/source/portal/package.json +++ b/source/portal/package.json @@ -4,6 +4,8 @@ "dependencies": { "amqplib": "^0.7.0", "socket.io": "^3.1.1", + "@socket.io/sticky": "*", + "@socket.io/cluster-adapter": "*", "log4js": "^1.1.1", "toml": "*", "sprintf-js": "^1.0.3", diff --git a/source/portal/socketIOServer.js b/source/portal/socketIOServer.js index e65f5417f..d8911c77a 100644 --- a/source/portal/socketIOServer.js +++ b/source/portal/socketIOServer.js @@ -369,6 +369,8 @@ var SocketIOServer = function(spec, portal, observer) { var that = {}; var io; var clients = {}; + const { setupMaster, setupWorker } = require("@socket.io/sticky"); + const { createAdapter, setupPrimary } = require("@socket.io/cluster-adapter"); // A Socket.IO server has a unique reconnection key. Client cannot reconnect to another Socket.IO server in the cluster. var reconnection_key = require('crypto').randomBytes(64).toString('hex'); var sioOptions = {}; @@ -391,7 +393,7 @@ var SocketIOServer = function(spec, portal, observer) { } var startInsecure = function(port) { - var server = require('http').createServer().listen(port); + var server = require('http').createServer();//.listen(port); io = require('socket.io')(server, sioOptions); run(); return Promise.resolve('ok'); @@ -408,7 +410,7 @@ var SocketIOServer = function(spec, portal, observer) { var constants = require('constants'); option.secureOptions = (constants.SSL_OP_NO_TLSv1 | constants.SSL_OP_NO_TLSv1_1); } - var server = require('https').createServer(option).listen(port); + var server = require('https').createServer(option);//.listen(port); io = require('socket.io')(server, sioOptions); run(); resolve('ok'); @@ -470,6 +472,11 @@ var SocketIOServer = function(spec, portal, observer) { io = undefined; }; + that.setupWorkerAdapter = function() { + io && io.adapter(createAdapter()); + setupWorker(io); + }; + that.notify = function(participantId, event, data) { log.debug('notify participant:', participantId, 'event:', event, 'data:', data); if (clients[participantId]) {