kafka: using a per-connection reader to read records
4eUeP committed Dec 14, 2023
1 parent ef32b1d commit a99a467
Showing 3 changed files with 60 additions and 24 deletions.
23 changes: 10 additions & 13 deletions hstream-kafka/HStream/Kafka/Network.hs
@@ -44,7 +44,8 @@ import HStream.Kafka.Metrics.ServerStats (handlerLatencies,
import qualified HStream.Kafka.Network.IO as KIO
import qualified HStream.Kafka.Network.Security as Security
import HStream.Kafka.Server.Config.Types (SaslOptions (..))
import HStream.Kafka.Server.Types (ServerContext (..))
import HStream.Kafka.Server.Types (ServerContext (..),
initConnectionContext)
import qualified HStream.Logger as Log
import Kafka.Protocol.Encoding
import Kafka.Protocol.Message
@@ -81,30 +82,26 @@ runServer
-> (ServerContext -> [ServiceHandler])
-> (ServerContext -> [ServiceHandler])
-> IO ()
runServer opts sc mkPreAuthedHandlers mkAuthedHandlers =
runServer opts sc_ mkPreAuthedHandlers mkAuthedHandlers =
startTCPServer opts $ \(s, peer) -> do
-- Since the Reader is thread-unsafe, for each connection we create a new
-- Reader.
om <- initOffsetReader $ scOffsetManager sc
let sc' = sc{scOffsetManager = om}

sc <- initConnectionContext sc_
-- Decide if we require SASL authentication
case (serverSaslOptions opts) of
Nothing -> do
void $ State.execStateT (talk (peer, mkAuthedHandlers sc') s) ""
void $ State.execStateT (talk (peer, mkAuthedHandlers sc) s) ""
Just _ -> do
void $ (`State.execStateT` "") $ do
doAuth sc' peer s >>= \case
doAuth sc peer s >>= \case
Security.SaslStateComplete ->
talk (peer, mkAuthedHandlers sc') s
talk (peer, mkAuthedHandlers sc) s
ss -> do
liftIO $ Log.fatal $ "[SASL] authenticate failed with state " <> Log.buildString' ss
where
doAuth sc_ peer s = do
doAuth sc peer s = do
let recv = KIO.recvKafkaMsgBS peer Nothing s
send = KIO.sendKafkaMsgBS s
Security.authenticate sc_
(runHandler peer (mkPreAuthedHandlers sc_))
Security.authenticate sc
(runHandler peer (mkPreAuthedHandlers sc))
recv
send
Security.SaslStateHandshakeOrVersions
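The effect of the Network.hs change is that every accepted connection builds its own ServerContext (via initConnectionContext from HStream.Kafka.Server.Types, as in the diff below) before any handler runs. As a rough sketch only, not the commit's code: acceptLoop, accept, and serve are illustrative names standing in for the startTCPServer plumbing above.

import Control.Concurrent (forkIO)
import Control.Monad (forever, void)

-- Sketch only: every accepted connection gets its own ServerContext copy,
-- and therefore its own thread-unsafe Reader, before its handler thread
-- starts. 'accept' and 'serve' stand in for the startTCPServer plumbing.
acceptLoop :: ServerContext -> IO conn -> (conn -> ServerContext -> IO ()) -> IO ()
acceptLoop scShared accept serve = forever $ do
  conn <- accept
  scConn <- initConnectionContext scShared  -- fresh per-connection resources
  void $ forkIO (serve conn scConn)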
18 changes: 13 additions & 5 deletions hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
@@ -87,18 +87,17 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do
let resp = K.FetchResponse (K.NonNullKaArray respTopics) 0{- TODO: throttleTimeMs -}
throwIO $ K.FetchResponseEx resp

-- New reader
reader <- S.newLDReader scLDClient (fromIntegral numOfReads) Nothing

-- Start reading
--
-- We use a per-connection reader (fetchReader) to read records.
V.forM_ topics $ \(_, partitions) -> do
V.forM_ partitions $ \(logid, elsn, _) -> do
case elsn of
Left _ -> pure ()
Right (startlsn, _, _) -> do
Log.debug1 $ "start reading log "
<> Log.build logid <> " from " <> Log.build startlsn
S.readerStartReading reader logid startlsn S.LSN_MAX
S.readerStartReading fetchReader logid startlsn S.LSN_MAX

-- Read records from storage
--
@@ -111,7 +110,16 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do
-- throughput
--
-- Mode1
records <- readMode1 reader
records <- readMode1 fetchReader

-- For each partition, stop reading
--
-- FIXME: this is needed to keep the reader's memory usage from growing.
V.forM_ topics $ \(_, partitions) -> do
V.forM_ partitions $ \(logid, elsn, _) -> do
case elsn of
Left _ -> pure ()
Right _ -> S.readerStopReading fetchReader logid

-- Process read records
-- TODO: what if client send two same topic but with different partitions?
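Because the per-connection fetchReader is reused across fetches, every readerStartReading must be matched by a readerStopReading, as the FIXME above notes. A hedged sketch of an exception-safe pairing, assuming only the S.readerStartReading/S.readerStopReading calls shown above; fetchWithCleanup, startPoints, and readOnce are illustrative names, and the commit itself stops the reader inline rather than via finally.

import           Control.Exception (finally)
import qualified Data.Vector       as V

-- Sketch only, not part of the commit: pair every readerStartReading with
-- a readerStopReading even if the read itself throws. 'startPoints' is a
-- vector of (logid, startlsn) pairs; 'readOnce' stands in for readMode1.
fetchWithCleanup reader startPoints readOnce =
  startAndRead `finally` stopAll
  where
    startAndRead = do
      V.forM_ startPoints $ \(logid, startlsn) ->
        S.readerStartReading reader logid startlsn S.LSN_MAX
      readOnce reader
    stopAll =
      V.forM_ startPoints $ \(logid, _) ->
        S.readerStopReading reader logid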
43 changes: 37 additions & 6 deletions hstream-kafka/HStream/Kafka/Server/Types.hs
@@ -1,15 +1,19 @@
module HStream.Kafka.Server.Types
( ServerContext (..)
, initServerContext
, initConnectionContext
) where

import Data.Text (Text)
import Data.Word
import Foreign.ForeignPtr (newForeignPtr_)
import Foreign.Ptr (nullPtr)

import HStream.Common.Server.HashRing (LoadBalanceHashRing,
initializeHashRing)
import HStream.Gossip.Types (GossipContext)
import HStream.Kafka.Common.OffsetManager (OffsetManager,
initOffsetReader,
newOffsetManager)
import HStream.Kafka.Group.GroupCoordinator (GroupCoordinator,
mkGroupCoordinator)
@@ -31,10 +35,13 @@ data ServerContext = ServerContext
, cmpStrategy :: !S.Compression
, loadBalanceHashRing :: !LoadBalanceHashRing
, gossipContext :: !GossipContext
, scOffsetManager :: !OffsetManager
, scGroupCoordinator :: GroupCoordinator
, scGroupCoordinator :: !GroupCoordinator
, kafkaBrokerConfigs :: !KC.KafkaBrokerConfigs
}
-- { per connection, see 'initConnectionContext'
, scOffsetManager :: !OffsetManager
, fetchReader :: !S.LDReader
-- } per connection end
}

initServerContext
:: ServerOpts
@@ -46,9 +53,12 @@ initServerContext opts@ServerOpts{..} gossipContext mh = do
-- XXX: Should we add a server option to toggle Stats?
statsHolder <- newServerStatsHolder
epochHashRing <- initializeHashRing gossipContext
offsetManager <- newOffsetManager ldclient
scGroupCoordinator <- mkGroupCoordinator mh ldclient _serverID

offsetManager <- newOffsetManager ldclient
-- Trick to avoid using Maybe; must be initialized later
fetchReader <- newForeignPtr_ nullPtr

return
ServerContext
{ serverID = _serverID
@@ -61,7 +71,28 @@ initServerContext opts@ServerOpts{..} gossipContext mh = do
, cmpStrategy = _compression
, loadBalanceHashRing = epochHashRing
, gossipContext = gossipContext
, scOffsetManager = offsetManager
, scGroupCoordinator = scGroupCoordinator
, kafkaBrokerConfigs = _kafkaBrokerConfigs
, kafkaBrokerConfigs = _kafkaBrokerConfigs
, scOffsetManager = offsetManager
, fetchReader = fetchReader
}

initConnectionContext :: ServerContext -> IO ServerContext
initConnectionContext sc = do
-- Since the Reader inside OffsetManager is thread-unsafe, we create a new
-- Reader for each connection.
!om <- initOffsetReader $ scOffsetManager sc

-- Reader used for fetch.
--
-- Currently, we only need one reader per connection because there is only
-- one thread fetching data.
-- TODO: consider using a pool of readers as well.
--
-- NOTE: maxLogs is set to 1000, which means the reader can read from at most
-- 1000 logs at a time.
-- TODO: maybe we should set maxLogs dynamically according to the largest
-- number of logs requested by a fetch on this connection.
!reader <- S.newLDReader (scLDClient sc) 1000 Nothing

pure sc{scOffsetManager = om, fetchReader = reader}
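A note on the placeholder used in initServerContext: newForeignPtr_ nullPtr produces a ForeignPtr with no finalizer attached, so it type-checks as a reader handle while carrying no real resource; initConnectionContext must replace it before first use. A minimal, self-contained sketch of the trick, assuming S.LDReader is a ForeignPtr-backed handle (which the newForeignPtr_ call above implies):

import Foreign.ForeignPtr (ForeignPtr, newForeignPtr_)
import Foreign.Ptr (nullPtr)

-- Sketch only: wrap nullPtr without a finalizer to get a value that fits
-- the record field at startup. It is a placeholder that must be
-- overwritten (here, by initConnectionContext) before first use.
placeholderReader :: IO (ForeignPtr a)
placeholderReader = newForeignPtr_ nullPtr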
