Skip to content

Commit

Permalink
retry es http requests
Browse files Browse the repository at this point in the history
  • Loading branch information
giantimi committed Feb 1, 2024
1 parent 1036cfe commit 6893ec3
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 11 deletions.
38 changes: 27 additions & 11 deletions exec/TXG.hs
Original file line number Diff line number Diff line change
Expand Up @@ -508,20 +508,24 @@ mkElasticSearchRequest esConf version start end rks = do
, "timestamp" .= now
]

esPutReq :: MonadIO m => MonadThrow m => Manager -> ElasticSearchConfig -> ChainwebVersion -> m ()
esPutReq :: MonadIO m => MonadThrow m => Manager -> ElasticSearchConfig -> ChainwebVersion -> m (Either String Value)
esPutReq mgr esConf version = do
esReq <- liftIO $ createElasticsearchIndex esConf version
liftIO $ httpJson mgr esReq
resp <- liftIO $ httpLbs esReq mgr
return $ eitherDecode @Value (responseBody resp)

esCheckIndex :: MonadIO m => MonadThrow m => Manager -> Logger Text -> ElasticSearchConfig -> ChainwebVersion -> m ()
esCheckIndex :: MonadIO m => MonadThrow m => Manager -> Logger Text -> ElasticSearchConfig -> ChainwebVersion -> m (Either String Value)
esCheckIndex mgr logger esConf version = do
let indexName :: String
indexName = printf "chainweb-%s-%s" (T.unpack $ chainwebVersionToText version) (fromMaybe "" $ esIndex esConf)
req <- HTTP.parseUrlThrow $ printf "http://%s:%s/%s?pretty" (T.unpack $ hostnameToText $ esHost esConf) (show $ esPort esConf) indexName
resp <- liftIO $ httpLbs req mgr

case eitherDecode @Value (responseBody resp) of
Left err -> throwM $ userError err
Right _ -> liftIO $ loggerFunIO logger Info $ "Index " <> T.pack indexName <> " exists"
err@(Left _) -> return err
r@(Right _) -> do
liftIO $ loggerFunIO logger Info $ "Index " <> T.pack indexName <> " exists"
return r

createElasticsearchIndex :: MonadIO m => MonadThrow m => ElasticSearchConfig -> ChainwebVersion -> m HTTP.Request
createElasticsearchIndex esConf version = do
Expand Down Expand Up @@ -732,9 +736,13 @@ realTransactions config (ChainwebHost h _p2p service) tcut tv distribution = do
gen <- liftIO createSystemRandom
-- create elasticsearch index if ElasticSearchConfig is set
forM_ (elasticSearchConfig config) $ \esConfig -> do
esPutReq (confManager cfg) esConfig (nodeVersion config)
let policy = exponentialBackoff (esDelay esConfig) <> limitRetries 5
toRetry _ = \case
Right _ -> return False
Left _ -> return True
void $ retrying policy toRetry (const $ esPutReq (confManager cfg) esConfig (nodeVersion config))
logger' <- ask
esCheckIndex (confManager cfg) logger' esConfig (nodeVersion config)
void $ retrying policy toRetry (const $ esCheckIndex (confManager cfg) logger' esConfig (nodeVersion config))
let act = loop (confirmationDepth config) tcut (liftIO randomEnum >>= generateTransactions False (verbose config))
env = set (field @"confKeysets") accountMap cfg
stt = TXGState gen tv chains
Expand Down Expand Up @@ -829,9 +837,13 @@ realCoinTransactions config (ChainwebHost h _p2p service) tcut tv distribution =
-- Set up values for running the effect stack.
gen <- liftIO createSystemRandom
forM_ (elasticSearchConfig config) $ \esConfig -> do
esPutReq (confManager cfg) esConfig (nodeVersion config)
let policy = exponentialBackoff (esDelay esConfig) <> limitRetries 5
toRetry _ = \case
Right _ -> return False
Left _ -> return True
void $ retrying policy toRetry (const $ esPutReq (confManager cfg) esConfig (nodeVersion config))
logger' <- ask
esCheckIndex (confManager cfg) logger' esConfig (nodeVersion config)
void $ retrying policy toRetry (const $ esCheckIndex (confManager cfg) logger' esConfig (nodeVersion config))
let act = loop (confirmationDepth config) tcut (generateTransactions True (verbose config) CoinContract)
env = set (field @"confKeysets") accountMap cfg
stt = TXGState gen tv chains
Expand Down Expand Up @@ -867,9 +879,13 @@ simpleExpressions config (ChainwebHost h _p2p service) tcut tv distribution = do
-- Set up values for running the effect stack.
gen <- liftIO createSystemRandom
forM_ (elasticSearchConfig config) $ \esConfig -> do
esPutReq (confManager gencfg) esConfig (nodeVersion config)
let policy = exponentialBackoff (esDelay esConfig) <> limitRetries 5
toRetry _ = \case
Right _ -> return False
Left _ -> return True
void $ retrying policy toRetry (const $ esPutReq (confManager gencfg) esConfig (nodeVersion config))
logger' <- ask
esCheckIndex (confManager gencfg) logger' esConfig (nodeVersion config)
void $ retrying policy toRetry (const $ esCheckIndex (confManager gencfg) logger' esConfig (nodeVersion config))
let chs = maybe (versionChains $ nodeVersion config) NES.fromList
. NEL.nonEmpty
$ nodeChainIds config
Expand Down
11 changes: 11 additions & 0 deletions exec/TXG/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ data ElasticSearchConfig = ElasticSearchConfig
, esPort :: !Port
, esIndex :: !(Maybe Text)
, esApiKey :: !(Maybe Text)
, esDelay :: !Int
} deriving (Show, Generic)

instance ToJSON ElasticSearchConfig where
Expand All @@ -210,6 +211,7 @@ instance ToJSON ElasticSearchConfig where
, "esPort" .= esPort o
, "esIndex" .= esIndex o
, "esApiKey" .= esApiKey o
, "esDelay" .= esDelay o
]

instance FromJSON ElasticSearchConfig where
Expand All @@ -218,6 +220,7 @@ instance FromJSON ElasticSearchConfig where
<*> o .: "esPort"
<*> o .: "esIndex"
<*> o .: "esApiKey"
<*> o .: "esDelay"

data Args = Args
{ scriptCommand :: !TXCmd
Expand Down Expand Up @@ -383,6 +386,14 @@ pElasticSearchConfig = ElasticSearchConfig
<*> pPort (Just "elastic-search")
<*> pIndexName
<*> pApiKey
<*> pDelay

pDelay :: O.Parser Int
pDelay = option auto
% long "elastic-search-delay"
<> short 'd'
<> metavar "INT"
<> help "The delay in microseconds to wait before sending data to elastic search."

pApiKey :: O.Parser (Maybe Text)
pApiKey = optional $ strOption
Expand Down

0 comments on commit 6893ec3

Please sign in to comment.