From a4fd254e94a1f43cc4376ca9cb18067c029c09a6 Mon Sep 17 00:00:00 2001 From: mina1460 Date: Mon, 29 Jan 2024 15:05:46 +0200 Subject: [PATCH] some refactoring and fixing errors --- internal/jimm/model.go | 21 ++++++++++++++------- internal/jimm/river.go | 9 +++------ 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/internal/jimm/model.go b/internal/jimm/model.go index a94cc131a..d1f8f2573 100644 --- a/internal/jimm/model.go +++ b/internal/jimm/model.go @@ -622,10 +622,12 @@ func (j *JIMM) AddModel(ctx context.Context, user *openfga.User, args *ModelCrea OwnerName: builder.owner.Username, ModelInfoUUID: mi.UUID, } - insertAndWait(ctx, j.River, func() (*rivertype.JobRow, error) { - return j.River.Client.Insert(ctx, openfgaRiverJobArgs, &river.InsertOpts{MaxAttempts: min(1, int(j.River.MaxAttempts))}) + err = insertAndWait(ctx, j.River, func() (*rivertype.JobRow, error) { + return j.River.Client.Insert(ctx, openfgaRiverJobArgs, &river.InsertOpts{MaxAttempts: j.River.MaxAttempts}) }) - + if err != nil { + return nil, errors.E(err, "Failed to insert and wait for the river job.") + } return mi, nil } @@ -640,12 +642,17 @@ func insertAndWait(ctx context.Context, r *River, insertFunc func() (*rivertype. row, err := insertFunc() if err != nil { zapctx.Error(ctx, "failed to insert river job", zaputil.Error(err)) - return errors.E(err) + return errors.E(err, "Failed to insert river job.") } if wait { - for item := range results { - if item.Job.ID == row.ID { - break + for { + select { + case item := <-results: + if item.Job.ID == row.ID { + return nil + } + case <-ctx.Done(): + return ctx.Err() } } } diff --git a/internal/jimm/river.go b/internal/jimm/river.go index 1813a2ead..fbde08b64 100644 --- a/internal/jimm/river.go +++ b/internal/jimm/river.go @@ -100,12 +100,6 @@ func (r *River) Cleanup(ctx context.Context) error { return r.Client.Stop(ctx) } -// ForceCleanup exits without waiting for running jobs by stopping them forefully and exits ungracefully. -func (r *River) ForceCleanup(ctx context.Context) error { - defer r.dbPool.Close() - return r.Client.StopAndCancel(ctx) -} - func (r *River) doMigration(ctx context.Context) error { migrator := rivermigrate.New(riverpgxv5.New(r.dbPool), nil) tx, err := r.dbPool.Begin(ctx) @@ -163,14 +157,17 @@ func NewRiver(ctx context.Context, newRiverArgs NewRiverArgs) (*River, error) { } riverClient, err := river.NewClient(riverpgxv5.New(dbPool), newRiverArgs.Config) if err != nil { + dbPool.Close() return nil, err } if err := riverClient.Start(ctx); err != nil { + dbPool.Close() return nil, err } r := River{Client: riverClient, dbPool: dbPool, MaxAttempts: maxAttempts} err = r.doMigration(ctx) if err != nil { + dbPool.Close() return nil, err }