diff --git a/internal/workflow/activities/cleanup.go b/internal/workflow/activities/cleanup.go index de8cf7c7..9c8ff4ac 100644 --- a/internal/workflow/activities/cleanup.go +++ b/internal/workflow/activities/cleanup.go @@ -14,16 +14,18 @@ func NewCleanUpActivity() *CleanUpActivity { } type CleanUpActivityParams struct { - FullPath string + Paths []string } func (a *CleanUpActivity) Execute(ctx context.Context, params *CleanUpActivityParams) error { - if params == nil || params.FullPath == "" { - return fmt.Errorf("error processing parameters: missing or empty") + if params == nil { + return fmt.Errorf("error processing parameters: missing") } - if err := os.RemoveAll(params.FullPath); err != nil { - return fmt.Errorf("error removing transfer directory: %v", err) + for _, p := range params.Paths { + if err := os.RemoveAll(p); err != nil { + return fmt.Errorf("error removing path: %v", err) + } } return nil diff --git a/internal/workflow/processing.go b/internal/workflow/processing.go index ffe3d7a9..f0b5aa05 100644 --- a/internal/workflow/processing.go +++ b/internal/workflow/processing.go @@ -457,8 +457,8 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context if tinfo.WatcherName != "" && !tinfo.IsDir { // TODO: even if TempFile is defined, we should confirm that the file is // locally available in disk, just in case we're in the context of a - // session retry where a different working is doing the work. In that - // case, the activity whould be executed again. + // session retry where a different worker is doing the work. In that + // case, the activity would be executed again. if tinfo.TempFile == "" { activityOpts := withActivityOptsForLongLivedRequest(sessCtx) err := temporalsdk_workflow.ExecuteActivity( @@ -475,6 +475,9 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context } } + // Both of these values relate to temporary files on Enduro's processing Dir that never get cleaned-up. + var tempBlob, tempExtracted string + tempBlob = tinfo.TempFile // Extract downloaded archive file contents. { if tinfo.WatcherName != "" && !tinfo.IsDir { @@ -497,6 +500,7 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context tinfo.TempFile = result.ExtractPath tinfo.StripTopLevelDir = false tinfo.IsDir = true + tempExtracted = result.ExtractPath } } } @@ -523,11 +527,25 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context // Delete local temporary files. defer func() { + // We need disconnected context here because when session gets released the cleanup + // activities get scheduled and then immediately canceled. + var filesToRemove []string if tinfo.Bundle.FullPathBeforeStrip != "" { - activityOpts := withActivityOptsForLocalAction(sessCtx) - _ = temporalsdk_workflow.ExecuteActivity(activityOpts, activities.CleanUpActivityName, &activities.CleanUpActivityParams{ - FullPath: tinfo.Bundle.FullPathBeforeStrip, - }).Get(activityOpts, nil) + filesToRemove = append(filesToRemove, tinfo.Bundle.FullPathBeforeStrip) + } + if tempBlob != "" { + filesToRemove = append(filesToRemove, tempBlob) + } + if tempExtracted != "" { + filesToRemove = append(filesToRemove, tempExtracted) + } + cleanUpCtx, cancel := temporalsdk_workflow.NewDisconnectedContext(sessCtx) + defer cancel() + activityOpts := withActivityOptsForLocalAction(cleanUpCtx) + if err := temporalsdk_workflow.ExecuteActivity(activityOpts, activities.CleanUpActivityName, &activities.CleanUpActivityParams{ + Paths: filesToRemove, + }).Get(activityOpts, nil); err != nil { + w.logger.Error(err, "failed to clean up temporary files", "path", tempExtracted) } }()