diff --git a/hstream-kafka/HStream/Kafka/Server/Handler.hsc b/hstream-kafka/HStream/Kafka/Server/Handler.hsc index 1ca43e8f7..f4512d338 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler.hsc +++ b/hstream-kafka/HStream/Kafka/Server/Handler.hsc @@ -64,6 +64,8 @@ import qualified Kafka.Protocol.Service as K #cv_handler Fetch, 0, 4 #cv_handler DescribeConfigs, 0, 0 +#cv_handler DeleteTopics, 0, 1 + #cv_handler SaslHandshake, 0, 1 #cv_handler FindCoordinator, 0, 1 @@ -94,6 +96,8 @@ handlers sc = , #mk_handler FindCoordinator, 0, 1 + , #mk_handler DeleteTopics, 0, 1 + -- Group , #mk_handler JoinGroup, 0, 2 , #mk_handler SyncGroup, 0, 1 @@ -106,7 +110,6 @@ handlers sc = , #mk_handler OffsetFetch, 0, 3 , K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "createTopics") (handleCreateTopicsV0 sc) - , K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "deleteTopics") (handleDeleteTopicsV0 sc) , K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "saslHandshake") (handleAfterAuthSaslHandshakeV0 sc) , K.hd (K.RPC :: K.RPC K.HStreamKafkaV1 "saslHandshake") (handleAfterAuthSaslHandshakeV1 sc) diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Topic.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Topic.hs index d304d1fb6..2b890aed8 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Topic.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Topic.hs @@ -5,9 +5,8 @@ module HStream.Kafka.Server.Handler.Topic ( -- 19: CreateTopics handleCreateTopicsV0 - -- 20: DeleteTopics - , handleDeleteTopicsV0 + , handleDeleteTopics ) where import Control.Exception @@ -21,9 +20,7 @@ import qualified HStream.Kafka.Common.Utils as Utils import qualified HStream.Kafka.Server.Core.Topic as Core import HStream.Kafka.Server.Types (ServerContext (..)) import qualified HStream.Logger as Log -import qualified HStream.Stats as Stats import qualified HStream.Store as S -import qualified HStream.Utils as Utils import qualified Kafka.Protocol.Encoding as K import qualified Kafka.Protocol.Error as K import qualified Kafka.Protocol.Message as K @@ -64,28 +61,28 @@ handleCreateTopicsV0 ctx _ K.CreateTopicsRequestV0{..} = -- 20: DeleteTopics -------------------- -- FIXME: The `timeoutMs` field of request is omitted. -handleDeleteTopicsV0 - :: ServerContext -> K.RequestContext -> K.DeleteTopicsRequestV0 -> IO K.DeleteTopicsResponseV0 -handleDeleteTopicsV0 ServerContext{..} _ K.DeleteTopicsRequestV0{..} = +handleDeleteTopics + :: ServerContext -> K.RequestContext -> K.DeleteTopicsRequest -> IO K.DeleteTopicsResponse +handleDeleteTopics ServerContext{..} _ K.DeleteTopicsRequest{..} = case topicNames of K.KaArray Nothing -> -- FIXME: We return `[]` when topics is `Nothing`. -- Is this proper? - return $ K.DeleteTopicsResponseV0 (K.KaArray $ Just V.empty) + return $ K.DeleteTopicsResponse {responses = K.KaArray $ Just V.empty, throttleTimeMs = 0} K.KaArray (Just topicNames_) - | V.null topicNames_ -> return $ K.DeleteTopicsResponseV0 (K.KaArray $ Just V.empty) + | V.null topicNames_ -> return $ K.DeleteTopicsResponse {responses = K.KaArray $ Just V.empty, throttleTimeMs = 0} | otherwise -> do respTopics <- forM topicNames_ $ \topicName -> do try (deleteTopic topicName) >>= \case Left (e :: SomeException) | Just _ <- fromException @S.NOTFOUND e -> do Log.warning $ "Delete topic failed, topic " <> Log.build topicName <> " does not exist" - return $ K.DeletableTopicResultV0 topicName K.UNKNOWN_TOPIC_OR_PARTITION + return $ K.DeletableTopicResult topicName K.UNKNOWN_TOPIC_OR_PARTITION | otherwise -> do Log.warning $ "Exception occurs when deleting topic " <> Log.build topicName <> ": " <> Log.build (show e) - return $ K.DeletableTopicResultV0 topicName K.UNKNOWN_SERVER_ERROR + return $ K.DeletableTopicResult topicName K.UNKNOWN_SERVER_ERROR Right res -> return res - return $ K.DeleteTopicsResponseV0 (K.KaArray $ Just respTopics) + return $ K.DeleteTopicsResponse {responses = K.KaArray $ Just respTopics, throttleTimeMs = 0} where -- FIXME: There can be some potential exceptions which are difficult to -- classify using Kafka's error code. So this function may throw @@ -93,7 +90,7 @@ handleDeleteTopicsV0 ServerContext{..} _ K.DeleteTopicsRequestV0{..} = -- WARNING: This function may throw exceptions! -- -- TODO: Handle topic that has subscription (i.e. cannot be deleted) - deleteTopic :: T.Text -> IO K.DeletableTopicResultV0 + deleteTopic :: T.Text -> IO K.DeletableTopicResult deleteTopic topicName = do let streamId = S.transToTopicStreamName topicName -- delete offset caches. @@ -104,4 +101,4 @@ handleDeleteTopicsV0 ServerContext{..} _ K.DeleteTopicsRequestV0{..} = V.forM_ partitions $ \(_, logid) -> cleanOffsetCache scOffsetManager logid S.removeStream scLDClient streamId - return $ K.DeletableTopicResultV0 topicName K.NONE + return $ K.DeletableTopicResult topicName K.NONE diff --git a/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs b/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs index c3e89e9fa..2885ea907 100644 --- a/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs +++ b/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs @@ -120,6 +120,8 @@ data DeletableTopicResultV0 = DeletableTopicResultV0 } deriving (Show, Eq, Generic) instance Serializable DeletableTopicResultV0 +type DeletableTopicResultV1 = DeletableTopicResultV0 + data DescribeConfigsResourceV0 = DescribeConfigsResourceV0 { resourceType :: {-# UNPACK #-} !Int8 -- ^ The resource type. @@ -814,11 +816,22 @@ data DeleteTopicsRequestV0 = DeleteTopicsRequestV0 } deriving (Show, Eq, Generic) instance Serializable DeleteTopicsRequestV0 +type DeleteTopicsRequestV1 = DeleteTopicsRequestV0 + newtype DeleteTopicsResponseV0 = DeleteTopicsResponseV0 { responses :: (KaArray DeletableTopicResultV0) } deriving (Show, Eq, Generic) instance Serializable DeleteTopicsResponseV0 +data DeleteTopicsResponseV1 = DeleteTopicsResponseV1 + { throttleTimeMs :: {-# UNPACK #-} !Int32 + -- ^ The duration in milliseconds for which the request was throttled due + -- to a quota violation, or zero if the request did not violate any quota. + , responses :: !(KaArray DeletableTopicResultV0) + -- ^ The results for each topic we tried to delete. + } deriving (Show, Eq, Generic) +instance Serializable DeleteTopicsResponseV1 + newtype DescribeConfigsRequestV0 = DescribeConfigsRequestV0 { resources :: (KaArray DescribeConfigsResourceV0) } deriving (Show, Eq, Generic) @@ -1716,6 +1729,7 @@ instance Service HStreamKafkaV1 where , "listGroups" , "saslHandshake" , "apiVersions" + , "deleteTopics" ] instance HasMethodImpl HStreamKafkaV1 "produce" where @@ -1823,6 +1837,13 @@ instance HasMethodImpl HStreamKafkaV1 "apiVersions" where type MethodInput HStreamKafkaV1 "apiVersions" = ApiVersionsRequestV1 type MethodOutput HStreamKafkaV1 "apiVersions" = ApiVersionsResponseV1 +instance HasMethodImpl HStreamKafkaV1 "deleteTopics" where + type MethodName HStreamKafkaV1 "deleteTopics" = "deleteTopics" + type MethodKey HStreamKafkaV1 "deleteTopics" = 20 + type MethodVersion HStreamKafkaV1 "deleteTopics" = 1 + type MethodInput HStreamKafkaV1 "deleteTopics" = DeleteTopicsRequestV1 + type MethodOutput HStreamKafkaV1 "deleteTopics" = DeleteTopicsResponseV1 + data HStreamKafkaV2 instance Service HStreamKafkaV2 where @@ -2034,7 +2055,7 @@ supportedApiVersions = , ApiVersionV0 (ApiKey 17) 0 1 , ApiVersionV0 (ApiKey 18) 0 3 , ApiVersionV0 (ApiKey 19) 0 0 - , ApiVersionV0 (ApiKey 20) 0 0 + , ApiVersionV0 (ApiKey 20) 0 1 , ApiVersionV0 (ApiKey 22) 0 0 , ApiVersionV0 (ApiKey 32) 0 0 , ApiVersionV0 (ApiKey 36) 0 0 @@ -2091,6 +2112,7 @@ getHeaderVersion (ApiKey (18)) 2 = (1, 0) getHeaderVersion (ApiKey (18)) 3 = (2, 0) getHeaderVersion (ApiKey (19)) 0 = (1, 0) getHeaderVersion (ApiKey (20)) 0 = (1, 0) +getHeaderVersion (ApiKey (20)) 1 = (1, 0) getHeaderVersion (ApiKey (22)) 0 = (1, 0) getHeaderVersion (ApiKey (32)) 0 = (1, 0) getHeaderVersion (ApiKey (36)) 0 = (1, 0) diff --git a/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs b/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs index f7d8fae61..f3101542a 100644 --- a/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs +++ b/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs @@ -203,12 +203,16 @@ deletableTopicResultToV0 x = DeletableTopicResultV0 { name = x.name , errorCode = x.errorCode } +deletableTopicResultToV1 :: DeletableTopicResult -> DeletableTopicResultV1 +deletableTopicResultToV1 = deletableTopicResultToV0 deletableTopicResultFromV0 :: DeletableTopicResultV0 -> DeletableTopicResult deletableTopicResultFromV0 x = DeletableTopicResult { name = x.name , errorCode = x.errorCode } +deletableTopicResultFromV1 :: DeletableTopicResultV1 -> DeletableTopicResult +deletableTopicResultFromV1 = deletableTopicResultFromV0 data DescribeConfigsResource = DescribeConfigsResource { resourceType :: {-# UNPACK #-} !Int8 @@ -1658,15 +1662,23 @@ deleteTopicsRequestToV0 x = DeleteTopicsRequestV0 { topicNames = x.topicNames , timeoutMs = x.timeoutMs } +deleteTopicsRequestToV1 :: DeleteTopicsRequest -> DeleteTopicsRequestV1 +deleteTopicsRequestToV1 = deleteTopicsRequestToV0 deleteTopicsRequestFromV0 :: DeleteTopicsRequestV0 -> DeleteTopicsRequest deleteTopicsRequestFromV0 x = DeleteTopicsRequest { topicNames = x.topicNames , timeoutMs = x.timeoutMs } +deleteTopicsRequestFromV1 :: DeleteTopicsRequestV1 -> DeleteTopicsRequest +deleteTopicsRequestFromV1 = deleteTopicsRequestFromV0 -newtype DeleteTopicsResponse = DeleteTopicsResponse - { responses :: (KaArray DeletableTopicResult) +data DeleteTopicsResponse = DeleteTopicsResponse + { responses :: !(KaArray DeletableTopicResult) + -- ^ The results for each topic we tried to delete. + , throttleTimeMs :: {-# UNPACK #-} !Int32 + -- ^ The duration in milliseconds for which the request was throttled due + -- to a quota violation, or zero if the request did not violate any quota. } deriving (Show, Eq, Generic) instance Serializable DeleteTopicsResponse @@ -1674,10 +1686,21 @@ deleteTopicsResponseToV0 :: DeleteTopicsResponse -> DeleteTopicsResponseV0 deleteTopicsResponseToV0 x = DeleteTopicsResponseV0 { responses = fmap deletableTopicResultToV0 x.responses } +deleteTopicsResponseToV1 :: DeleteTopicsResponse -> DeleteTopicsResponseV1 +deleteTopicsResponseToV1 x = DeleteTopicsResponseV1 + { throttleTimeMs = x.throttleTimeMs + , responses = fmap deletableTopicResultToV1 x.responses + } deleteTopicsResponseFromV0 :: DeleteTopicsResponseV0 -> DeleteTopicsResponse deleteTopicsResponseFromV0 x = DeleteTopicsResponse { responses = fmap deletableTopicResultFromV0 x.responses + , throttleTimeMs = 0 + } +deleteTopicsResponseFromV1 :: DeleteTopicsResponseV1 -> DeleteTopicsResponse +deleteTopicsResponseFromV1 x = DeleteTopicsResponse + { responses = fmap deletableTopicResultFromV1 x.responses + , throttleTimeMs = x.throttleTimeMs } newtype DescribeConfigsRequest = DescribeConfigsRequest diff --git a/script/kafka_gen.py b/script/kafka_gen.py index 4fe0506f7..97ca589b6 100755 --- a/script/kafka_gen.py +++ b/script/kafka_gen.py @@ -111,6 +111,7 @@ def get_field_default(field_type, default=None): "ApiVersions": (0, 3), "Metadata": (0, 5), "Produce": (0, 3), + "DeleteTopics": (0, 1), "Fetch": (0, 4), "OffsetFetch": (0, 3), "OffsetCommit": (0, 3),