Skip to content

Commit

Permalink
Merge pull request #23 from Candyman770/fix/timeoutException
Browse files Browse the repository at this point in the history
throwing TimeoutException for timeouts
  • Loading branch information
aravindgopall authored Apr 10, 2023
2 parents a21fa1b + b7aea75 commit 22d8146
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 11 deletions.
3 changes: 2 additions & 1 deletion hedis.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ library
HTTP,
errors,
network-uri,
unliftio-core
unliftio-core,
random
if !impl(ghc >= 8.0)
build-depends:
semigroups >= 0.11 && < 0.19
Expand Down
24 changes: 18 additions & 6 deletions src/Database/Redis/Cluster.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ module Database.Redis.Cluster
, ShardMap(..)
, HashSlot
, Shard(..)
, TimeoutException(..)
, connect
, disconnect
, requestPipelined
Expand All @@ -26,7 +27,7 @@ import Data.Maybe(mapMaybe, fromMaybe)
import Data.List(nub, sortBy, find)
import Data.Map(fromListWith, assocs)
import Data.Function(on)
import Control.Exception(Exception, SomeException, throwIO, BlockedIndefinitelyOnMVar(..), catches, Handler(..), try)
import Control.Exception(Exception, SomeException, throwIO, BlockedIndefinitelyOnMVar(..), catches, Handler(..), try, fromException)
import Control.Concurrent.Async(race)
import Control.Concurrent(threadDelay)
import Control.Concurrent.MVar(MVar, newMVar, readMVar, modifyMVar, modifyMVar_)
Expand Down Expand Up @@ -124,6 +125,9 @@ instance Exception CrossSlotException
-- | Exception with no payload.
-- NOTE(review): presumably raised when no cluster node is available to serve
-- a request — confirm against the throw sites.
data NoNodeException = NoNodeException deriving (Show, Typeable)
instance Exception NoNodeException

-- | Exception raised when a Redis operation loses a race against a timeout.
-- The 'String' payload is a short description of which operation timed out
-- (e.g. @"Request Timeout"@ from 'requestNode', @"ClusterSlots Timeout"@ from
-- the shard-map refresh).
--
-- 'evaluatePipeline' re-throws this exception instead of retrying the
-- request on another connection.
data TimeoutException = TimeoutException String deriving (Show, Typeable)
instance Exception TimeoutException

connect :: (Host -> CC.PortID -> Maybe Int -> IO CC.ConnectionContext) -> [CMD.CommandInfo] -> MVar ShardMap -> Maybe Int -> Bool -> ([NodeConnection] -> IO ShardMap) -> IO Connection
connect withAuth commandInfos shardMapVar timeoutOpt isReadOnly refreshShardMap = do
shardMap <- readMVar shardMapVar
Expand Down Expand Up @@ -256,10 +260,18 @@ evaluatePipeline shardMapVar refreshShardmapAction conn requests = do
-- Take a random connection where there are no exceptions.
-- PERF_CONCERN: Since we usually send only one request at a time, this won't be
-- a heavy perf issue, but it should still be evaluated and figured out as part of a complete rewrite.
resps <- concat <$> mapM (\(resp, (cc, r)) -> case resp of
Right v -> return v
Left (_ :: SomeException) -> executeRequests (getRandomConnection cc conn) r
) (zip eresps requestsByNode)

-- Throw an exception on timeouts, thus closing the connection instead of retrying.
-- Otherwise, any response left in the connection buffer would get forwarded to other requests that reuse the same connection,
-- leading to jumbled-up responses.
resps <- concat <$>
mapM (\(resp, (cc, r)) -> case resp of
Right v -> return v
Left (err :: SomeException) ->
case fromException err of
Just (er :: TimeoutException) -> throwIO er
_ -> executeRequests (getRandomConnection cc conn) r
) (zip eresps requestsByNode)
-- check for any moved in both responses and continue the flow.
when (any (moved . rawResponse) resps) refreshShardMapVar
retriedResps <- mapM (retry 0) resps
Expand Down Expand Up @@ -446,7 +458,7 @@ requestNode (NodeConnection ctx lastRecvRef _) requests = do
eresp <- race requestNodeImpl (threadDelay envTimeout)
case eresp of
Left e -> return e
Right _ -> putStrLn "timeout happened" *> throwIO NoNodeException
Right _ -> putStrLn "timeout happened" *> throwIO (TimeoutException "Request Timeout")

where
requestNodeImpl :: IO [Reply]
Expand Down
15 changes: 11 additions & 4 deletions src/Database/Redis/Connection.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import qualified Data.Time as Time
import Network.TLS (ClientParams)
import qualified Network.Socket as NS
import qualified Data.HashMap.Strict as HM
import System.Random (randomRIO)
import System.Environment (lookupEnv)
import Data.Maybe (fromMaybe)
import Text.Read (readMaybe)

import qualified Database.Redis.ProtocolPipelining as PP
import Database.Redis.Core(Redis, runRedisInternal, runRedisClusteredInternal)
Expand Down Expand Up @@ -276,19 +280,22 @@ refreshShardMap (Cluster.Connection nodeConns _ _ _ _) =

refreshShardMapWithNodeConn :: [Cluster.NodeConnection] -> IO ShardMap
refreshShardMapWithNodeConn [] = throwIO $ ClusterConnectError (Error "Couldn't refresh shardMap due to connection error")
refreshShardMapWithNodeConn ((Cluster.NodeConnection ctx _ _) : xs) = do
refreshShardMapWithNodeConn nodeConnsList = do
selectedIdx <- randomRIO (0, (length nodeConnsList) - 1)
let (Cluster.NodeConnection ctx _ _) = nodeConnsList !! selectedIdx
pipelineConn <- PP.fromCtx ctx
raceResult <- race (threadDelay (10^(3 :: Int))) (try $ refreshShardMapWithConn pipelineConn True) -- racing with delay of 1 ms
envTimeout <- fromMaybe (10 ^ (3 :: Int)) . (>>= readMaybe) <$> lookupEnv "REDIS_CLUSTER_SLOTS_TIMEOUT"
raceResult <- race (threadDelay envTimeout) (try $ refreshShardMapWithConn pipelineConn True) -- racing with delay of default 1 ms
case raceResult of
Left () -> do
print $ "TimeoutForConnection " <> show ctx
refreshShardMapWithNodeConn xs
throwIO $ Cluster.TimeoutException "ClusterSlots Timeout"
Right eiShardMapResp ->
case eiShardMapResp of
Right shardMap -> pure shardMap
Left (err :: SomeException) -> do
print $ "ShardMapRefreshError-" <> show err
refreshShardMapWithNodeConn xs
throwIO $ ClusterConnectError (Error "Couldn't refresh shardMap due to connection error")

refreshShardMapWithConn :: PP.Connection -> Bool -> IO ShardMap
refreshShardMapWithConn pipelineConn _ = do
Expand Down

0 comments on commit 22d8146

Please sign in to comment.