Skip to content

Commit

Permalink
Refactor event polling to use callback for user event processing
Browse files Browse the repository at this point in the history
  • Loading branch information
L4B0MB4 committed Nov 29, 2024
1 parent 8d3d9de commit a1ef050
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 16 deletions.
2 changes: 1 addition & 1 deletion cmd/queryer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func main() {
h := httphandler.NewHttpHandler(uc, aut)

eventPolling := eventpolling.NewEventPolling(c, eventRepo, userRepo)
go eventPolling.PollEvents()
go eventPolling.PollEvents(eventPolling.ProcessUserEvent)

h.Start()
}
22 changes: 7 additions & 15 deletions pkg/query/eventpolling/polling.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ import (
"time"

"github.com/L4B0MB4/EVTSRC/pkg/client"
"github.com/L4B0MB4/PRYVT/identification/pkg/aggregates"
"github.com/L4B0MB4/EVTSRC/pkg/models"
"github.com/L4B0MB4/PRYVT/identification/pkg/query/store/repository"
"github.com/google/uuid"
"github.com/rs/zerolog/log"
)

Expand All @@ -23,7 +22,7 @@ func NewEventPolling(client *client.EventSourcingHttpClient, eventRepo *reposito
return &EventPolling{client: client, eventRepo: eventRepo, userRepo: userRepo}
}

func (ep *EventPolling) PollEvents() {
func (ep *EventPolling) PollEvents(callback func(event models.Event) error) {

hadMoreThenZeroEvents := true
for {
Expand All @@ -44,18 +43,11 @@ func (ep *EventPolling) PollEvents() {
}

for _, event := range events {
if event.AggregateType == "user" {
ua, err := aggregates.NewUserAggregate(uuid.MustParse(event.AggregateId))
if err != nil {
log.Err(err).Msg("Error while creating user aggregate")
break
}
uI := aggregates.GetUserModelFromAggregate(ua)
err = ep.userRepo.AddOrReplaceUser(uI)
if err != nil {
log.Err(err).Msg("Error while adding or replacing user")
break
}

err := callback(event)
if err != nil {
log.Err(err).Msg("Error while processing event")
break
}
}
if len(events) == 0 {
Expand Down
24 changes: 24 additions & 0 deletions pkg/query/eventpolling/userevents.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package eventpolling

import (
"github.com/L4B0MB4/EVTSRC/pkg/models"
"github.com/L4B0MB4/PRYVT/identification/pkg/aggregates"
"github.com/google/uuid"
"github.com/rs/zerolog/log"
)

func (ep *EventPolling) ProcessUserEvent(event models.Event) error {
if event.AggregateType == "user" {
ua, err := aggregates.NewUserAggregate(uuid.MustParse(event.AggregateId))
if err != nil {
return err
}
uI := aggregates.GetUserModelFromAggregate(ua)
err = ep.userRepo.AddOrReplaceUser(uI)
if err != nil {
log.Err(err).Msg("Error while processing user event")
return err
}
}
return nil
}

0 comments on commit a1ef050

Please sign in to comment.