diff --git a/integration/watermill/keel/subscriber.go b/integration/watermill/keel/subscriber.go index 6e8c6ed..0267e97 100644 --- a/integration/watermill/keel/subscriber.go +++ b/integration/watermill/keel/subscriber.go @@ -161,6 +161,7 @@ func (s *Subscriber) handle(l *zap.Logger, r *http.Request, event *mpv2.Event) e } msg := message.NewMessage(s.uuidFunc(), payload) + l = l.With(zap.String("message_id", msg.UUID)) if labeler, ok := keellog.LabelerFromRequest(r); ok { labeler.Add(zap.String("message_id", msg.UUID)) } @@ -174,6 +175,10 @@ func (s *Subscriber) handle(l *zap.Logger, r *http.Request, event *mpv2.Event) e msg.Metadata.Set(name, strings.Join(headers, ",")) } + for k, v := range msg.Metadata { + l = l.With(zap.String(k, v)) + } + // TODO different context? ctx, cancelCtx := context.WithCancel(r.Context()) msg.SetContext(ctx) @@ -185,13 +190,13 @@ func (s *Subscriber) handle(l *zap.Logger, r *http.Request, event *mpv2.Event) e // wait for ACK select { case <-msg.Acked(): - l.Info("message acked") + l.Debug("message acked") return nil case <-msg.Nacked(): - l.Info("message nacked") + l.Debug("message nacked") return ErrMessageNacked case <-r.Context().Done(): - l.Info("message cancled") + l.Debug("message cancled") return ErrContextCanceled } }