diff --git a/enduro.toml b/enduro.toml index fba769ef..d9fc4dec 100644 --- a/enduro.toml +++ b/enduro.toml @@ -29,6 +29,7 @@ bucket = "sips" pipeline = "am" retentionPeriod = "10s" stripTopLevelDir = true +rejectDuplicates = false [[watcher.filesystem]] name = "dev-fs" @@ -38,6 +39,7 @@ pipeline = "am" completedDir = "./hack/landfill" ignore = '(^\.gitkeep)|(^*\.mft)$' stripTopLevelDir = true +rejectDuplicates = false [[pipeline]] name = "am" diff --git a/internal/api/design/batch.go b/internal/api/design/batch.go index 77c07f31..6ff068b3 100644 --- a/internal/api/design/batch.go +++ b/internal/api/design/batch.go @@ -17,6 +17,7 @@ var _ = Service("batch", func() { Attribute("processing_config", String) Attribute("completed_dir", String) Attribute("retention_period", String) + Attribute("reject_duplicates", Boolean, func() { Default(false) }) Required("path") }) Result(BatchResult) diff --git a/internal/api/gen/batch/service.go b/internal/api/gen/batch/service.go index 611c3c02..aad60c3d 100644 --- a/internal/api/gen/batch/service.go +++ b/internal/api/gen/batch/service.go @@ -61,6 +61,7 @@ type SubmitPayload struct { ProcessingConfig *string CompletedDir *string RetentionPeriod *string + RejectDuplicates bool } // MakeNotAvailable builds a goa.ServiceError from an error. 
diff --git a/internal/api/gen/http/batch/client/cli.go b/internal/api/gen/http/batch/client/cli.go index 5b1b3d06..9910c373 100644 --- a/internal/api/gen/http/batch/client/cli.go +++ b/internal/api/gen/http/batch/client/cli.go @@ -23,7 +23,7 @@ func BuildSubmitPayload(batchSubmitBody string) (*batch.SubmitPayload, error) { { err = json.Unmarshal([]byte(batchSubmitBody), &body) if err != nil { - return nil, fmt.Errorf("invalid JSON for body, \nerror: %s, \nexample of valid JSON:\n%s", err, "'{\n \"completed_dir\": \"abc123\",\n \"path\": \"abc123\",\n \"pipeline\": \"abc123\",\n \"processing_config\": \"abc123\",\n \"retention_period\": \"abc123\"\n }'") + return nil, fmt.Errorf("invalid JSON for body, \nerror: %s, \nexample of valid JSON:\n%s", err, "'{\n \"completed_dir\": \"abc123\",\n \"path\": \"abc123\",\n \"pipeline\": \"abc123\",\n \"processing_config\": \"abc123\",\n \"reject_duplicates\": false,\n \"retention_period\": \"abc123\"\n }'") } } v := &batch.SubmitPayload{ @@ -32,6 +32,13 @@ func BuildSubmitPayload(batchSubmitBody string) (*batch.SubmitPayload, error) { ProcessingConfig: body.ProcessingConfig, CompletedDir: body.CompletedDir, RetentionPeriod: body.RetentionPeriod, + RejectDuplicates: body.RejectDuplicates, + } + { + var zero bool + if v.RejectDuplicates == zero { + v.RejectDuplicates = false + } } return v, nil diff --git a/internal/api/gen/http/batch/client/types.go b/internal/api/gen/http/batch/client/types.go index 008604d1..20f861f0 100644 --- a/internal/api/gen/http/batch/client/types.go +++ b/internal/api/gen/http/batch/client/types.go @@ -21,6 +21,7 @@ type SubmitRequestBody struct { ProcessingConfig *string `form:"processing_config,omitempty" json:"processing_config,omitempty" xml:"processing_config,omitempty"` CompletedDir *string `form:"completed_dir,omitempty" json:"completed_dir,omitempty" xml:"completed_dir,omitempty"` RetentionPeriod *string `form:"retention_period,omitempty" json:"retention_period,omitempty" 
xml:"retention_period,omitempty"` + RejectDuplicates bool `form:"reject_duplicates" json:"reject_duplicates" xml:"reject_duplicates"` } // SubmitResponseBody is the type of the "batch" service "submit" endpoint HTTP @@ -91,6 +92,13 @@ func NewSubmitRequestBody(p *batch.SubmitPayload) *SubmitRequestBody { ProcessingConfig: p.ProcessingConfig, CompletedDir: p.CompletedDir, RetentionPeriod: p.RetentionPeriod, + RejectDuplicates: p.RejectDuplicates, + } + { + var zero bool + if body.RejectDuplicates == zero { + body.RejectDuplicates = false + } } return body } diff --git a/internal/api/gen/http/batch/server/types.go b/internal/api/gen/http/batch/server/types.go index c3578831..86751f4d 100644 --- a/internal/api/gen/http/batch/server/types.go +++ b/internal/api/gen/http/batch/server/types.go @@ -21,6 +21,7 @@ type SubmitRequestBody struct { ProcessingConfig *string `form:"processing_config,omitempty" json:"processing_config,omitempty" xml:"processing_config,omitempty"` CompletedDir *string `form:"completed_dir,omitempty" json:"completed_dir,omitempty" xml:"completed_dir,omitempty"` RetentionPeriod *string `form:"retention_period,omitempty" json:"retention_period,omitempty" xml:"retention_period,omitempty"` + RejectDuplicates *bool `form:"reject_duplicates,omitempty" json:"reject_duplicates,omitempty" xml:"reject_duplicates,omitempty"` } // SubmitResponseBody is the type of the "batch" service "submit" endpoint HTTP @@ -154,6 +155,12 @@ func NewSubmitPayload(body *SubmitRequestBody) *batch.SubmitPayload { CompletedDir: body.CompletedDir, RetentionPeriod: body.RetentionPeriod, } + if body.RejectDuplicates != nil { + v.RejectDuplicates = *body.RejectDuplicates + } + if body.RejectDuplicates == nil { + v.RejectDuplicates = false + } return v } diff --git a/internal/api/gen/http/cli/enduro/cli.go b/internal/api/gen/http/cli/enduro/cli.go index 80a40b9b..f8a05815 100644 --- a/internal/api/gen/http/cli/enduro/cli.go +++ b/internal/api/gen/http/cli/enduro/cli.go @@ -39,6 +39,7 
@@ func UsageExamples() string { "path": "abc123", "pipeline": "abc123", "processing_config": "abc123", + "reject_duplicates": false, "retention_period": "abc123" }'` + "\n" + os.Args[0] + ` collection monitor` + "\n" + @@ -410,6 +411,7 @@ Example: "path": "abc123", "pipeline": "abc123", "processing_config": "abc123", + "reject_duplicates": false, "retention_period": "abc123" }' `, os.Args[0]) diff --git a/internal/api/gen/http/openapi.json b/internal/api/gen/http/openapi.json index 17bf17c7..ab544123 100644 --- a/internal/api/gen/http/openapi.json +++ b/internal/api/gen/http/openapi.json @@ -170,6 +170,7 @@ "path": "abc123", "pipeline": "abc123", "processing_config": "abc123", + "reject_duplicates": false, "retention_period": "abc123" }, "properties": { @@ -189,6 +190,11 @@ "example": "abc123", "type": "string" }, + "reject_duplicates": { + "default": false, + "example": false, + "type": "boolean" + }, "retention_period": { "example": "abc123", "type": "string" diff --git a/internal/api/gen/http/openapi.yaml b/internal/api/gen/http/openapi.yaml index 111df67b..80aab260 100644 --- a/internal/api/gen/http/openapi.yaml +++ b/internal/api/gen/http/openapi.yaml @@ -654,6 +654,10 @@ definitions: processing_config: type: string example: abc123 + reject_duplicates: + type: boolean + default: false + example: false retention_period: type: string example: abc123 @@ -662,6 +666,7 @@ definitions: path: abc123 pipeline: abc123 processing_config: abc123 + reject_duplicates: false retention_period: abc123 required: - path diff --git a/internal/api/gen/http/openapi3.json b/internal/api/gen/http/openapi3.json index c1ddac02..989bc37d 100644 --- a/internal/api/gen/http/openapi3.json +++ b/internal/api/gen/http/openapi3.json @@ -567,6 +567,7 @@ "path": "abc123", "pipeline": "abc123", "processing_config": "abc123", + "reject_duplicates": false, "retention_period": "abc123" }, "properties": { @@ -586,6 +587,11 @@ "example": "abc123", "type": "string" }, + "reject_duplicates": { + 
"default": false, + "example": false, + "type": "boolean" + }, "retention_period": { "example": "abc123", "type": "string" @@ -642,6 +648,7 @@ "path": "abc123", "pipeline": "abc123", "processing_config": "abc123", + "reject_duplicates": false, "retention_period": "abc123" }, "schema": { diff --git a/internal/api/gen/http/openapi3.yaml b/internal/api/gen/http/openapi3.yaml index fd3e7f06..72b902f0 100644 --- a/internal/api/gen/http/openapi3.yaml +++ b/internal/api/gen/http/openapi3.yaml @@ -41,6 +41,7 @@ paths: path: abc123 pipeline: abc123 processing_config: abc123 + reject_duplicates: false retention_period: abc123 responses: "202": @@ -1154,6 +1155,10 @@ components: processing_config: type: string example: abc123 + reject_duplicates: + type: boolean + default: false + example: false retention_period: type: string example: abc123 @@ -1162,6 +1167,7 @@ components: path: abc123 pipeline: abc123 processing_config: abc123 + reject_duplicates: false retention_period: abc123 required: - path diff --git a/internal/batch/service.go b/internal/batch/service.go index e138819d..510f24a1 100644 --- a/internal/batch/service.go +++ b/internal/batch/service.go @@ -71,6 +71,7 @@ func (s *batchImpl) Submit(ctx context.Context, payload *goabatch.SubmitPayload) } input.RetentionPeriod = &dur } + input.RejectDuplicates = payload.RejectDuplicates opts := temporalsdk_client.StartWorkflowOptions{ ID: BatchWorkflowID, WorkflowIDReusePolicy: temporalapi_enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, diff --git a/internal/batch/workflow.go b/internal/batch/workflow.go index 49c0e2c7..4b75271e 100644 --- a/internal/batch/workflow.go +++ b/internal/batch/workflow.go @@ -28,6 +28,7 @@ type BatchWorkflowInput struct { ProcessingConfig string CompletedDir string RetentionPeriod *time.Duration + RejectDuplicates bool } func BatchWorkflow(ctx temporalsdk_workflow.Context, params BatchWorkflowInput) error { @@ -69,6 +70,7 @@ func (a *BatchActivity) Execute(ctx context.Context, params 
BatchWorkflowInput) ProcessingConfig: params.ProcessingConfig, CompletedDir: params.CompletedDir, RetentionPeriod: params.RetentionPeriod, + RejectDuplicates: params.RejectDuplicates, } _ = a.batchsvc.InitProcessingWorkflow(ctx, &req) } diff --git a/internal/collection/collection.go b/internal/collection/collection.go index 7054e089..4dc9a5e2 100644 --- a/internal/collection/collection.go +++ b/internal/collection/collection.go @@ -18,6 +18,7 @@ type Service interface { // Goa returns an implementation of the goacollection Service. Goa() goacollection.Service Create(context.Context, *Collection) error + CheckDuplicate(ctx context.Context, id uint) (bool, error) UpdateWorkflowStatus(ctx context.Context, ID uint, name string, workflowID, runID, transferID, aipID, pipelineID string, status Status, storedAt time.Time) error SetStatus(ctx context.Context, ID uint, status Status) error SetStatusInProgress(ctx context.Context, ID uint, startedAt time.Time) error @@ -88,6 +89,16 @@ func (svc *collectionImpl) Create(ctx context.Context, col *Collection) error { return nil } +func (svc *collectionImpl) CheckDuplicate(ctx context.Context, id uint) (bool, error) { + query := `SELECT EXISTS(SELECT 1 FROM collection c1 WHERE c1.name = (SELECT name FROM collection WHERE id = ?) AND c1.id <> ? AND c1.status NOT IN (3, 6))` + var exists bool + err := svc.db.GetContext(ctx, &exists, query, id, id) + if err != nil { + return false, fmt.Errorf("sql error: %w", err) + } + return exists, nil +} + func publishEvent(ctx context.Context, events EventService, eventType string, id uint) { // TODO: publish updated collection? 
var item *goacollection.EnduroStoredCollection diff --git a/internal/collection/fake/mock_collection.go b/internal/collection/fake/mock_collection.go index e583ba1a..4c365fc3 100644 --- a/internal/collection/fake/mock_collection.go +++ b/internal/collection/fake/mock_collection.go @@ -37,6 +37,21 @@ func (m *MockService) EXPECT() *MockServiceMockRecorder { return m.recorder } +// CheckDuplicate mocks base method. +func (m *MockService) CheckDuplicate(arg0 context.Context, arg1 uint) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CheckDuplicate", arg0, arg1) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CheckDuplicate indicates an expected call of CheckDuplicate. +func (mr *MockServiceMockRecorder) CheckDuplicate(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckDuplicate", reflect.TypeOf((*MockService)(nil).CheckDuplicate), arg0, arg1) +} + // Create mocks base method. func (m *MockService) Create(arg0 context.Context, arg1 *collection0.Collection) error { m.ctrl.T.Helper() diff --git a/internal/collection/workflow.go b/internal/collection/workflow.go index 8492ba9d..81593259 100644 --- a/internal/collection/workflow.go +++ b/internal/collection/workflow.go @@ -55,6 +55,9 @@ type ProcessingWorkflowRequest struct { // Processing configuration name. ProcessingConfig string + + // Whether we reject duplicates based on name (key). + RejectDuplicates bool } func InitProcessingWorkflow(ctx context.Context, c temporalsdk_client.Client, taskQueue string, req *ProcessingWorkflowRequest) error { diff --git a/internal/watcher/config.go b/internal/watcher/config.go index 482d0b57..80a76484 100644 --- a/internal/watcher/config.go +++ b/internal/watcher/config.go @@ -37,6 +37,7 @@ type FilesystemConfig struct { RetentionPeriod *time.Duration CompletedDir string StripTopLevelDir bool + RejectDuplicates bool } // See minio.go for more. 
@@ -56,4 +57,5 @@ type MinioConfig struct { Pipeline []string RetentionPeriod *time.Duration StripTopLevelDir bool + RejectDuplicates bool } diff --git a/internal/watcher/event.go b/internal/watcher/event.go index f818e398..dfd1ccc6 100644 --- a/internal/watcher/event.go +++ b/internal/watcher/event.go @@ -29,6 +29,9 @@ type BlobEvent struct { // Whether the top-level directory is meant to be stripped. StripTopLevelDir bool + // Whether duplicates are rejected or not. + RejectDuplicates bool + // Key of the blob. Key string @@ -46,6 +49,7 @@ func NewBlobEvent(w Watcher, key string, isDir bool) *BlobEvent { RetentionPeriod: w.RetentionPeriod(), CompletedDir: w.CompletedDir(), StripTopLevelDir: w.StripTopLevelDir(), + RejectDuplicates: w.RejectDuplicates(), Key: key, IsDir: isDir, } diff --git a/internal/watcher/filesystem.go b/internal/watcher/filesystem.go index 432bae19..a10af538 100644 --- a/internal/watcher/filesystem.go +++ b/internal/watcher/filesystem.go @@ -76,6 +76,7 @@ func NewFilesystemWatcher(ctx context.Context, config *FilesystemConfig) (*files retentionPeriod: config.RetentionPeriod, completedDir: config.CompletedDir, stripTopLevelDir: config.StripTopLevelDir, + rejectDuplicates: config.RejectDuplicates, }, } diff --git a/internal/watcher/minio.go b/internal/watcher/minio.go index 59b5e48d..186923a4 100644 --- a/internal/watcher/minio.go +++ b/internal/watcher/minio.go @@ -70,6 +70,7 @@ func NewMinioWatcher(ctx context.Context, config *MinioConfig) (*minioWatcher, e pipeline: config.Pipeline, retentionPeriod: config.RetentionPeriod, stripTopLevelDir: config.StripTopLevelDir, + rejectDuplicates: config.RejectDuplicates, }, }, nil } diff --git a/internal/watcher/watcher.go b/internal/watcher/watcher.go index 23740173..7942135b 100644 --- a/internal/watcher/watcher.go +++ b/internal/watcher/watcher.go @@ -29,6 +29,7 @@ type Watcher interface { RetentionPeriod() *time.Duration CompletedDir() string StripTopLevelDir() bool + RejectDuplicates() bool // Full 
path of the watched bucket when available, empty string otherwise. Path() string @@ -42,6 +43,7 @@ type commonWatcherImpl struct { retentionPeriod *time.Duration completedDir string stripTopLevelDir bool + rejectDuplicates bool } func (w *commonWatcherImpl) String() string { @@ -64,6 +66,10 @@ func (w *commonWatcherImpl) StripTopLevelDir() bool { return w.stripTopLevelDir } +func (w *commonWatcherImpl) RejectDuplicates() bool { + return w.rejectDuplicates +} + type Service interface { // Watchers return all known watchers. Watchers() []Watcher diff --git a/internal/workflow/local_activities.go b/internal/workflow/local_activities.go index ef87184e..e88731da 100644 --- a/internal/workflow/local_activities.go +++ b/internal/workflow/local_activities.go @@ -60,6 +60,10 @@ func updatePackageLocalActivity(ctx context.Context, logger logr.Logger, colsvc return nil } +func checkDuplicatePackageLocalActivity(ctx context.Context, logger logr.Logger, colsvc collection.Service, id uint) (bool, error) { + return colsvc.CheckDuplicate(ctx, id) +} + func loadConfigLocalActivity(ctx context.Context, m *manager.Manager, pipeline string, tinfo *TransferInfo) (*TransferInfo, error) { p, err := m.Pipelines.ByName(pipeline) if err != nil { diff --git a/internal/workflow/processing.go b/internal/workflow/processing.go index f7319c8f..79a62dcc 100644 --- a/internal/workflow/processing.go +++ b/internal/workflow/processing.go @@ -226,6 +226,21 @@ func (w *ProcessingWorkflow) Execute(ctx temporalsdk_workflow.Context, req *coll }).Get(activityOpts, nil) }() + // Reject duplicate collection if applicable. 
+ { + if req.RejectDuplicates { + var exists bool + activityOpts := withLocalActivityOpts(ctx) + err := temporalsdk_workflow.ExecuteLocalActivity(activityOpts, checkDuplicatePackageLocalActivity, w.manager.Logger, w.manager.Collection, tinfo.CollectionID).Get(activityOpts, &exists) + if err != nil { + return fmt.Errorf("error checking duplicate: %v", err) + } + if exists { + return fmt.Errorf("duplicate detected: key: %s", tinfo.Key) + } + } + } + // Extract details from transfer name. { activityOpts := withLocalActivityWithoutRetriesOpts(ctx) diff --git a/main.go b/main.go index ab663880..00432060 100644 --- a/main.go +++ b/main.go @@ -190,6 +190,7 @@ func main() { RetentionPeriod: event.RetentionPeriod, CompletedDir: event.CompletedDir, StripTopLevelDir: event.StripTopLevelDir, + RejectDuplicates: event.RejectDuplicates, Key: event.Key, IsDir: event.IsDir, ValidationConfig: config.Validation, diff --git a/ui/src/openapi-generator/models/SubmitRequestBody.ts b/ui/src/openapi-generator/models/SubmitRequestBody.ts index ff9e279c..1360de20 100644 --- a/ui/src/openapi-generator/models/SubmitRequestBody.ts +++ b/ui/src/openapi-generator/models/SubmitRequestBody.ts @@ -43,6 +43,12 @@ export interface SubmitRequestBody { * @memberof SubmitRequestBody */ processingConfig?: string; + /** + * + * @type {boolean} + * @memberof SubmitRequestBody + */ + rejectDuplicates?: boolean; /** * * @type {string} @@ -75,6 +81,7 @@ export function SubmitRequestBodyFromJSONTyped(json: any, ignoreDiscriminator: b 'path': json['path'], 'pipeline': !exists(json, 'pipeline') ? undefined : json['pipeline'], 'processingConfig': !exists(json, 'processing_config') ? undefined : json['processing_config'], + 'rejectDuplicates': !exists(json, 'reject_duplicates') ? undefined : json['reject_duplicates'], 'retentionPeriod': !exists(json, 'retention_period') ? 
undefined : json['retention_period'], }; } @@ -92,6 +99,7 @@ export function SubmitRequestBodyToJSON(value?: SubmitRequestBody | null): any { 'path': value.path, 'pipeline': value.pipeline, 'processing_config': value.processingConfig, + 'reject_duplicates': value.rejectDuplicates, 'retention_period': value.retentionPeriod, }; } diff --git a/ui/src/views/Batch.vue b/ui/src/views/Batch.vue index 87abd82f..2b7012ff 100644 --- a/ui/src/views/Batch.vue +++ b/ui/src/views/Batch.vue @@ -31,6 +31,12 @@ + + + Reject transfers with duplicate names. + + +
@@ -93,6 +99,7 @@ export default class Batch extends Vue { processingConfig: null, completedDir: null, retentionPeriod: null, + rejectDuplicates: null, }; private tabIndex: number = 0; @@ -161,6 +168,9 @@ export default class Batch extends Vue { if (this.form.retentionPeriod && this.tabIndex === 1) { request.submitRequestBody.retentionPeriod = this.form.retentionPeriod; } + if (this.form.rejectDuplicates) { + request.submitRequestBody.rejectDuplicates = this.form.rejectDuplicates; + } return EnduroBatchClient.batchSubmit(request).then((response: api.BatchSubmitResponseBody) => { this.loadStatus(); }).catch((response: Response) => { diff --git a/website/content/en/docs/user-manual/configuration.md b/website/content/en/docs/user-manual/configuration.md index 785f9041..91df7b44 100644 --- a/website/content/en/docs/user-manual/configuration.md +++ b/website/content/en/docs/user-manual/configuration.md @@ -112,6 +112,9 @@ ignore = '(^\.gitkeep)|(^*\.mft)$' # Omit the top-level directory of the transfer after extraction. stripTopLevelDir = true + +# Reject transfers with duplicate transfer names. +rejectDuplicates = false ``` Namely, it monitors the `watched-dir` directory. It uses the inotify API for @@ -131,6 +134,14 @@ Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". E.g.: `"10m"` +#### `rejectDuplicates` (Boolean) + +When enabled, the workflow will execute a check on the internal database for +successfully completed transfers with the same transfer name as the currently +processing package. If it finds a duplicate the transfer will fail. + +E.g.: `false` + #### `completedDir` (String) The path where transfers are moved into when processing has completed @@ -194,6 +205,9 @@ retentionPeriod = "10s" # Omit the top-level directory of the transfer after extraction. stripTopLevelDir = true + +# Reject transfers with duplicate transfer names. 
+rejectDuplicates = false ``` MinIO will deliver new events to us via a Redis instance at @@ -215,6 +229,14 @@ Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". E.g.: `"10m"` +#### `rejectDuplicates` (Boolean) + +When enabled, the workflow will execute a check on the internal database for +successfully completed transfers with the same transfer name as the currently +processing package. If it finds a duplicate the transfer will fail. + +E.g.: `false` + #### `pipeline` (String | Array(String)) The name of the pipeline to be used during processing. If undefined, one will