Skip to content

Commit

Permalink
feat: add trigger config (#388)
Browse files Browse the repository at this point in the history
* feat: add trigger config

Signed-off-by: xdlbdy <[email protected]>

* feat: add trigger config

Signed-off-by: xdlbdy <[email protected]>

* feat: add trigger config

Signed-off-by: xdlbdy <[email protected]>

* feat: add trigger config

Signed-off-by: xdlbdy <[email protected]>

Signed-off-by: xdlbdy <[email protected]>
  • Loading branch information
xdlbdy authored Jan 5, 2023
1 parent 303e88a commit 1395ee8
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 91 deletions.
10 changes: 9 additions & 1 deletion internal/trigger/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,15 @@ type Config struct {
ControllerAddr []string `yaml:"controllers"`
Observability observability.Config `yaml:"observability"`

HeartbeatInterval time.Duration
HeartbeatInterval time.Duration `yaml:"heartbeat_interval"`
// send event goroutine size
SendEventGoroutineSize int `yaml:"send_event_goroutine_size"`
// push event batch size when use grpc
SendEventBatchSize int `yaml:"send_event_batch_size"`
// var client read event from segment batch size.
PullEventBatchSize int `yaml:"pull_event_batch_size"`
// max uack event number
MaxUACKEventNumber int `yaml:"max_uack_event_number"`
}

func InitConfig(filename string) (*Config, error) {
Expand Down
76 changes: 48 additions & 28 deletions internal/trigger/offset/offset.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,63 +15,89 @@
package offset

import (
"math"
"sync"

"github.com/huandu/skiplist"
"github.com/linkall-labs/vanus/internal/primitive/info"
"github.com/linkall-labs/vanus/internal/primitive/vanus"
)

func NewSubscriptionOffset(id vanus.ID) *SubscriptionOffset {
return &SubscriptionOffset{
func NewSubscriptionOffset(id vanus.ID, maxUACKNumber int, initOffsets info.ListOffsetInfo) *SubscriptionOffset {
sub := &SubscriptionOffset{
subscriptionID: id,
cond: sync.NewCond(&sync.Mutex{}),
maxUACKNumber: maxUACKNumber,
elOffsets: make(map[vanus.ID]*offsetTracker, len(initOffsets)),
}
for _, offset := range initOffsets {
sub.elOffsets[offset.EventLogID] = initOffset(offset.Offset)
}
return sub
}

type SubscriptionOffset struct {
subscriptionID vanus.ID
elOffset sync.Map
cond *sync.Cond
maxUACKNumber int
uACKNumber int
elOffsets map[vanus.ID]*offsetTracker
closed bool
}

func (offset *SubscriptionOffset) Clear() {
offset.elOffset.Range(func(key, value interface{}) bool {
offset.elOffset.Delete(key)
return true
})
func (offset *SubscriptionOffset) Close() {
offset.cond.L.Lock()
defer offset.cond.L.Unlock()
offset.closed = true
offset.cond.Broadcast()
}

func (offset *SubscriptionOffset) EventReceive(info info.OffsetInfo) {
o, exist := offset.elOffset.Load(info.EventLogID)
offset.cond.L.Lock()
defer offset.cond.L.Unlock()
for offset.uACKNumber >= offset.maxUACKNumber && !offset.closed {
offset.cond.Wait()
}
if offset.closed {
return
}
offset.uACKNumber++
tracker, exist := offset.elOffsets[info.EventLogID]
if !exist {
o, _ = offset.elOffset.LoadOrStore(info.EventLogID, initOffset(info.Offset))
tracker = initOffset(info.Offset)
offset.elOffsets[info.EventLogID] = tracker
}
o.(*offsetTracker).putOffset(info.Offset)
tracker.putOffset(info.Offset)
}

func (offset *SubscriptionOffset) EventCommit(info info.OffsetInfo) {
o, exist := offset.elOffset.Load(info.EventLogID)
offset.cond.L.Lock()
defer offset.cond.L.Unlock()
if offset.closed {
return
}
tracker, exist := offset.elOffsets[info.EventLogID]
if !exist {
return
}
o.(*offsetTracker).commitOffset(info.Offset)
offset.uACKNumber--
offset.cond.Signal()
tracker.commitOffset(info.Offset)
}

func (offset *SubscriptionOffset) GetCommit() info.ListOffsetInfo {
offset.cond.L.Lock()
defer offset.cond.L.Unlock()
var commit info.ListOffsetInfo
offset.elOffset.Range(func(key, value interface{}) bool {
tracker, _ := value.(*offsetTracker)
for id, tracker := range offset.elOffsets {
commit = append(commit, info.OffsetInfo{
EventLogID: key.(vanus.ID),
EventLogID: id,
Offset: tracker.offsetToCommit(),
})
return true
})
}
return commit
}

type offsetTracker struct {
mutex sync.Mutex
maxOffset uint64
initOffset uint64
list *skiplist.SkipList
Expand All @@ -80,7 +106,7 @@ type offsetTracker struct {
func initOffset(initOffset uint64) *offsetTracker {
return &offsetTracker{
initOffset: initOffset,
maxOffset: math.MaxUint64,
maxOffset: initOffset,
list: skiplist.New(skiplist.GreaterThanFunc(func(lhs, rhs interface{}) int {
v1, _ := lhs.(uint64)
v2, _ := rhs.(uint64)
Expand All @@ -95,23 +121,17 @@ func initOffset(initOffset uint64) *offsetTracker {
}

func (o *offsetTracker) putOffset(offset uint64) {
o.mutex.Lock()
defer o.mutex.Unlock()
o.list.Set(offset, offset)
o.maxOffset, _ = o.list.Back().Key().(uint64)
}

func (o *offsetTracker) commitOffset(offset uint64) {
o.mutex.Lock()
defer o.mutex.Unlock()
o.list.Remove(offset)
}

func (o *offsetTracker) offsetToCommit() uint64 {
o.mutex.Lock()
defer o.mutex.Unlock()
if o.list.Len() == 0 {
if o.maxOffset == math.MaxUint64 {
if o.maxOffset == o.initOffset {
return o.initOffset
}
return o.maxOffset + 1
Expand Down
9 changes: 5 additions & 4 deletions internal/trigger/offset/offset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
func TestSubscriptionOffset(t *testing.T) {
Convey("subscription offset", t, func() {
eventLogID := vanus.NewTestID()
subOffset := NewSubscriptionOffset(vanus.NewTestID())
Convey("commit with no receive", func() {
Convey("commit with no exist eventlog", func() {
subOffset := NewSubscriptionOffset(vanus.NewTestID(), 100, info.ListOffsetInfo{})
offsetBegin := uint64(1)
commitEnd := offsetBegin + 10
for offset := offsetBegin; offset < commitEnd; offset++ {
Expand All @@ -40,6 +40,7 @@ func TestSubscriptionOffset(t *testing.T) {
So(0, ShouldEqual, len(commits))
})
Convey("commit with receive", func() {
subOffset := NewSubscriptionOffset(vanus.NewTestID(), 100, info.ListOffsetInfo{})
offsetBegin := uint64(1)
offsetEnd := uint64(100)
var wg sync.WaitGroup
Expand Down Expand Up @@ -88,9 +89,9 @@ func TestSubscriptionOffset(t *testing.T) {
commits = subOffset.GetCommit()
So(1, ShouldEqual, len(commits))
So(offsetEnd, ShouldEqual, commits[0].Offset)
subOffset.Clear()
subOffset.Close()
commits = subOffset.GetCommit()
So(0, ShouldEqual, len(commits))
So(1, ShouldEqual, len(commits))
})
})
}
Expand Down
13 changes: 8 additions & 5 deletions internal/trigger/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Config struct {
SubscriptionID vanus.ID
SubscriptionIDStr string
Offset EventLogOffset
BatchSize int

CheckEventLogInterval time.Duration
}
Expand Down Expand Up @@ -255,7 +256,7 @@ func (elReader *eventLogReader) run(ctx context.Context) {
}

func (elReader *eventLogReader) readEvent(ctx context.Context, lr api.BusReader) error {
events, err := readEvents(ctx, lr, elReader.policy)
events, err := readEvents(ctx, lr)
if err != nil {
return err
}
Expand All @@ -272,13 +273,14 @@ func (elReader *eventLogReader) readEvent(ctx context.Context, lr api.BusReader)
return err
}
elReader.offset = offset
elReader.policy.Forward(1)
}
elReader.policy.Forward(len(events))
metrics.TriggerPullEventCounter.WithLabelValues(
elReader.config.SubscriptionIDStr, elReader.config.EventBusName, elReader.eventLogIDStr).
Add(float64(len(events)))
return nil
}

func (elReader *eventLogReader) putEvent(ctx context.Context, event info.EventRecord) error {
select {
case elReader.events <- event:
Expand All @@ -288,14 +290,15 @@ func (elReader *eventLogReader) putEvent(ctx context.Context, event info.EventRe
}
}

func readEvents(ctx context.Context, lr api.BusReader, p api.ReadPolicy) ([]*ce.Event, error) {
func readEvents(ctx context.Context, lr api.BusReader) ([]*ce.Event, error) {
timeout, cancel := context.WithTimeout(ctx, readEventTimeout)
defer cancel()
events, _, _, err := lr.Read(timeout, option.WithReadPolicy(p), option.WithBatchSize(int(readSize)))
events, _, _, err := lr.Read(timeout)
return events, err
}

func (elReader *eventLogReader) init(ctx context.Context) (api.BusReader, error) {
lr := elReader.config.Client.Eventbus(ctx, elReader.config.EventBusName).Reader()
lr := elReader.config.Client.Eventbus(ctx, elReader.config.EventBusName).Reader(
option.WithReadPolicy(elReader.policy), option.WithBatchSize(elReader.config.BatchSize))
return lr, nil
}
6 changes: 3 additions & 3 deletions internal/trigger/reader/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestReaderStart(t *testing.T) {
mockBusReader := api.NewMockBusReader(mockCtrl)
mockClient.EXPECT().Eventbus(Any(), Any()).AnyTimes().Return(mockEventbus)
mockEventbus.EXPECT().Writer().AnyTimes().Return(mockBusWriter)
mockEventbus.EXPECT().Reader(Any()).AnyTimes().Return(mockBusReader)
mockEventbus.EXPECT().Reader(Any(), Any()).AnyTimes().Return(mockBusReader)
mockEventbus.EXPECT().GetLog(Any(), Any()).AnyTimes().Return(mockEventlog, nil)
mockEventbus.EXPECT().ListLog(Any()).AnyTimes().Return([]api.Eventlog{mockEventlog}, nil)
mockEventlog.EXPECT().ID().AnyTimes().Return(uint64(0))
Expand All @@ -51,7 +51,7 @@ func TestReaderStart(t *testing.T) {
index := uint64(offset)
mockEventlog.EXPECT().LatestOffset(Any()).AnyTimes().Return(offset, nil)
mockEventlog.EXPECT().EarliestOffset(Any()).AnyTimes().Return(offset, nil)
mockBusReader.EXPECT().Read(Any(), Any(), Any()).AnyTimes().DoAndReturn(
mockBusReader.EXPECT().Read(Any()).AnyTimes().DoAndReturn(
func(ctx context.Context, opts ...api.ReadOption) ([]*ce.Event, int64, uint64, error) {
time.Sleep(time.Millisecond)
e := ce.NewEvent()
Expand All @@ -63,7 +63,7 @@ func TestReaderStart(t *testing.T) {
return []*ce.Event{&e}, int64(0), uint64(0), nil
})
eventCh := make(chan info.EventRecord, 100)
r := NewReader(Config{EventBusName: "test"}, eventCh).(*reader)
r := NewReader(Config{EventBusName: "test", BatchSize: 1}, eventCh).(*reader)
r.config.Client = mockClient
r.Start()
var wg sync.WaitGroup
Expand Down
68 changes: 48 additions & 20 deletions internal/trigger/trigger/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,15 @@ import (
)

const (
defaultBufferSize = 1 << 10
defaultFilterProcessSize = 2
defaultDeliveryTimeout = 5 * time.Second
defaultMaxWriteAttempt = 3
defaultGoroutineSize = 10000
defaultBatchSize = 32
defaultBufferSize = 1 << 10
defaultDeliveryTimeout = 5 * time.Second
defaultMaxWriteAttempt = 3
defaultGoroutineSize = 10000
defaultMaxUACKNumber = 10000
defaultBatchSize = 32
)

type Config struct {
FilterProcessSize int
BufferSize int
MaxRetryAttempts int32
DeliveryTimeout time.Duration
Expand All @@ -43,34 +42,28 @@ type Config struct {
Ordered bool

GoroutineSize int
BatchSize int
SendBatchSize int
PullBatchSize int
MaxUACKNumber int
}

func defaultConfig() Config {
c := Config{
FilterProcessSize: defaultFilterProcessSize,
BufferSize: defaultBufferSize,
MaxRetryAttempts: primitive.MaxRetryAttempts,
DeliveryTimeout: defaultDeliveryTimeout,
DeadLetterEventbus: primitive.DeadLetterEventbusName,
MaxWriteAttempt: defaultMaxWriteAttempt,
GoroutineSize: defaultGoroutineSize,
BatchSize: defaultBatchSize,
SendBatchSize: defaultBatchSize,
MaxUACKNumber: defaultMaxUACKNumber,
PullBatchSize: defaultBatchSize,
}
return c
}

type Option func(t *trigger)

func WithFilterProcessSize(size int) Option {
return func(t *trigger) {
if size <= 0 {
return
}
t.config.FilterProcessSize = size
}
}

func WithBufferSize(size int) Option {
return func(t *trigger) {
if size <= 0 {
Expand Down Expand Up @@ -102,7 +95,6 @@ func WithDeliveryTimeout(timeout uint32) Option {
func WithOrdered(ordered bool) Option {
return func(t *trigger) {
t.config.Ordered = ordered
t.config.FilterProcessSize = 1
}
}

Expand Down Expand Up @@ -131,3 +123,39 @@ func WithDeadLetterEventbus(eventbus string) Option {
t.config.DeadLetterEventbus = eventbus
}
}

func WithGoroutineSize(size int) Option {
return func(t *trigger) {
if size <= 0 {
return
}
t.config.GoroutineSize = size
}
}

func WithSendBatchSize(batchSize int) Option {
return func(t *trigger) {
if batchSize <= 0 {
return
}
t.config.SendBatchSize = batchSize
}
}

func WithPullBatchSize(batchSize int) Option {
return func(t *trigger) {
if batchSize <= 0 {
return
}
t.config.PullBatchSize = batchSize
}
}

func WithMaxUACKNumber(maxUACKNumber int) Option {
return func(t *trigger) {
if maxUACKNumber <= 0 {
return
}
t.config.MaxUACKNumber = maxUACKNumber
}
}
Loading

0 comments on commit 1395ee8

Please sign in to comment.