Skip to content

Commit

Permalink
Updated dependencies
Browse files Browse the repository at this point in the history
`cardano-cli`: `ouroboros-network` exposes now
* `mapChainSyncClientSt`
* `mapChainSyncClientPipelinedSt`
Which are used to implement:
* `chainSyncClientWithLedgerState`
* `chainSyncClientPipelinedWithLedgerState`
  • Loading branch information
coot committed May 9, 2022
1 parent 24ad920 commit f209d10
Show file tree
Hide file tree
Showing 35 changed files with 733 additions and 903 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import Cardano.Tracing.OrphanInstances.Consensus ()
import Cardano.Tracing.OrphanInstances.Network ()
import Cardano.Tracing.OrphanInstances.Shelley ()

import Ouroboros.Network.Protocol.TxSubmission2.Type (TokBlockingStyle (..))
import Ouroboros.Network.Protocol.TxSubmission2.Type (SingBlockingStyle (..))

import Cardano.Api

Expand Down Expand Up @@ -121,11 +121,11 @@ mkSubmissionSummary ssThreadName startTime reportsRefs
walletTxSource :: forall era. WalletScript era -> TpsThrottle -> TxSource era
walletTxSource walletScript tpsThrottle = Active $ worker walletScript
where
worker :: forall m blocking . MonadIO m => WalletScript era -> TokBlockingStyle blocking -> Req -> m (TxSource era, [Tx era])
worker :: forall m blocking . MonadIO m => WalletScript era -> SingBlockingStyle blocking -> Req -> m (TxSource era, [Tx era])
worker script blocking req = do
(done, txCount) <- case blocking of
TokBlocking -> liftIO $ consumeTxsBlocking tpsThrottle req
TokNonBlocking -> liftIO $ consumeTxsNonBlocking tpsThrottle req
SingBlocking -> liftIO $ consumeTxsBlocking tpsThrottle req
SingNonBlocking -> liftIO $ consumeTxsNonBlocking tpsThrottle req
(txList, newScript) <- liftIO $ unFold script txCount
case done of
Stop -> return (Exhausted, txList)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ import Ouroboros.Network.Protocol.TxSubmission2.Client (ClientStIdle (
ClientStTxs (..),
TxSubmissionClient (..))
import Ouroboros.Network.Protocol.TxSubmission2.Type (BlockingReplyList (..),
TokBlockingStyle (..), TxSizeInBytes)
SingBlockingStyle (..), TxSizeInBytes)

import Cardano.Api
import Cardano.Api.Shelley (Tx(ShelleyTx), fromShelleyTxId)
Expand All @@ -76,14 +76,14 @@ data TxSource era
= Exhausted
| Active (ProduceNextTxs era)

type ProduceNextTxs era = (forall m blocking . MonadIO m => TokBlockingStyle blocking -> Req -> m (TxSource era, [Tx era]))
type ProduceNextTxs era = (forall m blocking . MonadIO m => SingBlockingStyle blocking -> Req -> m (TxSource era, [Tx era]))

produceNextTxs :: forall m blocking era . MonadIO m => TokBlockingStyle blocking -> Req -> LocalState era -> m (LocalState era, [Tx era])
produceNextTxs :: forall m blocking era . MonadIO m => SingBlockingStyle blocking -> Req -> LocalState era -> m (LocalState era, [Tx era])
produceNextTxs blocking req (txProducer, unack, stats) = do
(newTxProducer, txList) <- produceNextTxs' blocking req txProducer
return ((newTxProducer, unack, stats), txList)

produceNextTxs' :: forall m blocking era . MonadIO m => TokBlockingStyle blocking -> Req -> TxSource era -> m (TxSource era, [Tx era])
produceNextTxs' :: forall m blocking era . MonadIO m => SingBlockingStyle blocking -> Req -> TxSource era -> m (TxSource era, [Tx era])
produceNextTxs' _ _ Exhausted = return (Exhausted, [])
produceNextTxs' blocking req (Active callback) = callback blocking req

Expand All @@ -105,10 +105,10 @@ txSubmissionClient tr bmtr initialTxSource endOfProtocolCallback =
TxSubmissionClient $
pure $ client (initialTxSource, UnAcked [], SubmissionThreadStats 0 0 0)
where
discardAcknowledged :: TokBlockingStyle a -> Ack -> LocalState era -> m (LocalState era)
discardAcknowledged :: SingBlockingStyle a -> Ack -> LocalState era -> m (LocalState era)
discardAcknowledged blocking (Ack ack) (txSource, UnAcked unAcked, stats) = do
when (tokIsBlocking blocking && ack /= length unAcked) $ do
let err = "decideAnnouncement: TokBlocking, but length unAcked != ack"
let err = "decideAnnouncement: SingBlocking, but length unAcked != ack"
traceWith bmtr (TraceBenchTxSubError err)
fail (T.unpack err)
let (stillUnacked, acked) = L.splitAtEnd ack unAcked
Expand All @@ -132,7 +132,7 @@ txSubmissionClient tr bmtr initialTxSource endOfProtocolCallback =

