Skip to content

Commit

Permalink
kafka: upgrade DeleteTopics to version 1 (#1729)
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian authored Jan 4, 2024
1 parent a958435 commit b8e4d72
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 18 deletions.
5 changes: 4 additions & 1 deletion hstream-kafka/HStream/Kafka/Server/Handler.hsc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ import qualified Kafka.Protocol.Service as K
#cv_handler Fetch, 0, 4
#cv_handler DescribeConfigs, 0, 0

#cv_handler DeleteTopics, 0, 1

#cv_handler SaslHandshake, 0, 1

#cv_handler FindCoordinator, 0, 1
Expand Down Expand Up @@ -94,6 +96,8 @@ handlers sc =

, #mk_handler FindCoordinator, 0, 1

, #mk_handler DeleteTopics, 0, 1

-- Group
, #mk_handler JoinGroup, 0, 2
, #mk_handler SyncGroup, 0, 1
Expand All @@ -106,7 +110,6 @@ handlers sc =
, #mk_handler OffsetFetch, 0, 3

, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "createTopics") (handleCreateTopicsV0 sc)
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "deleteTopics") (handleDeleteTopicsV0 sc)

, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "saslHandshake") (handleAfterAuthSaslHandshakeV0 sc)
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV1 "saslHandshake") (handleAfterAuthSaslHandshakeV1 sc)
Expand Down
25 changes: 11 additions & 14 deletions hstream-kafka/HStream/Kafka/Server/Handler/Topic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@
module HStream.Kafka.Server.Handler.Topic
( -- 19: CreateTopics
handleCreateTopicsV0

-- 20: DeleteTopics
, handleDeleteTopicsV0
, handleDeleteTopics
) where

import Control.Exception
Expand All @@ -21,9 +20,7 @@ import qualified HStream.Kafka.Common.Utils as Utils
import qualified HStream.Kafka.Server.Core.Topic as Core
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
import qualified HStream.Stats as Stats
import qualified HStream.Store as S
import qualified HStream.Utils as Utils
import qualified Kafka.Protocol.Encoding as K
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Message as K
Expand Down Expand Up @@ -64,36 +61,36 @@ handleCreateTopicsV0 ctx _ K.CreateTopicsRequestV0{..} =
-- 20: DeleteTopics
--------------------
-- FIXME: The `timeoutMs` field of request is omitted.
handleDeleteTopicsV0
:: ServerContext -> K.RequestContext -> K.DeleteTopicsRequestV0 -> IO K.DeleteTopicsResponseV0
handleDeleteTopicsV0 ServerContext{..} _ K.DeleteTopicsRequestV0{..} =
handleDeleteTopics
:: ServerContext -> K.RequestContext -> K.DeleteTopicsRequest -> IO K.DeleteTopicsResponse
handleDeleteTopics ServerContext{..} _ K.DeleteTopicsRequest{..} =
case topicNames of
K.KaArray Nothing ->
-- FIXME: We return `[]` when topics is `Nothing`.
-- Is this proper?
return $ K.DeleteTopicsResponseV0 (K.KaArray $ Just V.empty)
return $ K.DeleteTopicsResponse {responses = K.KaArray $ Just V.empty, throttleTimeMs = 0}
K.KaArray (Just topicNames_)
| V.null topicNames_ -> return $ K.DeleteTopicsResponseV0 (K.KaArray $ Just V.empty)
| V.null topicNames_ -> return $ K.DeleteTopicsResponse {responses = K.KaArray $ Just V.empty, throttleTimeMs = 0}
| otherwise -> do
respTopics <- forM topicNames_ $ \topicName -> do
try (deleteTopic topicName) >>= \case
Left (e :: SomeException)
| Just _ <- fromException @S.NOTFOUND e -> do
Log.warning $ "Delete topic failed, topic " <> Log.build topicName <> " does not exist"
return $ K.DeletableTopicResultV0 topicName K.UNKNOWN_TOPIC_OR_PARTITION
return $ K.DeletableTopicResult topicName K.UNKNOWN_TOPIC_OR_PARTITION
| otherwise -> do
Log.warning $ "Exception occurs when deleting topic " <> Log.build topicName <> ": " <> Log.build (show e)
return $ K.DeletableTopicResultV0 topicName K.UNKNOWN_SERVER_ERROR
return $ K.DeletableTopicResult topicName K.UNKNOWN_SERVER_ERROR
Right res -> return res
return $ K.DeleteTopicsResponseV0 (K.KaArray $ Just respTopics)
return $ K.DeleteTopicsResponse {responses = K.KaArray $ Just respTopics, throttleTimeMs = 0}
where
-- FIXME: There can be some potential exceptions which are difficult to
-- classify using Kafka's error code. So this function may throw
-- exceptions.
-- WARNING: This function may throw exceptions!
--
-- TODO: Handle topic that has subscription (i.e. cannot be deleted)
deleteTopic :: T.Text -> IO K.DeletableTopicResultV0
deleteTopic :: T.Text -> IO K.DeletableTopicResult
deleteTopic topicName = do
let streamId = S.transToTopicStreamName topicName
-- delete offset caches.
Expand All @@ -104,4 +101,4 @@ handleDeleteTopicsV0 ServerContext{..} _ K.DeleteTopicsRequestV0{..} =
V.forM_ partitions $ \(_, logid) ->
cleanOffsetCache scOffsetManager logid
S.removeStream scLDClient streamId
return $ K.DeletableTopicResultV0 topicName K.NONE
return $ K.DeletableTopicResult topicName K.NONE
24 changes: 23 additions & 1 deletion hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ data DeletableTopicResultV0 = DeletableTopicResultV0
} deriving (Show, Eq, Generic)
instance Serializable DeletableTopicResultV0

