From 2728f6f78adb9f48430763f9cdcba9a3efcc2632 Mon Sep 17 00:00:00 2001 From: Jim Zhang Date: Tue, 8 Mar 2022 13:50:25 -0500 Subject: [PATCH 1/5] Change chaincode Query endpoint to only use the first peer Signed-off-by: Jim Zhang --- internal/fabric/client/client_ccp.go | 15 ++-- internal/fabric/client/client_common.go | 53 +++++++++++++ .../client/client_gateway_clientside.go | 75 ++++++++++++++++--- internal/fabric/client/rpc_test.go | 29 ++++++- 4 files changed, 152 insertions(+), 20 deletions(-) create mode 100644 internal/fabric/client/client_common.go diff --git a/internal/fabric/client/client_ccp.go b/internal/fabric/client/client_ccp.go index 1ba2253..18bd683 100644 --- a/internal/fabric/client/client_ccp.go +++ b/internal/fabric/client/client_ccp.go @@ -41,11 +41,9 @@ type ccpClientWrapper struct { signer *msp.IdentityIdentifier } -// defined to allow mocking in tests -type channelCreator func(context.ChannelProvider) (*channel.Client, error) - type ccpRPCWrapper struct { txTimeout int + configProvider core.ConfigProvider sdk *fabsdk.FabricSDK cryptoSuiteConfig core.CryptoSuiteConfig userStore msp.UserStore @@ -72,6 +70,7 @@ func newRPCClientFromCCP(configProvider core.ConfigProvider, txTimeout int, user log.Infof("New gRPC connection established") w := &ccpRPCWrapper{ sdk: ledgerClientWrapper.sdk, + configProvider: configProvider, cryptoSuiteConfig: cryptoConfig, userStore: userStore, idClient: idClient, @@ -106,6 +105,11 @@ func (w *ccpRPCWrapper) Query(channelId, signer, chaincodeName, method string, a return nil, errors.Errorf("Failed to get channel client. %s", err) } + peerEndpoint, err := getFirstPeerEndpointFromConfig(w.configProvider) + if err != nil { + return nil, err + } + result, err := client.channelClient.Query( channel.Request{ ChaincodeID: chaincodeName, @@ -113,6 +117,7 @@ func (w *ccpRPCWrapper) Query(channelId, signer, chaincodeName, method string, a Args: convert(args), }, channel.WithRetry(retry.DefaultChannelOpts), + channel.WithTargetEndpoints(peerEndpoint), ) if err != nil { log.Errorf("Failed to send query [%s:%s:%s]. %s", channelId, chaincodeName, method, err) @@ -212,10 +217,6 @@ func (w *ccpRPCWrapper) Close() error { return nil } -func createChannelClient(channelProvider context.ChannelProvider) (*channel.Client, error) { - return channel.New(channelProvider) -} - func (w *ccpRPCWrapper) sendTransaction(channelId, signer, chaincodeName, method string, args []string, isInit bool) (*msp.IdentityIdentifier, []byte, *fab.TxStatusEvent, error) { client, err := w.getChannelClient(channelId, signer) if err != nil { diff --git a/internal/fabric/client/client_common.go b/internal/fabric/client/client_common.go new file mode 100644 index 0000000..df070fc --- /dev/null +++ b/internal/fabric/client/client_common.go @@ -0,0 +1,53 @@ +package client + +import ( + "fmt" + + "github.com/hyperledger/fabric-sdk-go/pkg/client/channel" + "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context" + "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/core" + "github.com/hyperledger/firefly-fabconnect/internal/errors" +) + +func getOrgFromConfig(config core.ConfigProvider) (string, error) { + configBackend, err := config() + if err != nil { + return "", err + } + if len(configBackend) != 1 { + return "", errors.Errorf("Invalid config file") + } + + cfg := configBackend[0] + value, ok := cfg.Lookup("client.organization") + if !ok { + return "", errors.Errorf("No client organization defined in the config") + } + + return value.(string), nil +} + +func getFirstPeerEndpointFromConfig(config core.ConfigProvider) (string, error) { + org, err := getOrgFromConfig(config) + if err != nil { + return "", err + } + configBackend, _ := config() + cfg := configBackend[0] + value, ok := cfg.Lookup(fmt.Sprintf("organizations.%s.peers", org)) + if !ok { + return "", errors.Errorf("No peers list found in the organization %s", org) + } + peers := value.([]interface{}) + if len(peers) < 1 { + return "", errors.Errorf("Peers list for organization %s is empty", org) + } + return peers[0].(string), nil +} + +// defined to allow mocking in tests +type channelCreator func(context.ChannelProvider) (*channel.Client, error) + +func createChannelClient(channelProvider context.ChannelProvider) (*channel.Client, error) { + return channel.New(channelProvider) +} diff --git a/internal/fabric/client/client_gateway_clientside.go b/internal/fabric/client/client_gateway_clientside.go index a9543fa..3bb9348 100644 --- a/internal/fabric/client/client_gateway_clientside.go +++ b/internal/fabric/client/client_gateway_clientside.go @@ -20,8 +20,11 @@ import ( "context" "time" + "github.com/hyperledger/fabric-sdk-go/pkg/client/channel" + "github.com/hyperledger/fabric-sdk-go/pkg/common/errors/retry" "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/core" "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" + "github.com/hyperledger/fabric-sdk-go/pkg/fabsdk" "github.com/hyperledger/fabric-sdk-go/pkg/gateway" "github.com/hyperledger/firefly-fabconnect/internal/errors" eventsapi "github.com/hyperledger/firefly-fabconnect/internal/events/api" @@ -41,11 +44,14 @@ type gwRPCWrapper struct { eventClientWrapper *eventClientWrapper gatewayCreator gatewayCreator networkCreator networkCreator + channelCreator channelCreator // networkCreator networkC // one gateway client per signer gwClients map[string]*gateway.Gateway // one gateway network per signer per channel - gwChannelClients map[string]map[string]*gateway.Network + gwGatewayClients map[string]map[string]*gateway.Network + // one channel client per signer per channel + gwChannelClients map[string]map[string]*channel.Client } func newRPCClientWithClientSideGateway(configProvider core.ConfigProvider, txTimeout int, idClient IdentityClient, ledgerClientWrapper *ledgerClientWrapper, eventClientWrapper *eventClientWrapper) (RPCClient, error) { @@ -57,8 +63,10 @@ func newRPCClientWithClientSideGateway(configProvider core.ConfigProvider, txTim eventClientWrapper: eventClientWrapper, gatewayCreator: createGateway, networkCreator: getNetwork, + channelCreator: createChannelClient, gwClients: make(map[string]*gateway.Gateway), - gwChannelClients: make(map[string]map[string]*gateway.Network), + gwGatewayClients: make(map[string]map[string]*gateway.Network), + gwChannelClients: make(map[string]map[string]*channel.Client), }, nil } @@ -87,15 +95,28 @@ func (w *gwRPCWrapper) Query(channelId, signer, chaincodeName, method string, ar return nil, errors.Errorf("Failed to get channel client. %s", err) } - contractClient := client.GetContract(chaincodeName) - result, err := contractClient.EvaluateTransaction(method, args...) + peerEndpoint, err := getFirstPeerEndpointFromConfig(w.configProvider) + if err != nil { + return nil, err + } + + bytes := make([][]byte, len(args)) + for i, v := range args { + bytes[i] = []byte(v) + } + req := channel.Request{ + ChaincodeID: chaincodeName, + Fcn: method, + Args: bytes, + } + result, err := client.Query(req, channel.WithRetry(retry.DefaultChannelOpts), channel.WithTargetEndpoints(peerEndpoint)) if err != nil { log.Errorf("Failed to send query [%s:%s:%s]. %s", channelId, chaincodeName, method, err) return nil, err } log.Tracef("RPC [%s:%s:%s] <-- %+v", channelId, chaincodeName, method, result) - return result, nil + return result.Payload, nil } func (w *gwRPCWrapper) QueryChainInfo(channelId, signer string) (*fab.BlockchainInfoResponse, error) { @@ -159,7 +180,7 @@ func (w *gwRPCWrapper) Close() error { } func (w *gwRPCWrapper) sendTransaction(signer, channelId, chaincodeName, method string, args []string, isInit bool) ([]byte, *fab.TxStatusEvent, error) { - channelClient, err := w.getChannelClient(signer, channelId) + channelClient, err := w.getGatewayClient(signer, channelId) if err != nil { return nil, nil, err } @@ -185,9 +206,11 @@ func (w *gwRPCWrapper) sendTransaction(signer, channelId, chaincodeName, method } } -func (w *gwRPCWrapper) getChannelClient(channelId, signer string) (channelClient *gateway.Network, err error) { - channelClientsForSigner := w.gwChannelClients[signer] - if channelClientsForSigner == nil { +// channel clients for transactions are created with the gateway API, so that the internal handling of using +// the discovery service and selecting the right set of endorsers are automated +func (w *gwRPCWrapper) getGatewayClient(channelId, signer string) (gatewayClient *gateway.Network, err error) { + gatewayClientsForSigner := w.gwGatewayClients[signer] + if gatewayClientsForSigner == nil { // no channel clients have been created for this signer at all // we will not have created a gateway client for this user either gatewayClient, err := w.gatewayCreator(w.configProvider, signer, w.txTimeout) @@ -195,17 +218,45 @@ func (w *gwRPCWrapper) getChannelClient(channelId, signer string) (channelClient return nil, err } w.gwClients[signer] = gatewayClient - channelClientsForSigner = make(map[string]*gateway.Network) + gatewayClientsForSigner = make(map[string]*gateway.Network) + w.gwGatewayClients[signer] = gatewayClientsForSigner + } + + gatewayClient = gatewayClientsForSigner[channelId] + if gatewayClient == nil { + client := w.gwClients[signer] + gatewayClient, err = w.networkCreator(client, channelId) + if err != nil { + return nil, err + } + gatewayClientsForSigner[channelId] = gatewayClient + } + return gatewayClient, nil +} + +// channel clients for queries are created with the channel client API, so that we can dictate the target +// peer to be the single peer that this fabconnect instance is attached to. This is more useful than trying to +// do a "strong read" across multiple peers +func (w *gwRPCWrapper) getChannelClient(channelId, signer string) (channelClient *channel.Client, err error) { + channelClientsForSigner := w.gwChannelClients[signer] + if channelClientsForSigner == nil { + channelClientsForSigner = make(map[string]*channel.Client) w.gwChannelClients[signer] = channelClientsForSigner } channelClient = channelClientsForSigner[channelId] if channelClient == nil { - client := w.gwClients[signer] - channelClient, err = w.networkCreator(client, channelId) + sdk := w.ledgerClientWrapper.sdk + org, err := getOrgFromConfig(w.configProvider) if err != nil { return nil, err } + clientChannelContext := sdk.ChannelContext(channelId, fabsdk.WithUser(signer), fabsdk.WithOrg(org)) + // Channel client is used to query and execute transactions (Org1 is default org) + channelClient, err = w.channelCreator(clientChannelContext) + if err != nil { + return nil, errors.Errorf("Failed to create new channel client: %s", err) + } channelClientsForSigner[channelId] = channelClient } return channelClient, nil diff --git a/internal/fabric/client/rpc_test.go b/internal/fabric/client/rpc_test.go index 76dc8e3..e89798d 100644 --- a/internal/fabric/client/rpc_test.go +++ b/internal/fabric/client/rpc_test.go @@ -182,13 +182,40 @@ func TestGatewayClientInstantiation(t *testing.T) { wrapper.gatewayCreator = createMockGateway wrapper.networkCreator = createMockNetwork - client, err := wrapper.getChannelClient("default-channel", "user1") + client, err := wrapper.getGatewayClient("default-channel", "user1") assert.NoError(err) assert.NotNil(client) assert.Equal(1, len(wrapper.gwClients)) + assert.Equal(0, len(wrapper.gwChannelClients)) gw := wrapper.gwClients["user1"] assert.NotNil(gw) + assert.Equal(1, len(wrapper.gwGatewayClients)) + assert.Equal(1, len(wrapper.gwGatewayClients["user1"])) + assert.Equal(client, wrapper.gwGatewayClients["user1"]["default-channel"]) +} + +func TestChannelClientInstantiation(t *testing.T) { + assert := assert.New(t) + + config := conf.RPCConf{ + UseGatewayClient: true, + ConfigPath: tmpShortCCPFile, + } + rpc, idclient, err := RPCConnect(config, 5) + assert.NoError(err) + assert.NotNil(rpc) + assert.NotNil(idclient) + + wrapper, ok := rpc.(*gwRPCWrapper) + assert.True(ok) + + wrapper.channelCreator = createMockChannelClient + client, err := wrapper.getChannelClient("default-channel", "user1") + assert.NoError(err) + assert.NotNil(client) + assert.Equal(0, len(wrapper.gwClients)) + assert.Equal(0, len(wrapper.gwGatewayClients)) assert.Equal(1, len(wrapper.gwChannelClients)) assert.Equal(1, len(wrapper.gwChannelClients["user1"])) assert.Equal(client, wrapper.gwChannelClients["user1"]["default-channel"]) From 0241c7bf1fa82afac0085ef918bb5dd6519936bf Mon Sep 17 00:00:00 2001 From: Jim Zhang Date: Tue, 8 Mar 2022 14:19:15 -0500 Subject: [PATCH 2/5] Refactoring to move common code b/w CCP and Gateway clients Signed-off-by: Jim Zhang --- internal/fabric/client/client_ccp.go | 108 +++--------------- internal/fabric/client/client_common.go | 88 ++++++++++++++ .../client/client_gateway_clientside.go | 95 +++------------ 3 files changed, 120 insertions(+), 171 deletions(-) diff --git a/internal/fabric/client/client_ccp.go b/internal/fabric/client/client_ccp.go index 18bd683..c64e874 100644 --- a/internal/fabric/client/client_ccp.go +++ b/internal/fabric/client/client_ccp.go @@ -28,8 +28,6 @@ import ( "github.com/hyperledger/fabric-sdk-go/pkg/fabsdk" mspImpl "github.com/hyperledger/fabric-sdk-go/pkg/msp" "github.com/hyperledger/firefly-fabconnect/internal/errors" - eventsapi "github.com/hyperledger/firefly-fabconnect/internal/events/api" - "github.com/hyperledger/firefly-fabconnect/internal/fabric/utils" log "github.com/sirupsen/logrus" ) @@ -42,15 +40,9 @@ type ccpClientWrapper struct { } type ccpRPCWrapper struct { - txTimeout int - configProvider core.ConfigProvider - sdk *fabsdk.FabricSDK - cryptoSuiteConfig core.CryptoSuiteConfig - userStore msp.UserStore - idClient IdentityClient - ledgerClientWrapper *ledgerClientWrapper - eventClientWrapper *eventClientWrapper - channelCreator channelCreator + *commonRPCWrapper + cryptoSuiteConfig core.CryptoSuiteConfig + userStore msp.UserStore // one channel client per channel ID, per signer ID channelClients map[string](map[string]*ccpClientWrapper) } @@ -69,16 +61,18 @@ func newRPCClientFromCCP(configProvider core.ConfigProvider, txTimeout int, user log.Infof("New gRPC connection established") w := &ccpRPCWrapper{ - sdk: ledgerClientWrapper.sdk, - configProvider: configProvider, - cryptoSuiteConfig: cryptoConfig, - userStore: userStore, - idClient: idClient, - ledgerClientWrapper: ledgerClientWrapper, - eventClientWrapper: eventClientWrapper, - channelClients: make(map[string]map[string]*ccpClientWrapper), - channelCreator: createChannelClient, - txTimeout: txTimeout, + commonRPCWrapper: &commonRPCWrapper{ + sdk: ledgerClientWrapper.sdk, + configProvider: configProvider, + idClient: idClient, + ledgerClientWrapper: ledgerClientWrapper, + eventClientWrapper: eventClientWrapper, + channelCreator: createChannelClient, + txTimeout: txTimeout, + }, + cryptoSuiteConfig: cryptoConfig, + userStore: userStore, + channelClients: make(map[string]map[string]*ccpClientWrapper), } return w, nil } @@ -128,59 +122,6 @@ func (w *ccpRPCWrapper) Query(channelId, signer, chaincodeName, method string, a return result.Payload, nil } -func (w *ccpRPCWrapper) QueryTransaction(channelId, signer, txId string) (map[string]interface{}, error) { - log.Tracef("RPC [%s] --> QueryTransaction %s", channelId, txId) - - result, err := w.ledgerClientWrapper.queryTransaction(channelId, signer, txId) - if err != nil { - log.Errorf("Failed to query transaction on channel %s. %s", channelId, err) - return nil, err - } - - log.Tracef("RPC [%s] <-- %+v", channelId, result) - return result, nil -} - -func (w *ccpRPCWrapper) QueryChainInfo(channelId, signer string) (*fab.BlockchainInfoResponse, error) { - log.Tracef("RPC [%s] --> ChainInfo", channelId) - - result, err := w.ledgerClientWrapper.queryChainInfo(channelId, signer) - if err != nil { - log.Errorf("Failed to query chain info on channel %s. %s", channelId, err) - return nil, err - } - - log.Tracef("RPC [%s] <-- %+v", channelId, result) - return result, nil -} - -func (w *ccpRPCWrapper) QueryBlock(channelId string, blockNumber uint64, signer string) (*utils.RawBlock, *utils.Block, error) { - log.Tracef("RPC [%s] --> QueryBlock %v", channelId, blockNumber) - - rawblock, block, err := w.ledgerClientWrapper.queryBlock(channelId, blockNumber, signer) - if err != nil { - log.Errorf("Failed to query block %v on channel %s. %s", blockNumber, channelId, err) - return nil, nil, err - } - - log.Tracef("RPC [%s] <-- success", channelId) - return rawblock, block, nil -} - -// The returned registration must be closed when done -func (w *ccpRPCWrapper) SubscribeEvent(subInfo *eventsapi.SubscriptionInfo, since uint64) (*RegistrationWrapper, <-chan *fab.BlockEvent, <-chan *fab.CCEvent, error) { - reg, blockEventCh, ccEventCh, err := w.eventClientWrapper.subscribeEvent(subInfo, since) - if err != nil { - log.Errorf("Failed to subscribe to event [%s:%s:%s]. %s", subInfo.Stream, subInfo.ChannelId, subInfo.Filter.ChaincodeId, err) - return nil, nil, nil, err - } - return reg, blockEventCh, ccEventCh, nil -} - -func (w *ccpRPCWrapper) Unregister(regWrapper *RegistrationWrapper) { - regWrapper.eventClient.Unregister(regWrapper.registration) -} - func (w *ccpRPCWrapper) getChannelClient(channelId string, signer string) (*ccpClientWrapper, error) { id, err := w.idClient.GetSigningIdentity(signer) if err == msp.ErrUserNotFound { @@ -248,22 +189,3 @@ func (w *ccpRPCWrapper) sendTransaction(channelId, signer, chaincodeName, method } return client.signer, result.Payload, &txStatus, nil } - -func convert(args []string) [][]byte { - result := [][]byte{} - for _, v := range args { - result = append(result, []byte(v)) - } - return result -} - -func newReceipt(responsePayload []byte, status *fab.TxStatusEvent, signerID *msp.IdentityIdentifier) *TxReceipt { - return &TxReceipt{ - SignerMSP: signerID.MSPID, - Signer: signerID.ID, - TransactionID: status.TxID, - Status: status.TxValidationCode, - BlockNumber: status.BlockNumber, - SourcePeer: status.SourceURL, - } -} diff --git a/internal/fabric/client/client_common.go b/internal/fabric/client/client_common.go index df070fc..3eba440 100644 --- a/internal/fabric/client/client_common.go +++ b/internal/fabric/client/client_common.go @@ -6,9 +6,25 @@ import ( "github.com/hyperledger/fabric-sdk-go/pkg/client/channel" "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context" "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/core" + "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" + "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/msp" + "github.com/hyperledger/fabric-sdk-go/pkg/fabsdk" "github.com/hyperledger/firefly-fabconnect/internal/errors" + eventsapi "github.com/hyperledger/firefly-fabconnect/internal/events/api" + "github.com/hyperledger/firefly-fabconnect/internal/fabric/utils" + log "github.com/sirupsen/logrus" ) +type commonRPCWrapper struct { + txTimeout int + configProvider core.ConfigProvider + sdk *fabsdk.FabricSDK + idClient IdentityClient + ledgerClientWrapper *ledgerClientWrapper + eventClientWrapper *eventClientWrapper + channelCreator channelCreator +} + func getOrgFromConfig(config core.ConfigProvider) (string, error) { configBackend, err := config() if err != nil { @@ -51,3 +67,75 @@ type channelCreator func(context.ChannelProvider) (*channel.Client, error) func createChannelClient(channelProvider context.ChannelProvider) (*channel.Client, error) { return channel.New(channelProvider) } + +func newReceipt(responsePayload []byte, status *fab.TxStatusEvent, signerID *msp.IdentityIdentifier) *TxReceipt { + return &TxReceipt{ + SignerMSP: signerID.MSPID, + Signer: signerID.ID, + TransactionID: status.TxID, + Status: status.TxValidationCode, + BlockNumber: status.BlockNumber, + SourcePeer: status.SourceURL, + } +} + +func convert(args []string) [][]byte { + result := [][]byte{} + for _, v := range args { + result = append(result, []byte(v)) + } + return result +} + +func (w *commonRPCWrapper) QueryChainInfo(channelId, signer string) (*fab.BlockchainInfoResponse, error) { + log.Tracef("RPC [%s] --> ChainInfo", channelId) + + result, err := w.ledgerClientWrapper.queryChainInfo(channelId, signer) + if err != nil { + log.Errorf("Failed to query chain info on channel %s. %s", channelId, err) + return nil, err + } + + log.Tracef("RPC [%s] <-- %+v", channelId, result) + return result, nil +} + +func (w *commonRPCWrapper) QueryBlock(channelId string, blockNumber uint64, signer string) (*utils.RawBlock, *utils.Block, error) { + log.Tracef("RPC [%s] --> QueryBlock %v", channelId, blockNumber) + + rawblock, block, err := w.ledgerClientWrapper.queryBlock(channelId, blockNumber, signer) + if err != nil { + log.Errorf("Failed to query block %v on channel %s. %s", blockNumber, channelId, err) + return nil, nil, err + } + + log.Tracef("RPC [%s] <-- success", channelId) + return rawblock, block, nil +} + +func (w *commonRPCWrapper) QueryTransaction(channelId, signer, txId string) (map[string]interface{}, error) { + log.Tracef("RPC [%s] --> QueryTransaction %s", channelId, txId) + + result, err := w.ledgerClientWrapper.queryTransaction(channelId, signer, txId) + if err != nil { + log.Errorf("Failed to query transaction on channel %s. %s", channelId, err) + return nil, err + } + + log.Tracef("RPC [%s] <-- %+v", channelId, result) + return result, nil +} + +// The returned registration must be closed when done +func (w *commonRPCWrapper) SubscribeEvent(subInfo *eventsapi.SubscriptionInfo, since uint64) (*RegistrationWrapper, <-chan *fab.BlockEvent, <-chan *fab.CCEvent, error) { + reg, blockEventCh, ccEventCh, err := w.eventClientWrapper.subscribeEvent(subInfo, since) + if err != nil { + log.Errorf("Failed to subscribe to event [%s:%s:%s]. %s", subInfo.Stream, subInfo.ChannelId, subInfo.Filter.ChaincodeId, err) + return nil, nil, nil, err + } + return reg, blockEventCh, ccEventCh, nil +} + +func (w *commonRPCWrapper) Unregister(regWrapper *RegistrationWrapper) { + regWrapper.eventClient.Unregister(regWrapper.registration) +} diff --git a/internal/fabric/client/client_gateway_clientside.go b/internal/fabric/client/client_gateway_clientside.go index 3bb9348..8c764f6 100644 --- a/internal/fabric/client/client_gateway_clientside.go +++ b/internal/fabric/client/client_gateway_clientside.go @@ -27,8 +27,6 @@ import ( "github.com/hyperledger/fabric-sdk-go/pkg/fabsdk" "github.com/hyperledger/fabric-sdk-go/pkg/gateway" "github.com/hyperledger/firefly-fabconnect/internal/errors" - eventsapi "github.com/hyperledger/firefly-fabconnect/internal/events/api" - "github.com/hyperledger/firefly-fabconnect/internal/fabric/utils" log "github.com/sirupsen/logrus" ) @@ -37,14 +35,9 @@ type gatewayCreator func(core.ConfigProvider, string, int) (*gateway.Gateway, er type networkCreator func(*gateway.Gateway, string) (*gateway.Network, error) type gwRPCWrapper struct { - txTimeout int - configProvider core.ConfigProvider - idClient IdentityClient - ledgerClientWrapper *ledgerClientWrapper - eventClientWrapper *eventClientWrapper - gatewayCreator gatewayCreator - networkCreator networkCreator - channelCreator channelCreator + *commonRPCWrapper + gatewayCreator gatewayCreator + networkCreator networkCreator // networkCreator networkC // one gateway client per signer gwClients map[string]*gateway.Gateway @@ -56,17 +49,19 @@ type gwRPCWrapper struct { func newRPCClientWithClientSideGateway(configProvider core.ConfigProvider, txTimeout int, idClient IdentityClient, ledgerClientWrapper *ledgerClientWrapper, eventClientWrapper *eventClientWrapper) (RPCClient, error) { return &gwRPCWrapper{ - txTimeout: txTimeout, - configProvider: configProvider, - idClient: idClient, - ledgerClientWrapper: ledgerClientWrapper, - eventClientWrapper: eventClientWrapper, - gatewayCreator: createGateway, - networkCreator: getNetwork, - channelCreator: createChannelClient, - gwClients: make(map[string]*gateway.Gateway), - gwGatewayClients: make(map[string]map[string]*gateway.Network), - gwChannelClients: make(map[string]map[string]*channel.Client), + commonRPCWrapper: &commonRPCWrapper{ + txTimeout: txTimeout, + configProvider: configProvider, + idClient: idClient, + ledgerClientWrapper: ledgerClientWrapper, + eventClientWrapper: eventClientWrapper, + channelCreator: createChannelClient, + }, + gatewayCreator: createGateway, + networkCreator: getNetwork, + gwClients: make(map[string]*gateway.Gateway), + gwGatewayClients: make(map[string]map[string]*gateway.Network), + gwChannelClients: make(map[string]map[string]*channel.Client), }, nil } @@ -100,10 +95,7 @@ func (w *gwRPCWrapper) Query(channelId, signer, chaincodeName, method string, ar return nil, err } - bytes := make([][]byte, len(args)) - for i, v := range args { - bytes[i] = []byte(v) - } + bytes := convert(args) req := channel.Request{ ChaincodeID: chaincodeName, Fcn: method, @@ -119,59 +111,6 @@ func (w *gwRPCWrapper) Query(channelId, signer, chaincodeName, method string, ar return result.Payload, nil } -func (w *gwRPCWrapper) QueryChainInfo(channelId, signer string) (*fab.BlockchainInfoResponse, error) { - log.Tracef("RPC [%s] --> ChainInfo", channelId) - - result, err := w.ledgerClientWrapper.queryChainInfo(channelId, signer) - if err != nil { - log.Errorf("Failed to query chain info on channel %s. %s", channelId, err) - return nil, err - } - - log.Tracef("RPC [%s] <-- %+v", channelId, result) - return result, nil -} - -func (w *gwRPCWrapper) QueryBlock(channelId string, blockNumber uint64, signer string) (*utils.RawBlock, *utils.Block, error) { - log.Tracef("RPC [%s] --> QueryBlock %v", channelId, blockNumber) - - rawblock, block, err := w.ledgerClientWrapper.queryBlock(channelId, blockNumber, signer) - if err != nil { - log.Errorf("Failed to query block %v on channel %s. %s", blockNumber, channelId, err) - return nil, nil, err - } - - log.Tracef("RPC [%s] <-- success", channelId) - return rawblock, block, nil -} - -func (w *gwRPCWrapper) QueryTransaction(channelId, signer, txId string) (map[string]interface{}, error) { - log.Tracef("RPC [%s] --> QueryTransaction %s", channelId, txId) - - result, err := w.ledgerClientWrapper.queryTransaction(channelId, signer, txId) - if err != nil { - log.Errorf("Failed to query transaction on channel %s. %s", channelId, err) - return nil, err - } - - log.Tracef("RPC [%s] <-- %+v", channelId, result) - return result, nil -} - -// The returned registration must be closed when done -func (w *gwRPCWrapper) SubscribeEvent(subInfo *eventsapi.SubscriptionInfo, since uint64) (*RegistrationWrapper, <-chan *fab.BlockEvent, <-chan *fab.CCEvent, error) { - reg, blockEventCh, ccEventCh, err := w.eventClientWrapper.subscribeEvent(subInfo, since) - if err != nil { - log.Errorf("Failed to subscribe to event [%s:%s:%s]. %s", subInfo.Stream, subInfo.ChannelId, subInfo.Filter.ChaincodeId, err) - return nil, nil, nil, err - } - return reg, blockEventCh, ccEventCh, nil -} - -func (w *gwRPCWrapper) Unregister(regWrapper *RegistrationWrapper) { - regWrapper.eventClient.Unregister(regWrapper.registration) -} - func (w *gwRPCWrapper) Close() error { // the ledgerClientWrapper and the eventClientWrapper share the same sdk instance // only need to close it from one of them From e0cb0ad298bb9842894a9e14f28e2ad852a8dd0c Mon Sep 17 00:00:00 2001 From: Jim Zhang Date: Tue, 8 Mar 2022 15:38:50 -0500 Subject: [PATCH 3/5] Added option to use strongread for chaincode queries Signed-off-by: Jim Zhang --- internal/fabric/client/api.go | 2 +- internal/fabric/client/client_ccp.go | 35 +++++++------ .../client/client_gateway_clientside.go | 52 ++++++++++++------- internal/fabric/test/helper.go | 2 +- internal/messages/messages.go | 5 +- internal/rest/sync/syncdispatcher.go | 2 +- internal/rest/utils/params.go | 13 +++++ mocks/fabric/client/rpc_client.go | 14 ++--- 8 files changed, 80 insertions(+), 45 deletions(-) diff --git a/internal/fabric/client/api.go b/internal/fabric/client/api.go index a30a906..e355ac8 100644 --- a/internal/fabric/client/api.go +++ b/internal/fabric/client/api.go @@ -53,7 +53,7 @@ type RegistrationWrapper struct { type RPCClient interface { Invoke(channelId, signer, chaincodeName, method string, args []string, isInit bool) (*TxReceipt, error) - Query(channelId, signer, chaincodeName, method string, args []string) ([]byte, error) + Query(channelId, signer, chaincodeName, method string, args []string, strongread bool) ([]byte, error) QueryChainInfo(channelId, signer string) (*fab.BlockchainInfoResponse, error) QueryBlock(channelId string, blockNumber uint64, signer string) (*utils.RawBlock, *utils.Block, error) QueryTransaction(channelId, signer, txId string) (map[string]interface{}, error) diff --git a/internal/fabric/client/client_ccp.go b/internal/fabric/client/client_ccp.go index c64e874..aaf3beb 100644 --- a/internal/fabric/client/client_ccp.go +++ b/internal/fabric/client/client_ccp.go @@ -90,7 +90,7 @@ func (w *ccpRPCWrapper) Invoke(channelId, signer, chaincodeName, method string, return newReceipt(result, txStatus, signerID), err } -func (w *ccpRPCWrapper) Query(channelId, signer, chaincodeName, method string, args []string) ([]byte, error) { +func (w *ccpRPCWrapper) Query(channelId, signer, chaincodeName, method string, args []string, strongread bool) ([]byte, error) { log.Tracef("RPC [%s:%s:%s] --> %+v", channelId, chaincodeName, method, args) client, err := w.getChannelClient(channelId, signer) @@ -99,23 +99,28 @@ func (w *ccpRPCWrapper) Query(channelId, signer, chaincodeName, method string, a return nil, errors.Errorf("Failed to get channel client. %s", err) } - peerEndpoint, err := getFirstPeerEndpointFromConfig(w.configProvider) - if err != nil { - return nil, err + req := channel.Request{ + ChaincodeID: chaincodeName, + Fcn: method, + Args: convert(args), } - result, err := client.channelClient.Query( - channel.Request{ - ChaincodeID: chaincodeName, - Fcn: method, - Args: convert(args), - }, - channel.WithRetry(retry.DefaultChannelOpts), - channel.WithTargetEndpoints(peerEndpoint), - ) - if err != nil { + var result channel.Response + var err1 error + if strongread { + // strongread means querying a set of peers that would have fulfilled the + // endorsement policies and make sure they all have the same results + result, err1 = client.channelClient.Query(req, channel.WithRetry(retry.DefaultChannelOpts)) + } else { + peerEndpoint, err := getFirstPeerEndpointFromConfig(w.configProvider) + if err != nil { + return nil, err + } + result, err1 = client.channelClient.Query(req, channel.WithRetry(retry.DefaultChannelOpts), channel.WithTargetEndpoints(peerEndpoint)) + } + if err1 != nil { log.Errorf("Failed to send query [%s:%s:%s]. %s", channelId, chaincodeName, method, err) - return nil, err + return nil, err1 } log.Tracef("RPC [%s:%s:%s] <-- %+v", channelId, chaincodeName, method, result) diff --git a/internal/fabric/client/client_gateway_clientside.go b/internal/fabric/client/client_gateway_clientside.go index 8c764f6..9cebd4e 100644 --- a/internal/fabric/client/client_gateway_clientside.go +++ b/internal/fabric/client/client_gateway_clientside.go @@ -82,7 +82,7 @@ func (w *gwRPCWrapper) Invoke(channelId, signer, chaincodeName, method string, a return newReceipt(result, txStatus, signingId.Identifier()), err } -func (w *gwRPCWrapper) Query(channelId, signer, chaincodeName, method string, args []string) ([]byte, error) { +func (w *gwRPCWrapper) Query(channelId, signer, chaincodeName, method string, args []string, strongread bool) ([]byte, error) { log.Tracef("RPC [%s:%s:%s] --> %+v", channelId, chaincodeName, method, args) client, err := w.getChannelClient(channelId, signer) @@ -90,25 +90,41 @@ func (w *gwRPCWrapper) Query(channelId, signer, chaincodeName, method string, ar return nil, errors.Errorf("Failed to get channel client. %s", err) } - peerEndpoint, err := getFirstPeerEndpointFromConfig(w.configProvider) - if err != nil { - return nil, err - } + if strongread { + client, err := w.getGatewayClient(channelId, signer) + if err != nil { + return nil, errors.Errorf("Failed to get gateway client. %s", err) + } + contractClient := client.GetContract(chaincodeName) + result, err := contractClient.EvaluateTransaction(method, args...) + if err != nil { + log.Errorf("Failed to send query [%s:%s:%s]. %s", channelId, chaincodeName, method, err) + return nil, err + } - bytes := convert(args) - req := channel.Request{ - ChaincodeID: chaincodeName, - Fcn: method, - Args: bytes, - } - result, err := client.Query(req, channel.WithRetry(retry.DefaultChannelOpts), channel.WithTargetEndpoints(peerEndpoint)) - if err != nil { - log.Errorf("Failed to send query [%s:%s:%s]. %s", channelId, chaincodeName, method, err) - return nil, err - } + log.Tracef("RPC [%s:%s:%s] <-- %+v", channelId, chaincodeName, method, result) + return result, nil + } else { + peerEndpoint, err := getFirstPeerEndpointFromConfig(w.configProvider) + if err != nil { + return nil, err + } + + bytes := convert(args) + req := channel.Request{ + ChaincodeID: chaincodeName, + Fcn: method, + Args: bytes, + } + result, err := client.Query(req, channel.WithRetry(retry.DefaultChannelOpts), channel.WithTargetEndpoints(peerEndpoint)) + if err != nil { + log.Errorf("Failed to send query [%s:%s:%s]. %s", channelId, chaincodeName, method, err) + return nil, err + } - log.Tracef("RPC [%s:%s:%s] <-- %+v", channelId, chaincodeName, method, result) - return result.Payload, nil + log.Tracef("RPC [%s:%s:%s] <-- %+v", channelId, chaincodeName, method, result) + return result.Payload, nil + } } func (w *gwRPCWrapper) Close() error { diff --git a/internal/fabric/test/helper.go b/internal/fabric/test/helper.go index 0dfba86..ce3066d 100644 --- a/internal/fabric/test/helper.go +++ b/internal/fabric/test/helper.go @@ -52,7 +52,7 @@ func MockRPCClient(fromBlock string, withReset ...bool) *mockfabric.RPCClient { chaincodeResult := []byte(`{"AppraisedValue":123000,"Color":"red","ID":"asset01","Owner":"Tom","Size":10}`) txResult["transaction"] = tx1 rpc.On("SubscribeEvent", mock.Anything, mock.Anything).Return(nil, roBlockEventChan, roCCEventChan, nil) - rpc.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(chaincodeResult, nil) + rpc.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(chaincodeResult, nil) rpc.On("QueryChainInfo", mock.Anything, mock.Anything).Return(res, nil) rpc.On("QueryBlock", mock.Anything, mock.Anything, mock.Anything).Return(rawBlock, block, nil) rpc.On("QueryTransaction", mock.Anything, mock.Anything, mock.Anything).Return(txResult, nil) diff --git a/internal/messages/messages.go b/internal/messages/messages.go index 32b5b2a..d5603b9 100644 --- a/internal/messages/messages.go +++ b/internal/messages/messages.go @@ -94,8 +94,9 @@ func (r *ReplyCommon) ReplyHeaders() *ReplyHeaders { // QueryChaincode message instructs the bridge to install a contract type QueryChaincode struct { RequestCommon - Function string `json:"func"` - Args []string `json:"args,omitempty"` + Function string `json:"func"` + Args []string `json:"args,omitempty"` + StrongRead bool `json:"strongread"` } type GetTxById struct { diff --git a/internal/rest/sync/syncdispatcher.go b/internal/rest/sync/syncdispatcher.go index acbd9fc..a9fb667 100644 --- a/internal/rest/sync/syncdispatcher.go +++ b/internal/rest/sync/syncdispatcher.go @@ -188,7 +188,7 @@ func (d *syncDispatcher) QueryChaincode(res http.ResponseWriter, req *http.Reque return } - result, err1 := d.processor.GetRPCClient().Query(msg.Headers.ChannelID, msg.Headers.Signer, msg.Headers.ChaincodeName, msg.Function, msg.Args) + result, err1 := d.processor.GetRPCClient().Query(msg.Headers.ChannelID, msg.Headers.Signer, msg.Headers.ChaincodeName, msg.Function, msg.Args, msg.StrongRead) callTime := time.Now().UTC().Sub(start) if err1 != nil { log.Warnf("Query [chaincode=%s, func=%s] failed to send: %s [%.2fs]", msg.Headers.ChaincodeName, msg.Function, err1, callTime.Seconds()) diff --git a/internal/rest/utils/params.go b/internal/rest/utils/params.go index 58eb6a7..9405dd0 100644 --- a/internal/rest/utils/params.go +++ b/internal/rest/utils/params.go @@ -118,6 +118,19 @@ func BuildQueryMessage(res http.ResponseWriter, req *http.Request, params httpro return nil, NewRestError(err.Error(), 400) } msg.Args = argsVal + strongread := body["strongread"] + if strongread != nil { + strVal, ok := strongread.(string) + if ok { + isStrongread, err := strconv.ParseBool(strVal) + if err != nil { + return nil, NewRestError(err.Error(), 400) + } + msg.StrongRead = isStrongread + } else { + msg.StrongRead = strongread.(bool) + } + } return &msg, nil } diff --git a/mocks/fabric/client/rpc_client.go b/mocks/fabric/client/rpc_client.go index 2037146..098ed0a 100644 --- a/mocks/fabric/client/rpc_client.go +++ b/mocks/fabric/client/rpc_client.go @@ -55,13 +55,13 @@ func (_m *RPCClient) Invoke(channelId string, signer string, chaincodeName strin return r0, r1 } -// Query provides a mock function with given fields: channelId, signer, chaincodeName, method, args -func (_m *RPCClient) Query(channelId string, signer string, chaincodeName string, method string, args []string) ([]byte, error) { - ret := _m.Called(channelId, signer, chaincodeName, method, args) +// Query provides a mock function with given fields: channelId, signer, chaincodeName, method, args, strongread +func (_m *RPCClient) Query(channelId string, signer string, chaincodeName string, method string, args []string, strongread bool) ([]byte, error) { + ret := _m.Called(channelId, signer, chaincodeName, method, args, strongread) var r0 []byte - if rf, ok := ret.Get(0).(func(string, string, string, string, []string) []byte); ok { - r0 = rf(channelId, signer, chaincodeName, method, args) + if rf, ok := ret.Get(0).(func(string, string, string, string, []string, bool) []byte); ok { + r0 = rf(channelId, signer, chaincodeName, method, args, strongread) } else { if ret.Get(0) != nil { r0 = ret.Get(0).([]byte) @@ -69,8 +69,8 @@ func (_m *RPCClient) Query(channelId string, signer string, chaincodeName string } var r1 error - if rf, ok := ret.Get(1).(func(string, string, string, string, []string) error); ok { - r1 = rf(channelId, signer, chaincodeName, method, args) + if rf, ok := ret.Get(1).(func(string, string, string, string, []string, bool) error); ok { + r1 = rf(channelId, signer, chaincodeName, method, args, strongread) } else { r1 = ret.Error(1) } From 4c661d191517ca0d58b75b82460633ea1dd53af6 Mon Sep 17 00:00:00 2001 From: Jim Zhang Date: Tue, 8 Mar 2022 22:19:37 -0500 Subject: [PATCH 4/5] Add QueryBlockByTxId, and QueryBlockByHash support Signed-off-by: Jim Zhang --- internal/events/subscription.go | 2 +- internal/fabric/client/api.go | 3 +- internal/fabric/client/client_common.go | 19 ++++++++-- internal/fabric/client/ledger.go | 24 +++++++++++- internal/fabric/test/helper.go | 3 +- internal/messages/messages.go | 8 +++- internal/rest/restgateway_test.go | 10 +++++ internal/rest/router.go | 7 ++++ internal/rest/sync/syncdispatcher.go | 24 +++++++++++- internal/rest/utils/params.go | 45 +++++++++++++++++++--- mocks/fabric/client/rpc_client.go | 50 ++++++++++++++++++++----- 11 files changed, 171 insertions(+), 24 deletions(-) diff --git a/internal/events/subscription.go b/internal/events/subscription.go index 01b8c07..f1c2cc3 100644 --- a/internal/events/subscription.go +++ b/internal/events/subscription.go @@ -157,7 +157,7 @@ func (s *subscription) getEventTimestamp(evt *eventsapi.EventEntry) { return } // we didn't find the timestamp in our cache, query the node for the block header where we can find the timestamp - _, block, err := s.client.QueryBlock(s.info.ChannelId, evt.BlockNumber, s.info.Signer) + _, block, err := s.client.QueryBlock(s.info.ChannelId, s.info.Signer, evt.BlockNumber, nil) if err != nil { log.Errorf("Unable to retrieve block[%s] timestamp: %s", blockNumber, err) evt.Timestamp = 0 // set to 0, we were not able to retrieve the timestamp. diff --git a/internal/fabric/client/api.go b/internal/fabric/client/api.go index e355ac8..546ff7d 100644 --- a/internal/fabric/client/api.go +++ b/internal/fabric/client/api.go @@ -55,7 +55,8 @@ type RPCClient interface { Invoke(channelId, signer, chaincodeName, method string, args []string, isInit bool) (*TxReceipt, error) Query(channelId, signer, chaincodeName, method string, args []string, strongread bool) ([]byte, error) QueryChainInfo(channelId, signer string) (*fab.BlockchainInfoResponse, error) - QueryBlock(channelId string, blockNumber uint64, signer string) (*utils.RawBlock, *utils.Block, error) + QueryBlock(channelId string, signer string, blocknumber uint64, blockhash []byte) (*utils.RawBlock, *utils.Block, error) + QueryBlockByTxId(channelId string, signer string, txId string) (*utils.RawBlock, *utils.Block, error) QueryTransaction(channelId, signer, txId string) (map[string]interface{}, error) SubscribeEvent(subInfo *eventsapi.SubscriptionInfo, since uint64) (*RegistrationWrapper, <-chan *fab.BlockEvent, <-chan *fab.CCEvent, error) Unregister(*RegistrationWrapper) diff --git a/internal/fabric/client/client_common.go b/internal/fabric/client/client_common.go index 3eba440..b9951ba 100644 --- a/internal/fabric/client/client_common.go +++ b/internal/fabric/client/client_common.go @@ -88,7 +88,7 @@ func convert(args []string) [][]byte { } func (w *commonRPCWrapper) QueryChainInfo(channelId, signer string) (*fab.BlockchainInfoResponse, error) { - log.Tracef("RPC [%s] --> ChainInfo", channelId) + log.Tracef("RPC [%s] --> QueryChainInfo", channelId) result, err := w.ledgerClientWrapper.queryChainInfo(channelId, signer) if err != nil { @@ -100,10 +100,10 @@ func (w *commonRPCWrapper) QueryChainInfo(channelId, signer string) (*fab.Blockc return result, nil } -func (w *commonRPCWrapper) QueryBlock(channelId string, blockNumber uint64, signer string) (*utils.RawBlock, *utils.Block, error) { +func (w *commonRPCWrapper) QueryBlock(channelId string, signer string, blockNumber uint64, blockhash []byte) (*utils.RawBlock, *utils.Block, error) { log.Tracef("RPC [%s] --> QueryBlock %v", channelId, blockNumber) - rawblock, block, err := w.ledgerClientWrapper.queryBlock(channelId, blockNumber, signer) + rawblock, block, err := w.ledgerClientWrapper.queryBlock(channelId, signer, blockNumber, blockhash) if err != nil { log.Errorf("Failed to query block %v on channel %s. %s", blockNumber, channelId, err) return nil, nil, err @@ -113,6 +113,19 @@ func (w *commonRPCWrapper) QueryBlock(channelId string, blockNumber uint64, sign return rawblock, block, nil } +func (w *commonRPCWrapper) QueryBlockByTxId(channelId string, signer string, txId string) (*utils.RawBlock, *utils.Block, error) { + log.Tracef("RPC [%s] --> QueryBlockByTxId %s", channelId, txId) + + rawblock, block, err := w.ledgerClientWrapper.queryBlockByTxId(channelId, signer, txId) + if err != nil { + log.Errorf("Failed to query block by transaction Id %s on channel %s. %s", txId, channelId, err) + return nil, nil, err + } + + log.Tracef("RPC [%s] <-- success", channelId) + return rawblock, block, nil +} + func (w *commonRPCWrapper) QueryTransaction(channelId, signer, txId string) (map[string]interface{}, error) { log.Tracef("RPC [%s] --> QueryTransaction %s", channelId, txId) diff --git a/internal/fabric/client/ledger.go b/internal/fabric/client/ledger.go index 72e9331..46ea0c1 100644 --- a/internal/fabric/client/ledger.go +++ b/internal/fabric/client/ledger.go @@ -17,6 +17,7 @@ package client import ( + "github.com/hyperledger/fabric-protos-go/common" "github.com/hyperledger/fabric-sdk-go/pkg/client/ledger" "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context" "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/core" @@ -58,12 +59,31 @@ func (l *ledgerClientWrapper) queryChainInfo(channelId, signer string) (*fab.Blo return result, nil } -func (l *ledgerClientWrapper) queryBlock(channelId string, blockNumber uint64, signer string) (*utils.RawBlock, *utils.Block, error) { +func (l *ledgerClientWrapper) queryBlock(channelId string, signer string, blockNumber uint64, blockhash []byte) (*utils.RawBlock, *utils.Block, error) { client, err := l.getLedgerClient(channelId, signer) if err != nil { return nil, nil, errors.Errorf("Failed to get channel client. %s", err) } - result, err := client.QueryBlock(blockNumber) + var result *common.Block + var err1 error + if blockhash == nil { + result, err1 = client.QueryBlock(blockNumber) + } else { + result, err1 = client.QueryBlockByHash(blockhash) + } + if err1 != nil { + return nil, nil, err1 + } + rawblock, block, err := utils.DecodeBlock(result) + return rawblock, block, err +} + +func (l *ledgerClientWrapper) queryBlockByTxId(channelId string, signer string, txId string) (*utils.RawBlock, *utils.Block, error) { + client, err := l.getLedgerClient(channelId, signer) + if err != nil { + return nil, nil, errors.Errorf("Failed to get channel client. %s", err) + } + result, err := client.QueryBlockByTxID(fab.TransactionID(txId)) if err != nil { return nil, nil, err } diff --git a/internal/fabric/test/helper.go b/internal/fabric/test/helper.go index ce3066d..cfa84dc 100644 --- a/internal/fabric/test/helper.go +++ b/internal/fabric/test/helper.go @@ -54,7 +54,8 @@ func MockRPCClient(fromBlock string, withReset ...bool) *mockfabric.RPCClient { rpc.On("SubscribeEvent", mock.Anything, mock.Anything).Return(nil, roBlockEventChan, roCCEventChan, nil) rpc.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(chaincodeResult, nil) rpc.On("QueryChainInfo", mock.Anything, mock.Anything).Return(res, nil) - rpc.On("QueryBlock", mock.Anything, mock.Anything, mock.Anything).Return(rawBlock, block, nil) + rpc.On("QueryBlock", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(rawBlock, block, nil) + rpc.On("QueryBlockByTxId", mock.Anything, mock.Anything, mock.Anything).Return(rawBlock, block, nil) rpc.On("QueryTransaction", mock.Anything, mock.Anything, mock.Anything).Return(txResult, nil) rpc.On("Unregister", mock.Anything).Return() diff --git a/internal/messages/messages.go b/internal/messages/messages.go index d5603b9..1ca6613 100644 --- a/internal/messages/messages.go +++ b/internal/messages/messages.go @@ -110,7 +110,13 @@ type GetChainInfo struct { type GetBlock struct { RequestCommon - BlockNumber uint64 `json:"blockNumber"` + BlockNumber uint64 + BlockHash []byte +} + +type GetBlockByTxId struct { + RequestCommon + TxId string } // SendTransaction message instructs the bridge to install a contract diff --git a/internal/rest/restgateway_test.go b/internal/rest/restgateway_test.go index c469ac8..0ecc02a 100644 --- a/internal/rest/restgateway_test.go +++ b/internal/rest/restgateway_test.go @@ -419,6 +419,16 @@ func TestQueryEndpoints(t *testing.T) { block := rr["block"].(map[string]interface{}) assert.Equal(float64(20), block["block_number"]) + url, _ = url.Parse(fmt.Sprintf("http://localhost:%d/blockByTxId/f008dbfcb393fd40fa14a26fc2a0aaa01327d9483576e277a0a91b042bf7612f?fly-channel=default-channel&fly-signer=user1", g.config.HTTP.Port)) + req = &http.Request{URL: url, Method: http.MethodGet, Header: header} + resp, _ = http.DefaultClient.Do(req) + assert.Equal(200, resp.StatusCode) + bodyBytes, _ = io.ReadAll(resp.Body) + result = utils.DecodePayload(bodyBytes).(map[string]interface{}) + rr = result["result"].(map[string]interface{}) + block = rr["block"].(map[string]interface{}) + assert.Equal(float64(20), block["block_number"]) + url, _ = url.Parse(fmt.Sprintf("http://localhost:%d/query?fly-channel=default-channel&fly-signer=user1&fly-chaincode=asset_transfer", g.config.HTTP.Port)) req = &http.Request{ URL: url, diff --git a/internal/rest/router.go b/internal/rest/router.go index 47d0fc2..a270a04 100644 --- a/internal/rest/router.go +++ b/internal/rest/router.go @@ -78,6 +78,7 @@ func (r *router) addRoutes() { r.httpRouter.GET("/chaininfo", r.queryChainInfo) r.httpRouter.GET("/blocks/:blockNumber", r.queryBlock) + r.httpRouter.GET("/blockByTxId/:txId", r.queryBlockByTxId) r.httpRouter.POST("/query", r.queryChaincode) r.httpRouter.POST("/transactions", r.sendTransaction) @@ -157,6 +158,12 @@ func (r *router) queryBlock(res http.ResponseWriter, req *http.Request, params h r.syncDispatcher.GetBlock(res, req, params) } +func (r *router) queryBlockByTxId(res http.ResponseWriter, req *http.Request, params httprouter.Params) { + log.Infof("--> %s %s", req.Method, req.URL) + // query requests are always synchronous + r.syncDispatcher.GetBlockByTxId(res, req, params) +} + func (r *router) queryChaincode(res http.ResponseWriter, req *http.Request, params httprouter.Params) { log.Infof("--> %s %s", req.Method, req.URL) // query requests are always synchronous diff --git a/internal/rest/sync/syncdispatcher.go b/internal/rest/sync/syncdispatcher.go index a9fb667..40a59bf 100644 --- a/internal/rest/sync/syncdispatcher.go +++ b/internal/rest/sync/syncdispatcher.go @@ -44,6 +44,7 @@ type SyncDispatcher interface { GetTxById(res http.ResponseWriter, req *http.Request, params httprouter.Params) GetChainInfo(res http.ResponseWriter, req *http.Request, params httprouter.Params) GetBlock(res http.ResponseWriter, req *http.Request, params httprouter.Params) + GetBlockByTxId(res http.ResponseWriter, req *http.Request, params httprouter.Params) } type syncDispatcher struct { @@ -254,7 +255,28 @@ func (d *syncDispatcher) GetBlock(res http.ResponseWriter, req *http.Request, pa return } - rawblock, block, err1 := d.processor.GetRPCClient().QueryBlock(msg.Headers.ChannelID, msg.BlockNumber, msg.Headers.Signer) + rawblock, block, err1 := d.processor.GetRPCClient().QueryBlock(msg.Headers.ChannelID, msg.Headers.Signer, msg.BlockNumber, msg.BlockHash) + if err1 != nil { + errors.RestErrReply(res, req, err1, 500) + return + } + var reply messages.LedgerQueryResult + result := make(map[string]interface{}) + result["raw"] = rawblock + result["block"] = block + reply.Result = result + + sendReply(res, req, reply) +} + +func (d *syncDispatcher) GetBlockByTxId(res http.ResponseWriter, req *http.Request, params httprouter.Params) { + msg, err := restutil.BuildGetBlockByTxIdMessage(res, req, params) + if err != nil { + errors.RestErrReply(res, req, err.Error, err.StatusCode) + return + } + + rawblock, block, err1 := d.processor.GetRPCClient().QueryBlockByTxId(msg.Headers.ChannelID, msg.Headers.Signer, msg.TxId) if err1 != nil { errors.RestErrReply(res, req, err1, 500) return diff --git a/internal/rest/utils/params.go b/internal/rest/utils/params.go index 9405dd0..973b311 100644 --- a/internal/rest/utils/params.go +++ b/internal/rest/utils/params.go @@ -17,6 +17,7 @@ package util import ( + "encoding/hex" "encoding/json" "fmt" "net/http" @@ -199,16 +200,50 @@ func BuildGetBlockMessage(res http.ResponseWriter, req *http.Request, params htt if signer == "" { return nil, NewRestError("Must specify the signer", 400) } - blockNumber, err := strconv.ParseUint(params.ByName("blockNumber"), 10, 64) - if err != nil { - return nil, NewRestError("Invalid block number", 400) - } msg := messages.GetBlock{} msg.Headers.ID = msgId // this could be empty msg.Headers.ChannelID = channel msg.Headers.Signer = signer - msg.BlockNumber = blockNumber + + blockNumberOrHash := params.ByName("blockNumber") + if len(blockNumberOrHash) == 64 { + // 32-byte hex string means this is a block hash + bytes, err := hex.DecodeString(blockNumberOrHash) + if err != nil { + return nil, NewRestError("Invalid block hash", 400) + } + msg.BlockHash = bytes + } else { + blockNumber, err := strconv.ParseUint(blockNumberOrHash, 10, 64) + if err != nil { + return nil, NewRestError("Invalid block number", 400) + } + msg.BlockNumber = blockNumber + } + + return &msg, nil +} + +func BuildGetBlockByTxIdMessage(res http.ResponseWriter, req *http.Request, params httprouter.Params) (*messages.GetBlockByTxId, *RestError) { + var body map[string]interface{} + err := req.ParseForm() + if err != nil { + return nil, NewRestError(err.Error(), 400) + } + channel := getFlyParam("channel", body, req) + if channel == "" { + return nil, NewRestError("Must specify the channel", 400) + } + signer := getFlyParam("signer", body, req) + if signer == "" { + return nil, NewRestError("Must specify the signer", 400) + } + + msg := messages.GetBlockByTxId{} + msg.Headers.ChannelID = channel + msg.Headers.Signer = signer + msg.TxId = params.ByName("txId") return &msg, nil } diff --git a/mocks/fabric/client/rpc_client.go b/mocks/fabric/client/rpc_client.go index 098ed0a..5eead9a 100644 --- a/mocks/fabric/client/rpc_client.go +++ b/mocks/fabric/client/rpc_client.go @@ -78,13 +78,13 @@ func (_m *RPCClient) Query(channelId string, signer string, chaincodeName string return r0, r1 } -// QueryBlock provides a mock function with given fields: channelId, blockNumber, signer -func (_m *RPCClient) QueryBlock(channelId string, blockNumber uint64, signer string) (*utils.RawBlock, *utils.Block, error) { - ret := _m.Called(channelId, blockNumber, signer) +// QueryBlock provides a mock function with given fields: channelId, signer, blocknumber, blockhash +func (_m *RPCClient) QueryBlock(channelId string, signer string, blocknumber uint64, blockhash []byte) (*utils.RawBlock, *utils.Block, error) { + ret := _m.Called(channelId, signer, blocknumber, blockhash) var r0 *utils.RawBlock - if rf, ok := ret.Get(0).(func(string, uint64, string) *utils.RawBlock); ok { - r0 = rf(channelId, blockNumber, signer) + if rf, ok := ret.Get(0).(func(string, string, uint64, []byte) *utils.RawBlock); ok { + r0 = rf(channelId, signer, blocknumber, blockhash) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*utils.RawBlock) @@ -92,8 +92,8 @@ func (_m *RPCClient) QueryBlock(channelId string, blockNumber uint64, signer str } var r1 *utils.Block - if rf, ok := ret.Get(1).(func(string, uint64, string) *utils.Block); ok { - r1 = rf(channelId, blockNumber, signer) + if rf, ok := ret.Get(1).(func(string, string, uint64, []byte) *utils.Block); ok { + r1 = rf(channelId, signer, blocknumber, blockhash) } else { if ret.Get(1) != nil { r1 = ret.Get(1).(*utils.Block) @@ -101,8 +101,40 @@ func (_m *RPCClient) QueryBlock(channelId string, blockNumber uint64, signer str } var r2 error - if rf, ok := ret.Get(2).(func(string, uint64, string) error); ok { - r2 = rf(channelId, blockNumber, signer) + if rf, ok := ret.Get(2).(func(string, string, uint64, []byte) error); ok { + r2 = rf(channelId, signer, blocknumber, blockhash) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// QueryBlockByTxId provides a mock function with given fields: channelId, signer, txId +func (_m *RPCClient) QueryBlockByTxId(channelId string, signer string, txId string) (*utils.RawBlock, *utils.Block, error) { + ret := _m.Called(channelId, signer, txId) + + var r0 *utils.RawBlock + if rf, ok := ret.Get(0).(func(string, string, string) *utils.RawBlock); ok { + r0 = rf(channelId, signer, txId) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*utils.RawBlock) + } + } + + var r1 *utils.Block + if rf, ok := ret.Get(1).(func(string, string, string) *utils.Block); ok { + r1 = rf(channelId, signer, txId) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(*utils.Block) + } + } + + var r2 error + if rf, ok := ret.Get(2).(func(string, string, string) error); ok { + r2 = rf(channelId, signer, txId) } else { r2 = ret.Error(2) } From 0b0c834abe3b1973a759eca04bf9f86705f062a3 Mon Sep 17 00:00:00 2001 From: Jim Zhang Date: Tue, 8 Mar 2022 22:30:34 -0500 Subject: [PATCH 5/5] Update openapi spec Signed-off-by: Jim Zhang --- openapi/spec.yaml | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/openapi/spec.yaml b/openapi/spec.yaml index 13d2cac..d15784d 100644 --- a/openapi/spec.yaml +++ b/openapi/spec.yaml @@ -134,16 +134,30 @@ paths: application/json: schema: $ref: "#/components/schemas/chaininfo" - /blocks/{blockNumber}: + /blocks/{blockNumberOrHash}: get: summary: "Query the block by number" parameters: - - $ref: "#/components/parameters/blockNumber" + - $ref: "#/components/parameters/blockNumberOrHash" - $ref: "#/components/parameters/channel" - $ref: "#/components/parameters/signer" responses: 200: - description: "Transaction retrieved" + description: "Block retrieved" + content: + application/json: + schema: + $ref: "#/components/schemas/get_block_output" + /blockByTxId/{txId}: + get: + summary: "Query the block by a transaction Id included in the block" + parameters: + - $ref: "#/components/parameters/txId" + - $ref: "#/components/parameters/channel" + - $ref: "#/components/parameters/signer" + responses: + 200: + description: "Block retrieved" content: application/json: schema: @@ -485,6 +499,9 @@ components: items: type: "string" description: "Parameters to pass to the chaincode function" + strongread: + type: boolean + description: By default only the client organization's first peer is contacted for the query request; set to true to contact multiple peers in the channel query_input_structured: description: "Specify a JSON schema in the headers, so that the 'args' property can be specified as a JSON object" type: "object" @@ -497,6 +514,9 @@ components: args: type: "object" description: "JSON structure for the parameters to pass to the chaincode function" + strongread: + type: boolean + description: By default only the client organization's first peer is contacted for the query request; set to true to contact multiple peers in the channel webhook_info: type: "object" properties: @@ -929,9 +949,10 @@ components: in: "query" schema: type: "string" - blockNumber: + blockNumberOrHash: + description: block number or block hash required: true - name: blockNumber + name: blockNumberOrHash in: path schema: type: string