Skip to content

Commit

Permalink
kafka: validate topic name in listTopicConfigs (#1793)
Browse files Browse the repository at this point in the history
* validate topic name in listTopicConfigs

* update exception msg for createPartitions
  • Loading branch information
YangKian authored Apr 18, 2024
1 parent 3786d78 commit 093b7f3
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 37 deletions.
57 changes: 23 additions & 34 deletions hstream-kafka/HStream/Kafka/Server/Config/KafkaConfigManager.hs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternGuards #-}

module HStream.Kafka.Server.Config.KafkaConfigManager where
import qualified Data.Aeson as J
import Data.Bifunctor (Bifunctor (bimap))
import qualified Data.Map as Map
import Data.Maybe (fromMaybe)
import Data.Maybe (fromJust, fromMaybe)
import qualified Data.Set as Set
import qualified Data.Text as T
import qualified Data.Vector as V
import qualified HStream.Kafka.Common.Utils as K
import qualified HStream.Kafka.Server.Config.KafkaConfig as KC
import HStream.Kafka.Server.Handler.Topic (validateTopicName)
import qualified HStream.Store as S
import qualified HStream.Utils as Utils
import qualified Kafka.Protocol as K
Expand All @@ -27,42 +29,29 @@ mkKafkaConfigManager ldClient kafkaBrokerConfigs =
return $ KafkaConfigManager {..}

listTopicConfigs :: KafkaConfigManager -> T.Text -> K.KaArray T.Text -> IO K.DescribeConfigsResult
listTopicConfigs KafkaConfigManager{..} topic keys = do
let streamId = S.transToTopicStreamName topic
S.doesStreamExist ldClient streamId >>= \case
False -> pure $ K.DescribeConfigsResult
{ configs=K.NonNullKaArray V.empty
, errorCode=K.UNKNOWN_TOPIC_OR_PARTITION
, resourceName=topic
, errorMessage=Just "topic not found"
, resourceType=fromIntegral . fromEnum $ KC.TOPIC
}
True -> do
configs <- S.getStreamExtraAttrs ldClient streamId
let keys' = fromMaybe (V.fromList $ Map.keys KC.allTopicConfigs) (K.unKaArray keys)
configs' = convertConfigs configs
case V.mapM (getConfig configs') keys' of
Left msg -> return $ getErrorResponse KC.TOPIC topic K.INVALID_CONFIG msg
Right configsInResp -> return $ K.DescribeConfigsResult
{ configs=K.NonNullKaArray configsInResp
, errorCode=0
, resourceName=topic
, errorMessage=Nothing
, resourceType=fromIntegral . fromEnum $ KC.TOPIC
}
listTopicConfigs KafkaConfigManager{..} topic keys
| Left (code, msg) <- validateTopicName topic = return $ getErrorResponse KC.TOPIC topic code (fromJust msg)
| otherwise = do
let streamId = S.transToTopicStreamName topic
S.doesStreamExist ldClient streamId >>= \case
False -> return $ getErrorResponse KC.TOPIC topic K.UNKNOWN_TOPIC_OR_PARTITION "topic not found"
True -> do
configs <- S.getStreamExtraAttrs ldClient streamId
let keys' = fromMaybe (V.fromList $ Map.keys KC.allTopicConfigs) (K.unKaArray keys)
configs' = convertConfigs configs
case V.mapM (getConfig configs') keys' of
Left msg -> return $ getErrorResponse KC.TOPIC topic K.INVALID_CONFIG msg
Right configsInResp -> return $ K.DescribeConfigsResult
{ configs=K.NonNullKaArray configsInResp
, errorCode=0
, resourceName=topic
, errorMessage=Nothing
, resourceType=fromIntegral . fromEnum $ KC.TOPIC
}
where
convertConfigs = Map.fromList . map (bimap Utils.cBytesToText (J.decode . Utils.cBytesToLazyByteString)) . Map.toList
getConfigByInstance :: KC.KafkaConfigInstance -> K.DescribeConfigsResourceResult
getConfigByInstance (KC.KafkaConfigInstance cfg) =
K.DescribeConfigsResourceResult
{ isSensitive=KC.isSentitive cfg
, isDefault=KC.isDefaultValue cfg
, readOnly=KC.readOnly cfg
, name=KC.name cfg
, value=KC.value cfg
}
getConfig :: Map.Map T.Text (Maybe T.Text) -> T.Text -> Either T.Text K.DescribeConfigsResourceResult
getConfig configs configName = getConfigByInstance <$> KC.getTopicConfig configName configs
getConfig configs configName = getResultFromInstance <$> KC.getTopicConfig configName configs

getErrorResponse :: KC.KafkaConfigResource
-> T.Text
Expand Down
6 changes: 3 additions & 3 deletions hstream-kafka/HStream/Kafka/Server/Core/Topic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,16 @@ createPartitions ServerContext{..} topicName newPartitionCnt ass timeoutMs valid
case res of
Left (e :: SomeException)
| Just (_ :: S.NOTFOUND) <- fromException e ->
return . Left $ (K.UNKNOWN_TOPIC_OR_PARTITION, "The topic " <> topicName <> " does not exist")
return . Left $ (K.UNKNOWN_TOPIC_OR_PARTITION, "The topic '" <> topicName <> "' does not exist.")
| otherwise -> do
Log.fatal $ "getTotalPartitionCount for topic " <> Log.build topicName <> " failed: " <> Log.build (displayException e)
return . Left $ (K.UNKNOWN_SERVER_ERROR, T.pack $ displayException e)
Right oldPartitions
| newPartitionCnt - oldPartitions < 0 -> do
let msg = "Topic currently has " <> show oldPartitions <> " partitions, which is higher than the requested " <> show newPartitionCnt
let msg = "Topic currently has " <> show oldPartitions <> " partitions, which is higher than the requested " <> show newPartitionCnt <> "."
return . Left $ (K.INVALID_PARTITIONS, T.pack msg)
| newPartitionCnt == oldPartitions ->
return . Left $ (K.INVALID_PARTITIONS, "Topic already has " <> T.pack (show oldPartitions) <> " partitions")
return . Left $ (K.INVALID_PARTITIONS, "Topic already has " <> T.pack (show oldPartitions) <> " partitions.")
| otherwise -> return . Right $ newPartitionCnt - oldPartitions

doCreate streamId cnt
Expand Down

0 comments on commit 093b7f3

Please sign in to comment.