Skip to content

Commit

Permalink
feat: orchestrate saga flow for creating subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
vladyslavpavlenko committed Jul 12, 2024
1 parent 20981c8 commit 55c9a08
Show file tree
Hide file tree
Showing 6 changed files with 222 additions and 16 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/go-chi/chi v1.5.5
github.com/go-chi/chi/v5 v5.0.12
github.com/golang/mock v1.6.0
github.com/google/uuid v1.6.0
github.com/jackc/pgx/v5 v5.4.3
github.com/kelseyhightower/envconfig v1.4.0
github.com/pkg/errors v0.9.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+Licev
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/influxdata/tdigest v0.0.1 h1:XpFptwYmnEKUqmkcDjrzffswZ3nvNeevbUSLPP/ZzIY=
github.com/influxdata/tdigest v0.0.1/go.mod h1:Z0kXnxzbTC2qrx4NaIzYkE1k66+6oEDQTvL95hQFh5Y=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
Expand Down
59 changes: 59 additions & 0 deletions internal/subscriber/gormsubscriber/actions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package gormsubscriber

import (
"context"
"errors"
"time"

"github.com/jackc/pgx/v5/pgconn"
"github.com/vladyslavpavlenko/genesis-api-project/internal/email"
"github.com/vladyslavpavlenko/genesis-api-project/internal/models"
"github.com/vladyslavpavlenko/genesis-api-project/internal/storage/gormstorage"
)

var ErrorInvalidEmail = errors.New("invalid email")

// validateSubscription is an action that validates a subscription by validating an
// email address and checking if it already exists.
func validateSubscription(saga *State, s *Subscriber) error {
// Validate the email format
if !email.Email(saga.Email).Validate() {
return ErrorInvalidEmail
}

// Check if the subscription already exists
ctx, cancel := context.WithTimeout(context.Background(), gormstorage.RequestTimeout)
defer cancel()

var count int64
err := s.db.WithContext(ctx).Model(&models.Subscription{}).Where("email = ?", saga.Email).Count(&count).Error
if err != nil {
return err
}

if count > 0 {
return ErrorDuplicateSubscription
}

return nil
}

// addSubscription is an action that creates a new models.Subscription record.
func addSubscription(saga *State, s *Subscriber) error {
ctx, cancel := context.WithTimeout(context.Background(), gormstorage.RequestTimeout)
defer cancel()

subscription := models.Subscription{
Email: saga.Email,
CreatedAt: time.Now(),
}
result := s.db.WithContext(ctx).Create(&subscription)
if result.Error != nil {
var pgErr *pgconn.PgError
if errors.As(result.Error, &pgErr) && pgErr.Code == "23505" {
return ErrorDuplicateSubscription
}
return result.Error
}
return nil
}
21 changes: 21 additions & 0 deletions internal/subscriber/gormsubscriber/compensations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package gormsubscriber

import (
"context"

"github.com/vladyslavpavlenko/genesis-api-project/internal/models"
"github.com/vladyslavpavlenko/genesis-api-project/internal/storage/gormstorage"
)

// deleteSubscription is a compensation to addSubscription that deletes a
// models.Subscription record, queried by an email address.
func deleteSubscription(saga *State, s *Subscriber) error {
ctx, cancel := context.WithTimeout(context.Background(), gormstorage.RequestTimeout)
defer cancel()

result := s.db.WithContext(ctx).Where("email = ?", saga.Email).Delete(&models.Subscription{})
if result.Error != nil {
return result.Error
}
return nil
}
127 changes: 127 additions & 0 deletions internal/subscriber/gormsubscriber/saga.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package gormsubscriber

import (
"context"

"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/vladyslavpavlenko/genesis-api-project/internal/storage/gormstorage"
"gorm.io/gorm"
)

const (
StatusCompleted = "completed"
StatusInProgress = "in_progress"
StatusFailed = "failed"
)

// State represents the current state of the SAGA transaction.
type State struct {
ID string `gorm:"primary_key"`
CurrentStep int
Email string
IsCompensating bool
Status string // StatusCompleted, StatusInProgress, StatusFailed
}

