Skip to content

Commit

Permalink
some refactoring and fixing errors
Browse files Browse the repository at this point in the history
  • Loading branch information
mina1460 committed Jan 29, 2024
1 parent 41d00cf commit a4fd254
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 13 deletions.
21 changes: 14 additions & 7 deletions internal/jimm/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,10 +622,12 @@ func (j *JIMM) AddModel(ctx context.Context, user *openfga.User, args *ModelCrea
OwnerName: builder.owner.Username,
ModelInfoUUID: mi.UUID,
}
insertAndWait(ctx, j.River, func() (*rivertype.JobRow, error) {
return j.River.Client.Insert(ctx, openfgaRiverJobArgs, &river.InsertOpts{MaxAttempts: min(1, int(j.River.MaxAttempts))})
err = insertAndWait(ctx, j.River, func() (*rivertype.JobRow, error) {
return j.River.Client.Insert(ctx, openfgaRiverJobArgs, &river.InsertOpts{MaxAttempts: j.River.MaxAttempts})
})

if err != nil {
return nil, errors.E(err, "Failed to insert and wait for the river job.")
}
return mi, nil
}

Expand All @@ -640,12 +642,17 @@ func insertAndWait(ctx context.Context, r *River, insertFunc func() (*rivertype.
row, err := insertFunc()
if err != nil {
zapctx.Error(ctx, "failed to insert river job", zaputil.Error(err))
return errors.E(err)
return errors.E(err, "Failed to insert river job.")
}
if wait {
for item := range results {
if item.Job.ID == row.ID {
break
for {
select {
case item := <-results:
if item.Job.ID == row.ID {
return nil
}
case <-ctx.Done():
return ctx.Err()
}
}
}
Expand Down
9 changes: 3 additions & 6 deletions internal/jimm/river.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,6 @@ func (r *River) Cleanup(ctx context.Context) error {
return r.Client.Stop(ctx)
}

// ForceCleanup exits without waiting for running jobs by stopping them forefully and exits ungracefully.
func (r *River) ForceCleanup(ctx context.Context) error {
defer r.dbPool.Close()
return r.Client.StopAndCancel(ctx)
}

func (r *River) doMigration(ctx context.Context) error {
migrator := rivermigrate.New(riverpgxv5.New(r.dbPool), nil)
tx, err := r.dbPool.Begin(ctx)
Expand Down Expand Up @@ -163,14 +157,17 @@ func NewRiver(ctx context.Context, newRiverArgs NewRiverArgs) (*River, error) {
}
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), newRiverArgs.Config)
if err != nil {
dbPool.Close()
return nil, err
}
if err := riverClient.Start(ctx); err != nil {
dbPool.Close()
return nil, err
}
r := River{Client: riverClient, dbPool: dbPool, MaxAttempts: maxAttempts}
err = r.doMigration(ctx)
if err != nil {
dbPool.Close()
return nil, err
}

Expand Down

0 comments on commit a4fd254

Please sign in to comment.