From a99a467cbd7a3f59c2d14a4eb9d74fcde3bc9eaf Mon Sep 17 00:00:00 2001 From: mu <59917266+4eUeP@users.noreply.github.com> Date: Tue, 12 Dec 2023 16:44:30 +0800 Subject: [PATCH] kafka: using a per-connection reader to read records --- hstream-kafka/HStream/Kafka/Network.hs | 23 +++++----- .../HStream/Kafka/Server/Handler/Consume.hs | 18 +++++--- hstream-kafka/HStream/Kafka/Server/Types.hs | 43 ++++++++++++++++--- 3 files changed, 60 insertions(+), 24 deletions(-) diff --git a/hstream-kafka/HStream/Kafka/Network.hs b/hstream-kafka/HStream/Kafka/Network.hs index 8e1c020c9..23f476740 100644 --- a/hstream-kafka/HStream/Kafka/Network.hs +++ b/hstream-kafka/HStream/Kafka/Network.hs @@ -44,7 +44,8 @@ import HStream.Kafka.Metrics.ServerStats (handlerLatencies, import qualified HStream.Kafka.Network.IO as KIO import qualified HStream.Kafka.Network.Security as Security import HStream.Kafka.Server.Config.Types (SaslOptions (..)) -import HStream.Kafka.Server.Types (ServerContext (..)) +import HStream.Kafka.Server.Types (ServerContext (..), + initConnectionContext) import qualified HStream.Logger as Log import Kafka.Protocol.Encoding import Kafka.Protocol.Message @@ -81,30 +82,26 @@ runServer -> (ServerContext -> [ServiceHandler]) -> (ServerContext -> [ServiceHandler]) -> IO () -runServer opts sc mkPreAuthedHandlers mkAuthedHandlers = +runServer opts sc_ mkPreAuthedHandlers mkAuthedHandlers = startTCPServer opts $ \(s, peer) -> do - -- Since the Reader is thread-unsafe, for each connection we create a new - -- Reader. 
- om <- initOffsetReader $ scOffsetManager sc - let sc' = sc{scOffsetManager = om} - + sc <- initConnectionContext sc_ -- Decide if we require SASL authentication case (serverSaslOptions opts) of Nothing -> do - void $ State.execStateT (talk (peer, mkAuthedHandlers sc') s) "" + void $ State.execStateT (talk (peer, mkAuthedHandlers sc) s) "" Just _ -> do void $ (`State.execStateT` "") $ do - doAuth sc' peer s >>= \case + doAuth sc peer s >>= \case Security.SaslStateComplete -> - talk (peer, mkAuthedHandlers sc') s + talk (peer, mkAuthedHandlers sc) s ss -> do liftIO $ Log.fatal $ "[SASL] authenticate failed with state " <> Log.buildString' ss where - doAuth sc_ peer s = do + doAuth sc peer s = do let recv = KIO.recvKafkaMsgBS peer Nothing s send = KIO.sendKafkaMsgBS s - Security.authenticate sc_ - (runHandler peer (mkPreAuthedHandlers sc_)) + Security.authenticate sc + (runHandler peer (mkPreAuthedHandlers sc)) recv send Security.SaslStateHandshakeOrVersions diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs index bf55a7a08..4ee2736fe 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs @@ -87,10 +87,9 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do let resp = K.FetchResponse (K.NonNullKaArray respTopics) 0{- TODO: throttleTimeMs -} throwIO $ K.FetchResponseEx resp - -- New reader - reader <- S.newLDReader scLDClient (fromIntegral numOfReads) Nothing - -- Start reading + -- + -- We use a per-connection reader(fetchReader) to read. 
V.forM_ topics $ \(_, partitions) -> do V.forM_ partitions $ \(logid, elsn, _) -> do case elsn of @@ -98,7 +97,7 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do Right (startlsn, _, _) -> do Log.debug1 $ "start reading log " <> Log.build logid <> " from " <> Log.build startlsn - S.readerStartReading reader logid startlsn S.LSN_MAX + S.readerStartReading fetchReader logid startlsn S.LSN_MAX -- Read records from storage -- @@ -111,7 +110,16 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do -- throughput -- -- Mode1 - records <- readMode1 reader + records <- readMode1 fetchReader + + -- For each partition, stop reading + + -- + -- FIXME: this is needed to avoid unbounded memory growth. + V.forM_ topics $ \(_, partitions) -> do + V.forM_ partitions $ \(logid, elsn, _) -> do + case elsn of + Left _ -> pure () + Right _ -> S.readerStopReading fetchReader logid -- Process read records -- TODO: what if client send two same topic but with different partitions? diff --git a/hstream-kafka/HStream/Kafka/Server/Types.hs b/hstream-kafka/HStream/Kafka/Server/Types.hs index ac70f09a5..442933a1a 100644 --- a/hstream-kafka/HStream/Kafka/Server/Types.hs +++ b/hstream-kafka/HStream/Kafka/Server/Types.hs @@ -1,15 +1,19 @@ module HStream.Kafka.Server.Types ( ServerContext (..) 
, initServerContext + , initConnectionContext ) where import Data.Text (Text) import Data.Word +import Foreign.ForeignPtr (newForeignPtr_) +import Foreign.Ptr (nullPtr) import HStream.Common.Server.HashRing (LoadBalanceHashRing, initializeHashRing) import HStream.Gossip.Types (GossipContext) import HStream.Kafka.Common.OffsetManager (OffsetManager, + initOffsetReader, newOffsetManager) import HStream.Kafka.Group.GroupCoordinator (GroupCoordinator, mkGroupCoordinator) @@ -31,10 +35,13 @@ data ServerContext = ServerContext , cmpStrategy :: !S.Compression , loadBalanceHashRing :: !LoadBalanceHashRing , gossipContext :: !GossipContext - , scOffsetManager :: !OffsetManager - , scGroupCoordinator :: GroupCoordinator + , scGroupCoordinator :: !GroupCoordinator , kafkaBrokerConfigs :: !KC.KafkaBrokerConfigs -} + -- { per connection, see 'initConnectionContext' + , scOffsetManager :: !OffsetManager + , fetchReader :: !S.LDReader + -- } per connection end + } initServerContext :: ServerOpts @@ -46,9 +53,12 @@ initServerContext opts@ServerOpts{..} gossipContext mh = do -- XXX: Should we add a server option to toggle Stats? 
statsHolder <- newServerStatsHolder epochHashRing <- initializeHashRing gossipContext - offsetManager <- newOffsetManager ldclient scGroupCoordinator <- mkGroupCoordinator mh ldclient _serverID + offsetManager <- newOffsetManager ldclient + -- Trick to avoid using Maybe; must be initialized later + fetchReader <- newForeignPtr_ nullPtr + return ServerContext { serverID = _serverID @@ -61,7 +71,28 @@ initServerContext opts@ServerOpts{..} gossipContext mh = do , cmpStrategy = _compression , loadBalanceHashRing = epochHashRing , gossipContext = gossipContext - , scOffsetManager = offsetManager , scGroupCoordinator = scGroupCoordinator - , kafkaBrokerConfigs = _kafkaBrokerConfigs + , kafkaBrokerConfigs = _kafkaBrokerConfigs + , scOffsetManager = offsetManager + , fetchReader = fetchReader } + +initConnectionContext :: ServerContext -> IO ServerContext +initConnectionContext sc = do + -- Since the Reader inside OffsetManager is thread-unsafe, for each connection + -- we create a new Reader. + !om <- initOffsetReader $ scOffsetManager sc + + -- Reader used for fetch. + -- + -- Currently, we only need one reader per connection because there will be + -- only one thread to fetch data. + -- TODO: also consider using a pool of readers. + -- + -- NOTE: the maxLogs is set to 1000, which means the reader will fetch at most + -- 1000 logs. + -- TODO: maybe we should set maxLogs dynamically according to the max number + -- of all fetch requests in this connection. + !reader <- S.newLDReader (scLDClient sc) 1000 Nothing + + pure sc{scOffsetManager = om, fetchReader = reader}