From bd782366dd35141b66585ff0ee39b7939f384dfc Mon Sep 17 00:00:00 2001 From: Sandeep Nishad Date: Mon, 7 Aug 2023 01:20:51 +0530 Subject: [PATCH] feat(weaver-corda): support array of remote views, consequent user flow call Signed-off-by: Sandeep Nishad --- .../workflows/test_weaver-data-sharing.yaml | 16 +- .../src/main/kotlin/CordaDriver.kt | 12 +- .../imodule/corda/states/ExternalState.kt | 9 + .../corda/flows/WriteExternalStateFlows.kt | 301 +++++++++++------- .../imodule/corda/WriteExternalStateTest.kt | 97 ++++-- .../imodule/corda/WriteExternalStateTest2.kt | 8 +- .../src/main/kotlin/TestQueryFlow.kt | 2 +- .../src/main/kotlin/TestUserFlow.kt | 26 ++ .../client/AssetTransferManager.kt | 22 +- .../client/InteropManager.kt | 49 +-- .../contracts-kotlin/build.gradle | 2 + .../com/example/contract/SimpleContract.kt | 9 + .../kotlin/com/example/flow/SimpleFlow.kt | 122 +++++++ .../weaver/sdk/corda/InteroperableHelper.kt | 168 +++++++--- weaver/tests/network-setups/corda/makefile | 3 + 15 files changed, 615 insertions(+), 231 deletions(-) create mode 100644 weaver/core/network/corda-interop-app/test-cordapp/src/main/kotlin/TestUserFlow.kt diff --git a/.github/workflows/test_weaver-data-sharing.yaml b/.github/workflows/test_weaver-data-sharing.yaml index fc4d0c839f..7c00c78c5a 100644 --- a/.github/workflows/test_weaver-data-sharing.yaml +++ b/.github/workflows/test_weaver-data-sharing.yaml @@ -619,7 +619,7 @@ jobs: TOTAL=8 # CORDA-CORDA2 - ./clients/build/install/clients/bin/clients request-state localhost:9081 relay-corda2:9082/Corda_Network2/corda_network2_partya_1:10003#com.cordaSimpleApplication.flow.GetStateByKey:H 1> tmp.out + ./clients/build/install/clients/bin/clients request-state --wkey=H localhost:9081 relay-corda2:9082/Corda_Network2/corda_network2_partya_1:10003#com.cordaSimpleApplication.flow.GetStateByKey:H 1> tmp.out cat tmp.out | grep "SimpleState(key=H, value=\[SimpleState(key=H, value=1" && COUNT=$(( COUNT + 1 )) && echo "PASS" cat tmp.out @@ -629,7 +629,7 @@ jobs: # CORDA2-CORDA - NETWORK_NAME=Corda_Network2 CORDA_PORT=30006 ./clients/build/install/clients/bin/clients request-state localhost:9082 relay-corda:9081/Corda_Network/corda_partya_1:10003#com.cordaSimpleApplication.flow.GetStateByKey:C 1> tmp.out + NETWORK_NAME=Corda_Network2 CORDA_PORT=30006 ./clients/build/install/clients/bin/clients request-state --wkey=C localhost:9082 relay-corda:9081/Corda_Network/corda_partya_1:10003#com.cordaSimpleApplication.flow.GetStateByKey:C 1> tmp.out cat tmp.out | grep "SimpleState(key=C, value=\[SimpleState(key=C, value=20" && COUNT=$(( COUNT + 1 )) && echo "PASS" cat tmp.out @@ -638,7 +638,7 @@ jobs: cat tmp.out # CORDA - FABRIC1 - ./clients/build/install/clients/bin/clients request-state localhost:9081 relay-network1:9080/network1/mychannel:simplestate:Read:a 1> tmp.out + ./clients/build/install/clients/bin/clients request-state --wkey=a localhost:9081 relay-network1:9080/network1/mychannel:simplestate:Read:a 1> tmp.out cat tmp.out | grep "SimpleState(key=a, value=Arcturus" && COUNT=$(( COUNT + 1 )) && echo "PASS" cat tmp.out @@ -647,7 +647,7 @@ jobs: cat tmp.out # CORDA - FABRIC2 - ./clients/build/install/clients/bin/clients request-state localhost:9081 relay-network2:9083/network2/mychannel:simplestate:Read:Arcturus 1> tmp.out + ./clients/build/install/clients/bin/clients request-state --wkey=Arcturus localhost:9081 relay-network2:9083/network2/mychannel:simplestate:Read:Arcturus 1> tmp.out cat tmp.out | grep "SimpleState(key=Arcturus, value=17.671" && COUNT=$(( COUNT + 1 )) && echo "PASS" cat tmp.out @@ -1029,7 +1029,7 @@ jobs: TOTAL=8 # CORDA-CORDA2 - ./clients/build/install/clients/bin/clients request-state localhost:9081 localhost:9082/Corda_Network2/localhost:30006#com.cordaSimpleApplication.flow.GetStateByKey:H 1> tmp.out + ./clients/build/install/clients/bin/clients request-state --wkey=H localhost:9081 localhost:9082/Corda_Network2/localhost:30006#com.cordaSimpleApplication.flow.GetStateByKey:H 1> tmp.out cat tmp.out | grep "SimpleState(key=H, value=\[SimpleState(key=H, value=1" && COUNT=$(( COUNT + 1 )) && echo "PASS" cat tmp.out @@ -1039,7 +1039,7 @@ jobs: # CORDA2-CORDA - NETWORK_NAME=Corda_Network2 CORDA_PORT=30006 ./clients/build/install/clients/bin/clients request-state localhost:9082 localhost:9081/Corda_Network/localhost:10006#com.cordaSimpleApplication.flow.GetStateByKey:C 1> tmp.out + NETWORK_NAME=Corda_Network2 CORDA_PORT=30006 ./clients/build/install/clients/bin/clients request-state --wkey=C localhost:9082 localhost:9081/Corda_Network/localhost:10006#com.cordaSimpleApplication.flow.GetStateByKey:C 1> tmp.out cat tmp.out | grep "SimpleState(key=C, value=\[SimpleState(key=C, value=20" && COUNT=$(( COUNT + 1 )) && echo "PASS" cat tmp.out @@ -1048,7 +1048,7 @@ jobs: cat tmp.out # CORDA - FABRIC1 - ./clients/build/install/clients/bin/clients request-state localhost:9081 localhost:9080/network1/mychannel:simplestate:Read:a 1> tmp.out + ./clients/build/install/clients/bin/clients request-state --wkey=a localhost:9081 localhost:9080/network1/mychannel:simplestate:Read:a 1> tmp.out cat tmp.out | grep "SimpleState(key=a, value=Arcturus" && COUNT=$(( COUNT + 1 )) && echo "PASS" cat tmp.out @@ -1057,7 +1057,7 @@ jobs: cat tmp.out # CORDA - FABRIC2 - ./clients/build/install/clients/bin/clients request-state localhost:9081 localhost:9083/network2/mychannel:simplestate:Read:Arcturus 1> tmp.out + ./clients/build/install/clients/bin/clients request-state --wkey=Arcturus localhost:9081 localhost:9083/network2/mychannel:simplestate:Read:Arcturus 1> tmp.out cat tmp.out | grep "SimpleState(key=Arcturus, value=17.671" && COUNT=$(( COUNT + 1 )) && echo "PASS" cat tmp.out diff --git a/weaver/core/drivers/corda-driver/src/main/kotlin/CordaDriver.kt b/weaver/core/drivers/corda-driver/src/main/kotlin/CordaDriver.kt index eaaa2b3f8f..7acb838674 100644 --- a/weaver/core/drivers/corda-driver/src/main/kotlin/CordaDriver.kt +++ b/weaver/core/drivers/corda-driver/src/main/kotlin/CordaDriver.kt @@ -18,6 +18,7 @@ import java.util.* import org.hyperledger.cacti.weaver.imodule.corda.flows.HandleExternalRequest import org.hyperledger.cacti.weaver.sdk.corda.InteroperableHelper +import org.hyperledger.cacti.weaver.sdk.corda.RelayOptions import org.hyperledger.cacti.weaver.protos.common.query.QueryOuterClass import org.hyperledger.cacti.weaver.protos.common.state.State import org.hyperledger.cacti.weaver.protos.corda.ViewDataOuterClass @@ -143,13 +144,16 @@ fun createAggregatedCordaView(views: List) : Either // TODO: if the first relay address fails, retry with other relay addresses in the list. + val relayOptions = RelayOptions( + useTlsForRelay = System.getenv("RELAY_TLS")?.toBoolean() ?: false, + relayTlsTrustStorePath = System.getenv("RELAY_TLSCA_TRUST_STORE")?.toString() ?: "", + relayTlsTrustStorePassword = System.getenv("RELAY_TLSCA_TRUST_STORE_PASSWORD")?.toString() ?: "", + tlsCACertPathsForRelay = System.getenv("RELAY_TLSCA_CERT_PATHS")?.toString() ?: "" + ) val channel = InteroperableHelper.getChannelToRelay( relayAddresses[0].host, relayAddresses[0].port, - System.getenv("RELAY_TLS")?.toBoolean() ?: false, - System.getenv("RELAY_TLSCA_TRUST_STORE")?.toString() ?: "", - System.getenv("RELAY_TLSCA_TRUST_STORE_PASSWORD")?.toString() ?: "", - System.getenv("RELAY_TLSCA_CERT_PATHS")?.toString() ?: "") + relayOptions) GrpcClient(channel) } } catch (e: Exception) { diff --git a/weaver/core/network/corda-interop-app/interop-contracts/src/main/kotlin/org/hyperledger/cacti/weaver/imodule/corda/states/ExternalState.kt b/weaver/core/network/corda-interop-app/interop-contracts/src/main/kotlin/org/hyperledger/cacti/weaver/imodule/corda/states/ExternalState.kt index 53423445ad..acfacc24e7 100644 --- a/weaver/core/network/corda-interop-app/interop-contracts/src/main/kotlin/org/hyperledger/cacti/weaver/imodule/corda/states/ExternalState.kt +++ b/weaver/core/network/corda-interop-app/interop-contracts/src/main/kotlin/org/hyperledger/cacti/weaver/imodule/corda/states/ExternalState.kt @@ -11,6 +11,7 @@ import net.corda.core.contracts.BelongsToContract import net.corda.core.contracts.LinearState import net.corda.core.contracts.UniqueIdentifier import net.corda.core.identity.Party +import net.corda.core.serialization.CordaSerializable /** * A representation of state and proof retrieved from an external network. @@ -27,3 +28,11 @@ data class ExternalState( override val linearId: UniqueIdentifier = UniqueIdentifier(), override val participants: List = listOf() ) : LinearState + +@CordaSerializable +data class InvocationSpec( + val disableInvocation: Boolean = true, + val invokeFlowName: String = "", + val invokeFlowArgs: List = listOf(), + val interopArgsIndex: Int = -1 +) diff --git a/weaver/core/network/corda-interop-app/interop-workflows/src/main/kotlin/org/hyperledger/cacti/weaver/imodule/corda/flows/WriteExternalStateFlows.kt b/weaver/core/network/corda-interop-app/interop-workflows/src/main/kotlin/org/hyperledger/cacti/weaver/imodule/corda/flows/WriteExternalStateFlows.kt index 34dc878591..e4bae5e1c4 100644 --- a/weaver/core/network/corda-interop-app/interop-workflows/src/main/kotlin/org/hyperledger/cacti/weaver/imodule/corda/flows/WriteExternalStateFlows.kt +++ b/weaver/core/network/corda-interop-app/interop-workflows/src/main/kotlin/org/hyperledger/cacti/weaver/imodule/corda/flows/WriteExternalStateFlows.kt @@ -34,6 +34,7 @@ import org.hyperledger.cacti.weaver.protos.corda.ViewDataOuterClass import org.hyperledger.cacti.weaver.protos.common.interop_payload.InteropPayloadOuterClass import org.hyperledger.cacti.weaver.imodule.corda.contracts.ExternalStateContract import org.hyperledger.cacti.weaver.imodule.corda.states.ExternalState +import org.hyperledger.cacti.weaver.imodule.corda.states.InvocationSpec /** @@ -44,15 +45,17 @@ import org.hyperledger.cacti.weaver.imodule.corda.states.ExternalState * @property view The view received from the foreign network. * @property address The address of the view, containing a location, securityDomain and view segment. */ + @InitiatingFlow @StartableByRPC class WriteExternalStateInitiator - @JvmOverloads - constructor( - val viewBase64String: String, - val address: String, - val participants: List = listOf() - ): FlowLogic>() { +@JvmOverloads +constructor( + val views64: Array, + val addresses: Array, + val invokeObject: InvocationSpec = InvocationSpec(), + val participants: List = listOf() +): FlowLogic>() { /** * The call() method captures the logic to perform the proof validation and the construction of @@ -61,51 +64,85 @@ class WriteExternalStateInitiator * @return Returns the linearId of the newly created [ExternalState]. */ @Suspendable - override fun call(): Either = try { - println("External network returned view: $viewBase64String\n") + override fun call(): Either { + try { + var externalStatesLinearIdArray = Array(addresses.size) { UniqueIdentifier() } + for (i in 0..addresses.size-1) { + val viewBase64String = views64[i] + val address = addresses[i] + + println("External network returned view #${i}: $viewBase64String\n") - val view = State.View.parseFrom(Base64.getDecoder().decode(viewBase64String)) + val view = State.View.parseFrom(Base64.getDecoder().decode(viewBase64String)) - // 1. Verify the proofs that are returned - verifyView(view, address, serviceHub).flatMap { - println("View verification successful. Creating state to be stored in the vault.") - // 2. Create the state to be stored - var externalStateParticipants = if (participants.contains(ourIdentity)) { participants } else { listOf(ourIdentity) + participants } - val state = ExternalState( - linearId = UniqueIdentifier(), - participants = externalStateParticipants, - meta = view.meta.toByteArray(), - state = view.data.toByteArray()) - println("Storing ExternalState in the vault:\n\tLinear Id = ${state.linearId}\n\tParticipants = ${state.participants}\n\tMeta = ${view.meta}\tState = ${Base64.getEncoder().encodeToString(state.state)}\n") + // 1. Verify the proofs that are returned + val verifyResult = verifyView(view, address, serviceHub).fold({ + println("View verification failed with error: ${it.message}") + Left(Error("View verification failed with error: ${it.message}")) + }, { + println("View verification successful. Creating state to be stored in the vault.") + // 2. Create the state to be stored + var externalStateParticipants = if (participants.contains(ourIdentity)) { participants } else { listOf(ourIdentity) + participants } + val state = ExternalState( + linearId = UniqueIdentifier(), + participants = externalStateParticipants, + meta = view.meta.toByteArray(), + state = view.data.toByteArray()) + println("Storing ExternalState in the vault:\n\tLinear Id = ${state.linearId}\n\tParticipants = ${state.participants}\n\tMeta = ${view.meta}\tState = ${Base64.getEncoder().encodeToString(state.state)}\n") - // 3. Build the transaction - val notary = serviceHub.networkMapCache.notaryIdentities.first() - val command = Command(ExternalStateContract.Commands.Create(), ourIdentity.owningKey) - val txBuilder = TransactionBuilder(notary) - .addOutputState(state, ExternalStateContract.ID) - .addCommand(command) + // 3. Build the transaction + val notary = serviceHub.networkMapCache.notaryIdentities.first() + val command = Command(ExternalStateContract.Commands.Create(), externalStateParticipants.map { it.owningKey }) + val txBuilder = TransactionBuilder(notary) + .addOutputState(state, ExternalStateContract.ID) + .addCommand(command) + + // 4. Verify and collect signatures on the transaction + txBuilder.verify(serviceHub) + val tx = serviceHub.signInitialTransaction(txBuilder) + var sessions = listOf() + for (party in externalStateParticipants) { + if (!ourIdentity.equals(party)) { + println("Sending Tx to ${party}") + val session = initiateFlow(party) + session.send(address) + sessions += session + } + } + + val stx = subFlow(CollectSignaturesFlow(tx, sessions)) + val storedExternalState = subFlow(FinalityFlow( + stx, + sessions)).tx.outputStates.first() as ExternalState - // 4. Verify and collect signatures on the transaction - txBuilder.verify(serviceHub) - val tx = serviceHub.signInitialTransaction(txBuilder) - var sessions = listOf() - for (party in externalStateParticipants) { - if (!ourIdentity.equals(party)) { - val session = initiateFlow(party) - session.send(address) - sessions += session + // 5. Return the linearId of the state + println("State stored successfully.\n") + externalStatesLinearIdArray[i] = storedExternalState.linearId + Right(Unit) + }) + if (verifyResult.isLeft()) { + return verifyResult } } - - val stx = subFlow(CollectSignaturesFlow(tx, sessions)) - subFlow(FinalityFlow(stx, sessions)) - - // 5. Return the linearId of the state - println("State stored successfully.\n") - Right(state.linearId) + if (invokeObject.disableInvocation) { + println("Invocation disabled!") + return Right(externalStatesLinearIdArray) + } else { + val argsMList = invokeObject.invokeFlowArgs.toMutableList() + argsMList[invokeObject.interopArgsIndex] = externalStatesLinearIdArray + println("Calling Workflow: ${invokeObject.invokeFlowName} with args: ${argsMList}") + val userFlow = resolveGenericFlow(invokeObject.invokeFlowName, argsMList.toList()) + return userFlow.fold({ + println("Error in resolving user flow: ${it.message}") + Left(Error("Error in resolving user flow: ${it.message}")) + }, { + Right(subFlow(it)) + }) + } + } catch (e: Exception) { + println("Error in WriteExternalState: ${e.message}") + return Left(Error("Error in WriteExternalState: ${e.message}")) } - } catch (e: Exception) { - Left(Error("Failed to store state in ledger: ${e.message}")) } } @@ -129,7 +166,7 @@ class WriteExternalStateAcceptor(val session: FlowSession) : FlowLogic { - val cordaViewData = ViewDataOuterClass.ViewData.parseFrom(viewDataByteArray) - println("cordaViewData: $cordaViewData") - val interopPayload = InteropPayloadOuterClass.InteropPayload.parseFrom(cordaViewData.notarizedPayloadsList[0].payload) - val payloadString = interopPayload.payload.toStringUtf8() - println("response from remote: ${payloadString}.\n") - println("query address: ${interopPayload.address}.\n") - val viewData = ViewDataOuterClass.ViewData.newBuilder() - .addAllNotarizedPayloads(cordaViewData.notarizedPayloadsList) - .build() - - return viewData.toByteArray() - } - State.Meta.Protocol.FABRIC -> { - val fabricViewData = ViewData.FabricView.parseFrom(viewDataByteArray) - println("fabricViewData: $fabricViewData") - // TODO: We assume here that the response payloads have been matched earlier, but perhaps we should match them here too - val chaincodeAction = ProposalPackage.ChaincodeAction.parseFrom(fabricViewData.endorsedProposalResponsesList[0].payload.extension) - val interopPayload = InteropPayloadOuterClass.InteropPayload.parseFrom(chaincodeAction.response.payload) - val payloadString = interopPayload.payload.toStringUtf8() - println("response from remote: ${payloadString}.\n") - println("query address: ${interopPayload.address}.\n") - - val securityDomain = interopPayload.address.split("/")[1] - val proofStringPrefix = "Verified Proof: Endorsed by members: [" - val proofStringSuffix = "] of security domain: $securityDomain" - var mspIdList = "" - fabricViewData.endorsedProposalResponsesList.map { endorsedProposalResponse -> - val endorsement = endorsedProposalResponse.endorsement - val mspId = Identities.SerializedIdentity.parseFrom(endorsement.endorser).mspid - if (mspIdList.isNotEmpty()) { - mspIdList += ", " - } - mspIdList += mspId - } - val proofMessage = proofStringPrefix + mspIdList + proofStringSuffix - println("Proof Message: ${proofMessage}.\n") - - var notarizationList: List = listOf() - - fabricViewData.endorsedProposalResponsesList.map { endorsedProposalResponse -> - val endorsement = endorsedProposalResponse.endorsement - val serializedIdentity = Identities.SerializedIdentity.parseFrom(endorsement.endorser) - val mspId = serializedIdentity.mspid - val certString = Base64.getEncoder().encodeToString(serializedIdentity.idBytes.toByteArray()) - val signature = Base64.getEncoder().encodeToString(endorsement.signature.toByteArray()) - - val notarization = ViewDataOuterClass.ViewData.NotarizedPayload.newBuilder() - .setCertificate(certString) - .setSignature(signature) - .setId(mspId) - .setPayload(chaincodeAction.response.payload) - .build() - notarizationList = notarizationList + notarization - } - - val viewData = ViewDataOuterClass.ViewData.newBuilder() - .addAllNotarizedPayloads(notarizationList) - .build() - - return viewData.toByteArray() - } - else -> { - println("GetExternalStateByLinearId Error: Unrecognized protocol.\n") - throw IllegalArgumentException("Error: Unrecognized protocol.") - } - } - - + return getViewFromExternalState(states.first().state.data) } } @@ -282,3 +246,102 @@ class GetExternalStateAndRefByLinearId( } } + +fun getViewFromExternalState(state: ExternalState): ByteArray { + val viewMetaByteArray = state.meta + val viewDataByteArray = state.state + val meta = State.Meta.parseFrom(viewMetaByteArray) + var viewData = when (meta.protocol) { + State.Meta.Protocol.CORDA -> { + val cordaViewData = ViewDataOuterClass.ViewData.parseFrom(viewDataByteArray) + println("cordaViewData: $cordaViewData") + val interopPayload = InteropPayloadOuterClass.InteropPayload.parseFrom(cordaViewData.notarizedPayloadsList[0].payload) + val payloadString = interopPayload.payload.toStringUtf8() + println("response from remote: ${payloadString}.\n") + println("query address: ${interopPayload.address}.\n") + ViewDataOuterClass.ViewData.newBuilder() + .addAllNotarizedPayloads(cordaViewData.notarizedPayloadsList) + .build() + } + State.Meta.Protocol.FABRIC -> { + val fabricViewData = ViewData.FabricView.parseFrom(viewDataByteArray) + println("fabricViewData: $fabricViewData") + // TODO: We assume here that the response payloads have been matched earlier, but perhaps we should match them here too + val chaincodeAction = ProposalPackage.ChaincodeAction.parseFrom(fabricViewData.endorsedProposalResponsesList[0].payload.extension) + val interopPayload = InteropPayloadOuterClass.InteropPayload.parseFrom(chaincodeAction.response.payload) + val payloadString = interopPayload.payload.toStringUtf8() + println("response from remote: ${payloadString}.\n") + println("query address: ${interopPayload.address}.\n") + + val securityDomain = interopPayload.address.split("/")[1] + val proofStringPrefix = "Verified Proof: Endorsed by members: [" + val proofStringSuffix = "] of security domain: $securityDomain" + var mspIdList = "" + fabricViewData.endorsedProposalResponsesList.map { endorsedProposalResponse -> + val endorsement = endorsedProposalResponse.endorsement + val mspId = Identities.SerializedIdentity.parseFrom(endorsement.endorser).mspid + if (mspIdList.isNotEmpty()) { + mspIdList += ", " + } + mspIdList += mspId + } + val proofMessage = proofStringPrefix + mspIdList + proofStringSuffix + println("Proof Message: ${proofMessage}.\n") + + var notarizationList: List = listOf() + + fabricViewData.endorsedProposalResponsesList.map { endorsedProposalResponse -> + val endorsement = endorsedProposalResponse.endorsement + val serializedIdentity = Identities.SerializedIdentity.parseFrom(endorsement.endorser) + val mspId = serializedIdentity.mspid + val certString = Base64.getEncoder().encodeToString(serializedIdentity.idBytes.toByteArray()) + val signature = Base64.getEncoder().encodeToString(endorsement.signature.toByteArray()) + + val notarization = ViewDataOuterClass.ViewData.NotarizedPayload.newBuilder() + .setCertificate(certString) + .setSignature(signature) + .setId(mspId) + .setPayload(chaincodeAction.response.payload) + .build() + notarizationList = notarizationList + notarization + } + + ViewDataOuterClass.ViewData.newBuilder() + .addAllNotarizedPayloads(notarizationList) + .build() + } + else -> { + println("GetExternalStateByLinearId Error: Unrecognized protocol.\n") + throw IllegalArgumentException("Error: Unrecognized protocol.") + } + } + return viewData.toByteArray() +} + +fun getPayloadFromView(viewBytes: ByteArray): ByteArray { + val externalStateView = ViewDataOuterClass.ViewData.parseFrom(viewBytes) + val interopPayload = InteropPayloadOuterClass.InteropPayload.parseFrom(externalStateView.notarizedPayloadsList[0].payload) + return interopPayload.payload.toByteArray() +} + +@Suppress("UNCHECKED_CAST") +fun resolveGenericFlow(flowName: String, flowArgs: List): Either> = try { + println("Attempting to resolve $flowName to a Corda flow.") + val kotlinClass = Class.forName(flowName).kotlin + val ctor = kotlinClass.constructors.first() + if (ctor.parameters.size != flowArgs.size) { + println("Flow Resolution Error: wrong number of arguments supplied.\n") + Left(Error("Flow Resolution Error: wrong number of arguments supplied")) + } else { + println("Resolved flow to ${ctor}") + try { + Right(ctor.call(*flowArgs.toTypedArray()) as FlowLogic) + } catch (e: Exception) { + println("Flow Resolution Error: $flowName not a flow: ${e.message}.\n") + Left(Error("Flow Resolution Error: $flowName not a flow")) + } + } +} catch (e: Exception) { + println("Flow Resolution Error: ${e.message} \n") + Left(Error("Flow Resolution Error: ${e.message}")) +} diff --git a/weaver/core/network/corda-interop-app/interop-workflows/src/test/kotlin/org/hyperledger/cacti/weaver/imodule/corda/WriteExternalStateTest.kt b/weaver/core/network/corda-interop-app/interop-workflows/src/test/kotlin/org/hyperledger/cacti/weaver/imodule/corda/WriteExternalStateTest.kt index 0ade66426b..28f37e20d0 100644 --- a/weaver/core/network/corda-interop-app/interop-workflows/src/test/kotlin/org/hyperledger/cacti/weaver/imodule/corda/WriteExternalStateTest.kt +++ b/weaver/core/network/corda-interop-app/interop-workflows/src/test/kotlin/org/hyperledger/cacti/weaver/imodule/corda/WriteExternalStateTest.kt @@ -20,20 +20,24 @@ import kotlin.test.assertEquals import kotlin.test.assertTrue import com.google.gson.Gson import com.google.gson.annotations.SerializedName +import org.hyperledger.cacti.weaver.imodule.corda.states.InvocationSpec class WriteExternalStateTest { companion object { lateinit var network: MockNetwork lateinit var partyA: StartedMockNode + lateinit var partyB: StartedMockNode @BeforeClass @JvmStatic fun setup() { network = MockNetwork(MockNetworkParameters(cordappsForAllNodes = listOf( TestCordapp.findCordapp("org.hyperledger.cacti.weaver.imodule.corda.contracts"), - TestCordapp.findCordapp("org.hyperledger.cacti.weaver.imodule.corda.flows") + TestCordapp.findCordapp("org.hyperledger.cacti.weaver.imodule.corda.flows"), + TestCordapp.findCordapp("org.hyperledger.cacti.weaver.imodule.corda.test") ))) partyA = network.createPartyNode() + partyB = network.createPartyNode() network.runNetwork() } @@ -45,6 +49,9 @@ class WriteExternalStateTest { } } + val partya = partyA.info.legalIdentities.first() + val partyb = partyB.info.legalIdentities.first() + val fabricNetwork = "network1" val fabricRelayEndpoint = "relay-network1:9080" val fabricViewAddress = "mychannel:simplestate:Read:a" @@ -64,7 +71,8 @@ class WriteExternalStateTest { identifiers = listOf(Identifier( fabricViewAddress, Policy("signature", listOf("Org1MSP")) - )) + )), + participants = listOf(partya, partyb) ) val fabricMembership = MembershipState( @@ -73,7 +81,8 @@ class WriteExternalStateTest { value = fabricCert, type = "ca", chain = listOf("") - )) + )), + participants = listOf(partya, partyb) ) val cordaTestDataJSON = javaClass.getResource("/test_data/corda_viewdata.json").readText(Charsets.UTF_8) @@ -84,7 +93,8 @@ class WriteExternalStateTest { identifiers = listOf(Identifier( "localhost:10006#com.cordaSimpleApplication.flow.GetStateByKey:*", Policy("signature", listOf("PartyA")) - )) + )), + participants = listOf(partya, partyb) ) val rootCACert = javaClass.getResource("/test_data/corda_cacert_root.pem").readText(Charsets.UTF_8) @@ -103,30 +113,42 @@ class WriteExternalStateTest { value = "", type = "certificate", chain = certChain - )) + )), + participants = listOf(partya, partyb) ) - - @Test - fun `WriteExternalState tests`() { - // Corda happy case + val disableInvokeObject = InvocationSpec() + val invokeObject = InvocationSpec( + disableInvocation = false, + invokeFlowName = "org.hyperledger.cacti.weaver.imodule.corda.test.UserFlow", + invokeFlowArgs = listOf(arrayOf()), + interopArgsIndex = 0 + ) + + fun initCordaPolicies() { // Create corda membership and verificationPolicy in vault - println(fabricCert) val future = partyA.startFlow(CreateVerificationPolicyState(cordaVerificationPolicy)) network.runNetwork() val linearId = future.getOrThrow() assert(linearId.isRight()) { "CreateVerificationPolicyState should return a Right(UniqueIdentifier)" } + + val futureb = partyB.startFlow(CreateVerificationPolicyState(cordaVerificationPolicy)) + network.runNetwork() + val linearIdb = futureb.getOrThrow() + assert(linearIdb.isRight()) { "CreateVerificationPolicyState should return a Right(UniqueIdentifier)" } val future2 = partyA.startFlow(CreateMembershipState(cordaMembership)) network.runNetwork() val linearId2 = future2.getOrThrow() assert(linearId2.isRight()) { "CreateMembershipState should return a Right(UniqueIdentifier)" } - val happyFuture = partyA.startFlow(WriteExternalStateInitiator(cordaTestData.B64View, "localhost:9081/Corda_Network/localhost:10006#com.cordaSimpleApplication.flow.GetStateByKey:H")) + val future2b = partyB.startFlow(CreateMembershipState(cordaMembership)) network.runNetwork() - val happyLinearId = happyFuture.getOrThrow() - assertTrue(happyLinearId.isRight()) - - // Fabric happy case + val linearId2b = future2b.getOrThrow() + assert(linearId2b.isRight()) { "CreateMembershipState should return a Right(UniqueIdentifier)" } + } + + + fun initFabricPolicies() { // Create fabric membership and verificationPolicy in vault val future3 = partyA.startFlow(CreateVerificationPolicyState(fabricVerificationPolicy)) network.runNetwork() @@ -137,8 +159,42 @@ class WriteExternalStateTest { network.runNetwork() val linearId4 = future4.getOrThrow() assert(linearId4.isRight()) { "CreateMembershipState should return a Right(UniqueIdentifier)" } + + // Create fabric membership and verificationPolicy in vault + val future3b = partyB.startFlow(CreateVerificationPolicyState(fabricVerificationPolicy)) + network.runNetwork() + val linearId3b = future3b.getOrThrow() + assert(linearId3b.isRight()) { "CreateVerificationPolicyState should return a Right(UniqueIdentifier)" } + + val future4b = partyB.startFlow(CreateMembershipState(fabricMembership)) + network.runNetwork() + val linearId4b = future4b.getOrThrow() + assert(linearId4b.isRight()) { "CreateMembershipState should return a Right(UniqueIdentifier)" } + } + + @Test + fun `WriteExternalState tests`() { + // Corda happy case + initCordaPolicies() - val happyFuture2 = partyA.startFlow(WriteExternalStateInitiator(fabricTestData.B64View, "${fabricRelayEndpoint}/${fabricNetwork}/${fabricViewAddress}")) + val happyFuture = partyA.startFlow(WriteExternalStateInitiator(arrayOf(cordaTestData.B64View), arrayOf("localhost:9081/Corda_Network/localhost:10006#com.cordaSimpleApplication.flow.GetStateByKey:H"), disableInvokeObject, listOf(partyb))) + network.runNetwork() + val happyLinearId = happyFuture.getOrThrow() + assertTrue(happyLinearId.isRight()) + + // Corda happy case2 + val happyFuture_1 = partyA.startFlow(WriteExternalStateInitiator(arrayOf(cordaTestData.B64View), arrayOf("localhost:9081/Corda_Network/localhost:10006#com.cordaSimpleApplication.flow.GetStateByKey:H"), invokeObject, listOf(partyb))) + network.runNetwork() + val happyResponse_1 = happyFuture_1.getOrThrow() + assertTrue(happyResponse_1.isRight()) + happyResponse_1.fold({ println("Error") },{ + assertEquals(it, true) + }) + + // Fabric happy case + initFabricPolicies() + + val happyFuture2 = partyA.startFlow(WriteExternalStateInitiator(arrayOf(fabricTestData.B64View), arrayOf("${fabricRelayEndpoint}/${fabricNetwork}/${fabricViewAddress}"), disableInvokeObject, listOf(partyb))) network.runNetwork() val happyLinearId2 = happyFuture2.getOrThrow() assertTrue(happyLinearId2.isRight()) @@ -153,12 +209,17 @@ class WriteExternalStateTest { network.runNetwork() val linearId5 = future5.getOrThrow() assert(linearId5.isRight()) { "UpdateMembershipState should return a Right(UniqueIdentifier)" } + + val future5b = partyB.startFlow(UpdateMembershipState(invalidMembership)) + network.runNetwork() + val linearId5b = future5b.getOrThrow() + assert(linearId5b.isRight()) { "UpdateMembershipState should return a Right(UniqueIdentifier)" } - val unhappyFuture = partyA.startFlow(WriteExternalStateInitiator(cordaTestData.B64View, "localhost:9081/Corda_Network/localhost:10006#com.cordaSimpleApplication.flow.GetStateByKey:H")) + val unhappyFuture = partyA.startFlow(WriteExternalStateInitiator(arrayOf(cordaTestData.B64View), arrayOf("localhost:9081/Corda_Network/localhost:10006#com.cordaSimpleApplication.flow.GetStateByKey:H"), disableInvokeObject, listOf(partyb))) network.runNetwork() val unhappyLinearId = unhappyFuture.getOrThrow() assertTrue(unhappyLinearId.isLeft()) - assertEquals("Parse Error: failed to parse requester certificate: Illegal base64 character 5f", unhappyLinearId.fold({ it.message }, { "" })) + assertEquals("View verification failed with error: Parse Error: failed to parse requester certificate: Illegal base64 character 5f", unhappyLinearId.fold({ it.message }, { "" })) // Test case: Invalid policy in verification policy diff --git a/weaver/core/network/corda-interop-app/interop-workflows/src/test/kotlin/org/hyperledger/cacti/weaver/imodule/corda/WriteExternalStateTest2.kt b/weaver/core/network/corda-interop-app/interop-workflows/src/test/kotlin/org/hyperledger/cacti/weaver/imodule/corda/WriteExternalStateTest2.kt index 0424880006..d278409e08 100644 --- a/weaver/core/network/corda-interop-app/interop-workflows/src/test/kotlin/org/hyperledger/cacti/weaver/imodule/corda/WriteExternalStateTest2.kt +++ b/weaver/core/network/corda-interop-app/interop-workflows/src/test/kotlin/org/hyperledger/cacti/weaver/imodule/corda/WriteExternalStateTest2.kt @@ -126,7 +126,7 @@ class WriteExternalStateTest2 { val linearId2 = future2.getOrThrow() assert(linearId2.isRight()) { "CreateMembershipState should return a Right(UniqueIdentifier)" } - val happyFuture = partyA.startFlow(WriteExternalStateInitiator(cordaTestData.B64View, "localhost:9081/Corda_Network/localhost:10006#com.cordaSimpleApplication.flow.GetStateByKey:H")) + val happyFuture = partyA.startFlow(WriteExternalStateInitiator(arrayOf(cordaTestData.B64View), arrayOf("localhost:9081/Corda_Network/localhost:10006#com.cordaSimpleApplication.flow.GetStateByKey:H"))) network.runNetwork() val happyLinearId = happyFuture.getOrThrow() assertTrue(happyLinearId.isRight()) @@ -143,7 +143,7 @@ class WriteExternalStateTest2 { val linearId4 = future4.getOrThrow() assert(linearId4.isRight()) { "CreateMembershipState should return a Right(UniqueIdentifier)" } - val happyFuture2 = partyA.startFlow(WriteExternalStateInitiator(fabricTestData.B64View, "${fabricRelayEndpoint}/${fabricNetwork}/${fabricViewAddress}")) + val happyFuture2 = partyA.startFlow(WriteExternalStateInitiator(arrayOf(fabricTestData.B64View), arrayOf("${fabricRelayEndpoint}/${fabricNetwork}/${fabricViewAddress}"))) network.runNetwork() val happyLinearId2 = happyFuture2.getOrThrow() assertTrue(happyLinearId2.isRight()) @@ -159,11 +159,11 @@ class WriteExternalStateTest2 { val linearId5 = future5.getOrThrow() assert(linearId5.isRight()) { "UpdateMembershipState should return a Right(UniqueIdentifier)" } - val unhappyFuture = partyA.startFlow(WriteExternalStateInitiator(cordaTestData.B64View, "localhost:9081/Corda_Network/localhost:10006#com.cordaSimpleApplication.flow.GetStateByKey:H")) + val unhappyFuture = partyA.startFlow(WriteExternalStateInitiator(arrayOf(cordaTestData.B64View), arrayOf("localhost:9081/Corda_Network/localhost:10006#com.cordaSimpleApplication.flow.GetStateByKey:H"))) network.runNetwork() val unhappyLinearId = unhappyFuture.getOrThrow() assertTrue(unhappyLinearId.isLeft()) - assertEquals("Parse Error: failed to parse requester certificate: Illegal base64 character 5f", unhappyLinearId.fold({ it.message }, { "" })) + assertEquals("View verification failed with error: Parse Error: failed to parse requester certificate: Illegal base64 character 5f", unhappyLinearId.fold({ it.message }, { "" })) // Test case: Invalid policy in verification policy diff --git a/weaver/core/network/corda-interop-app/test-cordapp/src/main/kotlin/TestQueryFlow.kt b/weaver/core/network/corda-interop-app/test-cordapp/src/main/kotlin/TestQueryFlow.kt index 6648882ee4..e06fe8c3dc 100644 --- a/weaver/core/network/corda-interop-app/test-cordapp/src/main/kotlin/TestQueryFlow.kt +++ b/weaver/core/network/corda-interop-app/test-cordapp/src/main/kotlin/TestQueryFlow.kt @@ -55,4 +55,4 @@ class QueryState() : FlowLogic() { } return states.toString().toByteArray() } -} \ No newline at end of file +} diff --git a/weaver/core/network/corda-interop-app/test-cordapp/src/main/kotlin/TestUserFlow.kt b/weaver/core/network/corda-interop-app/test-cordapp/src/main/kotlin/TestUserFlow.kt new file mode 100644 index 0000000000..fbfd6a67a5 --- /dev/null +++ b/weaver/core/network/corda-interop-app/test-cordapp/src/main/kotlin/TestUserFlow.kt @@ -0,0 +1,26 @@ +/* + * Copyright IBM Corp. All Rights Reserved. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.hyperledger.cacti.weaver.imodule.corda.test + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.flows.* +import net.corda.core.contracts.UniqueIdentifier + +/* + * Example of user flow for WriteExternalState + */ +@StartableByRPC +class UserFlow( + val externalStateLinearIds: Array +) : FlowLogic() { + @Suspendable + override fun call(): Boolean { + if (externalStateLinearIds.size > 0) + return true + return false + } +} \ No newline at end of file diff --git a/weaver/samples/corda/corda-simple-application/clients/src/main/kotlin/com/cordaSimpleApplication/client/AssetTransferManager.kt b/weaver/samples/corda/corda-simple-application/clients/src/main/kotlin/com/cordaSimpleApplication/client/AssetTransferManager.kt index e69f0fd948..95d88716ac 100644 --- a/weaver/samples/corda/corda-simple-application/clients/src/main/kotlin/com/cordaSimpleApplication/client/AssetTransferManager.kt +++ b/weaver/samples/corda/corda-simple-application/clients/src/main/kotlin/com/cordaSimpleApplication/client/AssetTransferManager.kt @@ -24,8 +24,12 @@ import kotlin.system.exitProcess import net.corda.core.messaging.startFlow import java.util.Base64 import net.corda.core.identity.CordaX500Name +import net.corda.core.identity.Party +import net.corda.core.contracts.UniqueIdentifier + import org.hyperledger.cacti.weaver.sdk.corda.AssetTransferSDK import org.hyperledger.cacti.weaver.sdk.corda.InteroperableHelper +import org.hyperledger.cacti.weaver.sdk.corda.RelayOptions import com.cordaSimpleApplication.contract.AssetContract import com.cordaSimpleApplication.contract.BondAssetContract import java.util.Calendar @@ -36,7 +40,6 @@ import com.cordaSimpleApplication.flow.GetAssetClaimStatusByPledgeId import com.cordaSimpleApplication.flow.GetBondAssetPledgeStatusByPledgeId import com.cordaSimpleApplication.flow.GetBondAssetClaimStatusByPledgeId import com.cordaSimpleApplication.flow.GetAssetPledgeStatusByPledgeId -import net.corda.core.identity.Party class AssetTransferCommand : CliktCommand(name = "transfer", help ="Manages simple asset transfer") { override fun run() { @@ -613,21 +616,24 @@ fun requestStateFromRemoteNetwork( val networkName = System.getenv("NETWORK_NAME") ?: "Corda_Network" try { + val relayOptions = RelayOptions( + useTlsForRelay = config["RELAY_TLS"]!!.toBoolean(), + relayTlsTrustStorePath = config["RELAY_TLSCA_TRUST_STORE"]!!, + relayTlsTrustStorePassword = config["RELAY_TLSCA_TRUST_STORE_PASSWORD"]!!, + tlsCACertPathsForRelay = config["RELAY_TLSCA_CERT_PATHS"]!! + ) InteroperableHelper.interopFlow( proxy, + arrayOf(externalStateAddress), localRelayAddress, - externalStateAddress, networkName, - externalStateParticipants, - config["RELAY_TLS"]!!.toBoolean(), - config["RELAY_TLSCA_TRUST_STORE"]!!, - config["RELAY_TLSCA_TRUST_STORE_PASSWORD"]!!, - config["RELAY_TLSCA_CERT_PATHS"]!! + externalStateParticipants = externalStateParticipants, + relayOptions = relayOptions ).fold({ println("Error in Interop Flow: ${it.message}") exitProcess(1) }, { - linearId = it.toString() + linearId = (it as Array)[0].toString() // Get the first linearId, as only requested one external state println("Interop flow successful and external-state was stored with linearId $linearId.\n") }) } catch (e: Exception) { diff --git a/weaver/samples/corda/corda-simple-application/clients/src/main/kotlin/com/cordaSimpleApplication/client/InteropManager.kt b/weaver/samples/corda/corda-simple-application/clients/src/main/kotlin/com/cordaSimpleApplication/client/InteropManager.kt index 3af05e9540..2ff6ad26dc 100644 --- a/weaver/samples/corda/corda-simple-application/clients/src/main/kotlin/com/cordaSimpleApplication/client/InteropManager.kt +++ b/weaver/samples/corda/corda-simple-application/clients/src/main/kotlin/com/cordaSimpleApplication/client/InteropManager.kt @@ -12,17 +12,20 @@ import arrow.core.Right import com.github.ajalt.clikt.core.CliktCommand import com.github.ajalt.clikt.core.requireObject import com.github.ajalt.clikt.parameters.arguments.argument +import com.github.ajalt.clikt.parameters.options.option import io.grpc.ManagedChannelBuilder import java.lang.Exception import kotlinx.coroutines.* import net.corda.core.messaging.startFlow import java.util.* import net.corda.core.identity.Party +import net.corda.core.transactions.SignedTransaction import com.cordaSimpleApplication.flow.CreateState import com.cordaSimpleApplication.state.SimpleState import org.hyperledger.cacti.weaver.sdk.corda.InteroperableHelper +import org.hyperledger.cacti.weaver.sdk.corda.RelayOptions /** * The CLI command used to trigger a request for state from an external network. @@ -39,6 +42,7 @@ import org.hyperledger.cacti.weaver.sdk.corda.InteroperableHelper */ class RequestStateCommand : CliktCommand(help = "Requests state from a foreign network. " + "Requires the port number for the local relay and remote relay name") { + val key: String? by option("-wk", "--wkey", help="key to write the external state") val localRelayAddress: String by argument() val externalStateAddress: String by argument() val config by requireObject>() @@ -50,32 +54,39 @@ class RequestStateCommand : CliktCommand(help = "Requests state from a foreign n password = "test", rpcPort = config["CORDA_PORT"]!!.toInt()) try { + val relayOptions = RelayOptions( + useTlsForRelay = config["RELAY_TLS"]!!.toBoolean(), + relayTlsTrustStorePath = config["RELAY_TLSCA_TRUST_STORE"]!!, + relayTlsTrustStorePassword = config["RELAY_TLSCA_TRUST_STORE_PASSWORD"]!!, + tlsCACertPathsForRelay = config["RELAY_TLSCA_CERT_PATHS"]!! + ) + val k: String = if (key != null) { + key!! + } else { + "external_state" + } + val args: List = listOf(k, arrayOf()) InteroperableHelper.interopFlow( - rpc.proxy, - localRelayAddress, - externalStateAddress, + rpc.proxy, + arrayOf(externalStateAddress), + localRelayAddress, networkName, - listOf(), - config["RELAY_TLS"]!!.toBoolean(), - config["RELAY_TLSCA_TRUST_STORE"]!!, - config["RELAY_TLSCA_TRUST_STORE_PASSWORD"]!!, - config["RELAY_TLSCA_CERT_PATHS"]!! + false, + "com.cordaSimpleApplication.flow.CreateFromExternalState", + args, + 1, + externalStateParticipants = listOf(), + relayOptions = relayOptions ).fold({ println("Error in Interop Flow: ${it.message}") }, { - val linearId = it.toString() - println("Interop flow successful and external-state was stored with linearId $linearId.\n") - val value = InteroperableHelper.getExternalStatePayloadString( - rpc.proxy, - linearId - ) - val key = externalStateAddress.split(":").last() - val createdState = rpc.proxy.startFlow(::CreateState, key, value) - .returnValue.get().tx.outputStates.first() as SimpleState - println("Created simplestate: ${createdState}") + println("Successful response: ${it}") + val stx = it as SignedTransaction + val createdState = stx.tx.outputStates.first() as SimpleState + println("Created simplestate: ${createdState}") }) } catch (e: Exception) { - println("Error: ${e.toString()}") + println("Error in request state: ${e.toString()}") } finally { rpc.close() } diff --git a/weaver/samples/corda/corda-simple-application/contracts-kotlin/build.gradle b/weaver/samples/corda/corda-simple-application/contracts-kotlin/build.gradle index 34a99f6a32..b30a10ceff 100644 --- a/weaver/samples/corda/corda-simple-application/contracts-kotlin/build.gradle +++ b/weaver/samples/corda/corda-simple-application/contracts-kotlin/build.gradle @@ -22,6 +22,8 @@ dependencies { cordaCompile "$corda_core_release_group:corda-core:$corda_core_release_version" testCompile "$corda_release_group:corda-node-driver:$corda_release_version" + + implementation(group: 'org.hyperledger.cacti.weaver.imodule.corda', name: 'interop-contracts', version: "$cacti_version") } publishing { diff --git a/weaver/samples/corda/corda-simple-application/contracts-kotlin/src/main/kotlin/com/example/contract/SimpleContract.kt b/weaver/samples/corda/corda-simple-application/contracts-kotlin/src/main/kotlin/com/example/contract/SimpleContract.kt index e0709c0182..98a711548b 100644 --- a/weaver/samples/corda/corda-simple-application/contracts-kotlin/src/main/kotlin/com/example/contract/SimpleContract.kt +++ b/weaver/samples/corda/corda-simple-application/contracts-kotlin/src/main/kotlin/com/example/contract/SimpleContract.kt @@ -13,6 +13,8 @@ import net.corda.core.contracts.requireSingleCommand import net.corda.core.contracts.requireThat import net.corda.core.transactions.LedgerTransaction +import org.hyperledger.cacti.weaver.imodule.corda.states.ExternalState + /** * SimpleContract defines the rules for managing [SimpleState]. * @@ -52,6 +54,12 @@ class SimpleContract : Contract { val out = tx.outputsOfType().single() "The participant must be the signer." using (command.signers.containsAll(out.participants.map { it.owningKey })) } + is Commands.CreateFromExternal -> requireThat { + "One external state as input should be consumed when issuing a SimpleState from ExternalState." using (tx.inputsOfType().size == 1) + "Only one output state should be created." using (tx.outputs.size == 1) + val out = tx.outputsOfType().single() + "The participant must be the signer." using (command.signers.containsAll(out.participants.map { it.owningKey })) + } is Commands.Update -> requireThat { "There should be one input state" using (tx.inputs.size == 1) "The input state should be of type SimpleState" using (tx.inputs[0].state.data is SimpleState) @@ -79,6 +87,7 @@ class SimpleContract : Contract { */ interface Commands : CommandData { class Create : Commands + class CreateFromExternal: Commands class Update : Commands class Delete : Commands } diff --git a/weaver/samples/corda/corda-simple-application/workflows-kotlin/src/main/kotlin/com/example/flow/SimpleFlow.kt b/weaver/samples/corda/corda-simple-application/workflows-kotlin/src/main/kotlin/com/example/flow/SimpleFlow.kt index 03f5805a3a..dab2259208 100644 --- a/weaver/samples/corda/corda-simple-application/workflows-kotlin/src/main/kotlin/com/example/flow/SimpleFlow.kt +++ b/weaver/samples/corda/corda-simple-application/workflows-kotlin/src/main/kotlin/com/example/flow/SimpleFlow.kt @@ -12,6 +12,7 @@ import com.cordaSimpleApplication.state.SimpleState import javassist.NotFoundException import net.corda.core.contracts.Command import net.corda.core.contracts.UniqueIdentifier +import net.corda.core.contracts.requireThat import net.corda.core.flows.* import net.corda.core.node.services.Vault import net.corda.core.node.services.queryBy @@ -20,8 +21,15 @@ import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.ProgressTracker.Step +import net.corda.core.serialization.CordaSerializable import java.util.Arrays.asList +import org.hyperledger.cacti.weaver.imodule.corda.states.ExternalState +import org.hyperledger.cacti.weaver.imodule.corda.contracts.ExternalStateContract +import org.hyperledger.cacti.weaver.imodule.corda.flows.GetExternalStateAndRefByLinearId +import org.hyperledger.cacti.weaver.imodule.corda.flows.getViewFromExternalState +import org.hyperledger.cacti.weaver.imodule.corda.flows.getPayloadFromView + /** * The CreateState flow is used to create a new [SimpleState]. * @@ -89,6 +97,120 @@ class CreateState(val key: String, val value: String) : FlowLogic +) : FlowLogic() { + /** + * The progress tracker checkpoints each stage of the flow and outputs the specified messages when each + * checkpoint is reached in the code. See the 'progressTracker.currentStep' expressions within the call() function. + */ + companion object { + object GENERATING_TRANSACTION : Step("Generating transaction based on new simple state.") + object VERIFYING_TRANSACTION : Step("Verifying contract constraints.") + object SIGNING_TRANSACTION : Step("Signing transaction with our private key.") + object FINALISING_TRANSACTION : Step("Obtaining notary signature and recording transaction.") { + override fun childProgressTracker() = FinalityFlow.tracker() + } + fun tracker() = ProgressTracker( + GENERATING_TRANSACTION, + VERIFYING_TRANSACTION, + SIGNING_TRANSACTION, + FINALISING_TRANSACTION + ) + } + + override val progressTracker = tracker() + + /** + * The call() method captures the logic to build and sign a transaction that creates a [SimpleState]. + * + * @return returns the signed transaction. + */ + @Suspendable + override fun call(): SignedTransaction { + println("Creating state from External State...") + // Obtain a reference to the notary we want to use. + val notary = serviceHub.networkMapCache.notaryIdentities[0] + val externalStateAndRef = subFlow(GetExternalStateAndRefByLinearId(externalStateLinearIds[0].toString())) + val value = getPayloadFromView(getViewFromExternalState(externalStateAndRef.state.data)).toString(Charsets.UTF_8) + + progressTracker.currentStep = GENERATING_TRANSACTION + // Generate an unsigned transaction. + val simpleState = SimpleState(key, value, serviceHub.myInfo.legalIdentities.first()) + println("Storing simple state in the ledger: $simpleState\n") + val txCommand = Command(SimpleContract.Commands.CreateFromExternal(), simpleState.participants.map { it.owningKey }) + val externalStateConsumeCommand = Command(ExternalStateContract.Commands.Consume(), + externalStateAndRef.state.data.participants.map { it.owningKey } + ) + val txBuilder = TransactionBuilder(notary) + .addInputState(externalStateAndRef) + .addOutputState(simpleState, SimpleContract.ID) + .addCommand(txCommand) + .addCommand(externalStateConsumeCommand) + + // Stage 2. + progressTracker.currentStep = VERIFYING_TRANSACTION + // Verify that the transaction is valid. + txBuilder.verify(serviceHub) + + // Stage 3. + progressTracker.currentStep = SIGNING_TRANSACTION + // Sign the transaction. + val signedTx = serviceHub.signInitialTransaction(txBuilder) + + // Stage 5. + progressTracker.currentStep = FINALISING_TRANSACTION + var sessions = listOf() + for (party in externalStateAndRef.state.data.participants) { + if (!ourIdentity.equals(party)) { + val session = initiateFlow(party) + sessions += session + } + } + + val sTx = subFlow(CollectSignaturesFlow(signedTx, sessions)) + + // Notarise and record the transaction in the party's vault. + return subFlow(FinalityFlow(sTx, sessions, FINALISING_TRANSACTION.childProgressTracker())) + } +} +@InitiatedBy(CreateFromExternalState::class) +class CreateFromExternalStateAcceptor(val session: FlowSession) : FlowLogic() { + @Suspendable + override fun call(): SignedTransaction { + val signTransactionFlow = object : SignTransactionFlow(session) { + override fun checkTransaction(stx: SignedTransaction) = requireThat { + val tx = stx.tx.toLedgerTransaction(serviceHub) + + val externalState = tx.inputsOfType().single() + val out = tx.outputsOfType().single() + + val externalValue = getPayloadFromView(getViewFromExternalState(externalState)).toString(Charsets.UTF_8) + + "Value in SimpleState should match with external state" using (out.value == externalValue) + } + } + try { + val txId = subFlow(signTransactionFlow).id + println("${ourIdentity} signed transaction.") + return subFlow(ReceiveFinalityFlow(session, expectedTxId = txId)) + } catch (e: Exception) { + val errorMsg = "Error signing create from external state transaction: ${e.message}\n" + println(errorMsg) + throw Error(errorMsg) + } + } +} + /** * The UpdateState flow is used to update an existing [SimpleState]. * diff --git a/weaver/sdks/corda/src/main/kotlin/org/hyperledger/cacti/weaver/sdk/corda/InteroperableHelper.kt b/weaver/sdks/corda/src/main/kotlin/org/hyperledger/cacti/weaver/sdk/corda/InteroperableHelper.kt index b5f7b0b36c..12a76250fb 100644 --- a/weaver/sdks/corda/src/main/kotlin/org/hyperledger/cacti/weaver/sdk/corda/InteroperableHelper.kt +++ b/weaver/sdks/corda/src/main/kotlin/org/hyperledger/cacti/weaver/sdk/corda/InteroperableHelper.kt @@ -29,14 +29,22 @@ import net.corda.core.messaging.startFlow import net.corda.core.messaging.CordaRPCOps import net.corda.core.identity.Party +import org.hyperledger.cacti.weaver.imodule.corda.states.InvocationSpec import org.hyperledger.cacti.weaver.imodule.corda.flows.CreateExternalRequest import org.hyperledger.cacti.weaver.imodule.corda.flows.WriteExternalStateInitiator import org.hyperledger.cacti.weaver.imodule.corda.flows.GetExternalStateByLinearId import org.hyperledger.cacti.weaver.protos.common.interop_payload.InteropPayloadOuterClass -import org.hyperledger.cacti.weaver.protos.common.state.State +import org.hyperledger.cacti.weaver.protos.common.state.State.RequestState import org.hyperledger.cacti.weaver.protos.corda.ViewDataOuterClass import org.hyperledger.cacti.weaver.protos.networks.networks.Networks +data class RelayOptions( + val useTlsForRelay: Boolean = false, + val relayTlsTrustStorePath: String = "", + val relayTlsTrustStorePassword: String = "", + val tlsCACertPathsForRelay: String = "" +) + class InteroperableHelper { companion object { private val logger = LoggerFactory.getLogger(InteroperableHelper::class.java) @@ -85,22 +93,19 @@ class InteroperableHelper { fun getChannelToRelay ( localRelayHost: String, localRelayPort: Int, - useTlsForRelay: Boolean, - relayTlsTrustStorePath: String, - relayTlsTrustStorePassword: String, - tlsCACertPathsForRelay: String + relayOptions: RelayOptions ): ManagedChannel { - if (useTlsForRelay) { + if (relayOptions.useTlsForRelay) { var trustStore: KeyStore = KeyStore.getInstance(KeyStore.getDefaultType()) - if (relayTlsTrustStorePath.length > 0) { - if (relayTlsTrustStorePassword.length == 0) { + if (relayOptions.relayTlsTrustStorePath.length > 0) { + if (relayOptions.relayTlsTrustStorePassword.length == 0) { throw Exception("Password not supplied for JKS trust store") } - val trustStream = FileInputStream(relayTlsTrustStorePath) - trustStore.load(trustStream, relayTlsTrustStorePassword.toCharArray()) - } else if (tlsCACertPathsForRelay.length > 0) { + val trustStream = FileInputStream(relayOptions.relayTlsTrustStorePath) + trustStore.load(trustStream, relayOptions.relayTlsTrustStorePassword.toCharArray()) + } else if (relayOptions.tlsCACertPathsForRelay.length > 0) { trustStore.load(null, null) - val tlsCACertPaths = tlsCACertPathsForRelay.split(":") + val tlsCACertPaths = relayOptions.tlsCACertPathsForRelay.split(":") var tlsCACertCounter = 0 for (tlsCACertPath in tlsCACertPaths) { val certFactory = CertificateFactory.getInstance("X509") @@ -146,15 +151,17 @@ class InteroperableHelper { @JvmStatic @JvmOverloads fun interopFlow ( proxy: CordaRPCOps, + externalStateAddresses: Array, localRelayEndpoint: String, - externalStateAddress: String, networkName: String, - externalStateParticipants: List = listOf() , - useTlsForRelay: Boolean = false, - relayTlsTrustStorePath: String = "", - relayTlsTrustStorePassword: String = "", - tlsCACertPathsForRelay: String = "" - ): Either { + returnWithoutLocalInvocation: Boolean = true, + invokeFlowName: String = "", + invokeFlowArgs: List = listOf(), + interopArgsIndex: Int = -1, + externalStateParticipants: List = listOf(), + relayOptions: RelayOptions = RelayOptions() + ): Either { + // Create Relay client instance val localRelayHost = localRelayEndpoint.split(":").first() val localRelayPort = localRelayEndpoint.split(":").last().toInt() var channel: ManagedChannel @@ -162,38 +169,76 @@ class InteroperableHelper { channel = getChannelToRelay( localRelayHost, localRelayPort, - useTlsForRelay, - relayTlsTrustStorePath, - relayTlsTrustStorePassword, - tlsCACertPathsForRelay) + relayOptions) } catch(e: Exception) { logger.error("Error creating channel to relay: ${e.message}\n") return Left(Error("Error creating channel to relay: ${e.message}\n")) } val client = RelayClient(channel) + var responseStates = Array(externalStateAddresses.size) { null } + var responseError: Error? = null + + // Fetch remote views + runBlocking { + coroutineScope { // limits the scope of concurrency + externalStateAddresses.mapIndexed { index, externalStateAddress -> + async { // async means "concurrently", context goes here + getRemoteView( + proxy, + networkName, + externalStateAddress, + client + ).fold({ + logger.error("Error in Interop Flow for address ${externalStateAddress}: ${it.message}\n") + responseError = Error("Error in Interop Flow for address ${externalStateAddress}: ${it.message}\n") + },{ state -> + responseStates[index] = state + }) + } + }.awaitAll() // waits all of them + } + } + + if (responseError is Error){ + return Left(responseError!!) + } + + // Write external state + val views64 = responseStates.mapNotNull { + Base64.getEncoder().encodeToString(it!!.view.toByteArray()) + }.toTypedArray() + + return writeExternalStateToVault( + proxy, + views64, + externalStateAddresses, + externalStateParticipants, + returnWithoutLocalInvocation, + invokeFlowName, + invokeFlowArgs, + interopArgsIndex + ) + } + + fun getRemoteView( + proxy: CordaRPCOps, + networkName: String, + externalStateAddress: String, + client: RelayClient + ): Either { val eitherErrorQuery = constructNetworkQuery(proxy, externalStateAddress, networkName) logger.debug("Corda network returned: $eitherErrorQuery \n") - val result = eitherErrorQuery.fold({ + return eitherErrorQuery.fold({ logger.error("error ${it}") Left(it) }, { networkQuery -> logger.debug("Network query: $networkQuery") var eitherErrorResult = runBlocking { val ack = async { client.requestState(networkQuery) }.await() - pollForState(ack.requestId, client).fold({ - logger.error("Error in Interop Flow: ${it.message}\n") - Left(Error("Error in Interop Flow: ${it.message}\n")) - }, { state -> - writeExternalStateToVault( - proxy, - state, - externalStateAddress, - externalStateParticipants) - }) + pollForState(ack.requestId, client) } eitherErrorResult }) - return result } /** @@ -337,7 +382,7 @@ class InteroperableHelper { * Will have status "PENDING", "PENDING_ACK", "COMPLETED" or "ERROR". * @return Returns the request state when it has status "COMPLETED" or "ERROR". */ - suspend fun pollForState(requestId: String, client: RelayClient, retryCount: Int = 0): Either = coroutineScope { + suspend fun pollForState(requestId: String, client: RelayClient, retryCount: Int = 0): Either = coroutineScope { val timeout = 30 val delayTime = 500L val num = 30*1000/delayTime @@ -374,23 +419,46 @@ class InteroperableHelper { */ fun writeExternalStateToVault( proxy: CordaRPCOps, - requestState: State.RequestState, - address: String, - externalStateParticipants: List = listOf() - ): Either { + views64: Array, + addresses: Array, + externalStateParticipants: List = listOf(), + returnWithoutLocalInvocation: Boolean = true, + invokeFlowName: String = "", + invokeFlowArgs: List = listOf(), + interopArgsIndex: Int = -1 + ): Either { return try { logger.debug("Sending response to Corda for view verification.\n") - val stateId = runCatching { - val viewBase64String = Base64.getEncoder().encodeToString(requestState.view.toByteArray()) - proxy.startFlow(::WriteExternalStateInitiator, viewBase64String, address, externalStateParticipants) - .returnValue.get() - }.fold({ - it.map { linearId -> - logger.debug("Verification was successful and external-state was stored with linearId $linearId.\n") - linearId.toString() - } + val invokeObject = InvocationSpec( + disableInvocation = returnWithoutLocalInvocation, + invokeFlowName = invokeFlowName, + invokeFlowArgs = invokeFlowArgs, + interopArgsIndex = interopArgsIndex + ) + logger.debug("Invoke Object: ${invokeObject}") + val stateId: Either = runCatching { + proxy.startFlow(::WriteExternalStateInitiator, + views64, + addresses, + invokeObject, + externalStateParticipants + ).returnValue.get() + }.fold({ + it.fold({ + logger.error("WriteExternalState flow error: ${it.message}\n") + Left(Error("WriteExternalState flow error: ${it.message}\n")) + }, { result: Any -> + if (returnWithoutLocalInvocation) { + logger.debug("Verification was successful and external-state was stored with array of linearIds $result.\n") + Right(result) + } else { + logger.debug("Verification was successful and called flow: $invokeFlowName with result: $result.\n") + Right(result) + } + }) }, { - Left(Error("Corda Network Error: Error running WriteExternalStateInitiator flow: ${it.message}\n")) + logger.error("Corda Network Error: Error running WriteExternalState flow: ${it.message}\n") + Left(Error("Corda Network Error: Error running WriteExternalState flow: ${it.message}\n")) }) stateId } catch (e: Exception) { diff --git a/weaver/tests/network-setups/corda/makefile b/weaver/tests/network-setups/corda/makefile index edfe9902cd..2f24107177 100644 --- a/weaver/tests/network-setups/corda/makefile +++ b/weaver/tests/network-setups/corda/makefile @@ -60,6 +60,9 @@ restart-with-new-interop-app: stop cd ../../../core/network/corda-interop-app && ./gradlew clean jar ./scripts/get-cordapps.sh $(APP_NAME) local ./scripts/start-nodes.sh $(APP_NAME) $(PROFILE) + ./scripts/start-nodes.sh $(APP_NAME) $(PROFILE) Corda_Network2 + ./scripts/check-nodes-status.sh $(PROFILE) + ./scripts/check-nodes-status.sh $(PROFILE) Corda_Network2 .PHONY: stop-network1 stop-network1: