diff --git a/bench/tx-generator/src/Cardano/Benchmarking/GeneratorTx/Submission.hs b/bench/tx-generator/src/Cardano/Benchmarking/GeneratorTx/Submission.hs index b9f6b08b0d0..e7eb3b772ee 100644 --- a/bench/tx-generator/src/Cardano/Benchmarking/GeneratorTx/Submission.hs +++ b/bench/tx-generator/src/Cardano/Benchmarking/GeneratorTx/Submission.hs @@ -42,7 +42,7 @@ import Cardano.Tracing.OrphanInstances.Consensus () import Cardano.Tracing.OrphanInstances.Network () import Cardano.Tracing.OrphanInstances.Shelley () -import Ouroboros.Network.Protocol.TxSubmission2.Type (TokBlockingStyle (..)) +import Ouroboros.Network.Protocol.TxSubmission2.Type (SingBlockingStyle (..)) import Cardano.Api @@ -121,11 +121,11 @@ mkSubmissionSummary ssThreadName startTime reportsRefs walletTxSource :: forall era. WalletScript era -> TpsThrottle -> TxSource era walletTxSource walletScript tpsThrottle = Active $ worker walletScript where - worker :: forall m blocking . MonadIO m => WalletScript era -> TokBlockingStyle blocking -> Req -> m (TxSource era, [Tx era]) + worker :: forall m blocking . MonadIO m => WalletScript era -> SingBlockingStyle blocking -> Req -> m (TxSource era, [Tx era]) worker script blocking req = do (done, txCount) <- case blocking of - TokBlocking -> liftIO $ consumeTxsBlocking tpsThrottle req - TokNonBlocking -> liftIO $ consumeTxsNonBlocking tpsThrottle req + SingBlocking -> liftIO $ consumeTxsBlocking tpsThrottle req + SingNonBlocking -> liftIO $ consumeTxsNonBlocking tpsThrottle req (txList, newScript) <- liftIO $ unFold script txCount case done of Stop -> return (Exhausted, txList) diff --git a/bench/tx-generator/src/Cardano/Benchmarking/GeneratorTx/SubmissionClient.hs b/bench/tx-generator/src/Cardano/Benchmarking/GeneratorTx/SubmissionClient.hs index b26b09b90f9..6a47b3f3d54 100644 --- a/bench/tx-generator/src/Cardano/Benchmarking/GeneratorTx/SubmissionClient.hs +++ b/bench/tx-generator/src/Cardano/Benchmarking/GeneratorTx/SubmissionClient.hs @@ -55,7 +55,7 @@ import Ouroboros.Network.Protocol.TxSubmission2.Client (ClientStIdle ( ClientStTxs (..), TxSubmissionClient (..)) import Ouroboros.Network.Protocol.TxSubmission2.Type (BlockingReplyList (..), - TokBlockingStyle (..), TxSizeInBytes) + SingBlockingStyle (..), TxSizeInBytes) import Cardano.Api import Cardano.Api.Shelley (fromShelleyTxId, toConsensusGenTx) @@ -75,14 +75,14 @@ data TxSource era = Exhausted | Active (ProduceNextTxs era) -type ProduceNextTxs era = (forall m blocking . MonadIO m => TokBlockingStyle blocking -> Req -> m (TxSource era, [Tx era])) +type ProduceNextTxs era = (forall m blocking . MonadIO m => SingBlockingStyle blocking -> Req -> m (TxSource era, [Tx era])) -produceNextTxs :: forall m blocking era . MonadIO m => TokBlockingStyle blocking -> Req -> LocalState era -> m (LocalState era, [Tx era]) +produceNextTxs :: forall m blocking era . MonadIO m => SingBlockingStyle blocking -> Req -> LocalState era -> m (LocalState era, [Tx era]) produceNextTxs blocking req (txProducer, unack, stats) = do (newTxProducer, txList) <- produceNextTxs' blocking req txProducer return ((newTxProducer, unack, stats), txList) -produceNextTxs' :: forall m blocking era . MonadIO m => TokBlockingStyle blocking -> Req -> TxSource era -> m (TxSource era, [Tx era]) +produceNextTxs' :: forall m blocking era . MonadIO m => SingBlockingStyle blocking -> Req -> TxSource era -> m (TxSource era, [Tx era]) produceNextTxs' _ _ Exhausted = return (Exhausted, []) produceNextTxs' blocking req (Active callback) = callback blocking req @@ -104,10 +104,10 @@ txSubmissionClient tr bmtr initialTxSource endOfProtocolCallback = TxSubmissionClient $ pure $ client (initialTxSource, UnAcked [], SubmissionThreadStats 0 0 0) where - discardAcknowledged :: TokBlockingStyle a -> Ack -> LocalState era -> m (LocalState era) + discardAcknowledged :: SingBlockingStyle a -> Ack -> LocalState era -> m (LocalState era) discardAcknowledged blocking (Ack ack) (txSource, UnAcked unAcked, stats) = do when (tokIsBlocking blocking && ack /= length unAcked) $ do - let err = "decideAnnouncement: TokBlocking, but length unAcked != ack" + let err = "decideAnnouncement: SingBlocking, but length unAcked != ack" traceWith bmtr (TraceBenchTxSubError err) fail (T.unpack err) let (stillUnacked, acked) = L.splitAtEnd ack unAcked @@ -128,7 +128,7 @@ txSubmissionClient tr bmtr initialTxSource endOfProtocolCallback = requestTxIds :: forall blocking. LocalState era - -> TokBlockingStyle blocking + -> SingBlockingStyle blocking -> Word16 -> Word16 -> m (ClientStTxIds blocking (GenTxId CardanoBlock) (GenTx CardanoBlock) m ()) @@ -147,7 +147,7 @@ txSubmissionClient tr bmtr initialTxSource endOfProtocolCallback = traceWith bmtr $ SubmissionClientUnAcked (getTxId . getTxBody <$> outs) case blocking of - TokBlocking -> case NE.nonEmpty newTxs of + SingBlocking -> case NE.nonEmpty newTxs of Nothing -> do traceWith tr EndOfProtocol endOfProtocolCallback stats @@ -155,7 +155,7 @@ txSubmissionClient tr bmtr initialTxSource endOfProtocolCallback = (Just txs) -> pure $ SendMsgReplyTxIds (BlockingReply $ txToIdSize <$> txs) (client stateC) - TokNonBlocking -> pure $ SendMsgReplyTxIds + SingNonBlocking -> pure $ SendMsgReplyTxIds (NonBlockingReply $ txToIdSize <$> newTxs) (client stateC) @@ -203,17 +203,17 @@ txSubmissionClient tr bmtr initialTxSource endOfProtocolCallback = fromGenTxId (Block.GenTxIdBabbage (Mempool.ShelleyTxId i)) = fromShelleyTxId i fromGenTxId _ = error "TODO: fix incomplete match" - tokIsBlocking :: TokBlockingStyle a -> Bool + tokIsBlocking :: SingBlockingStyle a -> Bool tokIsBlocking = \case - TokBlocking -> True - TokNonBlocking -> False + SingBlocking -> True + SingNonBlocking -> False - reqIdsTrace :: Ack -> Req -> TokBlockingStyle a -> NodeToNodeSubmissionTrace + reqIdsTrace :: Ack -> Req -> SingBlockingStyle a -> NodeToNodeSubmissionTrace reqIdsTrace ack req = \case - TokBlocking -> ReqIdsBlocking ack req - TokNonBlocking -> ReqIdsNonBlocking ack req + SingBlocking -> ReqIdsBlocking ack req + SingNonBlocking -> ReqIdsNonBlocking ack req - idListTrace :: ToAnnce tx -> TokBlockingStyle a -> NodeToNodeSubmissionTrace + idListTrace :: ToAnnce tx -> SingBlockingStyle a -> NodeToNodeSubmissionTrace idListTrace (ToAnnce toAnn) = \case - TokBlocking -> IdsListBlocking $ length toAnn - TokNonBlocking -> IdsListNonBlocking $ length toAnn + SingBlocking -> IdsListBlocking $ length toAnn + SingNonBlocking -> IdsListNonBlocking $ length toAnn diff --git a/cardano-client-demo/LedgerState.hs b/cardano-client-demo/LedgerState.hs index 53a8dff1ab4..df76b36640e 100644 --- a/cardano-client-demo/LedgerState.hs +++ b/cardano-client-demo/LedgerState.hs @@ -17,7 +17,7 @@ import Data.Sequence (Seq) import qualified Data.Sequence as Seq import qualified Data.Text as T import Data.Word -import Network.TypedProtocol.Pipelined (Nat (..)) +import Data.Type.Nat (Nat (..)) import qualified Ouroboros.Consensus.Shelley.Ledger as Shelley import Ouroboros.Network.Protocol.ChainSync.ClientPipelined (ChainSyncClientPipelined (ChainSyncClientPipelined), diff --git a/cardano-client-demo/ScanBlocksPipelined.hs b/cardano-client-demo/ScanBlocksPipelined.hs index 080202aa90a..95c32609399 100644 --- a/cardano-client-demo/ScanBlocksPipelined.hs +++ b/cardano-client-demo/ScanBlocksPipelined.hs @@ -11,12 +11,16 @@ import Cardano.Slotting.Slot import Control.Monad (when) import Data.Kind import Data.Proxy +import Data.Type.Queue +import Data.Type.Nat import Data.Time import Data.Word (Word32) import qualified GHC.TypeLits as GHC import System.Environment (getArgs) import System.FilePath (()) +import Ouroboros.Network.Protocol.ChainSync.ClientPipelined + -- | Connects to a local cardano node, requests the blocks and prints out the -- number of transactions. To run this, you must first start a local node e.g.: -- @@ -76,16 +80,17 @@ chainSyncClient pipelineSize = ChainSyncClientPipelined $ do startTime <- getCurrentTime let clientIdle_RequestMoreN :: WithOrigin BlockNo -> WithOrigin BlockNo - -> Nat n -> ClientPipelinedStIdle n (BlockInMode CardanoMode) - ChainPoint ChainTip IO () - clientIdle_RequestMoreN clientTip serverTip n = case pipelineDecisionMax pipelineSize n clientTip serverTip of - Collect -> case n of - Succ predN -> CollectResponse Nothing (clientNextN predN) - _ -> SendMsgRequestNextPipelined (clientIdle_RequestMoreN clientTip serverTip (Succ n)) + -> SingQueueF F q -> ClientPipelinedStIdle (BlockInMode CardanoMode) + ChainPoint ChainTip q IO () + clientIdle_RequestMoreN clientTip serverTip q = case pipelineDecisionMax pipelineSize (queueFDepthNat q) clientTip serverTip of + Collect -> case q of + SingConsF FCanAwait q' -> CollectResponse Nothing (clientNextN q') + SingConsF FMustReply q' -> CollectResponse Nothing (clientNextN q') + _ -> SendMsgRequestNextPipelined (clientIdle_RequestMoreN clientTip serverTip (q |> FCanAwait)) - clientNextN :: Nat n -> ClientStNext n (BlockInMode CardanoMode) - ChainPoint ChainTip IO () - clientNextN n = + clientNextN :: SingQueueF F q -> ClientStNext (BlockInMode CardanoMode) + ChainPoint ChainTip q IO () + clientNextN q = ClientStNext { recvMsgRollForward = \(BlockInMode block@(Block (BlockHeader _ _ currBlockNo@(BlockNo blockNo)) _) _) serverChainTip -> do let newClientTip = At currBlockNo @@ -97,27 +102,30 @@ chainSyncClient pipelineSize = ChainSyncClientPipelined $ do rate = fromIntegral blockNo / elapsedTime putStrLn $ "Rate = " ++ show rate ++ " blocks/second" if newClientTip == newServerTip - then clientIdle_DoneN n - else return (clientIdle_RequestMoreN newClientTip newServerTip n) + then clientIdle_DoneN q + else return (clientIdle_RequestMoreN newClientTip newServerTip q) , recvMsgRollBackward = \_ serverChainTip -> do putStrLn "Rollback" let newClientTip = Origin -- We don't actually keep track of blocks so we temporarily "forget" the tip. newServerTip = fromChainTip serverChainTip - return (clientIdle_RequestMoreN newClientTip newServerTip n) + return (clientIdle_RequestMoreN newClientTip newServerTip q) } - clientIdle_DoneN :: Nat n -> IO (ClientPipelinedStIdle n (BlockInMode CardanoMode) - ChainPoint ChainTip IO ()) - clientIdle_DoneN n = case n of - Succ predN -> do + clientIdle_DoneN :: SingQueueF F q -> IO (ClientPipelinedStIdle (BlockInMode CardanoMode) + ChainPoint ChainTip q IO ()) + clientIdle_DoneN q = case q of + SingConsF FCanAwait q' -> do + putStrLn "Chain Sync: done! (Ignoring remaining responses)" + return $ CollectResponse Nothing (clientNext_DoneN q') -- Ignore remaining message responses + SingConsF FMustReply q' -> do putStrLn "Chain Sync: done! (Ignoring remaining responses)" - return $ CollectResponse Nothing (clientNext_DoneN predN) -- Ignore remaining message responses - Zero -> do + return $ CollectResponse Nothing (clientNext_DoneN q') -- Ignore remaining message responses + SingEmptyF -> do putStrLn "Chain Sync: done!" return $ SendMsgDone () - clientNext_DoneN :: Nat n -> ClientStNext n (BlockInMode CardanoMode) - ChainPoint ChainTip IO () + clientNext_DoneN :: SingQueueF F q -> ClientStNext (BlockInMode CardanoMode) + ChainPoint ChainTip q IO () clientNext_DoneN n = ClientStNext { recvMsgRollForward = \_ _ -> clientIdle_DoneN n @@ -133,4 +141,4 @@ chainSyncClient pipelineSize = ChainSyncClientPipelined $ do ChainTipAtGenesis -> Origin ChainTip _ _ bno -> At bno - return (clientIdle_RequestMoreN Origin Origin Zero) + return (clientIdle_RequestMoreN Origin Origin SingEmptyF) diff --git a/cardano-client-demo/cardano-client-demo.cabal b/cardano-client-demo/cardano-client-demo.cabal index 0982c02fff7..305f2551b64 100644 --- a/cardano-client-demo/cardano-client-demo.cabal +++ b/cardano-client-demo/cardano-client-demo.cabal @@ -34,7 +34,9 @@ executable scan-blocks-pipelined , cardano-ledger-byron , cardano-slotting , filepath + , ouroboros-network , time + , typed-protocols executable chain-sync-client-with-ledger-state import: base, project-config