Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka: upgrade ListOffsets to version 2 #1727

Merged
merged 2 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions hstream-kafka/HStream/Kafka/Server/Handler.hsc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import qualified Kafka.Protocol.Service as K
-------------------------------------------------------------------------------

#cv_handler ApiVersions, 0, 3
#cv_handler ListOffsets, 0, 2
#cv_handler Metadata, 0, 5
#cv_handler Produce, 0, 3
#cv_handler InitProducerId, 0, 0
Expand All @@ -83,6 +84,7 @@ import qualified Kafka.Protocol.Service as K
handlers :: ServerContext -> [K.ServiceHandler]
handlers sc =
[ #mk_handler ApiVersions, 0, 3
, #mk_handler ListOffsets, 0, 2
, #mk_handler Metadata, 0, 5
-- Write
, #mk_handler Produce, 0, 3
Expand All @@ -106,9 +108,6 @@ handlers sc =
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "createTopics") (handleCreateTopicsV0 sc)
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "deleteTopics") (handleDeleteTopicsV0 sc)

, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "listOffsets") (handleListOffsetsV0 sc)
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV1 "listOffsets") (handleListOffsetsV1 sc)

, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "saslHandshake") (handleAfterAuthSaslHandshakeV0 sc)
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV1 "saslHandshake") (handleAfterAuthSaslHandshakeV1 sc)

Expand Down
69 changes: 37 additions & 32 deletions hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@

module HStream.Kafka.Server.Handler.Offset
( handleOffsetCommit

, handleOffsetFetch

, handleListOffsetsV0
, handleListOffsetsV1
, handleListOffsets
)
where

Expand All @@ -20,7 +17,7 @@ import qualified Data.Vector as V
import HStream.Kafka.Common.OffsetManager (getLatestOffset,
getOffsetByTimestamp,
getOldestOffset)
import qualified HStream.Kafka.Common.Utils as Utils
import HStream.Kafka.Common.Utils (mapKaArray)
import qualified HStream.Kafka.Group.GroupCoordinator as GC
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Store as S
Expand All @@ -37,52 +34,60 @@ pattern LatestTimestamp = (-1)
pattern EarliestTimestamp :: Int64
pattern EarliestTimestamp = (-2)

handleListOffsetsV0
:: ServerContext -> K.RequestContext -> K.ListOffsetsRequestV0 -> IO K.ListOffsetsResponseV0
handleListOffsetsV0 sc _ K.ListOffsetsRequestV0{..} = do
case K.unKaArray topics of
Nothing -> undefined
-- ^ check kafka
Just topics' -> do
res <- V.forM topics' $ \K.ListOffsetsTopicV0 {..} -> do
listOffsetTopicPartitions sc name (K.unKaArray (Utils.mapKaArray convertRequestPartition partitions))
return $ K.ListOffsetsResponseV0 {topics = K.NonNullKaArray (V.map convertTopic res)}
where convertRequestPartition p = K.ListOffsetsPartitionV1 {timestamp=p.timestamp, partitionIndex=p.partitionIndex}
convertTopic topic = K.ListOffsetsTopicResponseV0 {partitions=Utils.mapKaArray convertResponsePartition topic.partitions, name=topic.name}
convertResponsePartition p = K.ListOffsetsPartitionResponseV0
{ errorCode=0
, oldStyleOffsets=K.NonNullKaArray (V.singleton p.offset)
, partitionIndex=p.partitionIndex
}
handleListOffsets
:: ServerContext -> K.RequestContext -> K.ListOffsetsRequest -> IO K.ListOffsetsResponse
handleListOffsets sc reqCtx req
| reqCtx.apiVersion >= 2 && req.isolationLevel /= 0 = return $ mkErrResponse req
| otherwise = listOffsets sc reqCtx req
where
mkErrResponse K.ListOffsetsRequest{..} =
let topicsRsp = mapKaArray (\K.ListOffsetsTopic{..} ->
let partitionsRsp = mapKaArray (\K.ListOffsetsPartition{..} ->
K.ListOffsetsPartitionResponse
{ offset = -1
, timestamp = -1
-- ^ FIXME: read record timestamp ?
, partitionIndex = partitionIndex
, errorCode = K.INVALID_REQUEST
, oldStyleOffsets = K.KaArray Nothing
}) partitions
in K.ListOffsetsTopicResponse {partitions=partitionsRsp, name=name}
) topics
in K.ListOffsetsResponse {topics=topicsRsp, throttleTimeMs=0}

