Skip to content

Commit

Permalink
kafka workarounds: make the size of replicaNodes equal to the replica…
Browse files Browse the repository at this point in the history
…tionFactor (#1798)
  • Loading branch information
4eUeP authored Apr 23, 2024
1 parent 4c38ac0 commit ee9e1b3
Showing 1 changed file with 5 additions and 1 deletion.
6 changes: 5 additions & 1 deletion hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import Data.Functor ((<&>))
import Data.Int (Int32)
import qualified Data.List as L
import qualified Data.Map as Map
import Data.Maybe (fromJust)
import qualified Data.Set as S
import Data.Text (Text)
import qualified Data.Text as Text
Expand Down Expand Up @@ -222,11 +223,14 @@ handleMetadata ctx reqCtx req = do
<> " is too large, it should be less than "
<> Log.build (maxBound :: Int32)
let (theNodeId :: Int32) = fromIntegral (A.serverNodeId theNode)
streamId = S.transToTopicStreamName topicName
-- The logReplicationFactor should not be Nothing, so we use fromJust here.
repfac <- fromJust . S.attrValue . S.logReplicationFactor <$> S.getStreamLogAttrs ctx.scLDClient streamId
pure $ K.MetadataResponsePartition
{ errorCode = K.NONE
, partitionIndex = (fromIntegral idx)
, leaderId = theNodeId
, replicaNodes = K.KaArray $ Just (V.singleton theNodeId) -- FIXME: what should it be?
, replicaNodes = K.NonNullKaArray $ (V.replicate repfac theNodeId) -- FIXME: what should it be?
, isrNodes = K.KaArray $ Just (V.singleton theNodeId) -- FIXME: what should it be?
, offlineReplicas = K.KaArray $ Just V.empty -- TODO
}
Expand Down

0 comments on commit ee9e1b3

Please sign in to comment.