Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

memberlist: Use separate queue for CAS messages and only wait for CAS messages queue to be empty when stopping #539

Merged
merged 7 commits into from
Jul 17, 2024

Conversation

pstibrany
Copy link
Member

What this PR does:

This PR changes memberlist client to send out CAS updates faster, and to reduce chance of dropping CAS updates when memberlist is stopping.

  • memberlist client now keeps two separate queues for messages that should be broadcast. One queue for messages that were created as a result of local CAS operation, other queue for remaining messages, created as a result of gossip updates.
  • CAS messages are prioritized when gossiping messages to other nodes
  • When memberlist client is stopping:
    • as before, no more CAS messages can be enqueued. However other messages can be still enqueued and are forwarded.
    • stopping method will wait only for CAS queue to be empty.
  • Wait timeout on shutdown is now configurable, and can be disabled completely. Previously it was hardcoded to 10 seconds.

Using 2 separate queues avoids optimization performed by the queue, when subsequent updates affecting the same "change" (eg. instances in case of instance ring) can be merged together. However CAS updates typically modify different parts of the key (ie. different instances in the instance ring) than incoming gossip messages, so this doesn't look like a problem.

Checklist

  • Tests updated
  • CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX]

Make wait timeout on shutdown configurable.
@@ -1163,32 +1193,25 @@ func (m *KV) processValueUpdate(workerCh <-chan valueUpdate, key string) {
}
}

func (m *KV) queueBroadcast(key string, content []string, version uint, message []byte) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inlined into broadcastNewValue

@@ -718,7 +718,7 @@ func testMultipleClientsWithConfigGenerator(t *testing.T, members int, configGen

startTime := time.Now()
firstKv := clients[0]
ctx, cancel := context.WithTimeout(context.Background(), casInterval*3/2) // Watch for 1.5 cas intervals.
ctx, cancel := context.WithTimeout(context.Background(), casInterval*3) // Watch for 3x cas intervals.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is unrelated, only done to avoid test flakiness.

@seizethedave seizethedave self-requested a review July 15, 2024 15:38
Copy link
Contributor

@seizethedave seizethedave left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. A nice improvement.

kv/memberlist/memberlist_client.go Outdated Show resolved Hide resolved
@@ -1335,7 +1358,7 @@ func (m *KV) MergeRemoteState(data []byte, _ bool) {
level.Error(m.logger).Log("msg", "failed to store received value", "key", kvPair.Key, "err", err)
} else if newver > 0 {
m.notifyWatchers(kvPair.Key)
m.broadcastNewValue(kvPair.Key, change, newver, codec)
m.broadcastNewValue(kvPair.Key, change, newver, codec, false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These messages we're persisting to the local KV that come from external broadcasts are non-CAS by definition? (Only the local mutations are called CAS?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. Currently only CAS operation can modify KV store. There is also Delete operation in KV client interface, but memberlist implementation doesn't support it yet. But we should implement it eventually, so perhaps it would be better to call it "local updates", instead of "cas updates". WDYT? I think I'll rename it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good! CAS is an implementation detail and doesn't really explain that it's a local mod.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've done this change in 979c031. I've also updated memberlist_client_messages_in_broadcast_queue metric to report values for both queues individually.

Split messages_in_broadcast_queue metric into two values, one for each queue.
@seizethedave seizethedave self-requested a review July 17, 2024 15:44
Copy link
Contributor

@seizethedave seizethedave left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, LGTM.

@pstibrany pstibrany enabled auto-merge (squash) July 17, 2024 16:03
@pstibrany pstibrany merged commit f54a6bf into main Jul 17, 2024
6 checks passed
@pstibrany pstibrany deleted the memberlist-cas-queue branch July 17, 2024 16:12
@pstibrany pstibrany mentioned this pull request Jul 18, 2024
2 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants