diff --git a/cmd/queryer/main.go b/cmd/queryer/main.go index 2537512..2ceaf9b 100644 --- a/cmd/queryer/main.go +++ b/cmd/queryer/main.go @@ -4,6 +4,7 @@ import ( "os" "github.com/L4B0MB4/EVTSRC/pkg/client" + "github.com/L4B0MB4/PRYVT/identification/pkg/query/eventhandling" "github.com/L4B0MB4/PRYVT/identification/pkg/query/eventpolling" "github.com/L4B0MB4/PRYVT/identification/pkg/query/httphandler" "github.com/L4B0MB4/PRYVT/identification/pkg/query/httphandler/controller" @@ -41,8 +42,10 @@ func main() { aut := auth.NewAuthMiddleware(tokenManager) h := httphandler.NewHttpHandler(uc, aut) - eventPolling := eventpolling.NewEventPolling(c, eventRepo, userRepo) - go eventPolling.PollEvents(eventPolling.ProcessUserEvent) + userEventHandler := eventhandling.NewUserEventHandler(userRepo) + + eventPolling := eventpolling.NewEventPolling(c, eventRepo, userEventHandler) + go eventPolling.PollEvents() h.Start() } diff --git a/pkg/query/eventpolling/userevents.go b/pkg/query/eventhandling/user.go similarity index 53% rename from pkg/query/eventpolling/userevents.go rename to pkg/query/eventhandling/user.go index 5b4de05..bab128c 100644 --- a/pkg/query/eventpolling/userevents.go +++ b/pkg/query/eventhandling/user.go @@ -1,20 +1,31 @@ -package eventpolling +package eventhandling import ( "github.com/L4B0MB4/EVTSRC/pkg/models" "github.com/L4B0MB4/PRYVT/identification/pkg/aggregates" + "github.com/L4B0MB4/PRYVT/identification/pkg/query/store/repository" "github.com/google/uuid" "github.com/rs/zerolog/log" ) -func (ep *EventPolling) ProcessUserEvent(event models.Event) error { +type UserEventHandler struct { + userRepo *repository.UserRepository +} + +func NewUserEventHandler(userRepo *repository.UserRepository) *UserEventHandler { + return &UserEventHandler{ + userRepo: userRepo, + } +} + +func (eh *UserEventHandler) HandleEvent(event models.Event) error { if event.AggregateType == "user" { ua, err := aggregates.NewUserAggregate(uuid.MustParse(event.AggregateId)) if err != nil { return err } uI := aggregates.GetUserModelFromAggregate(ua) - err = ep.userRepo.AddOrReplaceUser(uI) + err = eh.userRepo.AddOrReplaceUser(uI) if err != nil { log.Err(err).Msg("Error while processing user event") return err diff --git a/pkg/query/eventpolling/event_handler.go b/pkg/query/eventpolling/event_handler.go new file mode 100644 index 0000000..b729347 --- /dev/null +++ b/pkg/query/eventpolling/event_handler.go @@ -0,0 +1,7 @@ +package eventpolling + +import "github.com/L4B0MB4/EVTSRC/pkg/models" + +type EventHanlder interface { + HandleEvent(event models.Event) error +} diff --git a/pkg/query/eventpolling/polling.go b/pkg/query/eventpolling/polling.go index 8578429..90bfc45 100644 --- a/pkg/query/eventpolling/polling.go +++ b/pkg/query/eventpolling/polling.go @@ -4,25 +4,24 @@ import ( "time" "github.com/L4B0MB4/EVTSRC/pkg/client" - "github.com/L4B0MB4/EVTSRC/pkg/models" "github.com/L4B0MB4/PRYVT/identification/pkg/query/store/repository" "github.com/rs/zerolog/log" ) type EventPolling struct { - client *client.EventSourcingHttpClient - eventRepo *repository.EventRepository - userRepo *repository.UserRepository + client *client.EventSourcingHttpClient + eventRepo *repository.EventRepository + eventHandler EventHanlder } -func NewEventPolling(client *client.EventSourcingHttpClient, eventRepo *repository.EventRepository, userRepo *repository.UserRepository) *EventPolling { - if client == nil || eventRepo == nil || userRepo == nil { +func NewEventPolling(client *client.EventSourcingHttpClient, eventRepo *repository.EventRepository, eventHandler EventHanlder) *EventPolling { + if client == nil || eventRepo == nil || eventHandler == nil { return nil } - return &EventPolling{client: client, eventRepo: eventRepo, userRepo: userRepo} + return &EventPolling{client: client, eventRepo: eventRepo, eventHandler: eventHandler} } -func (ep *EventPolling) PollEvents(callback func(event models.Event) error) { +func (ep *EventPolling) PollEvents() { hadMoreThenZeroEvents := true for { @@ -44,7 +43,7 @@ func (ep *EventPolling) PollEvents(callback func(event models.Event) error) { for _, event := range events { - err := callback(event) + err := ep.eventHandler.HandleEvent(event) if err != nil { log.Err(err).Msg("Error while processing event") break