Skip to content

Commit

Permalink
Updated cardano-client-demo & benchmarking code
Browse files Browse the repository at this point in the history
  • Loading branch information
coot committed May 9, 2022
1 parent 0822e9b commit 2cccf2b
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import Cardano.Tracing.OrphanInstances.Consensus ()
import Cardano.Tracing.OrphanInstances.Network ()
import Cardano.Tracing.OrphanInstances.Shelley ()

import Ouroboros.Network.Protocol.TxSubmission2.Type (TokBlockingStyle (..))
import Ouroboros.Network.Protocol.TxSubmission2.Type (SingBlockingStyle (..))

import Cardano.Api

Expand Down Expand Up @@ -121,11 +121,11 @@ mkSubmissionSummary ssThreadName startTime reportsRefs
walletTxSource :: forall era. WalletScript era -> TpsThrottle -> TxSource era
walletTxSource walletScript tpsThrottle = Active $ worker walletScript
where
worker :: forall m blocking . MonadIO m => WalletScript era -> TokBlockingStyle blocking -> Req -> m (TxSource era, [Tx era])
worker :: forall m blocking . MonadIO m => WalletScript era -> SingBlockingStyle blocking -> Req -> m (TxSource era, [Tx era])
worker script blocking req = do
(done, txCount) <- case blocking of
TokBlocking -> liftIO $ consumeTxsBlocking tpsThrottle req
TokNonBlocking -> liftIO $ consumeTxsNonBlocking tpsThrottle req
SingBlocking -> liftIO $ consumeTxsBlocking tpsThrottle req
SingNonBlocking -> liftIO $ consumeTxsNonBlocking tpsThrottle req
(txList, newScript) <- liftIO $ unFold script txCount
case done of
Stop -> return (Exhausted, txList)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ import Ouroboros.Network.Protocol.TxSubmission2.Client (ClientStIdle (
ClientStTxs (..),
TxSubmissionClient (..))
import Ouroboros.Network.Protocol.TxSubmission2.Type (BlockingReplyList (..),
TokBlockingStyle (..), TxSizeInBytes)
SingBlockingStyle (..), TxSizeInBytes)

import Cardano.Api
import Cardano.Api.Shelley (Tx(ShelleyTx), fromShelleyTxId)
Expand All @@ -76,14 +76,14 @@ data TxSource era
= Exhausted
| Active (ProduceNextTxs era)

type ProduceNextTxs era = (forall m blocking . MonadIO m => TokBlockingStyle blocking -> Req -> m (TxSource era, [Tx era]))
type ProduceNextTxs era = (forall m blocking . MonadIO m => SingBlockingStyle blocking -> Req -> m (TxSource era, [Tx era]))

produceNextTxs :: forall m blocking era . MonadIO m => TokBlockingStyle blocking -> Req -> LocalState era -> m (LocalState era, [Tx era])
produceNextTxs :: forall m blocking era . MonadIO m => SingBlockingStyle blocking -> Req -> LocalState era -> m (LocalState era, [Tx era])
produceNextTxs blocking req (txProducer, unack, stats) = do
(newTxProducer, txList) <- produceNextTxs' blocking req txProducer
return ((newTxProducer, unack, stats), txList)

produceNextTxs' :: forall m blocking era . MonadIO m => TokBlockingStyle blocking -> Req -> TxSource era -> m (TxSource era, [Tx era])
produceNextTxs' :: forall m blocking era . MonadIO m => SingBlockingStyle blocking -> Req -> TxSource era -> m (TxSource era, [Tx era])
produceNextTxs' _ _ Exhausted = return (Exhausted, [])
produceNextTxs' blocking req (Active callback) = callback blocking req

Expand All @@ -105,10 +105,10 @@ txSubmissionClient tr bmtr initialTxSource endOfProtocolCallback =
TxSubmissionClient $
pure $ client (initialTxSource, UnAcked [], SubmissionThreadStats 0 0 0)
where
discardAcknowledged :: TokBlockingStyle a -> Ack -> LocalState era -> m (LocalState era)
discardAcknowledged :: SingBlockingStyle a -> Ack -> LocalState era -> m (LocalState era)
discardAcknowledged blocking (Ack ack) (txSource, UnAcked unAcked, stats) = do
when (tokIsBlocking blocking && ack /= length unAcked) $ do
let err = "decideAnnouncement: TokBlocking, but length unAcked != ack"
let err = "decideAnnouncement: SingBlocking, but length unAcked != ack"
traceWith bmtr (TraceBenchTxSubError err)
fail (T.unpack err)
let (stillUnacked, acked) = L.splitAtEnd ack unAcked
Expand All @@ -132,7 +132,7 @@ txSubmissionClient tr bmtr initialTxSource endOfProtocolCallback =

