Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

new typed-protocols for cardano-node #3845

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import Cardano.Tracing.OrphanInstances.Consensus ()
import Cardano.Tracing.OrphanInstances.Network ()
import Cardano.Tracing.OrphanInstances.Shelley ()

import Ouroboros.Network.Protocol.TxSubmission2.Type (TokBlockingStyle (..))
import Ouroboros.Network.Protocol.TxSubmission2.Type (SingBlockingStyle (..))

import Cardano.Api

Expand Down Expand Up @@ -121,11 +121,11 @@ mkSubmissionSummary ssThreadName startTime reportsRefs
walletTxSource :: forall era. WalletScript era -> TpsThrottle -> TxSource era
walletTxSource walletScript tpsThrottle = Active $ worker walletScript
where
worker :: forall m blocking . MonadIO m => WalletScript era -> TokBlockingStyle blocking -> Req -> m (TxSource era, [Tx era])
worker :: forall m blocking . MonadIO m => WalletScript era -> SingBlockingStyle blocking -> Req -> m (TxSource era, [Tx era])
worker script blocking req = do
(done, txCount) <- case blocking of
TokBlocking -> liftIO $ consumeTxsBlocking tpsThrottle req
TokNonBlocking -> liftIO $ consumeTxsNonBlocking tpsThrottle req
SingBlocking -> liftIO $ consumeTxsBlocking tpsThrottle req
SingNonBlocking -> liftIO $ consumeTxsNonBlocking tpsThrottle req
(txList, newScript) <- liftIO $ unFold script txCount
case done of
Stop -> return (Exhausted, txList)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ import Ouroboros.Network.Protocol.TxSubmission2.Client (ClientStIdle (
ClientStTxs (..),
TxSubmissionClient (..))
import Ouroboros.Network.Protocol.TxSubmission2.Type (BlockingReplyList (..),
TokBlockingStyle (..), TxSizeInBytes)
SingBlockingStyle (..), TxSizeInBytes)

import Cardano.Api
import Cardano.Api.Shelley (fromShelleyTxId, toConsensusGenTx)
Expand All @@ -75,14 +75,14 @@ data TxSource era
= Exhausted
| Active (ProduceNextTxs era)

type ProduceNextTxs era = (forall m blocking . MonadIO m => TokBlockingStyle blocking -> Req -> m (TxSource era, [Tx era]))
type ProduceNextTxs era = (forall m blocking . MonadIO m => SingBlockingStyle blocking -> Req -> m (TxSource era, [Tx era]))

produceNextTxs :: forall m blocking era . MonadIO m => TokBlockingStyle blocking -> Req -> LocalState era -> m (LocalState era, [Tx era])
produceNextTxs :: forall m blocking era . MonadIO m => SingBlockingStyle blocking -> Req -> LocalState era -> m (LocalState era, [Tx era])
produceNextTxs blocking req (txProducer, unack, stats) = do
(newTxProducer, txList) <- produceNextTxs' blocking req txProducer
return ((newTxProducer, unack, stats), txList)

produceNextTxs' :: forall m blocking era . MonadIO m => TokBlockingStyle blocking -> Req -> TxSource era -> m (TxSource era, [Tx era])
produceNextTxs' :: forall m blocking era . MonadIO m => SingBlockingStyle blocking -> Req -> TxSource era -> m (TxSource era, [Tx era])
produceNextTxs' _ _ Exhausted = return (Exhausted, [])
produceNextTxs' blocking req (Active callback) = callback blocking req

Expand All @@ -104,10 +104,10 @@ txSubmissionClient tr bmtr initialTxSource endOfProtocolCallback =
TxSubmissionClient $
pure $ client (initialTxSource, UnAcked [], SubmissionThreadStats 0 0 0)
where
discardAcknowledged :: TokBlockingStyle a -> Ack -> LocalState era -> m (LocalState era)
discardAcknowledged :: SingBlockingStyle a -> Ack -> LocalState era -> m (LocalState era)
discardAcknowledged blocking (Ack ack) (txSource, UnAcked unAcked, stats) = do
when (tokIsBlocking blocking && ack /= length unAcked) $ do
let err = "decideAnnouncement: TokBlocking, but length unAcked != ack"
let err = "decideAnnouncement: SingBlocking, but length unAcked != ack"
traceWith bmtr (TraceBenchTxSubError err)
fail (T.unpack err)
let (stillUnacked, acked) = L.splitAtEnd ack unAcked
Expand All @@ -128,7 +128,7 @@ txSubmissionClient tr bmtr initialTxSource endOfProtocolCallback =