requestTxIds :: forall blocking.
LocalState era
-> TokBlockingStyle blocking
-> SingBlockingStyle blocking
-> Word16
-> Word16
-> m (ClientStTxIds blocking (GenTxId CardanoBlock) (GenTx CardanoBlock) m ())
Expand All @@ -150,15 +150,15 @@ txSubmissionClient tr bmtr initialTxSource endOfProtocolCallback =
traceWith bmtr $ TraceBenchTxSubServOuts (getTxId . getTxBody <$> outs)

case blocking of
TokBlocking -> case NE.nonEmpty newTxs of
SingBlocking -> case NE.nonEmpty newTxs of
Nothing -> do
traceWith tr EndOfProtocol
endOfProtocolCallback stats
pure $ SendMsgDone ()
(Just txs) -> pure $ SendMsgReplyTxIds
(BlockingReply $ txToIdSize <$> txs)
(client stateC)
TokNonBlocking -> pure $ SendMsgReplyTxIds
SingNonBlocking -> pure $ SendMsgReplyTxIds
(NonBlockingReply $ txToIdSize <$> newTxs)
(client stateC)

Expand Down Expand Up @@ -205,18 +205,18 @@ txSubmissionClient tr bmtr initialTxSource endOfProtocolCallback =
fromGenTxId (Block.GenTxIdAlonzo (Mempool.ShelleyTxId i)) = fromShelleyTxId i
fromGenTxId _ = error "submission.hs: fromGenTxId"

tokIsBlocking :: TokBlockingStyle a -> Bool
tokIsBlocking :: SingBlockingStyle a -> Bool
tokIsBlocking = \case
TokBlocking -> True
TokNonBlocking -> False
SingBlocking -> True
SingNonBlocking -> False

reqIdsTrace :: Ack -> Req -> TokBlockingStyle a -> NodeToNodeSubmissionTrace
reqIdsTrace :: Ack -> Req -> SingBlockingStyle a -> NodeToNodeSubmissionTrace
reqIdsTrace ack req = \case
TokBlocking -> ReqIdsBlocking ack req
TokNonBlocking -> ReqIdsPrompt ack req
SingBlocking -> ReqIdsBlocking ack req
SingNonBlocking -> ReqIdsPrompt ack req

idListTrace :: ToAnnce tx -> TokBlockingStyle a -> NodeToNodeSubmissionTrace
idListTrace :: ToAnnce tx -> SingBlockingStyle a -> NodeToNodeSubmissionTrace
idListTrace (ToAnnce toAnn) = \case
TokBlocking -> IdsListBlocking $ length toAnn
TokNonBlocking -> IdsListPrompt $ length toAnn
SingBlocking -> IdsListBlocking $ length toAnn
SingNonBlocking -> IdsListPrompt $ length toAnn

10 changes: 6 additions & 4 deletions cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,8 @@ source-repository-package
source-repository-package
type: git
location: https://github.com/input-output-hk/ouroboros-network
tag: ed5db33e0f0d44b175c25974b6305bfd36e88278
--sha256: 1331xpas1vpzliwqkwjzpy42anywf0gx85p6g4x23haphq7limcr
tag: 1540f23e08e172fed54b4b54eb30bce68d9e0384
--sha256: 1h0r8sa36iylc649mklyg4887dnn1zjbpaf7dq163rspcszxa6kp
subdir:
io-sim
io-classes
Expand All @@ -275,6 +275,8 @@ source-repository-package
strict-stm
typed-protocols
typed-protocols-cborg
typed-protocols-stateful
typed-protocols-stateful-cborg
typed-protocols-examples

source-repository-package
Expand All @@ -294,8 +296,8 @@ source-repository-package
source-repository-package
type: git
location: https://github.com/input-output-hk/ekg-forward
tag: 297cd9db5074339a2fb2e5ae7d0780debb670c63
--sha256: 1zcwry3y5rmd9lgxy89wsb3k4kpffqji35dc7ghzbz603y1gy24g
tag: 13d9c7cfb2b8047520c7ed164ce75ac6128683fd
--sha256: 1j3lad5aklrbwn589z6f2bhxvmkz9za4kkp7ic1xxv6zzfcrlbsf