requestTxIds :: forall blocking.
LocalState era
-> TokBlockingStyle blocking
-> SingBlockingStyle blocking
-> Word16
-> Word16
-> m (ClientStTxIds blocking (GenTxId CardanoBlock) (GenTx CardanoBlock) m ())
Expand All @@ -150,15 +150,15 @@ txSubmissionClient tr bmtr initialTxSource endOfProtocolCallback =
traceWith bmtr $ TraceBenchTxSubServOuts (getTxId . getTxBody <$> outs)

case blocking of
TokBlocking -> case NE.nonEmpty newTxs of
SingBlocking -> case NE.nonEmpty newTxs of
Nothing -> do
traceWith tr EndOfProtocol
endOfProtocolCallback stats
pure $ SendMsgDone ()
(Just txs) -> pure $ SendMsgReplyTxIds
(BlockingReply $ txToIdSize <$> txs)
(client stateC)
TokNonBlocking -> pure $ SendMsgReplyTxIds
SingNonBlocking -> pure $ SendMsgReplyTxIds
(NonBlockingReply $ txToIdSize <$> newTxs)
(client stateC)

Expand Down Expand Up @@ -205,18 +205,18 @@ txSubmissionClient tr bmtr initialTxSource endOfProtocolCallback =
fromGenTxId (Block.GenTxIdAlonzo (Mempool.ShelleyTxId i)) = fromShelleyTxId i
fromGenTxId _ = error "submission.hs: fromGenTxId"

tokIsBlocking :: TokBlockingStyle a -> Bool
tokIsBlocking :: SingBlockingStyle a -> Bool
tokIsBlocking = \case
TokBlocking -> True
TokNonBlocking -> False
SingBlocking -> True
SingNonBlocking -> False

reqIdsTrace :: Ack -> Req -> TokBlockingStyle a -> NodeToNodeSubmissionTrace
reqIdsTrace :: Ack -> Req -> SingBlockingStyle a -> NodeToNodeSubmissionTrace
reqIdsTrace ack req = \case
TokBlocking -> ReqIdsBlocking ack req
TokNonBlocking -> ReqIdsPrompt ack req
SingBlocking -> ReqIdsBlocking ack req
SingNonBlocking -> ReqIdsPrompt ack req

idListTrace :: ToAnnce tx -> TokBlockingStyle a -> NodeToNodeSubmissionTrace
idListTrace :: ToAnnce tx -> SingBlockingStyle a -> NodeToNodeSubmissionTrace
idListTrace (ToAnnce toAnn) = \case
TokBlocking -> IdsListBlocking $ length toAnn
TokNonBlocking -> IdsListPrompt $ length toAnn
SingBlocking -> IdsListBlocking $ length toAnn
SingNonBlocking -> IdsListPrompt $ length toAnn

2 changes: 1 addition & 1 deletion cardano-client-demo/LedgerState.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import Data.Sequence (Seq)
import qualified Data.Sequence as Seq
import qualified Data.Text as T
import Data.Word
import Network.TypedProtocol.Pipelined (Nat (..))
import Data.Type.Nat (Nat (..))
import qualified Ouroboros.Consensus.Shelley.Ledger as Shelley
import Ouroboros.Network.Protocol.ChainSync.ClientPipelined
(ChainSyncClientPipelined (ChainSyncClientPipelined),
Expand Down
50 changes: 29 additions & 21 deletions cardano-client-demo/ScanBlocksPipelined.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@ import Cardano.Slotting.Slot
import Control.Monad (when)
import Data.Kind
import Data.Proxy
import Data.Type.Queue
import Data.Type.Nat
import Data.Time
import Data.Word (Word32)
import qualified GHC.TypeLits as GHC
import System.Environment (getArgs)
import System.FilePath ((</>))

import Ouroboros.Network.Protocol.ChainSync.ClientPipelined

-- | Connects to a local cardano node, requests the blocks and prints out the
-- number of transactions. To run this, you must first start a local node e.g.:
--
Expand Down Expand Up @@ -76,16 +80,17 @@ chainSyncClient pipelineSize = ChainSyncClientPipelined $ do
startTime <- getCurrentTime
let
clientIdle_RequestMoreN :: WithOrigin BlockNo -> WithOrigin BlockNo
-> Nat n -> ClientPipelinedStIdle n (BlockInMode CardanoMode)
ChainPoint ChainTip IO ()
clientIdle_RequestMoreN clientTip serverTip n = case pipelineDecisionMax pipelineSize n clientTip serverTip of
Collect -> case n of
Succ predN -> CollectResponse Nothing (clientNextN predN)
_ -> SendMsgRequestNextPipelined (clientIdle_RequestMoreN clientTip serverTip (Succ n))
-> SingQueueF F q -> ClientPipelinedStIdle (BlockInMode CardanoMode)
ChainPoint ChainTip q IO ()
clientIdle_RequestMoreN clientTip serverTip q = case pipelineDecisionMax pipelineSize (queueFDepthNat q) clientTip serverTip of
Collect -> case q of
SingConsF FCanAwait q' -> CollectResponse Nothing (clientNextN q')
SingConsF FMustReply q' -> CollectResponse Nothing (clientNextN q')
_ -> SendMsgRequestNextPipelined (clientIdle_RequestMoreN clientTip serverTip (q |> FCanAwait))

