diff --git a/amm-executor/resources/config.dhall b/amm-executor/resources/config.dhall index a205916..aa1a76a 100755 --- a/amm-executor/resources/config.dhall +++ b/amm-executor/resources/config.dhall @@ -1,4 +1,4 @@ -let FeePolicy = < Strict | Balance > +let FeePolicy = < Strict | Balance | SplitBetween : List Text > let CollateralPolicy = < Ignore | Cover > let Network = < Mainnet | Preview > @@ -88,9 +88,9 @@ in {- The backlog is an essential component of the application responsible for executing orders. The configuration provided below specifies the main parameters related to order execution: - * orderLifetime - This parameter defines the duration in picoseconds during which an + * orderLifetime - This parameter defines the duration in milliseconds during which an order will be considered ready for execution, starting from the current time. - * orderExecTime - This parameter determines the duration in picoseconds for rechecking + * orderExecTime - This parameter determines the duration in milliseconds for rechecking the execution status of an order. If an order was not executed within this timeframe, the backlog will attempt to execute it again. * suspendedPropability - This parameter sets the probability level for executing orders @@ -103,9 +103,10 @@ in of 95%. -} backlogConfig = - { orderLifetime = 4500 - , orderExecTime = 1500 - , suspendedPropability = 0 + { orderLifetime = 45000000 + , orderExecTime = 15000000 + , suspendedPropability = 90000 + , unsafeQueueOrderLifetime = 60000 }, {- @@ -192,11 +193,14 @@ in utxoStoreConfig = { utxoStorePath = "./path/to/utxoStore" , createIfMissing = True - }, + } , unsafeEval = { unsafeTxFee = +320000 , exUnits = 165000000 , exMem = 530000 } +, httpSubmit = + { submitUri = "http://localhost:8090/api/submit/tx" + } } \ No newline at end of file diff --git a/amm-executor/src/Spectrum/Executor.hs b/amm-executor/src/Spectrum/Executor.hs index c94ae5d..c2894ca 100755 --- a/amm-executor/src/Spectrum/Executor.hs +++ b/amm-executor/src/Spectrum/Executor.hs @@ -272,7 +272,7 @@ wireApp = App { unApp = interceptSigTerm >> do poolActionsV1 = mkPoolActions unsafeEval (PaymentPubKeyHash executorPkh) validatorsV1 poolActionsV2 = mkPoolActions unsafeEval (PaymentPubKeyHash executorPkh) validatorsV2 refInputs <- liftIO $ mkRefInputs txsInsRefs explorer - executorService <- mkOrdersExecutorService backlogService transactions explorer resolver poolActionsV1 poolActionsV2 refInputs + executorService <- mkOrdersExecutorService backlogService transactions backlogConfig explorer resolver poolActionsV1 poolActionsV2 refInputs executor <- mkOrdersExecutor backlogService executorService pendingOrdersLogging <- forComponent mkLogging "Bots.PendingOrdersHandler" mempoolOrdersLogging <- forComponent mkLogging "Bots.MempoolOrdersHandler" diff --git a/amm-executor/src/Spectrum/Executor/Backlog/Config.hs b/amm-executor/src/Spectrum/Executor/Backlog/Config.hs index 7653324..b7ce63a 100755 --- a/amm-executor/src/Spectrum/Executor/Backlog/Config.hs +++ b/amm-executor/src/Spectrum/Executor/Backlog/Config.hs @@ -17,7 +17,8 @@ import GHC.Natural data BacklogServiceConfig = BacklogServiceConfig { orderLifetime :: !NominalDiffTime , orderExecTime :: !NominalDiffTime - , suspendedPropability :: !Natural + , suspendedPropability :: !Natural + , unsafeQueueOrderLifetime :: !NominalDiffTime } deriving (Generic, FromDhall, Show) instance FromDhall NominalDiffTime where diff --git a/amm-executor/src/Spectrum/Executor/Backlog/Service.hs b/amm-executor/src/Spectrum/Executor/Backlog/Service.hs index 2740142..da3378a 100755 --- a/amm-executor/src/Spectrum/Executor/Backlog/Service.hs +++ b/amm-executor/src/Spectrum/Executor/Backlog/Service.hs @@ -28,7 +28,7 @@ import Spectrum.Executor.Backlog.Data.BacklogOrder import Spectrum.Executor.Backlog.Persistence.BacklogStore ( BacklogStore(BacklogStore, exists, get, dropOrder, put, getAll) ) import Spectrum.Executor.Backlog.Config - ( BacklogServiceConfig (BacklogServiceConfig, orderLifetime, orderExecTime, suspendedPropability) ) + ( BacklogServiceConfig (..) ) import Spectrum.Executor.Data.OrderState ( OrderState (..), OrderInState (PendingOrder, SuspendedOrder, InProgressOrder) ) import Spectrum.Executor.Types diff --git a/amm-executor/src/Spectrum/Executor/OrdersExecutor/Process.hs b/amm-executor/src/Spectrum/Executor/OrdersExecutor/Process.hs index aeb0dbd..6d8be20 100755 --- a/amm-executor/src/Spectrum/Executor/OrdersExecutor/Process.hs +++ b/amm-executor/src/Spectrum/Executor/OrdersExecutor/Process.hs @@ -12,9 +12,11 @@ import RIO ( (&), MonadReader, MonadUnliftIO, MonadIO (liftIO), QSem, (<&>) ) import qualified RIO.List as List import Streamly.Prelude as S - ( repeatM, mapM, MonadAsync, IsStream, before, drain ) + ( repeatM, mapM, MonadAsync, IsStream, before, drain, fromEffect ) import Control.Monad.Catch ( MonadThrow, SomeException, MonadCatch, catches, Handler (Handler) ) +import RIO + ( atomicModifyIORef' ) import System.Logging.Hlog ( MakeLogging (MakeLogging, forComponent), Logging (Logging, debugM, debugM) ) @@ -99,7 +101,17 @@ run' -> OrdersExecutorService m -> s m () run' Logging{..} BacklogService{..} OrdersExecutorService{..} = - S.repeatM (liftIO (threadDelay 5000000) >> tryAcquire) & S.mapM (\case + S.repeatM (liftIO (threadDelay 5000000) >> atomicModifyIORef' getQueue (\queue -> + case queue of + [] -> ([], Nothing) + [xs] -> ([], Just xs) + (x:xs) -> (xs, Just x) + )) & S.mapM (\case + Just order -> + pure $ Just order + Nothing -> + tryAcquire + ) & S.mapM (\case Just orderWithCreationTime@(OrderWithCreationTime order _) -> infoM ("Going to execute order for pool" ++ show (orderId order)) >> execute orderWithCreationTime diff --git a/amm-executor/src/Spectrum/Executor/OrdersExecutor/Service.hs b/amm-executor/src/Spectrum/Executor/OrdersExecutor/Service.hs index d024973..d335a61 100644 --- a/amm-executor/src/Spectrum/Executor/OrdersExecutor/Service.hs +++ b/amm-executor/src/Spectrum/Executor/OrdersExecutor/Service.hs @@ -7,7 +7,7 @@ import Prelude hiding (drop) import RIO.Time ( UTCTime, getCurrentTime, diffUTCTime, nominalDiffTimeToSeconds ) import RIO - ( MonadReader, MonadUnliftIO, (<&>), MonadIO, Text ) + ( MonadReader, MonadUnliftIO, (<&>), MonadIO, Text, IORef, atomicModifyIORef', newIORef ) import qualified RIO.List as List import Control.Monad.Catch ( MonadThrow, SomeException, MonadCatch, catches, Handler (Handler) ) @@ -68,33 +68,39 @@ import Data.Text (pack) import Data.Aeson (encode) import qualified System.Logging.Hlog as Trace import Spectrum.Executor.OrdersExecutor.RefInputs (RefInputs(..)) +import Spectrum.Executor.Backlog.Config (BacklogServiceConfig(..)) data OrdersExecutorService m = OrdersExecutorService { execute :: OrderWithCreationTime -> m () , executeUnsafe :: OrderWithCreationTime -> m () + , getQueue :: IORef [OrderWithCreationTime] } mkOrdersExecutorService :: forall f m env era. ( MonadUnliftIO m , MonadReader env f + , MonadUnliftIO f , HasType (MakeLogging f m) env , HasType TxRefs env, MonadCatch m) => BacklogService m -> Transactions m era + -> BacklogServiceConfig -> Explorer m -> PoolResolver m -> PoolActions 'V1 -> PoolActions 'V2 -> RefInputs -> f (OrdersExecutorService m) -mkOrdersExecutorService backlog transactions explorer resolver poolActionsV1 poolActionsV2 refInputs = do - MakeLogging{..} <- askContext - txRefsCfg <- askContext - logging <- forComponent "Bots.OrdersExecutorService" +mkOrdersExecutorService backlog transactions config explorer resolver poolActionsV1 poolActionsV2 refInputs = do + MakeLogging{..} <- askContext + txRefsCfg <- askContext + logging <- forComponent "Bots.OrdersExecutorService123" + unsafeOrderQueue <- newIORef [] pure $ OrdersExecutorService - { execute = execute' logging txRefsCfg backlog transactions explorer resolver poolActionsV1 poolActionsV2 - , executeUnsafe = executeUnsafe' logging refInputs backlog transactions explorer resolver poolActionsV1 poolActionsV2 + { execute = execute' logging txRefsCfg backlog transactions config explorer resolver poolActionsV1 poolActionsV2 unsafeOrderQueue + , executeUnsafe = executeUnsafe' logging refInputs backlog transactions config explorer resolver poolActionsV1 poolActionsV2 unsafeOrderQueue + , getQueue = unsafeOrderQueue } execute' @@ -103,13 +109,15 @@ execute' -> TxRefs -> BacklogService m -> Transactions m era + -> BacklogServiceConfig -> Explorer m -> PoolResolver m -> PoolActions 'V1 -> PoolActions 'V2 + -> IORef [OrderWithCreationTime] -> OrderWithCreationTime -> m () -execute' l@Logging{..} txRefs backlog@BacklogService{suspend, drop} txs explorer resolver poolActionsV1 poolActionsV2 (OrderWithCreationTime order orderTime) = do +execute' l@Logging{..} txRefs backlog@BacklogService{suspend, drop} txs config explorer resolver poolActionsV1 poolActionsV2 priorityMap (OrderWithCreationTime order orderTime) = do executionStartTime <- getCurrentTime executeOrder' backlog l txRefs txs explorer resolver poolActionsV1 poolActionsV2 order executionStartTime `catches` [ Handler (\ (execErr :: OrderExecErr) -> case execErr of @@ -120,7 +128,7 @@ execute' l@Logging{..} txRefs backlog@BacklogService{suspend, drop} txs explorer dropError -> drop (orderId order) >> infoM ("Err " ++ show dropError ++ " occured for " ++ show (orderId order) ++ ". Going to drop") ) - , Handler (\ (dropError :: SomeException) -> processOrderExecutionException l backlog dropError order orderTime) + , Handler (\ (dropError :: SomeException) -> processOrderExecutionException l backlog config dropError order orderTime priorityMap executionStartTime) ] executionEndTime <- getCurrentTime let timeDiff = fromEnum $ nominalDiffTimeToSeconds $ diffUTCTime executionEndTime executionStartTime @@ -134,13 +142,15 @@ executeUnsafe' -> RefInputs -> BacklogService m -> Transactions m era + -> BacklogServiceConfig -> Explorer m -> PoolResolver m -> PoolActions 'V1 -> PoolActions 'V2 + -> IORef [OrderWithCreationTime] -> OrderWithCreationTime -> m () -executeUnsafe' l@Logging{..} refInputs backlog@BacklogService{suspend, drop} txs explorer resolver poolActionsV1 poolActionsV2 (OrderWithCreationTime order orderTime) = do +executeUnsafe' l@Logging{..} refInputs backlog@BacklogService{suspend, drop} txs config explorer resolver poolActionsV1 poolActionsV2 priorityMap (OrderWithCreationTime order orderTime) = do executionStartTime <- getCurrentTime executeOrderUnsafe' backlog l refInputs txs explorer resolver poolActionsV1 poolActionsV2 order executionStartTime `catches` [ Handler (\ (execErr :: OrderExecErr) -> case execErr of @@ -151,7 +161,7 @@ executeUnsafe' l@Logging{..} refInputs backlog@BacklogService{suspend, drop} txs dropError -> drop (orderId order) >> infoM ("(Unsafe) Err " ++ show dropError ++ " occured for " ++ show (orderId order) ++ ". Going to drop") ) - , Handler (\ (dropError :: SomeException) -> processOrderExecutionException l backlog dropError order orderTime) + , Handler (\ (dropError :: SomeException) -> processOrderExecutionException l backlog config dropError order orderTime priorityMap executionStartTime) ] executionEndTime <- getCurrentTime let timeDiff = fromEnum $ nominalDiffTimeToSeconds $ diffUTCTime executionEndTime executionStartTime @@ -159,13 +169,18 @@ executeUnsafe' l@Logging{..} refInputs backlog@BacklogService{suspend, drop} txs infoM $ "(Unsafe) Time of end order processing is " ++ show executionEndTime infoM $ "(Unsafe) Time of processing order is " ++ show (timeDiff `div` 1000000000) ++ " mills" -processOrderExecutionException :: Monad m => Logging m -> BacklogService m -> SomeException -> Order -> UTCTime -> m () -processOrderExecutionException Logging{..} BacklogService{suspend, drop} executionError order@(OnChain FullTxOut{..} _) orderTime = do +processOrderExecutionException :: MonadIO m => Logging m -> BacklogService m -> BacklogServiceConfig -> SomeException -> Order -> UTCTime -> IORef [OrderWithCreationTime] -> UTCTime -> m () +processOrderExecutionException Logging{..} BacklogService{suspend, drop} BacklogServiceConfig{..} executionError order@(OnChain FullTxOut{..} _) orderTime priorityQueue executionStartTime = do let errMsgText = pack (show executionError) if isInfixOf "BadInputsUTxO" errMsgText && not (pack (show (txOutRefId fullTxOutRef)) `isInfixOf` errMsgText) then - suspend (SuspendedOrder order orderTime) >> - infoM ("Got BadInputsUTxO error during order (" ++ show (orderId order) ++ ") execution without orderId (" ++ show (txOutRefId fullTxOutRef) ++ "). " ++ show errMsgText ++ ". Going to suspend order") + if (diffUTCTime executionStartTime orderTime < unsafeQueueOrderLifetime) + then + atomicModifyIORef' priorityQueue (\prevQueue -> (OrderWithCreationTime order orderTime : prevQueue, ())) >> + infoM ("Got BadInputsUTxO error during order (" ++ show (orderId order) ++ ") execution without orderId (" ++ show (txOutRefId fullTxOutRef) ++ "). " ++ show errMsgText ++ ". Going to put it to priority unsafe order queue") + else + suspend (SuspendedOrder order orderTime) >> + infoM ("Got BadInputsUTxO error during order (" ++ show (orderId order) ++ ") execution without orderId (" ++ show (txOutRefId fullTxOutRef) ++ "). " ++ show errMsgText ++ ". Going to suspend order") else drop (orderId order) >> infoM ("Got error (" ++ show (processDropErrorMsg errMsgText)++ ") during order (" ++ show (orderId order) ++ ") " ++ ". Going to drop order") processDropErrorMsg :: Text -> Text @@ -293,7 +308,7 @@ runOrder -> Logging m -> m (TxCandidate, Predicted Core.Pool) runOrder TxRefs{..} Explorer{..} (Pool (OnChain poolOut pool) version) (OnChain orderOut Core.AnyOrder{..}) PoolActions{..} Logging{..} = do - let + let poolOutRef = case version of V1 -> Interop.fromCardanoTxIn poolV1Ref V2 -> Interop.fromCardanoTxIn poolV2Ref diff --git a/amm-executor/test/Gen/ConstantsGen.hs b/amm-executor/test/Gen/ConstantsGen.hs index 1b8ea1d..c1c6f4d 100755 --- a/amm-executor/test/Gen/ConstantsGen.hs +++ b/amm-executor/test/Gen/ConstantsGen.hs @@ -59,6 +59,7 @@ cfgForOnlyPendingOrders = BacklogServiceConfig { orderLifetime = 900 :: NominalDiffTime , orderExecTime = 600 :: NominalDiffTime , suspendedPropability = 0 + , unsafeQueueOrderLifetime = 600 :: NominalDiffTime } data ValidatorInfo = ValidatorInfo diff --git a/dcConfigs/dcSpectrumConfig.dhall b/dcConfigs/dcSpectrumConfig.dhall index 68e1f01..849288d 100755 --- a/dcConfigs/dcSpectrumConfig.dhall +++ b/dcConfigs/dcSpectrumConfig.dhall @@ -1,4 +1,4 @@ -let FeePolicy = < Strict | Balance > +let FeePolicy = < Strict | Balance | SplitBetween : List Text > let CollateralPolicy = < Ignore | Cover > let Network = < Mainnet | Preview > @@ -31,9 +31,10 @@ in , createIfMissing = True } , backlogConfig = - { orderLifetime = 4500 - , orderExecTime = 1500 - , suspendedPropability = 0 + { orderLifetime = 45000000 + , orderExecTime = 15000000 + , suspendedPropability = 50 + , unsafeQueueOrderLifetime = 60000 } , backlogStoreConfig = { storePath = "./data/backlogStore" @@ -80,4 +81,7 @@ in , exUnits = 165000000 , exMem = 530000 } +, httpSubmit = + { submitUri = "http://localhost:8090/api/submit/tx" + } } \ No newline at end of file