Skip to content

Commit

Permalink
Refactor event handling by introducing UserEventHandler and updating …
Browse files Browse the repository at this point in the history
…event polling to use it
  • Loading branch information
L4B0MB4 committed Nov 29, 2024
1 parent a1ef050 commit 6b932f8
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 14 deletions.
7 changes: 5 additions & 2 deletions cmd/queryer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"os"

"github.com/L4B0MB4/EVTSRC/pkg/client"
"github.com/L4B0MB4/PRYVT/identification/pkg/query/eventhandling"
"github.com/L4B0MB4/PRYVT/identification/pkg/query/eventpolling"
"github.com/L4B0MB4/PRYVT/identification/pkg/query/httphandler"
"github.com/L4B0MB4/PRYVT/identification/pkg/query/httphandler/controller"
Expand Down Expand Up @@ -41,8 +42,10 @@ func main() {
aut := auth.NewAuthMiddleware(tokenManager)
h := httphandler.NewHttpHandler(uc, aut)

eventPolling := eventpolling.NewEventPolling(c, eventRepo, userRepo)
go eventPolling.PollEvents(eventPolling.ProcessUserEvent)
userEventHandler := eventhandling.NewUserEventHandler(userRepo)

eventPolling := eventpolling.NewEventPolling(c, eventRepo, userEventHandler)
go eventPolling.PollEvents()

h.Start()
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,31 @@
package eventpolling
package eventhandling

import (
"github.com/L4B0MB4/EVTSRC/pkg/models"
"github.com/L4B0MB4/PRYVT/identification/pkg/aggregates"
"github.com/L4B0MB4/PRYVT/identification/pkg/query/store/repository"
"github.com/google/uuid"
"github.com/rs/zerolog/log"
)

func (ep *EventPolling) ProcessUserEvent(event models.Event) error {
type UserEventHandler struct {
userRepo *repository.UserRepository
}

func NewUserEventHandler(userRepo *repository.UserRepository) *UserEventHandler {
return &UserEventHandler{
userRepo: userRepo,
}
}

func (eh *UserEventHandler) HandleEvent(event models.Event) error {
if event.AggregateType == "user" {
ua, err := aggregates.NewUserAggregate(uuid.MustParse(event.AggregateId))
if err != nil {
return err
}
uI := aggregates.GetUserModelFromAggregate(ua)
err = ep.userRepo.AddOrReplaceUser(uI)
err = eh.userRepo.AddOrReplaceUser(uI)
if err != nil {
log.Err(err).Msg("Error while processing user event")
return err
Expand Down
7 changes: 7 additions & 0 deletions pkg/query/eventpolling/event_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package eventpolling

import "github.com/L4B0MB4/EVTSRC/pkg/models"

type EventHanlder interface {
HandleEvent(event models.Event) error
}
17 changes: 8 additions & 9 deletions pkg/query/eventpolling/polling.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,24 @@ import (
"time"

"github.com/L4B0MB4/EVTSRC/pkg/client"
"github.com/L4B0MB4/EVTSRC/pkg/models"
"github.com/L4B0MB4/PRYVT/identification/pkg/query/store/repository"
"github.com/rs/zerolog/log"
)

type EventPolling struct {
client *client.EventSourcingHttpClient
eventRepo *repository.EventRepository
userRepo *repository.UserRepository
client *client.EventSourcingHttpClient
eventRepo *repository.EventRepository
eventHandler EventHanlder
}

func NewEventPolling(client *client.EventSourcingHttpClient, eventRepo *repository.EventRepository, userRepo *repository.UserRepository) *EventPolling {
if client == nil || eventRepo == nil || userRepo == nil {
func NewEventPolling(client *client.EventSourcingHttpClient, eventRepo *repository.EventRepository, eventHandler EventHanlder) *EventPolling {
if client == nil || eventRepo == nil || eventHandler == nil {
return nil
}
return &EventPolling{client: client, eventRepo: eventRepo, userRepo: userRepo}
return &EventPolling{client: client, eventRepo: eventRepo, eventHandler: eventHandler}
}

func (ep *EventPolling) PollEvents(callback func(event models.Event) error) {
func (ep *EventPolling) PollEvents() {

hadMoreThenZeroEvents := true
for {
Expand All @@ -44,7 +43,7 @@ func (ep *EventPolling) PollEvents(callback func(event models.Event) error) {

for _, event := range events {

err := callback(event)
err := ep.eventHandler.HandleEvent(event)
if err != nil {
log.Err(err).Msg("Error while processing event")
break
Expand Down

0 comments on commit 6b932f8

Please sign in to comment.