From 166e36db23ca82e5d28a264e63ef125457a24858 Mon Sep 17 00:00:00 2001 From: Derek Wang Date: Fri, 25 Sep 2020 14:59:45 -0700 Subject: [PATCH] feat: Implemented Exact Once triggering for NATS event bus (#873) * feat: Implemented Exact Once triggering for NATS event bus * update comments * still comments * fix doc format * fix typo * lowercase id * correct start position * ackwait change back to 1 sec * log minor change --- docs/eventsources/naming.md | 2 +- docs/webhook-authentication.md | 18 +++++----- eventbus/driver/nats.go | 64 +++++++++++++++++++++++++++++++--- eventsources/eventing.go | 3 +- 4 files changed, 71 insertions(+), 16 deletions(-) diff --git a/docs/eventsources/naming.md b/docs/eventsources/naming.md index da7f26560b47..8edf1d56933d 100644 --- a/docs/eventsources/naming.md +++ b/docs/eventsources/naming.md @@ -9,7 +9,7 @@ dependencies: eventName: example ``` -The `eventSourceName` ad `eventName` might be confusing. Take the following +The `eventSourceName` and `eventName` might be confusing. Take the following EventSource example, the `eventSourceName` and `eventName` are described as below. diff --git a/docs/webhook-authentication.md b/docs/webhook-authentication.md index 7706314e0b3e..da0ae1d56cf8 100644 --- a/docs/webhook-authentication.md +++ b/docs/webhook-authentication.md @@ -25,16 +25,16 @@ Then add `authSecret` to your `webhook` EventSource. apiVersion: argoproj.io/v1alpha1 kind: EventSource metadata: -name: webhook + name: webhook spec: -webhook: - example: - port: "12000" - endpoint: /example - method: POST - authSecret: - name: my-webhook-token - key: my-token + webhook: + example: + port: "12000" + endpoint: /example + method: POST + authSecret: + name: my-webhook-token + key: my-token ``` Now you can authenticate your webhook endpoint with the configured token. diff --git a/eventbus/driver/nats.go b/eventbus/driver/nats.go index de230bf13115..34406a0017d0 100644 --- a/eventbus/driver/nats.go +++ b/eventbus/driver/nats.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "strings" + "sync" "time" "github.com/Knetic/govaluate" @@ -148,25 +149,61 @@ func (n *natsStreaming) SubscribeEventSources(ctx context.Context, conn Connecti n.processEventSourceMsg(m, msgHolder, filter, action, log) }, stan.DurableName(durableName), stan.SetManualAckMode(), - stan.StartAt(pb.StartPosition_LastReceived), - stan.AckWait(3*time.Second), + stan.StartAt(pb.StartPosition_NewOnly), + stan.AckWait(1*time.Second), stan.MaxInflight(len(msgHolder.depNames)+2)) if err != nil { log.Errorf("failed to subscribe to subject %s", n.subject) return err } log.Infof("Subscribed to subject %s ...", n.subject) + + // Daemon to evict cache + wg := &sync.WaitGroup{} + cacheEvictorStopCh := make(chan struct{}) + wg.Add(1) + go func() { + defer wg.Done() + log.Info("starting ExactOnce cache clean up daemon ...") + ticker := time.NewTicker(60 * time.Second) + for { + select { + case <-cacheEvictorStopCh: + log.Info("exiting ExactOnce cache clean up daemon...") + return + case <-ticker.C: + now := time.Now().UnixNano() + num := 0 + msgHolder.smap.Range(func(key, value interface{}) bool { + v := value.(int64) + // Evict cached ID older than 5 minutes + if now-v > 5*60*1000*1000*1000 { + msgHolder.smap.Delete(key) + num++ + log.Debugw("cached ID evicted", "id", key) + } + return true + }) + log.Infof("finished evicting %v cached IDs, time cost: %v ms", num, (time.Now().UnixNano()-now)/1000/1000) + } + } + }() + for { select { case <-ctx.Done(): log.Info("existing, unsubscribing and closing connection...") _ = sub.Close() log.Infof("subscription on subject %s closed", n.subject) + cacheEvictorStopCh <- struct{}{} + wg.Wait() return nil case <-closeCh: log.Info("closing subscription...") _ = sub.Close() log.Infof("subscription on subject %s closed", n.subject) + cacheEvictorStopCh <- struct{}{} + wg.Wait() return nil } } @@ -202,6 +239,14 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc } } + // NATS Streaming guarantees At Least Once delivery, + // so need to check if the message is duplicate + if _, ok := msgHolder.smap.Load(event.ID()); ok { + log.Infow("ATTENTION: Duplicate delivered message detected", "message", m) + _ = m.Ack() + return + } + // Clean up old messages before starting a new round if msgHolder.lastMeetTime > 0 || msgHolder.latestGoodMsgTimestamp > 0 { // ACK all the old messages after conditions meet @@ -209,7 +254,7 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc if depName != "" { msgHolder.reset(depName) } - _ = m.Ack() + msgHolder.ackAndCache(m, event.ID()) return } return @@ -222,7 +267,7 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc return } else if m.Timestamp < existingMsg.timestamp { // Redelivered old message, ack and return - _ = m.Ack() + msgHolder.ackAndCache(m, event.ID()) return } } @@ -260,7 +305,7 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc go action(messages) msgHolder.reset(depName) - _ = m.Ack() + msgHolder.ackAndCache(m, event.ID()) } // eventSourceMessage is used by messageHolder to hold the latest message @@ -282,6 +327,8 @@ type eventSourceMessageHolder struct { sourceDepMap map[string]string parameters map[string]interface{} msgs map[string]*eventSourceMessage + // A sync map used to cache the message IDs, it is used to guarantee Exact Once triggering + smap *sync.Map } func newEventSourceMessageHolder(dependencyExpr string, dependencies []Dependency) (*eventSourceMessageHolder, error) { @@ -315,6 +362,7 @@ func newEventSourceMessageHolder(dependencyExpr string, dependencies []Dependenc sourceDepMap: srcDepMap, parameters: parameters, msgs: msgs, + smap: new(sync.Map), }, nil } @@ -331,6 +379,12 @@ func (mh *eventSourceMessageHolder) getDependencyName(eventSourceName, eventName return "", nil } +// Ack the stan message and cache the ID to make sure Exact Once triggering +func (mh *eventSourceMessageHolder) ackAndCache(m *stan.Msg, id string) { + _ = m.Ack() + mh.smap.Store(id, time.Now().UnixNano()) +} + // Reset the parameter and message that a dependency holds func (mh *eventSourceMessageHolder) reset(depName string) { mh.parameters[depName] = false diff --git a/eventsources/eventing.go b/eventsources/eventing.go index c7f3857c6f7a..931b92b807a1 100644 --- a/eventsources/eventing.go +++ b/eventsources/eventing.go @@ -36,6 +36,7 @@ import ( apicommon "github.com/argoproj/argo-events/pkg/apis/common" eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1" "github.com/argoproj/argo-events/pkg/apis/eventsource/v1alpha1" + cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/google/uuid" "github.com/pkg/errors" @@ -284,7 +285,7 @@ func (e *EventSourceAdaptor) Start(ctx context.Context, stopCh <-chan struct{}) logger.Error("failed to reconnect to eventbus", zap.Error(err)) continue } - logger.Info("reconnected the NATS streaming server...") + logger.Info("reconnected to eventbus successfully") } } }