clientNextN :: Nat n -> ClientStNext n (BlockInMode CardanoMode)
ChainPoint ChainTip IO ()
clientNextN n =
clientNextN :: SingQueueF F q -> ClientStNext (BlockInMode CardanoMode)
ChainPoint ChainTip q IO ()
clientNextN q =
ClientStNext {
recvMsgRollForward = \(BlockInMode block@(Block (BlockHeader _ _ currBlockNo@(BlockNo blockNo)) _) _) serverChainTip -> do
let newClientTip = At currBlockNo
Expand All @@ -97,27 +102,30 @@ chainSyncClient pipelineSize = ChainSyncClientPipelined $ do
rate = fromIntegral blockNo / elapsedTime
putStrLn $ "Rate = " ++ show rate ++ " blocks/second"
if newClientTip == newServerTip
then clientIdle_DoneN n
else return (clientIdle_RequestMoreN newClientTip newServerTip n)
then clientIdle_DoneN q
else return (clientIdle_RequestMoreN newClientTip newServerTip q)
, recvMsgRollBackward = \_ serverChainTip -> do
putStrLn "Rollback"
let newClientTip = Origin -- We don't actually keep track of blocks so we temporarily "forget" the tip.
newServerTip = fromChainTip serverChainTip
return (clientIdle_RequestMoreN newClientTip newServerTip n)
return (clientIdle_RequestMoreN newClientTip newServerTip q)
}

clientIdle_DoneN :: Nat n -> IO (ClientPipelinedStIdle n (BlockInMode CardanoMode)
ChainPoint ChainTip IO ())
clientIdle_DoneN n = case n of
Succ predN -> do
clientIdle_DoneN :: SingQueueF F q -> IO (ClientPipelinedStIdle (BlockInMode CardanoMode)
ChainPoint ChainTip q IO ())
clientIdle_DoneN q = case q of
SingConsF FCanAwait q' -> do
putStrLn "Chain Sync: done! (Ignoring remaining responses)"
return $ CollectResponse Nothing (clientNext_DoneN q') -- Ignore remaining message responses
SingConsF FMustReply q' -> do
putStrLn "Chain Sync: done! (Ignoring remaining responses)"
return $ CollectResponse Nothing (clientNext_DoneN predN) -- Ignore remaining message responses
Zero -> do
return $ CollectResponse Nothing (clientNext_DoneN q') -- Ignore remaining message responses
SingEmptyF -> do
putStrLn "Chain Sync: done!"
return $ SendMsgDone ()

clientNext_DoneN :: Nat n -> ClientStNext n (BlockInMode CardanoMode)
ChainPoint ChainTip IO ()
clientNext_DoneN :: SingQueueF F q -> ClientStNext (BlockInMode CardanoMode)
ChainPoint ChainTip q IO ()
clientNext_DoneN n =
ClientStNext {
recvMsgRollForward = \_ _ -> clientIdle_DoneN n
Expand All @@ -133,4 +141,4 @@ chainSyncClient pipelineSize = ChainSyncClientPipelined $ do
ChainTipAtGenesis -> Origin
ChainTip _ _ bno -> At bno

return (clientIdle_RequestMoreN Origin Origin Zero)
return (clientIdle_RequestMoreN Origin Origin SingEmptyF)
2 changes: 2 additions & 0 deletions cardano-client-demo/cardano-client-demo.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ executable scan-blocks-pipelined
, cardano-ledger-byron
, cardano-slotting
, filepath
, ouroboros-network
, time
, typed-protocols

executable chain-sync-client-with-ledger-state
import: base, project-config
Expand Down

0 comments on commit 2cccf2b

Please sign in to comment.