From a7848fc0b02b64f24d82ecbc7f536087f206fe5d Mon Sep 17 00:00:00 2001 From: Vladyslav Pavlenko Date: Fri, 12 Jul 2024 14:31:52 +0300 Subject: [PATCH 1/4] refactor: separate subscriber service --- internal/app/setup.go | 34 ++++++++++-------- internal/handlers/handlers.go | 4 +-- internal/handlers/repository.go | 13 ++++--- internal/notifier/notifier.go | 20 +++++------ .../consumed_event.go | 4 +-- .../{gormrepo => gormstorage}/driver.go | 8 ++--- .../{gormrepo => gormstorage}/event.go | 6 ++-- .../{gormrepo => gormstorage}/offset.go | 6 ++-- .../gormsubscriber/subscriber.go} | 36 ++++++++++++------- 9 files changed, 74 insertions(+), 57 deletions(-) rename internal/storage/{gormrepo => gormstorage}/consumed_event.go (79%) rename internal/storage/{gormrepo => gormstorage}/driver.go (88%) rename internal/storage/{gormrepo => gormstorage}/event.go (81%) rename internal/storage/{gormrepo => gormstorage}/offset.go (81%) rename internal/{storage/gormrepo/subscription.go => subscriber/gormsubscriber/subscriber.go} (60%) diff --git a/internal/app/setup.go b/internal/app/setup.go index 1738251..cf8c9d1 100644 --- a/internal/app/setup.go +++ b/internal/app/setup.go @@ -5,6 +5,10 @@ import ( "log" "net/http" + "github.com/vladyslavpavlenko/genesis-api-project/internal/subscriber/gormsubscriber" + + "github.com/vladyslavpavlenko/genesis-api-project/internal/storage/gormstorage" + notifierpkg "github.com/vladyslavpavlenko/genesis-api-project/internal/notifier" "github.com/vladyslavpavlenko/genesis-api-project/internal/outbox/gormoutbox" producerpkg "github.com/vladyslavpavlenko/genesis-api-project/internal/outbox/producer" @@ -13,8 +17,6 @@ import ( "github.com/vladyslavpavlenko/genesis-api-project/internal/app/config" "github.com/vladyslavpavlenko/genesis-api-project/internal/models" - "github.com/vladyslavpavlenko/genesis-api-project/internal/storage/gormrepo" - "github.com/vladyslavpavlenko/genesis-api-project/internal/rateapi" "github.com/vladyslavpavlenko/genesis-api-project/internal/rateapi/chain" "gopkg.in/gomail.v2" @@ -36,11 +38,12 @@ type envVariables struct { } type services struct { - DBConn *gormrepo.Connection - Sender *email.GomailSender - Fetcher *chain.Node - Notifier *notifierpkg.Notifier - Outbox producerpkg.Outbox + DBConn *gormstorage.Connection + Sender *email.GomailSender + Fetcher *chain.Node + Notifier *notifierpkg.Notifier + Subscriber *gormsubscriber.Subscriber + Outbox producerpkg.Outbox } func setup(app *config.AppConfig) (*services, error) { @@ -78,12 +81,15 @@ func setup(app *config.AppConfig) (*services, error) { return nil, fmt.Errorf("failed to create outbox: %w", err) } - notifier := notifierpkg.NewNotifier(dbConn, fetcher, outbox) + subscriber := gormsubscriber.NewSubscriber(dbConn.DB()) + + notifier := notifierpkg.NewNotifier(subscriber, fetcher, outbox) repo := handlers.NewRepo(app, &handlers.Services{ - Fetcher: fetcher, - Notifier: notifier, - }, dbConn) + Fetcher: fetcher, + Notifier: notifier, + Subscriber: subscriber, + }) handlers.NewHandlers(repo) return &services{ @@ -105,8 +111,8 @@ func readEnv() (envVariables, error) { } // connectDB sets up a GORM database connection and returns an interface. -func connectDB(dsn string) (*gormrepo.Connection, error) { - var conn gormrepo.Connection +func connectDB(dsn string) (*gormstorage.Connection, error) { + var conn gormstorage.Connection err := conn.Setup(dsn) if err != nil { @@ -117,7 +123,7 @@ func connectDB(dsn string) (*gormrepo.Connection, error) { } // migrateDB runs database migrations. -func migrateDB(conn *gormrepo.Connection) error { +func migrateDB(conn *gormstorage.Connection) error { log.Println("Running migrations...") err := conn.Migrate(&models.Subscription{}, &outboxpkg.Event{}) diff --git a/internal/handlers/handlers.go b/internal/handlers/handlers.go index b707d4a..a7a9e6f 100644 --- a/internal/handlers/handlers.go +++ b/internal/handlers/handlers.go @@ -67,7 +67,7 @@ func (m *Repository) Subscribe(w http.ResponseWriter, r *http.Request) { return } - err = m.DB.AddSubscription(email) + err = m.Services.Subscriber.AddSubscription(email) if err != nil { _ = jsonutils.ErrorJSON(w, err, http.StatusInternalServerError) return @@ -89,7 +89,7 @@ func (m *Repository) Unsubscribe(w http.ResponseWriter, r *http.Request) { return } - err = m.DB.DeleteSubscription(email) + err = m.Services.Subscriber.DeleteSubscription(email) if err != nil { _ = jsonutils.ErrorJSON(w, err, http.StatusInternalServerError) return diff --git a/internal/handlers/repository.go b/internal/handlers/repository.go index 9dbdc11..ecdc25a 100644 --- a/internal/handlers/repository.go +++ b/internal/handlers/repository.go @@ -10,19 +10,19 @@ import ( type ( // Services is the repository type. Services struct { - Fetcher rateapi.Fetcher - Notifier *notifier.Notifier + Fetcher rateapi.Fetcher + Notifier *notifier.Notifier + Subscriber subscriber } // Repository is the repository type Repository struct { App *config.AppConfig - DB dbConnection Services *Services } - // dbConnection defines an interface for the database connection. - dbConnection interface { + // subscriber defines an interface for managing subscriptions. + subscriber interface { AddSubscription(emailAddr string) error DeleteSubscription(emailAddr string) error GetSubscriptions(limit, offset int) ([]models.Subscription, error) @@ -33,10 +33,9 @@ type ( var Repo *Repository // NewRepo creates a new Repository -func NewRepo(a *config.AppConfig, services *Services, conn dbConnection) *Repository { +func NewRepo(a *config.AppConfig, services *Services) *Repository { return &Repository{ App: a, - DB: conn, Services: services, } } diff --git a/internal/notifier/notifier.go b/internal/notifier/notifier.go index a12ae35..2e0d88a 100644 --- a/internal/notifier/notifier.go +++ b/internal/notifier/notifier.go @@ -12,8 +12,8 @@ import ( const batchSize = 100 -// dbConnection defines an interface for the database connection. -type dbConnection interface { +// subscriber defines an interface for managing subscribers. +type subscriber interface { GetSubscriptions(limit, offset int) ([]models.Subscription, error) } @@ -28,17 +28,17 @@ type outbox interface { } type Notifier struct { - DB dbConnection - Fetcher fetcher - Outbox outbox + Subscriber subscriber + Fetcher fetcher + Outbox outbox } // NewNotifier creates a new Notifier. -func NewNotifier(db dbConnection, f fetcher, o outbox) *Notifier { +func NewNotifier(s subscriber, f fetcher, o outbox) *Notifier { return &Notifier{ - DB: db, - Fetcher: f, - Outbox: o, + Subscriber: s, + Fetcher: f, + Outbox: o, } } @@ -57,7 +57,7 @@ func (n *Notifier) Start() error { var offset int errChan := make(chan error, 1) for { - subscriptions, err := n.DB.GetSubscriptions(batchSize, offset) + subscriptions, err := n.Subscriber.GetSubscriptions(batchSize, offset) if err != nil { return err } diff --git a/internal/storage/gormrepo/consumed_event.go b/internal/storage/gormstorage/consumed_event.go similarity index 79% rename from internal/storage/gormrepo/consumed_event.go rename to internal/storage/gormstorage/consumed_event.go index eb5fca8..c97db66 100644 --- a/internal/storage/gormrepo/consumed_event.go +++ b/internal/storage/gormstorage/consumed_event.go @@ -1,4 +1,4 @@ -package gormrepo +package gormstorage import ( "context" @@ -8,7 +8,7 @@ import ( // AddConsumedEvent creates a new consumer.ConsumedEvent record. func (c *Connection) AddConsumedEvent(event consumer.ConsumedEvent) error { - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) defer cancel() result := c.db.WithContext(ctx).Create(event) diff --git a/internal/storage/gormrepo/driver.go b/internal/storage/gormstorage/driver.go similarity index 88% rename from internal/storage/gormrepo/driver.go rename to internal/storage/gormstorage/driver.go index 55f8130..7fc79a7 100644 --- a/internal/storage/gormrepo/driver.go +++ b/internal/storage/gormstorage/driver.go @@ -1,4 +1,4 @@ -package gormrepo +package gormstorage import ( "context" @@ -10,7 +10,7 @@ import ( "gorm.io/gorm" ) -const timeout = time.Second * 5 +const RequestTimeout = time.Second * 5 type Connection struct { db *gorm.DB @@ -65,7 +65,7 @@ func (c *Connection) Close() error { // Migrate performs a database migration for given models. func (c *Connection) Migrate(models ...any) error { - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) defer cancel() err := c.db.WithContext(ctx).AutoMigrate(models...) @@ -78,7 +78,7 @@ func (c *Connection) Migrate(models ...any) error { // BeginTransaction begins a transaction. func (c *Connection) BeginTransaction() (*gorm.DB, error) { - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) defer cancel() tx := c.db.WithContext(ctx).Begin() diff --git a/internal/storage/gormrepo/event.go b/internal/storage/gormstorage/event.go similarity index 81% rename from internal/storage/gormrepo/event.go rename to internal/storage/gormstorage/event.go index 0b4f747..e435aed 100644 --- a/internal/storage/gormrepo/event.go +++ b/internal/storage/gormstorage/event.go @@ -1,4 +1,4 @@ -package gormrepo +package gormstorage import ( "context" @@ -8,7 +8,7 @@ import ( // AddEvent creates a new outbox.Event record. func (c *Connection) AddEvent(event *outbox.Event) error { - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) defer cancel() result := c.db.WithContext(ctx).Create(event) @@ -21,7 +21,7 @@ func (c *Connection) AddEvent(event *outbox.Event) error { // FetchUnpublishedEvents retrieves all events from the database that have not been // published after the specified offset. func (c *Connection) FetchUnpublishedEvents(lastOffset uint) ([]outbox.Event, error) { - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) defer cancel() var events []outbox.Event diff --git a/internal/storage/gormrepo/offset.go b/internal/storage/gormstorage/offset.go similarity index 81% rename from internal/storage/gormrepo/offset.go rename to internal/storage/gormstorage/offset.go index dbcc22a..6db88c2 100644 --- a/internal/storage/gormrepo/offset.go +++ b/internal/storage/gormstorage/offset.go @@ -1,4 +1,4 @@ -package gormrepo +package gormstorage import ( "context" @@ -8,7 +8,7 @@ import ( // GetLastOffset retrieves the last offset for a given topic from the database. func (c *Connection) GetLastOffset(topic string, partition int) (producer.Offset, error) { - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) defer cancel() var lastOffset producer.Offset @@ -22,7 +22,7 @@ func (c *Connection) GetLastOffset(topic string, partition int) (producer.Offset // UpdateOffset updates the offset in the database to reflect the latest published // event's ID. func (c *Connection) UpdateOffset(offset *producer.Offset) error { - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := context.WithTimeout(context.Background(), RequestTimeout) defer cancel() return c.db.WithContext(ctx).Save(offset).Error diff --git a/internal/storage/gormrepo/subscription.go b/internal/subscriber/gormsubscriber/subscriber.go similarity index 60% rename from internal/storage/gormrepo/subscription.go rename to internal/subscriber/gormsubscriber/subscriber.go index 912aa39..3827c1c 100644 --- a/internal/storage/gormrepo/subscription.go +++ b/internal/subscriber/gormsubscriber/subscriber.go @@ -1,13 +1,14 @@ -package gormrepo +package gormsubscriber import ( "context" "errors" "time" - "github.com/vladyslavpavlenko/genesis-api-project/internal/models" - "github.com/jackc/pgx/v5/pgconn" + "github.com/vladyslavpavlenko/genesis-api-project/internal/models" + "github.com/vladyslavpavlenko/genesis-api-project/internal/storage/gormstorage" + "gorm.io/gorm" ) var ( @@ -15,16 +16,27 @@ var ( ErrorNonExistentSubscription = errors.New("subscription does not exist") ) +type Subscriber struct { + db *gorm.DB +} + +// NewSubscriber creates a new Subscriber. +func NewSubscriber(db *gorm.DB) *Subscriber { + return &Subscriber{ + db: db, + } +} + // AddSubscription creates a new models.Subscription record. -func (c *Connection) AddSubscription(email string) error { - ctx, cancel := context.WithTimeout(context.Background(), timeout) +func (s *Subscriber) AddSubscription(email string) error { + ctx, cancel := context.WithTimeout(context.Background(), gormstorage.RequestTimeout) defer cancel() subscription := models.Subscription{ Email: email, CreatedAt: time.Now(), } - result := c.db.WithContext(ctx).Create(&subscription) + result := s.db.WithContext(ctx).Create(&subscription) if result.Error != nil { var pgErr *pgconn.PgError if errors.As(result.Error, &pgErr) && pgErr.Code == "23505" { @@ -36,11 +48,11 @@ func (c *Connection) AddSubscription(email string) error { } // DeleteSubscription deletes a models.Subscription record. -func (c *Connection) DeleteSubscription(email string) error { - ctx, cancel := context.WithTimeout(context.Background(), timeout) +func (s *Subscriber) DeleteSubscription(email string) error { + ctx, cancel := context.WithTimeout(context.Background(), gormstorage.RequestTimeout) defer cancel() - result := c.db.WithContext(ctx).Where("email = ?", email).Delete(&models.Subscription{}) + result := s.db.WithContext(ctx).Where("email = ?", email).Delete(&models.Subscription{}) if result.Error != nil { return result.Error } @@ -55,12 +67,12 @@ func (c *Connection) DeleteSubscription(email string) error { // GetSubscriptions returns a paginated list of subscriptions. Limit specifies the number of records to be retrieved // Limit conditions can be canceled by using `Limit(-1)`. Offset specify the number of records to skip before starting // to return the records. Offset conditions can be canceled by using `Offset(-1)`. -func (c *Connection) GetSubscriptions(limit, offset int) ([]models.Subscription, error) { - ctx, cancel := context.WithTimeout(context.Background(), timeout) +func (s *Subscriber) GetSubscriptions(limit, offset int) ([]models.Subscription, error) { + ctx, cancel := context.WithTimeout(context.Background(), gormstorage.RequestTimeout) defer cancel() var subscriptions []models.Subscription - result := c.db.WithContext(ctx).Limit(limit).Offset(offset).Find(&subscriptions) + result := s.db.WithContext(ctx).Limit(limit).Offset(offset).Find(&subscriptions) if result.Error != nil { return nil, result.Error } From 20981c8e83c54fecdd183df3c5e9731239a5480d Mon Sep 17 00:00:00 2001 From: Vladyslav Pavlenko Date: Fri, 12 Jul 2024 14:32:15 +0300 Subject: [PATCH 2/4] test: update tests for handlers package --- internal/handlers/repository_test.go | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/internal/handlers/repository_test.go b/internal/handlers/repository_test.go index f0fc821..7969bfb 100644 --- a/internal/handlers/repository_test.go +++ b/internal/handlers/repository_test.go @@ -4,8 +4,6 @@ import ( "context" "testing" - "github.com/vladyslavpavlenko/genesis-api-project/internal/storage/gormrepo" - "github.com/stretchr/testify/mock" "github.com/vladyslavpavlenko/genesis-api-project/internal/email" "github.com/vladyslavpavlenko/genesis-api-project/internal/models" @@ -25,21 +23,21 @@ func (m *MockSender) Send(cfg email.Config, params email.Params) error { return args.Error(0) } -type MockDB struct { +type MockSubscriber struct { mock.Mock } -func (m *MockDB) GetSubscriptions(_, _ int) ([]models.Subscription, error) { +func (m *MockSubscriber) GetSubscriptions(_, _ int) ([]models.Subscription, error) { args := m.Called() return args.Get(0).([]models.Subscription), args.Error(1) } -func (m *MockDB) AddSubscription(emailAddr string) error { +func (m *MockSubscriber) AddSubscription(emailAddr string) error { args := m.Called(emailAddr) return args.Error(0) } -func (m *MockDB) DeleteSubscription(emailAddr string) error { +func (m *MockSubscriber) DeleteSubscription(emailAddr string) error { args := m.Called(emailAddr) return args.Error(0) } @@ -57,11 +55,10 @@ func (m *MockFetcher) Fetch(ctx context.Context, base, target string) (string, e func TestNewRepo(t *testing.T) { appConfig := &config.AppConfig{} services := &handlers.Services{ - Fetcher: &MockFetcher{}, + Fetcher: &MockFetcher{}, + Subscriber: &MockSubscriber{}, } - dbConn := &gormrepo.Connection{} - - repo := handlers.NewRepo(appConfig, services, dbConn) + repo := handlers.NewRepo(appConfig, services) assert.NotNil(t, repo) assert.Equal(t, appConfig, repo.App) @@ -72,11 +69,11 @@ func TestNewRepo(t *testing.T) { func TestNewHandlers(t *testing.T) { appConfig := &config.AppConfig{} services := &handlers.Services{ - Fetcher: &MockFetcher{}, + Fetcher: &MockFetcher{}, + Subscriber: &MockSubscriber{}, } - dbConn := &gormrepo.Connection{} - repo := handlers.NewRepo(appConfig, services, dbConn) + repo := handlers.NewRepo(appConfig, services) handlers.NewHandlers(repo) assert.Equal(t, repo, handlers.Repo) From 55c9a08e3c4c84ea4398ea0fbb122677d53a26dc Mon Sep 17 00:00:00 2001 From: Vladyslav Pavlenko Date: Fri, 12 Jul 2024 15:48:49 +0300 Subject: [PATCH 3/4] feat: orchestrate saga flow for creating subscriptions --- go.mod | 1 + go.sum | 2 + internal/subscriber/gormsubscriber/actions.go | 59 ++++++++ .../gormsubscriber/compensations.go | 21 +++ internal/subscriber/gormsubscriber/saga.go | 127 ++++++++++++++++++ .../subscriber/gormsubscriber/subscriber.go | 28 ++-- 6 files changed, 222 insertions(+), 16 deletions(-) create mode 100644 internal/subscriber/gormsubscriber/actions.go create mode 100644 internal/subscriber/gormsubscriber/compensations.go create mode 100644 internal/subscriber/gormsubscriber/saga.go diff --git a/go.mod b/go.mod index b4ba7eb..716d6d2 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/go-chi/chi v1.5.5 github.com/go-chi/chi/v5 v5.0.12 github.com/golang/mock v1.6.0 + github.com/google/uuid v1.6.0 github.com/jackc/pgx/v5 v5.4.3 github.com/kelseyhightower/envconfig v1.4.0 github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index 569e833..351d30b 100644 --- a/go.sum +++ b/go.sum @@ -15,6 +15,8 @@ github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+Licev github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/influxdata/tdigest v0.0.1 h1:XpFptwYmnEKUqmkcDjrzffswZ3nvNeevbUSLPP/ZzIY= github.com/influxdata/tdigest v0.0.1/go.mod h1:Z0kXnxzbTC2qrx4NaIzYkE1k66+6oEDQTvL95hQFh5Y= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= diff --git a/internal/subscriber/gormsubscriber/actions.go b/internal/subscriber/gormsubscriber/actions.go new file mode 100644 index 0000000..af22c40 --- /dev/null +++ b/internal/subscriber/gormsubscriber/actions.go @@ -0,0 +1,59 @@ +package gormsubscriber + +import ( + "context" + "errors" + "time" + + "github.com/jackc/pgx/v5/pgconn" + "github.com/vladyslavpavlenko/genesis-api-project/internal/email" + "github.com/vladyslavpavlenko/genesis-api-project/internal/models" + "github.com/vladyslavpavlenko/genesis-api-project/internal/storage/gormstorage" +) + +var ErrorInvalidEmail = errors.New("invalid email") + +// validateSubscription is an action that validates a subscription by validating an +// email address and checking if it already exists. +func validateSubscription(saga *State, s *Subscriber) error { + // Validate the email format + if !email.Email(saga.Email).Validate() { + return ErrorInvalidEmail + } + + // Check if the subscription already exists + ctx, cancel := context.WithTimeout(context.Background(), gormstorage.RequestTimeout) + defer cancel() + + var count int64 + err := s.db.WithContext(ctx).Model(&models.Subscription{}).Where("email = ?", saga.Email).Count(&count).Error + if err != nil { + return err + } + + if count > 0 { + return ErrorDuplicateSubscription + } + + return nil +} + +// addSubscription is an action that creates a new models.Subscription record. +func addSubscription(saga *State, s *Subscriber) error { + ctx, cancel := context.WithTimeout(context.Background(), gormstorage.RequestTimeout) + defer cancel() + + subscription := models.Subscription{ + Email: saga.Email, + CreatedAt: time.Now(), + } + result := s.db.WithContext(ctx).Create(&subscription) + if result.Error != nil { + var pgErr *pgconn.PgError + if errors.As(result.Error, &pgErr) && pgErr.Code == "23505" { + return ErrorDuplicateSubscription + } + return result.Error + } + return nil +} diff --git a/internal/subscriber/gormsubscriber/compensations.go b/internal/subscriber/gormsubscriber/compensations.go new file mode 100644 index 0000000..ea0a208 --- /dev/null +++ b/internal/subscriber/gormsubscriber/compensations.go @@ -0,0 +1,21 @@ +package gormsubscriber + +import ( + "context" + + "github.com/vladyslavpavlenko/genesis-api-project/internal/models" + "github.com/vladyslavpavlenko/genesis-api-project/internal/storage/gormstorage" +) + +// deleteSubscription is a compensation to addSubscription that deletes a +// models.Subscription record, queried by an email address. +func deleteSubscription(saga *State, s *Subscriber) error { + ctx, cancel := context.WithTimeout(context.Background(), gormstorage.RequestTimeout) + defer cancel() + + result := s.db.WithContext(ctx).Where("email = ?", saga.Email).Delete(&models.Subscription{}) + if result.Error != nil { + return result.Error + } + return nil +} diff --git a/internal/subscriber/gormsubscriber/saga.go b/internal/subscriber/gormsubscriber/saga.go new file mode 100644 index 0000000..a7ad42b --- /dev/null +++ b/internal/subscriber/gormsubscriber/saga.go @@ -0,0 +1,127 @@ +package gormsubscriber + +import ( + "context" + + "github.com/google/uuid" + "github.com/pkg/errors" + "github.com/vladyslavpavlenko/genesis-api-project/internal/storage/gormstorage" + "gorm.io/gorm" +) + +const ( + StatusCompleted = "completed" + StatusInProgress = "in_progress" + StatusFailed = "failed" +) + +// State represents the current state of the SAGA transaction. +type State struct { + ID string `gorm:"primary_key"` + CurrentStep int + Email string + IsCompensating bool + Status string // StatusCompleted, StatusInProgress, StatusFailed +} + +// Step represents a single step in the SAGA transaction. +type Step struct { + Action func(saga *State, s *Subscriber) error + Compensation func(saga *State, s *Subscriber) error +} + +// Orchestrator manages the execution of SAGA steps. +type Orchestrator struct { + Steps []Step + State State + db *gorm.DB +} + +// NewSagaOrchestrator creates a new SAGA Orchestrator. +func NewSagaOrchestrator(email string, db *gorm.DB) (*Orchestrator, error) { + err := db.AutoMigrate(&State{}) + if err != nil { + return nil, errors.Wrap(err, "failed to migrate events") + } + + return &Orchestrator{ + Steps: []Step{ + { + Action: validateSubscription, + Compensation: nil, + }, + { + Action: addSubscription, + Compensation: deleteSubscription, + }, + }, + State: State{ + ID: uuid.New().String(), + CurrentStep: 0, + Email: email, + IsCompensating: false, + Status: StatusInProgress, + }, + db: db, + }, nil +} + +// Run runs the SAGA Orchestrator. +func (o *Orchestrator) Run(s *Subscriber) error { + for o.State.CurrentStep < len(o.Steps) { + step := o.Steps[o.State.CurrentStep] + var err error + + if o.State.IsCompensating { + if step.Compensation != nil { + err = step.Compensation(&o.State, s) + } + } else { + err = step.Action(&o.State, s) + } + + if err != nil { + o.State.IsCompensating = true + err = o.saveState() + if err != nil { + return err + } + continue + } + + if o.State.IsCompensating { + o.State.CurrentStep-- + if o.State.CurrentStep < 0 { + o.State.Status = StatusFailed + err = o.saveState() + if err != nil { + return err + } + return err + } + } else { + o.State.CurrentStep++ + } + err = o.saveState() + if err != nil { + return err + } + } + + if !o.State.IsCompensating { + o.State.Status = StatusCompleted + err := o.saveState() + if err != nil { + return err + } + } + return nil +} + +// saveState saves the SAGA transaction to the database. +func (o *Orchestrator) saveState() error { + ctx, cancel := context.WithTimeout(context.Background(), gormstorage.RequestTimeout) + defer cancel() + + return o.db.WithContext(ctx).Save(&o.State).Error +} diff --git a/internal/subscriber/gormsubscriber/subscriber.go b/internal/subscriber/gormsubscriber/subscriber.go index 3827c1c..b6c575a 100644 --- a/internal/subscriber/gormsubscriber/subscriber.go +++ b/internal/subscriber/gormsubscriber/subscriber.go @@ -2,10 +2,8 @@ package gormsubscriber import ( "context" - "errors" - "time" - "github.com/jackc/pgx/v5/pgconn" + "github.com/pkg/errors" "github.com/vladyslavpavlenko/genesis-api-project/internal/models" "github.com/vladyslavpavlenko/genesis-api-project/internal/storage/gormstorage" "gorm.io/gorm" @@ -14,6 +12,7 @@ import ( var ( ErrorDuplicateSubscription = errors.New("subscription already exists") ErrorNonExistentSubscription = errors.New("subscription does not exist") + ErrorSubscriptionFailed = errors.New("subscription failed") ) type Subscriber struct { @@ -29,22 +28,19 @@ func NewSubscriber(db *gorm.DB) *Subscriber { // AddSubscription creates a new models.Subscription record. func (s *Subscriber) AddSubscription(email string) error { - ctx, cancel := context.WithTimeout(context.Background(), gormstorage.RequestTimeout) - defer cancel() + orchestrator, err := NewSagaOrchestrator(email, s.db) + if err != nil { + return errors.Wrap(err, "failed to create orchestrator") + } - subscription := models.Subscription{ - Email: email, - CreatedAt: time.Now(), + err = orchestrator.Run(s) + if err != nil { + return err } - result := s.db.WithContext(ctx).Create(&subscription) - if result.Error != nil { - var pgErr *pgconn.PgError - if errors.As(result.Error, &pgErr) && pgErr.Code == "23505" { - return ErrorDuplicateSubscription - } - return result.Error + if orchestrator.State.Status == StatusCompleted { + return nil } - return nil + return err } // DeleteSubscription deletes a models.Subscription record. From 17078b1a73b9564f37e3b0d2acca6f67a5daa674 Mon Sep 17 00:00:00 2001 From: Vladyslav Pavlenko Date: Fri, 12 Jul 2024 15:50:57 +0300 Subject: [PATCH 4/4] fix: update github linter triggers --- .github/workflows/golangci-lint.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index b286ce4..b3bb7f2 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -2,8 +2,6 @@ name: Lint on: push: - branches: - - main pull_request: branches: - main