diff --git a/hstream-kafka/HStream/Kafka/Server/Handler.hsc b/hstream-kafka/HStream/Kafka/Server/Handler.hsc
index 408c7d377..1ca43e8f7 100644
--- a/hstream-kafka/HStream/Kafka/Server/Handler.hsc
+++ b/hstream-kafka/HStream/Kafka/Server/Handler.hsc
@@ -57,6 +57,7 @@ import qualified Kafka.Protocol.Service as K
 -------------------------------------------------------------------------------
 
 #cv_handler ApiVersions, 0, 3
+#cv_handler ListOffsets, 0, 2
 #cv_handler Metadata, 0, 5
 #cv_handler Produce, 0, 3
 #cv_handler InitProducerId, 0, 0
@@ -83,6 +84,7 @@ import qualified Kafka.Protocol.Service as K
 handlers :: ServerContext -> [K.ServiceHandler]
 handlers sc =
   [ #mk_handler ApiVersions, 0, 3
+  , #mk_handler ListOffsets, 0, 2
   , #mk_handler Metadata, 0, 5
     -- Write
   , #mk_handler Produce, 0, 3
@@ -106,9 +108,6 @@ handlers sc =
   , K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "createTopics") (handleCreateTopicsV0 sc)
   , K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "deleteTopics") (handleDeleteTopicsV0 sc)
 
-  , K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "listOffsets") (handleListOffsetsV0 sc)
-  , K.hd (K.RPC :: K.RPC K.HStreamKafkaV1 "listOffsets") (handleListOffsetsV1 sc)
-
  , K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "saslHandshake") (handleAfterAuthSaslHandshakeV0 sc)
   , K.hd (K.RPC :: K.RPC K.HStreamKafkaV1 "saslHandshake") (handleAfterAuthSaslHandshakeV1 sc)
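Note: #cv_handler ListOffsets, 0, 2 and #mk_handler ListOffsets, 0, 2 presumably expand, via the .hsc macros, into one thin wrapper per protocol version (v0..v2), each converting between the version-specific structs and the total types around the single handleListOffsets handler. A hedged sketch of what the generated v2 wrapper is assumed to look like, reusing the listOffsetsRequestFromV2 / listOffsetsResponseToV2 converters added to Total.hs later in this diff (the wrapper name and shape are illustrative, not taken from the macro output):

    -- Sketch only: assumed shape of the macro-generated per-version wrapper.
    handleListOffsetsV2 :: ServerContext -> K.RequestContext
                        -> K.ListOffsetsRequestV2 -> IO K.ListOffsetsResponseV2
    handleListOffsetsV2 sc ctx req =
      K.listOffsetsResponseToV2 <$> handleListOffsets sc ctx (K.listOffsetsRequestFromV2 req)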
diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs
index 30e74448b..097f5daa8 100644
--- a/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs
+++ b/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs
@@ -3,11 +3,8 @@
 module HStream.Kafka.Server.Handler.Offset
   ( handleOffsetCommit
-  , handleOffsetFetch
-
-  , handleListOffsetsV0
-  , handleListOffsetsV1
+  , handleListOffsets
   )
 where
@@ -20,7 +17,7 @@ import qualified Data.Vector as V
 import HStream.Kafka.Common.OffsetManager (getLatestOffset,
                                            getOffsetByTimestamp,
                                            getOldestOffset)
-import qualified HStream.Kafka.Common.Utils as Utils
+import HStream.Kafka.Common.Utils (mapKaArray)
 import qualified HStream.Kafka.Group.GroupCoordinator as GC
 import HStream.Kafka.Server.Types (ServerContext (..))
 import qualified HStream.Store as S
@@ -37,52 +34,60 @@ pattern LatestTimestamp = (-1)
 pattern EarliestTimestamp :: Int64
 pattern EarliestTimestamp = (-2)
 
-handleListOffsetsV0
-  :: ServerContext -> K.RequestContext -> K.ListOffsetsRequestV0 -> IO K.ListOffsetsResponseV0
-handleListOffsetsV0 sc _ K.ListOffsetsRequestV0{..} = do
-  case K.unKaArray topics of
-    Nothing -> undefined
-    -- ^ check kafka
-    Just topics' -> do
-      res <- V.forM topics' $ \K.ListOffsetsTopicV0 {..} -> do
-        listOffsetTopicPartitions sc name (K.unKaArray (Utils.mapKaArray convertRequestPartition partitions))
-      return $ K.ListOffsetsResponseV0 {topics = K.NonNullKaArray (V.map convertTopic res)}
-  where convertRequestPartition p = K.ListOffsetsPartitionV1 {timestamp=p.timestamp, partitionIndex=p.partitionIndex}
-        convertTopic topic = K.ListOffsetsTopicResponseV0 {partitions=Utils.mapKaArray convertResponsePartition topic.partitions, name=topic.name}
-        convertResponsePartition p = K.ListOffsetsPartitionResponseV0
-          { errorCode=0
-          , oldStyleOffsets=K.NonNullKaArray (V.singleton p.offset)
-          , partitionIndex=p.partitionIndex
-          }
+handleListOffsets
+  :: ServerContext -> K.RequestContext -> K.ListOffsetsRequest -> IO K.ListOffsetsResponse
+handleListOffsets sc reqCtx req
+  | reqCtx.apiVersion >= 2 && req.isolationLevel /= 0 = return $ mkErrResponse req
+  | otherwise = listOffsets sc reqCtx req
+  where
+    mkErrResponse K.ListOffsetsRequest{..} =
+      let topicsRsp = mapKaArray (\K.ListOffsetsTopic{..} ->
+            let partitionsRsp = mapKaArray (\K.ListOffsetsPartition{..} ->
+                  K.ListOffsetsPartitionResponse
+                    { offset = -1
+                    , timestamp = -1
+                    -- ^ FIXME: read record timestamp ?
+                    , partitionIndex = partitionIndex
+                    , errorCode = K.INVALID_REQUEST
+                    , oldStyleOffsets = K.KaArray Nothing
+                    }) partitions
+             in K.ListOffsetsTopicResponse {partitions=partitionsRsp, name=name}
+            ) topics
+       in K.ListOffsetsResponse {topics=topicsRsp, throttleTimeMs=0}
 
-handleListOffsetsV1
-  :: ServerContext -> K.RequestContext -> K.ListOffsetsRequestV1 -> IO K.ListOffsetsResponseV1
-handleListOffsetsV1 sc _ K.ListOffsetsRequestV1{..} = do
+listOffsets
+  :: ServerContext -> K.RequestContext -> K.ListOffsetsRequest -> IO K.ListOffsetsResponse
+listOffsets sc _ K.ListOffsetsRequest{..} = do
   case K.unKaArray topics of
     Nothing -> undefined
     -- ^ check kafka
     Just topics' -> do
-      res <- V.forM topics' $ \K.ListOffsetsTopicV1 {..} -> do
+      res <- V.forM topics' $ \K.ListOffsetsTopic {..} -> do
         listOffsetTopicPartitions sc name (K.unKaArray partitions)
-      return $ K.ListOffsetsResponseV1 {topics = K.KaArray {unKaArray = Just res}}
+      return $ K.ListOffsetsResponse {topics = K.KaArray {unKaArray = Just res}, throttleTimeMs = 0}
 
-listOffsetTopicPartitions :: ServerContext -> Text -> Maybe (Vector K.ListOffsetsPartitionV1) -> IO K.ListOffsetsTopicResponseV1
+listOffsetTopicPartitions
+  :: ServerContext
+  -> Text
+  -> Maybe (Vector K.ListOffsetsPartition)
+  -> IO K.ListOffsetsTopicResponse
 listOffsetTopicPartitions _ topicName Nothing = do
-  return $ K.ListOffsetsTopicResponseV1 {partitions = K.KaArray {unKaArray = Nothing}, name = topicName}
+  return $ K.ListOffsetsTopicResponse {partitions = K.KaArray {unKaArray = Nothing}, name = topicName}
 listOffsetTopicPartitions ServerContext{..} topicName (Just offsetsPartitions) = do
   orderedParts <- S.listStreamPartitionsOrdered scLDClient (S.transToTopicStreamName topicName)
-  res <- V.forM offsetsPartitions $ \K.ListOffsetsPartitionV1{..} -> do
+  res <- V.forM offsetsPartitions $ \K.ListOffsetsPartition{..} -> do
     -- TODO: handle Nothing
     let partition = orderedParts V.! (fromIntegral partitionIndex)
     offset <- getOffset (snd partition) timestamp
-    return $ K.ListOffsetsPartitionResponseV1
+    return $ K.ListOffsetsPartitionResponse
       { offset = offset
       , timestamp = timestamp
      -- ^ FIXME: read record timestamp ?
       , partitionIndex = partitionIndex
       , errorCode = K.NONE
+      , oldStyleOffsets = K.NonNullKaArray (V.singleton offset)
       }
-  return $ K.ListOffsetsTopicResponseV1 {partitions = K.KaArray {unKaArray = Just res}, name = topicName}
+  return $ K.ListOffsetsTopicResponse {partitions = K.KaArray {unKaArray = Just res}, name = topicName}
  where
    -- NOTE: The last offset of a partition is the offset of the upcoming
    -- message, i.e. the offset of the last available message + 1.
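Note: the getOffset helper bound in the trailing where clause is cut off by this hunk. Based on the OffsetManager functions imported above and the NOTE about upcoming-offset semantics, it presumably dispatches on the LatestTimestamp / EarliestTimestamp patterns roughly as below; this is a hypothetical reconstruction (scOffsetManager and the Maybe-returning lookups are assumptions, not shown in the diff):

    -- Hedged sketch; assumed to live under the ServerContext{..} match above.
    -- fromMaybe comes from Data.Maybe; the (+ 1) reports the upcoming offset,
    -- i.e. the offset of the last available message + 1.
    getOffset logid LatestTimestamp   = maybe 0 (+ 1) <$> getLatestOffset scOffsetManager logid
    getOffset logid EarliestTimestamp = fromMaybe 0 <$> getOldestOffset scOffsetManager logid
    getOffset logid timestamp         = fromMaybe 0 <$> getOffsetByTimestamp scOffsetManager logid timestamp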
diff --git a/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs b/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs
index 4c8018b3c..c3e89e9fa 100644
--- a/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs
+++ b/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs
@@ -364,6 +364,10 @@ data ListOffsetsTopicV1 = ListOffsetsTopicV1
   } deriving (Show, Eq, Generic)
 instance Serializable ListOffsetsTopicV1
 
+type ListOffsetsPartitionV2 = ListOffsetsPartitionV1
+
+type ListOffsetsTopicV2 = ListOffsetsTopicV1
+
 data ListOffsetsPartitionResponseV0 = ListOffsetsPartitionResponseV0
   { partitionIndex :: {-# UNPACK #-} !Int32
     -- ^ The partition index.
@@ -402,6 +406,10 @@ data ListOffsetsTopicResponseV1 = ListOffsetsTopicResponseV1
   } deriving (Show, Eq, Generic)
 instance Serializable ListOffsetsTopicResponseV1
 
+type ListOffsetsPartitionResponseV2 = ListOffsetsPartitionResponseV1
+
+type ListOffsetsTopicResponseV2 = ListOffsetsTopicResponseV1
+
 newtype MetadataRequestTopicV0 = MetadataRequestTopicV0
   { name :: Text
   } deriving (Show, Eq, Generic)
@@ -1160,6 +1168,24 @@ data ListOffsetsRequestV1 = ListOffsetsRequestV1
   } deriving (Show, Eq, Generic)
 instance Serializable ListOffsetsRequestV1
 
+data ListOffsetsRequestV2 = ListOffsetsRequestV2
+  { replicaId :: {-# UNPACK #-} !Int32
+    -- ^ The broker ID of the requestor, or -1 if this request is being made by
+    -- a normal consumer.
+  , isolationLevel :: {-# UNPACK #-} !Int8
+    -- ^ This setting controls the visibility of transactional records. Using
+    -- READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With
+    -- READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED
+    -- transactional records are visible. To be more concrete, READ_COMMITTED
+    -- returns all data from offsets smaller than the current LSO (last stable
+    -- offset), and enables the inclusion of the list of aborted transactions
+    -- in the result, which allows consumers to discard ABORTED transactional
+    -- records
+  , topics :: !(KaArray ListOffsetsTopicV1)
+    -- ^ Each topic in the request.
+  } deriving (Show, Eq, Generic)
+instance Serializable ListOffsetsRequestV2
+
 newtype ListOffsetsResponseV0 = ListOffsetsResponseV0
   { topics :: (KaArray ListOffsetsTopicResponseV0)
   } deriving (Show, Eq, Generic)
@@ -1170,6 +1196,15 @@ newtype ListOffsetsResponseV1 = ListOffsetsResponseV1
   } deriving (Show, Eq, Generic)
 instance Serializable ListOffsetsResponseV1
 
+data ListOffsetsResponseV2 = ListOffsetsResponseV2
+  { throttleTimeMs :: {-# UNPACK #-} !Int32
+    -- ^ The duration in milliseconds for which the request was throttled due
+    -- to a quota violation, or zero if the request did not violate any quota.
+  , topics :: !(KaArray ListOffsetsTopicResponseV1)
+    -- ^ Each topic in the response.
+  } deriving (Show, Eq, Generic)
+instance Serializable ListOffsetsResponseV2
+
 newtype MetadataRequestV0 = MetadataRequestV0
   { topics :: (KaArray MetadataRequestTopicV0)
   } deriving (Show, Eq, Generic)
@@ -1795,6 +1830,7 @@ instance Service HStreamKafkaV2 where
   type ServiceMethods HStreamKafkaV2 =
     '[ "produce"
      , "fetch"
+     , "listOffsets"
      , "metadata"
      , "offsetCommit"
      , "offsetFetch"
@@ -1816,6 +1852,13 @@ instance HasMethodImpl HStreamKafkaV2 "fetch" where
   type MethodInput HStreamKafkaV2 "fetch" = FetchRequestV2
   type MethodOutput HStreamKafkaV2 "fetch" = FetchResponseV2
 
+instance HasMethodImpl HStreamKafkaV2 "listOffsets" where
+  type MethodName HStreamKafkaV2 "listOffsets" = "listOffsets"
+  type MethodKey HStreamKafkaV2 "listOffsets" = 2
+  type MethodVersion HStreamKafkaV2 "listOffsets" = 2
+  type MethodInput HStreamKafkaV2 "listOffsets" = ListOffsetsRequestV2
+  type MethodOutput HStreamKafkaV2 "listOffsets" = ListOffsetsResponseV2
+
 instance HasMethodImpl HStreamKafkaV2 "metadata" where
   type MethodName HStreamKafkaV2 "metadata" = "metadata"
   type MethodKey HStreamKafkaV2 "metadata" = 3
@@ -1977,7 +2020,7 @@ supportedApiVersions :: [ApiVersionV0]
 supportedApiVersions =
   [ ApiVersionV0 (ApiKey 0) 0 3
   , ApiVersionV0 (ApiKey 1) 0 4
-  , ApiVersionV0 (ApiKey 2) 0 1
+  , ApiVersionV0 (ApiKey 2) 0 2
   , ApiVersionV0 (ApiKey 3) 0 5
   , ApiVersionV0 (ApiKey 8) 0 3
   , ApiVersionV0 (ApiKey 9) 0 3
@@ -2010,6 +2053,7 @@ getHeaderVersion (ApiKey (1)) 3 = (1, 0)
 getHeaderVersion (ApiKey (1)) 4 = (1, 0)
 getHeaderVersion (ApiKey (2)) 0 = (1, 0)
 getHeaderVersion (ApiKey (2)) 1 = (1, 0)
+getHeaderVersion (ApiKey (2)) 2 = (1, 0)
 getHeaderVersion (ApiKey (3)) 0 = (1, 0)
 getHeaderVersion (ApiKey (3)) 1 = (1, 0)
 getHeaderVersion (ApiKey (3)) 2 = (1, 0)
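Note: with the ApiVersionV0 (ApiKey 2) 0 2 entry, the broker's ApiVersions response now advertises ListOffsets up to v2, so clients that negotiate versions via ApiVersions can pick v2. A small hedged sketch of the kind of check a client or test might perform against that list (only the three-argument ApiVersionV0 constructor is taken from this file; the helper name and the Eq instance on ApiKey are assumptions):

    -- Sketch: does the advertised range for ApiKey 2 (ListOffsets) cover v2?
    supportsListOffsetsV2 :: [ApiVersionV0] -> Bool
    supportsListOffsetsV2 = any $ \(ApiVersionV0 key minV maxV) ->
      key == ApiKey 2 && minV <= 2 && 2 <= maxV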
diff --git a/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs b/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs
index 6ac8fbc58..f7d8fae61 100644
--- a/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs
+++ b/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs
@@ -600,6 +600,8 @@ listOffsetsPartitionToV1 x = ListOffsetsPartitionV1
   { partitionIndex = x.partitionIndex
   , timestamp = x.timestamp
   }
+listOffsetsPartitionToV2 :: ListOffsetsPartition -> ListOffsetsPartitionV2
+listOffsetsPartitionToV2 = listOffsetsPartitionToV1
 
 listOffsetsPartitionFromV0 :: ListOffsetsPartitionV0 -> ListOffsetsPartition
 listOffsetsPartitionFromV0 x = ListOffsetsPartition
@@ -613,6 +615,8 @@ listOffsetsPartitionFromV1 x = ListOffsetsPartition
   , timestamp = x.timestamp
   , maxNumOffsets = 1
   }
+listOffsetsPartitionFromV2 :: ListOffsetsPartitionV2 -> ListOffsetsPartition
+listOffsetsPartitionFromV2 = listOffsetsPartitionFromV1
 
 data ListOffsetsPartitionResponse = ListOffsetsPartitionResponse
   { partitionIndex :: {-# UNPACK #-} !Int32
@@ -641,6 +645,8 @@ listOffsetsPartitionResponseToV1 x = ListOffsetsPartitionResponseV1
   , timestamp = x.timestamp
   , offset = x.offset
   }
+listOffsetsPartitionResponseToV2 :: ListOffsetsPartitionResponse -> ListOffsetsPartitionResponseV2
+listOffsetsPartitionResponseToV2 = listOffsetsPartitionResponseToV1
 
 listOffsetsPartitionResponseFromV0 :: ListOffsetsPartitionResponseV0 -> ListOffsetsPartitionResponse
 listOffsetsPartitionResponseFromV0 x = ListOffsetsPartitionResponse
@@ -658,6 +664,8 @@ listOffsetsPartitionResponseFromV1 x = ListOffsetsPartitionResponse
   , timestamp = x.timestamp
   , offset = x.offset
   }
+listOffsetsPartitionResponseFromV2 :: ListOffsetsPartitionResponseV2 -> ListOffsetsPartitionResponse
+listOffsetsPartitionResponseFromV2 = listOffsetsPartitionResponseFromV1
 
 data ListOffsetsTopic = ListOffsetsTopic
   { name :: !Text
@@ -677,6 +685,8 @@ listOffsetsTopicToV1 x = ListOffsetsTopicV1
   { name = x.name
   , partitions = fmap listOffsetsPartitionToV1 x.partitions
   }
+listOffsetsTopicToV2 :: ListOffsetsTopic -> ListOffsetsTopicV2
+listOffsetsTopicToV2 = listOffsetsTopicToV1
 
 listOffsetsTopicFromV0 :: ListOffsetsTopicV0 -> ListOffsetsTopic
 listOffsetsTopicFromV0 x = ListOffsetsTopic
@@ -688,6 +698,8 @@ listOffsetsTopicFromV1 x = ListOffsetsTopic
   { name = x.name
   , partitions = fmap listOffsetsPartitionFromV1 x.partitions
   }
+listOffsetsTopicFromV2 :: ListOffsetsTopicV2 -> ListOffsetsTopic
+listOffsetsTopicFromV2 = listOffsetsTopicFromV1
 
 data ListOffsetsTopicResponse = ListOffsetsTopicResponse
   { name :: !Text
@@ -707,6 +719,8 @@ listOffsetsTopicResponseToV1 x = ListOffsetsTopicResponseV1
   { name = x.name
   , partitions = fmap listOffsetsPartitionResponseToV1 x.partitions
   }
+listOffsetsTopicResponseToV2 :: ListOffsetsTopicResponse -> ListOffsetsTopicResponseV2
+listOffsetsTopicResponseToV2 = listOffsetsTopicResponseToV1
 
 listOffsetsTopicResponseFromV0 :: ListOffsetsTopicResponseV0 -> ListOffsetsTopicResponse
 listOffsetsTopicResponseFromV0 x = ListOffsetsTopicResponse
@@ -718,6 +732,8 @@ listOffsetsTopicResponseFromV1 x = ListOffsetsTopicResponse
   { name = x.name
   , partitions = fmap listOffsetsPartitionResponseFromV1 x.partitions
   }
+listOffsetsTopicResponseFromV2 :: ListOffsetsTopicResponseV2 -> ListOffsetsTopicResponse
+listOffsetsTopicResponseFromV2 = listOffsetsTopicResponseFromV1
 
 data ListedGroup = ListedGroup
   { groupId :: !Text
@@ -2341,11 +2357,20 @@ listGroupsResponseFromV1 x = ListGroupsResponse
   }
 
 data ListOffsetsRequest = ListOffsetsRequest
-  { replicaId :: {-# UNPACK #-} !Int32
+  { replicaId      :: {-# UNPACK #-} !Int32
     -- ^ The broker ID of the requestor, or -1 if this request is being made by
     -- a normal consumer.
-  , topics :: !(KaArray ListOffsetsTopic)
+  , topics         :: !(KaArray ListOffsetsTopic)
     -- ^ Each topic in the request.
+  , isolationLevel :: {-# UNPACK #-} !Int8
+    -- ^ This setting controls the visibility of transactional records. Using
+    -- READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With
+    -- READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED
+    -- transactional records are visible. To be more concrete, READ_COMMITTED
+    -- returns all data from offsets smaller than the current LSO (last stable
+    -- offset), and enables the inclusion of the list of aborted transactions
+    -- in the result, which allows consumers to discard ABORTED transactional
+    -- records
   } deriving (Show, Eq, Generic)
 instance Serializable ListOffsetsRequest
 
@@ -2359,20 +2384,38 @@ listOffsetsRequestToV1 x = ListOffsetsRequestV1
   { replicaId = x.replicaId
   , topics = fmap listOffsetsTopicToV1 x.topics
   }
+listOffsetsRequestToV2 :: ListOffsetsRequest -> ListOffsetsRequestV2
+listOffsetsRequestToV2 x = ListOffsetsRequestV2
+  { replicaId = x.replicaId
+  , isolationLevel = x.isolationLevel
+  , topics = fmap listOffsetsTopicToV2 x.topics
+  }
 
 listOffsetsRequestFromV0 :: ListOffsetsRequestV0 -> ListOffsetsRequest
 listOffsetsRequestFromV0 x = ListOffsetsRequest
   { replicaId = x.replicaId
   , topics = fmap listOffsetsTopicFromV0 x.topics
+  , isolationLevel = 0
   }
 listOffsetsRequestFromV1 :: ListOffsetsRequestV1 -> ListOffsetsRequest
 listOffsetsRequestFromV1 x = ListOffsetsRequest
   { replicaId = x.replicaId
   , topics = fmap listOffsetsTopicFromV1 x.topics
+  , isolationLevel = 0
+  }
+listOffsetsRequestFromV2 :: ListOffsetsRequestV2 -> ListOffsetsRequest
+listOffsetsRequestFromV2 x = ListOffsetsRequest
+  { replicaId = x.replicaId
+  , topics = fmap listOffsetsTopicFromV2 x.topics
+  , isolationLevel = x.isolationLevel
   }
 
-newtype ListOffsetsResponse = ListOffsetsResponse
-  { topics :: (KaArray ListOffsetsTopicResponse)
+data ListOffsetsResponse = ListOffsetsResponse
+  { topics :: !(KaArray ListOffsetsTopicResponse)
+    -- ^ Each topic in the response.
+  , throttleTimeMs :: {-# UNPACK #-} !Int32
+    -- ^ The duration in milliseconds for which the request was throttled due
+    -- to a quota violation, or zero if the request did not violate any quota.
   } deriving (Show, Eq, Generic)
 instance Serializable ListOffsetsResponse
 
@@ -2384,14 +2427,26 @@ listOffsetsResponseToV1 :: ListOffsetsResponse -> ListOffsetsResponseV1
 listOffsetsResponseToV1 x = ListOffsetsResponseV1
   { topics = fmap listOffsetsTopicResponseToV1 x.topics
   }
+listOffsetsResponseToV2 :: ListOffsetsResponse -> ListOffsetsResponseV2
+listOffsetsResponseToV2 x = ListOffsetsResponseV2
+  { throttleTimeMs = x.throttleTimeMs
+  , topics = fmap listOffsetsTopicResponseToV2 x.topics
+  }
 
 listOffsetsResponseFromV0 :: ListOffsetsResponseV0 -> ListOffsetsResponse
 listOffsetsResponseFromV0 x = ListOffsetsResponse
   { topics = fmap listOffsetsTopicResponseFromV0 x.topics
+  , throttleTimeMs = 0
   }
 listOffsetsResponseFromV1 :: ListOffsetsResponseV1 -> ListOffsetsResponse
 listOffsetsResponseFromV1 x = ListOffsetsResponse
   { topics = fmap listOffsetsTopicResponseFromV1 x.topics
+  , throttleTimeMs = 0
+  }
+listOffsetsResponseFromV2 :: ListOffsetsResponseV2 -> ListOffsetsResponse
+listOffsetsResponseFromV2 x = ListOffsetsResponse
+  { topics = fmap listOffsetsTopicResponseFromV2 x.topics
+  , throttleTimeMs = x.throttleTimeMs
   }
 
 data MetadataRequest = MetadataRequest
diff --git a/script/kafka_gen.py b/script/kafka_gen.py
index 330597d87..4fe0506f7 100755
--- a/script/kafka_gen.py
+++ b/script/kafka_gen.py
@@ -114,7 +114,7 @@ def get_field_default(field_type, default=None):
     "Fetch": (0, 4),
     "OffsetFetch": (0, 3),
     "OffsetCommit": (0, 3),
-    "ListOffsets": (0, 1),
+    "ListOffsets": (0, 2),
     "SaslHandshake": (0, 1),
     "JoinGroup": (0, 2),
     "SyncGroup": (0, 1),
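Note: a usage sketch of the new v2 request type, asking for the upcoming (high-watermark) offset of partition 0 of a topic; timestamp -1 selects the latest offset and -2 would select the earliest, matching the LatestTimestamp / EarliestTimestamp patterns in the handler. Values are illustrative and assume OverloadedStrings plus the generated Struct.hs types and Data.Vector (as V) in scope; the v2 handler above answers any isolationLevel other than 0 with INVALID_REQUEST:

    listLatestReq :: ListOffsetsRequestV2
    listLatestReq = ListOffsetsRequestV2
      { replicaId = -1          -- a normal consumer, not a broker
      , isolationLevel = 0      -- READ_UNCOMMITTED; the only level the handler accepts
      , topics = KaArray $ Just $ V.singleton
          ListOffsetsTopicV1
            { name = "demo"     -- hypothetical topic name
            , partitions = KaArray $ Just $ V.singleton
                ListOffsetsPartitionV1 { partitionIndex = 0, timestamp = -1 }
            }
      }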