From c7de18152136d0257760e2066c62e929fa4462eb Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Mon, 15 Jan 2024 14:16:25 -0500 Subject: [PATCH 1/9] Enable batch delivery over WebSockets Signed-off-by: Peter Broadhurst --- .../events/websockets/websocket_connection.go | 64 +++++- internal/events/websockets/websockets.go | 19 +- internal/events/websockets/websockets_test.go | 199 ++++++++++++++++-- pkg/core/websocket_actions.go | 10 +- 4 files changed, 269 insertions(+), 23 deletions(-) diff --git a/internal/events/websockets/websocket_connection.go b/internal/events/websockets/websocket_connection.go index e72d1eff4..b41945a0f 100644 --- a/internal/events/websockets/websocket_connection.go +++ b/internal/events/websockets/websocket_connection.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -49,6 +49,7 @@ type websocketConnection struct { autoAck bool started []*websocketStartedSub inflight []*core.EventDeliveryResponse + inflightBatches []*core.WSEventBatch mux sync.Mutex closed bool remoteAddr string @@ -231,6 +232,47 @@ func (wc *websocketConnection) dispatch(event *core.EventDelivery) error { return nil } +func (wc *websocketConnection) dispatchBatch(sub *core.Subscription, events []*core.CombinedEventDataDelivery) error { + inflightBatch := &core.WSEventBatch{ + ID: fftypes.NewUUID(), + Subscription: sub.SubscriptionRef, + Events: make([]*core.EventDelivery, len(events)), + } + for i, e := range events { + inflightBatch.Events[i] = e.Event + } + + var autoAck bool + wc.mux.Lock() + autoAck = wc.autoAck + if !autoAck { + wc.inflightBatches = append(wc.inflightBatches, inflightBatch) + } + wc.mux.Unlock() + + err := wc.send(inflightBatch) + if err != nil { + return err + } + + if autoAck { + wc.ackBatch(inflightBatch) + } + + return nil +} + +func (wc *websocketConnection) ackBatch(batch *core.WSEventBatch) { + for _, e := range batch.Events { + // We individually drive an ack back on each event, but do so in one pass + // (this matches the webhook implementation of batching). 
+ wc.ws.ack(wc.connID, &core.EventDeliveryResponse{ + ID: e.ID, + Subscription: batch.Subscription, + }) + } +} + func (wc *websocketConnection) protocolError(err error) { log.L(wc.ctx).Errorf("Sending protocol error to client: %s", err) sendErr := wc.send(&core.WSError{ @@ -309,6 +351,22 @@ func (wc *websocketConnection) durableSubMatcher(sr core.SubscriptionRef) bool { return false } +func (wc *websocketConnection) handleBatchAck(ack *core.WSAck) (handled bool) { + wc.mux.Lock() + defer wc.mux.Unlock() + var newInflightBatches []*core.WSEventBatch + for _, batch := range wc.inflightBatches { + if batch.ID.Equals(ack.ID) { // nil safe check + wc.ackBatch(batch) + handled = true + } else { + newInflightBatches = append(newInflightBatches, batch) + } + } + wc.inflightBatches = newInflightBatches + return handled +} + func (wc *websocketConnection) checkAck(ack *core.WSAck) (*core.EventDeliveryResponse, error) { l := log.L(wc.ctx) var inflight *core.EventDeliveryResponse @@ -363,6 +421,10 @@ func (wc *websocketConnection) checkAck(ack *core.WSAck) (*core.EventDeliveryRes } func (wc *websocketConnection) handleAck(ack *core.WSAck) error { + if handled := wc.handleBatchAck(ack); handled { + return nil + } + // Perform a locked set of check inflight, err := wc.checkAck(ack) if err != nil { diff --git a/internal/events/websockets/websockets.go b/internal/events/websockets/websockets.go index a12a325dd..fe6dc9bf0 100644 --- a/internal/events/websockets/websockets.go +++ b/internal/events/websockets/websockets.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -54,9 +54,11 @@ func (ws *WebSockets) Name() string { return "websockets" } func (ws *WebSockets) Init(ctx context.Context, config config.Section) error { *ws = WebSockets{ - ctx: ctx, - connections: make(map[string]*websocketConnection), - capabilities: &events.Capabilities{}, + ctx: ctx, + connections: make(map[string]*websocketConnection), + capabilities: &events.Capabilities{ + BatchDelivery: true, + }, callbacks: callbacks{ handlers: make(map[string]events.Callbacks), }, @@ -242,6 +244,11 @@ func (ws *WebSockets) GetStatus() *core.WebSocketStatus { } func (ws *WebSockets) BatchDeliveryRequest(ctx context.Context, connID string, sub *core.Subscription, events []*core.CombinedEventDataDelivery) error { - // We should have rejected creation of the subscription, due to us not supporting this in our capabilities - return i18n.NewError(ctx, coremsgs.MsgBatchDeliveryNotSupported, ws.Name()) + ws.connMux.Lock() + conn, ok := ws.connections[connID] + ws.connMux.Unlock() + if !ok { + return i18n.NewError(ctx, coremsgs.MsgWSConnectionNotActive, connID) + } + return conn.dispatchBatch(sub, events) } diff --git a/internal/events/websockets/websockets_test.go b/internal/events/websockets/websockets_test.go index e2f837988..44fdf407d 100644 --- a/internal/events/websockets/websockets_test.go +++ b/internal/events/websockets/websockets_test.go @@ -236,6 +236,67 @@ func TestStartReceiveAckEphemeral(t *testing.T) { cbs.AssertExpectations(t) } +func TestAutoAckBatch(t *testing.T) { + log.SetLevel("trace") + + cbs := &eventsmocks.Callbacks{} + ws, wsc, cancel := newTestWebsockets(t, cbs, nil, "autoack=true", "ephemeral") + defer cancel() + var connID string + mes := cbs.On("EphemeralSubscription", + mock.MatchedBy(func(s string) bool { connID = s; return true }), + "ns1", mock.Anything, mock.MatchedBy(func(o *core.SubscriptionOptions) bool { + return *o.Batch + 
})).Return(nil) + ack := cbs.On("DeliveryResponse", + mock.MatchedBy(func(s string) bool { return s == connID }), + mock.Anything).Return(nil) + + waitSubscribed := make(chan struct{}) + mes.RunFn = func(a mock.Arguments) { + close(waitSubscribed) + } + + waitAcked := make(chan struct{}) + ack.RunFn = func(a mock.Arguments) { + close(waitAcked) + } + + err := wsc.Send(context.Background(), []byte(`{ + "type":"start", + "namespace":"ns1", + "ephemeral":true, + "options": { + "batch": true + } + }`)) + assert.NoError(t, err) + + <-waitSubscribed + sub := &core.Subscription{ + SubscriptionRef: core.SubscriptionRef{ID: fftypes.NewUUID(), Namespace: "ns1", Name: "sub1"}, + } + ws.BatchDeliveryRequest(ws.ctx, connID, sub, []*core.CombinedEventDataDelivery{ + {Event: &core.EventDelivery{ + EnrichedEvent: core.EnrichedEvent{ + Event: core.Event{ID: fftypes.NewUUID()}, + }, + Subscription: core.SubscriptionRef{ + ID: fftypes.NewUUID(), + Namespace: "ns1", + }, + }}, + }) + + b := <-wsc.Receive() + var res core.EventDelivery + err = json.Unmarshal(b, &res) + assert.NoError(t, err) + + <-waitAcked + cbs.AssertExpectations(t) +} + func TestStartReceiveDurable(t *testing.T) { cbs := &eventsmocks.Callbacks{} ws, wsc, cancel := newTestWebsockets(t, cbs, nil) @@ -316,6 +377,86 @@ func TestStartReceiveDurable(t *testing.T) { cbs.AssertExpectations(t) } +func TestStartReceiveDurableBatch(t *testing.T) { + cbs := &eventsmocks.Callbacks{} + ws, wsc, cancel := newTestWebsockets(t, cbs, nil) + defer cancel() + var connID string + mrg := cbs.On("RegisterConnection", + mock.MatchedBy(func(s string) bool { connID = s; return true }), + mock.MatchedBy(func(subMatch events.SubscriptionMatcher) bool { + return subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub1"}) && + !subMatch(core.SubscriptionRef{Namespace: "ns2", Name: "sub1"}) && + !subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub2"}) + }), + ).Return(nil) + ack := cbs.On("DeliveryResponse", + mock.MatchedBy(func(s string) bool { return s == connID }), + mock.Anything).Return(nil) + + waitSubscribed := make(chan struct{}) + mrg.RunFn = func(a mock.Arguments) { + close(waitSubscribed) + } + + acks := make(chan *core.EventDeliveryResponse) + ack.RunFn = func(a mock.Arguments) { + acks <- a[1].(*core.EventDeliveryResponse) + } + + err := wsc.Send(context.Background(), []byte(`{"type":"start","namespace":"ns1","name":"sub1"}`)) + assert.NoError(t, err) + + <-waitSubscribed + sub := &core.Subscription{ + SubscriptionRef: core.SubscriptionRef{ID: fftypes.NewUUID(), Namespace: "ns1", Name: "sub1"}, + } + event1ID := fftypes.NewUUID() + event2ID := fftypes.NewUUID() + ws.BatchDeliveryRequest(ws.ctx, connID, sub, []*core.CombinedEventDataDelivery{ + { + Event: &core.EventDelivery{ + EnrichedEvent: core.EnrichedEvent{ + Event: core.Event{ID: event1ID}, + }, + Subscription: sub.SubscriptionRef, + }, + }, + { + Event: &core.EventDelivery{ + EnrichedEvent: core.EnrichedEvent{ + Event: core.Event{ID: event2ID}, + }, + Subscription: sub.SubscriptionRef, + }, + }, + }) + + b := <-wsc.Receive() + var deliveredBatch core.WSEventBatch + err = json.Unmarshal(b, &deliveredBatch) + assert.NoError(t, err) + assert.Len(t, deliveredBatch.Events, 2) + assert.Equal(t, "ns1", deliveredBatch.Subscription.Namespace) + assert.Equal(t, "sub1", deliveredBatch.Subscription.Name) + err = wsc.Send(context.Background(), []byte(fmt.Sprintf(`{ + "type":"ack", + "id": "%s", + "subscription": { + "namespace": "ns1", + "name": "sub1" + } + }`, deliveredBatch.ID))) + assert.NoError(t, 
err) + + ack1 := <-acks + assert.Equal(t, *event1ID, *ack1.ID) + ack2 := <-acks + assert.Equal(t, *event2ID, *ack2.ID) + + cbs.AssertExpectations(t) +} + func TestStartReceiveDurableWithAuth(t *testing.T) { cbs := &eventsmocks.Callbacks{} ws, wsc, cancel := newTestWebsockets(t, cbs, &testAuthorizer{}) @@ -523,6 +664,30 @@ func TestHandleAckWithAutoAck(t *testing.T) { assert.Regexp(t, "FF10180", err) } +func TestHandleBatchNotMatch(t *testing.T) { + eventUUID := fftypes.NewUUID() + wsc := &websocketConnection{ + ctx: context.Background(), + started: []*websocketStartedSub{{WSStart: core.WSStart{ + Ephemeral: false, Name: "name1", Namespace: "ns1", + }}}, + sendMessages: make(chan interface{}, 1), + inflight: []*core.EventDeliveryResponse{ + {ID: eventUUID}, + }, + inflightBatches: []*core.WSEventBatch{ + {ID: fftypes.NewUUID()}, + }, + autoAck: true, + } + err := wsc.handleAck(&core.WSAck{ + ID: eventUUID, + }) + assert.Regexp(t, "FF10180", err) + assert.Len(t, wsc.inflight, 1) + assert.Len(t, wsc.inflightBatches, 1) +} + func TestHandleStartFlippingAutoAck(t *testing.T) { eventUUID := fftypes.NewUUID() wsc := &websocketConnection{ @@ -665,6 +830,16 @@ func TestConnectionDispatchAfterClose(t *testing.T) { assert.Regexp(t, "FF00147", err) } +func TestConnectionDispatchBatchAfterClose(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + wsc := &websocketConnection{ + ctx: ctx, + } + err := wsc.dispatchBatch(&core.Subscription{}, []*core.CombinedEventDataDelivery{}) + assert.Regexp(t, "FF00147", err) +} + func TestWebsocketDispatchAfterClose(t *testing.T) { ws := &WebSockets{ ctx: context.Background(), @@ -674,6 +849,15 @@ func TestWebsocketDispatchAfterClose(t *testing.T) { assert.Regexp(t, "FF10173", err) } +func TestWebsocketBatchDispatchAfterClose(t *testing.T) { + ws := &WebSockets{ + ctx: context.Background(), + connections: make(map[string]*websocketConnection), + } + err := ws.BatchDeliveryRequest(ws.ctx, "gone", nil, []*core.CombinedEventDataDelivery{}) + assert.Regexp(t, "FF10173", err) +} + func TestDispatchAutoAck(t *testing.T) { cbs := &eventsmocks.Callbacks{} cbs.On("DeliveryResponse", mock.Anything, mock.Anything).Return(nil) @@ -826,21 +1010,6 @@ func TestNamespaceRestartedFailClose(t *testing.T) { mcb.AssertExpectations(t) } -func TestEventDeliveryBatchReturnsUnsupported(t *testing.T) { - cbs := &eventsmocks.Callbacks{} - ws, _, cancel := newTestWebsockets(t, cbs, nil) - defer cancel() - - sub := &core.Subscription{ - SubscriptionRef: core.SubscriptionRef{ - Namespace: "ns1", - }, - } - - err := ws.BatchDeliveryRequest(ws.ctx, "id", sub, []*core.CombinedEventDataDelivery{}) - assert.Regexp(t, "FF10461", err) -} - func TestNamespaceScopedSendWrongNamespaceStartAction(t *testing.T) { cbs := &eventsmocks.Callbacks{} _, wsc, cancel := newTestWebsocketsCommon(t, cbs, nil, "ns1") diff --git a/pkg/core/websocket_actions.go b/pkg/core/websocket_actions.go index d02fb9940..772d51344 100644 --- a/pkg/core/websocket_actions.go +++ b/pkg/core/websocket_actions.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. 
// // SPDX-License-Identifier: Apache-2.0 // @@ -61,3 +61,11 @@ type WSError struct { Type WSClientPayloadType `ffstruct:"WSAck" json:"type" ffenum:"wstype"` Error string `ffstruct:"WSAck" json:"error"` } + +// WSEventBatch is used when batched delivery is enabled over the websocket, allowing +// an array of events to be ack'd as a whole (rather than ack'ing individually) +type WSEventBatch struct { + ID *fftypes.UUID `ffstruct:"WSEventBatch" json:"id"` + Subscription SubscriptionRef `ffstruct:"WSEventBatch" json:"subscription"` + Events []*EventDelivery `ffstruct:"WSEventBatch" json:"events"` +} From d1598d88299d1eb2620b9f790d07710619e5044f Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Mon, 15 Jan 2024 16:25:11 -0500 Subject: [PATCH 2/9] Change how we do defaults to ensure ephemeral subs with batch:true have readahead set Signed-off-by: Peter Broadhurst --- docs/reference/config.md | 3 +- internal/coreconfig/coreconfig.go | 11 ++- internal/coremsgs/en_config_descriptions.go | 7 +- internal/events/event_dispatcher.go | 13 ++-- internal/events/event_dispatcher_test.go | 21 ++---- internal/events/subscription_manager.go | 20 ++++- internal/events/subscription_manager_test.go | 24 ++++++ internal/events/websockets/config.go | 3 +- .../events/websockets/websocket_connection.go | 55 ++++++++++++-- internal/events/websockets/websockets_test.go | 75 ++++++++++++++++++- 10 files changed, 191 insertions(+), 41 deletions(-) diff --git a/docs/reference/config.md b/docs/reference/config.md index fbb937f57..85524723d 100644 --- a/docs/reference/config.md +++ b/docs/reference/config.md @@ -1387,7 +1387,8 @@ nav_order: 2 |Key|Description|Type|Default Value| |---|-----------|----|-------------| -|batchSize|Default read ahead to enable for subscriptions that do not explicitly configure readahead|`int`|`0` +|batchSize|Default read ahead to enable for subscriptions that do not explicitly configure readahead|`int`|`50` +|batchTimeout|Default batch timeout|`int`|`50ms` ## subscription.retry diff --git a/internal/coreconfig/coreconfig.go b/internal/coreconfig/coreconfig.go index 1a8e904ec..6c2f7812f 100644 --- a/internal/coreconfig/coreconfig.go +++ b/internal/coreconfig/coreconfig.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. 
// // SPDX-License-Identifier: Apache-2.0 // @@ -327,8 +327,10 @@ var ( OrgDescription = ffc("org.description") // OrchestratorStartupAttempts is how many time to attempt to connect to core infrastructure on startup OrchestratorStartupAttempts = ffc("orchestrator.startupAttempts") - // SubscriptionDefaultsReadAhead default read ahead to enable for subscriptions that do not explicitly configure readahead - SubscriptionDefaultsReadAhead = ffc("subscription.defaults.batchSize") + // SubscriptionDefaultsBatchSize default read ahead to enable for subscriptions that do not explicitly configure readahead + SubscriptionDefaultsBatchSize = ffc("subscription.defaults.batchSize") + // SubscriptionDefaultsBatchTimeout default batch timeout + SubscriptionDefaultsBatchTimeout = ffc("subscription.defaults.batchTimeout") // SubscriptionMax maximum number of pre-defined subscriptions that can exist (note for high fan-out consider connecting a dedicated pub/sub broker to the dispatcher) SubscriptionMax = ffc("subscription.max") // SubscriptionsRetryInitialDelay is the initial retry delay @@ -451,7 +453,8 @@ func setDefaults() { viper.SetDefault(string(PrivateMessagingBatchSize), 200) viper.SetDefault(string(PrivateMessagingBatchTimeout), "1s") viper.SetDefault(string(PrivateMessagingBatchPayloadLimit), "800Kb") - viper.SetDefault(string(SubscriptionDefaultsReadAhead), 0) + viper.SetDefault(string(SubscriptionDefaultsBatchSize), 50) + viper.SetDefault(string(SubscriptionDefaultsBatchTimeout), "50ms") viper.SetDefault(string(SubscriptionMax), 500) viper.SetDefault(string(SubscriptionsRetryInitialDelay), "250ms") viper.SetDefault(string(SubscriptionsRetryMaxDelay), "30s") diff --git a/internal/coremsgs/en_config_descriptions.go b/internal/coremsgs/en_config_descriptions.go index a148b5b2b..b766b72b6 100644 --- a/internal/coremsgs/en_config_descriptions.go +++ b/internal/coremsgs/en_config_descriptions.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. 
// // SPDX-License-Identifier: Apache-2.0 // @@ -383,8 +383,9 @@ var ( ConfigPluginSharedstorageIpfsGatewayURL = ffc("config.plugins.sharedstorage[].ipfs.gateway.url", "The URL for the IPFS Gateway", urlStringType) ConfigPluginSharedstorageIpfsGatewayProxyURL = ffc("config.plugins.sharedstorage[].ipfs.gateway.proxy.url", "Optional HTTP proxy server to use when connecting to the IPFS Gateway", urlStringType) - ConfigSubscriptionMax = ffc("config.subscription.max", "The maximum number of pre-defined subscriptions that can exist (note for high fan-out consider connecting a dedicated pub/sub broker to the dispatcher)", i18n.IntType) - ConfigSubscriptionDefaultsBatchSize = ffc("config.subscription.defaults.batchSize", "Default read ahead to enable for subscriptions that do not explicitly configure readahead", i18n.IntType) + ConfigSubscriptionMax = ffc("config.subscription.max", "The maximum number of pre-defined subscriptions that can exist (note for high fan-out consider connecting a dedicated pub/sub broker to the dispatcher)", i18n.IntType) + ConfigSubscriptionDefaultsBatchSize = ffc("config.subscription.defaults.batchSize", "Default read ahead to enable for subscriptions that do not explicitly configure readahead", i18n.IntType) + ConfigSubscriptionDefaultsBatchTimeout = ffc("config.subscription.defaults.batchTimeout", "Default batch timeout", i18n.IntType) ConfigTokensName = ffc("config.tokens[].name", "A name to identify this token plugin", i18n.StringType) ConfigTokensPlugin = ffc("config.tokens[].plugin", "The type of the token plugin to use", i18n.StringType) diff --git a/internal/events/event_dispatcher.go b/internal/events/event_dispatcher.go index a862d2dd5..9c2d453e6 100644 --- a/internal/events/event_dispatcher.go +++ b/internal/events/event_dispatcher.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -77,13 +77,10 @@ type eventDispatcher struct { func newEventDispatcher(ctx context.Context, enricher *eventEnricher, ei events.Plugin, di database.Plugin, dm data.Manager, bm broadcast.Manager, pm privatemessaging.Manager, connID string, sub *subscription, en *eventNotifier, txHelper txcommon.Helper) *eventDispatcher { ctx, cancelCtx := context.WithCancel(ctx) - readAhead := config.GetUint(coreconfig.SubscriptionDefaultsReadAhead) + readAhead := uint(0) if sub.definition.Options.ReadAhead != nil { readAhead = uint(*sub.definition.Options.ReadAhead) } - if readAhead > maxReadAhead { - readAhead = maxReadAhead - } batchTimeout := defaultBatchTimeout if sub.definition.Options.BatchTimeout != nil && *sub.definition.Options.BatchTimeout != "" { @@ -387,6 +384,10 @@ func (ed *eventDispatcher) handleAckOffsetUpdate(ack ackNack) { func (ed *eventDispatcher) deliverBatchedEvents() { withData := ed.subscription.definition.Options.WithData != nil && *ed.subscription.definition.Options.WithData + batchSize := 1 + if ed.readAhead > 1 { + batchSize = ed.readAhead + } var events []*core.CombinedEventDataDelivery var batchTimeoutContext context.Context var batchTimeoutCancel func() @@ -435,7 +436,7 @@ func (ed *eventDispatcher) deliverBatchedEvents() { return } - if len(events) == ed.readAhead || (timedOut && len(events) > 0) { + if len(events) == batchSize || (timedOut && len(events) > 0) { _ = ed.transport.BatchDeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, events) // If err handle all the delivery responses for all the events?? 
events = nil diff --git a/internal/events/event_dispatcher_test.go b/internal/events/event_dispatcher_test.go index 2f592c92d..1f50ef38c 100644 --- a/internal/events/event_dispatcher_test.go +++ b/internal/events/event_dispatcher_test.go @@ -23,7 +23,6 @@ import ( "testing" "time" - "github.com/hyperledger/firefly-common/pkg/config" "github.com/hyperledger/firefly-common/pkg/fftypes" "github.com/hyperledger/firefly-common/pkg/log" "github.com/hyperledger/firefly/internal/cache" @@ -131,20 +130,6 @@ func TestEventDispatcherStartStopBatched(t *testing.T) { ed.close() } -func TestMaxReadAhead(t *testing.T) { - config.Set(coreconfig.SubscriptionDefaultsReadAhead, 65537) - ed, cancel := newTestEventDispatcher(&subscription{ - dispatcherElection: make(chan bool, 1), - definition: &core.Subscription{ - SubscriptionRef: core.SubscriptionRef{Namespace: "ns1", Name: "sub1"}, - Ephemeral: true, - Options: core.SubscriptionOptions{}, - }, - }) - defer cancel() - assert.Equal(t, int(65536), ed.readAhead) -} - func TestEventDispatcherLeaderElection(t *testing.T) { log.SetLevel("debug") @@ -974,6 +959,9 @@ func TestBatchEventDeliveryClosed(t *testing.T) { ed, cancel := newTestEventDispatcher(sub) defer cancel() + mei := ed.transport.(*eventsmocks.Plugin) + mei.On("BatchDeliveryRequest", ed.ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil) + ed.batchTimeout = 1 * time.Minute ed.eventDelivery <- &core.EventDelivery{} close(ed.eventDelivery) @@ -1293,6 +1281,9 @@ func TestBatchDeliverEventsWithDataFail(t *testing.T) { mdm := ed.data.(*datamocks.Manager) mdm.On("GetMessageDataCached", ed.ctx, mock.Anything).Return(nil, false, fmt.Errorf("pop")) + mei := ed.transport.(*eventsmocks.Plugin) + mei.On("BatchDeliveryRequest", ed.ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil) + id1 := fftypes.NewUUID() ed.eventDelivery <- &core.EventDelivery{ EnrichedEvent: core.EnrichedEvent{ diff --git a/internal/events/subscription_manager.go b/internal/events/subscription_manager.go index 788e72ec3..928e3789e 100644 --- a/internal/events/subscription_manager.go +++ b/internal/events/subscription_manager.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. 
// // SPDX-License-Identifier: Apache-2.0 // @@ -20,6 +20,7 @@ import ( "context" "regexp" "sync" + "time" "github.com/hyperledger/firefly-common/pkg/config" "github.com/hyperledger/firefly-common/pkg/fftypes" @@ -90,6 +91,9 @@ type subscriptionManager struct { newOrUpdatedSubscriptions chan *fftypes.UUID deletedSubscriptions chan *fftypes.UUID retry retry.Retry + + defaultBatchSize uint16 + defaultBatchTimeout time.Duration } func newSubscriptionManager(ctx context.Context, ns *core.Namespace, enricher *eventEnricher, di database.Plugin, dm data.Manager, en *eventNotifier, bm broadcast.Manager, pm privatemessaging.Manager, txHelper txcommon.Helper, transports map[string]events.Plugin) (*subscriptionManager, error) { @@ -116,6 +120,8 @@ func newSubscriptionManager(ctx context.Context, ns *core.Namespace, enricher *e MaximumDelay: config.GetDuration(coreconfig.SubscriptionsRetryMaxDelay), Factor: config.GetFloat64(coreconfig.SubscriptionsRetryFactor), }, + defaultBatchSize: uint16(config.GetInt(coreconfig.SubscriptionDefaultsBatchSize)), + defaultBatchTimeout: config.GetDuration(coreconfig.SubscriptionDefaultsBatchTimeout), } for _, ei := range sm.transports { @@ -270,6 +276,18 @@ func (sm *subscriptionManager) parseSubscriptionDef(ctx context.Context, subDef subDef.Options.TLSConfig = sm.namespace.TLSConfigs[subDef.Options.TLSConfigName] } + // Defaults that only apply in batch mode + if subDef.Options.Batch != nil && *subDef.Options.Batch { + if subDef.Options.ReadAhead == nil || *subDef.Options.ReadAhead == 0 { + defaultBatchSize := sm.defaultBatchSize + subDef.Options.ReadAhead = &defaultBatchSize + } + if subDef.Options.BatchTimeout == nil || *subDef.Options.BatchTimeout == "" { + defaultBatchTimeout := sm.defaultBatchTimeout.String() + subDef.Options.BatchTimeout = &defaultBatchTimeout + } + } + if err := transport.ValidateOptions(ctx, &subDef.Options); err != nil { return nil, err } diff --git a/internal/events/subscription_manager_test.go b/internal/events/subscription_manager_test.go index d72b8d111..ef509c705 100644 --- a/internal/events/subscription_manager_test.go +++ b/internal/events/subscription_manager_test.go @@ -551,6 +551,30 @@ func TestCreateSubscriptionSuccessTLSConfig(t *testing.T) { assert.NotNil(t, sub.definition.Options.TLSConfig) } +func TestCreateSubscriptionSuccessBatch(t *testing.T) { + coreconfig.Reset() + + mei := &eventsmocks.Plugin{} + sm, cancel := newTestSubManager(t, mei) + defer cancel() + + mei.On("GetFFRestyConfig", mock.Anything).Return(&ffresty.Config{}) + mei.On("ValidateOptions", mock.Anything, mock.Anything).Return(nil) + truthy := true + sub, err := sm.parseSubscriptionDef(sm.ctx, &core.Subscription{ + Options: core.SubscriptionOptions{ + SubscriptionCoreOptions: core.SubscriptionCoreOptions{ + Batch: &truthy, + }, + }, + Transport: "ut", + }) + assert.NoError(t, err) + + assert.Equal(t, uint16(50), *sub.definition.Options.ReadAhead) + assert.Equal(t, "50ms", *sub.definition.Options.BatchTimeout) +} + func TestCreateSubscriptionWithDeprecatedFilters(t *testing.T) { mei := &eventsmocks.Plugin{} sm, cancel := newTestSubManager(t, mei) diff --git a/internal/events/websockets/config.go b/internal/events/websockets/config.go index 8ce482a87..c5d8df776 100644 --- a/internal/events/websockets/config.go +++ b/internal/events/websockets/config.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. 
// // SPDX-License-Identifier: Apache-2.0 // @@ -32,5 +32,4 @@ const ( func (ws *WebSockets) InitConfig(config config.Section) { config.AddKnownKey(ReadBufferSize, bufferSizeDefault) config.AddKnownKey(WriteBufferSize, bufferSizeDefault) - } diff --git a/internal/events/websockets/websocket_connection.go b/internal/events/websockets/websocket_connection.go index b41945a0f..29c66c17e 100644 --- a/internal/events/websockets/websocket_connection.go +++ b/internal/events/websockets/websocket_connection.go @@ -21,6 +21,8 @@ import ( "encoding/json" "io" "net/http" + "net/url" + "strconv" "sync" "time" @@ -95,21 +97,45 @@ func (wc *websocketConnection) assertNamespace(namespace string) (string, error) return namespace, nil } +func isBoolQuerySet(query url.Values, boolOption string) bool { + optionValues, hasOptionValues := query[boolOption] + return hasOptionValues && (len(optionValues) == 0 || optionValues[0] != "false") +} + +func (wc *websocketConnection) getReadAhead(query url.Values, isBatch bool) *uint16 { + readaheadStr := query.Get("readahead") + if readaheadStr != "" { + readAheadInt, err := strconv.ParseUint(readaheadStr, 10, 16) + if err == nil { + readahead := uint16(readAheadInt) + return &readahead + } + } + return nil +} + +func (wc *websocketConnection) getBatchTimeout(query url.Values) *string { + batchTimeout := query.Get("batchtimeout") + if batchTimeout != "" { + return &batchTimeout + } + return nil +} + // processAutoStart gives a helper to specify query parameters to auto-start your subscription func (wc *websocketConnection) processAutoStart(req *http.Request) { query := req.URL.Query() - ephemeral, hasEphemeral := req.URL.Query()["ephemeral"] - isEphemeral := hasEphemeral && (len(ephemeral) == 0 || ephemeral[0] != "false") + isEphemeral := isBoolQuerySet(query, "ephemeral") _, hasName := query["name"] - autoAck, hasAutoack := req.URL.Query()["autoack"] - isAutoack := hasAutoack && (len(autoAck) == 0 || autoAck[0] != "false") + isAutoack := isBoolQuerySet(query, "autoack") namespace, err := wc.assertNamespace(query.Get("namespace")) if err != nil { wc.protocolError(err) return } - if hasEphemeral || hasName { + if isEphemeral || hasName { + isBatch := isBoolQuerySet(query, "batch") filter := core.NewSubscriptionFilterFromQuery(query) err := wc.handleStart(&core.WSStart{ AutoAck: &isAutoack, @@ -117,6 +143,13 @@ func (wc *websocketConnection) processAutoStart(req *http.Request) { Namespace: namespace, Name: query.Get("name"), Filter: filter, + Options: core.SubscriptionOptions{ + SubscriptionCoreOptions: core.SubscriptionCoreOptions{ + Batch: &isBatch, + BatchTimeout: wc.getBatchTimeout(query), + ReadAhead: wc.getReadAhead(query, isBatch), + }, + }, }) if err != nil { wc.protocolError(err) @@ -234,11 +267,17 @@ func (wc *websocketConnection) dispatch(event *core.EventDelivery) error { func (wc *websocketConnection) dispatchBatch(sub *core.Subscription, events []*core.CombinedEventDataDelivery) error { inflightBatch := &core.WSEventBatch{ - ID: fftypes.NewUUID(), - Subscription: sub.SubscriptionRef, - Events: make([]*core.EventDelivery, len(events)), + ID: fftypes.NewUUID(), + Events: make([]*core.EventDelivery, len(events)), + } + if sub != nil { + inflightBatch.Subscription = sub.SubscriptionRef } for i, e := range events { + // For ephemeral there's no sub, so we pick up from first event + if inflightBatch.Subscription.Namespace == "" { + inflightBatch.Subscription = e.Event.Subscription + } inflightBatch.Events[i] = e.Event } diff --git 
a/internal/events/websockets/websockets_test.go b/internal/events/websockets/websockets_test.go index 44fdf407d..30c409491 100644 --- a/internal/events/websockets/websockets_test.go +++ b/internal/events/websockets/websockets_test.go @@ -240,7 +240,7 @@ func TestAutoAckBatch(t *testing.T) { log.SetLevel("trace") cbs := &eventsmocks.Callbacks{} - ws, wsc, cancel := newTestWebsockets(t, cbs, nil, "autoack=true", "ephemeral") + ws, wsc, cancel := newTestWebsockets(t, cbs, nil, "autoack=true") defer cancel() var connID string mes := cbs.On("EphemeralSubscription", @@ -266,6 +266,7 @@ func TestAutoAckBatch(t *testing.T) { "type":"start", "namespace":"ns1", "ephemeral":true, + "autoack": true, "options": { "batch": true } @@ -619,6 +620,55 @@ func TestAutoStartReceiveAckEphemeral(t *testing.T) { cbs.AssertExpectations(t) } +func TestAutoStartReceiveAckBatchEphemeral(t *testing.T) { + var connID string + cbs := &eventsmocks.Callbacks{} + sub := cbs.On("EphemeralSubscription", + mock.MatchedBy(func(s string) bool { connID = s; return true }), + "ns1", mock.Anything, mock.Anything).Return(nil) + ack := cbs.On("DeliveryResponse", + mock.MatchedBy(func(s string) bool { return s == connID }), + mock.Anything).Return(nil) + + waitSubscribed := make(chan struct{}) + sub.RunFn = func(a mock.Arguments) { + close(waitSubscribed) + } + + waitAcked := make(chan struct{}) + ack.RunFn = func(a mock.Arguments) { + close(waitAcked) + } + + ws, wsc, cancel := newTestWebsockets(t, cbs, nil, "ephemeral", "namespace=ns1", "batch") + defer cancel() + + <-waitSubscribed + ws.BatchDeliveryRequest(ws.ctx, connID, nil, []*core.CombinedEventDataDelivery{ + {Event: &core.EventDelivery{ + EnrichedEvent: core.EnrichedEvent{ + Event: core.Event{ID: fftypes.NewUUID()}, + }, + Subscription: core.SubscriptionRef{ + ID: fftypes.NewUUID(), + Namespace: "ns1", + }, + }}, + }) + + b := <-wsc.Receive() + var deliveredBatch core.WSEventBatch + err := json.Unmarshal(b, &deliveredBatch) + assert.NoError(t, err) + assert.Len(t, deliveredBatch.Events, 1) + + err = wsc.Send(context.Background(), []byte(`{"type":"ack", "id": "`+deliveredBatch.ID.String()+`"}`)) + assert.NoError(t, err) + + <-waitAcked + cbs.AssertExpectations(t) +} + func TestAutoStartBadOptions(t *testing.T) { cbs := &eventsmocks.Callbacks{} _, wsc, cancel := newTestWebsockets(t, cbs, nil, "name=missingnamespace") @@ -632,6 +682,29 @@ func TestAutoStartBadOptions(t *testing.T) { cbs.AssertExpectations(t) } +func TestAutoStartCustomReadAheadBatch(t *testing.T) { + cbs := &eventsmocks.Callbacks{} + + subscribedConn := make(chan string, 1) + cbs.On("EphemeralSubscription", + mock.MatchedBy(func(s string) bool { + subscribedConn <- s + return true + }), + "ns1", + mock.Anything, + mock.MatchedBy(func(o *core.SubscriptionOptions) bool { + return *o.ReadAhead == 42 && *o.BatchTimeout == "1s" + }), + ).Return(nil) + + _, _, cancel := newTestWebsockets(t, cbs, nil, "namespace=ns1", "ephemeral", "batch", "batchtimeout=1s", "readahead=42") + defer cancel() + + <-subscribedConn + +} + func TestAutoStartBadNamespace(t *testing.T) { cbs := &eventsmocks.Callbacks{} _, wsc, cancel := newTestWebsockets(t, cbs, nil, "ephemeral", "namespace=ns2") From 1c5f7e6fe9004052770fb722bbdc8a656cecba1d Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Tue, 16 Jan 2024 11:36:04 -0500 Subject: [PATCH 3/9] Push batch delivery semantics back up to the event poller Signed-off-by: Peter Broadhurst --- internal/events/event_dispatcher.go | 139 +++++++--------- internal/events/event_dispatcher_test.go | 
194 ++--------------------- internal/events/event_poller.go | 9 +- 3 files changed, 73 insertions(+), 269 deletions(-) diff --git a/internal/events/event_dispatcher.go b/internal/events/event_dispatcher.go index 9c2d453e6..f667336d6 100644 --- a/internal/events/event_dispatcher.go +++ b/internal/events/event_dispatcher.go @@ -65,12 +65,11 @@ type eventDispatcher struct { elected bool eventPoller *eventPoller inflight map[fftypes.UUID]*core.Event - eventDelivery chan *core.EventDelivery + eventDelivery chan []*core.EventDelivery mux sync.Mutex namespace string readAhead int batch bool - batchTimeout time.Duration subscription *subscription txHelper txcommon.Helper } @@ -105,13 +104,12 @@ func newEventDispatcher(ctx context.Context, enricher *eventEnricher, ei events. subscription: sub, namespace: sub.definition.Namespace, inflight: make(map[fftypes.UUID]*core.Event), - eventDelivery: make(chan *core.EventDelivery, readAhead+1), + eventDelivery: make(chan []*core.EventDelivery, readAhead+1), readAhead: int(readAhead), acksNacks: make(chan ackNack), closed: make(chan struct{}), txHelper: txHelper, batch: batch, - batchTimeout: batchTimeout, } pollerConf := &eventPollerConf{ @@ -135,6 +133,20 @@ func newEventDispatcher(ctx context.Context, enricher *eventEnricher, ei events. firstEvent: sub.definition.Options.FirstEvent, } + // Users can tune the batch related settings. + // This is always true in batch:true cases, and optionally you can use the batchTimeout setting + // to tweak how we optimize ourselves for readahead / latency detection without batching + // (most likely users with this requirement would be best to just move to batch:true). + if batchTimeout > 0 { + pollerConf.eventBatchTimeout = batchTimeout + if batchTimeout > pollerConf.eventPollTimeout { + pollerConf.eventPollTimeout = batchTimeout + } + } + if batch || pollerConf.eventBatchSize < int(readAhead) { + pollerConf.eventBatchSize = ed.readAhead + } + ed.eventPoller = newEventPoller(ctx, di, en, pollerConf) return ed } @@ -162,11 +174,8 @@ func (ed *eventDispatcher) electAndStart() { ed.elected = true ed.eventPoller.start() - if ed.batch { - go ed.deliverBatchedEvents() - } else { - go ed.deliverEvents() - } + go ed.deliverEvents() + // Wait until the event poller closes <-ed.eventPoller.closed } @@ -323,7 +332,15 @@ func (ed *eventDispatcher) bufferedDelivery(events []core.LocallySequenced) (boo ed.mux.Unlock() dispatched++ - ed.eventDelivery <- event + if !ed.batch { + // dispatch individually + ed.eventDelivery <- []*core.EventDelivery{event} + } + } + + if ed.batch && len(dispatchable) > 0 { + // Dispatch the whole batch now marked in-flight + ed.eventDelivery <- dispatchable } if inflightCount == 0 { @@ -381,92 +398,48 @@ func (ed *eventDispatcher) handleAckOffsetUpdate(ack ackNack) { } } -func (ed *eventDispatcher) deliverBatchedEvents() { - withData := ed.subscription.definition.Options.WithData != nil && *ed.subscription.definition.Options.WithData - - batchSize := 1 - if ed.readAhead > 1 { - batchSize = ed.readAhead - } - var events []*core.CombinedEventDataDelivery - var batchTimeoutContext context.Context - var batchTimeoutCancel func() - for { - var timeoutContext context.Context - var timedOut bool - if batchTimeoutContext != nil { - timeoutContext = batchTimeoutContext - } else { - timeoutContext = ed.ctx - } - select { - case event, ok := <-ed.eventDelivery: - if !ok { - if batchTimeoutCancel != nil { - batchTimeoutCancel() - } - return - } - - if events == nil { - events = []*core.CombinedEventDataDelivery{} - 
batchTimeoutContext, batchTimeoutCancel = context.WithTimeout(ed.ctx, ed.batchTimeout) - } - - log.L(ed.ctx).Debugf("Dispatching %s event in a batch: %.10d/%s [%s]: ref=%s/%s", ed.transport.Name(), event.Sequence, event.ID, event.Type, event.Namespace, event.Reference) - - var data []*core.Data - var err error - if withData && event.Message != nil { - data, _, err = ed.data.GetMessageDataCached(ed.ctx, event.Message) - } - - events = append(events, &core.CombinedEventDataDelivery{Event: event, Data: data}) - - if err != nil { - ed.deliveryResponse(&core.EventDeliveryResponse{ID: event.ID, Rejected: true}) - } - - case <-timeoutContext.Done(): - timedOut = true - case <-ed.ctx.Done(): - if batchTimeoutCancel != nil { - batchTimeoutCancel() - } - return - } - - if len(events) == batchSize || (timedOut && len(events) > 0) { - _ = ed.transport.BatchDeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, events) - // If err handle all the delivery responses for all the events?? - events = nil - } - } -} - // TODO issue here, we can't just call DeliveryRequest with one thing. func (ed *eventDispatcher) deliverEvents() { withData := ed.subscription.definition.Options.WithData != nil && *ed.subscription.definition.Options.WithData for { select { - case event, ok := <-ed.eventDelivery: + case events, ok := <-ed.eventDelivery: if !ok { return } - log.L(ed.ctx).Debugf("Dispatching %s event: %.10d/%s [%s]: ref=%s/%s", ed.transport.Name(), event.Sequence, event.ID, event.Type, event.Namespace, event.Reference) - var data []*core.Data + // As soon as we hit an error, we need to trigger into nack mode var err error - if withData && event.Message != nil { - data, _, err = ed.data.GetMessageDataCached(ed.ctx, event.Message) + eventsWithData := make([]*core.CombinedEventDataDelivery, len(events)) + for i := 0; i < len(events) && err == nil; i++ { + e := &core.CombinedEventDataDelivery{ + Event: events[i], + } + eventsWithData[i] = e + log.L(ed.ctx).Debugf("Dispatching %s event: %.10d/%s [%s]: ref=%s/%s", ed.transport.Name(), e.Event.Sequence, e.Event.ID, e.Event.Type, e.Event.Namespace, e.Event.Reference) + if withData && e.Event.Message != nil { + e.Data, _, err = ed.data.GetMessageDataCached(ed.ctx, e.Event.Message) + } + // Individual events (in reality there is only ever i==0 for this case) + if err == nil && !ed.batch { + err = ed.transport.DeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, e.Event, e.Data) + } + if err != nil { + ed.deliveryResponse(&core.EventDeliveryResponse{ID: e.Event.ID, Rejected: true}) + } } - if err == nil { - err = ed.transport.DeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, event, data) - } - if err != nil { - ed.deliveryResponse(&core.EventDeliveryResponse{ID: event.ID, Rejected: true}) + // In batch mode we do one dispatch of the whole set as one + if err == nil && ed.batch { + err = ed.transport.BatchDeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, eventsWithData) + if err != nil { + // nack everything on behalf of the failed delivery + for _, e := range events { + ed.deliveryResponse(&core.EventDeliveryResponse{ID: e.Event.ID, Rejected: true}) + } + } } + case <-ed.ctx.Done(): return } diff --git a/internal/events/event_dispatcher_test.go b/internal/events/event_dispatcher_test.go index 1f50ef38c..d1cd02b23 100644 --- a/internal/events/event_dispatcher_test.go +++ b/internal/events/event_dispatcher_test.go @@ -951,24 +951,6 @@ func TestEventDeliveryClosed(t *testing.T) { cancel() } -func TestBatchEventDeliveryClosed(t 
*testing.T) { - - sub := &subscription{ - definition: &core.Subscription{}, - } - ed, cancel := newTestEventDispatcher(sub) - defer cancel() - - mei := ed.transport.(*eventsmocks.Plugin) - mei.On("BatchDeliveryRequest", ed.ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil) - - ed.batchTimeout = 1 * time.Minute - ed.eventDelivery <- &core.EventDelivery{} - close(ed.eventDelivery) - - ed.deliverBatchedEvents() -} - func TestAckClosed(t *testing.T) { sub := &subscription{ @@ -1027,17 +1009,19 @@ func TestDeliverEventsWithDataFail(t *testing.T) { mdm.On("GetMessageDataCached", ed.ctx, mock.Anything).Return(nil, false, fmt.Errorf("pop")) id1 := fftypes.NewUUID() - ed.eventDelivery <- &core.EventDelivery{ - EnrichedEvent: core.EnrichedEvent{ - Event: core.Event{ - ID: id1, - }, - Message: &core.Message{ - Header: core.MessageHeader{ - ID: fftypes.NewUUID(), + ed.eventDelivery <- []*core.EventDelivery{ + { + EnrichedEvent: core.EnrichedEvent{ + Event: core.Event{ + ID: id1, }, - Data: core.DataRefs{ - {ID: fftypes.NewUUID()}, + Message: &core.Message{ + Header: core.MessageHeader{ + ID: fftypes.NewUUID(), + }, + Data: core.DataRefs{ + {ID: fftypes.NewUUID()}, + }, }, }, }, @@ -1154,157 +1138,3 @@ func TestEventDeliveryBatch(t *testing.T) { mbm.AssertExpectations(t) mms.AssertExpectations(t) } - -func TestEventDispatcherBatchReadAhead(t *testing.T) { - log.SetLevel("debug") - var five = uint16(5) - subID := fftypes.NewUUID() - truthy := true - oneSec := "1s" - sub := &subscription{ - dispatcherElection: make(chan bool, 1), - definition: &core.Subscription{ - SubscriptionRef: core.SubscriptionRef{ID: subID, Namespace: "ns1", Name: "sub1"}, - Options: core.SubscriptionOptions{ - SubscriptionCoreOptions: core.SubscriptionCoreOptions{ - ReadAhead: &five, - Batch: &truthy, - BatchTimeout: &oneSec, - }, - }, - }, - eventMatcher: regexp.MustCompile(fmt.Sprintf("^%s|%s$", core.EventTypeMessageConfirmed, core.EventTypeMessageConfirmed)), - } - - ed, cancel := newTestEventDispatcher(sub) - defer cancel() - go ed.deliverBatchedEvents() - ed.eventPoller.offsetCommitted = make(chan int64, 3) - mdi := ed.database.(*databasemocks.Plugin) - mei := ed.transport.(*eventsmocks.Plugin) - mdm := ed.data.(*datamocks.Manager) - - eventDeliveries := make(chan *core.EventDelivery) - deliveryRequestMock := mei.On("BatchDeliveryRequest", ed.ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil) - deliveryRequestMock.RunFn = func(a mock.Arguments) { - batchEvents := a.Get(3).([]*core.CombinedEventDataDelivery) - for _, event := range batchEvents { - eventDeliveries <- event.Event - } - } - - // Setup the IDs - ref1 := fftypes.NewUUID() - ev1 := fftypes.NewUUID() - ref2 := fftypes.NewUUID() - ev2 := fftypes.NewUUID() - ref3 := fftypes.NewUUID() - ev3 := fftypes.NewUUID() - ref4 := fftypes.NewUUID() - ev4 := fftypes.NewUUID() - - // Setup enrichment - mdm.On("GetMessageWithDataCached", mock.Anything, ref1).Return(&core.Message{ - Header: core.MessageHeader{ID: ref1}, - }, nil, true, nil) - mdm.On("GetMessageWithDataCached", mock.Anything, ref2).Return(&core.Message{ - Header: core.MessageHeader{ID: ref2}, - }, nil, true, nil) - mdm.On("GetMessageWithDataCached", mock.Anything, ref3).Return(&core.Message{ - Header: core.MessageHeader{ID: ref3}, - }, nil, true, nil) - mdm.On("GetMessageWithDataCached", mock.Anything, ref4).Return(&core.Message{ - Header: core.MessageHeader{ID: ref4}, - }, nil, true, nil) - - // Deliver a batch of messages - batch1Done := make(chan struct{}) - go func() { - repoll, err := 
ed.bufferedDelivery([]core.LocallySequenced{ - &core.Event{ID: ev1, Sequence: 10000001, Reference: ref1, Type: core.EventTypeMessageConfirmed}, // match - &core.Event{ID: ev2, Sequence: 10000002, Reference: ref2, Type: core.EventTypeMessageRejected}, - &core.Event{ID: ev3, Sequence: 10000003, Reference: ref3, Type: core.EventTypeMessageConfirmed}, // match - &core.Event{ID: ev4, Sequence: 10000004, Reference: ref4, Type: core.EventTypeMessageConfirmed}, // match - }) - assert.NoError(t, err) - assert.True(t, repoll) - close(batch1Done) - }() - - // Wait for the two calls to deliver the matching messages to the client (read ahead allows this) - event1 := <-eventDeliveries - assert.Equal(t, *ev1, *event1.ID) - assert.Equal(t, *ref1, *event1.Message.Header.ID) - event3 := <-eventDeliveries - assert.Equal(t, *ev3, *event3.ID) - assert.Equal(t, *ref3, *event3.Message.Header.ID) - event4 := <-eventDeliveries - assert.Equal(t, *ev4, *event4.ID) - assert.Equal(t, *ref4, *event4.Message.Header.ID) - - // Send back the two acks - out of order to validate the read-ahead logic - go func() { - ed.deliveryResponse(&core.EventDeliveryResponse{ID: event4.ID}) - ed.deliveryResponse(&core.EventDeliveryResponse{ID: event1.ID}) - ed.deliveryResponse(&core.EventDeliveryResponse{ID: event3.ID}) - }() - - // Confirm we get the offset updates in the correct order, even though the confirmations - // came in a different order from the app. - assert.Equal(t, int64(10000001), <-ed.eventPoller.offsetCommitted) - assert.Equal(t, int64(10000003), <-ed.eventPoller.offsetCommitted) - assert.Equal(t, int64(10000004), <-ed.eventPoller.offsetCommitted) - - // This should complete the batch - <-batch1Done - - mdi.AssertExpectations(t) - mei.AssertExpectations(t) - mdm.AssertExpectations(t) -} - -func TestBatchDeliverEventsWithDataFail(t *testing.T) { - yes := true - sub := &subscription{ - definition: &core.Subscription{ - Options: core.SubscriptionOptions{ - SubscriptionCoreOptions: core.SubscriptionCoreOptions{ - WithData: &yes, - }, - }, - }, - } - - ed, cancel := newTestEventDispatcher(sub) - defer cancel() - - mdm := ed.data.(*datamocks.Manager) - mdm.On("GetMessageDataCached", ed.ctx, mock.Anything).Return(nil, false, fmt.Errorf("pop")) - - mei := ed.transport.(*eventsmocks.Plugin) - mei.On("BatchDeliveryRequest", ed.ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil) - - id1 := fftypes.NewUUID() - ed.eventDelivery <- &core.EventDelivery{ - EnrichedEvent: core.EnrichedEvent{ - Event: core.Event{ - ID: id1, - }, - Message: &core.Message{ - Header: core.MessageHeader{ - ID: fftypes.NewUUID(), - }, - Data: core.DataRefs{ - {ID: fftypes.NewUUID()}, - }, - }, - }, - } - - ed.inflight[*id1] = &core.Event{ID: id1} - go ed.deliverBatchedEvents() - - an := <-ed.acksNacks - assert.True(t, an.isNack) - -} diff --git a/internal/events/event_poller.go b/internal/events/event_poller.go index 9d876b6c8..065b4ed8c 100644 --- a/internal/events/event_poller.go +++ b/internal/events/event_poller.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. 
// // SPDX-License-Identifier: Apache-2.0 // @@ -289,18 +289,19 @@ func (ep *eventPoller) waitForShoulderTapOrPollTimeout(lastEventCount int) bool return true } - // For throughput optimized environments, we can set an eventBatchingTimeout to allow messages to arrive - // between polling cycles (at the cost of some dispatch latency) + // For throughput optimized environments, we can set an eventBatchingTimeout to allow + // dispatching of incomplete batches at a shorter timeout than the + // long timeout between polling cycles (at the cost of some dispatch latency) if ep.conf.eventBatchTimeout > 0 && lastEventCount > 0 { shortTimeout := time.NewTimer(ep.conf.eventBatchTimeout) select { case <-shortTimeout.C: l.Tracef("Woken after batch timeout") + return true case <-ep.ctx.Done(): l.Debugf("Exiting due to cancelled context") return false } - longTimeoutDuration -= ep.conf.eventBatchTimeout } longTimeout := time.NewTimer(longTimeoutDuration) From 011dda37a5f718fbd75c176b9fb8a4bad8bde7d6 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Tue, 16 Jan 2024 11:56:48 -0500 Subject: [PATCH 4/9] Honor batch timeout correctly in poller, and set default to 0 as it was previously inert Signed-off-by: Peter Broadhurst --- internal/coreconfig/coreconfig.go | 4 +-- internal/events/event_poller.go | 39 +++++++++++++++++----------- internal/events/event_poller_test.go | 2 +- 3 files changed, 27 insertions(+), 18 deletions(-) diff --git a/internal/coreconfig/coreconfig.go b/internal/coreconfig/coreconfig.go index 6c2f7812f..6ae2b8a78 100644 --- a/internal/coreconfig/coreconfig.go +++ b/internal/coreconfig/coreconfig.go @@ -406,7 +406,7 @@ func setDefaults() { viper.SetDefault(string(DownloadRetryFactor), 2.0) viper.SetDefault(string(EventAggregatorFirstEvent), core.SubOptsFirstEventOldest) viper.SetDefault(string(EventAggregatorBatchSize), 200) - viper.SetDefault(string(EventAggregatorBatchTimeout), "250ms") + viper.SetDefault(string(EventAggregatorBatchTimeout), "0ms") viper.SetDefault(string(EventAggregatorPollTimeout), "30s") viper.SetDefault(string(EventAggregatorRewindTimeout), "50ms") viper.SetDefault(string(EventAggregatorRewindQueueLength), 10) @@ -416,7 +416,7 @@ func setDefaults() { viper.SetDefault(string(EventAggregatorRetryMaxDelay), "30s") viper.SetDefault(string(EventDBEventsBufferSize), 100) viper.SetDefault(string(EventDispatcherBufferLength), 5) - viper.SetDefault(string(EventDispatcherBatchTimeout), "250ms") + viper.SetDefault(string(EventDispatcherBatchTimeout), "0ms") viper.SetDefault(string(EventDispatcherPollTimeout), "30s") viper.SetDefault(string(EventTransportsEnabled), []string{"websockets", "webhooks"}) viper.SetDefault(string(EventTransportsDefault), "websockets") diff --git a/internal/events/event_poller.go b/internal/events/event_poller.go index 065b4ed8c..cd509cc20 100644 --- a/internal/events/event_poller.go +++ b/internal/events/event_poller.go @@ -196,7 +196,12 @@ func (ep *eventPoller) eventLoop() { close(ep.offsetCommitted) }() + doBatchDelay := false for { + if doBatchDelay { + ep.waitForBatchTimeout() + } + // Read messages from the DB - in an error condition we retry until success, or a closed context events, err := ep.readPage() if err != nil { @@ -205,6 +210,15 @@ func (ep *eventPoller) eventLoop() { } eventCount := len(events) + + // We might want to wait for the batch to fill - so we delay and re-poll + if ep.conf.eventBatchTimeout > 0 && !doBatchDelay && eventCount < ep.conf.eventBatchSize { + doBatchDelay = true + l.Tracef("Batch delay: detected=%d, 
batchSize=%d batchTimeout=%s", eventCount, ep.conf.eventBatchSize, ep.conf.eventBatchTimeout) + continue + } + doBatchDelay = false // clear any batch delay for next iteration + repoll := false if eventCount > 0 { // We process all the events in the page in a single database run group, and @@ -280,6 +294,16 @@ func (ep *eventPoller) shoulderTap() { } } +func (ep *eventPoller) waitForBatchTimeout() { + // For throughput optimized environments, we can set an eventBatchingTimeout to allow + // dispatching of incomplete batches at a shorter timeout than the + // long timeout between polling cycles (at the cost of some dispatch latency) + select { + case <-time.After(ep.conf.eventBatchTimeout): + case <-ep.ctx.Done(): + } +} + func (ep *eventPoller) waitForShoulderTapOrPollTimeout(lastEventCount int) bool { l := log.L(ep.ctx) longTimeoutDuration := ep.conf.eventPollTimeout @@ -289,21 +313,6 @@ func (ep *eventPoller) waitForShoulderTapOrPollTimeout(lastEventCount int) bool return true } - // For throughput optimized environments, we can set an eventBatchingTimeout to allow - // dispatching of incomplete batches at a shorter timeout than the - // long timeout between polling cycles (at the cost of some dispatch latency) - if ep.conf.eventBatchTimeout > 0 && lastEventCount > 0 { - shortTimeout := time.NewTimer(ep.conf.eventBatchTimeout) - select { - case <-shortTimeout.C: - l.Tracef("Woken after batch timeout") - return true - case <-ep.ctx.Done(): - l.Debugf("Exiting due to cancelled context") - return false - } - } - longTimeout := time.NewTimer(longTimeoutDuration) select { case <-longTimeout.C: diff --git a/internal/events/event_poller_test.go b/internal/events/event_poller_test.go index 8215d2c8a..04f2ac8fc 100644 --- a/internal/events/event_poller_test.go +++ b/internal/events/event_poller_test.go @@ -36,7 +36,7 @@ func newTestEventPoller(t *testing.T, mdi *databasemocks.Plugin, neh newEventsHa ctx, cancel := context.WithCancel(context.Background()) ep = newEventPoller(ctx, mdi, newEventNotifier(ctx, "ut"), &eventPollerConf{ eventBatchSize: 10, - eventBatchTimeout: 1 * time.Millisecond, + eventBatchTimeout: 0, // customized for individual tests that enable this eventPollTimeout: 10 * time.Second, startupOffsetRetryAttempts: 1, retry: retry.Retry{ From 93894ffc5c0e35ae2acb92b86ab70742e7e60ea0 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Tue, 16 Jan 2024 12:54:55 -0500 Subject: [PATCH 5/9] Work through validation of batch logic Signed-off-by: Peter Broadhurst --- docs/reference/config.md | 4 +- internal/events/event_dispatcher_test.go | 165 +++++++++++++++++++++++ internal/events/event_poller_test.go | 59 ++++++++ 3 files changed, 226 insertions(+), 2 deletions(-) diff --git a/docs/reference/config.md b/docs/reference/config.md index 85524723d..45f63e89b 100644 --- a/docs/reference/config.md +++ b/docs/reference/config.md @@ -224,7 +224,7 @@ nav_order: 2 |Key|Description|Type|Default Value| |---|-----------|----|-------------| |batchSize|The maximum number of records to read from the DB before performing an aggregation run|[`BytesSize`](https://pkg.go.dev/github.com/docker/go-units#BytesSize)|`200` -|batchTimeout|How long to wait for new events to arrive before performing aggregation on a page of events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`250ms` +|batchTimeout|How long to wait for new events to arrive before performing aggregation on a page of events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`0ms` |firstEvent|The first event the aggregator should process, if 
no previous offest is stored in the DB. Valid options are `oldest` or `newest`|`string`|`oldest` |pollTimeout|The time to wait without a notification of new events, before trying a select on the table|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s` |rewindQueryLimit|Safety limit on the maximum number of records to search when performing queries to search for rewinds|`int`|`1000` @@ -249,7 +249,7 @@ nav_order: 2 |Key|Description|Type|Default Value| |---|-----------|----|-------------| -|batchTimeout|A short time to wait for new events to arrive before re-polling for new events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`250ms` +|batchTimeout|A short time to wait for new events to arrive before re-polling for new events|[`time.Duration`](https://pkg.go.dev/time#Duration)|`0ms` |bufferLength|The number of events + attachments an individual dispatcher should hold in memory ready for delivery to the subscription|`int`|`5` |pollTimeout|The time to wait without a notification of new events, before trying a select on the table|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s` diff --git a/internal/events/event_dispatcher_test.go b/internal/events/event_dispatcher_test.go index d1cd02b23..a056c3251 100644 --- a/internal/events/event_dispatcher_test.go +++ b/internal/events/event_dispatcher_test.go @@ -362,6 +362,171 @@ func TestEventDispatcherNoReadAheadInOrder(t *testing.T) { mdm.AssertExpectations(t) } +func TestEventDispatcherBatchBased(t *testing.T) { + log.SetLevel("debug") + three := uint16(3) + longTime := "1m" + subID := fftypes.NewUUID() + truthy := true + sub := &subscription{ + dispatcherElection: make(chan bool, 1), + definition: &core.Subscription{ + SubscriptionRef: core.SubscriptionRef{ID: subID, Namespace: "ns1", Name: "sub1"}, + Options: core.SubscriptionOptions{ + SubscriptionCoreOptions: core.SubscriptionCoreOptions{ + Batch: &truthy, + ReadAhead: &three, + BatchTimeout: &longTime, // because the batch should fill + }, + }, + }, + eventMatcher: regexp.MustCompile(fmt.Sprintf("^%s|%s$", core.EventTypeMessageConfirmed, core.EventTypeMessageConfirmed)), + } + + ed, cancel := newTestEventDispatcher(sub) + defer cancel() + go ed.deliverEvents() + ed.eventPoller.offsetCommitted = make(chan int64, 3) + mdi := ed.database.(*databasemocks.Plugin) + mei := ed.transport.(*eventsmocks.Plugin) + mdm := ed.data.(*datamocks.Manager) + + eventDeliveries := make(chan []*core.CombinedEventDataDelivery) + deliveryRequestMock := mei.On("BatchDeliveryRequest", ed.ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + deliveryRequestMock.RunFn = func(a mock.Arguments) { + eventDeliveries <- a.Get(3).([]*core.CombinedEventDataDelivery) + } + + // Setup the IDs + ref1 := fftypes.NewUUID() + ev1 := fftypes.NewUUID() + ref2 := fftypes.NewUUID() + ev2 := fftypes.NewUUID() + ref3 := fftypes.NewUUID() + ev3 := fftypes.NewUUID() + ref4 := fftypes.NewUUID() + ev4 := fftypes.NewUUID() + + // Setup enrichment + mdm.On("GetMessageWithDataCached", mock.Anything, ref1).Return(&core.Message{ + Header: core.MessageHeader{ID: ref1}, + }, nil, true, nil) + mdm.On("GetMessageWithDataCached", mock.Anything, ref2).Return(&core.Message{ + Header: core.MessageHeader{ID: ref2}, + }, nil, true, nil) + mdm.On("GetMessageWithDataCached", mock.Anything, ref3).Return(&core.Message{ + Header: core.MessageHeader{ID: ref3}, + }, nil, true, nil) + mdm.On("GetMessageWithDataCached", mock.Anything, ref4).Return(&core.Message{ + Header: core.MessageHeader{ID: ref4}, + }, nil, true, nil) 
+ + // Deliver a batch of messages + batch1Done := make(chan struct{}) + go func() { + repoll, err := ed.bufferedDelivery([]core.LocallySequenced{ + &core.Event{ID: ev1, Sequence: 10000001, Reference: ref1, Type: core.EventTypeMessageConfirmed}, // match + &core.Event{ID: ev2, Sequence: 10000002, Reference: ref2, Type: core.EventTypeMessageRejected}, + &core.Event{ID: ev3, Sequence: 10000003, Reference: ref3, Type: core.EventTypeMessageConfirmed}, // match + &core.Event{ID: ev4, Sequence: 10000004, Reference: ref4, Type: core.EventTypeMessageConfirmed}, // match + }) + assert.NoError(t, err) + assert.True(t, repoll) + close(batch1Done) + }() + + // Expect to get the batch dispatched - with the three matching events + events := <-eventDeliveries + assert.Len(t, events, 3) + assert.Equal(t, *ev1, *events[0].Event.ID) + assert.Equal(t, *ref1, *events[0].Event.Message.Header.ID) + assert.Equal(t, *ev3, *events[1].Event.ID) + assert.Equal(t, *ref3, *events[1].Event.Message.Header.ID) + assert.Equal(t, *ev4, *events[2].Event.ID) + assert.Equal(t, *ref4, *events[2].Event.Message.Header.ID) + + // Ack the batch + go func() { + ed.deliveryResponse(&core.EventDeliveryResponse{ID: events[0].Event.ID}) + ed.deliveryResponse(&core.EventDeliveryResponse{ID: events[1].Event.ID}) + ed.deliveryResponse(&core.EventDeliveryResponse{ID: events[2].Event.ID}) + }() + + assert.Equal(t, int64(10000001), <-ed.eventPoller.offsetCommitted) + assert.Equal(t, int64(10000003), <-ed.eventPoller.offsetCommitted) + assert.Equal(t, int64(10000004), <-ed.eventPoller.offsetCommitted) + + // This should complete the batch + <-batch1Done + + mdi.AssertExpectations(t) + mei.AssertExpectations(t) + mdm.AssertExpectations(t) +} + +func TestEventDispatcherBatchDispatchFail(t *testing.T) { + log.SetLevel("debug") + two := uint16(2) + longTime := "1m" + subID := fftypes.NewUUID() + truthy := true + sub := &subscription{ + dispatcherElection: make(chan bool, 1), + definition: &core.Subscription{ + SubscriptionRef: core.SubscriptionRef{ID: subID, Namespace: "ns1", Name: "sub1"}, + Options: core.SubscriptionOptions{ + SubscriptionCoreOptions: core.SubscriptionCoreOptions{ + Batch: &truthy, + ReadAhead: &two, + BatchTimeout: &longTime, // because the batch should fill + }, + }, + }, + eventMatcher: regexp.MustCompile(fmt.Sprintf("^%s|%s$", core.EventTypeMessageConfirmed, core.EventTypeMessageConfirmed)), + } + + ed, cancel := newTestEventDispatcher(sub) + defer cancel() + go ed.deliverEvents() + ed.eventPoller.offsetCommitted = make(chan int64, 3) + mdi := ed.database.(*databasemocks.Plugin) + mei := ed.transport.(*eventsmocks.Plugin) + mdm := ed.data.(*datamocks.Manager) + + eventDeliveries := make(chan []*core.CombinedEventDataDelivery) + deliveryRequestMock := mei.On("BatchDeliveryRequest", ed.ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) + deliveryRequestMock.RunFn = func(a mock.Arguments) { + eventDeliveries <- a.Get(3).([]*core.CombinedEventDataDelivery) + } + + // Deliver a batch of messages + batch1Done := make(chan struct{}) + go func() { + repoll, err := ed.bufferedDelivery([]core.LocallySequenced{ + &core.Event{ID: fftypes.NewUUID(), Sequence: 10000001, Type: core.EventTypeMessageConfirmed}, + &core.Event{ID: fftypes.NewUUID(), Sequence: 10000002, Type: core.EventTypeMessageConfirmed}, + }) + assert.NoError(t, err) + assert.True(t, repoll) + close(batch1Done) + }() + + mdm.On("GetMessageWithDataCached", mock.Anything, mock.Anything).Return(&core.Message{ + Header: 
core.MessageHeader{ID: fftypes.NewUUID()}, + }, nil, true, nil) + + // Expect to get the batch dispatched - with the three matching events + events := <-eventDeliveries + assert.Len(t, events, 2) + + // This should complete the batch + <-batch1Done + + mdi.AssertExpectations(t) + mei.AssertExpectations(t) + mdm.AssertExpectations(t) +} + func TestEnrichEventsFailGetMessages(t *testing.T) { sub := &subscription{ diff --git a/internal/events/event_poller_test.go b/internal/events/event_poller_test.go index 04f2ac8fc..de080a5a2 100644 --- a/internal/events/event_poller_test.go +++ b/internal/events/event_poller_test.go @@ -219,6 +219,57 @@ func TestReadPageSingleCommitEvent(t *testing.T) { mdi.AssertExpectations(t) } +func TestReadPageBatchTimeoutNotFull(t *testing.T) { + mdi := &databasemocks.Plugin{} + processEventCalled := make(chan []core.LocallySequenced, 1) + ep, cancel := newTestEventPoller(t, mdi, func(events []core.LocallySequenced) (bool, error) { + processEventCalled <- events + return false, nil + }, nil) + ep.conf.eventBatchTimeout = 1 * time.Microsecond + ep.conf.eventBatchSize = 3 + ev1 := core.NewEvent(core.EventTypeMessageConfirmed, "ns1", fftypes.NewUUID(), nil, "") + ev2 := core.NewEvent(core.EventTypeMessageConfirmed, "ns1", fftypes.NewUUID(), nil, "") + mdi.On("GetEvents", mock.Anything, "unit", mock.Anything).Return([]*core.Event{ev1}, nil, nil).Once() // half batch + mdi.On("GetEvents", mock.Anything, "unit", mock.Anything).Return([]*core.Event{ev1, ev2}, nil, nil).Run(func(args mock.Arguments) { + ep.shoulderTap() + }).Once() + mdi.On("GetEvents", mock.Anything, "unit", mock.Anything).Return(nil, nil, fmt.Errorf("context done")).Run(func(args mock.Arguments) { + cancel() + }) + ep.eventLoop() + + events := <-processEventCalled + assert.Equal(t, *ev1.ID, *events[0].(*core.Event).ID) + assert.Equal(t, *ev2.ID, *events[1].(*core.Event).ID) + mdi.AssertExpectations(t) +} + +func TestReadPageBatchFull(t *testing.T) { + mdi := &databasemocks.Plugin{} + processEventCalled := make(chan []core.LocallySequenced, 1) + ep, cancel := newTestEventPoller(t, mdi, func(events []core.LocallySequenced) (bool, error) { + processEventCalled <- events + return false, nil + }, nil) + ep.conf.eventBatchTimeout = 1 * time.Microsecond + ep.conf.eventBatchSize = 2 + ev1 := core.NewEvent(core.EventTypeMessageConfirmed, "ns1", fftypes.NewUUID(), nil, "") + ev2 := core.NewEvent(core.EventTypeMessageConfirmed, "ns1", fftypes.NewUUID(), nil, "") + mdi.On("GetEvents", mock.Anything, "unit", mock.Anything).Return([]*core.Event{ev1, ev2}, nil, nil).Run(func(args mock.Arguments) { + ep.shoulderTap() + }).Once() + mdi.On("GetEvents", mock.Anything, "unit", mock.Anything).Return(nil, nil, fmt.Errorf("context done")).Run(func(args mock.Arguments) { + cancel() + }) + ep.eventLoop() + + events := <-processEventCalled + assert.Equal(t, *ev1.ID, *events[0].(*core.Event).ID) + assert.Equal(t, *ev2.ID, *events[1].(*core.Event).ID) + mdi.AssertExpectations(t) +} + func TestReadPageRewind(t *testing.T) { mdi := &databasemocks.Plugin{} processEventCalled := make(chan core.LocallySequenced, 1) @@ -325,6 +376,14 @@ func TestDoubleTap(t *testing.T) { ep.shoulderTap() // this should not block } +func TestWaitForBatchTimeoutClosedContext(t *testing.T) { + mdi := &databasemocks.Plugin{} + ep, cancel := newTestEventPoller(t, mdi, nil, nil) + ep.conf.eventBatchTimeout = 1 * time.Minute + cancel() + ep.waitForBatchTimeout() +} + func TestDoubleConfirm(t *testing.T) { mdi := &databasemocks.Plugin{} ep, cancel := 
newTestEventPoller(t, mdi, nil, nil) From 34c43779ec5b6c25c4229a3acb1942c6032582c4 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Tue, 16 Jan 2024 14:31:00 -0500 Subject: [PATCH 6/9] Avoid deadlock condition by running logic in matchers Signed-off-by: Peter Broadhurst --- internal/events/websockets/websockets_test.go | 119 +++++++++--------- 1 file changed, 58 insertions(+), 61 deletions(-) diff --git a/internal/events/websockets/websockets_test.go b/internal/events/websockets/websockets_test.go index 30c409491..cba25ebfa 100644 --- a/internal/events/websockets/websockets_test.go +++ b/internal/events/websockets/websockets_test.go @@ -305,12 +305,13 @@ func TestStartReceiveDurable(t *testing.T) { var connID string sub := cbs.On("RegisterConnection", mock.MatchedBy(func(s string) bool { connID = s; return true }), - mock.MatchedBy(func(subMatch events.SubscriptionMatcher) bool { - return subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub1"}) && - !subMatch(core.SubscriptionRef{Namespace: "ns2", Name: "sub1"}) && - !subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub2"}) - }), - ).Return(nil) + mock.Anything, + ).Return(nil).Run(func(args mock.Arguments) { + subMatch := args[1].(events.SubscriptionMatcher) + assert.True(t, subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub1"})) + assert.False(t, subMatch(core.SubscriptionRef{Namespace: "ns2", Name: "sub1"})) + assert.False(t, subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub2"})) + }) ack := cbs.On("DeliveryResponse", mock.MatchedBy(func(s string) bool { return s == connID }), mock.Anything).Return(nil) @@ -385,12 +386,13 @@ func TestStartReceiveDurableBatch(t *testing.T) { var connID string mrg := cbs.On("RegisterConnection", mock.MatchedBy(func(s string) bool { connID = s; return true }), - mock.MatchedBy(func(subMatch events.SubscriptionMatcher) bool { - return subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub1"}) && - !subMatch(core.SubscriptionRef{Namespace: "ns2", Name: "sub1"}) && - !subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub2"}) - }), - ).Return(nil) + mock.Anything, + ).Return(nil).Run(func(args mock.Arguments) { + subMatch := args[1].(events.SubscriptionMatcher) + assert.True(t, subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub1"})) + assert.False(t, subMatch(core.SubscriptionRef{Namespace: "ns2", Name: "sub1"})) + assert.False(t, subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub2"})) + }) ack := cbs.On("DeliveryResponse", mock.MatchedBy(func(s string) bool { return s == connID }), mock.Anything).Return(nil) @@ -463,23 +465,21 @@ func TestStartReceiveDurableWithAuth(t *testing.T) { ws, wsc, cancel := newTestWebsockets(t, cbs, &testAuthorizer{}) defer cancel() var connID string - sub := cbs.On("RegisterConnection", + waitSubscribed := make(chan struct{}) + cbs.On("RegisterConnection", mock.MatchedBy(func(s string) bool { connID = s; return true }), - mock.MatchedBy(func(subMatch events.SubscriptionMatcher) bool { - return subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub1"}) && - !subMatch(core.SubscriptionRef{Namespace: "ns2", Name: "sub1"}) && - !subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub2"}) - }), - ).Return(nil) + mock.Anything, + ).Run(func(args mock.Arguments) { + subMatch := args[1].(events.SubscriptionMatcher) + assert.True(t, subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub1"})) + assert.False(t, subMatch(core.SubscriptionRef{Namespace: "ns2", Name: "sub1"})) + assert.False(t, 
subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub2"})) + close(waitSubscribed) + }).Return(nil) ack := cbs.On("DeliveryResponse", mock.MatchedBy(func(s string) bool { return s == connID }), mock.Anything).Return(nil) - waitSubscribed := make(chan struct{}) - sub.RunFn = func(a mock.Arguments) { - close(waitSubscribed) - } - waitAcked := make(chan struct{}) ack.RunFn = func(a mock.Arguments) { close(waitAcked) @@ -543,22 +543,20 @@ func TestStartReceiveDurableUnauthorized(t *testing.T) { _, wsc, cancel := newTestWebsockets(t, cbs, &testAuthorizer{}) defer cancel() var connID string - sub := cbs.On("RegisterConnection", + waitSubscribed := make(chan struct{}) + cbs.On("RegisterConnection", mock.MatchedBy(func(s string) bool { connID = s; return true }), - mock.MatchedBy(func(subMatch events.SubscriptionMatcher) bool { - return subMatch(core.SubscriptionRef{Namespace: "ns2", Name: "sub1"}) && - !subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub2"}) - }), - ).Return(nil) + mock.Anything, + ).Return(nil).Run(func(args mock.Arguments) { + subMatch := args[1].(events.SubscriptionMatcher) + assert.True(t, subMatch(core.SubscriptionRef{Namespace: "ns2", Name: "sub1"})) + assert.False(t, subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub2"})) + close(waitSubscribed) + }) ack := cbs.On("DeliveryResponse", mock.MatchedBy(func(s string) bool { return s == connID }), mock.Anything).Return(nil) - waitSubscribed := make(chan struct{}) - sub.RunFn = func(a mock.Arguments) { - close(waitSubscribed) - } - waitAcked := make(chan struct{}) ack.RunFn = func(a mock.Arguments) { close(waitAcked) @@ -577,18 +575,18 @@ func TestStartReceiveDurableUnauthorized(t *testing.T) { func TestAutoStartReceiveAckEphemeral(t *testing.T) { var connID string cbs := &eventsmocks.Callbacks{} - sub := cbs.On("EphemeralSubscription", + waitSubscribed := make(chan struct{}) + cbs.On("EphemeralSubscription", mock.MatchedBy(func(s string) bool { connID = s; return true }), - "ns1", mock.Anything, mock.Anything).Return(nil) + "ns1", mock.Anything, mock.Anything). + Return(nil). + Run(func(args mock.Arguments) { + close(waitSubscribed) + }) ack := cbs.On("DeliveryResponse", mock.MatchedBy(func(s string) bool { return s == connID }), mock.Anything).Return(nil) - waitSubscribed := make(chan struct{}) - sub.RunFn = func(a mock.Arguments) { - close(waitSubscribed) - } - waitAcked := make(chan struct{}) ack.RunFn = func(a mock.Arguments) { close(waitAcked) @@ -623,18 +621,18 @@ func TestAutoStartReceiveAckEphemeral(t *testing.T) { func TestAutoStartReceiveAckBatchEphemeral(t *testing.T) { var connID string cbs := &eventsmocks.Callbacks{} - sub := cbs.On("EphemeralSubscription", + waitSubscribed := make(chan struct{}) + cbs.On("EphemeralSubscription", mock.MatchedBy(func(s string) bool { connID = s; return true }), - "ns1", mock.Anything, mock.Anything).Return(nil) + "ns1", mock.Anything, mock.Anything). + Return(nil). 
+ Run(func(args mock.Arguments) { + close(waitSubscribed) + }) ack := cbs.On("DeliveryResponse", mock.MatchedBy(func(s string) bool { return s == connID }), mock.Anything).Return(nil) - waitSubscribed := make(chan struct{}) - sub.RunFn = func(a mock.Arguments) { - close(waitSubscribed) - } - waitAcked := make(chan struct{}) ack.RunFn = func(a mock.Arguments) { close(waitAcked) @@ -1131,23 +1129,22 @@ func TestNamespaceScopedSuccess(t *testing.T) { ws, wsc, cancel := newTestWebsocketsCommon(t, cbs, nil, "ns1") defer cancel() var connID string - sub := cbs.On("RegisterConnection", + waitSubscribed := make(chan struct{}) + + cbs.On("RegisterConnection", mock.MatchedBy(func(s string) bool { connID = s; return true }), - mock.MatchedBy(func(subMatch events.SubscriptionMatcher) bool { - return subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub1"}) && - !subMatch(core.SubscriptionRef{Namespace: "ns2", Name: "sub1"}) && - !subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub2"}) - }), - ).Return(nil) + mock.Anything, + ).Return(nil).Run(func(args mock.Arguments) { + subMatch := args[1].(events.SubscriptionMatcher) + assert.True(t, subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub1"})) + assert.False(t, subMatch(core.SubscriptionRef{Namespace: "ns2", Name: "sub1"})) + assert.False(t, subMatch(core.SubscriptionRef{Namespace: "ns1", Name: "sub2"})) + close(waitSubscribed) + }) ack := cbs.On("DeliveryResponse", mock.MatchedBy(func(s string) bool { return s == connID }), mock.Anything).Return(nil) - waitSubscribed := make(chan struct{}) - sub.RunFn = func(a mock.Arguments) { - close(waitSubscribed) - } - waitAcked := make(chan struct{}) ack.RunFn = func(a mock.Arguments) { close(waitAcked) From 88c820efa7fe9585ce4aa30a945f08f6eb6f01bc Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Tue, 16 Jan 2024 15:58:28 -0500 Subject: [PATCH 7/9] Add type to WS payload, to distinguish batch vs. event delivery Signed-off-by: Peter Broadhurst --- docs/reference/types/wsack.md | 2 +- docs/reference/types/wserror.md | 2 +- docs/reference/types/wsstart.md | 2 +- internal/events/websockets/websocket_connection.go | 1 + pkg/core/websocket_actions.go | 10 +++++++--- 5 files changed, 11 insertions(+), 6 deletions(-) diff --git a/docs/reference/types/wsack.md b/docs/reference/types/wsack.md index 5b2385331..fc44ff2f0 100644 --- a/docs/reference/types/wsack.md +++ b/docs/reference/types/wsack.md @@ -37,7 +37,7 @@ nav_order: 24 | Field Name | Description | Type | |------------|-------------|------| -| `type` | WSActionBase.type | `FFEnum`:
`"start"`
`"ack"`
`"protocol_error"` | +| `type` | WSActionBase.type | `FFEnum`:
`"start"`
`"ack"`
`"protocol_error"`
`"event_batch"` | | `id` | WSAck.id | [`UUID`](simpletypes#uuid) | | `subscription` | WSAck.subscription | [`SubscriptionRef`](#subscriptionref) | diff --git a/docs/reference/types/wserror.md b/docs/reference/types/wserror.md index e7052b0a1..ed2cb06e1 100644 --- a/docs/reference/types/wserror.md +++ b/docs/reference/types/wserror.md @@ -33,6 +33,6 @@ nav_order: 25 | Field Name | Description | Type | |------------|-------------|------| -| `type` | WSAck.type | `FFEnum`:
`"start"`
`"ack"`
`"protocol_error"` | +| `type` | WSAck.type | `FFEnum`:
`"start"`
`"ack"`
`"protocol_error"`
`"event_batch"` | | `error` | WSAck.error | `string` | diff --git a/docs/reference/types/wsstart.md b/docs/reference/types/wsstart.md index 3c606a79b..f8f3798f7 100644 --- a/docs/reference/types/wsstart.md +++ b/docs/reference/types/wsstart.md @@ -42,7 +42,7 @@ nav_order: 23 | Field Name | Description | Type | |------------|-------------|------| -| `type` | WSActionBase.type | `FFEnum`:
`"start"`
`"ack"`
`"protocol_error"` | +| `type` | WSActionBase.type | `FFEnum`:
`"start"`
`"ack"`
`"protocol_error"`
`"event_batch"` | | `autoack` | WSStart.autoack | `bool` | | `namespace` | WSStart.namespace | `string` | | `name` | WSStart.name | `string` | diff --git a/internal/events/websockets/websocket_connection.go b/internal/events/websockets/websocket_connection.go index 29c66c17e..58de422fa 100644 --- a/internal/events/websockets/websocket_connection.go +++ b/internal/events/websockets/websocket_connection.go @@ -267,6 +267,7 @@ func (wc *websocketConnection) dispatch(event *core.EventDelivery) error { func (wc *websocketConnection) dispatchBatch(sub *core.Subscription, events []*core.CombinedEventDataDelivery) error { inflightBatch := &core.WSEventBatch{ + Type: core.WSEventBatchType, ID: fftypes.NewUUID(), Events: make([]*core.EventDelivery, len(events)), } diff --git a/pkg/core/websocket_actions.go b/pkg/core/websocket_actions.go index 772d51344..551148946 100644 --- a/pkg/core/websocket_actions.go +++ b/pkg/core/websocket_actions.go @@ -29,6 +29,9 @@ var ( // WSProtocolErrorEventType is a special event "type" field for server to send the client, if it performs a ProtocolError WSProtocolErrorEventType = fftypes.FFEnumValue("wstype", "protocol_error") + + // WSEventBatchType is the type set when the message contains an array of events + WSEventBatchType = fftypes.FFEnumValue("wstype", "event_batch") ) // WSActionBase is the base fields of all client actions sent on the websocket @@ -65,7 +68,8 @@ type WSError struct { // WSEventBatch is used when batched delivery is enabled over the websocket, allowing // an array of events to be ack'd as a whole (rather than ack'ing individually) type WSEventBatch struct { - ID *fftypes.UUID `ffstruct:"WSEventBatch" json:"id"` - Subscription SubscriptionRef `ffstruct:"WSEventBatch" json:"subscription"` - Events []*EventDelivery `ffstruct:"WSEventBatch" json:"events"` + Type WSClientPayloadType `ffstruct:"WSEventBatch" json:"type" ffenum:"wstype"` + ID *fftypes.UUID `ffstruct:"WSEventBatch" json:"id"` + Subscription SubscriptionRef `ffstruct:"WSEventBatch" json:"subscription"` + Events []*EventDelivery `ffstruct:"WSEventBatch" json:"events"` } From 7a26e8b0ab2fb664f17d685e101c6c5c402acbb4 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Wed, 17 Jan 2024 21:07:34 -0500 Subject: [PATCH 8/9] Fix batch serialization Signed-off-by: Peter Broadhurst --- pkg/core/subscription.go | 9 ++++++++- pkg/core/subscription_test.go | 22 ++++++++++++++++++---- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/pkg/core/subscription.go b/pkg/core/subscription.go index 7af6fc85e..e9e789d1d 100644 --- a/pkg/core/subscription.go +++ b/pkg/core/subscription.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. 
// // SPDX-License-Identifier: Apache-2.0 // @@ -88,6 +88,7 @@ const ( ) // SubscriptionCoreOptions are the core options that apply across all transports +// REMEMBER TO ADD OPTIONS HERE TO MarshalJSON() type SubscriptionCoreOptions struct { FirstEvent *SubOptsFirstEvent `ffstruct:"SubscriptionCoreOptions" json:"firstEvent,omitempty"` ReadAhead *uint16 `ffstruct:"SubscriptionCoreOptions" json:"readAhead,omitempty"` @@ -167,6 +168,12 @@ func (so SubscriptionOptions) MarshalJSON() ([]byte, error) { if so.TLSConfigName != "" { so.additionalOptions["tlsConfigName"] = so.TLSConfigName } + if so.Batch != nil { + so.additionalOptions["batch"] = so.Batch + } + if so.BatchTimeout != nil { + so.additionalOptions["batchTimeout"] = so.BatchTimeout + } return json.Marshal(&so.additionalOptions) } diff --git a/pkg/core/subscription_test.go b/pkg/core/subscription_test.go index 0165dc593..45d1ebf00 100644 --- a/pkg/core/subscription_test.go +++ b/pkg/core/subscription_test.go @@ -28,12 +28,15 @@ func TestSubscriptionOptionsDatabaseSerialization(t *testing.T) { firstEvent := SubOptsFirstEventNewest readAhead := uint16(50) yes := true + oneSec := "1s" sub1 := &Subscription{ Options: SubscriptionOptions{ SubscriptionCoreOptions: SubscriptionCoreOptions{ - FirstEvent: &firstEvent, - ReadAhead: &readAhead, - WithData: &yes, + FirstEvent: &firstEvent, + ReadAhead: &readAhead, + WithData: &yes, + Batch: &yes, + BatchTimeout: &oneSec, }, WebhookSubOptions: WebhookSubOptions{ TLSConfigName: "myconfig", @@ -49,7 +52,18 @@ func TestSubscriptionOptionsDatabaseSerialization(t *testing.T) { // Verify it serializes as bytes to the database b1, err := sub1.Options.Value() assert.NoError(t, err) - assert.Equal(t, `{"firstEvent":"newest","my-nested-opts":{"myopt1":12345,"myopt2":"test"},"readAhead":50,"tlsConfigName":"myconfig","withData":true}`, string(b1.([]byte))) + assert.JSONEq(t, `{ + "firstEvent":"newest", + "my-nested-opts":{ + "myopt1":12345, + "myopt2":"test" + }, + "readAhead":50, + "tlsConfigName":"myconfig", + "withData":true, + "batch":true, + "batchTimeout":"1s" + }`, string(b1.([]byte))) f1, err := sub1.Filter.Value() assert.NoError(t, err) From fee90bd3f04ecf63b1d8e0025542c84d99592d99 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Fri, 19 Jan 2024 12:45:49 -0500 Subject: [PATCH 9/9] Ensure nacks sent for whole batch Signed-off-by: Peter Broadhurst --- internal/events/event_dispatcher.go | 39 ++++++++++++++++++----------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/internal/events/event_dispatcher.go b/internal/events/event_dispatcher.go index f667336d6..8afd99e39 100644 --- a/internal/events/event_dispatcher.go +++ b/internal/events/event_dispatcher.go @@ -398,7 +398,6 @@ func (ed *eventDispatcher) handleAckOffsetUpdate(ack ackNack) { } } -// TODO issue here, we can't just call DeliveryRequest with one thing. 
func (ed *eventDispatcher) deliverEvents() { withData := ed.subscription.definition.Options.WithData != nil && *ed.subscription.definition.Options.WithData for { @@ -410,30 +409,42 @@ func (ed *eventDispatcher) deliverEvents() { // As soon as we hit an error, we need to trigger into nack mode var err error + + // Loop through the events enriching them, and dispatching individually in non-batch mode eventsWithData := make([]*core.CombinedEventDataDelivery, len(events)) - for i := 0; i < len(events) && err == nil; i++ { + for i := 0; i < len(events); i++ { e := &core.CombinedEventDataDelivery{ Event: events[i], } eventsWithData[i] = e - log.L(ed.ctx).Debugf("Dispatching %s event: %.10d/%s [%s]: ref=%s/%s", ed.transport.Name(), e.Event.Sequence, e.Event.ID, e.Event.Type, e.Event.Namespace, e.Event.Reference) - if withData && e.Event.Message != nil { - e.Data, _, err = ed.data.GetMessageDataCached(ed.ctx, e.Event.Message) - } - // Individual events (in reality there is only ever i==0 for this case) - if err == nil && !ed.batch { - err = ed.transport.DeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, e.Event, e.Data) + // The first error we encounter stops us attempting to enrich or dispatch any more events + if err == nil { + log.L(ed.ctx).Debugf("Dispatching %s event: %.10d/%s [%s]: ref=%s/%s", ed.transport.Name(), e.Event.Sequence, e.Event.ID, e.Event.Type, e.Event.Namespace, e.Event.Reference) + if withData && e.Event.Message != nil { + e.Data, _, err = ed.data.GetMessageDataCached(ed.ctx, e.Event.Message) + } } - if err != nil { - ed.deliveryResponse(&core.EventDeliveryResponse{ID: e.Event.ID, Rejected: true}) + // If we are non-batched, we have to deliver each event individually... + if !ed.batch { + // .. only attempt to deliver if we've not triggered into an error scenario for one of the events already + if err == nil { + err = ed.transport.DeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, e.Event, e.Data) + } + // ... if we've triggered into an error scenario, we need to nack immediately for this and all the rest of the events + if err != nil { + ed.deliveryResponse(&core.EventDeliveryResponse{ID: e.Event.ID, Rejected: true}) + } } } // In batch mode we do one dispatch of the whole set as one - if err == nil && ed.batch { - err = ed.transport.BatchDeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, eventsWithData) + if ed.batch { + // Only attempt to deliver if we're in a non error case (enrich might have failed above) + if err == nil { + err = ed.transport.BatchDeliveryRequest(ed.ctx, ed.connID, ed.subscription.definition, eventsWithData) + } + // If we're in an error case we have to nack everything immediately if err != nil { - // nack everything on behalf of the failed delivery for _, e := range events { ed.deliveryResponse(&core.EventDeliveryResponse{ID: e.Event.ID, Rejected: true}) }
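Note on client usage (illustrative only, not part of the change set): with this series applied, a WebSocket client whose subscription enables batching receives a single `event_batch` payload carrying the `events` array, and can acknowledge the whole batch by sending one `ack` with the batch `id`; the server then fans that out into an ack per event, mirroring the webhook batching behaviour. The sketch below assumes the gorilla/websocket client library, a node listening at `ws://localhost:5000/ws`, and an existing durable subscription `ns1/sub1` created with `options.batch=true`; those specifics are assumptions, while the payload shapes (`start`, `event_batch`, `ack`) follow the `WSStart`, `WSEventBatch`, and `WSAck` types touched by this PR.

```go
// Hypothetical client sketch - the URL, library choice and subscription name
// are assumptions; the payload shapes mirror core.WSStart, core.WSEventBatch
// and core.WSAck from this change set.
package main

import (
	"log"

	"github.com/gorilla/websocket"
)

// wsStart carries the subset of core.WSStart fields used in this sketch.
type wsStart struct {
	Type      string `json:"type"`      // "start"
	Namespace string `json:"namespace"` // namespace of the durable subscription
	Name      string `json:"name"`      // name of the durable subscription
}

// wsEventBatch mirrors the core.WSEventBatch payload added by this PR.
type wsEventBatch struct {
	Type         string                   `json:"type"` // "event_batch"
	ID           string                   `json:"id"`
	Subscription map[string]interface{}   `json:"subscription"`
	Events       []map[string]interface{} `json:"events"`
}

// wsAck mirrors core.WSAck; for a batch ack only the batch ID is needed.
type wsAck struct {
	Type string `json:"type"` // "ack"
	ID   string `json:"id"`   // the batch ID, acknowledging every event in the batch
}

func main() {
	conn, _, err := websocket.DefaultDialer.Dial("ws://localhost:5000/ws", nil)
	if err != nil {
		log.Fatalf("dial failed: %s", err)
	}
	defer conn.Close()

	// Start delivery on the (assumed pre-created, batch-enabled) subscription ns1/sub1
	if err := conn.WriteJSON(&wsStart{Type: "start", Namespace: "ns1", Name: "sub1"}); err != nil {
		log.Fatalf("start failed: %s", err)
	}

	for {
		var batch wsEventBatch
		if err := conn.ReadJSON(&batch); err != nil {
			log.Fatalf("read failed: %s", err)
		}
		if batch.Type != "event_batch" {
			continue // other payloads (e.g. protocol_error) are not handled in this sketch
		}
		log.Printf("received batch %s with %d events", batch.ID, len(batch.Events))

		// A single ack for the whole batch - the server acks each contained event
		if err := conn.WriteJSON(&wsAck{Type: "ack", ID: batch.ID}); err != nil {
			log.Fatalf("ack failed: %s", err)
		}
	}
}
```

If `autoack` is enabled on the connection, the server acknowledges each batch as it is dispatched, and the explicit `ack` above should not be sent.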