Skip to content

Commit

Permalink
Kafka: a standalone storage package (#1821)
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP authored May 27, 2024
1 parent 577dd18 commit 1f351c8
Show file tree
Hide file tree
Showing 22 changed files with 172 additions and 47 deletions.
2 changes: 1 addition & 1 deletion hstream-kafka/HStream/Kafka/Common/FetchManager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import Foreign.ForeignPtr (newForeignPtr_)
import Foreign.Ptr (nullPtr)

import qualified HStream.Kafka.Common.RecordFormat as K
import qualified HStream.Store as S
import qualified Kafka.Storage as S

data FetchLogContext = FetchLogContext
{ expectedOffset :: Int64
Expand Down
3 changes: 1 addition & 2 deletions hstream-kafka/HStream/Kafka/Common/OffsetManager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ import GHC.Stack (HasCallStack)

import HStream.Kafka.Common.Read
import HStream.Kafka.Common.RecordFormat
import qualified HStream.Store as S
import qualified HStream.Store.Internal.LogDevice as S
import qualified Kafka.Storage as S

-------------------------------------------------------------------------------

Expand Down
3 changes: 1 addition & 2 deletions hstream-kafka/HStream/Kafka/Common/Read.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ import GHC.Stack (HasCallStack)

import HStream.Kafka.Common.RecordFormat
import qualified HStream.Logger as Log
import qualified HStream.Store as S
import qualified HStream.Store.Internal.LogDevice as S
import qualified Kafka.Protocol.Encoding as K
import qualified Kafka.Storage as S

readOneRecord
:: HasCallStack
Expand Down
2 changes: 1 addition & 1 deletion hstream-kafka/HStream/Kafka/Common/RecordFormat.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import Data.Int
import GHC.Generics (Generic)

import qualified HStream.Logger as Log
import qualified HStream.Store as S
import qualified Kafka.Protocol.Encoding as K
import qualified Kafka.Storage as S

-- | Record is the smallest unit of data in HStream Kafka.
--
Expand Down
2 changes: 1 addition & 1 deletion hstream-kafka/HStream/Kafka/Group/GroupCoordinator.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import qualified HStream.Kafka.Group.Group as G
import qualified HStream.Kafka.Group.GroupOffsetManager as GOM
import qualified HStream.Logger as Log
import qualified HStream.MetaStore.Types as Meta
import HStream.Store (LDClient)
import qualified Kafka.Protocol.Error as K
import Kafka.Storage (LDClient)

data GroupCoordinator = GroupCoordinator
{ groups :: C.MVar (Utils.HashTable T.Text Group)
Expand Down
17 changes: 8 additions & 9 deletions hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import Data.IORef (IORef, modifyIORef',
import qualified Data.Map.Strict as Map
import Data.Maybe (fromMaybe)
import Data.Set (Set)
import qualified Data.Set as S
import qualified Data.Set as Set
import qualified Data.Text as T
import qualified Data.Vector as V
import Data.Word (Word64)
Expand All @@ -32,15 +32,14 @@ import qualified HStream.Kafka.Common.Metrics as M
import HStream.Kafka.Group.OffsetsStore (OffsetStorage (..),
mkCkpOffsetStorage)
import qualified HStream.Logger as Log
import qualified HStream.Store as LD
import qualified HStream.Store as S
import qualified Kafka.Protocol as K
import Kafka.Protocol.Encoding (KaArray (KaArray, unKaArray))
import qualified Kafka.Protocol.Error as K
import Kafka.Protocol.Message (OffsetCommitRequestPartition (..),
OffsetCommitResponsePartition (..),
OffsetFetchResponsePartition (..),
OffsetFetchResponseTopic (..))
import qualified Kafka.Storage as S

-- NOTE: All operations on the GroupMetadataManager are not concurrency-safe,
-- and the caller needs to ensure concurrency-safety on its own.
Expand Down Expand Up @@ -73,7 +72,7 @@ loadOffsetsFromStorage GroupOffsetManager{..} = do
start <- getTime Monotonic
tpOffsets <- Map.map fromIntegral <$> loadOffsets offsetStorage groupName
let totalPartitions = length tpOffsets
logIds = S.fromList $ Map.keys tpOffsets
logIds = Set.fromList $ Map.keys tpOffsets
topicNumRef <- newIORef 0
tps <- getTopicPartitions logIds [] topicNumRef
totalTopicNum <- readIORef topicNumRef
Expand All @@ -95,10 +94,10 @@ loadOffsetsFromStorage GroupOffsetManager{..} = do
where
getTopicPartitions :: Set S.C_LogID -> [[(TopicPartition, S.C_LogID)]] -> IORef Int -> IO ([(TopicPartition, S.C_LogID)])
getTopicPartitions lgs res topicNum
| S.null lgs = return $ concat res
| Set.null lgs = return $ concat res
| otherwise = do
let lgId = S.elemAt 0 lgs
LD.logIdHasGroup ldClient lgId >>= \case
let lgId = Set.elemAt 0 lgs
S.logIdHasGroup ldClient lgId >>= \case
True -> do
(streamId, _) <- S.getStreamIdFromLogId ldClient lgId
modifyIORef' topicNum (+1)
Expand All @@ -107,11 +106,11 @@ loadOffsetsFromStorage GroupOffsetManager{..} = do
tpWithLogId = zipWith (\(_, logId) idx -> (mkTopicPartition topicName idx, logId)) partitions ([0..])
res' = tpWithLogId : res
-- remove partition ids from lgs because they all have same streamId
lgs' = lgs S.\\ S.fromList (map snd partitions)
lgs' = lgs Set.\\ Set.fromList (map snd partitions)
getTopicPartitions lgs' res' topicNum
False -> do
Log.warning $ "get log group from log id failed, skip this log id:" <> Log.build lgId
getTopicPartitions (S.delete lgId lgs) res topicNum
getTopicPartitions (Set.delete lgId lgs) res topicNum

storeOffsets
:: GroupOffsetManager
Expand Down
2 changes: 1 addition & 1 deletion hstream-kafka/HStream/Kafka/Group/OffsetsStore.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import HStream.Base.Timer (CompactedWorker, startCompactedWorker,
stopCompactedWorker,
triggerCompactedWorker)
import qualified HStream.Logger as Log
import qualified HStream.Store as S
import HStream.Utils (textToCBytes)
import qualified Kafka.Storage as S

type LogID = Word64
type LSN = Word64
Expand Down
7 changes: 3 additions & 4 deletions hstream-kafka/HStream/Kafka/Server/Config/FromCli.hs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ import Z.Data.CBytes (CBytes)

import HStream.Kafka.Server.Config.Types
import qualified HStream.Logger as Log
import HStream.Store (Compression (..))
import HStream.Store.Logger (LDLogLevel (..))
import qualified Kafka.Storage as S

-------------------------------------------------------------------------------

Expand Down Expand Up @@ -249,7 +248,7 @@ seedNodesParser = strOption
<> metavar "ADDRESS"
<> help "host:port pairs of seed nodes, separated by commas (,)"

storeCompressionParser :: O.Parser Compression
storeCompressionParser :: O.Parser S.Compression
storeCompressionParser = option auto
$ long "store-compression"
<> metavar "none | lz4 | lz4hc"
Expand All @@ -271,7 +270,7 @@ logFlushImmediatelyParser = O.switch
$ long "log-flush-immediately"
<> help "Flush immediately after logging, this may help debugging"

ldLogLevelParser :: O.Parser LDLogLevel
ldLogLevelParser :: O.Parser S.LDLogLevel
ldLogLevelParser = option auto
$ long "store-log-level"
<> metavar "[critical|error|warning|notify|info|debug|spew]"
Expand Down
4 changes: 2 additions & 2 deletions hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import Text.Read (readEither)

import qualified HStream.Kafka.Server.Config.KafkaConfig as KC
import HStream.Kafka.Server.Config.Types
import HStream.Store (Compression (..))
import qualified Kafka.Storage as S

-------------------------------------------------------------------------------

Expand Down Expand Up @@ -61,7 +61,7 @@ parseJSONToOptions CliOptions{..} obj = do
let !_serverGossipAddress = fromMaybe _advertisedAddress (cliServerGossipAddress <|> nodeGossipAddress)

let !_metaStore = fromMaybe nodeMetaStore cliMetaStore
let !_compression = fromMaybe CompressionNone cliStoreCompression
let !_compression = fromMaybe S.CompressionNone cliStoreCompression

let !_serverLogLevel = fromMaybe (readWithErrLog "log-level" nodeLogLevel) cliServerLogLevel
let !_serverLogWithColor = nodeLogWithColor || cliServerLogWithColor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import qualified Data.Vector as V
import qualified HStream.Kafka.Common.Utils as K
import qualified HStream.Kafka.Server.Config.KafkaConfig as KC
import HStream.Kafka.Server.Handler.Topic (validateTopicName)
import qualified HStream.Store as S
import qualified HStream.Utils as Utils
import qualified Kafka.Protocol as K
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Storage as S

data KafkaConfigManager
= KafkaConfigManager
Expand Down
15 changes: 7 additions & 8 deletions hstream-kafka/HStream/Kafka/Server/Config/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,16 @@ import Data.Text (Text)
import qualified Data.Text as Text
import qualified Data.Vector as V
import Data.Word
import HStream.Gossip (GossipOpts (..),
defaultGossipOpts)
import qualified Options.Applicative as O
import qualified Z.Data.CBytes as CBytes
import Z.Data.CBytes (CBytes)

import HStream.Gossip (GossipOpts (..),
defaultGossipOpts)
import qualified HStream.Kafka.Server.Config.KafkaConfig as KC
import qualified HStream.Logger as Log
import qualified HStream.Server.HStreamInternal as SAI
import HStream.Store (Compression (..))
import HStream.Store.Logger (LDLogLevel)
import qualified Kafka.Storage as S

-------------------------------------------------------------------------------

Expand Down Expand Up @@ -83,8 +82,8 @@ data ServerOpts = ServerOpts

-- Store Options
, _storage :: !StorageOptions
, _compression :: !Compression
, _ldLogLevel :: !LDLogLevel
, _compression :: !S.Compression
, _ldLogLevel :: !S.LDLogLevel
, _ldConfigPath :: !CBytes

, experimentalFeatures :: ![ExperimentalFeature]
Expand Down Expand Up @@ -132,9 +131,9 @@ data CliOptions = CliOptions

-- * Store config
, cliStoreConfigPath :: !CBytes
, cliLdLogLevel :: !(Maybe LDLogLevel)
, cliLdLogLevel :: !(Maybe S.LDLogLevel)
-- ** Internal Store options
, cliStoreCompression :: !(Maybe Compression)
, cliStoreCompression :: !(Maybe S.Compression)

-- SASL Authentication
, cliEnableSaslAuth :: !Bool
Expand Down
2 changes: 1 addition & 1 deletion hstream-kafka/HStream/Kafka/Server/Core/Topic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ import HStream.Kafka.Server.Config.Types (ServerOpts (..),
StorageOptions (..))
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
import qualified HStream.Store as S
import qualified HStream.Utils as Utils
import qualified Kafka.Protocol as K
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Storage as S

createTopic
:: ServerContext
Expand Down
12 changes: 6 additions & 6 deletions hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import Data.Int (Int32)
import qualified Data.List as L
import qualified Data.Map as Map
import Data.Maybe (fromJust)
import qualified Data.Set as S
import qualified Data.Set as Set
import Data.Text (Text)
import qualified Data.Text as Text
import qualified Data.Vector as V
Expand All @@ -41,12 +41,12 @@ import qualified HStream.Kafka.Server.Handler.Topic as K
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
import qualified HStream.Server.HStreamApi as A
import qualified HStream.Store as S
import qualified HStream.Utils as Utils
import qualified Kafka.Protocol.Encoding as K
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K
import qualified Kafka.Storage as S

--------------------
-- 18: ApiVersions
Expand Down Expand Up @@ -107,11 +107,11 @@ handleMetadata ctx reqCtx req = do
-- Note: authorize **DESCRIBE** for existed topics;
-- authorize **DESCRIBE** and **CREATE** for
-- unexisted topics.
let topicNames = S.fromList . V.toList $
let topicNames = Set.fromList . V.toList $
V.map (\K.MetadataRequestTopic{..} -> name) v
allStreamNames <- S.findStreams ctx.scLDClient S.StreamTypeTopic <&> S.fromList . L.map (Utils.cBytesToText . S.streamName)
let needCreate = S.toList $ topicNames S.\\ allStreamNames
alreadyExist = V.fromList . S.toList $ topicNames `S.intersection` allStreamNames
allStreamNames <- S.findStreams ctx.scLDClient S.StreamTypeTopic <&> Set.fromList . L.map (Utils.cBytesToText . S.streamName)
let needCreate = Set.toList $ topicNames Set.\\ allStreamNames
alreadyExist = V.fromList . Set.toList $ topicNames `Set.intersection` allStreamNames
kafkaBrokerConfigs = ctx.kafkaBrokerConfigs

createResp <-
Expand Down
Empty file.
2 changes: 1 addition & 1 deletion hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ import HStream.Kafka.Server.Config (ServerOpts (..),
import qualified HStream.Kafka.Server.Config.KafkaConfig as KC
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
import qualified HStream.Store as S
import qualified HStream.Utils as U
import qualified Kafka.Protocol.Encoding as K
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K
import qualified Kafka.Storage as S

-------------------------------------------------------------------------------

Expand Down
2 changes: 1 addition & 1 deletion hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import qualified HStream.Kafka.Group.Group as G
import qualified HStream.Kafka.Group.GroupCoordinator as GC
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
import qualified HStream.Store as S
import qualified Kafka.Protocol as K
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Service as K
import qualified Kafka.Storage as S

--------------------
-- 2: ListOffsets
Expand Down
2 changes: 1 addition & 1 deletion hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ import qualified HStream.Kafka.Common.RecordFormat as K
import HStream.Kafka.Common.Resource
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
import qualified HStream.Store as S
import qualified HStream.Utils as U
import qualified Kafka.Protocol.Encoding as K
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K
import qualified Kafka.Storage as S

-- acks: (FIXME: Currently we only support -1)
-- 0: The server will not send any response(this is the only case where the
Expand Down
2 changes: 1 addition & 1 deletion hstream-kafka/HStream/Kafka/Server/Handler/Topic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ import qualified HStream.Kafka.Common.Utils as Utils
import qualified HStream.Kafka.Server.Core.Topic as Core
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
import qualified HStream.Store as S
import Kafka.Protocol (NullableString)
import qualified Kafka.Protocol.Encoding as K
import Kafka.Protocol.Error (ErrorCode)
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K
import qualified Kafka.Storage as S

--------------------
-- 19: CreateTopics
Expand Down
2 changes: 1 addition & 1 deletion hstream-kafka/HStream/Kafka/Server/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import qualified HStream.Kafka.Server.Config.KafkaConfig as KC
import HStream.MetaStore.Types (MetaHandle (..))
import HStream.Stats (newServerStatsHolder)
import qualified HStream.Stats as Stats
import qualified HStream.Store as S
import qualified Kafka.Storage as S

data ServerContext = ServerContext
{ serverID :: !Word32
Expand Down
29 changes: 26 additions & 3 deletions hstream-kafka/hstream-kafka.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,28 @@ benchmark kafka-protocol-bench-encoding
default-language: Haskell2010
ghc-options: -threaded -rtsopts -with-rtsopts=-N

library kafka-storage
import: shared-properties
exposed-modules: Kafka.Storage
other-modules: Kafka.Storage.Logdevice
hs-source-dirs: storage
build-tool-depends: hpp:hpp >=0.6 && <0.7
build-depends:
, base >=4.11 && <5
, bytestring
, deepseq
, hstream-common-base
, hstream-store
, text
, vector

default-language: GHC2021
default-extensions:
DerivingStrategies
LambdaCase
OverloadedStrings
RecordWildCards

library
import: shared-properties
exposed-modules:
Expand Down Expand Up @@ -225,7 +247,7 @@ library
, hstream-common-stats
, hstream-gossip
, hstream-kafka:kafka-protocol
, hstream-store
, hstream-kafka:kafka-storage
, mtl
, network
, optparse-applicative
Expand Down Expand Up @@ -271,15 +293,16 @@ test-suite hstream-kafka-test
hs-source-dirs: tests
build-depends:
, aeson
, base >=4.11 && <5
, base >=4.11 && <5
, bytestring
, containers
, hspec
, hspec-expectations
, hstream-common
, hstream-common-base
, hstream-common-server
, hstream-kafka:{hstream-kafka, kafka-protocol}
, hstream-kafka
, hstream-kafka:kafka-protocol
, hstream-store
, http-client
, text
Expand Down
Loading

0 comments on commit 1f351c8

Please sign in to comment.