Skip to content

Commit

Permalink
Fix data race when moving a stream (#5880)
Browse files Browse the repository at this point in the history
There was a data race that triggered in
`TestJetStreamSuperClusterMoveCancel` due to the addition of JetStream
asset version metadata.

All JS API requests receive user-provided
`StreamConfig`/`ConsumerConfig`, which ensures that any mutations done
in `setStaticStreamMetadata` for example is safe.

However, `jsLeaderServerStreamMoveRequest` and
`jsLeaderServerStreamCancelMoveRequest` would use the `sa.Config` from
the stream assignment and call into `jsClusteredStreamUpdateRequest`.
This resulted in a reference to the same `Metadata` map being available
and modified.

To ensure any changes made to config defaults or metadata doesn't result
in a data race, a `StreamConfig.clone` method is added.


[data-race-TestJetStreamSuperClusterMoveCancel.txt](https://github.com/user-attachments/files/16968281/data-race-TestJetStreamSuperClusterMoveCancel.txt)


Signed-off-by: Maurice van Veen <[email protected]>

---------

Signed-off-by: Maurice van Veen <[email protected]>
  • Loading branch information
MauriceVanVeen authored Sep 11, 2024
1 parent 789a216 commit 93d173c
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 2 deletions.
4 changes: 2 additions & 2 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2493,7 +2493,7 @@ func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _
if ok {
sa, ok := streams[streamName]
if ok {
cfg = *sa.Config
cfg = *sa.Config.clone()
streamFound = true
currPeers = sa.Group.Peers
currCluster = sa.Group.Cluster
Expand Down Expand Up @@ -2635,7 +2635,7 @@ func (s *Server) jsLeaderServerStreamCancelMoveRequest(sub *subscription, c *cli
if ok {
sa, ok := streams[streamName]
if ok {
cfg = *sa.Config
cfg = *sa.Config.clone()
streamFound = true
currPeers = sa.Group.Peers
}
Expand Down
35 changes: 35 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24329,3 +24329,38 @@ func TestJetStreamRateLimitHighStreamIngestDefaults(t *testing.T) {
require_Equal(t, stream.msgs.mlen, streamDefaultMaxQueueMsgs)
require_Equal(t, stream.msgs.msz, streamDefaultMaxQueueBytes)
}

func TestJetStreamStreamConfigClone(t *testing.T) {
cfg := &StreamConfig{
Name: "name",
Placement: &Placement{Cluster: "placement", Tags: []string{"tag"}},
Mirror: &StreamSource{Name: "mirror"},
Sources: []*StreamSource{&StreamSource{Name: "source"}},
SubjectTransform: &SubjectTransformConfig{Source: "source", Destination: "dest"},
RePublish: &RePublish{Source: "source", Destination: "dest", HeadersOnly: false},
Metadata: make(map[string]string),
}

// Copy should be complete.
clone := cfg.clone()
require_True(t, reflect.DeepEqual(cfg, clone))

// Changing fields should not update the original.
clone.Placement.Cluster = "diff"
require_False(t, reflect.DeepEqual(cfg.Placement, clone.Placement))

clone.Mirror.Name = "diff"
require_False(t, reflect.DeepEqual(cfg.Mirror, clone.Mirror))

clone.Sources[0].Name = "diff"
require_False(t, reflect.DeepEqual(cfg.Sources, clone.Sources))

clone.SubjectTransform.Source = "diff"
require_False(t, reflect.DeepEqual(cfg.SubjectTransform, clone.SubjectTransform))

clone.RePublish.Source = "diff"
require_False(t, reflect.DeepEqual(cfg.RePublish, clone.RePublish))

clone.Metadata["key"] = "value"
require_False(t, reflect.DeepEqual(cfg.Metadata, clone.Metadata))
}
36 changes: 36 additions & 0 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,42 @@ type StreamConfig struct {
Metadata map[string]string `json:"metadata,omitempty"`
}

// clone performs a deep copy of the StreamConfig struct, returning a new clone with
// all values copied.
func (cfg *StreamConfig) clone() *StreamConfig {
clone := *cfg
if cfg.Placement != nil {
placement := *cfg.Placement
clone.Placement = &placement
}
if cfg.Mirror != nil {
mirror := *cfg.Mirror
clone.Mirror = &mirror
}
if len(cfg.Sources) > 0 {
clone.Sources = make([]*StreamSource, len(cfg.Sources))
for i, cfgSource := range cfg.Sources {
source := *cfgSource
clone.Sources[i] = &source
}
}
if cfg.SubjectTransform != nil {
transform := *cfg.SubjectTransform
clone.SubjectTransform = &transform
}
if cfg.RePublish != nil {
rePublish := *cfg.RePublish
clone.RePublish = &rePublish
}
if cfg.Metadata != nil {
clone.Metadata = make(map[string]string, len(cfg.Metadata))
for k, v := range cfg.Metadata {
clone.Metadata[k] = v
}
}
return &clone
}

type StreamConsumerLimits struct {
InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"`
MaxAckPending int `json:"max_ack_pending,omitempty"`
Expand Down

0 comments on commit 93d173c

Please sign in to comment.