handleListOffsetsV1
:: ServerContext -> K.RequestContext -> K.ListOffsetsRequestV1 -> IO K.ListOffsetsResponseV1
handleListOffsetsV1 sc _ K.ListOffsetsRequestV1{..} = do
listOffsets
:: ServerContext -> K.RequestContext -> K.ListOffsetsRequest -> IO K.ListOffsetsResponse
listOffsets sc _ K.ListOffsetsRequest{..} = do
case K.unKaArray topics of
Nothing -> undefined
-- ^ check kafka
Just topics' -> do
res <- V.forM topics' $ \K.ListOffsetsTopicV1 {..} -> do
res <- V.forM topics' $ \K.ListOffsetsTopic {..} -> do
listOffsetTopicPartitions sc name (K.unKaArray partitions)
return $ K.ListOffsetsResponseV1 {topics = K.KaArray {unKaArray = Just res}}
return $ K.ListOffsetsResponse {topics = K.KaArray {unKaArray = Just res}, throttleTimeMs = 0}

listOffsetTopicPartitions :: ServerContext -> Text -> Maybe (Vector K.ListOffsetsPartitionV1) -> IO K.ListOffsetsTopicResponseV1
listOffsetTopicPartitions
:: ServerContext
-> Text
-> Maybe (Vector K.ListOffsetsPartition)
-> IO K.ListOffsetsTopicResponse
listOffsetTopicPartitions _ topicName Nothing = do
return $ K.ListOffsetsTopicResponseV1 {partitions = K.KaArray {unKaArray = Nothing}, name = topicName}
return $ K.ListOffsetsTopicResponse {partitions = K.KaArray {unKaArray = Nothing}, name = topicName}
listOffsetTopicPartitions ServerContext{..} topicName (Just offsetsPartitions) = do
orderedParts <- S.listStreamPartitionsOrdered scLDClient (S.transToTopicStreamName topicName)
res <- V.forM offsetsPartitions $ \K.ListOffsetsPartitionV1{..} -> do
res <- V.forM offsetsPartitions $ \K.ListOffsetsPartition{..} -> do
-- TODO: handle Nothing
let partition = orderedParts V.! (fromIntegral partitionIndex)
offset <- getOffset (snd partition) timestamp
return $ K.ListOffsetsPartitionResponseV1
return $ K.ListOffsetsPartitionResponse
{ offset = offset
, timestamp = timestamp
-- ^ FIXME: read record timestamp ?
, partitionIndex = partitionIndex
, errorCode = K.NONE
, oldStyleOffsets = K.NonNullKaArray (V.singleton offset)
}
return $ K.ListOffsetsTopicResponseV1 {partitions = K.KaArray {unKaArray = Just res}, name = topicName}
return $ K.ListOffsetsTopicResponse {partitions = K.KaArray {unKaArray = Just res}, name = topicName}
where
-- NOTE: The last offset of a partition is the offset of the upcoming
-- message, i.e. the offset of the last available message + 1.
Expand Down
46 changes: 45 additions & 1 deletion hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,10 @@ data ListOffsetsTopicV1 = ListOffsetsTopicV1
} deriving (Show, Eq, Generic)
instance Serializable ListOffsetsTopicV1

type ListOffsetsPartitionV2 = ListOffsetsPartitionV1

type ListOffsetsTopicV2 = ListOffsetsTopicV1

data ListOffsetsPartitionResponseV0 = ListOffsetsPartitionResponseV0
{ partitionIndex :: {-# UNPACK #-} !Int32
-- ^ The partition index.
Expand Down Expand Up @@ -402,6 +406,10 @@ data ListOffsetsTopicResponseV1 = ListOffsetsTopicResponseV1
} deriving (Show, Eq, Generic)
instance Serializable ListOffsetsTopicResponseV1

type ListOffsetsPartitionResponseV2 = ListOffsetsPartitionResponseV1

type ListOffsetsTopicResponseV2 = ListOffsetsTopicResponseV1

newtype MetadataRequestTopicV0 = MetadataRequestTopicV0
{ name :: Text
} deriving (Show, Eq, Generic)
Expand Down Expand Up @@ -1160,6 +1168,24 @@ data ListOffsetsRequestV1 = ListOffsetsRequestV1
} deriving (Show, Eq, Generic)
instance Serializable ListOffsetsRequestV1

