Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up files in processingDir (temporary staging dir) #629

Merged
merged 2 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions internal/workflow/activities/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,18 @@
}

type CleanUpActivityParams struct {
FullPath string
Paths []string
}

func (a *CleanUpActivity) Execute(ctx context.Context, params *CleanUpActivityParams) error {
if params == nil || params.FullPath == "" {
return fmt.Errorf("error processing parameters: missing or empty")
if params == nil {
return fmt.Errorf("error processing parameters: missing")

Check warning on line 22 in internal/workflow/activities/cleanup.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/cleanup.go#L21-L22

Added lines #L21 - L22 were not covered by tests
}

if err := os.RemoveAll(params.FullPath); err != nil {
return fmt.Errorf("error removing transfer directory: %v", err)
for _, p := range params.Paths {
if err := os.RemoveAll(p); err != nil {
return fmt.Errorf("error removing path: %v", err)

Check warning on line 27 in internal/workflow/activities/cleanup.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/activities/cleanup.go#L25-L27

Added lines #L25 - L27 were not covered by tests
}
}

return nil
Expand Down
30 changes: 24 additions & 6 deletions internal/workflow/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,8 +457,8 @@
if tinfo.WatcherName != "" && !tinfo.IsDir {
// TODO: even if TempFile is defined, we should confirm that the file is
// locally available in disk, just in case we're in the context of a
// session retry where a different working is doing the work. In that
// case, the activity whould be executed again.
// session retry where a different worker is doing the work. In that
// case, the activity would be executed again.
if tinfo.TempFile == "" {
activityOpts := withActivityOptsForLongLivedRequest(sessCtx)
err := temporalsdk_workflow.ExecuteActivity(
Expand All @@ -475,6 +475,9 @@
}
}

// Both of these values relate to temporary files on Enduro's processing Dir that never get cleaned-up.
var tempBlob, tempExtracted string
tempBlob = tinfo.TempFile

Check warning on line 480 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L479-L480

Added lines #L479 - L480 were not covered by tests
// Extract downloaded archive file contents.
{
if tinfo.WatcherName != "" && !tinfo.IsDir {
Expand All @@ -497,6 +500,7 @@
tinfo.TempFile = result.ExtractPath
tinfo.StripTopLevelDir = false
tinfo.IsDir = true
tempExtracted = result.ExtractPath

Check warning on line 503 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L503

Added line #L503 was not covered by tests
}
}
}
Expand All @@ -523,11 +527,25 @@

// Delete local temporary files.
defer func() {
// We need disconnected context here because when session gets released the cleanup
// activities get scheduled and then immediately canceled.
var filesToRemove []string

Check warning on line 532 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L532

Added line #L532 was not covered by tests
if tinfo.Bundle.FullPathBeforeStrip != "" {
activityOpts := withActivityOptsForLocalAction(sessCtx)
_ = temporalsdk_workflow.ExecuteActivity(activityOpts, activities.CleanUpActivityName, &activities.CleanUpActivityParams{
FullPath: tinfo.Bundle.FullPathBeforeStrip,
}).Get(activityOpts, nil)
filesToRemove = append(filesToRemove, tinfo.Bundle.FullPathBeforeStrip)

Check warning on line 534 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L534

Added line #L534 was not covered by tests
}
if tempBlob != "" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling CleanUpActivity multiple times works, but there is some unnecessary overhead for temporal to schedule the activity and for the worker to pull it off the queue each time the activity runs. A more efficient option would be to have the clean up activity clean up all the temp files/directories at the same time. You could use https://github.com/artefactual-sdps/temporal-activities/blob/main/filesys/remove_activity.go or modify the local CleanUpActivity.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback. Changes are here: 1c3f844

filesToRemove = append(filesToRemove, tempBlob)

Check warning on line 537 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L536-L537

Added lines #L536 - L537 were not covered by tests
}
if tempExtracted != "" {
filesToRemove = append(filesToRemove, tempExtracted)

Check warning on line 540 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L539-L540

Added lines #L539 - L540 were not covered by tests
}
cleanUpCtx, cancel := temporalsdk_workflow.NewDisconnectedContext(sessCtx)
defer cancel()
activityOpts := withActivityOptsForLocalAction(cleanUpCtx)
if err := temporalsdk_workflow.ExecuteActivity(activityOpts, activities.CleanUpActivityName, &activities.CleanUpActivityParams{
Paths: filesToRemove,
}).Get(activityOpts, nil); err != nil {
w.logger.Error(err, "failed to clean up temporary files", "path", tempExtracted)

Check warning on line 548 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L542-L548

Added lines #L542 - L548 were not covered by tests
}
}()

Expand Down
Loading