Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev. Expr build rc1.0.3 rc 4 #79

Open
wants to merge 3 commits into
base: update-dc-version-to-101
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions amm-executor/resources/config.dhall
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
let FeePolicy = < Strict | Balance >
let FeePolicy = < Strict | Balance | SplitBetween : List Text >
let CollateralPolicy = < Ignore | Cover >
let Network = < Mainnet | Preview >

Expand Down Expand Up @@ -88,9 +88,9 @@ in
{-
The backlog is an essential component of the application responsible for executing orders.
The configuration provided below specifies the main parameters related to order execution:
* orderLifetime - This parameter defines the duration in picoseconds during which an
* orderLifetime - This parameter defines the duration in milliseconds during which an
order will be considered ready for execution, starting from the current time.
* orderExecTime - This parameter determines the duration in picoseconds for rechecking
* orderExecTime - This parameter determines the duration in milliseconds for rechecking
the execution status of an order. If an order was not executed within this timeframe,
the backlog will attempt to execute it again.
* suspendedPropability - This parameter sets the probability level for executing orders
Expand All @@ -103,9 +103,10 @@ in
of 95%.
-}
backlogConfig =
{ orderLifetime = 4500
, orderExecTime = 1500
, suspendedPropability = 0
{ orderLifetime = 45000000
, orderExecTime = 15000000
, suspendedPropability = 90000
, unsafeQueueOrderLifetime = 60000
},

{-
Expand Down Expand Up @@ -192,11 +193,14 @@ in
utxoStoreConfig =
{ utxoStorePath = "./path/to/utxoStore"
, createIfMissing = True
},
}

, unsafeEval =
{ unsafeTxFee = +320000
, exUnits = 165000000
, exMem = 530000
}
, httpSubmit =
{ submitUri = "http://localhost:8090/api/submit/tx"
}
}
2 changes: 1 addition & 1 deletion amm-executor/src/Spectrum/Executor.hs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ wireApp = App { unApp = interceptSigTerm >> do
poolActionsV1 = mkPoolActions unsafeEval (PaymentPubKeyHash executorPkh) validatorsV1
poolActionsV2 = mkPoolActions unsafeEval (PaymentPubKeyHash executorPkh) validatorsV2
refInputs <- liftIO $ mkRefInputs txsInsRefs explorer
executorService <- mkOrdersExecutorService backlogService transactions explorer resolver poolActionsV1 poolActionsV2 refInputs
executorService <- mkOrdersExecutorService backlogService transactions backlogConfig explorer resolver poolActionsV1 poolActionsV2 refInputs
executor <- mkOrdersExecutor backlogService executorService
pendingOrdersLogging <- forComponent mkLogging "Bots.PendingOrdersHandler"
mempoolOrdersLogging <- forComponent mkLogging "Bots.MempoolOrdersHandler"
Expand Down
3 changes: 2 additions & 1 deletion amm-executor/src/Spectrum/Executor/Backlog/Config.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import GHC.Natural
data BacklogServiceConfig = BacklogServiceConfig
{ orderLifetime :: !NominalDiffTime
, orderExecTime :: !NominalDiffTime
, suspendedPropability :: !Natural
, suspendedPropability :: !Natural
, unsafeQueueOrderLifetime :: !NominalDiffTime
} deriving (Generic, FromDhall, Show)

instance FromDhall NominalDiffTime where
Expand Down
2 changes: 1 addition & 1 deletion amm-executor/src/Spectrum/Executor/Backlog/Service.hs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import Spectrum.Executor.Backlog.Data.BacklogOrder
import Spectrum.Executor.Backlog.Persistence.BacklogStore
( BacklogStore(BacklogStore, exists, get, dropOrder, put, getAll) )
import Spectrum.Executor.Backlog.Config
( BacklogServiceConfig (BacklogServiceConfig, orderLifetime, orderExecTime, suspendedPropability) )
( BacklogServiceConfig (..) )
import Spectrum.Executor.Data.OrderState
( OrderState (..), OrderInState (PendingOrder, SuspendedOrder, InProgressOrder) )
import Spectrum.Executor.Types
Expand Down
16 changes: 14 additions & 2 deletions amm-executor/src/Spectrum/Executor/OrdersExecutor/Process.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ import RIO
( (&), MonadReader, MonadUnliftIO, MonadIO (liftIO), QSem, (<&>) )
import qualified RIO.List as List
import Streamly.Prelude as S
( repeatM, mapM, MonadAsync, IsStream, before, drain )
( repeatM, mapM, MonadAsync, IsStream, before, drain, fromEffect )
import Control.Monad.Catch
( MonadThrow, SomeException, MonadCatch, catches, Handler (Handler) )
import RIO
( atomicModifyIORef' )

import System.Logging.Hlog
( MakeLogging (MakeLogging, forComponent), Logging (Logging, debugM, debugM) )
Expand Down Expand Up @@ -99,7 +101,17 @@ run'
-> OrdersExecutorService m
-> s m ()
run' Logging{..} BacklogService{..} OrdersExecutorService{..} =
S.repeatM (liftIO (threadDelay 5000000) >> tryAcquire) & S.mapM (\case
S.repeatM (liftIO (threadDelay 5000000) >> atomicModifyIORef' getQueue (\queue ->
case queue of
[] -> ([], Nothing)
[xs] -> ([], Just xs)
(x:xs) -> (xs, Just x)
)) & S.mapM (\case
Just order ->
pure $ Just order
Nothing ->
tryAcquire
) & S.mapM (\case
Just orderWithCreationTime@(OrderWithCreationTime order _) ->
infoM ("Going to execute order for pool" ++ show (orderId order)) >>
execute orderWithCreationTime
Expand Down
47 changes: 31 additions & 16 deletions amm-executor/src/Spectrum/Executor/OrdersExecutor/Service.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import Prelude hiding (drop)
import RIO.Time
( UTCTime, getCurrentTime, diffUTCTime, nominalDiffTimeToSeconds )
import RIO
( MonadReader, MonadUnliftIO, (<&>), MonadIO, Text )
( MonadReader, MonadUnliftIO, (<&>), MonadIO, Text, IORef, atomicModifyIORef', newIORef )
import qualified RIO.List as List
import Control.Monad.Catch
( MonadThrow, SomeException, MonadCatch, catches, Handler (Handler) )
Expand Down Expand Up @@ -68,33 +68,39 @@ import Data.Text (pack)
import Data.Aeson (encode)
import qualified System.Logging.Hlog as Trace
import Spectrum.Executor.OrdersExecutor.RefInputs (RefInputs(..))
import Spectrum.Executor.Backlog.Config (BacklogServiceConfig(..))

data OrdersExecutorService m = OrdersExecutorService
{ execute :: OrderWithCreationTime -> m ()
, executeUnsafe :: OrderWithCreationTime -> m ()
, getQueue :: IORef [OrderWithCreationTime]
}

mkOrdersExecutorService
:: forall f m env era.
( MonadUnliftIO m
, MonadReader env f
, MonadUnliftIO f
, HasType (MakeLogging f m) env
, HasType TxRefs env, MonadCatch m)
=> BacklogService m
-> Transactions m era
-> BacklogServiceConfig
-> Explorer m
-> PoolResolver m
-> PoolActions 'V1
-> PoolActions 'V2
-> RefInputs
-> f (OrdersExecutorService m)
mkOrdersExecutorService backlog transactions explorer resolver poolActionsV1 poolActionsV2 refInputs = do
MakeLogging{..} <- askContext
txRefsCfg <- askContext
logging <- forComponent "Bots.OrdersExecutorService"
mkOrdersExecutorService backlog transactions config explorer resolver poolActionsV1 poolActionsV2 refInputs = do
MakeLogging{..} <- askContext
txRefsCfg <- askContext
logging <- forComponent "Bots.OrdersExecutorService123"
unsafeOrderQueue <- newIORef []
pure $ OrdersExecutorService
{ execute = execute' logging txRefsCfg backlog transactions explorer resolver poolActionsV1 poolActionsV2
, executeUnsafe = executeUnsafe' logging refInputs backlog transactions explorer resolver poolActionsV1 poolActionsV2
{ execute = execute' logging txRefsCfg backlog transactions config explorer resolver poolActionsV1 poolActionsV2 unsafeOrderQueue
, executeUnsafe = executeUnsafe' logging refInputs backlog transactions config explorer resolver poolActionsV1 poolActionsV2 unsafeOrderQueue
, getQueue = unsafeOrderQueue
}

execute'
Expand All @@ -103,13 +109,15 @@ execute'
-> TxRefs
-> BacklogService m
-> Transactions m era
-> BacklogServiceConfig
-> Explorer m
-> PoolResolver m
-> PoolActions 'V1
-> PoolActions 'V2
-> IORef [OrderWithCreationTime]
-> OrderWithCreationTime
-> m ()
execute' l@Logging{..} txRefs backlog@BacklogService{suspend, drop} txs explorer resolver poolActionsV1 poolActionsV2 (OrderWithCreationTime order orderTime) = do
execute' l@Logging{..} txRefs backlog@BacklogService{suspend, drop} txs config explorer resolver poolActionsV1 poolActionsV2 priorityMap (OrderWithCreationTime order orderTime) = do
executionStartTime <- getCurrentTime
executeOrder' backlog l txRefs txs explorer resolver poolActionsV1 poolActionsV2 order executionStartTime `catches`
[ Handler (\ (execErr :: OrderExecErr) -> case execErr of
Expand All @@ -120,7 +128,7 @@ execute' l@Logging{..} txRefs backlog@BacklogService{suspend, drop} txs explorer
dropError ->
drop (orderId order) >> infoM ("Err " ++ show dropError ++ " occured for " ++ show (orderId order) ++ ". Going to drop")
)
, Handler (\ (dropError :: SomeException) -> processOrderExecutionException l backlog dropError order orderTime)
, Handler (\ (dropError :: SomeException) -> processOrderExecutionException l backlog config dropError order orderTime priorityMap executionStartTime)
]
executionEndTime <- getCurrentTime
let timeDiff = fromEnum $ nominalDiffTimeToSeconds $ diffUTCTime executionEndTime executionStartTime
Expand All @@ -134,13 +142,15 @@ executeUnsafe'
-> RefInputs
-> BacklogService m
-> Transactions m era
-> BacklogServiceConfig
-> Explorer m
-> PoolResolver m
-> PoolActions 'V1
-> PoolActions 'V2
-> IORef [OrderWithCreationTime]
-> OrderWithCreationTime
-> m ()
executeUnsafe' l@Logging{..} refInputs backlog@BacklogService{suspend, drop} txs explorer resolver poolActionsV1 poolActionsV2 (OrderWithCreationTime order orderTime) = do
executeUnsafe' l@Logging{..} refInputs backlog@BacklogService{suspend, drop} txs config explorer resolver poolActionsV1 poolActionsV2 priorityMap (OrderWithCreationTime order orderTime) = do
executionStartTime <- getCurrentTime
executeOrderUnsafe' backlog l refInputs txs explorer resolver poolActionsV1 poolActionsV2 order executionStartTime `catches`
[ Handler (\ (execErr :: OrderExecErr) -> case execErr of
Expand All @@ -151,21 +161,26 @@ executeUnsafe' l@Logging{..} refInputs backlog@BacklogService{suspend, drop} txs
dropError ->
drop (orderId order) >> infoM ("(Unsafe) Err " ++ show dropError ++ " occured for " ++ show (orderId order) ++ ". Going to drop")
)
, Handler (\ (dropError :: SomeException) -> processOrderExecutionException l backlog dropError order orderTime)
, Handler (\ (dropError :: SomeException) -> processOrderExecutionException l backlog config dropError order orderTime priorityMap executionStartTime)
]
executionEndTime <- getCurrentTime
let timeDiff = fromEnum $ nominalDiffTimeToSeconds $ diffUTCTime executionEndTime executionStartTime
infoM $ "(Unsafe) Time of start order processing is " ++ show executionStartTime
infoM $ "(Unsafe) Time of end order processing is " ++ show executionEndTime
infoM $ "(Unsafe) Time of processing order is " ++ show (timeDiff `div` 1000000000) ++ " mills"

processOrderExecutionException :: Monad m => Logging m -> BacklogService m -> SomeException -> Order -> UTCTime -> m ()
processOrderExecutionException Logging{..} BacklogService{suspend, drop} executionError order@(OnChain FullTxOut{..} _) orderTime = do
processOrderExecutionException :: MonadIO m => Logging m -> BacklogService m -> BacklogServiceConfig -> SomeException -> Order -> UTCTime -> IORef [OrderWithCreationTime] -> UTCTime -> m ()
processOrderExecutionException Logging{..} BacklogService{suspend, drop} BacklogServiceConfig{..} executionError order@(OnChain FullTxOut{..} _) orderTime priorityQueue executionStartTime = do
let errMsgText = pack (show executionError)
if isInfixOf "BadInputsUTxO" errMsgText && not (pack (show (txOutRefId fullTxOutRef)) `isInfixOf` errMsgText)
then
suspend (SuspendedOrder order orderTime) >>
infoM ("Got BadInputsUTxO error during order (" ++ show (orderId order) ++ ") execution without orderId (" ++ show (txOutRefId fullTxOutRef) ++ "). " ++ show errMsgText ++ ". Going to suspend order")
if (diffUTCTime executionStartTime orderTime < unsafeQueueOrderLifetime)
then
atomicModifyIORef' priorityQueue (\prevQueue -> (OrderWithCreationTime order orderTime : prevQueue, ())) >>
infoM ("Got BadInputsUTxO error during order (" ++ show (orderId order) ++ ") execution without orderId (" ++ show (txOutRefId fullTxOutRef) ++ "). " ++ show errMsgText ++ ". Going to put it to priority unsafe order queue")
else
suspend (SuspendedOrder order orderTime) >>
infoM ("Got BadInputsUTxO error during order (" ++ show (orderId order) ++ ") execution without orderId (" ++ show (txOutRefId fullTxOutRef) ++ "). " ++ show errMsgText ++ ". Going to suspend order")
else drop (orderId order) >> infoM ("Got error (" ++ show (processDropErrorMsg errMsgText)++ ") during order (" ++ show (orderId order) ++ ") " ++ ". Going to drop order")

processDropErrorMsg :: Text -> Text
Expand Down Expand Up @@ -293,7 +308,7 @@ runOrder
-> Logging m
-> m (TxCandidate, Predicted Core.Pool)
runOrder TxRefs{..} Explorer{..} (Pool (OnChain poolOut pool) version) (OnChain orderOut Core.AnyOrder{..}) PoolActions{..} Logging{..} = do
let
let
poolOutRef = case version of
V1 -> Interop.fromCardanoTxIn poolV1Ref
V2 -> Interop.fromCardanoTxIn poolV2Ref
Expand Down
1 change: 1 addition & 0 deletions amm-executor/test/Gen/ConstantsGen.hs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ cfgForOnlyPendingOrders = BacklogServiceConfig
{ orderLifetime = 900 :: NominalDiffTime
, orderExecTime = 600 :: NominalDiffTime
, suspendedPropability = 0
, unsafeQueueOrderLifetime = 600 :: NominalDiffTime
}

data ValidatorInfo = ValidatorInfo
Expand Down
12 changes: 8 additions & 4 deletions dcConfigs/dcSpectrumConfig.dhall
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
let FeePolicy = < Strict | Balance >
let FeePolicy = < Strict | Balance | SplitBetween : List Text >
let CollateralPolicy = < Ignore | Cover >
let Network = < Mainnet | Preview >

Expand Down Expand Up @@ -31,9 +31,10 @@ in
, createIfMissing = True
}
, backlogConfig =
{ orderLifetime = 4500
, orderExecTime = 1500
, suspendedPropability = 0
{ orderLifetime = 45000000
, orderExecTime = 15000000
, suspendedPropability = 50
, unsafeQueueOrderLifetime = 60000
}
, backlogStoreConfig =
{ storePath = "./data/backlogStore"
Expand Down Expand Up @@ -80,4 +81,7 @@ in
, exUnits = 165000000
, exMem = 530000
}
, httpSubmit =
{ submitUri = "http://localhost:8090/api/submit/tx"
}
}