Skip to content

Commit

Permalink
changing NodeConnection to Pool of NodeConnection and taking Cluster …
Browse files Browse the repository at this point in the history
…Connection out of the Pool
  • Loading branch information
shashitnak committed Apr 11, 2023
1 parent 22d8146 commit 039a83d
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 50 deletions.
48 changes: 24 additions & 24 deletions src/Database/Redis/Cluster.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ module Database.Redis.Cluster
, Shard(..)
, TimeoutException(..)
, connect
, disconnect
, destroyNodeResources
, requestPipelined
, requestMasterNodes
, nodes
Expand All @@ -28,6 +28,7 @@ import Data.List(nub, sortBy, find)
import Data.Map(fromListWith, assocs)
import Data.Function(on)
import Control.Exception(Exception, SomeException, throwIO, BlockedIndefinitelyOnMVar(..), catches, Handler(..), try, fromException)
import Data.Pool(Pool, createPool, withResource, destroyAllResources)
import Control.Concurrent.Async(race)
import Control.Concurrent(threadDelay)
import Control.Concurrent.MVar(MVar, newMVar, readMVar, modifyMVar, modifyMVar_)
Expand Down Expand Up @@ -62,7 +63,7 @@ type IsReadOnly = Bool
data Connection = Connection (HM.HashMap NodeID NodeConnection) (MVar Pipeline) (MVar ShardMap) CMD.InfoMap IsReadOnly

-- | A connection to a single node in the cluster, similar to 'ProtocolPipelining.Connection'
data NodeConnection = NodeConnection CC.ConnectionContext (IOR.IORef (Maybe B.ByteString)) NodeID
data NodeConnection = NodeConnection (Pool CC.ConnectionContext) (IOR.IORef (Maybe B.ByteString)) NodeID

instance Show NodeConnection where
show (NodeConnection _ _ id1) = "nodeId: " <> show id1
Expand Down Expand Up @@ -128,8 +129,8 @@ instance Exception NoNodeException
data TimeoutException = TimeoutException String deriving (Show, Typeable)
instance Exception TimeoutException

connect :: (Host -> CC.PortID -> Maybe Int -> IO CC.ConnectionContext) -> [CMD.CommandInfo] -> MVar ShardMap -> Maybe Int -> Bool -> ([NodeConnection] -> IO ShardMap) -> IO Connection
connect withAuth commandInfos shardMapVar timeoutOpt isReadOnly refreshShardMap = do
connect :: (Host -> CC.PortID -> Maybe Int -> IO CC.ConnectionContext) -> [CMD.CommandInfo] -> MVar ShardMap -> Maybe Int -> Bool -> ([NodeConnection] -> IO ShardMap) -> Time.NominalDiffTime -> Int -> IO Connection
connect withAuth commandInfos shardMapVar timeoutOpt isReadOnly refreshShardMap idleTime maxResources = do
shardMap <- readMVar shardMapVar
stateVar <- newMVar $ Pending []
pipelineVar <- newMVar $ Pipeline stateVar
Expand Down Expand Up @@ -161,15 +162,15 @@ connect withAuth commandInfos shardMapVar timeoutOpt isReadOnly refreshShardMap
) (mempty, False) info
connectNode :: Node -> IO (NodeID, NodeConnection)
connectNode (Node n _ host port) = do
ctx <- withAuth host (CC.PortNumber $ toEnum port) timeoutOpt
ctx <- createPool (withAuth host (CC.PortNumber $ toEnum port) timeoutOpt) CC.disconnect 1 idleTime maxResources
ref <- IOR.newIORef Nothing
return (n, NodeConnection ctx ref n)
refreshShardMapVar :: ShardMap -> IO ()
refreshShardMapVar shardMap = hasLocked $ modifyMVar_ shardMapVar (const (pure shardMap))

disconnect :: Connection -> IO ()
disconnect (Connection nodeConnMap _ _ _ _ ) = mapM_ disconnectNode (HM.elems nodeConnMap) where
disconnectNode (NodeConnection nodeCtx _ _) = CC.disconnect nodeCtx
destroyNodeResources :: Connection -> IO ()
destroyNodeResources (Connection nodeConnMap _ _ _ _ ) = mapM_ disconnectNode (HM.elems nodeConnMap) where
disconnectNode (NodeConnection nodePool _ _) = destroyAllResources nodePool

-- Add a request to the current pipeline for this connection. The pipeline will
-- be executed implicitly as soon as any result returned from this function is
Expand Down Expand Up @@ -453,34 +454,33 @@ allMasterNodes (Connection nodeConns _ _ _ _) (ShardMap shardMap) =
onlyMasterNodes = (\(Shard master _) -> master) <$> nub (IntMap.elems shardMap)

