Skip to content

Commit

Permalink
kafka: upgrade Produce to version 5
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP committed Jan 2, 2024
1 parent 2ce2e5a commit c4bda20
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 16 deletions.
4 changes: 2 additions & 2 deletions hstream-kafka/HStream/Kafka/Server/Handler.hsc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ import qualified Kafka.Protocol.Service as K

#cv_handler ApiVersions, 0, 3
#cv_handler Metadata, 0, 5
#cv_handler Produce, 0, 3
#cv_handler Produce, 0, 5
#cv_handler InitProducerId, 0, 0
#cv_handler Fetch, 0, 4
#cv_handler DescribeConfigs, 0, 0
Expand All @@ -85,7 +85,7 @@ handlers sc =
[ #mk_handler ApiVersions, 0, 3
, #mk_handler Metadata, 0, 5
-- Write
, #mk_handler Produce, 0, 3
, #mk_handler Produce, 0, 5
, #mk_handler InitProducerId, 0, 0
-- Read
, #mk_handler Fetch, 0, 4
Expand Down
50 changes: 44 additions & 6 deletions hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ handleProduce
-> K.RequestContext
-> K.ProduceRequest
-> IO K.ProduceResponse
handleProduce ServerContext{..} _ req = do
handleProduce ServerContext{..} reqCtx req = do
-- TODO: handle request args: acks, timeoutMs
let topicData = fromMaybe V.empty (K.unKaArray req.topicData)

responses <- V.forM topicData $ \topic{- TopicProduceData -} -> do
-- A topic is a stream. Here we do not need to check the topic existence,
-- because the metadata api does(?)
-- because the metadata api already does(?)
partitions <- S.listStreamPartitionsOrdered
scLDClient (S.transToTopicStreamName topic.name)
let partitionData = fromMaybe V.empty (K.unKaArray topic.partitionData)
Expand All @@ -71,14 +71,52 @@ handleProduce ServerContext{..} _ req = do

-- Write appends
(S.AppendCompletion{..}, offset) <-
appendRecords True scLDClient scOffsetManager (topic.name, partition.index) logid recordBytes
appendRecords True scLDClient scOffsetManager
(topic.name, partition.index) logid recordBytes

Log.debug1 $ "Append done " <> Log.build appendCompLogID
<> ", lsn: " <> Log.build appendCompLSN
<> ", start offset: " <> Log.build offset

-- TODO: logAppendTimeMs, only support LogAppendTime now
pure $ K.PartitionProduceResponse partition.index K.NONE offset appendCompTimestamp
-- TODO: PartitionProduceResponse.logAppendTimeMs
--
-- The timestamp returned by broker after appending the messages. If
-- CreateTime is used for the topic, the timestamp will be -1. If
-- LogAppendTime is used for the topic, the timestamp will be the broker
-- local time when the messages are appended.
--
-- Currently, only support LogAppendTime
if reqCtx.apiVersion >= 5
then do
-- FIXME: performance improvements
--
-- For each append request, we need to read the oldest offset of the
-- log. This may cause performance problems.
m_logStartOffset <- K.getOldestOffset scOffsetManager logid
case m_logStartOffset of
Just logStartOffset ->
pure $ K.PartitionProduceResponse
partition.index
K.NONE
offset
appendCompTimestamp
logStartOffset
Nothing -> do
Log.fatal $ "Cannot get log start offset for logid "
<> Log.build logid
pure $ K.PartitionProduceResponse
partition.index
K.NONE
offset
appendCompTimestamp
(-1)
else
pure $ K.PartitionProduceResponse
partition.index
K.NONE
offset
appendCompTimestamp
(-1)

pure $ K.TopicProduceResponse topic.name (K.KaArray $ Just partitionResponses)

Expand All @@ -90,7 +128,7 @@ handleInitProducerId
-> K.RequestContext
-> K.InitProducerIdRequest
-> IO K.InitProducerIdResponse
handleInitProducerId ServerContext{..} _ req = do
handleInitProducerId _ _ _ = do
Log.warning "InitProducerId is not implemented"
pure $ K.InitProducerIdResponse
{ throttleTimeMs = 0
Expand Down
76 changes: 73 additions & 3 deletions hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,14 @@ type PartitionProduceDataV3 = PartitionProduceDataV0

type TopicProduceDataV3 = TopicProduceDataV0

-- | Produce request per-partition data, API version 4.
-- Identical to the v0 type (no field changes in this version).
type PartitionProduceDataV4 = PartitionProduceDataV0

-- | Produce request per-topic data, API version 4; identical to the v0 type.
type TopicProduceDataV4 = TopicProduceDataV0

-- | Produce request per-partition data, API version 5; identical to the v0 type.
type PartitionProduceDataV5 = PartitionProduceDataV0

-- | Produce request per-topic data, API version 5; identical to the v0 type.
type TopicProduceDataV5 = TopicProduceDataV0

data PartitionProduceResponseV0 = PartitionProduceResponseV0
{ index :: {-# UNPACK #-} !Int32
-- ^ The partition index.
Expand Down Expand Up @@ -722,6 +730,35 @@ type PartitionProduceResponseV3 = PartitionProduceResponseV2

type TopicProduceResponseV3 = TopicProduceResponseV2

-- | Produce response per-partition entry, API version 4.
-- Identical to the v2 type (no field changes in this version).
type PartitionProduceResponseV4 = PartitionProduceResponseV2

-- | Produce response per-topic entry, API version 4; identical to the v2 type.
type TopicProduceResponseV4 = TopicProduceResponseV2

-- | Produce response per-partition entry, API version 5.
-- Declared as its own record (rather than an alias of an earlier version);
-- it carries a 'logStartOffset' field.
data PartitionProduceResponseV5 = PartitionProduceResponseV5
  { index :: {-# UNPACK #-} !Int32
    -- ^ The partition index.
  , errorCode :: {-# UNPACK #-} !ErrorCode
    -- ^ The error code, or 0 if there was no error.
  , baseOffset :: {-# UNPACK #-} !Int64
    -- ^ The base offset.
  , logAppendTimeMs :: {-# UNPACK #-} !Int64
    -- ^ The timestamp returned by broker after appending the messages. If
    -- CreateTime is used for the topic, the timestamp will be -1. If
    -- LogAppendTime is used for the topic, the timestamp will be the broker
    -- local time when the messages are appended.
  , logStartOffset :: {-# UNPACK #-} !Int64
    -- ^ The log start offset.
  } deriving (Show, Eq, Generic)
instance Serializable PartitionProduceResponseV5

-- | Produce response per-topic entry, API version 5.
data TopicProduceResponseV5 = TopicProduceResponseV5
  { name :: !Text
    -- ^ The topic name
  , partitionResponses :: !(KaArray PartitionProduceResponseV5)
    -- ^ Each partition that we produced to within the topic.
  } deriving (Show, Eq, Generic)
instance Serializable TopicProduceResponseV5

data SyncGroupRequestAssignmentV0 = SyncGroupRequestAssignmentV0
{ memberId :: !Text
-- ^ The ID of the member to assign.
Expand Down Expand Up @@ -1379,6 +1416,10 @@ data ProduceRequestV3 = ProduceRequestV3
} deriving (Show, Eq, Generic)
instance Serializable ProduceRequestV3

-- | Produce request, API version 4.
-- Identical to the v3 type (no field changes in this version).
type ProduceRequestV4 = ProduceRequestV3

-- | Produce request, API version 5; identical to the v3 type.
type ProduceRequestV5 = ProduceRequestV3

newtype ProduceResponseV0 = ProduceResponseV0
{ responses :: (KaArray TopicProduceResponseV0)
} deriving (Show, Eq, Generic)
Expand All @@ -1404,6 +1445,17 @@ instance Serializable ProduceResponseV2

type ProduceResponseV3 = ProduceResponseV2

-- | Produce response, API version 4.
-- Identical to the v2 type (no field changes in this version).
type ProduceResponseV4 = ProduceResponseV2

-- | Produce response, API version 5. Declared as its own record because its
-- per-topic entries ('TopicProduceResponseV5') differ from earlier versions.
data ProduceResponseV5 = ProduceResponseV5
  { responses :: !(KaArray TopicProduceResponseV5)
    -- ^ Each produce response
  , throttleTimeMs :: {-# UNPACK #-} !Int32
    -- ^ The duration in milliseconds for which the request was throttled due
    -- to a quota violation, or zero if the request did not violate any quota.
  } deriving (Show, Eq, Generic)
instance Serializable ProduceResponseV5

newtype SaslAuthenticateRequestV0 = SaslAuthenticateRequestV0
{ authBytes :: ByteString
} deriving (Show, Eq, Generic)
Expand Down Expand Up @@ -1911,10 +1963,18 @@ data HStreamKafkaV4
-- | Methods this broker serves at Kafka API version 4.
instance Service HStreamKafkaV4 where
  type ServiceName HStreamKafkaV4 = "HStreamKafkaV4"
  type ServiceMethods HStreamKafkaV4 =
    '[ "produce"
     , "fetch"
     , "metadata"
     ]

-- | Produce (api key 0) at version 4, wired to the v4 request/response structs.
instance HasMethodImpl HStreamKafkaV4 "produce" where
  type MethodName HStreamKafkaV4 "produce" = "produce"
  type MethodKey HStreamKafkaV4 "produce" = 0
  type MethodVersion HStreamKafkaV4 "produce" = 4
  type MethodInput HStreamKafkaV4 "produce" = ProduceRequestV4
  type MethodOutput HStreamKafkaV4 "produce" = ProduceResponseV4

instance HasMethodImpl HStreamKafkaV4 "fetch" where
type MethodName HStreamKafkaV4 "fetch" = "fetch"
type MethodKey HStreamKafkaV4 "fetch" = 1
Expand All @@ -1934,9 +1994,17 @@ data HStreamKafkaV5
-- | Methods this broker serves at Kafka API version 5.
instance Service HStreamKafkaV5 where
  type ServiceName HStreamKafkaV5 = "HStreamKafkaV5"
  type ServiceMethods HStreamKafkaV5 =
    '[ "produce"
     , "metadata"
     ]

-- | Produce (api key 0) at version 5, wired to the v5 request/response structs.
instance HasMethodImpl HStreamKafkaV5 "produce" where
  type MethodName HStreamKafkaV5 "produce" = "produce"
  type MethodKey HStreamKafkaV5 "produce" = 0
  type MethodVersion HStreamKafkaV5 "produce" = 5
  type MethodInput HStreamKafkaV5 "produce" = ProduceRequestV5
  type MethodOutput HStreamKafkaV5 "produce" = ProduceResponseV5

instance HasMethodImpl HStreamKafkaV5 "metadata" where
type MethodName HStreamKafkaV5 "metadata" = "metadata"
type MethodKey HStreamKafkaV5 "metadata" = 3
Expand Down Expand Up @@ -1975,7 +2043,7 @@ instance Show ApiKey where

supportedApiVersions :: [ApiVersionV0]
supportedApiVersions =
[ ApiVersionV0 (ApiKey 0) 0 3
[ ApiVersionV0 (ApiKey 0) 0 5
, ApiVersionV0 (ApiKey 1) 0 4
, ApiVersionV0 (ApiKey 2) 0 1
, ApiVersionV0 (ApiKey 3) 0 5
Expand Down Expand Up @@ -2003,6 +2071,8 @@ getHeaderVersion (ApiKey (0)) 0 = (1, 0)
getHeaderVersion (ApiKey (0)) 1 = (1, 0)
getHeaderVersion (ApiKey (0)) 2 = (1, 0)
getHeaderVersion (ApiKey (0)) 3 = (1, 0)
getHeaderVersion (ApiKey (0)) 4 = (1, 0)
getHeaderVersion (ApiKey (0)) 5 = (1, 0)
getHeaderVersion (ApiKey (1)) 0 = (1, 0)
getHeaderVersion (ApiKey (1)) 1 = (1, 0)
getHeaderVersion (ApiKey (1)) 2 = (1, 0)
Expand Down
Loading

0 comments on commit c4bda20

Please sign in to comment.