Skip to content

Commit

Permalink
Clean up files in processingDir (temporary staging dir) (#629)
Browse files Browse the repository at this point in the history
* Clean up files in processingDir (temporary staging dir)

* Clean up activity take several paths as input
  • Loading branch information
DanielCosme committed Aug 12, 2024
1 parent 521ff28 commit 5677b89
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 11 deletions.
12 changes: 7 additions & 5 deletions internal/workflow/activities/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,18 @@ func NewCleanUpActivity() *CleanUpActivity {
}

type CleanUpActivityParams struct {
FullPath string
Paths []string
}

func (a *CleanUpActivity) Execute(ctx context.Context, params *CleanUpActivityParams) error {
if params == nil || params.FullPath == "" {
return fmt.Errorf("error processing parameters: missing or empty")
if params == nil {
return fmt.Errorf("error processing parameters: missing")
}

if err := os.RemoveAll(params.FullPath); err != nil {
return fmt.Errorf("error removing transfer directory: %v", err)
for _, p := range params.Paths {
if err := os.RemoveAll(p); err != nil {
return fmt.Errorf("error removing path: %v", err)
}
}

return nil
Expand Down
30 changes: 24 additions & 6 deletions internal/workflow/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,8 +457,8 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context
if tinfo.WatcherName != "" && !tinfo.IsDir {
// TODO: even if TempFile is defined, we should confirm that the file is
// locally available in disk, just in case we're in the context of a
// session retry where a different working is doing the work. In that
// case, the activity whould be executed again.
// session retry where a different worker is doing the work. In that
// case, the activity would be executed again.
if tinfo.TempFile == "" {
activityOpts := withActivityOptsForLongLivedRequest(sessCtx)
err := temporalsdk_workflow.ExecuteActivity(
Expand All @@ -475,6 +475,9 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context
}
}

// Both of these values relate to temporary files on Enduro's processing Dir that never get cleaned-up.
var tempBlob, tempExtracted string
tempBlob = tinfo.TempFile
// Extract downloaded archive file contents.
{
if tinfo.WatcherName != "" && !tinfo.IsDir {
Expand All @@ -497,6 +500,7 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context
tinfo.TempFile = result.ExtractPath
tinfo.StripTopLevelDir = false
tinfo.IsDir = true
tempExtracted = result.ExtractPath
}
}
}
Expand All @@ -523,11 +527,25 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context

// Delete local temporary files.
defer func() {
// We need disconnected context here because when session gets released the cleanup
// activities get scheduled and then immediately canceled.
var filesToRemove []string
if tinfo.Bundle.FullPathBeforeStrip != "" {
activityOpts := withActivityOptsForLocalAction(sessCtx)
_ = temporalsdk_workflow.ExecuteActivity(activityOpts, activities.CleanUpActivityName, &activities.CleanUpActivityParams{
FullPath: tinfo.Bundle.FullPathBeforeStrip,
}).Get(activityOpts, nil)
filesToRemove = append(filesToRemove, tinfo.Bundle.FullPathBeforeStrip)
}
if tempBlob != "" {
filesToRemove = append(filesToRemove, tempBlob)
}
if tempExtracted != "" {
filesToRemove = append(filesToRemove, tempExtracted)
}
cleanUpCtx, cancel := temporalsdk_workflow.NewDisconnectedContext(sessCtx)
defer cancel()
activityOpts := withActivityOptsForLocalAction(cleanUpCtx)
if err := temporalsdk_workflow.ExecuteActivity(activityOpts, activities.CleanUpActivityName, &activities.CleanUpActivityParams{
Paths: filesToRemove,
}).Get(activityOpts, nil); err != nil {
w.logger.Error(err, "failed to clean up temporary files", "path", tempExtracted)
}
}()

Expand Down

0 comments on commit 5677b89

Please sign in to comment.