diff --git a/internal/graphql/services/executors.go b/internal/graphql/services/executors.go index a0b2531802..5b8247ac32 100644 --- a/internal/graphql/services/executors.go +++ b/internal/graphql/services/executors.go @@ -32,7 +32,7 @@ func (s *executorsService) List(selector string) ([]testkube.ExecutorDetails, er } func (s *executorsService) SubscribeList(ctx context.Context, selector string) (<-chan []testkube.ExecutorDetails, error) { - return HandleSubscription(ctx, "events.executor.>", s, func() ([]testkube.ExecutorDetails, error) { + return HandleSubscription(ctx, "agent.executor.>", s, func() ([]testkube.ExecutorDetails, error) { return s.List(selector) }) } diff --git a/internal/graphql/services/executors_test.go b/internal/graphql/services/executors_test.go index aae22813a4..de61cbdf5e 100644 --- a/internal/graphql/services/executors_test.go +++ b/internal/graphql/services/executors_test.go @@ -169,7 +169,7 @@ func TestExecutorsService_SubscribeList(t *testing.T) { assert.NoError(t, err) <-ch client.Client = getMockExecutorClient(k8sObjects2).Client - assert.NoError(t, srvMock.BusMock().PublishTopic("events.executor.create", testkube.Event{ + assert.NoError(t, srvMock.BusMock().PublishTopic("agent.executor.create", testkube.Event{ Type_: testkube.EventCreated, Resource: testkube.EventResourceExecutor, })) diff --git a/pkg/api/v1/testkube/model_event_extended.go b/pkg/api/v1/testkube/model_event_extended.go index e16d860dfd..042ccb9c84 100644 --- a/pkg/api/v1/testkube/model_event_extended.go +++ b/pkg/api/v1/testkube/model_event_extended.go @@ -8,8 +8,9 @@ import ( ) const ( - TestStartSubject = "events.test.start" - TestStopSubject = "events.test.stop" + Subscription = "agent" + TestStartSubject = Subscription + ".test.start" + TestStopSubject = Subscription + ".test.stop" ) // check if Event implements model generic event type @@ -262,14 +263,14 @@ func (e Event) Topic() string { } if e.Resource == nil { - return "events.all" + return Subscription + ".all" } if e.ResourceId == "" { - return "events." + string(*e.Resource) + return Subscription + "." + string(*e.Resource) } - return "events." + string(*e.Resource) + "." + e.ResourceId + return Subscription + "." + string(*e.Resource) + "." + e.ResourceId } // GetResourceId implmenents generic event trigger diff --git a/pkg/api/v1/testkube/model_event_extended_test.go b/pkg/api/v1/testkube/model_event_extended_test.go index 811776cd21..a2b02f374f 100644 --- a/pkg/api/v1/testkube/model_event_extended_test.go +++ b/pkg/api/v1/testkube/model_event_extended_test.go @@ -166,16 +166,16 @@ func TestEvent_Topic(t *testing.T) { t.Run("should return events topic if not resource set", func(t *testing.T) { evt := Event{Type_: EventStartTest, Resource: nil} - assert.Equal(t, "events.all", evt.Topic()) + assert.Equal(t, "agent.all", evt.Topic()) }) t.Run("should return event topic with resource name and id if set", func(t *testing.T) { evt := Event{Type_: EventStartTest, Resource: EventResourceExecutor, ResourceId: "a12"} - assert.Equal(t, "events.executor.a12", evt.Topic()) + assert.Equal(t, "agent.executor.a12", evt.Topic()) }) t.Run("should return event topic with resource name when id not set", func(t *testing.T) { evt := Event{Type_: EventStartTest, Resource: EventResourceExecutor} - assert.Equal(t, "events.executor", evt.Topic()) + assert.Equal(t, "agent.executor", evt.Topic()) }) } diff --git a/pkg/event/bus/nats.go b/pkg/event/bus/nats.go index 79e61d1ebf..d5c9613ee3 100644 --- a/pkg/event/bus/nats.go +++ b/pkg/event/bus/nats.go @@ -19,7 +19,7 @@ var ( const ( SubscribeBuffer = 1 - SubscriptionName = "events" + SubscriptionName = "agent" InternalPublishTopic = "internal.all" InternalSubscribeTopic = "internal.>" ) @@ -155,7 +155,7 @@ func (n *NATSBus) queueName(subscription, queue string) string { func (n *NATSBus) TraceEvents() { s, err := n.nc.Subscribe(SubscriptionName+".>", func(event testkube.Event) { - log.Tracew(log.DefaultLogger, "all events.> trace", event.Log()...) + log.Tracew(log.DefaultLogger, "got message on events", event.Log()...) }) if err != nil { diff --git a/pkg/event/emitter.go b/pkg/event/emitter.go index 52fd65fd4e..ab555e8bb7 100644 --- a/pkg/event/emitter.go +++ b/pkg/event/emitter.go @@ -158,11 +158,12 @@ func (e *Emitter) Listen(ctx context.Context) { } func (e *Emitter) startListener(l common.Listener) { - err := e.Bus.SubscribeTopic("events.>", l.Name(), e.notifyHandler(l)) + err := e.Bus.SubscribeTopic(bus.SubscriptionName+".>", l.Name(), e.notifyHandler(l)) if err != nil { e.Log.Errorw("error while starting listener", "error", err) + return } - e.Log.Infow("started listener", l.Name(), l.Metadata()) + e.Log.Infow("started listener", "name", l.Name(), "metadata", l.Metadata()) } func (e *Emitter) stopListener(name string) { @@ -174,13 +175,13 @@ func (e *Emitter) stopListener(name string) { } func (e *Emitter) notifyHandler(l common.Listener) bus.Handler { - logger := e.Log.With("listen-on", l.Events(), "queue-group", l.Name(), "selector", l.Selector(), "metadata", l.Metadata()) + logger := e.Log.With("queue-group", l.Name()) return func(event testkube.Event) error { if event.Valid(l.Selector(), l.Events()) { result := l.Notify(event) log.Tracew(logger, "listener notified", append(event.Log(), "result", result)...) } else { - log.Tracew(logger, "dropping event not matching selector or type", event.Log()...) + log.Tracew(logger, "dropping event not matching selector or type", append(event.Log(), "listen-on", l.Events(), "selector", l.Selector(), "metadata", l.Metadata())...) } return nil } diff --git a/pkg/logs/client/interface.go b/pkg/logs/client/interface.go index d942682f6c..7d62ffad64 100644 --- a/pkg/logs/client/interface.go +++ b/pkg/logs/client/interface.go @@ -9,8 +9,9 @@ import ( const ( StreamPrefix = "log" - StartSubject = "events.logs.start" - StopSubject = "events.logs.stop" + Subscription = "agent" + StartSubject = Subscription + ".logs.start" + StopSubject = Subscription + ".logs.stop" ) //go:generate mockgen -destination=./mock_stream.go -package=client "github.com/kubeshop/testkube/pkg/logs/client" Stream diff --git a/pkg/logs/events.go b/pkg/logs/events.go index 712a5df1a5..d084445ddf 100644 --- a/pkg/logs/events.go +++ b/pkg/logs/events.go @@ -27,8 +27,9 @@ const ( StartQueue = "logsstart" StopQueue = "logsstop" - LogStartSubject = "events.logs.start" - LogStopSubject = "events.logs.stop" + Subscription = "agent" + LogStartSubject = Subscription + ".logs.start" + LogStopSubject = Subscription + ".logs.stop" ) var (