source-repository-package
type: git
Expand Down
7 changes: 5 additions & 2 deletions cardano-api/src/Cardano/Api/ChainSync/ClientPipelined.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@ module Cardano.Api.ChainSync.ClientPipelined (
, pipelineDecisionLowHighMark

-- * Type level natural numbers
, Queue (..)
, SingQueueF (..)
, F (..)
, N (..)
, Nat (..)
, natToInt

-- * Utilities
, mapChainSyncClientPipelined
) where

import Network.TypedProtocol.Pipelined (N (..), Nat (..), natToInt)
import Network.TypedProtocol.Core
import Ouroboros.Network.Protocol.ChainSync.ClientPipelined
import Ouroboros.Network.Protocol.ChainSync.PipelineDecision
import Data.Type.Nat
35 changes: 19 additions & 16 deletions cardano-api/src/Cardano/Api/IPC.hs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ import Control.Tracer (nullTracer)

import Ouroboros.Consensus.Shelley.Ledger.SupportsProtocol ()
import qualified Ouroboros.Network.Block as Net
import qualified Ouroboros.Network.Driver.Stateful as Stateful
import qualified Ouroboros.Network.Mux as Net
import Ouroboros.Network.NodeToClient (NodeToClientProtocols (..),
NodeToClientVersionData (..))
Expand All @@ -102,7 +103,7 @@ import Ouroboros.Network.Protocol.ChainSync.Client as Net.Sync
import Ouroboros.Network.Protocol.ChainSync.ClientPipelined as Net.SyncP
import Ouroboros.Network.Protocol.LocalStateQuery.Client (LocalStateQueryClient (..))
import qualified Ouroboros.Network.Protocol.LocalStateQuery.Client as Net.Query
import Ouroboros.Network.Protocol.LocalStateQuery.Type (AcquireFailure (..))
import Ouroboros.Network.Protocol.LocalStateQuery.Type (AcquireFailure (..), State(StateIdle))
import qualified Ouroboros.Network.Protocol.LocalStateQuery.Type as Net.Query
import Ouroboros.Network.Protocol.LocalTxMonitor.Client (LocalTxMonitorClient (..),
localTxMonitorClientPeer)
Expand Down Expand Up @@ -241,9 +242,7 @@ connectToLocalNodeWithVersion LocalNodeConnectInfo {
mkVersionedProtocols localNodeNetworkId ptcl clients'

mkVersionedProtocols :: forall block.
( Consensus.ShowQuery (Consensus.Query block)
, ProtocolClient block
)
ProtocolClient block
=> NetworkId
-> ProtocolClientInfoArgs block
-> (NodeToClientVersion -> LocalNodeClientProtocolsForBlock block)
Expand Down Expand Up @@ -295,7 +294,7 @@ mkVersionedProtocols networkid ptcl unversionedClients =
cChainSyncCodec
(Net.Sync.chainSyncClientPeer client)
LocalChainSyncClientPipelined clientPipelined
-> Net.MuxPeerPipelined
-> Net.MuxPeer
nullTracer
cChainSyncCodec
(Net.SyncP.chainSyncClientPeerPipelined clientPipelined)
Expand All @@ -311,20 +310,24 @@ mkVersionedProtocols networkid ptcl unversionedClients =

, localStateQueryProtocol =
Net.InitiatorProtocolOnly $
Net.MuxPeer
nullTracer
cStateQueryCodec
(maybe Net.localStateQueryPeerNull
Net.Query.localStateQueryClientPeer
localStateQueryClientForBlock)
Net.MuxPeerRaw $ \channel ->
Stateful.runPeer
nullTracer
cStateQueryCodec
channel
StateIdle
(maybe Net.localStateQueryPeerNull
Net.Query.localStateQueryClientPeer
localStateQueryClientForBlock)

, localTxMonitorProtocol =
Net.InitiatorProtocolOnly $
Net.MuxPeer
nullTracer
cTxMonitorCodec
(maybe Net.localTxMonitorPeerNull
localTxMonitorClientPeer
localTxMonitoringClientForBlock)
nullTracer
cTxMonitorCodec
(maybe Net.localTxMonitorPeerNull
localTxMonitorClientPeer
localTxMonitoringClientForBlock)
}
where
Consensus.Codecs {
Expand Down
Loading

0 comments on commit f209d10

Please sign in to comment.