server: try to fix cluster consistency of resource allocation (#1824)
* server: do not clean the allocation table on startup

* server: retry before actually re-allocating a resource

* connector: avoid creating connectors with the same name
Commelina authored May 28, 2024
1 parent 5d2e0f4 commit 87a87f5
Showing 4 changed files with 80 additions and 38 deletions.
84 changes: 50 additions & 34 deletions common/server/HStream/Common/Server/Lookup.hs
@@ -10,6 +10,7 @@ module HStream.Common.Server.Lookup
, kafkaResourceMetaId
) where

import Control.Concurrent (threadDelay)
import Control.Concurrent.STM
import Control.Exception (SomeException (..), throwIO,
try)
@@ -41,47 +42,62 @@ lookupNodePersist
-> Text
-> Maybe Text
-> IO A.ServerNode
-lookupNodePersist metaHandle gossipContext loadBalanceHashRing
-                  key metaId advertisedListenersKey = do
-  -- FIXME: it will insert the results of lookup no matter the resource exists
-  -- or not
-  M.getMetaWithVer @TaskAllocation metaId metaHandle >>= \case
-    Nothing -> do
-      (epoch, hashRing) <- readTVarIO loadBalanceHashRing
-      theNode <- getResNode hashRing key advertisedListenersKey
-      try (M.insertMeta @TaskAllocation
-             metaId
-             (TaskAllocation epoch (A.serverNodeId theNode))
-             metaHandle) >>= \case
-        Left (e :: SomeException) -> do
-          -- TODO: add a retry limit here
-          Log.warning $ "lookupNodePersist exception: " <> Log.buildString' e
-                     <> ", retry..."
-          lookupNodePersist metaHandle gossipContext loadBalanceHashRing
-                            key metaId advertisedListenersKey
-        Right () -> return theNode
-    Just (TaskAllocation epoch nodeId, version) -> do
-      serverList <- getMemberList gossipContext >>=
-        fmap V.concat . mapM (fromInternalServerNodeWithKey advertisedListenersKey)
-      case find ((nodeId == ) . A.serverNodeId) serverList of
-        Just theNode -> return theNode
-        Nothing -> do
-          (epoch', hashRing) <- atomically $ do
-            (epoch', hashRing) <- readTVar loadBalanceHashRing
-            if epoch' > epoch
-              then pure (epoch', hashRing)
-              else retry
-          theNode' <- getResNode hashRing key advertisedListenersKey
-          try (M.updateMeta @TaskAllocation metaId
-                 (TaskAllocation epoch' (A.serverNodeId theNode'))
-                 (Just version) metaHandle) >>= \case
-            Left (e :: SomeException) -> do
-              -- TODO: add a retry limit here
-              Log.warning $ "lookupNodePersist exception: " <> Log.buildString' e
-                         <> ", retry..."
-              lookupNodePersist metaHandle gossipContext loadBalanceHashRing
-                                key metaId advertisedListenersKey
-            Right () -> return theNode'
+lookupNodePersist metaHandle_ gossipContext_ loadBalanceHashRing_
+                  key_ metaId_ advertisedListenersKey_ =
+  -- FIXME: This is only a mitigation for the case that the node does not yet
+  --        know the full cluster info. Reinvestigate it!!!
+  --        And as you see, a hard-coded constant...
+  go metaHandle_ gossipContext_ loadBalanceHashRing_ key_ metaId_ advertisedListenersKey_ 5
+  where
+    -- TODO: Currently, 'leftRetries' only works before a re-allocation. It could also
+    --       be used in other cases, such as encountering an exception.
+    go metaHandle gossipContext loadBalanceHashRing
+       key metaId advertisedListenersKey leftRetries = do
+      -- FIXME: it will insert the results of lookup no matter the resource exists
+      -- or not
+      M.getMetaWithVer @TaskAllocation metaId metaHandle >>= \case
+        Nothing -> do
+          (epoch, hashRing) <- readTVarIO loadBalanceHashRing
+          theNode <- getResNode hashRing key advertisedListenersKey
+          try (M.insertMeta @TaskAllocation
+                 metaId
+                 (TaskAllocation epoch (A.serverNodeId theNode))
+                 metaHandle) >>= \case
+            Left (e :: SomeException) -> do
+              -- TODO: add a retry limit here
+              Log.warning $ "lookupNodePersist exception: " <> Log.buildString' e
+                         <> ", retry..."
+              lookupNodePersist metaHandle gossipContext loadBalanceHashRing
+                                key metaId advertisedListenersKey
+            Right () -> return theNode
+        Just (TaskAllocation epoch nodeId, version) -> do
+          serverList <- getMemberList gossipContext >>=
+            fmap V.concat . mapM (fromInternalServerNodeWithKey advertisedListenersKey)
+          case find ((nodeId == ) . A.serverNodeId) serverList of
+            Just theNode -> return theNode
+            Nothing -> do
+              if leftRetries > 0
+                then do
+                  Log.info $ "<lookupNodePersist> on <key=" <> Log.buildString' key <> ", metaId=" <>
+                    Log.buildString' metaId <> ">: found on Node=" <> Log.buildString' nodeId <>
+                    ", but not sure if it's really dead. Left " <> Log.buildString' leftRetries <>
+                    " retries before re-allocating it..."
+                  threadDelay (1 * 1000 * 1000)
+                  go metaHandle gossipContext loadBalanceHashRing
+                     key metaId advertisedListenersKey (leftRetries - 1)
+                else do
+                  (epoch', hashRing) <- readTVarIO loadBalanceHashRing
+                  theNode' <- getResNode hashRing key advertisedListenersKey
+                  try (M.updateMeta @TaskAllocation metaId
+                         (TaskAllocation epoch' (A.serverNodeId theNode'))
+                         (Just version) metaHandle) >>= \case
+                    Left (e :: SomeException) -> do
+                      -- TODO: add a retry limit here
+                      Log.warning $ "lookupNodePersist exception: " <> Log.buildString' e
+                                 <> ", retry..."
+                      lookupNodePersist metaHandle gossipContext loadBalanceHashRing
+                                        key metaId advertisedListenersKey
+                    Right () -> return theNode'

data KafkaResource
= KafkaResTopic Text
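
The core idea of the Lookup.hs change is easier to see stripped of the metadata plumbing: when the recorded owner of a resource is missing from the current member list, wait and re-check a bounded number of times before re-allocating, because the local membership view may simply be stale. The sketch below is a simplified illustration of that idea only; MemberId, lookupWithRetries, and the three callbacks are hypothetical names, not HStream APIs.

-- Hypothetical, simplified sketch (not HStream API): bounded retries before
-- re-allocating a resource whose recorded owner looks dead.
import Control.Concurrent (threadDelay)

type MemberId = Int

lookupWithRetries
  :: Int                    -- retries left before giving up on the old owner
  -> IO [MemberId]          -- fetch the currently known cluster members
  -> IO (Maybe MemberId)    -- read the persisted allocation, if any
  -> IO MemberId            -- re-allocate and persist a new owner
  -> IO MemberId
lookupWithRetries retriesLeft getMembers getAllocation reallocate = do
  alloc <- getAllocation
  case alloc of
    Nothing    -> reallocate            -- never allocated: pick an owner now
    Just owner -> do
      members <- getMembers
      if owner `elem` members
        then pure owner                 -- recorded owner is alive: reuse it
        else if retriesLeft > 0
          then do
            -- The owner may only look dead because our membership view is
            -- stale, so wait a second and re-check instead of re-allocating.
            threadDelay (1 * 1000 * 1000)
            lookupWithRetries (retriesLeft - 1) getMembers getAllocation reallocate
          else reallocate               -- still missing after all retries
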
16 changes: 14 additions & 2 deletions hstream-io/HStream/IO/Worker.hs
@@ -91,7 +91,7 @@ createIOTaskFromTaskInfo
=> Worker -> T.Text -> TaskInfo -> IOOptions -> Bool -> Bool -> Bool -> IO ()
createIOTaskFromTaskInfo worker@Worker{..} taskId taskInfo@TaskInfo {..}
ioOptions cleanIfExists createMetaData enableCheck = do
getIOTask worker taskName >>= \case
M.getIOTaskFromName workerHandle taskName >>= \case
Nothing -> pure ()
Just _ -> do
if cleanIfExists
@@ -106,7 +106,7 @@ createIOTaskFromTaskInfo worker@Worker{..} taskId taskInfo@TaskInfo {..}

when createMetaData $ M.createIOTaskMeta workerHandle taskName taskId taskInfo
C.modifyMVar_ ioTasksM $ \ioTasks -> do
-- FIXME: already check ioTask exist in `getIOTask worker` step, no need check again
-- FIXME: the ioTask's existence is already checked in the `getIOTaskFromName` step, no need to check again
case HM.lookup taskName ioTasks of
Just _ -> throwIO $ HE.ConnectorExists taskName
Nothing -> do
@@ -220,9 +220,21 @@ updateConnectorConfig worker name config = do
<> ", new config:" <> Log.buildString' newConnCfg
return True

-- WARNING: This function uses only the in-memory cache, which can be
--          outdated, especially under complex cluster circumstances.
--          Please be very careful when using this function, e.g. check
--          whether a task already exists before creating it.
--          Also remember there are <name -> task id -> task meta> mappings
--          in the meta store, and the latter is never cleaned up!!!
getIOTask :: Worker -> T.Text -> IO (Maybe IOTask)
getIOTask Worker{..} name = HM.lookup name <$> C.readMVar ioTasksM

-- WARNING: This function uses only the in-memory cache, which can be
--          outdated, especially under complex cluster circumstances.
--          Please be very careful when using this function, e.g. check
--          whether a task already exists before creating it.
--          Also remember there are <name -> task id -> task meta> mappings
--          in the meta store, and the latter is never cleaned up!!!
getIOTask_ :: Worker -> T.Text -> IO IOTask
getIOTask_ Worker{..} name = do
ioTasks <- C.readMVar ioTasksM
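
The WARNING added above is the crux of the connector change: the in-memory task map alone cannot guarantee name uniqueness across the cluster, so existence should also be checked against the persistent meta store. A hedged illustration of that check follows, with made-up lookupMeta/lookupCache callbacks standing in for the real meta-store and cache lookups (not HStream's API).

-- Illustrative only; the helper names are hypothetical.
import qualified Data.Text as T

data CreateDecision = AlreadyExists | SafeToCreate
  deriving (Show, Eq)

checkBeforeCreate
  :: (T.Text -> IO (Maybe meta))   -- persistent meta-store lookup by name
  -> (T.Text -> IO (Maybe task))   -- in-memory cache lookup by name
  -> T.Text                        -- connector/task name to create
  -> IO CreateDecision
checkBeforeCreate lookupMeta lookupCache name = do
  inMeta  <- lookupMeta name       -- authoritative, cluster-wide view
  inCache <- lookupCache name      -- fast but possibly stale local view
  case (inMeta, inCache) of
    (Nothing, Nothing) -> pure SafeToCreate
    _                  -> pure AlreadyExists
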
9 changes: 8 additions & 1 deletion hstream/app/lib/KafkaServer.hs
@@ -176,7 +176,14 @@ serve sc@ServerContext{..} netOpts usingCppServer usingSparseOffset = do
-- FIXME: Why need to call deleteAll here?
-- Also in CI, getRqResult(common/hstream/HStream/MetaStore/RqliteUtils.hs) may throw a RQLiteUnspecifiedErr
-- because the affected rows are more than 1, why that's invalid ?
deleteAllMeta @M.TaskAllocation metaHandle `catches` exceptionHandlers
-- FIXME: The following line is very delicate and can cause weird problems.
--        It was intended to re-allocate tasks after a server restart. However,
--        that should happen BEFORE any node serves any client request or
--        internal task, and the current `serverOnStarted` is not guaranteed
--        to run before the node starts serving externally.
-- TODO: I do not have 100% confidence this is correct, so it should be
--       carefully investigated and tested.
-- deleteAllMeta @M.TaskAllocation metaHandle `catches` exceptionHandlers

Log.info "starting task detector"
TM.runTaskDetector $ TM.TaskDetector {
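
The comment above asks for an ordering guarantee that `serverOnStarted` does not provide: any allocation-table cleanup must finish before the node serves its first request. One generic way to express such a guarantee is sketched below with an MVar gate; this is not HStream's actual startup code, just an assumption about how the ordering could be enforced.

-- Sketch only: block the serving thread on a gate until cleanup completes.
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, readMVar)

startGated
  :: IO ()   -- cleanup to run first, e.g. clearing stale task allocations
  -> IO ()   -- the server loop that accepts client requests
  -> IO ()
startGated cleanupAllocations serveForever = do
  gate <- newEmptyMVar
  _ <- forkIO $ do
    readMVar gate        -- blocks until the gate is filled
    serveForever
  cleanupAllocations     -- phase 1: cleanup, before any request is served
  putMVar gate ()        -- phase 2: open the gate; serving may begin
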
9 changes: 8 additions & 1 deletion hstream/app/server.hs
@@ -205,7 +205,14 @@ serve sc@ServerContext{..} rpcOpts enableStreamV2 = do
Gossip -> return ()
_ -> do
getProtoTimestamp >>= \x -> upsertMeta @Proto.Timestamp clusterStartTimeId x metaHandle
handle (\(_ :: RQLiteRowNotFound) -> return ()) $ deleteAllMeta @TaskAllocation metaHandle
-- FIXME: The following line is very delicate and can cause weird problems.
--        It was intended to re-allocate tasks after a server restart. However,
--        that should happen BEFORE any node serves any client request or
--        internal task, and the current `serverOnStarted` is not guaranteed
--        to run before the node starts serving externally.
-- TODO: I do not have 100% confidence this is correct, so it should be
--       carefully investigated and tested.
-- handle (\(_ :: RQLiteRowNotFound) -> return ()) $ deleteAllMeta @TaskAllocation metaHandle
-- recover tasks
Log.info "recovering local io tasks"
Cluster.recoverLocalTasks sc scIOWorker
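
For reference, the now-commented-out line used `handle` to treat only RQLiteRowNotFound as benign while letting every other exception propagate. A self-contained sketch of that pattern with a made-up exception type (not the real RQLite error):

{-# LANGUAGE ScopedTypeVariables #-}
-- Sketch: swallow one specific exception type, re-raise everything else.
import Control.Exception (Exception, handle)

data RowNotFound = RowNotFound
  deriving Show
instance Exception RowNotFound

ignoreMissingRow :: IO () -> IO ()
ignoreMissingRow = handle (\(_ :: RowNotFound) -> return ())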