data ListOffsetsRequestV2 = ListOffsetsRequestV2
{ replicaId :: {-# UNPACK #-} !Int32
-- ^ The broker ID of the requestor, or -1 if this request is being made by
-- a normal consumer.
, isolationLevel :: {-# UNPACK #-} !Int8
-- ^ This setting controls the visibility of transactional records. Using
-- READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With
-- READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED
-- transactional records are visible. To be more concrete, READ_COMMITTED
-- returns all data from offsets smaller than the current LSO (last stable
-- offset), and enables the inclusion of the list of aborted transactions
-- in the result, which allows consumers to discard ABORTED transactional
-- records
, topics :: !(KaArray ListOffsetsTopicV1)
-- ^ Each topic in the request.
} deriving (Show, Eq, Generic)
instance Serializable ListOffsetsRequestV2

newtype ListOffsetsResponseV0 = ListOffsetsResponseV0
{ topics :: (KaArray ListOffsetsTopicResponseV0)
} deriving (Show, Eq, Generic)
Expand All @@ -1170,6 +1196,15 @@ newtype ListOffsetsResponseV1 = ListOffsetsResponseV1
} deriving (Show, Eq, Generic)
instance Serializable ListOffsetsResponseV1

data ListOffsetsResponseV2 = ListOffsetsResponseV2
{ throttleTimeMs :: {-# UNPACK #-} !Int32
-- ^ The duration in milliseconds for which the request was throttled due
-- to a quota violation, or zero if the request did not violate any quota.
, topics :: !(KaArray ListOffsetsTopicResponseV1)
-- ^ Each topic in the response.
} deriving (Show, Eq, Generic)
instance Serializable ListOffsetsResponseV2

newtype MetadataRequestV0 = MetadataRequestV0
{ topics :: (KaArray MetadataRequestTopicV0)
} deriving (Show, Eq, Generic)
Expand Down Expand Up @@ -1795,6 +1830,7 @@ instance Service HStreamKafkaV2 where
type ServiceMethods HStreamKafkaV2 =
'[ "produce"
, "fetch"
, "listOffsets"
, "metadata"
, "offsetCommit"
, "offsetFetch"
Expand All @@ -1816,6 +1852,13 @@ instance HasMethodImpl HStreamKafkaV2 "fetch" where
type MethodInput HStreamKafkaV2 "fetch" = FetchRequestV2
type MethodOutput HStreamKafkaV2 "fetch" = FetchResponseV2

instance HasMethodImpl HStreamKafkaV2 "listOffsets" where
type MethodName HStreamKafkaV2 "listOffsets" = "listOffsets"
type MethodKey HStreamKafkaV2 "listOffsets" = 2
type MethodVersion HStreamKafkaV2 "listOffsets" = 2
type MethodInput HStreamKafkaV2 "listOffsets" = ListOffsetsRequestV2
type MethodOutput HStreamKafkaV2 "listOffsets" = ListOffsetsResponseV2

instance HasMethodImpl HStreamKafkaV2 "metadata" where
type MethodName HStreamKafkaV2 "metadata" = "metadata"
type MethodKey HStreamKafkaV2 "metadata" = 3
Expand Down Expand Up @@ -1977,7 +2020,7 @@ supportedApiVersions :: [ApiVersionV0]
supportedApiVersions =
[ ApiVersionV0 (ApiKey 0) 0 3
, ApiVersionV0 (ApiKey 1) 0 4
, ApiVersionV0 (ApiKey 2) 0 1
, ApiVersionV0 (ApiKey 2) 0 2
, ApiVersionV0 (ApiKey 3) 0 5
, ApiVersionV0 (ApiKey 8) 0 3
, ApiVersionV0 (ApiKey 9) 0 3
Expand Down Expand Up @@ -2010,6 +2053,7 @@ getHeaderVersion (ApiKey (1)) 3 = (1, 0)
getHeaderVersion (ApiKey (1)) 4 = (1, 0)
getHeaderVersion (ApiKey (2)) 0 = (1, 0)
getHeaderVersion (ApiKey (2)) 1 = (1, 0)
getHeaderVersion (ApiKey (2)) 2 = (1, 0)
getHeaderVersion (ApiKey (3)) 0 = (1, 0)
getHeaderVersion (ApiKey (3)) 1 = (1, 0)
getHeaderVersion (ApiKey (3)) 2 = (1, 0)
Expand Down
Loading