diff --git a/.codecov.yml b/.codecov.yml new file mode 100644 index 000000000..e36154785 --- /dev/null +++ b/.codecov.yml @@ -0,0 +1,25 @@ +codecov: + require_ci_to_pass: yes + +coverage: + precision: 2 + round: down + range: "70...100" + +parsers: + gcov: + branch_detection: + conditional: yes + loop: yes + method: no + macro: no + +ignore: + - "**/mock*.go" + - "**/fake*.go" + - "**/fake.go" + +comment: + layout: "reach,diff,flags,files,footer" + behavior: default + require_changes: no diff --git a/internal/controller/trigger/info/info.go b/internal/controller/trigger/info/info.go index 331b41862..f2f659404 100644 --- a/internal/controller/trigger/info/info.go +++ b/internal/controller/trigger/info/info.go @@ -31,7 +31,7 @@ const ( ) type TriggerWorkerInfo struct { - Id string `json:"-"` + ID string `json:"-"` Addr string `json:"addr"` Phase TriggerWorkerPhase `json:"phase"` AssignSubIds map[vanus.ID]time.Time `json:"-"` @@ -43,7 +43,7 @@ type TriggerWorkerInfo struct { func NewTriggerWorkerInfo(addr string) *TriggerWorkerInfo { twInfo := &TriggerWorkerInfo{ Addr: addr, - Id: util.GetIdByAddr(addr), + ID: util.GetIdByAddr(addr), Phase: TriggerWorkerPhasePending, } twInfo.Init() diff --git a/internal/controller/trigger/storage/fake.go b/internal/controller/trigger/storage/fake.go index 5698251fa..1bb353660 100644 --- a/internal/controller/trigger/storage/fake.go +++ b/internal/controller/trigger/storage/fake.go @@ -104,7 +104,7 @@ func (f *fake) DeleteOffset(ctx context.Context, subId vanus.ID) error { } func (f *fake) SaveTriggerWorker(ctx context.Context, info info.TriggerWorkerInfo) error { - f.tWorkers[info.Id] = &info + f.tWorkers[info.ID] = &info return nil } func (f *fake) GetTriggerWorker(ctx context.Context, id string) (*info.TriggerWorkerInfo, error) { diff --git a/internal/controller/trigger/storage/mock_offset.go b/internal/controller/trigger/storage/mock_offset.go new file mode 100644 index 000000000..a3fe7a358 --- /dev/null +++ b/internal/controller/trigger/storage/mock_offset.go @@ -0,0 +1,94 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: offset.go + +// Package storage is a generated GoMock package. +package storage + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + info "github.com/linkall-labs/vanus/internal/primitive/info" + vanus "github.com/linkall-labs/vanus/internal/primitive/vanus" +) + +// MockOffsetStorage is a mock of OffsetStorage interface. +type MockOffsetStorage struct { + ctrl *gomock.Controller + recorder *MockOffsetStorageMockRecorder +} + +// MockOffsetStorageMockRecorder is the mock recorder for MockOffsetStorage. +type MockOffsetStorageMockRecorder struct { + mock *MockOffsetStorage +} + +// NewMockOffsetStorage creates a new mock instance. +func NewMockOffsetStorage(ctrl *gomock.Controller) *MockOffsetStorage { + mock := &MockOffsetStorage{ctrl: ctrl} + mock.recorder = &MockOffsetStorageMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockOffsetStorage) EXPECT() *MockOffsetStorageMockRecorder { + return m.recorder +} + +// CreateOffset mocks base method. +func (m *MockOffsetStorage) CreateOffset(ctx context.Context, subId vanus.ID, info info.OffsetInfo) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateOffset", ctx, subId, info) + ret0, _ := ret[0].(error) + return ret0 +} + +// CreateOffset indicates an expected call of CreateOffset. +func (mr *MockOffsetStorageMockRecorder) CreateOffset(ctx, subId, info interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateOffset", reflect.TypeOf((*MockOffsetStorage)(nil).CreateOffset), ctx, subId, info) +} + +// DeleteOffset mocks base method. +func (m *MockOffsetStorage) DeleteOffset(ctx context.Context, subId vanus.ID) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteOffset", ctx, subId) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteOffset indicates an expected call of DeleteOffset. +func (mr *MockOffsetStorageMockRecorder) DeleteOffset(ctx, subId interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteOffset", reflect.TypeOf((*MockOffsetStorage)(nil).DeleteOffset), ctx, subId) +} + +// GetOffsets mocks base method. +func (m *MockOffsetStorage) GetOffsets(ctx context.Context, subId vanus.ID) (info.ListOffsetInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetOffsets", ctx, subId) + ret0, _ := ret[0].(info.ListOffsetInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetOffsets indicates an expected call of GetOffsets. +func (mr *MockOffsetStorageMockRecorder) GetOffsets(ctx, subId interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOffsets", reflect.TypeOf((*MockOffsetStorage)(nil).GetOffsets), ctx, subId) +} + +// UpdateOffset mocks base method. +func (m *MockOffsetStorage) UpdateOffset(ctx context.Context, subId vanus.ID, info info.OffsetInfo) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateOffset", ctx, subId, info) + ret0, _ := ret[0].(error) + return ret0 +} + +// UpdateOffset indicates an expected call of UpdateOffset. +func (mr *MockOffsetStorageMockRecorder) UpdateOffset(ctx, subId, info interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateOffset", reflect.TypeOf((*MockOffsetStorage)(nil).UpdateOffset), ctx, subId, info) +} diff --git a/internal/controller/trigger/storage/mock_storage.go b/internal/controller/trigger/storage/mock_storage.go new file mode 100644 index 000000000..226d45d9f --- /dev/null +++ b/internal/controller/trigger/storage/mock_storage.go @@ -0,0 +1,36 @@ +// Copyright 2022 Linkall Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import "github.com/golang/mock/gomock" + +type MockStorage struct { + *MockOffsetStorage + *MockSubscriptionStorage + *MockTriggerWorkerStorage +} + +// NewMockStorage creates a new mock instance. +func NewMockStorage(ctrl *gomock.Controller) *MockStorage { + mock := &MockStorage{ + MockOffsetStorage: NewMockOffsetStorage(ctrl), + MockSubscriptionStorage: NewMockSubscriptionStorage(ctrl), + MockTriggerWorkerStorage: NewMockTriggerWorkerStorage(ctrl), + } + return mock +} + +func (m *MockStorage) Close() { +} diff --git a/internal/controller/trigger/storage/mock_subscription.go b/internal/controller/trigger/storage/mock_subscription.go new file mode 100644 index 000000000..ec2b763da --- /dev/null +++ b/internal/controller/trigger/storage/mock_subscription.go @@ -0,0 +1,109 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: subscription.go + +// Package storage is a generated GoMock package. +package storage + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + primitive "github.com/linkall-labs/vanus/internal/primitive" + vanus "github.com/linkall-labs/vanus/internal/primitive/vanus" +) + +// MockSubscriptionStorage is a mock of SubscriptionStorage interface. +type MockSubscriptionStorage struct { + ctrl *gomock.Controller + recorder *MockSubscriptionStorageMockRecorder +} + +// MockSubscriptionStorageMockRecorder is the mock recorder for MockSubscriptionStorage. +type MockSubscriptionStorageMockRecorder struct { + mock *MockSubscriptionStorage +} + +// NewMockSubscriptionStorage creates a new mock instance. +func NewMockSubscriptionStorage(ctrl *gomock.Controller) *MockSubscriptionStorage { + mock := &MockSubscriptionStorage{ctrl: ctrl} + mock.recorder = &MockSubscriptionStorageMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockSubscriptionStorage) EXPECT() *MockSubscriptionStorageMockRecorder { + return m.recorder +} + +// CreateSubscription mocks base method. +func (m *MockSubscriptionStorage) CreateSubscription(ctx context.Context, sub *primitive.SubscriptionData) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateSubscription", ctx, sub) + ret0, _ := ret[0].(error) + return ret0 +} + +// CreateSubscription indicates an expected call of CreateSubscription. +func (mr *MockSubscriptionStorageMockRecorder) CreateSubscription(ctx, sub interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateSubscription", reflect.TypeOf((*MockSubscriptionStorage)(nil).CreateSubscription), ctx, sub) +} + +// DeleteSubscription mocks base method. +func (m *MockSubscriptionStorage) DeleteSubscription(ctx context.Context, subId vanus.ID) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteSubscription", ctx, subId) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteSubscription indicates an expected call of DeleteSubscription. +func (mr *MockSubscriptionStorageMockRecorder) DeleteSubscription(ctx, subId interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteSubscription", reflect.TypeOf((*MockSubscriptionStorage)(nil).DeleteSubscription), ctx, subId) +} + +// GetSubscription mocks base method. +func (m *MockSubscriptionStorage) GetSubscription(ctx context.Context, subId vanus.ID) (*primitive.SubscriptionData, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetSubscription", ctx, subId) + ret0, _ := ret[0].(*primitive.SubscriptionData) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetSubscription indicates an expected call of GetSubscription. +func (mr *MockSubscriptionStorageMockRecorder) GetSubscription(ctx, subId interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSubscription", reflect.TypeOf((*MockSubscriptionStorage)(nil).GetSubscription), ctx, subId) +} + +// ListSubscription mocks base method. +func (m *MockSubscriptionStorage) ListSubscription(ctx context.Context) ([]*primitive.SubscriptionData, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListSubscription", ctx) + ret0, _ := ret[0].([]*primitive.SubscriptionData) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListSubscription indicates an expected call of ListSubscription. +func (mr *MockSubscriptionStorageMockRecorder) ListSubscription(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListSubscription", reflect.TypeOf((*MockSubscriptionStorage)(nil).ListSubscription), ctx) +} + +// UpdateSubscription mocks base method. +func (m *MockSubscriptionStorage) UpdateSubscription(ctx context.Context, sub *primitive.SubscriptionData) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateSubscription", ctx, sub) + ret0, _ := ret[0].(error) + return ret0 +} + +// UpdateSubscription indicates an expected call of UpdateSubscription. +func (mr *MockSubscriptionStorageMockRecorder) UpdateSubscription(ctx, sub interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateSubscription", reflect.TypeOf((*MockSubscriptionStorage)(nil).UpdateSubscription), ctx, sub) +} diff --git a/internal/controller/trigger/storage/mock_trigger_worker.go b/internal/controller/trigger/storage/mock_trigger_worker.go new file mode 100644 index 000000000..baa8ca448 --- /dev/null +++ b/internal/controller/trigger/storage/mock_trigger_worker.go @@ -0,0 +1,94 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: trigger_worker.go + +// Package storage is a generated GoMock package. +package storage + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + info "github.com/linkall-labs/vanus/internal/controller/trigger/info" +) + +// MockTriggerWorkerStorage is a mock of TriggerWorkerStorage interface. +type MockTriggerWorkerStorage struct { + ctrl *gomock.Controller + recorder *MockTriggerWorkerStorageMockRecorder +} + +// MockTriggerWorkerStorageMockRecorder is the mock recorder for MockTriggerWorkerStorage. +type MockTriggerWorkerStorageMockRecorder struct { + mock *MockTriggerWorkerStorage +} + +// NewMockTriggerWorkerStorage creates a new mock instance. +func NewMockTriggerWorkerStorage(ctrl *gomock.Controller) *MockTriggerWorkerStorage { + mock := &MockTriggerWorkerStorage{ctrl: ctrl} + mock.recorder = &MockTriggerWorkerStorageMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockTriggerWorkerStorage) EXPECT() *MockTriggerWorkerStorageMockRecorder { + return m.recorder +} + +// DeleteTriggerWorker mocks base method. +func (m *MockTriggerWorkerStorage) DeleteTriggerWorker(ctx context.Context, id string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteTriggerWorker", ctx, id) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteTriggerWorker indicates an expected call of DeleteTriggerWorker. +func (mr *MockTriggerWorkerStorageMockRecorder) DeleteTriggerWorker(ctx, id interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteTriggerWorker", reflect.TypeOf((*MockTriggerWorkerStorage)(nil).DeleteTriggerWorker), ctx, id) +} + +// GetTriggerWorker mocks base method. +func (m *MockTriggerWorkerStorage) GetTriggerWorker(ctx context.Context, id string) (*info.TriggerWorkerInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTriggerWorker", ctx, id) + ret0, _ := ret[0].(*info.TriggerWorkerInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetTriggerWorker indicates an expected call of GetTriggerWorker. +func (mr *MockTriggerWorkerStorageMockRecorder) GetTriggerWorker(ctx, id interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTriggerWorker", reflect.TypeOf((*MockTriggerWorkerStorage)(nil).GetTriggerWorker), ctx, id) +} + +// ListTriggerWorker mocks base method. +func (m *MockTriggerWorkerStorage) ListTriggerWorker(ctx context.Context) ([]*info.TriggerWorkerInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListTriggerWorker", ctx) + ret0, _ := ret[0].([]*info.TriggerWorkerInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListTriggerWorker indicates an expected call of ListTriggerWorker. +func (mr *MockTriggerWorkerStorageMockRecorder) ListTriggerWorker(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListTriggerWorker", reflect.TypeOf((*MockTriggerWorkerStorage)(nil).ListTriggerWorker), ctx) +} + +// SaveTriggerWorker mocks base method. +func (m *MockTriggerWorkerStorage) SaveTriggerWorker(arg0 context.Context, arg1 info.TriggerWorkerInfo) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SaveTriggerWorker", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// SaveTriggerWorker indicates an expected call of SaveTriggerWorker. +func (mr *MockTriggerWorkerStorageMockRecorder) SaveTriggerWorker(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveTriggerWorker", reflect.TypeOf((*MockTriggerWorkerStorage)(nil).SaveTriggerWorker), arg0, arg1) +} diff --git a/internal/controller/trigger/storage/offset.go b/internal/controller/trigger/storage/offset.go index 2c8d3def3..f2e82f330 100644 --- a/internal/controller/trigger/storage/offset.go +++ b/internal/controller/trigger/storage/offset.go @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:generate mockgen -source=offset.go -destination=mock_offset.go -package=storage package storage import ( diff --git a/internal/controller/trigger/storage/offset_test.go b/internal/controller/trigger/storage/offset_test.go new file mode 100644 index 000000000..569f0edee --- /dev/null +++ b/internal/controller/trigger/storage/offset_test.go @@ -0,0 +1,101 @@ +// Copyright 2022 Linkall Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "fmt" + "testing" + + "github.com/linkall-labs/vanus/internal/kv" + "github.com/linkall-labs/vanus/internal/primitive/info" + "github.com/linkall-labs/vanus/internal/primitive/vanus" + + "github.com/golang/mock/gomock" + . "github.com/smartystreets/goconvey/convey" +) + +func TestCreateOffset(t *testing.T) { + ctx := context.Background() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + kvClient := kv.NewMockClient(ctrl) + s := NewOffsetStorage(kvClient).(*offsetStorage) + subID := vanus.ID(1) + eventLogID := vanus.ID(1) + offset := uint64(100) + Convey("create offset", t, func() { + kvClient.EXPECT().Create(ctx, s.getKey(subID, eventLogID), s.int64ToByteArr(offset)).Return(nil) + err := s.CreateOffset(ctx, subID, info.OffsetInfo{ + EventLogID: eventLogID, + Offset: offset, + }) + So(err, ShouldBeNil) + }) +} + +func TestUpdateOffset(t *testing.T) { + ctx := context.Background() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + kvClient := kv.NewMockClient(ctrl) + s := NewOffsetStorage(kvClient).(*offsetStorage) + subID := vanus.ID(1) + eventLogID := vanus.ID(1) + offset := uint64(100) + Convey("update offset", t, func() { + kvClient.EXPECT().Update(ctx, s.getKey(subID, eventLogID), s.int64ToByteArr(offset)).Return(nil) + err := s.UpdateOffset(ctx, subID, info.OffsetInfo{ + EventLogID: eventLogID, + Offset: offset, + }) + So(err, ShouldBeNil) + }) +} + +func TestGetOffset(t *testing.T) { + ctx := context.Background() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + kvClient := kv.NewMockClient(ctrl) + s := NewOffsetStorage(kvClient).(*offsetStorage) + subID := vanus.ID(1) + eventLogID := vanus.ID(1) + offset := uint64(100) + Convey("get offset", t, func() { + kvClient.EXPECT().List(ctx, s.getSubKey(subID)).Return([]kv.Pair{ + {Key: fmt.Sprintf("/test/%d", eventLogID), Value: s.int64ToByteArr(offset)}, + }, nil) + offsets, err := s.GetOffsets(ctx, subID) + So(err, ShouldBeNil) + So(len(offsets), ShouldEqual, 1) + So(offsets[0].Offset, ShouldEqual, offset) + So(offsets[0].EventLogID, ShouldEqual, eventLogID) + }) +} + +func TestDeleteOffset(t *testing.T) { + ctx := context.Background() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + kvClient := kv.NewMockClient(ctrl) + s := NewOffsetStorage(kvClient).(*offsetStorage) + subID := vanus.ID(1) + Convey("delete offset", t, func() { + kvClient.EXPECT().DeleteDir(ctx, s.getSubKey(subID)).Return(nil) + err := s.DeleteOffset(ctx, subID) + So(err, ShouldBeNil) + }) +} diff --git a/internal/controller/trigger/storage/subscription.go b/internal/controller/trigger/storage/subscription.go index 9ddc21745..a33d21417 100644 --- a/internal/controller/trigger/storage/subscription.go +++ b/internal/controller/trigger/storage/subscription.go @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:generate mockgen -source=subscription.go -destination=mock_subscription.go -package=storage package storage import ( diff --git a/internal/controller/trigger/storage/subscription_test.go b/internal/controller/trigger/storage/subscription_test.go new file mode 100644 index 000000000..7d0b0e6fa --- /dev/null +++ b/internal/controller/trigger/storage/subscription_test.go @@ -0,0 +1,118 @@ +// Copyright 2022 Linkall Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "encoding/json" + "fmt" + "testing" + + "github.com/linkall-labs/vanus/internal/kv" + "github.com/linkall-labs/vanus/internal/primitive" + "github.com/linkall-labs/vanus/internal/primitive/vanus" + + "github.com/golang/mock/gomock" + . "github.com/smartystreets/goconvey/convey" +) + +func TestCreateSubscription(t *testing.T) { + ctx := context.Background() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + kvClient := kv.NewMockClient(ctrl) + s := NewSubscriptionStorage(kvClient).(*subscriptionStorage) + subID := vanus.ID(1) + Convey("create subscription", t, func() { + kvClient.EXPECT().Create(ctx, s.getKey(subID), gomock.Any()).Return(nil) + err := s.CreateSubscription(ctx, &primitive.SubscriptionData{ + ID: subID, + }) + So(err, ShouldBeNil) + }) +} + +func TestUpdateSubscription(t *testing.T) { + ctx := context.Background() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + kvClient := kv.NewMockClient(ctrl) + s := NewSubscriptionStorage(kvClient).(*subscriptionStorage) + subID := vanus.ID(1) + Convey("update subscription", t, func() { + kvClient.EXPECT().Update(ctx, s.getKey(subID), gomock.Any()).Return(nil) + err := s.UpdateSubscription(ctx, &primitive.SubscriptionData{ + ID: subID, + }) + So(err, ShouldBeNil) + }) +} + +func TestGetSubscription(t *testing.T) { + ctx := context.Background() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + kvClient := kv.NewMockClient(ctrl) + s := NewSubscriptionStorage(kvClient).(*subscriptionStorage) + subID := vanus.ID(1) + Convey("get subscription", t, func() { + expect := &primitive.SubscriptionData{ + ID: subID, + EventBus: "bus", + } + v, _ := json.Marshal(expect) + kvClient.EXPECT().Get(ctx, s.getKey(subID)).Return(v, nil) + data, err := s.GetSubscription(ctx, subID) + So(err, ShouldBeNil) + So(data.EventBus, ShouldEqual, expect.EventBus) + }) +} + +func TestDeleteSubscription(t *testing.T) { + ctx := context.Background() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + kvClient := kv.NewMockClient(ctrl) + s := NewSubscriptionStorage(kvClient).(*subscriptionStorage) + subID := vanus.ID(1) + Convey("delete subscription", t, func() { + kvClient.EXPECT().Delete(ctx, s.getKey(subID)).Return(nil) + err := s.DeleteSubscription(ctx, subID) + So(err, ShouldBeNil) + }) +} + +func TestListSubscription(t *testing.T) { + ctx := context.Background() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + kvClient := kv.NewMockClient(ctrl) + s := NewSubscriptionStorage(kvClient).(*subscriptionStorage) + subID := vanus.ID(1) + Convey("list subscription", t, func() { + expect := &primitive.SubscriptionData{ + ID: subID, + EventBus: "bus", + } + v, _ := json.Marshal(expect) + kvClient.EXPECT().List(ctx, StorageSubscription.String()).Return([]kv.Pair{ + {Key: fmt.Sprintf("%d", subID), Value: v}, + }, nil) + list, err := s.ListSubscription(ctx) + So(err, ShouldBeNil) + So(len(list), ShouldEqual, 1) + So(list[0].EventBus, ShouldEqual, expect.EventBus) + }) +} diff --git a/internal/controller/trigger/storage/trigger_worker.go b/internal/controller/trigger/storage/trigger_worker.go index 7dc430dab..6b1edf1ea 100644 --- a/internal/controller/trigger/storage/trigger_worker.go +++ b/internal/controller/trigger/storage/trigger_worker.go @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:generate mockgen -source=trigger_worker.go -destination=mock_trigger_worker.go -package=storage package storage import ( @@ -47,7 +48,7 @@ func (s *triggerWorkerStorage) getKey(id string) string { } func (s *triggerWorkerStorage) SaveTriggerWorker(ctx context.Context, info info.TriggerWorkerInfo) error { - key := s.getKey(info.Id) + key := s.getKey(info.ID) exist, err := s.client.Exists(ctx, key) if err != nil { return err @@ -90,7 +91,7 @@ func (s *triggerWorkerStorage) ListTriggerWorker(ctx context.Context) ([]*info.T if err != nil { return nil, errors.ErrJsonUnMarshal.Wrap(err) } - tWorker.Id = filepath.Base(v.Key) + tWorker.ID = filepath.Base(v.Key) list = append(list, &tWorker) } return list, nil diff --git a/internal/controller/trigger/storage/trigger_worker_test.go b/internal/controller/trigger/storage/trigger_worker_test.go new file mode 100644 index 000000000..7394b9c88 --- /dev/null +++ b/internal/controller/trigger/storage/trigger_worker_test.go @@ -0,0 +1,113 @@ +// Copyright 2022 Linkall Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "encoding/json" + "fmt" + "testing" + + "github.com/linkall-labs/vanus/internal/controller/trigger/info" + "github.com/linkall-labs/vanus/internal/kv" + + "github.com/golang/mock/gomock" + . "github.com/smartystreets/goconvey/convey" +) + +func TestSaveTriggerWorker(t *testing.T) { + ctx := context.Background() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + kvClient := kv.NewMockClient(ctrl) + s := NewTriggerWorkerStorage(kvClient).(*triggerWorkerStorage) + ID := "testID" + Convey("create trigger worker", t, func() { + kvClient.EXPECT().Exists(ctx, s.getKey(ID)).Return(false, nil) + kvClient.EXPECT().Create(ctx, s.getKey(ID), gomock.Any()).Return(nil) + err := s.SaveTriggerWorker(ctx, info.TriggerWorkerInfo{ + ID: ID, + Addr: "test", + }) + So(err, ShouldBeNil) + }) + + Convey("update trigger worker", t, func() { + kvClient.EXPECT().Exists(ctx, s.getKey(ID)).Return(true, nil) + kvClient.EXPECT().Update(ctx, s.getKey(ID), gomock.Any()).Return(nil) + err := s.SaveTriggerWorker(ctx, info.TriggerWorkerInfo{ + ID: ID, + Addr: "test", + }) + So(err, ShouldBeNil) + }) +} + +func TestGetTriggerWorker(t *testing.T) { + ctx := context.Background() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + kvClient := kv.NewMockClient(ctrl) + s := NewTriggerWorkerStorage(kvClient).(*triggerWorkerStorage) + ID := "testID" + Convey("get trigger worker", t, func() { + expect := info.TriggerWorkerInfo{ + ID: ID, + Addr: "test", + } + v, _ := json.Marshal(expect) + kvClient.EXPECT().Get(ctx, s.getKey(ID)).Return(v, nil) + data, err := s.GetTriggerWorker(ctx, ID) + So(err, ShouldBeNil) + So(data.Addr, ShouldEqual, expect.Addr) + }) +} + +func TestDeleteTriggerWorker(t *testing.T) { + ctx := context.Background() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + kvClient := kv.NewMockClient(ctrl) + s := NewTriggerWorkerStorage(kvClient).(*triggerWorkerStorage) + ID := "testID" + Convey("delete trigger worker", t, func() { + kvClient.EXPECT().Delete(ctx, s.getKey(ID)).Return(nil) + err := s.DeleteTriggerWorker(ctx, ID) + So(err, ShouldBeNil) + }) +} + +func TestListTriggerWorker(t *testing.T) { + ctx := context.Background() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + kvClient := kv.NewMockClient(ctrl) + s := NewTriggerWorkerStorage(kvClient).(*triggerWorkerStorage) + ID := "testID" + Convey("list trigger worker", t, func() { + expect := info.TriggerWorkerInfo{ + ID: ID, + Addr: "test", + } + v, _ := json.Marshal(expect) + kvClient.EXPECT().List(ctx, s.getKey("/")).Return([]kv.Pair{ + {Key: fmt.Sprintf("%s", ID), Value: v}, + }, nil) + list, err := s.ListTriggerWorker(ctx) + So(err, ShouldBeNil) + So(len(list), ShouldEqual, 1) + So(list[0].Addr, ShouldEqual, expect.Addr) + }) +} diff --git a/internal/controller/trigger/subscription/testing/mock_subscription_manager.go b/internal/controller/trigger/subscription/mock_subscription_manager.go similarity index 98% rename from internal/controller/trigger/subscription/testing/mock_subscription_manager.go rename to internal/controller/trigger/subscription/mock_subscription_manager.go index 8df53b555..95487ac57 100644 --- a/internal/controller/trigger/subscription/testing/mock_subscription_manager.go +++ b/internal/controller/trigger/subscription/mock_subscription_manager.go @@ -1,8 +1,8 @@ // Code generated by MockGen. DO NOT EDIT. // Source: subscription_manager.go -// Package testing is a generated GoMock package. -package testing +// Package subscription is a generated GoMock package. +package subscription import ( context "context" diff --git a/internal/controller/trigger/subscription/offset/mock_offset.go b/internal/controller/trigger/subscription/offset/mock_offset.go new file mode 100644 index 000000000..4f0791a61 --- /dev/null +++ b/internal/controller/trigger/subscription/offset/mock_offset.go @@ -0,0 +1,104 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: offset.go + +// Package offset is a generated GoMock package. +package offset + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + info "github.com/linkall-labs/vanus/internal/primitive/info" + vanus "github.com/linkall-labs/vanus/internal/primitive/vanus" +) + +// MockManager is a mock of Manager interface. +type MockManager struct { + ctrl *gomock.Controller + recorder *MockManagerMockRecorder +} + +// MockManagerMockRecorder is the mock recorder for MockManager. +type MockManagerMockRecorder struct { + mock *MockManager +} + +// NewMockManager creates a new mock instance. +func NewMockManager(ctrl *gomock.Controller) *MockManager { + mock := &MockManager{ctrl: ctrl} + mock.recorder = &MockManagerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockManager) EXPECT() *MockManagerMockRecorder { + return m.recorder +} + +// GetOffset mocks base method. +func (m *MockManager) GetOffset(ctx context.Context, subId vanus.ID) (info.ListOffsetInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetOffset", ctx, subId) + ret0, _ := ret[0].(info.ListOffsetInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetOffset indicates an expected call of GetOffset. +func (mr *MockManagerMockRecorder) GetOffset(ctx, subId interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOffset", reflect.TypeOf((*MockManager)(nil).GetOffset), ctx, subId) +} + +// Offset mocks base method. +func (m *MockManager) Offset(ctx context.Context, subId vanus.ID, offsets info.ListOffsetInfo) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Offset", ctx, subId, offsets) + ret0, _ := ret[0].(error) + return ret0 +} + +// Offset indicates an expected call of Offset. +func (mr *MockManagerMockRecorder) Offset(ctx, subId, offsets interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Offset", reflect.TypeOf((*MockManager)(nil).Offset), ctx, subId, offsets) +} + +// RemoveRegisterSubscription mocks base method. +func (m *MockManager) RemoveRegisterSubscription(ctx context.Context, subId vanus.ID) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RemoveRegisterSubscription", ctx, subId) + ret0, _ := ret[0].(error) + return ret0 +} + +// RemoveRegisterSubscription indicates an expected call of RemoveRegisterSubscription. +func (mr *MockManagerMockRecorder) RemoveRegisterSubscription(ctx, subId interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveRegisterSubscription", reflect.TypeOf((*MockManager)(nil).RemoveRegisterSubscription), ctx, subId) +} + +// Start mocks base method. +func (m *MockManager) Start() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Start") +} + +// Start indicates an expected call of Start. +func (mr *MockManagerMockRecorder) Start() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockManager)(nil).Start)) +} + +// Stop mocks base method. +func (m *MockManager) Stop() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Stop") +} + +// Stop indicates an expected call of Stop. +func (mr *MockManagerMockRecorder) Stop() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockManager)(nil).Stop)) +} diff --git a/internal/controller/trigger/subscription/offset/offset.go b/internal/controller/trigger/subscription/offset/offset.go index 399b26d89..7ebe6b043 100644 --- a/internal/controller/trigger/subscription/offset/offset.go +++ b/internal/controller/trigger/subscription/offset/offset.go @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:generate mockgen -source=offset.go -destination=mock_offset.go -package=offset package offset import ( diff --git a/internal/controller/trigger/subscription/offset/offset_test.go b/internal/controller/trigger/subscription/offset/offset_test.go index c7a5efce8..b87eddc33 100644 --- a/internal/controller/trigger/subscription/offset/offset_test.go +++ b/internal/controller/trigger/subscription/offset/offset_test.go @@ -23,53 +23,95 @@ import ( "github.com/linkall-labs/vanus/internal/primitive/info" "github.com/linkall-labs/vanus/internal/primitive/vanus" + "github.com/golang/mock/gomock" . "github.com/smartystreets/goconvey/convey" ) -func TestOffset(t *testing.T) { - storage := storage.NewFakeStorage() - m := NewOffsetManager(storage, 10*time.Microsecond) +func TestGetOffset(t *testing.T) { ctx := context.Background() - Convey("offset", t, func() { - subId := vanus.ID(1) - eventLogID := vanus.ID(1) - offset := uint64(1) - Convey("get offset is empty", func() { - m.RemoveRegisterSubscription(ctx, subId) - offsets, _ := m.GetOffset(ctx, subId) - So(len(offsets), ShouldEqual, 0) - }) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + storage := storage.NewMockOffsetStorage(ctrl) + storage.EXPECT().DeleteOffset(gomock.Any(), gomock.Any()).AnyTimes().Return(nil) + m := NewOffsetManager(storage, 10*time.Microsecond).(*manager) + subId := vanus.ID(1) + eventLogID := vanus.ID(1) + offset := uint64(1) - Convey("get offset", func() { - m.RemoveRegisterSubscription(ctx, subId) - storage.CreateOffset(ctx, subId, info.OffsetInfo{ - EventLogID: eventLogID, - Offset: offset, - }) - offsets, _ := m.GetOffset(ctx, subId) - So(len(offsets), ShouldEqual, 1) - So(offsets[0].Offset, ShouldEqual, offset) + Convey("get offset storage is empty", t, func() { + storage.EXPECT().GetOffsets(gomock.Any(), subId).Return(info.ListOffsetInfo{}, nil) + offsets, _ := m.GetOffset(ctx, subId) + So(len(offsets), ShouldEqual, 0) + subOffset, exist := m.subOffset.Load(subId) + So(exist, ShouldBeTrue) + So(subOffset, ShouldNotBeNil) + m.RemoveRegisterSubscription(ctx, subId) + }) - }) + Convey("get offset storage has", t, func() { + storage.EXPECT().GetOffsets(gomock.Any(), subId).Return(info.ListOffsetInfo{info.OffsetInfo{ + EventLogID: eventLogID, + Offset: offset, + }}, nil) + offsets, _ := m.GetOffset(ctx, subId) + So(len(offsets), ShouldEqual, 1) + So(offsets[0].Offset, ShouldEqual, offset) + subOffset, exist := m.subOffset.Load(subId) + So(exist, ShouldBeTrue) + So(subOffset, ShouldNotBeNil) + }) +} - Convey("offset", func() { - m.RemoveRegisterSubscription(ctx, subId) - m.Offset(ctx, subId, []info.OffsetInfo{{EventLogID: eventLogID, Offset: offset}}) - offsets, _ := m.GetOffset(ctx, subId) - So(len(offsets), ShouldEqual, 1) - So(offsets[0].Offset, ShouldEqual, offset) - }) +func TestSetOffset(t *testing.T) { + ctx := context.Background() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + storage := storage.NewMockOffsetStorage(ctrl) + storage.EXPECT().DeleteOffset(gomock.Any(), gomock.Any()).AnyTimes().Return(nil) + m := NewOffsetManager(storage, 10*time.Microsecond) + subId := vanus.ID(1) + eventLogID := vanus.ID(1) + offset := uint64(1) - Convey("commit", func() { - m.RemoveRegisterSubscription(ctx, subId) + Convey("set offset", t, func() { + storage.EXPECT().GetOffsets(gomock.Any(), subId).Return(info.ListOffsetInfo{}, nil) + m.Offset(ctx, subId, []info.OffsetInfo{{EventLogID: eventLogID, Offset: offset}}) + offsets, _ := m.GetOffset(ctx, subId) + So(len(offsets), ShouldEqual, 1) + So(offsets[0].Offset, ShouldEqual, offset) + m.RemoveRegisterSubscription(ctx, subId) + }) +} + +func TestCommit(t *testing.T) { + ctx := context.Background() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + storage := storage.NewMockOffsetStorage(ctrl) + storage.EXPECT().DeleteOffset(gomock.Any(), gomock.Any()).AnyTimes().Return(nil) + m := NewOffsetManager(storage, 10*time.Microsecond).(*manager) + subId := vanus.ID(1) + eventLogID := vanus.ID(1) + offset := uint64(1) + + Convey("commit", t, func() { + Convey("commit with storage create", func() { + storage.EXPECT().GetOffsets(gomock.Any(), subId).Return(info.ListOffsetInfo{}, nil) + storage.EXPECT().CreateOffset(gomock.Any(), subId, gomock.Any()).Return(nil) m.Offset(ctx, subId, []info.OffsetInfo{{EventLogID: eventLogID, Offset: offset}}) offsets, _ := m.GetOffset(ctx, subId) So(len(offsets), ShouldEqual, 1) So(offsets[0].Offset, ShouldEqual, offset) - m.Start() - m.Stop() + m.commit(ctx) + Convey("commit with storage update", func() { + offset++ + m.Offset(ctx, subId, []info.OffsetInfo{{EventLogID: eventLogID, Offset: offset}}) + storage.EXPECT().UpdateOffset(gomock.Any(), subId, gomock.Any()).Return(nil) + m.commit(ctx) + }) }) }) + } func TestStart(t *testing.T) { diff --git a/internal/controller/trigger/subscription/subscription_manager.go b/internal/controller/trigger/subscription/subscription_manager.go index 41872eac7..8c62fe0c6 100644 --- a/internal/controller/trigger/subscription/subscription_manager.go +++ b/internal/controller/trigger/subscription/subscription_manager.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:generate mockgen -source=subscription_manager.go -destination=testing/mock_subscription_manager.go -package=testing +//go:generate mockgen -source=subscription_manager.go -destination=mock_subscription_manager.go -package=subscription package subscription import ( diff --git a/internal/controller/trigger/subscription/subscription_manager_test.go b/internal/controller/trigger/subscription/subscription_manager_test.go new file mode 100644 index 000000000..493087d8a --- /dev/null +++ b/internal/controller/trigger/subscription/subscription_manager_test.go @@ -0,0 +1,177 @@ +// Copyright 2022 Linkall Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package subscription + +import ( + "context" + "testing" + "time" + + "github.com/linkall-labs/vanus/internal/controller/trigger/storage" + "github.com/linkall-labs/vanus/internal/controller/trigger/subscription/offset" + "github.com/linkall-labs/vanus/internal/primitive" + "github.com/linkall-labs/vanus/internal/primitive/info" + "github.com/linkall-labs/vanus/internal/primitive/vanus" + + "github.com/golang/mock/gomock" + . "github.com/smartystreets/goconvey/convey" +) + +func TestInit(t *testing.T) { + ctx := context.Background() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + storage := storage.NewMockStorage(ctrl) + m := NewSubscriptionManager(storage) + + Convey("init ", t, func() { + storage.MockSubscriptionStorage.EXPECT().ListSubscription(ctx).Return([]*primitive.SubscriptionData{ + {ID: 1}, + }, nil) + err := m.Init(ctx) + So(err, ShouldBeNil) + }) + Convey("start stop", t, func() { + m.Start() + time.Sleep(time.Millisecond * 10) + m.Stop() + }) +} + +func TestSubscriptionData(t *testing.T) { + ctx := context.Background() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + storage := storage.NewMockStorage(ctrl) + m := NewSubscriptionManager(storage) + Convey("list subscription size 0", t, func() { + subscriptionMap := m.ListSubscription(ctx) + So(len(subscriptionMap), ShouldEqual, 0) + }) + Convey("get subscription not exist", t, func() { + subscription := m.GetSubscriptionData(ctx, 1) + So(subscription, ShouldBeNil) + }) + Convey("add subscription", t, func() { + storage.MockSubscriptionStorage.EXPECT().CreateSubscription(ctx, gomock.Any()).Return(nil) + err := m.AddSubscription(ctx, makeSubscription()) + So(err, ShouldBeNil) + }) + var ID vanus.ID + var subscriptionData *primitive.SubscriptionData + Convey("list subscription", t, func() { + subscriptionMap := m.ListSubscription(ctx) + So(len(subscriptionMap), ShouldEqual, 1) + for id, data := range subscriptionMap { + ID = id + subscriptionData = data + } + }) + Convey("get subscription data", t, func() { + subscription := m.GetSubscriptionData(ctx, ID) + So(subscription, ShouldNotBeNil) + }) + Convey("update subscription", t, func() { + storage.MockSubscriptionStorage.EXPECT().UpdateSubscription(ctx, gomock.Any()).Return(nil) + subscriptionData.Sink = "newSink" + err := m.UpdateSubscription(ctx, subscriptionData) + So(err, ShouldBeNil) + subscription := m.GetSubscriptionData(ctx, ID) + So(subscription.Sink, ShouldEqual, subscriptionData.Sink) + }) + Convey("heartbeat", t, func() { + storage.MockSubscriptionStorage.EXPECT().UpdateSubscription(ctx, gomock.Any()).Return(nil) + err := m.Heartbeat(ctx, ID, "addr", time.Now()) + So(err, ShouldBeNil) + }) +} +func TestOffset(t *testing.T) { + ctx := context.Background() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + storage := storage.NewMockStorage(ctrl) + m := NewSubscriptionManager(storage).(*manager) + offsetManager := offset.NewMockManager(ctrl) + m.offsetManager = offsetManager + storage.MockSubscriptionStorage.EXPECT().CreateSubscription(ctx, gomock.Any()).Return(nil) + m.AddSubscription(ctx, makeSubscription()) + var ID vanus.ID + subscriptionMap := m.ListSubscription(ctx) + for id := range subscriptionMap { + ID = id + } + listOffsetInfo := info.ListOffsetInfo{ + {EventLogID: 1, Offset: 10}, + } + Convey("set offset ", t, func() { + err := m.Offset(ctx, 1, listOffsetInfo) + So(err, ShouldBeNil) + offsetManager.EXPECT().Offset(ctx, ID, listOffsetInfo).Return(nil) + err = m.Offset(ctx, ID, listOffsetInfo) + So(err, ShouldBeNil) + }) + Convey("get offset", t, func() { + offsets, err := m.GetOffset(ctx, 1) + So(err, ShouldBeNil) + So(len(offsets), ShouldEqual, 0) + offsetManager.EXPECT().GetOffset(ctx, ID).Return(listOffsetInfo, nil) + offsets, err = m.GetOffset(ctx, ID) + So(err, ShouldBeNil) + So(len(offsets), ShouldEqual, 1) + So(offsets[0].EventLogID, ShouldEqual, 1) + So(offsets[0].Offset, ShouldEqual, 10) + }) +} + +func TestSubscription(t *testing.T) { + ctx := context.Background() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + storage := storage.NewMockStorage(ctrl) + m := NewSubscriptionManager(storage).(*manager) + offsetManager := offset.NewMockManager(ctrl) + m.offsetManager = offsetManager + storage.MockSubscriptionStorage.EXPECT().CreateSubscription(ctx, gomock.Any()).Return(nil) + m.AddSubscription(ctx, makeSubscription()) + var ID vanus.ID + subscriptionMap := m.ListSubscription(ctx) + for id := range subscriptionMap { + ID = id + } + listOffsetInfo := info.ListOffsetInfo{ + {EventLogID: 1, Offset: 10}, + } + offsetManager.EXPECT().GetOffset(ctx, ID).Return(listOffsetInfo, nil) + Convey("get subscription", t, func() { + subscription, err := m.GetSubscription(ctx, ID) + So(err, ShouldBeNil) + So(subscription, ShouldNotBeNil) + So(subscription.Offsets[0].Offset, ShouldEqual, listOffsetInfo[0].Offset) + So(subscription.Offsets[0].EventLogID, ShouldEqual, listOffsetInfo[0].EventLogID) + }) + Convey("delete subscription data", t, func() { + offsetManager.EXPECT().RemoveRegisterSubscription(ctx, gomock.Any()).Return(nil) + storage.MockSubscriptionStorage.EXPECT().DeleteSubscription(ctx, gomock.Any()).Return(nil) + err := m.DeleteSubscription(ctx, ID) + So(err, ShouldBeNil) + subscription, err := m.GetSubscription(ctx, ID) + So(err, ShouldNotBeNil) + So(subscription, ShouldBeNil) + }) +} + +func makeSubscription() *primitive.SubscriptionData { + return &primitive.SubscriptionData{} +} diff --git a/internal/controller/trigger/trigger_controller.go b/internal/controller/trigger/trigger_controller.go index ee93eca5b..cd1f874fd 100644 --- a/internal/controller/trigger/trigger_controller.go +++ b/internal/controller/trigger/trigger_controller.go @@ -71,7 +71,7 @@ func (ctrl *triggerController) CreateSubscription(ctx context.Context, request * if ctrl.state != primitive.ServerStateRunning { return nil, errors.ErrServerNotStart } - err := validation.ConvertCreateSubscriptionRequest(request).Validate(ctx) + err := validation.ValidateCreateSubscription(ctx, request) if err != nil { return nil, err } diff --git a/internal/controller/trigger/validation/subscripton_validation.go b/internal/controller/trigger/validation/subscripton_validation.go index 28d57b4ca..e1cbbceee 100644 --- a/internal/controller/trigger/validation/subscripton_validation.go +++ b/internal/controller/trigger/validation/subscripton_validation.go @@ -25,29 +25,21 @@ import ( cesqlparser "github.com/cloudevents/sdk-go/sql/v2/parser" ) -type createSubscriptionRequestValidator ctrlpb.CreateSubscriptionRequest - -func ConvertCreateSubscriptionRequest(request *ctrlpb.CreateSubscriptionRequest) createSubscriptionRequestValidator { - if request == nil { - return createSubscriptionRequestValidator{} - } - return createSubscriptionRequestValidator(*request) -} - -func (request createSubscriptionRequestValidator) Validate(ctx context.Context) error { - err := filterListValidator(request.Filters).Validate(ctx) +func ValidateCreateSubscription(ctx context.Context, request *ctrlpb.CreateSubscriptionRequest) error { + err := ValidateFilterList(ctx, request.Filters) if err != nil { return errors.ErrInvalidRequest.WithMessage("filters is invalid").Wrap(err) } if request.Sink == "" { return errors.ErrInvalidRequest.WithMessage("sink is empty") } + if request.EventBus == "" { + return errors.ErrInvalidRequest.WithMessage("eventBus is empty") + } return nil } -type filterListValidator []*metapb.Filter - -func (filters filterListValidator) Validate(ctx context.Context) error { +func ValidateFilterList(ctx context.Context, filters []*metapb.Filter) error { if len(filters) == 0 { return nil } @@ -55,7 +47,7 @@ func (filters filterListValidator) Validate(ctx context.Context) error { if f == nil { continue } - err := convertFilterValidation(f).Validate(ctx) + err := ValidateFilter(ctx, f) if err != nil { return err } @@ -63,17 +55,8 @@ func (filters filterListValidator) Validate(ctx context.Context) error { return nil } -type filterValidator metapb.Filter - -func convertFilterValidation(f *metapb.Filter) filterValidator { - if f == nil { - return filterValidator{} - } - return filterValidator(*f) -} - -func (f filterValidator) Validate(ctx context.Context) error { - if f.hasMultipleDialects() { +func ValidateFilter(ctx context.Context, f *metapb.Filter) error { + if hasMultipleDialects(f) { return errors.ErrFilterMultiple.WithMessage("filters can have only one dialect") } err := validateAttributeMap("exact", f.Exact) @@ -101,16 +84,16 @@ func (f filterValidator) Validate(ctx context.Context) error { } } if f.Not != nil { - err = convertFilterValidation(f.Not).Validate(ctx) + err = ValidateFilter(ctx, f.Not) if err != nil { return errors.ErrInvalidRequest.WithMessage("not filter dialect invalid").Wrap(err) } } - err = filterListValidator(f.All).Validate(ctx) + err = ValidateFilterList(ctx, f.All) if err != nil { return errors.ErrInvalidRequest.WithMessage("all filter dialect invalid").Wrap(err) } - err = filterListValidator(f.Any).Validate(ctx) + err = ValidateFilterList(ctx, f.Any) if err != nil { return errors.ErrInvalidRequest.WithMessage("any filter dialect invalid").Wrap(err) } @@ -156,7 +139,7 @@ func validateAttributeMap(attributeName string, attribute map[string]string) err return nil } -func (f filterValidator) hasMultipleDialects() bool { +func hasMultipleDialects(f *metapb.Filter) bool { dialectFound := false if len(f.Exact) > 0 { dialectFound = true @@ -196,7 +179,14 @@ func (f filterValidator) hasMultipleDialects() bool { dialectFound = true } } - if f.Sql != "" && dialectFound { + if f.Sql != "" { + if dialectFound { + return true + } else { + dialectFound = true + } + } + if f.Cel != "" && dialectFound { return true } return false diff --git a/internal/controller/trigger/validation/suscription_validation_test.go b/internal/controller/trigger/validation/suscription_validation_test.go index 67b4296db..a38b1b88c 100644 --- a/internal/controller/trigger/validation/suscription_validation_test.go +++ b/internal/controller/trigger/validation/suscription_validation_test.go @@ -12,12 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -package validation_test +package validation import ( + "context" "testing" - "github.com/linkall-labs/vanus/internal/controller/trigger/validation" ctrlpb "github.com/linkall-labs/vsproto/pkg/controller" metapb "github.com/linkall-labs/vsproto/pkg/meta" @@ -25,26 +25,165 @@ import ( ) func TestCreateSubscriptionRequestValidator(t *testing.T) { + ctx := context.Background() Convey("multiple dialect", t, func() { - request := validation.ConvertCreateSubscriptionRequest(&ctrlpb.CreateSubscriptionRequest{ + request := &ctrlpb.CreateSubscriptionRequest{ Filters: []*metapb.Filter{{ Exact: map[string]string{ "key1": "value1", }, - Suffix: map[string]string{ - "key2": "values2", - }, + Cel: "$type.(string) =='test'", }}, - }) - So(request.Validate(nil), ShouldNotBeNil) + } + So(ValidateCreateSubscription(ctx, request), ShouldNotBeNil) + }) + Convey("sink empty", t, func() { + request := &ctrlpb.CreateSubscriptionRequest{ + Sink: "", + EventBus: "bus", + } + So(ValidateCreateSubscription(ctx, request), ShouldNotBeNil) + }) + Convey("eventBus empty", t, func() { + request := &ctrlpb.CreateSubscriptionRequest{ + Sink: "sink", + EventBus: "", + } + So(ValidateCreateSubscription(ctx, request), ShouldNotBeNil) + }) + +} + +func TestValidateFilter(t *testing.T) { + ctx := context.Background() + Convey("exact key empty", t, func() { + f := &metapb.Filter{ + Exact: map[string]string{ + "": "value", + }, + } + So(ValidateFilter(ctx, f), ShouldNotBeNil) + }) + Convey("exact value empty", t, func() { + f := &metapb.Filter{ + Exact: map[string]string{ + "key": "", + }, + } + So(ValidateFilter(ctx, f), ShouldNotBeNil) + }) + Convey("suffix key empty", t, func() { + f := &metapb.Filter{ + Suffix: map[string]string{ + "": "value", + }, + } + So(ValidateFilter(ctx, f), ShouldNotBeNil) + }) + Convey("suffix value empty", t, func() { + f := &metapb.Filter{ + Suffix: map[string]string{ + "key": "", + }, + } + So(ValidateFilter(ctx, f), ShouldNotBeNil) + }) + Convey("prefix key empty", t, func() { + f := &metapb.Filter{ + Prefix: map[string]string{ + "": "value", + }, + } + So(ValidateFilter(ctx, f), ShouldNotBeNil) + }) + Convey("prefix value empty", t, func() { + f := &metapb.Filter{ + Prefix: map[string]string{ + "key": "", + }, + } + So(ValidateFilter(ctx, f), ShouldNotBeNil) + }) + Convey("sql", t, func() { + f := &metapb.Filter{ + Sql: "source = 'test'", + } + So(ValidateFilter(ctx, f), ShouldBeNil) + }) + Convey("sql invalid", t, func() { + f := &metapb.Filter{ + Sql: "source == 'test'", + } + So(ValidateFilter(ctx, f), ShouldNotBeNil) }) Convey("cel", t, func() { - request := validation.ConvertCreateSubscriptionRequest(&ctrlpb.CreateSubscriptionRequest{ - Filters: []*metapb.Filter{{ - Cel: "$type.(string) =='test'", - }}, - Sink: "http", - }) - So(request.Validate(nil), ShouldBeNil) + f := &metapb.Filter{ + Cel: "$type.(string) =='test'", + } + So(ValidateFilter(ctx, f), ShouldBeNil) + }) + Convey("cel invalid", t, func() { + f := &metapb.Filter{ + Cel: "$type.(string) ==test", + } + So(ValidateFilter(ctx, f), ShouldNotBeNil) + }) + Convey("not", t, func() { + f := &metapb.Filter{ + Exact: map[string]string{ + "key": "value", + }, + } + So(ValidateFilter(ctx, f), ShouldBeNil) + }) + filters := []*metapb.Filter{ + { + Exact: map[string]string{ + "key": "value", + }, + }, { + Suffix: map[string]string{ + "key": "value", + }, + }, + } + filtersInvalid := []*metapb.Filter{ + { + Exact: map[string]string{ + "": "value", + }, + }, { + Suffix: map[string]string{ + "key": "value", + }, + }, + } + Convey("filter list", t, func() { + So(ValidateFilterList(ctx, filters), ShouldBeNil) + So(ValidateFilterList(ctx, filtersInvalid), ShouldNotBeNil) + }) + Convey("all", t, func() { + f := &metapb.Filter{ + All: filters, + } + So(ValidateFilter(ctx, f), ShouldBeNil) + }) + Convey("all invalid", t, func() { + f := &metapb.Filter{ + All: filtersInvalid, + } + So(ValidateFilter(ctx, f), ShouldNotBeNil) + }) + Convey("any", t, func() { + f := &metapb.Filter{ + Any: filters, + } + So(ValidateFilter(ctx, f), ShouldBeNil) + }) + Convey("any invalid", t, func() { + f := &metapb.Filter{ + Any: filtersInvalid, + } + So(ValidateFilter(ctx, f), ShouldNotBeNil) }) } diff --git a/internal/controller/trigger/worker/testing/mock_trigger_worker_manager.go b/internal/controller/trigger/worker/mock_trigger_worker_manager.go similarity index 95% rename from internal/controller/trigger/worker/testing/mock_trigger_worker_manager.go rename to internal/controller/trigger/worker/mock_trigger_worker_manager.go index 525316a97..c30681849 100644 --- a/internal/controller/trigger/worker/testing/mock_trigger_worker_manager.go +++ b/internal/controller/trigger/worker/mock_trigger_worker_manager.go @@ -1,8 +1,8 @@ // Code generated by MockGen. DO NOT EDIT. // Source: trigger_worker_manager.go -// Package testing is a generated GoMock package. -package testing +// Package worker is a generated GoMock package. +package worker import ( context "context" @@ -10,7 +10,6 @@ import ( gomock "github.com/golang/mock/gomock" info "github.com/linkall-labs/vanus/internal/controller/trigger/info" - worker "github.com/linkall-labs/vanus/internal/controller/trigger/worker" vanus "github.com/linkall-labs/vanus/internal/primitive/vanus" ) @@ -52,7 +51,7 @@ func (mr *MockManagerMockRecorder) AddTriggerWorker(ctx, addr interface{}) *gomo } // AssignSubscription mocks base method. -func (m *MockManager) AssignSubscription(ctx context.Context, tWorker *worker.TriggerWorker, subId vanus.ID) { +func (m *MockManager) AssignSubscription(ctx context.Context, tWorker *TriggerWorker, subId vanus.ID) { m.ctrl.T.Helper() m.ctrl.Call(m, "AssignSubscription", ctx, tWorker, subId) } @@ -78,10 +77,10 @@ func (mr *MockManagerMockRecorder) GetActiveRunningTriggerWorker() *gomock.Call } // GetTriggerWorker mocks base method. -func (m *MockManager) GetTriggerWorker(ctx context.Context, addr string) *worker.TriggerWorker { +func (m *MockManager) GetTriggerWorker(ctx context.Context, addr string) *TriggerWorker { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetTriggerWorker", ctx, addr) - ret0, _ := ret[0].(*worker.TriggerWorker) + ret0, _ := ret[0].(*TriggerWorker) return ret0 } diff --git a/internal/controller/trigger/worker/trigger_worker_manager.go b/internal/controller/trigger/worker/trigger_worker_manager.go index 6076355cd..caa11e4d5 100644 --- a/internal/controller/trigger/worker/trigger_worker_manager.go +++ b/internal/controller/trigger/worker/trigger_worker_manager.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:generate mockgen -source=trigger_worker_manager.go -destination=testing/mock_trigger_worker_manager.go -package=testing +//go:generate mockgen -source=trigger_worker_manager.go -destination=mock_trigger_worker_manager.go -package=worker package worker import ( @@ -274,7 +274,7 @@ func (m *manager) cleanTriggerWorker(ctx context.Context, tWorker *TriggerWorker log.Info(ctx, "do trigger worker leave success", map[string]interface{}{ log.KeyTriggerWorkerAddr: tWorker.info.Addr, }) - err := m.storage.DeleteTriggerWorker(ctx, tWorker.info.Id) + err := m.storage.DeleteTriggerWorker(ctx, tWorker.info.ID) if err != nil { log.Warning(ctx, "storage delete trigger worker error", map[string]interface{}{ log.KeyError: err, diff --git a/internal/controller/trigger/worker/trigger_worker_manager_test.go b/internal/controller/trigger/worker/trigger_worker_manager_test.go index b60359db0..bbce9e2aa 100644 --- a/internal/controller/trigger/worker/trigger_worker_manager_test.go +++ b/internal/controller/trigger/worker/trigger_worker_manager_test.go @@ -19,13 +19,14 @@ import ( "fmt" "testing" - "github.com/golang/mock/gomock" "github.com/linkall-labs/vanus/internal/controller/trigger/info" "github.com/linkall-labs/vanus/internal/controller/trigger/storage" - subscriptiontest "github.com/linkall-labs/vanus/internal/controller/trigger/subscription/testing" + "github.com/linkall-labs/vanus/internal/controller/trigger/subscription" "github.com/linkall-labs/vanus/internal/primitive" "github.com/linkall-labs/vanus/internal/primitive/vanus" "github.com/linkall-labs/vanus/internal/util" + + "github.com/golang/mock/gomock" . "github.com/smartystreets/goconvey/convey" ) @@ -48,11 +49,11 @@ func TestInit(t *testing.T) { addr := "test" storage := storage.NewFakeStorage() storage.SaveTriggerWorker(ctx, info.TriggerWorkerInfo{ - Id: util.GetIdByAddr(addr), + ID: util.GetIdByAddr(addr), Addr: addr, }) ctrl := gomock.NewController(t) - subManager := subscriptiontest.NewMockManager(ctrl) + subManager := subscription.NewMockManager(ctrl) sub := getTestSubscription() sub.TriggerWorker = addr subManager.EXPECT().ListSubscription(ctx).Return(map[vanus.ID]*primitive.SubscriptionData{ @@ -125,7 +126,7 @@ func TestAssignSubscription(t *testing.T) { storage := storage.NewFakeStorage() sub := getTestSubscription() ctrl := gomock.NewController(t) - subManager := subscriptiontest.NewMockManager(ctrl) + subManager := subscription.NewMockManager(ctrl) sub.TriggerWorker = addr twManager := NewTriggerWorkerManager(Config{}, storage, subManager, getTestTriggerWorkerRemoveSubscription()).(*manager) Convey("assign subscription", t, func() {