Skip to content


kafka fetch: handle maxBytes & partitionMaxBytes
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP committed Dec 21, 2023
1 parent 5084bf8 commit a511add
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 75 deletions.
1 change: 0 additions & 1 deletion cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ constraints:
, zoovisitor ==
, criterion ^>= 1.6
, aeson ^>= 2.1
, filepath >= 1.4.100

-- rocksdb-haskell-bindings cant be built with streamly-0.10
, streamly ^>= 0.9
Expand Down
221 changes: 147 additions & 74 deletions hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import qualified Data.Text as T
import qualified Data.Vector as V
import qualified Data.Vector.Hashtables as HT
import qualified Data.Vector.Storable as VS
import GHC.Data.FastMutInt
import GHC.Stack (HasCallStack)
import qualified Prometheus as P

Expand Down Expand Up @@ -46,75 +47,93 @@ type RecordTable =
(GV.Growing V.Vector GV.RealWorld K.RecordFormat)

-- Tuple of (startLsn, tailLsn, highwaterOffset)
type PartitionOffsetData = Either K.PartitionData (S.LSN, S.LSN, Int64)

data Partition = Partition
{ logid :: {-# UNPACK #-} !S.C_LogID
, elsn :: {-# UNPACK #-} !PartitionOffsetData
, request :: !K.FetchPartition

-- NOTE: this behaviour is not the same as kafka broker
:: HasCallStack
=> ServerContext
-> K.RequestContext -> K.FetchRequest -> IO K.FetchResponse
handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do
-- * Preprocess request
mutNumOfReads <- newFastMutInt 0 -- Total number of real reads
-- The Kafka broker just throws java.lang.RuntimeException if topics is null;
-- here we do the same.
let K.NonNullKaArray topicReqs = r.topics
topics <- V.forM topicReqs $ \topic{- K.FetchTopic -} -> do
topics <- V.forM topicReqs $ \t{- K.FetchTopic -} -> do
-- Partition should be non-empty
let K.NonNullKaArray partitionReqs = t.partitions
orderedParts <- S.listStreamPartitionsOrdered scLDClient
(S.transToTopicStreamName topic.topic)
let K.NonNullKaArray partitionReqs = topic.partitions
(S.transToTopicStreamName t.topic)
ps <- V.forM partitionReqs $ \p{- K.FetchPartition -} -> do
P.withLabel totalConsumeRequest (topic.topic, T.pack . show $ p.partition) $
P.withLabel totalConsumeRequest (t.topic, T.pack . show $ p.partition) $
\counter -> void $ P.addCounter counter 1
let m_logid = orderedParts V.!? fromIntegral p.partition
case m_logid of
Nothing -> do
let elsn = errorPartitionResponse p.partition K.UNKNOWN_TOPIC_OR_PARTITION
-- Strictly speaking, the logid should be Nothing rather than 0; however,
-- we never use it in this case, so we just set it to 0.
pure (0, Left elsn, p)
pure $ Partition 0 (Left elsn) p
Just (_, logid) -> do
elsn <- getPartitionLsn scLDClient scOffsetManager logid p.partition
pure (logid, elsn, p)
pure (topic.topic, ps)
when (isRight elsn) $ void $ atomicFetchAddFastMut mutNumOfReads 1
pure $ Partition logid elsn p
pure (t.topic, ps)

let numOfReads = V.sum $ (V.length . V.filter (\(_, x, _) -> isRight x) . snd) topics
numOfReads <- readFastMutInt mutNumOfReads
when (numOfReads < 1) $ do
respTopics <- V.forM topics $ \(topic, partitions) -> do
respPartitionDatas <- V.forM partitions $ \(_, elsn, _) -> do
case elsn of
respPartitionDatas <- V.forM partitions $ \partition -> do
case partition.elsn of
Left pd -> pure pd
Right _ -> error "LogicError: this should not be right"
pure $ K.FetchableTopicResponse topic (K.NonNullKaArray respPartitionDatas)
let resp = K.FetchResponse (K.NonNullKaArray respTopics) 0{- TODO: throttleTimeMs -}
throwIO $ K.FetchResponseEx resp

-- Start reading
-- * Start reading
-- We use a per-connection reader(fetchReader) to read.
-- Currently, we use a per-connection reader(fetchReader) to read.
V.forM_ topics $ \(_, partitions) -> do
V.forM_ partitions $ \(logid, elsn, _) -> do
case elsn of
V.forM_ partitions $ \partition -> do
case partition.elsn of
Left _ -> pure ()
Right (startlsn, _, _) -> do
Log.debug1 $ "start reading log "
<> logid <> " from " <> startlsn
S.readerStartReading fetchReader logid startlsn S.LSN_MAX
<> partition.logid
<> " from " <> startlsn
S.readerStartReading fetchReader partition.logid startlsn S.LSN_MAX

-- Read records from storage
-- * Read records from storage
-- TODO:
-- - dynamically change reader settings according to the client request
-- (e.g. maxWaitMs, minBytes...)
-- - handle maxBytes
-- FIXME: Do not setWaitOnlyWhenNoData if you are mostly focusing on
-- throughput
-- Mode1
records <- readMode1 fetchReader

-- Process read records
-- TODO: what if the client sends the same topic twice but with different partitions?
-- * Process read records
-- TODO: what if the client sends the same topic twice but with different partitions?
-- {logid: [RecordFormat]}
readRecords <- HT.initialize numOfReads :: IO RecordTable
forM_ records $ \record -> do
Expand All @@ -124,36 +143,44 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do
v' <- GV.append v recordFormat
HT.insert readRecords logid v'

-- Generate response
-- * Generate response
mutMaxBytes <- newFastMutInt $ fromIntegral r.maxBytes
mutIsFirstPartition <- newFastMutInt 1 -- TODO: improve this
respTopics <- V.forM topics $ \(topic, partitions) -> do
respPartitionDatas <- V.forM partitions $ \(logid, elsn, p) -> do
case elsn of
respPartitionDatas <- V.forM partitions $ \partition -> do
let request = partition.request
case partition.elsn of
Left pd -> pure pd
Right (_startlsn, _endlsn, hioffset) -> do
mgv <- HT.lookup readRecords logid
mgv <- HT.lookup readRecords partition.logid
case mgv of
Nothing ->
pure $ K.PartitionData p.partition K.NONE hioffset (Just "")
pure $ K.PartitionData request.partition K.NONE
(Just "")
(-1){- TODO: lastStableOffset -}
(K.NonNullKaArray V.empty){- TODO: abortedTransactions -}
Just gv -> do
v <- GV.unsafeFreeze gv
bs <- encodePartition p v

let partLabel = (topic, T.pack . show $ p.partition)
bs <- encodePartition mutMaxBytes mutIsFirstPartition request v
-- Stats
let partLabel = (topic, T.pack . show $ request.partition)
P.withLabel topicTotalSendBytes partLabel $ \counter -> void $
P.addCounter counter (fromIntegral $ BS.length bs)
P.withLabel topicTotalSendMessages partLabel $ \counter -> void $ do
let totalRecords = V.sum $ (\K.RecordFormat{..} -> batchLength) v
P.addCounter counter (fromIntegral totalRecords)

pure $ K.PartitionData p.partition K.NONE hioffset (Just bs)
-- PartitionData
pure $ K.PartitionData request.partition K.NONE hioffset (Just bs)
(-1){- TODO: lastStableOffset -}
(K.NonNullKaArray V.empty){- TODO: abortedTransactions -}
pure $ K.FetchableTopicResponse topic (K.NonNullKaArray respPartitionDatas)
pure $ K.FetchResponse (K.NonNullKaArray respTopics) 0{- TODO: throttleTimeMs -}

-- Currently unused
readMode0 :: S.LDReader -> IO [S.DataRecord ByteString]
readMode0 reader = do
if r.minBytes <= 0 || r.maxWaitMs <= 0
Expand Down Expand Up @@ -194,46 +221,6 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do
S.readerSetTimeout reader r.maxWaitMs
S.readerRead reader storageOpts.fetchMaxLen

-- Note: this function's behaviour is not the same as the Kafka broker's.
-- In the Kafka broker, regardless of the format on disk, the broker returns
-- the message format according to the fetch api version. Which means
-- * if the fetch api version is less than 4, the broker will always
-- return MessageSet even if the message format on disk is RecordBatch.
-- * if the fetch api version is 4+, the broker will always return
-- RecordBatch.
-- Here, we do not handle the fetch api version; we just return the message
-- format according to the message format on disk.
-- However, if you always use RecordBatch for appending and reading, it
-- won't be a problem.
:: K.FetchPartition -> V.Vector K.RecordFormat -> IO ByteString
encodePartition p v = do
let (rf :: K.RecordFormat, vs) =
-- This should not be Nothing, because if we found the key
-- in `readRecords`, it means we have at least one record
-- in this
fromMaybe (error "LogicError: got empty vector value")
(V.uncons v)
bytesOnDisk = K.unCompactBytes rf.recordBytes
-- Only the first MessageSet needs to do this seeking
magic <- K.decodeRecordMagic bytesOnDisk
fstRecordBytes <-
if | magic >= 2 -> pure bytesOnDisk
| otherwise -> do
let absStartOffset = rf.offset + 1 - fromIntegral rf.batchLength
offset = p.fetchOffset - absStartOffset
if offset > 0
then K.seekBatch (fromIntegral offset) bytesOnDisk
else pure bytesOnDisk

let v' = (BB.byteString . K.unCompactBytes . (.recordBytes)) vs
b = V.foldl (<>) (BB.byteString fstRecordBytes) v'
pure $ BS.toStrict $ BB.toLazyByteString b


-- Return tuple of (startLsn, tailLsn, highwaterOffset)
Expand All @@ -244,7 +231,7 @@ getPartitionLsn
-> K.OffsetManager
-> S.C_LogID -> Int32
-> Int64 -- ^ kafka start offset
-> IO (Either K.PartitionData (S.LSN, S.LSN, Int64))
-> IO PartitionOffsetData
getPartitionLsn ldclient om logid partition offset = do
m <- K.getLatestOffsetWithLsn om logid
case m of
Expand Down Expand Up @@ -272,6 +259,92 @@ getPartitionLsn ldclient om logid partition offset = do
then pure $ Right (S.LSN_MIN, S.LSN_INVALID, 0)
else pure $ Left $ errorPartitionResponse partition K.OFFSET_OUT_OF_RANGE

-- Note: this function's behaviour is not the same as the Kafka broker's.
-- In the Kafka broker, regardless of the format on disk, the broker returns
-- the message format according to the fetch api version. Which means
-- * if the fetch api version is less than 4, the broker will always
-- return MessageSet even if the message format on disk is RecordBatch.
-- * if the fetch api version is 4+, the broker will always return
-- RecordBatch.
-- Here, we do not handle the fetch api version; we just return the message
-- format according to the message format on disk.
-- However, if you always use RecordBatch for appending and reading, it
-- won't be a problem.
:: FastMutInt
-> FastMutInt
-> K.FetchPartition
-> V.Vector K.RecordFormat
-> IO ByteString
encodePartition mutMaxBytes mutIsFirstPartition p v = do
maxBytes <- readFastMutInt mutMaxBytes
if maxBytes > 0 then doEncode maxBytes else pure ""
doEncode maxBytes = do
isFristPartition <- readFastMutInt mutIsFirstPartition
(fstRecordBytes, vs) <- reFormat
let fstLen = BS.length fstRecordBytes
if isFristPartition == 1
-- First partition
then do
writeFastMutInt mutIsFirstPartition 0 -- next partition should not be the first
if fstLen >= maxBytes
then do writeFastMutInt mutMaxBytes (-1)
pure fstRecordBytes
else if fstLen >= (fromIntegral p.partitionMaxBytes)
then do void $ atomicFetchAddFastMut mutMaxBytes (-fstLen)
pure fstRecordBytes
else doEncodeElse fstRecordBytes vs
-- Not the first partition
else do
if fstLen <= maxBytes
then doEncodeElse fstRecordBytes vs
else pure ""

doEncodeElse fstRecordBytes vs = do
let fstLen = BS.length fstRecordBytes
void $ atomicFetchAddFastMut mutMaxBytes (-fstLen)
mutPartitionMaxBytes <-
newFastMutInt (fromIntegral p.partitionMaxBytes - fstLen)

bb <- V.foldM (\b r -> do
let rbs = K.unCompactBytes r.recordBytes
rlen = BS.length rbs
curMaxBytes <- atomicFetchAddFastMut mutMaxBytes (-rlen)
curPartMaxBytes <- atomicFetchAddFastMut mutPartitionMaxBytes (-rlen)
-- Taking a negative number of bytes returns an empty ByteString
let rbs' = BS.take (min curPartMaxBytes curMaxBytes) rbs
if BS.null rbs'
then pure b
else pure $ b <> BB.byteString rbs'
) (BB.byteString fstRecordBytes) vs

pure $ BS.toStrict $ BB.toLazyByteString bb

reFormat = do
let (rf :: K.RecordFormat, vs) =
-- This should not be Nothing, because if we found the key
-- in `readRecords`, it means we have at least one record
-- in this
fromMaybe (error "LogicError: got empty vector value")
(V.uncons v)
bytesOnDisk = K.unCompactBytes rf.recordBytes
-- Only the first MessageSet needs to do this seeking
magic <- K.decodeRecordMagic bytesOnDisk
fstRecordBytes <-
if | magic >= 2 -> pure bytesOnDisk
| otherwise -> do
let absStartOffset = rf.offset + 1 - fromIntegral rf.batchLength
offset = p.fetchOffset - absStartOffset
if offset > 0
then K.seekBatch (fromIntegral offset) bytesOnDisk
else pure bytesOnDisk
pure (fstRecordBytes, vs)

errorPartitionResponse :: Int32 -> K.ErrorCode -> K.PartitionData
errorPartitionResponse partitionIndex ec =
K.PartitionData partitionIndex ec (-1) (Just "")
Expand Down
1 change: 1 addition & 0 deletions hstream-kafka/hstream-kafka.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ library
, containers
, directory
, foreign
, ghc
, gsasl-hs
, hashable
, hashtables
Expand Down

0 comments on commit a511add

Please sign in to comment.