requestTxIds :: forall blocking.
LocalState era
-> TokBlockingStyle blocking
-> SingBlockingStyle blocking
-> Word16
-> Word16
-> m (ClientStTxIds blocking (GenTxId CardanoBlock) (GenTx CardanoBlock) m ())
Expand All @@ -147,15 +147,15 @@ txSubmissionClient tr bmtr initialTxSource endOfProtocolCallback =
traceWith bmtr $ SubmissionClientUnAcked (getTxId . getTxBody <$> outs)

case blocking of
TokBlocking -> case NE.nonEmpty newTxs of
SingBlocking -> case NE.nonEmpty newTxs of
Nothing -> do
traceWith tr EndOfProtocol
endOfProtocolCallback stats
pure $ SendMsgDone ()
(Just txs) -> pure $ SendMsgReplyTxIds
(BlockingReply $ txToIdSize <$> txs)
(client stateC)
TokNonBlocking -> pure $ SendMsgReplyTxIds
SingNonBlocking -> pure $ SendMsgReplyTxIds
(NonBlockingReply $ txToIdSize <$> newTxs)
(client stateC)

Expand Down Expand Up @@ -203,17 +203,17 @@ txSubmissionClient tr bmtr initialTxSource endOfProtocolCallback =
fromGenTxId (Block.GenTxIdBabbage (Mempool.ShelleyTxId i)) = fromShelleyTxId i
fromGenTxId _ = error "TODO: fix incomplete match"

tokIsBlocking :: TokBlockingStyle a -> Bool
tokIsBlocking :: SingBlockingStyle a -> Bool
tokIsBlocking = \case
TokBlocking -> True
TokNonBlocking -> False
SingBlocking -> True
SingNonBlocking -> False

reqIdsTrace :: Ack -> Req -> TokBlockingStyle a -> NodeToNodeSubmissionTrace
reqIdsTrace :: Ack -> Req -> SingBlockingStyle a -> NodeToNodeSubmissionTrace
reqIdsTrace ack req = \case
TokBlocking -> ReqIdsBlocking ack req
TokNonBlocking -> ReqIdsNonBlocking ack req
SingBlocking -> ReqIdsBlocking ack req
SingNonBlocking -> ReqIdsNonBlocking ack req

idListTrace :: ToAnnce tx -> TokBlockingStyle a -> NodeToNodeSubmissionTrace
idListTrace :: ToAnnce tx -> SingBlockingStyle a -> NodeToNodeSubmissionTrace
idListTrace (ToAnnce toAnn) = \case
TokBlocking -> IdsListBlocking $ length toAnn
TokNonBlocking -> IdsListNonBlocking $ length toAnn
SingBlocking -> IdsListBlocking $ length toAnn
SingNonBlocking -> IdsListNonBlocking $ length toAnn
14 changes: 8 additions & 6 deletions cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,8 @@ source-repository-package
source-repository-package
type: git
location: https://github.com/input-output-hk/ouroboros-network
tag: c764553561bed8978d2c6753d1608dc65449617a
--sha256: 0hdh7xdrvxw943r6qr0xr4kwszindh5mnsn1lww6qdnxnmn7wcsc
tag: c7b4ead782e88238714db4a305d68a6fbdef7692
--sha256: 0h79izbcyylf38slncm3vh9vq89hy0phy2ps6q3bpklaz02x12j3
subdir:
monoidal-synchronisation
network-mux
Expand All @@ -290,11 +290,13 @@ source-repository-package
source-repository-package
type: git
location: https://github.com/input-output-hk/typed-protocols
tag: 181601bc3d9e9d21a671ce01e0b481348b3ca104
--sha256: 1lr97b2z7l0rpsmmz92rsv27qzd5vavz10cf7n25svya4kkiysp5
tag: e808e5c084593055fd34709cd159fa9e74c843e1
--sha256: 01b035im4lxahdwdmmlfg3cjc32mx7p2y0fk64m7vb3wqx0nvys5
subdir:
typed-protocols
typed-protocols-cborg
typed-protocols-stateful
typed-protocols-stateful-cborg
typed-protocols-examples

source-repository-package
Expand All @@ -314,8 +316,8 @@ source-repository-package
source-repository-package
type: git
location: https://github.com/input-output-hk/ekg-forward
tag: 297cd9db5074339a2fb2e5ae7d0780debb670c63
--sha256: 1zcwry3y5rmd9lgxy89wsb3k4kpffqji35dc7ghzbz603y1gy24g
tag: a7b401a95f1963352fbfd940e65f0d609386c2fd
--sha256: 17yxa4panvgfa4q3czpr3k03hpd1bxz6d1jpr4dhhqx1sagp4lri

