From 8ce7e5b4282c9b6c0bed3fe50cec2e1ec3c23737 Mon Sep 17 00:00:00 2001 From: Daniel Cosme Date: Fri, 9 Aug 2024 12:51:46 -0400 Subject: [PATCH 1/2] Clean up files in processingDir (temporary staging dir) --- internal/workflow/processing.go | 36 ++++++++++++++++++++++++++++----- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/internal/workflow/processing.go b/internal/workflow/processing.go index ffe3d7a9..70013f14 100644 --- a/internal/workflow/processing.go +++ b/internal/workflow/processing.go @@ -457,8 +457,8 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context if tinfo.WatcherName != "" && !tinfo.IsDir { // TODO: even if TempFile is defined, we should confirm that the file is // locally available in disk, just in case we're in the context of a - // session retry where a different working is doing the work. In that - // case, the activity whould be executed again. + // session retry where a different worker is doing the work. In that + // case, the activity would be executed again. if tinfo.TempFile == "" { activityOpts := withActivityOptsForLongLivedRequest(sessCtx) err := temporalsdk_workflow.ExecuteActivity( @@ -475,6 +475,9 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context } } + // Both of these values relate to temporary files on Enduro's processing Dir that never get cleaned-up. + var tempBlob, tempExtracted string + tempBlob = tinfo.TempFile // Extract downloaded archive file contents. { if tinfo.WatcherName != "" && !tinfo.IsDir { @@ -497,6 +500,7 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context tinfo.TempFile = result.ExtractPath tinfo.StripTopLevelDir = false tinfo.IsDir = true + tempExtracted = result.ExtractPath } } } @@ -523,11 +527,33 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context // Delete local temporary files. defer func() { + // We need disconnected context here because when session gets released the cleanup + // activities get scheduled and then immediately canceled. + cleanUpContext, cancel := temporalsdk_workflow.NewDisconnectedContext(sessCtx) + defer cancel() if tinfo.Bundle.FullPathBeforeStrip != "" { - activityOpts := withActivityOptsForLocalAction(sessCtx) - _ = temporalsdk_workflow.ExecuteActivity(activityOpts, activities.CleanUpActivityName, &activities.CleanUpActivityParams{ + activityOpts := withActivityOptsForLocalAction(cleanUpContext) + if err := temporalsdk_workflow.ExecuteActivity(activityOpts, activities.CleanUpActivityName, &activities.CleanUpActivityParams{ FullPath: tinfo.Bundle.FullPathBeforeStrip, - }).Get(activityOpts, nil) + }).Get(activityOpts, nil); err != nil { + w.logger.Error(err, "failed to clean up", "path", tinfo.Bundle.FullPathBeforeStrip) + } + } + if tempBlob != "" { + activityOpts := withActivityOptsForLocalAction(cleanUpContext) + if err := temporalsdk_workflow.ExecuteActivity(activityOpts, activities.CleanUpActivityName, &activities.CleanUpActivityParams{ + FullPath: tempBlob, + }).Get(activityOpts, nil); err != nil { + w.logger.Error(err, "failed to clean up", "path", tempBlob) + } + } + if tempExtracted != "" { + activityOpts := withActivityOptsForLocalAction(cleanUpContext) + if err := temporalsdk_workflow.ExecuteActivity(activityOpts, activities.CleanUpActivityName, &activities.CleanUpActivityParams{ + FullPath: tempExtracted, + }).Get(activityOpts, nil); err != nil { + w.logger.Error(err, "failed to clean up", "path", tempExtracted) + } } }() From 1c3f844afa7244bbabefe58d6cc6b6687086aec5 Mon Sep 17 00:00:00 2001 From: Daniel Cosme Date: Mon, 12 Aug 2024 12:11:39 -0400 Subject: [PATCH 2/2] Clean up activity take several paths as input --- internal/workflow/activities/cleanup.go | 12 ++++++---- internal/workflow/processing.go | 32 ++++++++++--------------- 2 files changed, 19 insertions(+), 25 deletions(-) diff --git a/internal/workflow/activities/cleanup.go b/internal/workflow/activities/cleanup.go index de8cf7c7..9c8ff4ac 100644 --- a/internal/workflow/activities/cleanup.go +++ b/internal/workflow/activities/cleanup.go @@ -14,16 +14,18 @@ func NewCleanUpActivity() *CleanUpActivity { } type CleanUpActivityParams struct { - FullPath string + Paths []string } func (a *CleanUpActivity) Execute(ctx context.Context, params *CleanUpActivityParams) error { - if params == nil || params.FullPath == "" { - return fmt.Errorf("error processing parameters: missing or empty") + if params == nil { + return fmt.Errorf("error processing parameters: missing") } - if err := os.RemoveAll(params.FullPath); err != nil { - return fmt.Errorf("error removing transfer directory: %v", err) + for _, p := range params.Paths { + if err := os.RemoveAll(p); err != nil { + return fmt.Errorf("error removing path: %v", err) + } } return nil diff --git a/internal/workflow/processing.go b/internal/workflow/processing.go index 70013f14..f0b5aa05 100644 --- a/internal/workflow/processing.go +++ b/internal/workflow/processing.go @@ -529,31 +529,23 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context defer func() { // We need disconnected context here because when session gets released the cleanup // activities get scheduled and then immediately canceled. - cleanUpContext, cancel := temporalsdk_workflow.NewDisconnectedContext(sessCtx) - defer cancel() + var filesToRemove []string if tinfo.Bundle.FullPathBeforeStrip != "" { - activityOpts := withActivityOptsForLocalAction(cleanUpContext) - if err := temporalsdk_workflow.ExecuteActivity(activityOpts, activities.CleanUpActivityName, &activities.CleanUpActivityParams{ - FullPath: tinfo.Bundle.FullPathBeforeStrip, - }).Get(activityOpts, nil); err != nil { - w.logger.Error(err, "failed to clean up", "path", tinfo.Bundle.FullPathBeforeStrip) - } + filesToRemove = append(filesToRemove, tinfo.Bundle.FullPathBeforeStrip) } if tempBlob != "" { - activityOpts := withActivityOptsForLocalAction(cleanUpContext) - if err := temporalsdk_workflow.ExecuteActivity(activityOpts, activities.CleanUpActivityName, &activities.CleanUpActivityParams{ - FullPath: tempBlob, - }).Get(activityOpts, nil); err != nil { - w.logger.Error(err, "failed to clean up", "path", tempBlob) - } + filesToRemove = append(filesToRemove, tempBlob) } if tempExtracted != "" { - activityOpts := withActivityOptsForLocalAction(cleanUpContext) - if err := temporalsdk_workflow.ExecuteActivity(activityOpts, activities.CleanUpActivityName, &activities.CleanUpActivityParams{ - FullPath: tempExtracted, - }).Get(activityOpts, nil); err != nil { - w.logger.Error(err, "failed to clean up", "path", tempExtracted) - } + filesToRemove = append(filesToRemove, tempExtracted) + } + cleanUpCtx, cancel := temporalsdk_workflow.NewDisconnectedContext(sessCtx) + defer cancel() + activityOpts := withActivityOptsForLocalAction(cleanUpCtx) + if err := temporalsdk_workflow.ExecuteActivity(activityOpts, activities.CleanUpActivityName, &activities.CleanUpActivityParams{ + Paths: filesToRemove, + }).Get(activityOpts, nil); err != nil { + w.logger.Error(err, "failed to clean up temporary files", "path", tempExtracted) } }()