diff --git a/hstream-kafka/HStream/Kafka/Server/Handler.hsc b/hstream-kafka/HStream/Kafka/Server/Handler.hsc
index 408c7d377..11b103d4b 100644
--- a/hstream-kafka/HStream/Kafka/Server/Handler.hsc
+++ b/hstream-kafka/HStream/Kafka/Server/Handler.hsc
@@ -58,7 +58,7 @@ import qualified Kafka.Protocol.Service as K
 
 #cv_handler ApiVersions, 0, 3
 #cv_handler Metadata, 0, 5
-#cv_handler Produce, 0, 3
+#cv_handler Produce, 0, 5
 #cv_handler InitProducerId, 0, 0
 #cv_handler Fetch, 0, 4
 #cv_handler DescribeConfigs, 0, 0
@@ -85,7 +85,7 @@ handlers sc =
   [ #mk_handler ApiVersions, 0, 3
   , #mk_handler Metadata, 0, 5
     -- Write
-  , #mk_handler Produce, 0, 3
+  , #mk_handler Produce, 0, 5
   , #mk_handler InitProducerId, 0, 0
     -- Read
   , #mk_handler Fetch, 0, 4
diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
index 706b5406d..da21a58c2 100644
--- a/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
+++ b/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
@@ -45,13 +45,13 @@ handleProduce
   -> K.RequestContext
   -> K.ProduceRequest
   -> IO K.ProduceResponse
-handleProduce ServerContext{..} _ req = do
+handleProduce ServerContext{..} reqCtx req = do
   -- TODO: handle request args: acks, timeoutMs
   let topicData = fromMaybe V.empty (K.unKaArray req.topicData)
 
   responses <- V.forM topicData $ \topic{- TopicProduceData -} -> do
     -- A topic is a stream. Here we donot need to check the topic existence,
-    -- because the metadata api does(?)
+    -- because the metadata api already does(?)
     partitions <- S.listStreamPartitionsOrdered scLDClient (S.transToTopicStreamName topic.name)
 
     let partitionData = fromMaybe V.empty (K.unKaArray topic.partitionData)
@@ -71,14 +71,52 @@ handleProduce ServerContext{..} _ req = do
 
       -- Wirte appends
      (S.AppendCompletion{..}, offset) <-
-        appendRecords True scLDClient scOffsetManager (topic.name, partition.index) logid recordBytes
+        appendRecords True scLDClient scOffsetManager
+                      (topic.name, partition.index) logid recordBytes
 
      Log.debug1 $ "Append done " <> Log.build appendCompLogID
                <> ", lsn: " <> Log.build appendCompLSN
                <> ", start offset: " <> Log.build offset
-      -- TODO: logAppendTimeMs, only support LogAppendTime now
-      pure $ K.PartitionProduceResponse partition.index K.NONE offset appendCompTimestamp
+      -- TODO: PartitionProduceResponse.logAppendTimeMs
+      --
+      -- The timestamp returned by broker after appending the messages. If
+      -- CreateTime is used for the topic, the timestamp will be -1. If
+      -- LogAppendTime is used for the topic, the timestamp will be the broker
+      -- local time when the messages are appended.
+      --
+      -- Currently, we only support LogAppendTime
+      if reqCtx.apiVersion >= 5
+         then do
+           -- FIXME: performance improvements
+           --
+           -- For each append request, we need to read the oldest offset of the
+           -- log. This may cause performance problems.
+           m_logStartOffset <- K.getOldestOffset scOffsetManager logid
+           case m_logStartOffset of
+             Just logStartOffset ->
+               pure $ K.PartitionProduceResponse
+                        partition.index
+                        K.NONE
+                        offset
+                        appendCompTimestamp
+                        logStartOffset
+             Nothing -> do
+               Log.fatal $ "Cannot get log start offset for logid "
+                        <> Log.build logid
+               pure $ K.PartitionProduceResponse
+                        partition.index
+                        K.NONE
+                        offset
+                        appendCompTimestamp
+                        (-1)
+         else
+           pure $ K.PartitionProduceResponse
+                    partition.index
+                    K.NONE
+                    offset
+                    appendCompTimestamp
+                    (-1)
 
     pure $ K.TopicProduceResponse topic.name
                                   (K.KaArray $ Just partitionResponses)
@@ -90,7 +128,7 @@ handleInitProducerId
   -> K.RequestContext
   -> K.InitProducerIdRequest
   -> IO K.InitProducerIdResponse
-handleInitProducerId ServerContext{..} _ req = do
+handleInitProducerId _ _ _ = do
   Log.warning "InitProducerId is not implemented"
   pure $ K.InitProducerIdResponse
     { throttleTimeMs = 0
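Note on the FIXME above: with v5 enabled, K.getOldestOffset is consulted once per produced partition, so every append now pays an extra offset lookup. One possible mitigation is a per-log cache of the log start offset, since that offset only moves forward when the log is trimmed. A minimal sketch, assuming a hypothetical cache type (only K.getOldestOffset comes from this patch; everything else below is illustrative):

    import           Data.Int        (Int64)
    import           Data.IORef
    import qualified Data.Map.Strict as Map
    import           Data.Word       (Word64)

    -- Hypothetical per-logid cache. A slightly stale start offset is
    -- tolerable between trims; a trim hook could delete the entry.
    newtype StartOffsetCache = StartOffsetCache (IORef (Map.Map Word64 Int64))

    lookupStartOffset
      :: StartOffsetCache
      -> Word64              -- logid
      -> IO (Maybe Int64)    -- source of truth, e.g. K.getOldestOffset mgr logid
      -> IO (Maybe Int64)
    lookupStartOffset (StartOffsetCache ref) logid fetch = do
      cached <- Map.lookup logid <$> readIORef ref
      case cached of
        Just off -> pure (Just off)
        Nothing  -> do
          m_off <- fetch
          case m_off of
            Just off -> do
              atomicModifyIORef' ref $ \m -> (Map.insert logid off m, ())
              pure (Just off)
            Nothing  -> pure Nothing

Whether such a cache pays off depends on trim frequency; the patch deliberately leaves this as a FIXME.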
diff --git a/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs b/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs
index 4c8018b3c..1fa2b2fb1 100644
--- a/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs
+++ b/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs
@@ -673,6 +673,14 @@ type PartitionProduceDataV3 = PartitionProduceDataV0
 
 type TopicProduceDataV3 = TopicProduceDataV0
 
+type PartitionProduceDataV4 = PartitionProduceDataV0
+
+type TopicProduceDataV4 = TopicProduceDataV0
+
+type PartitionProduceDataV5 = PartitionProduceDataV0
+
+type TopicProduceDataV5 = TopicProduceDataV0
+
 data PartitionProduceResponseV0 = PartitionProduceResponseV0
   { index :: {-# UNPACK #-} !Int32
     -- ^ The partition index.
@@ -722,6 +730,35 @@ type PartitionProduceResponseV3 = PartitionProduceResponseV2
 
 type TopicProduceResponseV3 = TopicProduceResponseV2
 
+type PartitionProduceResponseV4 = PartitionProduceResponseV2
+
+type TopicProduceResponseV4 = TopicProduceResponseV2
+
+data PartitionProduceResponseV5 = PartitionProduceResponseV5
+  { index :: {-# UNPACK #-} !Int32
+    -- ^ The partition index.
+  , errorCode :: {-# UNPACK #-} !ErrorCode
+    -- ^ The error code, or 0 if there was no error.
+  , baseOffset :: {-# UNPACK #-} !Int64
+    -- ^ The base offset.
+  , logAppendTimeMs :: {-# UNPACK #-} !Int64
+    -- ^ The timestamp returned by broker after appending the messages. If
+    -- CreateTime is used for the topic, the timestamp will be -1. If
+    -- LogAppendTime is used for the topic, the timestamp will be the broker
+    -- local time when the messages are appended.
+  , logStartOffset :: {-# UNPACK #-} !Int64
+    -- ^ The log start offset.
+  } deriving (Show, Eq, Generic)
+instance Serializable PartitionProduceResponseV5
+
+data TopicProduceResponseV5 = TopicProduceResponseV5
+  { name :: !Text
+    -- ^ The topic name
+  , partitionResponses :: !(KaArray PartitionProduceResponseV5)
+    -- ^ Each partition that we produced to within the topic.
+  } deriving (Show, Eq, Generic)
+instance Serializable TopicProduceResponseV5
+
 data SyncGroupRequestAssignmentV0 = SyncGroupRequestAssignmentV0
   { memberId :: !Text
     -- ^ The ID of the member to assign.
@@ -1379,6 +1416,10 @@ data ProduceRequestV3 = ProduceRequestV3
   } deriving (Show, Eq, Generic)
 instance Serializable ProduceRequestV3
 
+type ProduceRequestV4 = ProduceRequestV3
+
+type ProduceRequestV5 = ProduceRequestV3
+
 newtype ProduceResponseV0 = ProduceResponseV0
   { responses :: (KaArray TopicProduceResponseV0)
   } deriving (Show, Eq, Generic)
@@ -1404,6 +1445,17 @@ instance Serializable ProduceResponseV2
 
 type ProduceResponseV3 = ProduceResponseV2
 
+type ProduceResponseV4 = ProduceResponseV2
+
+data ProduceResponseV5 = ProduceResponseV5
+  { responses :: !(KaArray TopicProduceResponseV5)
+    -- ^ Each produce response
+  , throttleTimeMs :: {-# UNPACK #-} !Int32
+    -- ^ The duration in milliseconds for which the request was throttled due
+    -- to a quota violation, or zero if the request did not violate any quota.
+  } deriving (Show, Eq, Generic)
+instance Serializable ProduceResponseV5
+
 newtype SaslAuthenticateRequestV0 = SaslAuthenticateRequestV0
   { authBytes :: ByteString
   } deriving (Show, Eq, Generic)
@@ -1911,10 +1963,18 @@ data HStreamKafkaV4
 
 instance Service HStreamKafkaV4 where
   type ServiceName HStreamKafkaV4 = "HStreamKafkaV4"
   type ServiceMethods HStreamKafkaV4 =
-    '[ "fetch"
+    '[ "produce"
+     , "fetch"
      , "metadata"
      ]
 
+instance HasMethodImpl HStreamKafkaV4 "produce" where
+  type MethodName HStreamKafkaV4 "produce" = "produce"
+  type MethodKey HStreamKafkaV4 "produce" = 0
+  type MethodVersion HStreamKafkaV4 "produce" = 4
+  type MethodInput HStreamKafkaV4 "produce" = ProduceRequestV4
+  type MethodOutput HStreamKafkaV4 "produce" = ProduceResponseV4
+
 instance HasMethodImpl HStreamKafkaV4 "fetch" where
   type MethodName HStreamKafkaV4 "fetch" = "fetch"
   type MethodKey HStreamKafkaV4 "fetch" = 1
@@ -1934,9 +1994,17 @@ data HStreamKafkaV5
 
 instance Service HStreamKafkaV5 where
   type ServiceName HStreamKafkaV5 = "HStreamKafkaV5"
   type ServiceMethods HStreamKafkaV5 =
-    '[ "metadata"
+    '[ "produce"
+     , "metadata"
      ]
 
+instance HasMethodImpl HStreamKafkaV5 "produce" where
+  type MethodName HStreamKafkaV5 "produce" = "produce"
+  type MethodKey HStreamKafkaV5 "produce" = 0
+  type MethodVersion HStreamKafkaV5 "produce" = 5
+  type MethodInput HStreamKafkaV5 "produce" = ProduceRequestV5
+  type MethodOutput HStreamKafkaV5 "produce" = ProduceResponseV5
+
 instance HasMethodImpl HStreamKafkaV5 "metadata" where
   type MethodName HStreamKafkaV5 "metadata" = "metadata"
   type MethodKey HStreamKafkaV5 "metadata" = 3
@@ -1975,7 +2043,7 @@ instance Show ApiKey where
 
 supportedApiVersions :: [ApiVersionV0]
 supportedApiVersions =
-  [ ApiVersionV0 (ApiKey 0) 0 3
+  [ ApiVersionV0 (ApiKey 0) 0 5
   , ApiVersionV0 (ApiKey 1) 0 4
   , ApiVersionV0 (ApiKey 2) 0 1
   , ApiVersionV0 (ApiKey 3) 0 5
@@ -2003,6 +2071,8 @@ getHeaderVersion (ApiKey (0)) 0 = (1, 0)
 getHeaderVersion (ApiKey (0)) 1 = (1, 0)
 getHeaderVersion (ApiKey (0)) 2 = (1, 0)
 getHeaderVersion (ApiKey (0)) 3 = (1, 0)
+getHeaderVersion (ApiKey (0)) 4 = (1, 0)
+getHeaderVersion (ApiKey (0)) 5 = (1, 0)
 getHeaderVersion (ApiKey (1)) 0 = (1, 0)
 getHeaderVersion (ApiKey (1)) 1 = (1, 0)
 getHeaderVersion (ApiKey (1)) 2 = (1, 0)
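The Struct.hs changes follow the generator's usual pattern: versions whose wire layout is unchanged become type aliases of the oldest identical version (all the V4 types, plus ProduceRequestV4/V5), and only the genuinely new layouts get fresh records. Because Serializable is derived via Generic, fields are encoded in declaration order, so V5 is exactly the V2 layout with logStartOffset appended. For illustration, a V5 partition response is just a record value (a sketch; the numbers are made up, and NONE is assumed to be the ErrorCode constructor the handler refers to as K.NONE):

    resp :: PartitionProduceResponseV5
    resp = PartitionProduceResponseV5
      { index           = 0
      , errorCode       = NONE
      , baseOffset      = 42
      , logAppendTimeMs = 1700000000000
      , logStartOffset  = 0
      }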
diff --git a/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs b/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs
index 6ac8fbc58..844c4e93c 100644
--- a/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs
+++ b/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs
@@ -1311,6 +1311,10 @@ partitionProduceDataToV2 :: PartitionProduceData -> PartitionProduceDataV2
 partitionProduceDataToV2 = partitionProduceDataToV0
 partitionProduceDataToV3 :: PartitionProduceData -> PartitionProduceDataV3
 partitionProduceDataToV3 = partitionProduceDataToV0
+partitionProduceDataToV4 :: PartitionProduceData -> PartitionProduceDataV4
+partitionProduceDataToV4 = partitionProduceDataToV0
+partitionProduceDataToV5 :: PartitionProduceData -> PartitionProduceDataV5
+partitionProduceDataToV5 = partitionProduceDataToV0
 
 partitionProduceDataFromV0 :: PartitionProduceDataV0 -> PartitionProduceData
 partitionProduceDataFromV0 x = PartitionProduceData
@@ -1323,6 +1327,10 @@ partitionProduceDataFromV2 :: PartitionProduceDataV2 -> PartitionProduceData
 partitionProduceDataFromV2 = partitionProduceDataFromV0
 partitionProduceDataFromV3 :: PartitionProduceDataV3 -> PartitionProduceData
 partitionProduceDataFromV3 = partitionProduceDataFromV0
+partitionProduceDataFromV4 :: PartitionProduceDataV4 -> PartitionProduceData
+partitionProduceDataFromV4 = partitionProduceDataFromV0
+partitionProduceDataFromV5 :: PartitionProduceDataV5 -> PartitionProduceData
+partitionProduceDataFromV5 = partitionProduceDataFromV0
 
 data PartitionProduceResponse = PartitionProduceResponse
   { index :: {-# UNPACK #-} !Int32
@@ -1336,6 +1344,8 @@ data PartitionProduceResponse = PartitionProduceResponse
     -- CreateTime is used for the topic, the timestamp will be -1. If
     -- LogAppendTime is used for the topic, the timestamp will be the broker
     -- local time when the messages are appended.
+  , logStartOffset :: {-# UNPACK #-} !Int64
+    -- ^ The log start offset.
   } deriving (Show, Eq, Generic)
 instance Serializable PartitionProduceResponse
 
@@ -1356,6 +1366,16 @@ partitionProduceResponseToV2 x = PartitionProduceResponseV2
   }
 partitionProduceResponseToV3 :: PartitionProduceResponse -> PartitionProduceResponseV3
 partitionProduceResponseToV3 = partitionProduceResponseToV2
+partitionProduceResponseToV4 :: PartitionProduceResponse -> PartitionProduceResponseV4
+partitionProduceResponseToV4 = partitionProduceResponseToV2
+partitionProduceResponseToV5 :: PartitionProduceResponse -> PartitionProduceResponseV5
+partitionProduceResponseToV5 x = PartitionProduceResponseV5
+  { index = x.index
+  , errorCode = x.errorCode
+  , baseOffset = x.baseOffset
+  , logAppendTimeMs = x.logAppendTimeMs
+  , logStartOffset = x.logStartOffset
+  }
 
 partitionProduceResponseFromV0 :: PartitionProduceResponseV0 -> PartitionProduceResponse
 partitionProduceResponseFromV0 x = PartitionProduceResponse
@@ -1363,6 +1383,7 @@ partitionProduceResponseFromV0 x = PartitionProduceResponse
   { index = x.index
   , errorCode = x.errorCode
   , baseOffset = x.baseOffset
   , logAppendTimeMs = (-1)
+  , logStartOffset = (-1)
   }
 partitionProduceResponseFromV1 :: PartitionProduceResponseV1 -> PartitionProduceResponse
 partitionProduceResponseFromV1 = partitionProduceResponseFromV0
@@ -1372,9 +1393,20 @@ partitionProduceResponseFromV2 x = PartitionProduceResponse
   , errorCode = x.errorCode
   , baseOffset = x.baseOffset
   , logAppendTimeMs = x.logAppendTimeMs
+  , logStartOffset = (-1)
   }
 partitionProduceResponseFromV3 :: PartitionProduceResponseV3 -> PartitionProduceResponse
 partitionProduceResponseFromV3 = partitionProduceResponseFromV2
+partitionProduceResponseFromV4 :: PartitionProduceResponseV4 -> PartitionProduceResponse
+partitionProduceResponseFromV4 = partitionProduceResponseFromV2
+partitionProduceResponseFromV5 :: PartitionProduceResponseV5 -> PartitionProduceResponse
+partitionProduceResponseFromV5 x = PartitionProduceResponse
+  { index = x.index
+  , errorCode = x.errorCode
+  , baseOffset = x.baseOffset
+  , logAppendTimeMs = x.logAppendTimeMs
+  , logStartOffset = x.logStartOffset
+  }
 
 data SupportedFeatureKey = SupportedFeatureKey
   { name :: !CompactString
@@ -1446,6 +1478,10 @@ topicProduceDataToV2 :: TopicProduceData -> TopicProduceDataV2
 topicProduceDataToV2 = topicProduceDataToV0
 topicProduceDataToV3 :: TopicProduceData -> TopicProduceDataV3
 topicProduceDataToV3 = topicProduceDataToV0
+topicProduceDataToV4 :: TopicProduceData -> TopicProduceDataV4
+topicProduceDataToV4 = topicProduceDataToV0
+topicProduceDataToV5 :: TopicProduceData -> TopicProduceDataV5
+topicProduceDataToV5 = topicProduceDataToV0
 
 topicProduceDataFromV0 :: TopicProduceDataV0 -> TopicProduceData
 topicProduceDataFromV0 x = TopicProduceData
@@ -1458,6 +1494,10 @@ topicProduceDataFromV2 :: TopicProduceDataV2 -> TopicProduceData
 topicProduceDataFromV2 = topicProduceDataFromV0
 topicProduceDataFromV3 :: TopicProduceDataV3 -> TopicProduceData
 topicProduceDataFromV3 = topicProduceDataFromV0
+topicProduceDataFromV4 :: TopicProduceDataV4 -> TopicProduceData
+topicProduceDataFromV4 = topicProduceDataFromV0
+topicProduceDataFromV5 :: TopicProduceDataV5 -> TopicProduceData
+topicProduceDataFromV5 = topicProduceDataFromV0
 
 data TopicProduceResponse = TopicProduceResponse
   { name :: !Text
@@ -1481,6 +1521,13 @@ topicProduceResponseToV2 x = TopicProduceResponseV2
   }
 topicProduceResponseToV3 :: TopicProduceResponse -> TopicProduceResponseV3
 topicProduceResponseToV3 = topicProduceResponseToV2
+topicProduceResponseToV4 :: TopicProduceResponse -> TopicProduceResponseV4
+topicProduceResponseToV4 = topicProduceResponseToV2
+topicProduceResponseToV5 :: TopicProduceResponse -> TopicProduceResponseV5
+topicProduceResponseToV5 x = TopicProduceResponseV5
+  { name = x.name
+  , partitionResponses = fmap partitionProduceResponseToV5 x.partitionResponses
+  }
 
 topicProduceResponseFromV0 :: TopicProduceResponseV0 -> TopicProduceResponse
 topicProduceResponseFromV0 x = TopicProduceResponse
@@ -1496,6 +1543,13 @@ topicProduceResponseFromV2 x = TopicProduceResponse
   }
 topicProduceResponseFromV3 :: TopicProduceResponseV3 -> TopicProduceResponse
 topicProduceResponseFromV3 = topicProduceResponseFromV2
+topicProduceResponseFromV4 :: TopicProduceResponseV4 -> TopicProduceResponse
+topicProduceResponseFromV4 = topicProduceResponseFromV2
+topicProduceResponseFromV5 :: TopicProduceResponseV5 -> TopicProduceResponse
+topicProduceResponseFromV5 x = TopicProduceResponse
+  { name = x.name
+  , partitionResponses = fmap partitionProduceResponseFromV5 x.partitionResponses
+  }
 
 data ApiVersionsRequest = ApiVersionsRequest
   { clientSoftwareName :: !CompactString
@@ -2751,6 +2805,10 @@ produceRequestToV3 x = ProduceRequestV3
   , timeoutMs = x.timeoutMs
   , topicData = fmap topicProduceDataToV3 x.topicData
   }
+produceRequestToV4 :: ProduceRequest -> ProduceRequestV4
+produceRequestToV4 = produceRequestToV3
+produceRequestToV5 :: ProduceRequest -> ProduceRequestV5
+produceRequestToV5 = produceRequestToV3
 
 produceRequestFromV0 :: ProduceRequestV0 -> ProduceRequest
 produceRequestFromV0 x = ProduceRequest
@@ -2770,6 +2828,10 @@ produceRequestFromV3 x = ProduceRequest
   , topicData = fmap topicProduceDataFromV3 x.topicData
   , transactionalId = x.transactionalId
   }
+produceRequestFromV4 :: ProduceRequestV4 -> ProduceRequest
+produceRequestFromV4 = produceRequestFromV3
+produceRequestFromV5 :: ProduceRequestV5 -> ProduceRequest
+produceRequestFromV5 = produceRequestFromV3
 
 data ProduceResponse = ProduceResponse
   { responses :: !(KaArray TopicProduceResponse)
@@ -2796,6 +2858,13 @@ produceResponseToV2 x = ProduceResponseV2
   }
 produceResponseToV3 :: ProduceResponse -> ProduceResponseV3
 produceResponseToV3 = produceResponseToV2
+produceResponseToV4 :: ProduceResponse -> ProduceResponseV4
+produceResponseToV4 = produceResponseToV2
+produceResponseToV5 :: ProduceResponse -> ProduceResponseV5
+produceResponseToV5 x = ProduceResponseV5
+  { responses = fmap topicProduceResponseToV5 x.responses
+  , throttleTimeMs = x.throttleTimeMs
+  }
 
 produceResponseFromV0 :: ProduceResponseV0 -> ProduceResponse
 produceResponseFromV0 x = ProduceResponse
@@ -2814,6 +2883,13 @@ produceResponseFromV2 x = ProduceResponse
   }
 produceResponseFromV3 :: ProduceResponseV3 -> ProduceResponse
 produceResponseFromV3 = produceResponseFromV2
+produceResponseFromV4 :: ProduceResponseV4 -> ProduceResponse
+produceResponseFromV4 = produceResponseFromV2
+produceResponseFromV5 :: ProduceResponseV5 -> ProduceResponse
+produceResponseFromV5 x = ProduceResponse
+  { responses = fmap topicProduceResponseFromV5 x.responses
+  , throttleTimeMs = x.throttleTimeMs
+  }
 
 newtype SaslAuthenticateRequest = SaslAuthenticateRequest
   { authBytes :: ByteString
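Total.hs keeps one "total" record per message carrying the union of all versions' fields. The generated ...ToVn functions project it onto a wire version, and the ...FromVn functions lift a wire version back, filling fields the old version lacks with a sentinel. A sketch of what that means for logStartOffset (NONE assumed as above; field values made up):

    -- Lifting a V2 response: logStartOffset is absent on the wire, so the
    -- total record gets (-1), as partitionProduceResponseFromV2 spells out.
    total :: PartitionProduceResponse
    total = partitionProduceResponseFromV2
              (PartitionProduceResponseV2 0 NONE 42 1700000000000)
    -- total.logStartOffset == (-1)

Projecting back down with partitionProduceResponseToV2 simply drops the field, so round-tripping through an old version loses logStartOffset by design.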
diff --git a/script/kafka_gen.py b/script/kafka_gen.py
index 330597d87..817fe5120 100755
--- a/script/kafka_gen.py
+++ b/script/kafka_gen.py
@@ -110,7 +110,7 @@ def get_field_default(field_type, default=None):
 API_VERSION_PATCHES = {
     "ApiVersions": (0, 3),
     "Metadata": (0, 5),
-    "Produce": (0, 3),
+    "Produce": (0, 5),
     "Fetch": (0, 4),
     "OffsetFetch": (0, 3),
     "OffsetCommit": (0, 3),
@@ -357,11 +357,11 @@ def in_version_range(version, min_version, max_version):
 
 
 def int16_to_word16(num):
-    return struct.unpack('H', struct.pack('h', num))[0]
+    return struct.unpack("H", struct.pack("h", num))[0]
 
 
 def word16_to_int16(num):
-    return struct.unpack('h', struct.pack('H', num))[0]
+    return struct.unpack("h", struct.pack("H", num))[0]
 
 
 # https://github.com/apache/kafka/blob/3.5.1/generator/src/main/java/org/apache/kafka/message/ApiMessageTypeGenerator.java#L329
@@ -457,8 +457,9 @@ def parse_field(field, api_version=0, flexible=False):
         api_version, min_tagged_version, max_tagged_version
     )
 
-    # if field is NullableString, the default value should be Nothing(null)
-    if type_type == "string" and default is None and in_null_version:
+    # If the field is NullableString and there is no default value,
+    # we set the default value to "null".
+    if type_type == "string" and in_null_version and default is None:
         default = "null"
 
     if (in_api_version, in_tagged_version) == (False, False):
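Aside from the version-range bump, the kafka_gen.py changes are cosmetic: double-quoted strings and a clearer comment. The two struct helpers merely reinterpret a 16-bit value between its signed and unsigned readings without changing the bits. The same round trip in Haskell, for reference (a sketch, not part of the patch):

    import Data.Int  (Int16)
    import Data.Word (Word16)

    -- fromIntegral between fixed-width integral types wraps modulo 2^16,
    -- matching struct.unpack("H", struct.pack("h", n)) and its inverse.
    int16ToWord16 :: Int16 -> Word16
    int16ToWord16 = fromIntegral

    word16ToInt16 :: Word16 -> Int16
    word16ToInt16 = fromIntegral

    -- int16ToWord16 (-1) == 65535; word16ToInt16 65535 == -1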