requestNode :: NodeConnection -> [[B.ByteString]] -> IO [Reply]
requestNode (NodeConnection ctx lastRecvRef _) requests = do
requestNode (NodeConnection pool lastRecvRef _) requests = do
envTimeout <- round . (\x -> (x :: Time.NominalDiffTime) * 1000000) . realToFrac . fromMaybe (0.5 :: Double) . (>>= readMaybe) <$> lookupEnv "REDIS_REQUEST_NODE_TIMEOUT"
eresp <- race requestNodeImpl (threadDelay envTimeout)
eresp <- race (withResource pool requestNodeImpl) (threadDelay envTimeout)

This comment has been minimized.

Copy link
@ishan-juspay

ishan-juspay Apr 11, 2023

race on threadDelay needs to be invoked within withResource call. The idea is withResource call should itself destroy the connection in case of an exception (which will be raised when race function throws the exception).

case eresp of
Left e -> return e
Right _ -> putStrLn "timeout happened" *> throwIO (TimeoutException "Request Timeout")

where
requestNodeImpl :: IO [Reply]
requestNodeImpl = do
mapM_ (sendNode . renderRequest) requests
requestNodeImpl :: CC.ConnectionContext -> IO [Reply]
requestNodeImpl ctx = do
mapM_ (sendNode ctx . renderRequest) requests
_ <- CC.flush ctx
replicateM (length requests) recvNode
sendNode :: B.ByteString -> IO ()
sendNode = CC.send ctx
recvNode :: IO Reply
recvNode = do
replicateM (length requests) $ recvNode ctx
sendNode :: CC.ConnectionContext -> B.ByteString -> IO ()
sendNode = CC.send
recvNode :: CC.ConnectionContext -> IO Reply
recvNode ctx = do
maybeLastRecv <- IOR.readIORef lastRecvRef
scanResult <- case maybeLastRecv of
Just lastRecv -> Scanner.scanWith (CC.recv ctx) reply lastRecv
Nothing -> Scanner.scanWith (CC.recv ctx) reply B.empty

