fix node connected status flapping (#4587)
## Problem Statement
Node connection status has been observed to flap between connected and
disconnected states due to race conditions in the `HeartbeatServer` and
its interaction with the priority queue for heartbeat management.

## Root Cause Analysis
The issue stems from non-atomic operations in the existing heartbeat
message
[handler](https://github.com/bacalhau-project/bacalhau/blob/44b44efed9ce84043db9be7b9365e98113b2014a/pkg/node/heartbeat/server.go#L202)
implementation:

1. Checking for an older heartbeat
2. Removing the old heartbeat
3. Enqueuing a new heartbeat

This sequence of operations is vulnerable to race conditions when
concurrent heartbeats arrive from the same node, potentially resulting
in multiple heartbeats for a single node in the queue. This results in
unexpected behaviour, as the `HashedPriorityQueue` is expected to hold a
single item in the queue for each key/node.
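
To make the interleaving concrete, here is a minimal sketch that replays one unlucky ordering of two handlers step by step. The `naiveQueue` type and its methods are hypothetical stand-ins, not the actual `HeartbeatServer` or queue code; each method is individually thread-safe, yet the remove-then-enqueue sequence as a whole is not atomic:

```go
package main

import (
	"fmt"
	"sync"
)

// naiveQueue is a hypothetical stand-in for the pre-fix queue: every method
// takes the lock, but nothing ties consecutive calls together.
type naiveQueue struct {
	mu    sync.Mutex
	items []string // node IDs, one entry per queued heartbeat
}

// RemoveKey removes the first heartbeat queued for nodeID, if any.
func (q *naiveQueue) RemoveKey(nodeID string) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	for i, id := range q.items {
		if id == nodeID {
			q.items = append(q.items[:i], q.items[i+1:]...)
			return true
		}
	}
	return false
}

// Enqueue appends a heartbeat for nodeID.
func (q *naiveQueue) Enqueue(nodeID string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, nodeID)
}

// Count returns how many heartbeats are queued for nodeID.
func (q *naiveQueue) Count(nodeID string) int {
	q.mu.Lock()
	defer q.mu.Unlock()
	n := 0
	for _, id := range q.items {
		if id == nodeID {
			n++
		}
	}
	return n
}

// interleavedHandlers replays one possible scheduling of two handlers
// (A and B) that process heartbeats from the same node.
func interleavedHandlers() int {
	q := &naiveQueue{}
	q.Enqueue("node-1") // heartbeat left over from an earlier interval

	q.RemoveKey("node-1") // A: finds and removes the old heartbeat
	q.RemoveKey("node-1") // B: finds nothing, the old one is already gone
	q.Enqueue("node-1")   // A: enqueues its new heartbeat
	q.Enqueue("node-1")   // B: enqueues its new heartbeat as well

	return q.Count("node-1")
}

func main() {
	fmt.Println("heartbeats queued for node-1:", interleavedHandlers())
}
```

Both handlers behave as if they had replaced the old heartbeat, but the queue ends up holding two entries for `node-1`.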

## Why Now?
While this bug has existed since version 1.4, it only became apparent
with the introduction of concurrent heartbeats in version 1.5. The new
version requires nodes to heartbeat to two topics:
- The old topic (supported by 1.4 orchestrators)
- A new topic (supported by 1.5 orchestrators)

As a result, 1.5 orchestrators now receive two concurrent heartbeats
from 1.5 compute nodes, exposing the race condition.

## Reproduction Steps
1. Set up a devstack environment with approximately 10 nodes
2. Observe the node connection status
3. Note the flapping between connected and disconnected states

## Solution
Instead of simply locking the `HeartbeatServer.Handle()` method, I've
implemented a more comprehensive fix to address the underlying issues:

1. Modified `HashedPriorityQueue` to enforce a single item per key
atomically within the queue
2. Introduced a `Peek` method to allow `HeartbeatServer` to examine the
oldest item without removal and without having to loop over all items
using `DequeueWhere`
3. Corrected the priority and ordering of heartbeat events in the queue

These changes eliminate the need for manual checks, dequeues, and
re-enqueues, while also improving the overall efficiency of the queue
operations.
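
As a rough illustration of the simplified flow, the sketch below assumes a max-priority queue (highest priority first) and negates the heartbeat timestamp so that the oldest heartbeat surfaces first. `heartbeatQueue`, `priorityFor`, `handle`, and `oldestIsStale` are hypothetical names; the real `HeartbeatServer` may compute priorities and liveness differently:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// heartbeatQueue is a hypothetical, map-backed stand-in for a max-priority
// queue that keeps exactly one entry per node ID.
type heartbeatQueue struct {
	mu         sync.Mutex
	priorities map[string]int64
}

func newHeartbeatQueue() *heartbeatQueue {
	return &heartbeatQueue{priorities: make(map[string]int64)}
}

// Enqueue atomically replaces any previous entry for nodeID.
func (q *heartbeatQueue) Enqueue(nodeID string, priority int64) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.priorities[nodeID] = priority
}

// Peek returns the highest-priority entry without removing it.
// A linear scan keeps the sketch short; a real queue would use a heap.
func (q *heartbeatQueue) Peek() (nodeID string, priority int64, ok bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for id, p := range q.priorities {
		if !ok || p > priority {
			nodeID, priority, ok = id, p, true
		}
	}
	return nodeID, priority, ok
}

// priorityFor negates the timestamp (an assumption of this sketch): in a
// max-priority queue the oldest heartbeat then has the highest priority.
func priorityFor(seenAt time.Time) int64 { return -seenAt.Unix() }

// handle records a heartbeat with one call: no check, dequeue, or re-enqueue.
func handle(q *heartbeatQueue, nodeID string, seenAt time.Time) {
	q.Enqueue(nodeID, priorityFor(seenAt))
}

// oldestIsStale peeks at the oldest heartbeat and reports whether it is
// older than threshold, without removing anything from the queue.
func oldestIsStale(q *heartbeatQueue, now time.Time, threshold time.Duration) (string, bool) {
	nodeID, priority, ok := q.Peek()
	if !ok {
		return "", false
	}
	lastSeen := time.Unix(-priority, 0)
	return nodeID, now.Sub(lastSeen) > threshold
}

// staleDemo sends three heartbeats and checks the oldest one 45s after the first.
func staleDemo() (string, bool) {
	base := time.Unix(1700000000, 0)
	q := newHeartbeatQueue()
	handle(q, "node-1", base)
	handle(q, "node-2", base.Add(10*time.Second))
	handle(q, "node-1", base.Add(20*time.Second)) // replaces node-1's entry
	return oldestIsStale(q, base.Add(45*time.Second), 30*time.Second)
}

func main() {
	nodeID, stale := staleDemo()
	fmt.Println(nodeID, stale)
}
```

Because `node-1` refreshed its heartbeat, `node-2` becomes the oldest entry, and a single `Peek` is enough to decide whether any node has gone quiet.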

## Implementation Details
1. `HashedPriorityQueue` Modifications:
   - Ensure atomic operations for maintaining a single item per key
   - Implement version tracking for items so that enqueues remain fast,
     while dequeues lazily filter out and remove items that don't match
     the latest version for the same key

2. New `Peek` Method:
   - Allow examination of the oldest item without altering the queue state
   - Improve efficiency of `HeartbeatServer` operations without having to
     loop over all items using `DequeueWhere`

3. Heartbeat Event Prioritization:
   - Adjust priority calculation to ensure oldest events are dequeued first

## Testing Conducted
- Enhanced test coverage for `HashedPriorityQueue` to ensure unique
items per key
- Improved concurrent heartbeat testing in `HeartbeatServer`
- Manual testing using devstack environments
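
A concurrency check in the same spirit might look like the following sketch. It is not the test added in this commit; `latestPerKey` and `concurrentHeartbeats` are hypothetical, and the sketch only verifies that replace-on-enqueue under a single lock leaves one entry per node regardless of how the goroutines interleave:

```go
package main

import (
	"fmt"
	"sync"
)

// latestPerKey is a hypothetical miniature of the "one item per key" rule:
// the replacement happens inside a single critical section.
type latestPerKey struct {
	mu     sync.Mutex
	latest map[string]int64 // node ID -> sequence number of the last heartbeat stored
}

// Enqueue stores the heartbeat, replacing any previous one for nodeID.
func (q *latestPerKey) Enqueue(nodeID string, seq int64) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.latest[nodeID] = seq
}

// Len returns the number of nodes with a queued heartbeat.
func (q *latestPerKey) Len() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return len(q.latest)
}

// concurrentHeartbeats has every node send `senders` heartbeats at once
// (for example, one per topic) and returns the resulting queue length.
func concurrentHeartbeats(nodes, senders int) int {
	q := &latestPerKey{latest: make(map[string]int64)}
	var wg sync.WaitGroup
	for n := 0; n < nodes; n++ {
		nodeID := fmt.Sprintf("node-%d", n)
		for s := 0; s < senders; s++ {
			wg.Add(1)
			go func(seq int64) {
				defer wg.Done()
				q.Enqueue(nodeID, seq)
			}(int64(s))
		}
	}
	wg.Wait()
	return q.Len()
}

func main() {
	fmt.Println("entries:", concurrentHeartbeats(10, 2))
}
```

Running a check like this with Go's race detector (`go test -race` or `go run -race`) also flags unsynchronized access to the shared map.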
wdbaruni authored Oct 7, 2024
1 parent 44b44ef commit 7285a99
Showing 7 changed files with 597 additions and 164 deletions.
137 changes: 112 additions & 25 deletions pkg/lib/collections/hashed_priority_queue.go
@@ -1,14 +1,60 @@
package collections

-import "sync"

+import (
+"sync"
+)

// HashedPriorityQueue is a priority queue that maintains only a single item per unique key.
// It combines the functionality of a hash map and a priority queue to provide efficient
// operations with the following key characteristics:
//
// 1. Single Item Per Key: The queue maintains only the latest version of an item for each
// unique key. When a new item with an existing key is enqueued, it replaces the old item
// instead of adding a duplicate.
//
// 2. Lazy Dequeuing: Outdated items (those that have been replaced by a newer version) are
// not immediately removed from the underlying queue. Instead, they are filtered out
// during dequeue operations. This approach improves enqueue performance
// at the cost of potentially slower dequeue operations.
//
// 3. Higher Enqueue Throughput: By avoiding immediate removal of outdated items, the
// HashedPriorityQueue achieves higher enqueue throughput. This makes it particularly
// suitable for scenarios with frequent updates to existing items.
//
// 4. Eventually Consistent: The queue becomes consistent over time as outdated items are
// lazily removed during dequeue operations. This means that the queue's length and the
// items it contains become accurate as items are dequeued.
//
// 5. Memory Consideration: Due to the lazy removal of outdated items, the underlying queue
// may temporarily hold more items than there are unique keys. This trade-off allows for
// better performance but may use more memory compared to a strictly consistent queue.
//
// Use HashedPriorityQueue when you need a priority queue that efficiently handles updates
// to existing items and can tolerate some latency in removing outdated entries in favor
// of higher enqueue performance.
type HashedPriorityQueue[K comparable, T any] struct {
-identifiers map[K]struct{}
-queue *PriorityQueue[T]
+identifiers map[K]int64
+queue *PriorityQueue[versionedItem[T]]
mu sync.RWMutex
indexer IndexerFunc[K, T]
}

// versionedItem wraps the actual data item with a version number.
// This structure is used internally by HashedPriorityQueue to implement
// the versioning mechanism that allows for efficient updates and
// lazy removal of outdated items. The queue is only interested in
// the latest version of an item for each unique key:
// - data: The actual item of type T stored in the queue.
// - version: A monotonically increasing number representing the
// version of this item. When an item with the same key is enqueued,
// its version is incremented. This allows the queue to identify
// the most recent version during dequeue operations and discard
// any older versions of the same item.
type versionedItem[T any] struct {
data T
version int64
}

// IndexerFunc is used to find the key (of type K) from the provided
// item (T). This will be used for the item lookup in `Contains`
type IndexerFunc[K comparable, T any] func(item T) K
@@ -18,12 +64,24 @@ type IndexerFunc[K comparable, T any] func(item T) K
// be used on Enqueue/Dequeue to keep the index up to date.
func NewHashedPriorityQueue[K comparable, T any](indexer IndexerFunc[K, T]) *HashedPriorityQueue[K, T] {
return &HashedPriorityQueue[K, T]{
-identifiers: make(map[K]struct{}),
-queue: NewPriorityQueue[T](),
+identifiers: make(map[K]int64),
+queue: NewPriorityQueue[versionedItem[T]](),
indexer: indexer,
}
}

// isLatestVersion checks if the given item is the latest version
func (q *HashedPriorityQueue[K, T]) isLatestVersion(item versionedItem[T]) bool {
k := q.indexer(item.data)
currentVersion := q.identifiers[k]
return item.version == currentVersion
}

// unwrapQueueItem converts a versionedItem to a QueueItem
func (q *HashedPriorityQueue[K, T]) unwrapQueueItem(item *QueueItem[versionedItem[T]]) *QueueItem[T] {
return &QueueItem[T]{Value: item.Value.data, Priority: item.Priority}
}

// Contains will return true if the provided identifier (of type K)
// will be found in this queue, false if it is not present.
func (q *HashedPriorityQueue[K, T]) Contains(id K) bool {
@@ -40,9 +98,9 @@ func (q *HashedPriorityQueue[K, T]) Enqueue(data T, priority int64) {
defer q.mu.Unlock()

k := q.indexer(data)

-q.identifiers[k] = struct{}{}
-q.queue.Enqueue(data, priority)
+version := q.identifiers[k] + 1
+q.identifiers[k] = version
+q.queue.Enqueue(versionedItem[T]{data: data, version: version}, priority)
}

// Dequeue returns the next highest priority item, returning both
@@ -53,16 +111,39 @@ func (q *HashedPriorityQueue[K, T]) Dequeue() *QueueItem[T] {
q.mu.Lock()
defer q.mu.Unlock()

-item := q.queue.Dequeue()
-if item == nil {
-return nil
+for {
+item := q.queue.Dequeue()
+if item == nil {
+return nil
+}
+
+if q.isLatestVersion(item.Value) {
+k := q.indexer(item.Value.data)
+delete(q.identifiers, k)
+return q.unwrapQueueItem(item)
+}
}
+}
+
+// Peek returns the next highest priority item without removing it from the queue.
+// It returns nil if the queue is empty.
+func (q *HashedPriorityQueue[K, T]) Peek() *QueueItem[T] {
+q.mu.RLock()
+defer q.mu.RUnlock()

-// Find the key for the item and delete it from the presence map
-k := q.indexer(item.Value)
-delete(q.identifiers, k)
+for {
+item := q.queue.Peek()
+if item == nil {
+return nil
+}

-return item
+if q.isLatestVersion(item.Value) {
+return q.unwrapQueueItem(item)
+}
+
+// If the peeked item is outdated, remove it and continue
+q.queue.Dequeue()
+}
}

// DequeueWhere allows the caller to iterate through the queue, in priority order, and
@@ -74,26 +155,32 @@ func (q *HashedPriorityQueue[K, T]) DequeueWhere(matcher MatchingFunction[T]) *Q
q.mu.Lock()
defer q.mu.Unlock()

-item := q.queue.DequeueWhere(matcher)
-if item == nil {
-return nil
-}
+for {
+item := q.queue.DequeueWhere(func(vi versionedItem[T]) bool {
+return matcher(vi.data)
+})

-k := q.indexer(item.Value)
-delete(q.identifiers, k)
+if item == nil {
+return nil
+}

-return item
+if q.isLatestVersion(item.Value) {
+k := q.indexer(item.Value.data)
+delete(q.identifiers, k)
+return q.unwrapQueueItem(item)
+}
+}
}

// Len returns the number of items currently in the queue
func (q *HashedPriorityQueue[K, T]) Len() int {
-return q.queue.Len()
+return len(q.identifiers)
}

// IsEmpty returns a boolean denoting whether the queue is
// currently empty or not.
func (q *HashedPriorityQueue[K, T]) IsEmpty() bool {
-return q.queue.Len() == 0
+return q.Len() == 0
}

var _ PriorityQueueInterface[struct{}] = (*HashedPriorityQueue[string, struct{}])(nil)
135 changes: 122 additions & 13 deletions pkg/lib/collections/hashed_priority_queue_test.go
@@ -5,33 +5,142 @@ package collections_test
import (
"testing"

-"github.com/bacalhau-project/bacalhau/pkg/lib/collections"
"github.com/stretchr/testify/suite"
+
+"github.com/bacalhau-project/bacalhau/pkg/lib/collections"
)

type HashedPriorityQueueSuite struct {
-suite.Suite
+PriorityQueueTestSuite
}

func (s *HashedPriorityQueueSuite) SetupTest() {
s.NewQueue = func() collections.PriorityQueueInterface[TestData] {
return collections.NewHashedPriorityQueue[string, TestData](func(t TestData) string {
return t.id
})
}
}

func TestHashedPriorityQueueSuite(t *testing.T) {
suite.Run(t, new(HashedPriorityQueueSuite))
}

func (s *HashedPriorityQueueSuite) TestContains() {
-type TestData struct {
-id string
-data int
-}
-
-indexer := func(t TestData) string {
-return t.id
-}
-
-q := collections.NewHashedPriorityQueue[string, TestData](indexer)
+q := s.NewQueue().(*collections.HashedPriorityQueue[string, TestData])

s.Require().False(q.Contains("A"))
-q.Enqueue(TestData{id: "A", data: 0}, 1)
+q.Enqueue(TestData{"A", 0}, 1)
s.Require().True(q.Contains("A"))
_ = q.Dequeue()
s.Require().False(q.Contains("A"))
}

func (s *HashedPriorityQueueSuite) TestPeek() {
q := s.NewQueue().(*collections.HashedPriorityQueue[string, TestData])

q.Enqueue(TestData{"A", 1}, 1)
q.Enqueue(TestData{"B", 2}, 2)

item := q.Peek()
s.Require().NotNil(item)
s.Require().Equal(TestData{"B", 2}, item.Value)
s.Require().True(q.Contains("A"), "Item A should still be in the queue after Peek")
s.Require().True(q.Contains("B"), "Item B should still be in the queue after Peek")

_ = q.Dequeue()
s.Require().False(q.Contains("B"), "Item B should not be in the queue after Dequeue")
s.Require().True(q.Contains("A"), "Item A should still be in the queue after Dequeue")
}

func (s *HashedPriorityQueueSuite) TestSingleItemPerKey() {
q := s.NewQueue().(*collections.HashedPriorityQueue[string, TestData])

q.Enqueue(TestData{"A", 1}, 1)
q.Enqueue(TestData{"A", 2}, 2)
q.Enqueue(TestData{"A", 3}, 3)

s.Require().Equal(1, q.Len(), "Queue should only contain one item for key 'A'")

item := q.Dequeue()
s.Require().NotNil(item)
s.Require().Equal(TestData{"A", 3}, item.Value, "Should return the latest version of item 'A'")
s.Require().Equal(int64(3), item.Priority, "Should have the priority of the latest enqueue")

s.Require().Nil(q.Dequeue(), "Queue should be empty after dequeuing the single item")
}

func (s *HashedPriorityQueueSuite) TestPeekReturnsLatestVersion() {
q := s.NewQueue().(*collections.HashedPriorityQueue[string, TestData])

q.Enqueue(TestData{"A", 1}, 1)
q.Enqueue(TestData{"B", 1}, 3)
q.Enqueue(TestData{"A", 2}, 2)

item := q.Peek()
s.Require().NotNil(item)
s.Require().Equal(TestData{"B", 1}, item.Value, "Peek should return 'B' as it has the highest priority")
s.Require().Equal(int64(3), item.Priority)

q.Enqueue(TestData{"B", 2}, 1) // Lower priority, but newer version

item = q.Peek()
s.Require().NotNil(item)
s.Require().Equal(TestData{"A", 2}, item.Value, "Peek should now return 'A' as 'B' has lower priority")
s.Require().Equal(int64(2), item.Priority)
}

func (s *HashedPriorityQueueSuite) TestDequeueWhereReturnsLatestVersion() {
q := s.NewQueue().(*collections.HashedPriorityQueue[string, TestData])

q.Enqueue(TestData{"A", 1}, 1)
q.Enqueue(TestData{"B", 1}, 2)
q.Enqueue(TestData{"A", 2}, 3)

item := q.DequeueWhere(func(td TestData) bool {
return td.id == "A"
})

s.Require().NotNil(item)
s.Require().Equal(TestData{"A", 2}, item.Value, "DequeueWhere should return the latest version of 'A'")
s.Require().Equal(int64(3), item.Priority)

s.Require().False(q.Contains("A"), "A should no longer be in the queue")
s.Require().True(q.Contains("B"), "B should still be in the queue")
}

func (s *HashedPriorityQueueSuite) TestDuplicateKeys() {
inputs := []struct {
v TestData
p int64
}{
{TestData{"A", 1}, 3},
{TestData{"B", 2}, 2},
{TestData{"A", 3}, 1}, // Duplicate key with lower priority
{TestData{"C", 4}, 4},
{TestData{"B", 5}, 5}, // Duplicate key with higher priority
}

pq := s.NewQueue()
for _, tc := range inputs {
pq.Enqueue(tc.v, tc.p)
}

expected := []struct {
v TestData
p int64
}{
{TestData{"B", 5}, 5},
{TestData{"C", 4}, 4},
{TestData{"A", 3}, 1},
}

for _, exp := range expected {
qitem := pq.Dequeue()
s.Require().NotNil(qitem)
s.Require().Equal(exp.v, qitem.Value)
s.Require().Equal(exp.p, qitem.Priority)
}

s.Require().True(pq.IsEmpty())
}
20 changes: 20 additions & 0 deletions pkg/lib/collections/priority_queue.go
@@ -35,6 +35,10 @@ type PriorityQueueInterface[T any] interface {
// extra PriorityQueue) for the dequeued items.
DequeueWhere(matcher MatchingFunction[T]) *QueueItem[T]

// Peek returns the next highest priority item without removing it from the queue.
// It returns nil if the queue is empty.
Peek() *QueueItem[T]

// Len returns the number of items currently in the queue
Len() int

@@ -117,6 +121,22 @@ func (pq *PriorityQueue[T]) dequeue() *QueueItem[T] {
return &QueueItem[T]{Value: item, Priority: heapItem.priority}
}

// Peek returns the next highest priority item without removing it from the queue.
// It returns nil if the queue is empty.
func (pq *PriorityQueue[T]) Peek() *QueueItem[T] {
pq.mu.Lock()
defer pq.mu.Unlock()

if pq.IsEmpty() {
return nil
}

heapItem := pq.internalQueue[0]
item, _ := heapItem.value.(T)

return &QueueItem[T]{Value: item, Priority: heapItem.priority}
}

// DequeueWhere allows the caller to iterate through the queue, in priority order, and
// attempt to match an item using the provided `MatchingFunction`. This method has a high
// time cost as dequeued but non-matching items must be held and requeued once the process