Skip to content

Commit 681a4cb

Browse files
committed
[YUNIKORN-1709] Add event streaming logic (apache#533)
Closes: apache#533 Signed-off-by: Peter Bacsko <[email protected]>
1 parent bce75ba commit 681a4cb

10 files changed

+830
-4
lines changed

pkg/events/event_ringbuffer.go

+27-1
Original file line numberDiff line numberDiff line change
@@ -70,14 +70,40 @@ func (e *eventRingBuffer) Add(event *si.EventRecord) {
7070
e.id++
7171
}
7272

73-
// GetEventsFromID returns "count" number of event records from "id" if possible. The id can be determined from
73+
// GetRecentEvents returns the most recent "count" elements from the ring buffer.
74+
// It is allowed for "count" to be larger than the number of elements.
75+
func (e *eventRingBuffer) GetRecentEvents(count uint64) []*si.EventRecord {
76+
e.RLock()
77+
defer e.RUnlock()
78+
79+
lastID := e.getLastEventID()
80+
var startID uint64
81+
if lastID < count {
82+
startID = 0
83+
} else {
84+
startID = lastID - count + 1
85+
}
86+
87+
history, _, _ := e.getEventsFromID(startID, count)
88+
return history
89+
}
90+
91+
// GetEventsFromID returns "count" number of event records from id if possible. The id can be determined from
7492
// the first call of the method - if it returns nothing because the id is not in the buffer, the lowest valid
7593
// identifier is returned which can be used to get the first batch.
7694
// If the caller does not want to pose limit on the number of events returned, "count" must be set to a high
7795
// value, e.g. math.MaxUint64.
7896
func (e *eventRingBuffer) GetEventsFromID(id uint64, count uint64) ([]*si.EventRecord, uint64, uint64) {
7997
e.RLock()
8098
defer e.RUnlock()
99+
100+
return e.getEventsFromID(id, count)
101+
}
102+
103+
// getEventsFromID unlocked version of GetEventsFromID
104+
func (e *eventRingBuffer) getEventsFromID(id uint64, count uint64) ([]*si.EventRecord, uint64, uint64) {
105+
e.RLock()
106+
defer e.RUnlock()
81107
lowest := e.getLowestID()
82108

83109
pos, idFound := e.id2pos(id)

pkg/events/event_ringbuffer_test.go

+33
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,39 @@ func TestResize(t *testing.T) {
277277
assert.Equal(t, uint64(7), ringBuffer.resizeOffset)
278278
}
279279

280+
func TestGetRecentEvents(t *testing.T) {
281+
// empty
282+
buffer := newEventRingBuffer(10)
283+
records := buffer.GetRecentEvents(5)
284+
assert.Equal(t, 0, len(records))
285+
286+
populate(buffer, 5)
287+
288+
// count < elements
289+
records = buffer.GetRecentEvents(2)
290+
assert.Equal(t, 2, len(records))
291+
assert.Equal(t, int64(3), records[0].TimestampNano)
292+
assert.Equal(t, int64(4), records[1].TimestampNano)
293+
294+
// count = elements
295+
records = buffer.GetRecentEvents(5)
296+
assert.Equal(t, 5, len(records))
297+
assert.Equal(t, int64(0), records[0].TimestampNano)
298+
assert.Equal(t, int64(1), records[1].TimestampNano)
299+
assert.Equal(t, int64(2), records[2].TimestampNano)
300+
assert.Equal(t, int64(3), records[3].TimestampNano)
301+
assert.Equal(t, int64(4), records[4].TimestampNano)
302+
303+
// count > elements
304+
records = buffer.GetRecentEvents(15)
305+
assert.Equal(t, 5, len(records))
306+
assert.Equal(t, int64(0), records[0].TimestampNano)
307+
assert.Equal(t, int64(1), records[1].TimestampNano)
308+
assert.Equal(t, int64(2), records[2].TimestampNano)
309+
assert.Equal(t, int64(3), records[3].TimestampNano)
310+
assert.Equal(t, int64(4), records[4].TimestampNano)
311+
}
312+
280313
func populate(buffer *eventRingBuffer, count int) {
281314
for i := 0; i < count; i++ {
282315
buffer.Add(&si.EventRecord{

pkg/events/event_streaming.go

+179
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
/*
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
*/
18+
19+
package events
20+
21+
import (
22+
"sync"
23+
"time"
24+
25+
"go.uber.org/zap"
26+
27+
"github.com/apache/yunikorn-core/pkg/log"
28+
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
29+
)
30+
31+
const defaultChannelBufSize = 1000
32+
33+
// EventStreaming implements the event streaming logic.
34+
// New events are immediately forwarded to all active consumers.
35+
type EventStreaming struct {
36+
buffer *eventRingBuffer
37+
stopCh chan struct{}
38+
eventStreams map[*EventStream]eventConsumerDetails
39+
sync.Mutex
40+
}
41+
42+
type eventConsumerDetails struct {
43+
local chan *si.EventRecord
44+
consumer chan<- *si.EventRecord
45+
stopCh chan struct{}
46+
name string
47+
createdAt time.Time
48+
}
49+
50+
// EventStream handle type returned to the client that wants to capture the stream of events.
51+
type EventStream struct {
52+
Events <-chan *si.EventRecord
53+
}
54+
55+
// PublishEvent publishes an event to all event stream consumers.
56+
//
57+
// The streaming logic uses bridging to ensure proper ordering of existing and new events.
58+
// Events are sent to the "local" channel from where it is forwarded to the "consumer" channel.
59+
//
60+
// If "local" is full, it means that the consumer side has not processed the events at an appropriate pace.
61+
// Such a consumer is removed and the related channels are closed.
62+
func (e *EventStreaming) PublishEvent(event *si.EventRecord) {
63+
e.Lock()
64+
defer e.Unlock()
65+
66+
for consumer, details := range e.eventStreams {
67+
if len(details.local) == defaultChannelBufSize {
68+
log.Log(log.Events).Warn("Listener buffer full due to potentially slow consumer, removing it")
69+
e.removeEventStream(consumer)
70+
continue
71+
}
72+
73+
details.local <- event
74+
}
75+
}
76+
77+
// CreateEventStream sets up event streaming for a consumer. The returned EventStream object
78+
// contains a channel that can be used for reading.
79+
//
80+
// When a consumer is finished, it must call RemoveEventStream to free up resources.
81+
//
82+
// Consumers have an arbitrary name for logging purposes. The "count" parameter defines the number
83+
// of maximum historical events from the ring buffer. "0" is a valid value and means no past events.
84+
func (e *EventStreaming) CreateEventStream(name string, count uint64) *EventStream {
85+
consumer := make(chan *si.EventRecord, defaultChannelBufSize)
86+
stream := &EventStream{
87+
Events: consumer,
88+
}
89+
local := make(chan *si.EventRecord, defaultChannelBufSize)
90+
stop := make(chan struct{})
91+
e.createEventStreamInternal(stream, local, consumer, stop, name, count)
92+
history := e.buffer.GetRecentEvents(count)
93+
94+
go func(consumer chan<- *si.EventRecord, local <-chan *si.EventRecord, stop <-chan struct{}) {
95+
// Store the refs of historical events; it's possible that some events are added to the
96+
// ring buffer and also to "local" channel.
97+
// It is because we use two separate locks, so event updates are not atomic.
98+
// Example: an event has been just added to the ring buffer (before createEventStreamInternal()),
99+
// and execution is about to enter PublishEvent(); at this point we have an updated "eventStreams"
100+
// map, so "local" will also contain the new event.
101+
seen := make(map[*si.EventRecord]bool)
102+
for _, event := range history {
103+
consumer <- event
104+
seen[event] = true
105+
}
106+
for {
107+
select {
108+
case <-e.stopCh:
109+
close(consumer)
110+
return
111+
case <-stop:
112+
close(consumer)
113+
return
114+
case event := <-local:
115+
if seen[event] {
116+
continue
117+
}
118+
// since events are processed in a single goroutine, doubling is no longer
119+
// possible at this point
120+
seen = make(map[*si.EventRecord]bool)
121+
consumer <- event
122+
}
123+
}
124+
}(consumer, local, stop)
125+
126+
log.Log(log.Events).Info("Created event stream", zap.String("consumer name", name))
127+
return stream
128+
}
129+
130+
func (e *EventStreaming) createEventStreamInternal(stream *EventStream,
131+
local chan *si.EventRecord,
132+
consumer chan *si.EventRecord,
133+
stop chan struct{},
134+
name string,
135+
count uint64) {
136+
// stuff that needs locking
137+
e.Lock()
138+
defer e.Unlock()
139+
140+
e.eventStreams[stream] = eventConsumerDetails{
141+
local: local,
142+
consumer: consumer,
143+
stopCh: stop,
144+
name: name,
145+
createdAt: time.Now(),
146+
}
147+
}
148+
149+
// RemoveEventStream stops the streaming for a given consumer. Must be called to avoid resource leaks.
150+
func (e *EventStreaming) RemoveEventStream(consumer *EventStream) {
151+
e.Lock()
152+
defer e.Unlock()
153+
154+
e.removeEventStream(consumer)
155+
}
156+
157+
func (e *EventStreaming) removeEventStream(consumer *EventStream) {
158+
if details, ok := e.eventStreams[consumer]; ok {
159+
log.Log(log.Events).Info("Removing event stream consumer", zap.String("name", details.name),
160+
zap.Time("creation time", details.createdAt))
161+
close(details.stopCh)
162+
close(details.local)
163+
delete(e.eventStreams, consumer)
164+
}
165+
}
166+
167+
// Close stops event streaming completely.
168+
func (e *EventStreaming) Close() {
169+
close(e.stopCh)
170+
}
171+
172+
// NewEventStreaming creates a new event streaming infrastructure.
173+
func NewEventStreaming(eventBuffer *eventRingBuffer) *EventStreaming {
174+
return &EventStreaming{
175+
buffer: eventBuffer,
176+
stopCh: make(chan struct{}),
177+
eventStreams: make(map[*EventStream]eventConsumerDetails),
178+
}
179+
}

0 commit comments

Comments
 (0)