Skip to content

Commit

Permalink
Clean up files in processingDir (temporary staging dir)
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielCosme committed Aug 9, 2024
1 parent 521ff28 commit 8ce7e5b
Showing 1 changed file with 31 additions and 5 deletions.
36 changes: 31 additions & 5 deletions internal/workflow/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,8 +457,8 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context
if tinfo.WatcherName != "" && !tinfo.IsDir {
// TODO: even if TempFile is defined, we should confirm that the file is
// locally available in disk, just in case we're in the context of a
// session retry where a different working is doing the work. In that
// case, the activity whould be executed again.
// session retry where a different worker is doing the work. In that
// case, the activity would be executed again.
if tinfo.TempFile == "" {
activityOpts := withActivityOptsForLongLivedRequest(sessCtx)
err := temporalsdk_workflow.ExecuteActivity(
Expand All @@ -475,6 +475,9 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context
}
}

// Both of these values relate to temporary files on Enduro's processing Dir that never get cleaned-up.
var tempBlob, tempExtracted string
tempBlob = tinfo.TempFile

Check warning on line 480 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L479-L480

Added lines #L479 - L480 were not covered by tests
// Extract downloaded archive file contents.
{
if tinfo.WatcherName != "" && !tinfo.IsDir {
Expand All @@ -497,6 +500,7 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context
tinfo.TempFile = result.ExtractPath
tinfo.StripTopLevelDir = false
tinfo.IsDir = true
tempExtracted = result.ExtractPath

Check warning on line 503 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L503

Added line #L503 was not covered by tests
}
}
}
Expand All @@ -523,11 +527,33 @@ func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context

// Delete local temporary files.
defer func() {
// We need disconnected context here because when session gets released the cleanup
// activities get scheduled and then immediately canceled.
cleanUpContext, cancel := temporalsdk_workflow.NewDisconnectedContext(sessCtx)
defer cancel()

Check warning on line 533 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L532-L533

Added lines #L532 - L533 were not covered by tests
if tinfo.Bundle.FullPathBeforeStrip != "" {
activityOpts := withActivityOptsForLocalAction(sessCtx)
_ = temporalsdk_workflow.ExecuteActivity(activityOpts, activities.CleanUpActivityName, &activities.CleanUpActivityParams{
activityOpts := withActivityOptsForLocalAction(cleanUpContext)
if err := temporalsdk_workflow.ExecuteActivity(activityOpts, activities.CleanUpActivityName, &activities.CleanUpActivityParams{

Check warning on line 536 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L535-L536

Added lines #L535 - L536 were not covered by tests
FullPath: tinfo.Bundle.FullPathBeforeStrip,
}).Get(activityOpts, nil)
}).Get(activityOpts, nil); err != nil {
w.logger.Error(err, "failed to clean up", "path", tinfo.Bundle.FullPathBeforeStrip)

Check warning on line 539 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L538-L539

Added lines #L538 - L539 were not covered by tests
}
}
if tempBlob != "" {
activityOpts := withActivityOptsForLocalAction(cleanUpContext)
if err := temporalsdk_workflow.ExecuteActivity(activityOpts, activities.CleanUpActivityName, &activities.CleanUpActivityParams{
FullPath: tempBlob,
}).Get(activityOpts, nil); err != nil {
w.logger.Error(err, "failed to clean up", "path", tempBlob)

Check warning on line 547 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L542-L547

Added lines #L542 - L547 were not covered by tests
}
}
if tempExtracted != "" {
activityOpts := withActivityOptsForLocalAction(cleanUpContext)
if err := temporalsdk_workflow.ExecuteActivity(activityOpts, activities.CleanUpActivityName, &activities.CleanUpActivityParams{
FullPath: tempExtracted,
}).Get(activityOpts, nil); err != nil {
w.logger.Error(err, "failed to clean up", "path", tempExtracted)

Check warning on line 555 in internal/workflow/processing.go

View check run for this annotation

Codecov / codecov/patch

internal/workflow/processing.go#L550-L555

Added lines #L550 - L555 were not covered by tests
}
}
}()

Expand Down

0 comments on commit 8ce7e5b

Please sign in to comment.