Skip to content

Commit

Permalink
Addresses review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
vyzaldysanchez committed Oct 7, 2024
1 parent 9370cc5 commit 0e116fc
Showing 1 changed file with 11 additions and 9 deletions.
20 changes: 11 additions & 9 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,14 @@ type stepUpdateManager struct {
}

func (sucm *stepUpdateManager) add(executionID string, ch stepUpdateChannel) (added bool) {
sucm.mu.Lock()
defer sucm.mu.Unlock()
if _, ok := sucm.m[executionID]; ok {
sucm.mu.RLock()
_, ok := sucm.m[executionID]
sucm.mu.RUnlock()
if ok {
return false
}
sucm.mu.Lock()
defer sucm.mu.Unlock()
sucm.m[executionID] = ch
return true
}
Expand All @@ -63,11 +66,10 @@ func (sucm *stepUpdateManager) remove(executionID string) {
func (sucm *stepUpdateManager) send(ctx context.Context, executionID string, stepUpdate store.WorkflowExecutionStep) error {
sucm.mu.RLock()
stepUpdateCh, ok := sucm.m[executionID]
sucm.mu.RUnlock()
if !ok {
sucm.mu.RUnlock()
return fmt.Errorf("step update channel not found for execution %s, dropping step update", executionID)
}
sucm.mu.RUnlock()

select {
case <-ctx.Done():
Expand Down Expand Up @@ -474,12 +476,12 @@ func (e *Engine) stepUpdateLoop(ctx context.Context, executionID string, stepUpd
return
}
// Executed synchronously to ensure we correctly schedule subsequent tasks.
e.logger.With(eIDKey, stepUpdate.ExecutionID, sRKey, stepUpdate.Ref).
Debugf("received step update for execution %s", stepUpdate.ExecutionID)
e.logger.Debugw(fmt.Sprintf("received step update for execution %s", stepUpdate.ExecutionID),
eIDKey, stepUpdate.ExecutionID, sRKey, stepUpdate.Ref)
err := e.handleStepUpdate(ctx, stepUpdate, workflowCreatedAt)
if err != nil {
e.logger.With(eIDKey, stepUpdate.ExecutionID, sRKey, stepUpdate.Ref).
Errorf("failed to update step state: %+v, %s", stepUpdate, err)
e.logger.Errorf(fmt.Sprintf("failed to update step state: %+v, %s", stepUpdate, err),
eIDKey, stepUpdate.ExecutionID, sRKey, stepUpdate.Ref)
}
}
}
Expand Down

0 comments on commit 0e116fc

Please sign in to comment.