case scanResult of
Scanner.Fail{} -> CC.errConnClosed
Scanner.More{} -> error "Hedis: parseWith returned Partial"
Scanner.Done rest' r -> do
IOR.writeIORef lastRecvRef (Just rest')
return r
Scanner.Fail{} -> CC.errConnClosed
Scanner.More{} -> error "Hedis: parseWith returned Partial"
Scanner.Done rest' r -> do
IOR.writeIORef lastRecvRef (Just rest')
return r

{-# INLINE nodes #-}
nodes :: ShardMap -> [Node]
Expand Down
65 changes: 39 additions & 26 deletions src/Database/Redis/Connection.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Database.Redis.Connection where

import Control.Exception
Expand Down Expand Up @@ -33,6 +32,7 @@ import qualified Database.Redis.Cluster as Cluster
import qualified Database.Redis.ConnectionContext as CC
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (race)
import qualified Database.Redis.Types as T
--import qualified Database.Redis.Cluster.Pipeline as ClusterPipeline

import Database.Redis.Commands
Expand All @@ -54,7 +54,7 @@ import Database.Redis.Commands
-- 'connect' function to create one.
data Connection
= NonClusteredConnection (Pool PP.Connection)
| ClusteredConnection (MVar ShardMap) (Pool Cluster.Connection)
| ClusteredConnection (MVar ShardMap) Cluster.Connection

-- |Information for connnecting to a Redis server.
--
Expand Down Expand Up @@ -173,7 +173,7 @@ checkedConnect connInfo = do
-- |Destroy all idle resources in the pool.
disconnect :: Connection -> IO ()
disconnect (NonClusteredConnection pool) = destroyAllResources pool
disconnect (ClusteredConnection _ pool) = destroyAllResources pool
disconnect (ClusteredConnection _ conn) = Cluster.destroyNodeResources conn

-- | Memory bracket around 'connect' and 'disconnect'.
withConnect :: (Catch.MonadMask m, MonadIO m) => ConnectInfo -> (Connection -> m c) -> m c
Expand All @@ -191,8 +191,8 @@ withCheckedConnect connInfo = bracket (checkedConnect connInfo) disconnect
runRedis :: Connection -> Redis a -> IO a
runRedis (NonClusteredConnection pool) redis =
withResource pool $ \conn -> runRedisInternal conn redis
runRedis (ClusteredConnection _ pool) redis =
withResource pool $ \conn -> runRedisClusteredInternal conn (refreshShardMap conn) redis
runRedis (ClusteredConnection _ conn) redis =
runRedisClusteredInternal conn (refreshShardMap conn) redis

newtype ClusterConnectError = ClusterConnectError Reply
deriving (Eq, Show, Typeable)
Expand Down Expand Up @@ -224,9 +224,10 @@ connectCluster bootstrapConnInfo = do
Right infos -> do
let
isConnectionReadOnly = connectReadOnly bootstrapConnInfo
clusterConnection = Cluster.connect withAuth infos shardMapVar timeoutOptUs isConnectionReadOnly refreshShardMapWithNodeConn
pool <- createPool (clusterConnect isConnectionReadOnly clusterConnection) Cluster.disconnect 1 (connectMaxIdleTime bootstrapConnInfo) (connectMaxConnections bootstrapConnInfo)
return $ ClusteredConnection shardMapVar pool
clusterConnection = Cluster.connect withAuth infos shardMapVar timeoutOptUs isConnectionReadOnly refreshShardMapWithNodeConn (connectMaxIdleTime bootstrapConnInfo) (connectMaxConnections bootstrapConnInfo)
-- pool <- createPool (clusterConnect isConnectionReadOnly clusterConnection) Cluster.disconnect 3 (connectMaxIdleTime bootstrapConnInfo) (connectMaxConnections bootstrapConnInfo)
connection <- clusterConnect isConnectionReadOnly clusterConnection
return $ ClusteredConnection shardMapVar connection
where
withAuth host port timeout = do
conn <- PP.connect host port timeout
Expand All @@ -249,13 +250,24 @@ connectCluster bootstrapConnInfo = do
clusterConnect :: Bool -> IO Cluster.Connection -> IO Cluster.Connection
clusterConnect readOnlyConnection connection = do
clusterConn@(Cluster.Connection nodeMap _ _ _ _) <- connection
nodesConns <- sequence $ ( PP.fromCtx . (\(Cluster.NodeConnection ctx _ _) -> ctx ) . snd) <$> (HM.toList nodeMap)
nodesConns <- sequence $ (ctxToConn . snd) <$> (HM.toList nodeMap)
when readOnlyConnection $
mapM_ (\conn -> do
PP.beginReceiving conn
runRedisInternal conn readOnly
mapM_ (\maybeConn -> case maybeConn of
Just conn -> do
PP.beginReceiving conn
runRedisInternal conn readOnly
Nothing -> return $ Right (T.Status "Connection does not exist")
) nodesConns
return clusterConn
where
ctxToConn :: Cluster.NodeConnection -> IO (Maybe PP.Connection)
ctxToConn (Cluster.NodeConnection pool _ nid) = do
maybeConn <- try $ withResource pool PP.fromCtx
case maybeConn of
Right ppConn -> return $ Just ppConn
Left (_ :: SomeException) -> do
putStrLn ("SomeException Occured in NodeID " ++ show nid)
return Nothing

shardMapFromClusterSlotsResponse :: ClusterSlotsResponse -> IO ShardMap
shardMapFromClusterSlotsResponse ClusterSlotsResponse{..} = ShardMap <$> foldr mkShardMap (pure IntMap.empty) clusterSlotsResponseEntries where
Expand All @@ -282,20 +294,21 @@ refreshShardMapWithNodeConn :: [Cluster.NodeConnection] -> IO ShardMap
refreshShardMapWithNodeConn [] = throwIO $ ClusterConnectError (Error "Couldn't refresh shardMap due to connection error")
refreshShardMapWithNodeConn nodeConnsList = do
selectedIdx <- randomRIO (0, (length nodeConnsList) - 1)
let (Cluster.NodeConnection ctx _ _) = nodeConnsList !! selectedIdx
pipelineConn <- PP.fromCtx ctx
envTimeout <- fromMaybe (10 ^ (3 :: Int)) . (>>= readMaybe) <$> lookupEnv "REDIS_CLUSTER_SLOTS_TIMEOUT"
raceResult <- race (threadDelay envTimeout) (try $ refreshShardMapWithConn pipelineConn True) -- racing with delay of default 1 ms
case raceResult of
Left () -> do
print $ "TimeoutForConnection " <> show ctx
throwIO $ Cluster.TimeoutException "ClusterSlots Timeout"
Right eiShardMapResp ->
case eiShardMapResp of
Right shardMap -> pure shardMap
Left (err :: SomeException) -> do
print $ "ShardMapRefreshError-" <> show err
throwIO $ ClusterConnectError (Error "Couldn't refresh shardMap due to connection error")
let (Cluster.NodeConnection pool _ _) = nodeConnsList !! selectedIdx
withResource pool $ \ctx -> do
pipelineConn <- PP.fromCtx ctx
envTimeout <- fromMaybe (10 ^ (3 :: Int)) . (>>= readMaybe) <$> lookupEnv "REDIS_CLUSTER_SLOTS_TIMEOUT"
raceResult <- race (threadDelay envTimeout) (try $ refreshShardMapWithConn pipelineConn True) -- racing with delay of default 1 ms
case raceResult of
Left () -> do
print $ "TimeoutForConnection " <> show ctx
throwIO $ Cluster.TimeoutException "ClusterSlots Timeout"
Right eiShardMapResp ->
case eiShardMapResp of
Right shardMap -> pure shardMap
Left (err :: SomeException) -> do
print $ "ShardMapRefreshError-" <> show err
throwIO $ ClusterConnectError (Error "Couldn't refresh shardMap due to connection error")

refreshShardMapWithConn :: PP.Connection -> Bool -> IO ShardMap
refreshShardMapWithConn pipelineConn _ = do
Expand Down

0 comments on commit 039a83d

Please sign in to comment.