diff --git a/server/trigger/client/http.go b/server/trigger/client/http.go index f93148c35..ff6333faf 100644 --- a/server/trigger/client/http.go +++ b/server/trigger/client/http.go @@ -33,6 +33,13 @@ func NewHTTPClient(url string) EventClient { } } +func NewHTTPClientWithGateway(url, gateway, headerKey string) EventClient { + c, _ := ce.NewClientHTTP(ce.WithTarget(gateway), ce.WithHeader(headerKey, url)) + return &http{ + client: c, + } +} + func (c *http) Send(ctx context.Context, events ...*ce.Event) Result { event := events[0] res := c.client.Send(ctx, *event) diff --git a/server/trigger/config.go b/server/trigger/config.go index 05364f7c9..fce8e3643 100644 --- a/server/trigger/config.go +++ b/server/trigger/config.go @@ -19,14 +19,16 @@ import ( "time" "github.com/vanus-labs/vanus/pkg/observability" + "github.com/vanus-labs/vanus/server/trigger/trigger" ) type Config struct { Observability observability.Config `yaml:"observability"` TriggerAddr string - Port int `yaml:"port"` - IP string `yaml:"ip"` - ControllerAddr []string `yaml:"controllers"` + Port int `yaml:"port"` + IP string `yaml:"ip"` + ControllerAddr []string `yaml:"controllers"` + Gateway *trigger.TargetGateway `yaml:"gateway"` HeartbeatInterval time.Duration `yaml:"heartbeat_interval"` // send event goroutine size diff --git a/server/trigger/reader/reader.go b/server/trigger/reader/reader.go index 801bd2f99..9b37e4c38 100644 --- a/server/trigger/reader/reader.go +++ b/server/trigger/reader/reader.go @@ -44,6 +44,8 @@ const ( lookupReadableLogsTimeout = 5 * time.Second readEventTimeout = 5 * time.Second readErrSleepTime = 2 * time.Second + checkEventlogInterval = 2 * time.Minute + logFrequencyMini = 10 ) type Config struct { @@ -143,12 +145,12 @@ func (r *reader) Start() error { r.wg.Add(1) go func() { defer r.wg.Done() - ticker := time.NewTicker(time.Minute * 2) + ticker := time.NewTicker(checkEventlogInterval) defer ticker.Stop() for { select { case <-ticker.C: - r.findEventlog(ctx) + _ = r.findEventlog(ctx) case <-ctx.Done(): return } @@ -219,7 +221,7 @@ func (elReader *eventlogReader) stop() { elReader.cancel() } -// get earliest offset +// getOffset get earliest offset. func (elReader *eventlogReader) getOffset(ctx context.Context) (int64, error) { logs, err := elReader.config.Client.Eventbus(ctx, api.WithID(elReader.config.EventbusID.Uint64())).ListLog(ctx) @@ -251,7 +253,7 @@ func (elReader *eventlogReader) run(parentCtx context.Context) { Uint64("offset", elReader.offset). Msg("eventlog reader init success") - min := time.Now().Minute() / 10 + min := time.Now().Minute() / logFrequencyMini for { select { case <-ctx.Done(): @@ -259,7 +261,7 @@ func (elReader *eventlogReader) run(parentCtx context.Context) { default: } err := elReader.loop(ctx, r) - currMin := time.Now().Minute() / 10 + currMin := time.Now().Minute() / logFrequencyMini if currMin != min { min = currMin log.Info().Err(err). diff --git a/server/trigger/trigger/config.go b/server/trigger/trigger/config.go index fb91e0849..f392be3b7 100644 --- a/server/trigger/trigger/config.go +++ b/server/trigger/trigger/config.go @@ -45,6 +45,12 @@ type Config struct { SendBatchSize int PullBatchSize int MaxUACKNumber int + TargetGateway *TargetGateway +} + +type TargetGateway struct { + Address string `yaml:"address"` + TargetHeaderName string `yaml:"header_name"` } func defaultConfig() Config { @@ -155,3 +161,9 @@ func WithMaxUACKNumber(maxUACKNumber int) Option { t.config.MaxUACKNumber = maxUACKNumber } } + +func WithProxy(proxy *TargetGateway) Option { + return func(t *trigger) { + t.config.TargetGateway = proxy + } +} diff --git a/server/trigger/trigger/trigger.go b/server/trigger/trigger/trigger.go index 29a247c30..42a854350 100644 --- a/server/trigger/trigger/trigger.go +++ b/server/trigger/trigger/trigger.go @@ -68,6 +68,7 @@ type Trigger interface { type trigger struct { subscriptionIDStr string + eventbusIDStr string subscription *primitive.Subscription offsetManager *offset.SubscriptionOffset @@ -119,6 +120,7 @@ func newTrigger(subscription *primitive.Subscription, opts ...Option) (*trigger, filter: filter.GetFilter(subscription.Filters), subscription: subscription, subscriptionIDStr: subscription.ID.String(), + eventbusIDStr: subscription.EventbusID.String(), transformer: trans, } if subscription.Protocol == primitive.GRPC { @@ -154,7 +156,11 @@ func (t *trigger) getClient() client.EventClient { func (t *trigger) changeTarget( sink primitive.URI, protocol primitive.Protocol, credential primitive.SinkCredential, ) error { - eventCli := newEventClient(sink, protocol, credential) + eventCli := newEventClient(clientConfig{ + sink: sink, + protocol: protocol, + credential: credential, + gateway: t.config.TargetGateway}) t.lock.Lock() defer t.lock.Unlock() t.eventCli = eventCli @@ -280,7 +286,10 @@ func (t *trigger) runRetryEventFilterTransform(ctx context.Context) { if err != nil { log.Info(ctx).Err(err). Str("event_id", event.record.Event.ID()). - Interface("event_offset", event.record.OffsetInfo). + Str(log.KeySubscriptionID, t.subscriptionIDStr). + Str(log.KeyEventbusID, t.eventbusIDStr). + Stringer(log.KeyEventlogID, event.record.EventlogID). + Uint64("event_offset", event.record.OffsetInfo.Offset). Msg("event transform error") t.writeFailEvent(ctx, record.Event, ErrTransformCode, err) t.offsetManager.EventCommit(record.OffsetInfo) @@ -315,7 +324,10 @@ func (t *trigger) runEventFilterTransform(ctx context.Context) { if err != nil { log.Info(ctx).Err(err). Str("event_id", event.record.Event.ID()). - Interface("event_offset", event.record.OffsetInfo). + Str(log.KeySubscriptionID, t.subscriptionIDStr). + Str(log.KeyEventbusID, t.eventbusIDStr). + Stringer(log.KeyEventlogID, event.record.EventlogID). + Uint64("event_offset", event.record.OffsetInfo.Offset). Msg("event transform error") t.writeFailEvent(ctx, record.Event, ErrTransformCode, err) t.offsetManager.EventCommit(record.OffsetInfo) @@ -395,7 +407,7 @@ func (t *trigger) processEvent(ctx context.Context, events ...*toSendEvent) { es := make([]*ce.Event, l) for i := range events { if events[i].retry { - retryEventCnt += 1 + retryEventCnt++ } es[i] = events[i].transform } @@ -406,7 +418,11 @@ func (t *trigger) processEvent(ctx context.Context, events ...*toSendEvent) { Int("code", r.StatusCode). Int("count", l). Str("event_id", events[0].record.Event.ID()). - Interface("event_offset", events[0].record.OffsetInfo). + Interface("target", t.subscription.Sink). + Str(log.KeySubscriptionID, t.subscriptionIDStr). + Str(log.KeyEventbusID, t.eventbusIDStr). + Stringer(log.KeyEventlogID, events[0].record.EventlogID). + Uint64("event_offset", events[0].record.OffsetInfo.Offset). Msg("send event fail") code := r.StatusCode if t.config.Ordered { @@ -427,11 +443,11 @@ func (t *trigger) processEvent(ctx context.Context, events ...*toSendEvent) { Msg("send event success") } if retryEventCnt > 0 { - metrics.TriggerPushEventCounter.WithLabelValues(t.subscriptionIDStr, t.subscription.EventbusID.String(), metrics.LabelTrue, result). + metrics.TriggerPushEventCounter.WithLabelValues(t.subscriptionIDStr, t.eventbusIDStr, metrics.LabelTrue, result). Add(float64(retryEventCnt)) } if l > retryEventCnt { - metrics.TriggerPushEventCounter.WithLabelValues(t.subscriptionIDStr, t.subscription.EventbusID.String(), metrics.LabelFalse, result). + metrics.TriggerPushEventCounter.WithLabelValues(t.subscriptionIDStr, t.eventbusIDStr, metrics.LabelFalse, result). Add(float64(l - retryEventCnt)) } } @@ -488,7 +504,7 @@ func (t *trigger) writeEventToRetry(ctx context.Context, e *ce.Event, attempts i Observe(time.Since(startTime).Seconds()) if err != nil { log.Info(ctx).Err(err). - Stringer(log.KeySubscriptionID, t.subscription.ID). + Str(log.KeySubscriptionID, t.subscriptionIDStr). Int("attempt", writeAttempt). Interface("event", e). Msg("write retry event error") @@ -502,7 +518,7 @@ func (t *trigger) writeEventToRetry(ctx context.Context, e *ce.Event, attempts i } } log.Debug(ctx). - Stringer(log.KeySubscriptionID, t.subscription.ID). + Str(log.KeySubscriptionID, t.subscriptionIDStr). Interface("event", e). Msg("write retry event success") } @@ -523,7 +539,7 @@ func (t *trigger) writeEventToDeadLetter(ctx context.Context, e *ce.Event, reaso Observe(time.Since(startTime).Seconds()) if err != nil { log.Info(ctx).Err(err). - Stringer(log.KeySubscriptionID, t.subscription.ID). + Str(log.KeySubscriptionID, t.subscriptionIDStr). Int("attempt", writeAttempt). Interface("event", e). Msg("write dl event error") @@ -536,7 +552,7 @@ func (t *trigger) writeEventToDeadLetter(ctx context.Context, e *ce.Event, reaso } } log.Debug(ctx). - Stringer(log.KeySubscriptionID, t.subscription.ID). + Str(log.KeySubscriptionID, t.subscriptionIDStr). Interface("event", e). Msg("write dl event success") } @@ -572,7 +588,11 @@ func getOffset(sub *primitive.Subscription) map[vanus.ID]uint64 { } func (t *trigger) Init(ctx context.Context) error { - t.eventCli = newEventClient(t.subscription.Sink, t.subscription.Protocol, t.subscription.SinkCredential) + t.eventCli = newEventClient(clientConfig{ + sink: t.subscription.Sink, + protocol: t.subscription.Protocol, + credential: t.subscription.SinkCredential, + gateway: t.config.TargetGateway}) t.client = eb.Connect(t.config.Controllers) t.timerEventWriter = t.client.Eventbus(ctx, api.WithID( @@ -592,7 +612,7 @@ func (t *trigger) Init(ctx context.Context) error { func (t *trigger) Start(ctx context.Context) error { log.Info(ctx). - Stringer(log.KeySubscriptionID, t.subscription.ID). + Str(log.KeySubscriptionID, t.subscriptionIDStr). Msg("trigger start...") ctx, cancel := context.WithCancel(context.Background()) t.stop = cancel @@ -613,14 +633,14 @@ func (t *trigger) Start(ctx context.Context) error { t.wg.StartWithContext(ctx, t.runRetryEventFilterTransform) t.state = TriggerRunning log.Info(ctx). - Stringer(log.KeySubscriptionID, t.subscription.ID). + Str(log.KeySubscriptionID, t.subscriptionIDStr). Msg("trigger started") return nil } func (t *trigger) Stop(ctx context.Context) error { log.Info(ctx). - Stringer(log.KeySubscriptionID, t.subscription.ID). + Str(log.KeySubscriptionID, t.subscriptionIDStr). Msg("trigger stop...") if t.state == TriggerStopped { @@ -638,7 +658,7 @@ func (t *trigger) Stop(ctx context.Context) error { t.offsetManager.Close() t.state = TriggerStopped log.Info(ctx). - Stringer(log.KeySubscriptionID, t.subscription.ID). + Str(log.KeySubscriptionID, t.subscriptionIDStr). Msg("trigger stopped") return nil } diff --git a/server/trigger/trigger/util.go b/server/trigger/trigger/util.go index 706e28d86..201ddb1dd 100644 --- a/server/trigger/trigger/util.go +++ b/server/trigger/trigger/util.go @@ -25,20 +25,29 @@ import ( "github.com/vanus-labs/vanus/server/trigger/client" ) -func newEventClient(sink primitive.URI, - protocol primitive.Protocol, - credential primitive.SinkCredential) client.EventClient { - switch protocol { +type clientConfig struct { + gateway *TargetGateway + sink primitive.URI + protocol primitive.Protocol + credential primitive.SinkCredential +} + +func newEventClient(cfg clientConfig) client.EventClient { + sink := string(cfg.sink) + switch cfg.protocol { case primitive.AwsLambdaProtocol: - _credential, _ := credential.(*primitive.AkSkSinkCredential) - return client.NewAwsLambdaClient(_credential.AccessKeyID, _credential.SecretAccessKey, string(sink)) + _credential, _ := cfg.credential.(*primitive.AkSkSinkCredential) + return client.NewAwsLambdaClient(_credential.AccessKeyID, _credential.SecretAccessKey, sink) case primitive.GCloudFunctions: - _credential, _ := credential.(*primitive.GCloudSinkCredential) - return client.NewGCloudFunctionClient(string(sink), _credential.CredentialJSON) + _credential, _ := cfg.credential.(*primitive.GCloudSinkCredential) + return client.NewGCloudFunctionClient(sink, _credential.CredentialJSON) case primitive.GRPC: - return client.NewGRPCClient(string(sink)) + return client.NewGRPCClient(sink) default: - return client.NewHTTPClient(string(sink)) + if cfg.gateway != nil { + return client.NewHTTPClientWithGateway(sink, cfg.gateway.Address, cfg.gateway.TargetHeaderName) + } + return client.NewHTTPClient(sink) } } diff --git a/server/trigger/trigger/util_test.go b/server/trigger/trigger/util_test.go index dec7a7277..14319f45e 100644 --- a/server/trigger/trigger/util_test.go +++ b/server/trigger/trigger/util_test.go @@ -26,13 +26,13 @@ import ( func TestNewEventClient(t *testing.T) { Convey("test new event client", t, func() { Convey("new lambda client", func() { - cli := newEventClient("test", primitive.AwsLambdaProtocol, - primitive.NewAkSkSinkCredential("ak", "sk")) + cli := newEventClient(clientConfig{sink: "test", protocol: primitive.AwsLambdaProtocol, + credential: primitive.NewAkSkSinkCredential("ak", "sk")}) So(cli, ShouldNotBeNil) }) Convey("new http client", func() { - cli := newEventClient("test", primitive.HTTPProtocol, - primitive.NewPlainSinkCredential("identifier", "secret")) + cli := newEventClient(clientConfig{sink: "test", protocol: primitive.HTTPProtocol, + credential: primitive.NewPlainSinkCredential("identifier", "secret")}) So(cli, ShouldNotBeNil) }) }) diff --git a/server/trigger/worker.go b/server/trigger/worker.go index bfeec0a91..866c4c795 100644 --- a/server/trigger/worker.go +++ b/server/trigger/worker.go @@ -261,6 +261,7 @@ func (w *worker) getTriggerOptions(subscription *primitive.Subscription) []trigg trigger.WithGoroutineSize(w.config.SendEventGoroutineSize), trigger.WithSendBatchSize(w.config.SendEventBatchSize), trigger.WithPullBatchSize(w.config.PullEventBatchSize), - trigger.WithMaxUACKNumber(w.config.MaxUACKEventNumber)) + trigger.WithMaxUACKNumber(w.config.MaxUACKEventNumber), + trigger.WithProxy(w.config.Gateway)) return opts }