Skip to content

Commit

Permalink
resolving Ales comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mina1460 committed Jan 29, 2024
1 parent b1b1f68 commit 41d00cf
Showing 1 changed file with 20 additions and 14 deletions.
34 changes: 20 additions & 14 deletions internal/jimm/river.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/canonical/jimm/internal/db"
"github.com/canonical/jimm/internal/dbmodel"
"github.com/canonical/jimm/internal/errors"
"github.com/canonical/jimm/internal/openfga"
ofganames "github.com/canonical/jimm/internal/openfga/names"
)
Expand Down Expand Up @@ -61,7 +62,7 @@ func (w *OpenFGAWorker) Work(ctx context.Context, job *river.Job[OpenFGAArgs]) e
zap.String("controller", controllerTag.Id()),
zap.String("model", modelTag.Id()),
)
return err
return errors.E(err, "Failed to add the controller-model relation from the river job.")
}
err = openfga.NewUser(owner, w.OfgaClient).SetModelAccess(ctx, names.NewModelTag(job.Args.ModelInfoUUID), ofganames.AdministratorRelation)
if err != nil {
Expand All @@ -71,19 +72,11 @@ func (w *OpenFGAWorker) Work(ctx context.Context, job *river.Job[OpenFGAArgs]) e
zap.String("user", owner.Tag().String()),
zap.String("model", modelTag.Id()),
)
return err
return errors.E(err, "Failed to add the adminstrator relation from the river job.")
}
return nil
}

func RegisterJimmWorkers(ctx context.Context, ofgaConn *openfga.OFGAClient, db *db.Database) (*river.Workers, error) {
workers := river.NewWorkers()
if err := river.AddWorkerSafely(workers, &OpenFGAWorker{OfgaClient: ofgaConn, Database: *db}); err != nil {
return nil, err
}
return workers, nil
}

// River is the struct that holds that Client connection to river.
type River struct {
Client *river.Client[pgx.Tx]
Expand All @@ -92,11 +85,22 @@ type River struct {
RetryPolicy river.ClientRetryPolicy
}

// RegisterJimmWorkers would register known workers safely and return a pointer to a river.workers struct that should be used in river creation.
func RegisterJimmWorkers(ctx context.Context, ofgaConn *openfga.OFGAClient, db *db.Database) (*river.Workers, error) {
workers := river.NewWorkers()
if err := river.AddWorkerSafely(workers, &OpenFGAWorker{OfgaClient: ofgaConn, Database: *db}); err != nil {
return nil, err
}
return workers, nil
}

// Cleanup closes the pool on exit and waits for running jobs to finish before doing a graceful shutdown.
func (r *River) Cleanup(ctx context.Context) error {
defer r.dbPool.Close()
return r.Client.Stop(ctx)
}

// ForceCleanup exits without waiting for running jobs by stopping them forefully and exits ungracefully.
func (r *River) ForceCleanup(ctx context.Context) error {
defer r.dbPool.Close()
return r.Client.StopAndCancel(ctx)
Expand Down Expand Up @@ -125,6 +129,7 @@ func (r *River) doMigration(ctx context.Context) error {
return nil
}

// NewRiverArgs is a struct that represents
type NewRiverArgs struct {
Config *river.Config
Db *db.Database
Expand All @@ -138,11 +143,11 @@ type NewRiverArgs struct {
// and open a postgres connections pool that would be closed in the Cleanup routine.
func NewRiver(ctx context.Context, newRiverArgs NewRiverArgs) (*River, error) {
maxAttempts := max(1, newRiverArgs.MaxAttempts)
workers, err := RegisterJimmWorkers(ctx, newRiverArgs.OfgaClient, newRiverArgs.Db)
if err != nil {
return nil, err
}
if newRiverArgs.Config == nil {
workers, err := RegisterJimmWorkers(ctx, newRiverArgs.OfgaClient, newRiverArgs.Db)
if err != nil {
return nil, err
}
newRiverArgs.Config = &river.Config{
RetryPolicy: &river.DefaultClientRetryPolicy{},
Queues: map[string]river.QueueConfig{
Expand All @@ -168,5 +173,6 @@ func NewRiver(ctx context.Context, newRiverArgs NewRiverArgs) (*River, error) {
if err != nil {
return nil, err
}

return &r, nil
}

0 comments on commit 41d00cf

Please sign in to comment.