source-repository-package
type: git
Expand Down
7 changes: 5 additions & 2 deletions cardano-api/src/Cardano/Api/ChainSync/ClientPipelined.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@ module Cardano.Api.ChainSync.ClientPipelined (
, pipelineDecisionLowHighMark

-- * Type level natural numbers
, Queue (..)
, SingQueueF (..)
, F (..)
, N (..)
, Nat (..)
, natToInt

-- * Utilities
, mapChainSyncClientPipelined
) where

import Network.TypedProtocol.Pipelined (N (..), Nat (..), natToInt)
import Network.TypedProtocol.Core
import Ouroboros.Network.Protocol.ChainSync.ClientPipelined
import Ouroboros.Network.Protocol.ChainSync.PipelineDecision
import Data.Type.Nat
35 changes: 19 additions & 16 deletions cardano-api/src/Cardano/Api/IPC.hs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ import Control.Tracer (nullTracer)

import Ouroboros.Consensus.Shelley.Ledger.SupportsProtocol ()
import qualified Ouroboros.Network.Block as Net
import qualified Ouroboros.Network.Driver.Stateful as Stateful
import qualified Ouroboros.Network.Mux as Net
import Ouroboros.Network.NodeToClient (NodeToClientProtocols (..),
NodeToClientVersionData (..))
Expand All @@ -102,7 +103,7 @@ import Ouroboros.Network.Protocol.ChainSync.Client as Net.Sync
import Ouroboros.Network.Protocol.ChainSync.ClientPipelined as Net.SyncP
import Ouroboros.Network.Protocol.LocalStateQuery.Client (LocalStateQueryClient (..))
import qualified Ouroboros.Network.Protocol.LocalStateQuery.Client as Net.Query
import Ouroboros.Network.Protocol.LocalStateQuery.Type (AcquireFailure (..))
import Ouroboros.Network.Protocol.LocalStateQuery.Type (AcquireFailure (..), State(StateIdle))
import qualified Ouroboros.Network.Protocol.LocalStateQuery.Type as Net.Query
import Ouroboros.Network.Protocol.LocalTxMonitor.Client (LocalTxMonitorClient (..),
localTxMonitorClientPeer)
Expand Down Expand Up @@ -241,9 +242,7 @@ connectToLocalNodeWithVersion LocalNodeConnectInfo {
mkVersionedProtocols localNodeNetworkId ptcl clients'

mkVersionedProtocols :: forall block.
( Consensus.ShowQuery (Consensus.Query block)
, ProtocolClient block
)
ProtocolClient block
=> NetworkId
-> ProtocolClientInfoArgs block
-> (NodeToClientVersion -> LocalNodeClientProtocolsForBlock block)
Expand Down Expand Up @@ -295,7 +294,7 @@ mkVersionedProtocols networkid ptcl unversionedClients =
cChainSyncCodec
(Net.Sync.chainSyncClientPeer client)
LocalChainSyncClientPipelined clientPipelined
-> Net.MuxPeerPipelined
-> Net.MuxPeer
nullTracer
cChainSyncCodec
(Net.SyncP.chainSyncClientPeerPipelined clientPipelined)
Expand All @@ -311,20 +310,24 @@ mkVersionedProtocols networkid ptcl unversionedClients =

, localStateQueryProtocol =
Net.InitiatorProtocolOnly $
Net.MuxPeer
nullTracer
cStateQueryCodec
(maybe Net.localStateQueryPeerNull
Net.Query.localStateQueryClientPeer
localStateQueryClientForBlock)
Net.MuxPeerRaw $ \channel ->
Stateful.runPeer
nullTracer
cStateQueryCodec
channel
StateIdle
(maybe Net.localStateQueryPeerNull
Net.Query.localStateQueryClientPeer
localStateQueryClientForBlock)

, localTxMonitorProtocol =
Net.InitiatorProtocolOnly $
Net.MuxPeer
nullTracer
cTxMonitorCodec
(maybe Net.localTxMonitorPeerNull
localTxMonitorClientPeer
localTxMonitoringClientForBlock)
nullTracer
cTxMonitorCodec
(maybe Net.localTxMonitorPeerNull
localTxMonitorClientPeer
localTxMonitoringClientForBlock)
}
where
Consensus.Codecs {
Expand Down
Loading