From a1ef050a1bb103226b723052330a52e5ee9a6b21 Mon Sep 17 00:00:00 2001 From: L4B0MB4 Date: Sat, 30 Nov 2024 00:22:43 +0100 Subject: [PATCH] Refactor event polling to use callback for user event processing --- cmd/queryer/main.go | 2 +- pkg/query/eventpolling/polling.go | 22 +++++++--------------- pkg/query/eventpolling/userevents.go | 24 ++++++++++++++++++++++++ 3 files changed, 32 insertions(+), 16 deletions(-) create mode 100644 pkg/query/eventpolling/userevents.go diff --git a/cmd/queryer/main.go b/cmd/queryer/main.go index 9bbf802..2537512 100644 --- a/cmd/queryer/main.go +++ b/cmd/queryer/main.go @@ -42,7 +42,7 @@ func main() { h := httphandler.NewHttpHandler(uc, aut) eventPolling := eventpolling.NewEventPolling(c, eventRepo, userRepo) - go eventPolling.PollEvents() + go eventPolling.PollEvents(eventPolling.ProcessUserEvent) h.Start() } diff --git a/pkg/query/eventpolling/polling.go b/pkg/query/eventpolling/polling.go index fde2372..8578429 100644 --- a/pkg/query/eventpolling/polling.go +++ b/pkg/query/eventpolling/polling.go @@ -4,9 +4,8 @@ import ( "time" "github.com/L4B0MB4/EVTSRC/pkg/client" - "github.com/L4B0MB4/PRYVT/identification/pkg/aggregates" + "github.com/L4B0MB4/EVTSRC/pkg/models" "github.com/L4B0MB4/PRYVT/identification/pkg/query/store/repository" - "github.com/google/uuid" "github.com/rs/zerolog/log" ) @@ -23,7 +22,7 @@ func NewEventPolling(client *client.EventSourcingHttpClient, eventRepo *reposito return &EventPolling{client: client, eventRepo: eventRepo, userRepo: userRepo} } -func (ep *EventPolling) PollEvents() { +func (ep *EventPolling) PollEvents(callback func(event models.Event) error) { hadMoreThenZeroEvents := true for { @@ -44,18 +43,11 @@ func (ep *EventPolling) PollEvents() { } for _, event := range events { - if event.AggregateType == "user" { - ua, err := aggregates.NewUserAggregate(uuid.MustParse(event.AggregateId)) - if err != nil { - log.Err(err).Msg("Error while creating user aggregate") - break - } - uI := aggregates.GetUserModelFromAggregate(ua) - err = ep.userRepo.AddOrReplaceUser(uI) - if err != nil { - log.Err(err).Msg("Error while adding or replacing user") - break - } + + err := callback(event) + if err != nil { + log.Err(err).Msg("Error while processing event") + break } } if len(events) == 0 { diff --git a/pkg/query/eventpolling/userevents.go b/pkg/query/eventpolling/userevents.go new file mode 100644 index 0000000..5b4de05 --- /dev/null +++ b/pkg/query/eventpolling/userevents.go @@ -0,0 +1,24 @@ +package eventpolling + +import ( + "github.com/L4B0MB4/EVTSRC/pkg/models" + "github.com/L4B0MB4/PRYVT/identification/pkg/aggregates" + "github.com/google/uuid" + "github.com/rs/zerolog/log" +) + +func (ep *EventPolling) ProcessUserEvent(event models.Event) error { + if event.AggregateType == "user" { + ua, err := aggregates.NewUserAggregate(uuid.MustParse(event.AggregateId)) + if err != nil { + return err + } + uI := aggregates.GetUserModelFromAggregate(ua) + err = ep.userRepo.AddOrReplaceUser(uI) + if err != nil { + log.Err(err).Msg("Error while processing user event") + return err + } + } + return nil +}