type DeletableTopicResultV1 = DeletableTopicResultV0

data DescribeConfigsResourceV0 = DescribeConfigsResourceV0
{ resourceType :: {-# UNPACK #-} !Int8
-- ^ The resource type.
Expand Down Expand Up @@ -814,11 +816,22 @@ data DeleteTopicsRequestV0 = DeleteTopicsRequestV0
} deriving (Show, Eq, Generic)
instance Serializable DeleteTopicsRequestV0

type DeleteTopicsRequestV1 = DeleteTopicsRequestV0

newtype DeleteTopicsResponseV0 = DeleteTopicsResponseV0
{ responses :: (KaArray DeletableTopicResultV0)
} deriving (Show, Eq, Generic)
instance Serializable DeleteTopicsResponseV0

data DeleteTopicsResponseV1 = DeleteTopicsResponseV1
{ throttleTimeMs :: {-# UNPACK #-} !Int32
-- ^ The duration in milliseconds for which the request was throttled due
-- to a quota violation, or zero if the request did not violate any quota.
, responses :: !(KaArray DeletableTopicResultV0)
-- ^ The results for each topic we tried to delete.
} deriving (Show, Eq, Generic)
instance Serializable DeleteTopicsResponseV1

newtype DescribeConfigsRequestV0 = DescribeConfigsRequestV0
{ resources :: (KaArray DescribeConfigsResourceV0)
} deriving (Show, Eq, Generic)
Expand Down Expand Up @@ -1716,6 +1729,7 @@ instance Service HStreamKafkaV1 where
, "listGroups"
, "saslHandshake"
, "apiVersions"
, "deleteTopics"
]

instance HasMethodImpl HStreamKafkaV1 "produce" where
Expand Down Expand Up @@ -1823,6 +1837,13 @@ instance HasMethodImpl HStreamKafkaV1 "apiVersions" where
type MethodInput HStreamKafkaV1 "apiVersions" = ApiVersionsRequestV1
type MethodOutput HStreamKafkaV1 "apiVersions" = ApiVersionsResponseV1

instance HasMethodImpl HStreamKafkaV1 "deleteTopics" where
type MethodName HStreamKafkaV1 "deleteTopics" = "deleteTopics"
type MethodKey HStreamKafkaV1 "deleteTopics" = 20
type MethodVersion HStreamKafkaV1 "deleteTopics" = 1
type MethodInput HStreamKafkaV1 "deleteTopics" = DeleteTopicsRequestV1
type MethodOutput HStreamKafkaV1 "deleteTopics" = DeleteTopicsResponseV1

data HStreamKafkaV2

instance Service HStreamKafkaV2 where
Expand Down Expand Up @@ -2034,7 +2055,7 @@ supportedApiVersions =
, ApiVersionV0 (ApiKey 17) 0 1
, ApiVersionV0 (ApiKey 18) 0 3
, ApiVersionV0 (ApiKey 19) 0 0
, ApiVersionV0 (ApiKey 20) 0 0
, ApiVersionV0 (ApiKey 20) 0 1
, ApiVersionV0 (ApiKey 22) 0 0
, ApiVersionV0 (ApiKey 32) 0 0
, ApiVersionV0 (ApiKey 36) 0 0
Expand Down Expand Up @@ -2091,6 +2112,7 @@ getHeaderVersion (ApiKey (18)) 2 = (1, 0)
getHeaderVersion (ApiKey (18)) 3 = (2, 0)
getHeaderVersion (ApiKey (19)) 0 = (1, 0)
getHeaderVersion (ApiKey (20)) 0 = (1, 0)
getHeaderVersion (ApiKey (20)) 1 = (1, 0)
getHeaderVersion (ApiKey (22)) 0 = (1, 0)
getHeaderVersion (ApiKey (32)) 0 = (1, 0)
getHeaderVersion (ApiKey (36)) 0 = (1, 0)
Expand Down
27 changes: 25 additions & 2 deletions hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,16 @@ deletableTopicResultToV0 x = DeletableTopicResultV0
{ name = x.name
, errorCode = x.errorCode
}
deletableTopicResultToV1 :: DeletableTopicResult -> DeletableTopicResultV1
deletableTopicResultToV1 = deletableTopicResultToV0

deletableTopicResultFromV0 :: DeletableTopicResultV0 -> DeletableTopicResult
deletableTopicResultFromV0 x = DeletableTopicResult
{ name = x.name
, errorCode = x.errorCode
}
deletableTopicResultFromV1 :: DeletableTopicResultV1 -> DeletableTopicResult
deletableTopicResultFromV1 = deletableTopicResultFromV0

data DescribeConfigsResource = DescribeConfigsResource
{ resourceType :: {-# UNPACK #-} !Int8
Expand Down Expand Up @@ -1658,26 +1662,45 @@ deleteTopicsRequestToV0 x = DeleteTopicsRequestV0
{ topicNames = x.topicNames
, timeoutMs = x.timeoutMs
}
deleteTopicsRequestToV1 :: DeleteTopicsRequest -> DeleteTopicsRequestV1
deleteTopicsRequestToV1 = deleteTopicsRequestToV0

deleteTopicsRequestFromV0 :: DeleteTopicsRequestV0 -> DeleteTopicsRequest
deleteTopicsRequestFromV0 x = DeleteTopicsRequest
{ topicNames = x.topicNames
, timeoutMs = x.timeoutMs
}
deleteTopicsRequestFromV1 :: DeleteTopicsRequestV1 -> DeleteTopicsRequest
deleteTopicsRequestFromV1 = deleteTopicsRequestFromV0

newtype DeleteTopicsResponse = DeleteTopicsResponse
{ responses :: (KaArray DeletableTopicResult)
data DeleteTopicsResponse = DeleteTopicsResponse
{ responses :: !(KaArray DeletableTopicResult)
-- ^ The results for each topic we tried to delete.
, throttleTimeMs :: {-# UNPACK #-} !Int32
-- ^ The duration in milliseconds for which the request was throttled due
-- to a quota violation, or zero if the request did not violate any quota.
} deriving (Show, Eq, Generic)
instance Serializable DeleteTopicsResponse

deleteTopicsResponseToV0 :: DeleteTopicsResponse -> DeleteTopicsResponseV0
deleteTopicsResponseToV0 x = DeleteTopicsResponseV0
{ responses = fmap deletableTopicResultToV0 x.responses
}
deleteTopicsResponseToV1 :: DeleteTopicsResponse -> DeleteTopicsResponseV1
deleteTopicsResponseToV1 x = DeleteTopicsResponseV1
{ throttleTimeMs = x.throttleTimeMs
, responses = fmap deletableTopicResultToV1 x.responses
}

deleteTopicsResponseFromV0 :: DeleteTopicsResponseV0 -> DeleteTopicsResponse
deleteTopicsResponseFromV0 x = DeleteTopicsResponse
{ responses = fmap deletableTopicResultFromV0 x.responses
, throttleTimeMs = 0
}
deleteTopicsResponseFromV1 :: DeleteTopicsResponseV1 -> DeleteTopicsResponse
deleteTopicsResponseFromV1 x = DeleteTopicsResponse
{ responses = fmap deletableTopicResultFromV1 x.responses
, throttleTimeMs = x.throttleTimeMs
}

newtype DescribeConfigsRequest = DescribeConfigsRequest
Expand Down
1 change: 1 addition & 0 deletions script/kafka_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ def get_field_default(field_type, default=None):
"ApiVersions": (0, 3),
"Metadata": (0, 5),
"Produce": (0, 3),
"DeleteTopics": (0, 1),
"Fetch": (0, 4),
"OffsetFetch": (0, 3),
"OffsetCommit": (0, 3),
Expand Down

0 comments on commit b8e4d72

Please sign in to comment.