Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka api: a fake InitProducerId handler #1722

Merged
merged 1 commit into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions hstream-kafka/HStream/Kafka/Server/Handler.hsc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import qualified Kafka.Protocol.Service as K
#cv_handler ApiVersions, 0, 3
#cv_handler Metadata, 0, 5
#cv_handler Produce, 0, 3
#cv_handler InitProducerId, 0, 0
#cv_handler Fetch, 0, 4
#cv_handler DescribeConfigs, 0, 0

Expand All @@ -85,6 +86,7 @@ handlers sc =
, #mk_handler Metadata, 0, 5
-- Write
, #mk_handler Produce, 0, 3
, #mk_handler InitProducerId, 0, 0
-- Read
, #mk_handler Fetch, 0, 4

Expand Down
16 changes: 16 additions & 0 deletions hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
module HStream.Kafka.Server.Handler.Produce
( handleProduce
, handleInitProducerId
) where

import qualified Control.Concurrent.Async as Async
Expand Down Expand Up @@ -83,6 +84,21 @@ handleProduce ServerContext{..} _ req = do

pure $ K.ProduceResponse (K.KaArray $ Just responses) 0{- TODO: throttleTimeMs -}

-- TODO
handleInitProducerId
:: ServerContext
-> K.RequestContext
-> K.InitProducerIdRequest
-> IO K.InitProducerIdResponse
handleInitProducerId ServerContext{..} _ req = do
Log.warning "InitProducerId is not implemented"
pure $ K.InitProducerIdResponse
{ throttleTimeMs = 0
, errorCode = K.NONE
, producerId = 0
, producerEpoch = 0
}

-------------------------------------------------------------------------------

appendRecords
Expand Down
40 changes: 40 additions & 0 deletions hstream-kafka/message/InitProducerIdRequest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 22,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "InitProducerIdRequest",
// Version 1 is the same as version 0.
//
// Version 2 is the first flexible version.
//
// Version 3 adds ProducerId and ProducerEpoch, allowing producers to try to resume after an INVALID_PRODUCER_EPOCH error
//
// Version 4 adds the support for new error code PRODUCER_FENCED.
"validVersions": "0-4",
"flexibleVersions": "2+",
"fields": [
{ "name": "TransactionalId", "type": "string", "versions": "0+", "nullableVersions": "0+", "entityType": "transactionalId",
"about": "The transactional id, or null if the producer is not transactional." },
{ "name": "TransactionTimeoutMs", "type": "int32", "versions": "0+",
"about": "The time in ms to wait before aborting idle transactions sent by this producer. This is only relevant if a TransactionalId has been defined." },
{ "name": "ProducerId", "type": "int64", "versions": "3+", "default": "-1", "entityType": "producerId",
"about": "The producer id. This is used to disambiguate requests if a transactional id is reused following its expiration." },
{ "name": "ProducerEpoch", "type": "int16", "versions": "3+", "default": "-1",
"about": "The producer's current epoch. This will be checked against the producer epoch on the broker, and the request will return an error if they do not match." }
]
}
39 changes: 39 additions & 0 deletions hstream-kafka/message/InitProducerIdResponse.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 22,
"type": "response",
"name": "InitProducerIdResponse",
// Starting in version 1, on quota violation, brokers send out responses before throttling.
//
// Version 2 is the first flexible version.
//
// Version 3 is the same as version 2.
//
// Version 4 adds the support for new error code PRODUCER_FENCED.
"validVersions": "0-4",
"flexibleVersions": "2+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
{ "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId",
"default": -1, "about": "The current producer id." },
{ "name": "ProducerEpoch", "type": "int16", "versions": "0+",
"about": "The current epoch associated with the producer id." }
]
}
33 changes: 33 additions & 0 deletions hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,28 @@ data HeartbeatResponseV1 = HeartbeatResponseV1
} deriving (Show, Eq, Generic)
instance Serializable HeartbeatResponseV1

