From 6893ec3b81b9ea62b0862078e1e35f2dd08c8045 Mon Sep 17 00:00:00 2001 From: Emmanuel Denloye-Ito Date: Wed, 31 Jan 2024 22:16:39 -0500 Subject: [PATCH] retry es http requests --- exec/TXG.hs | 38 +++++++++++++++++++++++++++----------- exec/TXG/Types.hs | 11 +++++++++++ 2 files changed, 38 insertions(+), 11 deletions(-) diff --git a/exec/TXG.hs b/exec/TXG.hs index 56fd0f0..2827573 100644 --- a/exec/TXG.hs +++ b/exec/TXG.hs @@ -508,20 +508,24 @@ mkElasticSearchRequest esConf version start end rks = do , "timestamp" .= now ] -esPutReq :: MonadIO m => MonadThrow m => Manager -> ElasticSearchConfig -> ChainwebVersion -> m () +esPutReq :: MonadIO m => MonadThrow m => Manager -> ElasticSearchConfig -> ChainwebVersion -> m (Either String Value) esPutReq mgr esConf version = do esReq <- liftIO $ createElasticsearchIndex esConf version - liftIO $ httpJson mgr esReq + resp <- liftIO $ httpLbs esReq mgr + return $ eitherDecode @Value (responseBody resp) -esCheckIndex :: MonadIO m => MonadThrow m => Manager -> Logger Text -> ElasticSearchConfig -> ChainwebVersion -> m () +esCheckIndex :: MonadIO m => MonadThrow m => Manager -> Logger Text -> ElasticSearchConfig -> ChainwebVersion -> m (Either String Value) esCheckIndex mgr logger esConf version = do let indexName :: String indexName = printf "chainweb-%s-%s" (T.unpack $ chainwebVersionToText version) (fromMaybe "" $ esIndex esConf) req <- HTTP.parseUrlThrow $ printf "http://%s:%s/%s?pretty" (T.unpack $ hostnameToText $ esHost esConf) (show $ esPort esConf) indexName resp <- liftIO $ httpLbs req mgr + case eitherDecode @Value (responseBody resp) of - Left err -> throwM $ userError err - Right _ -> liftIO $ loggerFunIO logger Info $ "Index " <> T.pack indexName <> " exists" + err@(Left _) -> return err + r@(Right _) -> do + liftIO $ loggerFunIO logger Info $ "Index " <> T.pack indexName <> " exists" + return r createElasticsearchIndex :: MonadIO m => MonadThrow m => ElasticSearchConfig -> ChainwebVersion -> m HTTP.Request createElasticsearchIndex esConf version = do @@ -732,9 +736,13 @@ realTransactions config (ChainwebHost h _p2p service) tcut tv distribution = do gen <- liftIO createSystemRandom -- create elasticsearch index if ElasticSearchConfig is set forM_ (elasticSearchConfig config) $ \esConfig -> do - esPutReq (confManager cfg) esConfig (nodeVersion config) + let policy = exponentialBackoff (esDelay esConfig) <> limitRetries 5 + toRetry _ = \case + Right _ -> return False + Left _ -> return True + void $ retrying policy toRetry (const $ esPutReq (confManager cfg) esConfig (nodeVersion config)) logger' <- ask - esCheckIndex (confManager cfg) logger' esConfig (nodeVersion config) + void $ retrying policy toRetry (const $ esCheckIndex (confManager cfg) logger' esConfig (nodeVersion config)) let act = loop (confirmationDepth config) tcut (liftIO randomEnum >>= generateTransactions False (verbose config)) env = set (field @"confKeysets") accountMap cfg stt = TXGState gen tv chains @@ -829,9 +837,13 @@ realCoinTransactions config (ChainwebHost h _p2p service) tcut tv distribution = -- Set up values for running the effect stack. gen <- liftIO createSystemRandom forM_ (elasticSearchConfig config) $ \esConfig -> do - esPutReq (confManager cfg) esConfig (nodeVersion config) + let policy = exponentialBackoff (esDelay esConfig) <> limitRetries 5 + toRetry _ = \case + Right _ -> return False + Left _ -> return True + void $ retrying policy toRetry (const $ esPutReq (confManager cfg) esConfig (nodeVersion config)) logger' <- ask - esCheckIndex (confManager cfg) logger' esConfig (nodeVersion config) + void $ retrying policy toRetry (const $ esCheckIndex (confManager cfg) logger' esConfig (nodeVersion config)) let act = loop (confirmationDepth config) tcut (generateTransactions True (verbose config) CoinContract) env = set (field @"confKeysets") accountMap cfg stt = TXGState gen tv chains @@ -867,9 +879,13 @@ simpleExpressions config (ChainwebHost h _p2p service) tcut tv distribution = do -- Set up values for running the effect stack. gen <- liftIO createSystemRandom forM_ (elasticSearchConfig config) $ \esConfig -> do - esPutReq (confManager gencfg) esConfig (nodeVersion config) + let policy = exponentialBackoff (esDelay esConfig) <> limitRetries 5 + toRetry _ = \case + Right _ -> return False + Left _ -> return True + void $ retrying policy toRetry (const $ esPutReq (confManager gencfg) esConfig (nodeVersion config)) logger' <- ask - esCheckIndex (confManager gencfg) logger' esConfig (nodeVersion config) + void $ retrying policy toRetry (const $ esCheckIndex (confManager gencfg) logger' esConfig (nodeVersion config)) let chs = maybe (versionChains $ nodeVersion config) NES.fromList . NEL.nonEmpty $ nodeChainIds config diff --git a/exec/TXG/Types.hs b/exec/TXG/Types.hs index ca130e7..5950f70 100644 --- a/exec/TXG/Types.hs +++ b/exec/TXG/Types.hs @@ -202,6 +202,7 @@ data ElasticSearchConfig = ElasticSearchConfig , esPort :: !Port , esIndex :: !(Maybe Text) , esApiKey :: !(Maybe Text) + , esDelay :: !Int } deriving (Show, Generic) instance ToJSON ElasticSearchConfig where @@ -210,6 +211,7 @@ instance ToJSON ElasticSearchConfig where , "esPort" .= esPort o , "esIndex" .= esIndex o , "esApiKey" .= esApiKey o + , "esDelay" .= esDelay o ] instance FromJSON ElasticSearchConfig where @@ -218,6 +220,7 @@ instance FromJSON ElasticSearchConfig where <*> o .: "esPort" <*> o .: "esIndex" <*> o .: "esApiKey" + <*> o .: "esDelay" data Args = Args { scriptCommand :: !TXCmd @@ -383,6 +386,14 @@ pElasticSearchConfig = ElasticSearchConfig <*> pPort (Just "elastic-search") <*> pIndexName <*> pApiKey + <*> pDelay + +pDelay :: O.Parser Int +pDelay = option auto + % long "elastic-search-delay" + <> short 'd' + <> metavar "INT" + <> help "The delay in microseconds to wait before sending data to elastic search." pApiKey :: O.Parser (Maybe Text) pApiKey = optional $ strOption