diff --git a/src/fabric/createFabricClient.js b/src/fabric/createFabricClient.js deleted file mode 100644 index c16f69d..0000000 --- a/src/fabric/createFabricClient.js +++ /dev/null @@ -1,73 +0,0 @@ -const loadOrdererCert = require('./loadOrdererCert'); -const loadPeerCert = require('./loadPeerCert'); -const FabricClient = require('fabric-client'); -const {WALLET_PATH} = require('../../constants/fabric'); -const logger = require('../logging/logger').getLogger('fabric/createFabricClient'); - -module.exports = function createFabricClient({ - peers = [], - orderer = undefined, - channelId -}) { - const options = { - walletPath: WALLET_PATH, - channelId, - ordererUrl: orderer ? orderer.url : undefined - }; - - const fabricClient = new FabricClient(); - const channel = fabricClient.newChannel(channelId); - - const registerPeersCertOnChannel = () => Promise.all(peers.map((peer) => { - return new Promise((resolve, reject) => { - loadPeerCert(peer) - .then((certOptions) => { - channel.addPeer(fabricClient.newPeer(peer.url, certOptions)); - resolve(); - }) - .catch(reject); - }); - })); - - const registerOrdererCertOnChannel = () => new Promise((resolve, reject) => { - if (orderer) { - loadOrdererCert(orderer) - .then((certOptions) => { - channel.addOrderer(fabricClient.newOrderer(options.ordererUrl, certOptions)); - resolve(); - }) - .catch(reject); - } else { - resolve(); - } - }); - - return new Promise((resolve, reject) => { - Promise.resolve() - .then(registerPeersCertOnChannel) - .then(registerOrdererCertOnChannel) - .then(() => { - logger.info('Create a fabric client and set the wallet location'); - return FabricClient.newDefaultKeyValueStore({path: options.walletPath}); - }) - .then((stateStore) => { - logger.info('Set fabric client crypto suite'); - - // assign the store to the fabric client - fabricClient.setStateStore(stateStore); - const cryptoSuite = FabricClient.newCryptoSuite(); - // use the same location for the state store (where the users' certificate are kept) - // and the crypto store (where the users' keys are kept) - const cryptoStore = FabricClient.newCryptoKeyStore({path: options.walletPath}); - cryptoSuite.setCryptoKeyStore(cryptoStore); - fabricClient.setCryptoSuite(cryptoSuite); - - logger.info('Fabric client initialized'); - resolve({fabricClient, channel}); - }) - .catch((err) => { - logger.error(`Failed to initialize channel: ${err.message}`); - reject(err); - }); - }); -}; diff --git a/src/fabric/loadOrdererCert.js b/src/fabric/loadOrdererCert.js deleted file mode 100644 index efa370c..0000000 --- a/src/fabric/loadOrdererCert.js +++ /dev/null @@ -1,24 +0,0 @@ -const fs = require('fs'); -const {CRYPTO_PATH} = require('../../constants/fabric'); -const logger = require('../logging/logger').getLogger('fabric/loadOrdererCert'); - -module.exports = function loadOrdererCert(orderer) { - return new Promise((resolve, reject) => { - logger.info(`Loading orderer certificate for ${orderer.cn}`); - if (orderer.url.indexOf('grpc://') === 0) { - logger.info('Oderer is running without TLS'); - resolve(); - return; - } - fs.readFile(`${CRYPTO_PATH}/${orderer.org}/orderers/${orderer.cn}/tlsca.combined.${orderer.cn}-cert.pem`, 'utf8', (err, ordererCert) => { - if (err) { - reject(err); - return; - } - resolve({ - pem: ordererCert, - 'ssl-target-name-override': orderer.cn - }); - }); - }); -}; diff --git a/src/fabric/loadPeerCert.js b/src/fabric/loadPeerCert.js deleted file mode 100644 index 903d9d0..0000000 --- a/src/fabric/loadPeerCert.js +++ /dev/null @@ -1,24 +0,0 @@ -const fs = require('fs'); -const {CRYPTO_PATH} = require('../../constants/fabric'); -const logger = require('../logging/logger').getLogger('fabric/loadPeerCert'); - -module.exports = function loadPeerCert(peer) { - return new Promise((resolve, reject) => { - logger.info(`Loading peer certificate for ${peer.cn}`); - if (peer.url.indexOf('grpc://') === 0) { - logger.info('Peer is running without TLS'); - resolve(); - return; - } - fs.readFile(`${CRYPTO_PATH}/${peer.org}/peers/${peer.cn}/tlsca.combined.${peer.cn}-cert.pem`, 'utf8', (err, peerCert) => { - if (err) { - reject(err); - return; - } - resolve({ - pem: peerCert, - 'ssl-target-name-override': peer.cn - }); - }); - }); -}; diff --git a/src/index.js b/src/index.js index 700cff1..976243d 100644 --- a/src/index.js +++ b/src/index.js @@ -1,7 +1,13 @@ -const invoke = require('./invoke'); -const query = require('./query'); +const baseService = require('./lib/baseService'); +const createFabricClient = require('./lib/createFabricClient'); +const invoke = require('./lib/invoke'); +const query = require('./lib/query'); +const registerChaincodeEventListener = require('./lib/registerChaincodeEventListener'); module.exports = { + baseService, + createFabricClient, invoke, - query + query, + registerChaincodeEventListener }; diff --git a/src/services/baseService.js b/src/lib/baseService.js similarity index 95% rename from src/services/baseService.js rename to src/lib/baseService.js index 14fbb74..a547737 100644 --- a/src/services/baseService.js +++ b/src/lib/baseService.js @@ -1,5 +1,5 @@ -const invoke = require('../invoke'); -const query = require('../query'); +const invoke = require('./invoke'); +const query = require('./query'); const logger = require('../logging/logger').getLogger('services/baseService'); module.exports = ( diff --git a/src/lib/createFabricClient.js b/src/lib/createFabricClient.js new file mode 100644 index 0000000..bcb09a5 --- /dev/null +++ b/src/lib/createFabricClient.js @@ -0,0 +1,33 @@ +const FabricClient = require('fabric-client'); +const logger = require('../logging/logger').getLogger('lib/createFabricClient'); + +module.exports = function createFabricClient(keyStorePath) { + const fabricClient = new FabricClient(); + + return new Promise((resolve, reject) => { + Promise.resolve() + .then(() => { + logger.info('Create a fabric client and set the keystore location'); + return FabricClient.newDefaultKeyValueStore({path: keyStorePath}); + }) + .then((stateStore) => { + logger.info('Set fabric client crypto suite'); + + // assign the store to the fabric client + fabricClient.setStateStore(stateStore); + const cryptoSuite = FabricClient.newCryptoSuite(); + // use the same location for the state store (where the users' certificate are kept) + // and the crypto store (where the users' keys are kept) + const cryptoStore = FabricClient.newCryptoKeyStore({path: keyStorePath}); + cryptoSuite.setCryptoKeyStore(cryptoStore); + fabricClient.setCryptoSuite(cryptoSuite); + + logger.info('Fabric client initialized'); + resolve(fabricClient); + }) + .catch((err) => { + logger.error(`Failed to initialize fabric client: ${err.message}`); + reject(err); + }); + }); +}; diff --git a/src/invoke.js b/src/lib/invoke.js similarity index 62% rename from src/invoke.js rename to src/lib/invoke.js index df67c57..ca57a8c 100644 --- a/src/invoke.js +++ b/src/lib/invoke.js @@ -1,16 +1,19 @@ const util = require('util'); -const loadPeerCert = require('./fabric/loadPeerCert'); -const setUserContext = require('./fabric/setUserContext'); -const createFabricClient = require('./fabric/createFabricClient'); -const serializeArg = require('./utils/serializeArg'); -const parseErrorMessage = require('./fabric/parseErrorMessage'); -const logger = require('./logging/logger').getLogger('invoke'); const dropRightWhile = require('lodash.droprightwhile'); +const {URL} = require('url'); +const loadCert = require('../utils/loadCert'); +const setUserContext = require('../utils/setUserContext'); +const serializeArg = require('../utils/serializeArg'); +const parseErrorMessage = require('../utils/parseErrorMessage'); +const logger = require('../logging/logger').getLogger('lib/invoke'); +const isGrpcs = require('../utils/isGrpcs'); +const createChannel = require('../utils/createChannel'); const MAX_RETRIES_EVENT_HUB = 5; const MAX_TIMEOUT = 30000; module.exports = function invoke({ + fabricClient, chaincode, channelId, peers = [], @@ -20,7 +23,8 @@ module.exports = function invoke({ }) { const peersMap = {}; (peers || []).forEach((peer) => { - peersMap[peer.cn] = peer; + const peerUrl = new URL(peer.url); + peersMap[peerUrl.host.toLowerCase()] = peer; }); const uniquePeers = Object.values(peersMap); @@ -30,13 +34,16 @@ module.exports = function invoke({ return new Promise((resolve, reject) => { let txId = null; - let fabricClient = null; let channel = null; let transactionProposalResponse = null; Promise.resolve() - .then(() => createFabricClient({peers: uniquePeers, orderer, channelId})) - .then(({fabricClient: _fabricClient, channel: _channel}) => { - fabricClient = _fabricClient; + .then(createChannel({ + fabricClient, + channelId, + peers, + orderer + })) + .then((_channel) => { channel = _channel; }) .then(() => setUserContext(fabricClient, userId)) @@ -54,7 +61,9 @@ module.exports = function invoke({ }; if (uniquePeers && uniquePeers.length > 0) { - logger.info(`Sending transaction proposal to following endorser peers: ${uniquePeers.map((uniquePeer) => uniquePeer.url).join(', ')}`); + logger.info(`Sending transaction proposal to following endorser peers: ${uniquePeers + .map((uniquePeer) => uniquePeer.url) + .join(', ')}`); } // send the transaction proposal to the peers @@ -88,10 +97,11 @@ module.exports = function invoke({ if (isProposalGood) { const peerForListening = uniquePeers[0]; - return loadPeerCert(peerForListening).then((peerCertOptions) => { + const waitForTransactionCompleted = (peerCertOptions) => { logger.info(util.format( 'Successfully sent Proposal and received ProposalResponse: Status - %s, message - "%s"', - proposalResponses[0].response.status, proposalResponses[0].response.message + proposalResponses[0].response.status, + proposalResponses[0].response.message )); // build up the request for the orderer to have the transaction committed @@ -126,38 +136,45 @@ module.exports = function invoke({ .then(() => { const handle = setTimeout(() => { eventHub.disconnect(); - txPromiseReject(new Error('Transaction did not complete within the allowed time')); + const err = new Error('Transaction did not complete within the allowed time'); + txPromiseReject(err); }, maxTimeout); let retries = 0; const startListening = () => { eventHub.connect(); - eventHub.registerTxEvent(transactionIdString, (tx, code) => { - // this is the callback for transaction event status - // first some clean up of event listener - clearTimeout(handle); - eventHub.unregisterTxEvent(transactionIdString); - eventHub.disconnect(); - - // now let the application know what happened - const returnStatus = {event_status: code, tx_id: transactionIdString}; - if (code !== 'VALID') { - logger.error(`The transaction was invalid, code = ${code}`); - txPromiseReject(new Error(returnStatus)); - } else { - // eslint-disable-next-line no-underscore-dangle - logger.info(`The transaction has been committed on peer ${eventHub._ep._endpoint.addr}`); - txPromiseResolve(returnStatus); + eventHub.registerTxEvent( + transactionIdString, + (tx, code) => { + // this is the callback for transaction event status + // first some clean up of event listener + clearTimeout(handle); + eventHub.unregisterTxEvent(transactionIdString); + eventHub.disconnect(); + + // now let the application know what happened + const returnStatus = {event_status: code, tx_id: transactionIdString}; + if (code !== 'VALID') { + logger.error(`The transaction was invalid, code = ${code}`); + txPromiseReject(new Error(returnStatus)); + } else { + logger.info(`The transaction has been committed on peer ${ + // eslint-disable-next-line no-underscore-dangle + eventHub._ep._endpoint.addr + }`); + txPromiseResolve(returnStatus); + } + }, + (err) => { + // this is the callback if something goes wrong with the event registration or processing + if (retries >= MAX_RETRIES_EVENT_HUB) { + logger.info(`The event hub was disconnected, retrying (attempt: ${retries})`); + setTimeout(startListening, 0); + } else { + txPromiseReject(new Error(`There was a problem with the eventhub: ${err} `)); + } + retries += 1; } - }, (err) => { - // this is the callback if something goes wrong with the event registration or processing - if (retries >= MAX_RETRIES_EVENT_HUB) { - logger.info(`The event hub was disconnected, retrying (attempt: ${retries})`); - setTimeout(startListening, 0); - } else { - txPromiseReject(new Error(`There was a problem with the eventhub: ${err} `)); - } - retries += 1; - }); + ); }; startListening(); }) @@ -167,12 +184,18 @@ module.exports = function invoke({ promises.push(txPromise); return Promise.all(promises); - }); + }; + if (!isGrpcs(peerForListening.url)) { + return waitForTransactionCompleted(); + } + const {certPath, certOptions} = peerForListening; + return loadCert(certPath, certOptions).then(waitForTransactionCompleted); } throw proposalError || - new Error(transactionProposalResponse || 'Failed to send Proposal or receive valid response. ' + - 'Response null or status is not 200. exiting...'); + new Error(transactionProposalResponse || + 'Failed to send Proposal or receive valid response. ' + + 'Response null or status is not 200. exiting...'); }) .then((results) => { logger.info('Send transaction promise and event listener promise have completed'); @@ -184,16 +207,18 @@ module.exports = function invoke({ logger.info('Successfully sent transaction to the orderer.'); transactionSucceeded = true; } else { - logger.error(`Failed to order the transaction.Error code: ${results.status} `); - errors.push(`Failed to order the transaction.Error code: ${results.status} `); + const message = `Failed to order the transaction.Error code: ${results.status}`; + logger.error(message); + errors.push(message); } if (results && results[1] && results[1].event_status === 'VALID') { logger.info('Successfully committed the change to the ledger by the peer'); commitSucceeded = true; } else { - logger.info(`Transaction failed to be committed to the ledger due to: ${results[1].event_status}.`); - errors.push(`Transaction failed to be committed to the ledger due to: ${results[1].event_status}.`); + const message = `Transaction failed to be committed to the ledger due to: ${results[1].event_status}`; + logger.info(message); + errors.push(message); } if (transactionSucceeded && commitSucceeded) { diff --git a/src/query.js b/src/lib/query.js similarity index 72% rename from src/query.js rename to src/lib/query.js index c2132ab..f5d88f0 100644 --- a/src/query.js +++ b/src/lib/query.js @@ -1,24 +1,24 @@ -const setUserContext = require('./fabric/setUserContext'); -const createFabricClient = require('./fabric/createFabricClient'); -const serializeArg = require('./utils/serializeArg'); -const logger = require('./logging/logger').getLogger('query'); const dropRightWhile = require('lodash.droprightwhile'); -const parseErrorMessage = require('./fabric/parseErrorMessage'); +const setUserContext = require('../utils/setUserContext'); +const createChannel = require('../utils/createChannel'); +const serializeArg = require('../utils/serializeArg'); +const logger = require('../logging/logger').getLogger('lib/query'); +const parseErrorMessage = require('../utils/parseErrorMessage'); module.exports = function query({ - chaincode, - channelId, - peer, - userId + fabricClient, chaincode, channelId, peer, userId }) { return new Promise((resolve, reject) => { - let fabricClient = null; let channel = null; Promise.resolve() - .then(() => createFabricClient({peers: [peer], channelId})) - .then(({fabricClient: _fabricClient, channel: _channel}) => { - fabricClient = _fabricClient; + .then(() => + createChannel({ + fabricClient, + channelId, + peers: [peer] + })) + .then((_channel) => { channel = _channel; }) .then(() => setUserContext(fabricClient, userId)) @@ -26,7 +26,9 @@ module.exports = function query({ const request = { chaincodeId: chaincode.id, fcn: chaincode.fcn, - args: chaincode.args ? dropRightWhile(chaincode.args.map(serializeArg), (arg) => typeof arg === 'undefined') : [] + args: chaincode.args + ? dropRightWhile(chaincode.args.map(serializeArg), (arg) => typeof arg === 'undefined') + : [] }; // send the query proposal to the peer diff --git a/src/fabric/registerChaincodeEventListener.js b/src/lib/registerChaincodeEventListener.js similarity index 57% rename from src/fabric/registerChaincodeEventListener.js rename to src/lib/registerChaincodeEventListener.js index 5d0b5f6..639e396 100644 --- a/src/fabric/registerChaincodeEventListener.js +++ b/src/lib/registerChaincodeEventListener.js @@ -1,25 +1,31 @@ -const createFabricClient = require('./createFabricClient'); -const setUserContext = require('./setUserContext'); -const loadPeerCert = require('./loadPeerCert'); -const logger = require('../logging/logger').getLogger('fabric/registerChaincodeEventListener'); - -function setupEventHub(peer, orderer, channelId, callback) { - let fabricClient = null; +const setUserContext = require('../utils/setUserContext'); +const loadCert = require('../utils/loadCert'); +const logger = require('../logging/logger').getLogger('lib/registerChaincodeEventListener'); +const isGrpcs = require('../utils/isGrpcs'); +const createChannel = require('../utils/createChannel'); +function setupEventHub(fabricClient, peer, orderer, channelId, callback) { return Promise.resolve() - .then(() => createFabricClient({peers: [peer], orderer, channelId})) - .then(({fabricClient: _fabricClient}) => { - fabricClient = _fabricClient; + .then(() => + createChannel({ + fabricClient, + channelId, + peers: [peer], + orderer + })) + .then(() => { + if (!isGrpcs(peer.url)) { + return Promise.resolve(); + } + return loadCert(peer.certPath, peer.certOptions); }) - .then(() => loadPeerCert(peer)) .then((peerCertOptions) => { const eventHub = fabricClient.newEventHub(); eventHub.setPeerAddr(peer.broadcastUrl, peerCertOptions); - return setUserContext(fabricClient, peer.adminUserId) - .then(() => { - return eventHub; - }); + return setUserContext(fabricClient, peer.adminUserId).then(() => { + return eventHub; + }); }) .then(callback) .catch((error) => { @@ -29,9 +35,10 @@ function setupEventHub(peer, orderer, channelId, callback) { } module.exports = function registerChaincodeEventListener({ + fabricClient, peer, orderer, - channel, + channelId, chaincode, eventId, onEvent, @@ -42,7 +49,7 @@ module.exports = function registerChaincodeEventListener({ let eventHubMaster = null; // Listening for requests and keep requests in sync logger.info(`Setting up event hub for ${eventId} on ${chaincode}`); - const eventHubPromise = setupEventHub(peer, orderer, channel, (eventHub) => { + const eventHubPromise = setupEventHub(fabricClient, peer, orderer, channelId, (eventHub) => { eventHubMaster = eventHub; const startListening = () => { logger.info(`Start listening for ${eventId} on ${chaincode}`); @@ -58,13 +65,16 @@ module.exports = function registerChaincodeEventListener({ logger.info(`Event payload ${JSON.stringify(payload)}`); onEvent(payload, eventId); } - }, (error) => { + }, + (error) => { logger.warn(`Private Eventhub disconnected, trying to reconnect ${error}`); - Promise.resolve().then(() => { - return typeof onDisconnect === 'function' ? onDisconnect(error, eventId) : undefined; - }).then(() => { - setTimeout(startListening, timeoutForReconnect); - }); + Promise.resolve() + .then(() => { + return typeof onDisconnect === 'function' ? onDisconnect(error, eventId) : undefined; + }) + .then(() => { + setTimeout(startListening, timeoutForReconnect); + }); } ); }; diff --git a/src/utils/createChannel.js b/src/utils/createChannel.js new file mode 100644 index 0000000..e61c120 --- /dev/null +++ b/src/utils/createChannel.js @@ -0,0 +1,62 @@ +const loadCert = require('../utils/loadCert'); +const logger = require('../logging/logger').getLogger('utils/createChannel'); +const isGrpcs = require('../utils/isGrpcs'); + +module.exports = function createChannel({ + fabricClient, + channelId, + peers = [], + orderer = undefined +}) { + const ordererUrl = orderer ? orderer.url : undefined; + + const channel = fabricClient.newChannel(channelId); + + const registerPeersCertOnChannel = () => Promise.all(peers.map((peer) => { + return new Promise((resolve, reject) => { + if (!isGrpcs(peer.url)) { + channel.addPeer(fabricClient.newPeer(peer.url)); + resolve(); + return; + } + loadCert(peer.certPath, peer.certOptions) + .then((certOptions) => { + channel.addPeer(fabricClient.newPeer(peer.url, certOptions)); + resolve(); + }) + .catch(reject); + }); + })); + + const registerOrdererCertOnChannel = () => new Promise((resolve, reject) => { + if (orderer) { + if (!isGrpcs(orderer.url)) { + channel.addOrderer(fabricClient.newOrderer(ordererUrl)); + resolve(); + return; + } + loadCert(orderer.certPath, orderer.certOptions) + .then((certOptions) => { + channel.addOrderer(fabricClient.newOrderer(ordererUrl, certOptions)); + resolve(); + }) + .catch(reject); + } else { + resolve(); + } + }); + + return new Promise((resolve, reject) => { + Promise.resolve() + .then(registerPeersCertOnChannel) + .then(registerOrdererCertOnChannel) + .then(() => { + logger.info(`Channel ${channelId} initialized`); + resolve(channel); + }) + .catch((err) => { + logger.error(`Failed to initialize channel ${channelId}: ${err.message}`); + reject(err); + }); + }); +}; diff --git a/src/fabric/createPeers.js b/src/utils/createPeers.js similarity index 56% rename from src/fabric/createPeers.js rename to src/utils/createPeers.js index 9055047..5802547 100644 --- a/src/fabric/createPeers.js +++ b/src/utils/createPeers.js @@ -1,11 +1,17 @@ -const loadPeerCert = require('./loadPeerCert'); +const loadCert = require('./loadCert'); +const isGrpcs = require('../utils/isGrpcs'); module.exports = function createPeers(fabricClient, peers) { const createPeerPromises = []; if (Array.isArray(peers)) { peers.forEach((peer) => { const createPeerPromise = Promise.resolve() - .then(() => loadPeerCert(peer)) + .then(() => { + if (!isGrpcs(peer.url)) { + return Promise.resolve(); + } + return loadCert(peer.certPath, peer.certOptions); + }) .then((certOptions) => fabricClient.newPeer(peer.url, certOptions)); createPeerPromises.push(createPeerPromise); }); diff --git a/src/utils/isGrpcs.js b/src/utils/isGrpcs.js new file mode 100644 index 0000000..8f1252f --- /dev/null +++ b/src/utils/isGrpcs.js @@ -0,0 +1,6 @@ +module.exports = function isGrpcs(url) { + if (url.toLowerCase().indexOf('grpc://') === 0) { + return false; + } + return true; +}; diff --git a/src/utils/loadCert.js b/src/utils/loadCert.js new file mode 100644 index 0000000..7505589 --- /dev/null +++ b/src/utils/loadCert.js @@ -0,0 +1,18 @@ +const fs = require('fs'); +const logger = require('../logging/logger').getLogger('fabric/loadCert'); + +module.exports = function loadCert(certPath, certOptions = {}) { + return new Promise((resolve, reject) => { + logger.info(`Loading certificate for path: ${certPath}`); + fs.readFile(certPath, 'utf8', (err, ordererCert) => { + if (err) { + reject(err); + return; + } + resolve({ + ...certOptions, + pem: ordererCert + }); + }); + }); +}; diff --git a/src/fabric/parseErrorMessage.js b/src/utils/parseErrorMessage.js similarity index 100% rename from src/fabric/parseErrorMessage.js rename to src/utils/parseErrorMessage.js diff --git a/src/fabric/setUserContext.js b/src/utils/setUserContext.js similarity index 87% rename from src/fabric/setUserContext.js rename to src/utils/setUserContext.js index a47651d..564d9d8 100644 --- a/src/fabric/setUserContext.js +++ b/src/utils/setUserContext.js @@ -10,7 +10,7 @@ module.exports = function setUserContext(fabricClient, userId) { if (userFromStore && userFromStore.isEnrolled()) { logger.info(`Successfully loaded ${userId} from persistence`); } else { - throw new Error(`Failed to get ${userId}.... run registerUser.js`); + throw new Error(`Unable to load ${userId} as it's not been registered`); } return userFromStore;