data InitProducerIdRequestV0 = InitProducerIdRequestV0
{ transactionalId :: !NullableString
-- ^ The transactional id, or null if the producer is not transactional.
, transactionTimeoutMs :: {-# UNPACK #-} !Int32
-- ^ The time in ms to wait before aborting idle transactions sent by this
-- producer. This is only relevant if a TransactionalId has been defined.
} deriving (Show, Eq, Generic)
instance Serializable InitProducerIdRequestV0

data InitProducerIdResponseV0 = InitProducerIdResponseV0
{ throttleTimeMs :: {-# UNPACK #-} !Int32
-- ^ The duration in milliseconds for which the request was throttled due
-- to a quota violation, or zero if the request did not violate any quota.
, errorCode :: {-# UNPACK #-} !ErrorCode
-- ^ The error code, or 0 if there was no error.
, producerId :: {-# UNPACK #-} !Int64
-- ^ The current producer id.
, producerEpoch :: {-# UNPACK #-} !Int16
-- ^ The current epoch associated with the producer id.
} deriving (Show, Eq, Generic)
instance Serializable InitProducerIdResponseV0

data JoinGroupRequestV0 = JoinGroupRequestV0
{ groupId :: !Text
-- ^ The group identifier.
Expand Down Expand Up @@ -1462,6 +1484,7 @@ instance Service HStreamKafkaV0 where
, "apiVersions"
, "createTopics"
, "deleteTopics"
, "initProducerId"
, "describeConfigs"
, "saslAuthenticate"
]
Expand Down Expand Up @@ -1592,6 +1615,13 @@ instance HasMethodImpl HStreamKafkaV0 "deleteTopics" where
type MethodInput HStreamKafkaV0 "deleteTopics" = DeleteTopicsRequestV0
type MethodOutput HStreamKafkaV0 "deleteTopics" = DeleteTopicsResponseV0

instance HasMethodImpl HStreamKafkaV0 "initProducerId" where
type MethodName HStreamKafkaV0 "initProducerId" = "initProducerId"
type MethodKey HStreamKafkaV0 "initProducerId" = 22
type MethodVersion HStreamKafkaV0 "initProducerId" = 0
type MethodInput HStreamKafkaV0 "initProducerId" = InitProducerIdRequestV0
type MethodOutput HStreamKafkaV0 "initProducerId" = InitProducerIdResponseV0

instance HasMethodImpl HStreamKafkaV0 "describeConfigs" where
type MethodName HStreamKafkaV0 "describeConfigs" = "describeConfigs"
type MethodKey HStreamKafkaV0 "describeConfigs" = 32
Expand Down Expand Up @@ -1905,6 +1935,7 @@ instance Show ApiKey where
show (ApiKey (18)) = "ApiVersions(18)"
show (ApiKey (19)) = "CreateTopics(19)"
show (ApiKey (20)) = "DeleteTopics(20)"
show (ApiKey (22)) = "InitProducerId(22)"
show (ApiKey (32)) = "DescribeConfigs(32)"
show (ApiKey (36)) = "SaslAuthenticate(36)"
show (ApiKey n) = "Unknown " <> show n
Expand All @@ -1928,6 +1959,7 @@ supportedApiVersions =
, ApiVersionV0 (ApiKey 18) 0 3
, ApiVersionV0 (ApiKey 19) 0 0
, ApiVersionV0 (ApiKey 20) 0 0
, ApiVersionV0 (ApiKey 22) 0 0
, ApiVersionV0 (ApiKey 32) 0 0
, ApiVersionV0 (ApiKey 36) 0 0
]
Expand Down Expand Up @@ -1981,6 +2013,7 @@ getHeaderVersion (ApiKey (18)) 2 = (1, 0)
getHeaderVersion (ApiKey (18)) 3 = (2, 0)
getHeaderVersion (ApiKey (19)) 0 = (1, 0)
getHeaderVersion (ApiKey (20)) 0 = (1, 0)
getHeaderVersion (ApiKey (22)) 0 = (1, 0)
getHeaderVersion (ApiKey (32)) 0 = (1, 0)
getHeaderVersion (ApiKey (36)) 0 = (1, 0)
getHeaderVersion k v = error $ "Unknown " <> show k <> " v" <> show v
Expand Down
57 changes: 57 additions & 0 deletions hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2022,6 +2022,56 @@ heartbeatResponseFromV1 x = HeartbeatResponse
, throttleTimeMs = x.throttleTimeMs
}

data InitProducerIdRequest = InitProducerIdRequest
{ transactionalId :: !NullableString
-- ^ The transactional id, or null if the producer is not transactional.
, transactionTimeoutMs :: {-# UNPACK #-} !Int32
-- ^ The time in ms to wait before aborting idle transactions sent by this
-- producer. This is only relevant if a TransactionalId has been defined.
} deriving (Show, Eq, Generic)
instance Serializable InitProducerIdRequest

initProducerIdRequestToV0 :: InitProducerIdRequest -> InitProducerIdRequestV0
initProducerIdRequestToV0 x = InitProducerIdRequestV0
{ transactionalId = x.transactionalId
, transactionTimeoutMs = x.transactionTimeoutMs
}

initProducerIdRequestFromV0 :: InitProducerIdRequestV0 -> InitProducerIdRequest
initProducerIdRequestFromV0 x = InitProducerIdRequest
{ transactionalId = x.transactionalId
, transactionTimeoutMs = x.transactionTimeoutMs
}

data InitProducerIdResponse = InitProducerIdResponse
{ throttleTimeMs :: {-# UNPACK #-} !Int32
-- ^ The duration in milliseconds for which the request was throttled due
-- to a quota violation, or zero if the request did not violate any quota.
, errorCode :: {-# UNPACK #-} !ErrorCode
-- ^ The error code, or 0 if there was no error.
, producerId :: {-# UNPACK #-} !Int64
-- ^ The current producer id.
, producerEpoch :: {-# UNPACK #-} !Int16
-- ^ The current epoch associated with the producer id.
} deriving (Show, Eq, Generic)
instance Serializable InitProducerIdResponse

initProducerIdResponseToV0 :: InitProducerIdResponse -> InitProducerIdResponseV0
initProducerIdResponseToV0 x = InitProducerIdResponseV0
{ throttleTimeMs = x.throttleTimeMs
, errorCode = x.errorCode
, producerId = x.producerId
, producerEpoch = x.producerEpoch
}

initProducerIdResponseFromV0 :: InitProducerIdResponseV0 -> InitProducerIdResponse
initProducerIdResponseFromV0 x = InitProducerIdResponse
{ throttleTimeMs = x.throttleTimeMs
, errorCode = x.errorCode
, producerId = x.producerId
, producerEpoch = x.producerEpoch
}

data JoinGroupRequest = JoinGroupRequest
{ groupId :: !Text
-- ^ The group identifier.
Expand Down Expand Up @@ -2935,6 +2985,13 @@ instance Exception HeartbeatResponseEx
catchHeartbeatResponseEx :: IO HeartbeatResponse -> IO HeartbeatResponse
catchHeartbeatResponseEx act = act `catch` \(HeartbeatResponseEx resp) -> pure resp

newtype InitProducerIdResponseEx = InitProducerIdResponseEx InitProducerIdResponse
deriving (Show, Eq)
instance Exception InitProducerIdResponseEx

catchInitProducerIdResponseEx :: IO InitProducerIdResponse -> IO InitProducerIdResponse
catchInitProducerIdResponseEx act = act `catch` \(InitProducerIdResponseEx resp) -> pure resp

newtype JoinGroupResponseEx = JoinGroupResponseEx JoinGroupResponse
deriving (Show, Eq)
instance Exception JoinGroupResponseEx
Expand Down