// Step represents a single step in the SAGA transaction.
type Step struct {
Action func(saga *State, s *Subscriber) error
Compensation func(saga *State, s *Subscriber) error
}

// Orchestrator manages the execution of SAGA steps.
type Orchestrator struct {
Steps []Step
State State
db *gorm.DB
}

// NewSagaOrchestrator creates a new SAGA Orchestrator.
func NewSagaOrchestrator(email string, db *gorm.DB) (*Orchestrator, error) {
err := db.AutoMigrate(&State{})
if err != nil {
return nil, errors.Wrap(err, "failed to migrate events")
}

return &Orchestrator{
Steps: []Step{
{
Action: validateSubscription,
Compensation: nil,
},
{
Action: addSubscription,
Compensation: deleteSubscription,
},
},
State: State{
ID: uuid.New().String(),
CurrentStep: 0,
Email: email,
IsCompensating: false,
Status: StatusInProgress,
},
db: db,
}, nil
}

// Run runs the SAGA Orchestrator.
func (o *Orchestrator) Run(s *Subscriber) error {
for o.State.CurrentStep < len(o.Steps) {
step := o.Steps[o.State.CurrentStep]
var err error

if o.State.IsCompensating {
if step.Compensation != nil {
err = step.Compensation(&o.State, s)
}
} else {
err = step.Action(&o.State, s)
}

if err != nil {
o.State.IsCompensating = true
err = o.saveState()
if err != nil {
return err
}
continue
}

if o.State.IsCompensating {
o.State.CurrentStep--
if o.State.CurrentStep < 0 {
o.State.Status = StatusFailed
err = o.saveState()
if err != nil {
return err
}
return err
}
} else {
o.State.CurrentStep++
}
err = o.saveState()
if err != nil {
return err
}
}

if !o.State.IsCompensating {
o.State.Status = StatusCompleted
err := o.saveState()
if err != nil {
return err
}
}
return nil
}

// saveState saves the SAGA transaction to the database.
func (o *Orchestrator) saveState() error {
ctx, cancel := context.WithTimeout(context.Background(), gormstorage.RequestTimeout)
defer cancel()

return o.db.WithContext(ctx).Save(&o.State).Error
}
28 changes: 12 additions & 16 deletions internal/subscriber/gormsubscriber/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package gormsubscriber

import (
"context"
"errors"
"time"

"github.com/jackc/pgx/v5/pgconn"
"github.com/pkg/errors"
"github.com/vladyslavpavlenko/genesis-api-project/internal/models"
"github.com/vladyslavpavlenko/genesis-api-project/internal/storage/gormstorage"
"gorm.io/gorm"
Expand All @@ -14,6 +12,7 @@ import (
var (
ErrorDuplicateSubscription = errors.New("subscription already exists")
ErrorNonExistentSubscription = errors.New("subscription does not exist")
ErrorSubscriptionFailed = errors.New("subscription failed")
)

type Subscriber struct {
Expand All @@ -29,22 +28,19 @@ func NewSubscriber(db *gorm.DB) *Subscriber {

// AddSubscription creates a new models.Subscription record.
func (s *Subscriber) AddSubscription(email string) error {
ctx, cancel := context.WithTimeout(context.Background(), gormstorage.RequestTimeout)
defer cancel()
orchestrator, err := NewSagaOrchestrator(email, s.db)
if err != nil {
return errors.Wrap(err, "failed to create orchestrator")
}

subscription := models.Subscription{
Email: email,
CreatedAt: time.Now(),
err = orchestrator.Run(s)
if err != nil {
return err
}
result := s.db.WithContext(ctx).Create(&subscription)
if result.Error != nil {
var pgErr *pgconn.PgError
if errors.As(result.Error, &pgErr) && pgErr.Code == "23505" {
return ErrorDuplicateSubscription
}
return result.Error
if orchestrator.State.Status == StatusCompleted {
return nil
}
return nil
return err
}

// DeleteSubscription deletes a models.Subscription record.
Expand Down

0 comments on commit 55c9a08

Please sign in to comment.