From d8f5c22fa7aa78c115b1eb42e7b973bd41b37661 Mon Sep 17 00:00:00 2001 From: z30001483 <37854724+dormanze@users.noreply.github.com> Date: Mon, 24 Feb 2025 15:07:04 +0800 Subject: [PATCH 1/3] mirror:Check the source object exists before remove --- cmd/mirror-main.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/cmd/mirror-main.go b/cmd/mirror-main.go index 488f83b69c..8ff85a44e0 100644 --- a/cmd/mirror-main.go +++ b/cmd/mirror-main.go @@ -367,7 +367,17 @@ func (mj *mirrorJob) doRemove(ctx context.Context, sURLs URLs, event EventInfo) if mj.opts.isFake { return sURLs.WithError(nil) } - + if sURLs.SourceContent != nil { + // Construct proper path with alias. + sourceWithAlias := filepath.Join(sURLs.SourceAlias, sURLs.SourceContent.URL.Path) + sourceCient, pErr := newClient(sourceWithAlias) + if pErr != nil { + return sURLs.WithError(pErr) + } + if _, err := sourceCient.Stat(ctx, StatOptions{headOnly: true}); err == nil { + return sURLs.WithError(nil) + } + } // Construct proper path with alias. targetWithAlias := filepath.Join(sURLs.TargetAlias, sURLs.TargetContent.URL.Path) clnt, pErr := newClient(targetWithAlias) From ad96fd0edc5b02f302e549742dddd622c6661653 Mon Sep 17 00:00:00 2001 From: z30001483 <37854724+dormanze@users.noreply.github.com> Date: Tue, 25 Feb 2025 17:33:01 +0800 Subject: [PATCH 2/3] mirror:support version --- cmd/client-s3.go | 6 ++++++ cmd/client.go | 1 + cmd/common-methods.go | 1 + cmd/mirror-main.go | 31 ++++++++++++++++++++++++++----- cmd/mirror-url.go | 2 +- cmd/watch.go | 1 + 6 files changed, 36 insertions(+), 6 deletions(-) diff --git a/cmd/client-s3.go b/cmd/client-s3.go index 64a98afcc7..5254be63f1 100644 --- a/cmd/client-s3.go +++ b/cmd/client-s3.go @@ -739,6 +739,7 @@ func (c *S3Client) notificationToEventsInfo(ninfo notification.Info) []EventInfo Host: record.Source.Host, Port: record.Source.Port, UserAgent: record.Source.UserAgent, + VersionID: record.S3.Object.VersionID, } } else if strings.HasPrefix(record.EventName, "s3:ObjectCreated:PutRetention") { eventsInfo[i] = EventInfo{ @@ -750,6 +751,7 @@ func (c *S3Client) notificationToEventsInfo(ninfo notification.Info) []EventInfo Host: record.Source.Host, Port: record.Source.Port, UserAgent: record.Source.UserAgent, + VersionID: record.S3.Object.VersionID, } } else if strings.HasPrefix(record.EventName, "s3:ObjectCreated:PutLegalHold") { eventsInfo[i] = EventInfo{ @@ -761,6 +763,7 @@ func (c *S3Client) notificationToEventsInfo(ninfo notification.Info) []EventInfo Host: record.Source.Host, Port: record.Source.Port, UserAgent: record.Source.UserAgent, + VersionID: record.S3.Object.VersionID, } } else { eventsInfo[i] = EventInfo{ @@ -772,6 +775,7 @@ func (c *S3Client) notificationToEventsInfo(ninfo notification.Info) []EventInfo Host: record.Source.Host, Port: record.Source.Port, UserAgent: record.Source.UserAgent, + VersionID: record.S3.Object.VersionID, } } } else { @@ -784,6 +788,7 @@ func (c *S3Client) notificationToEventsInfo(ninfo notification.Info) []EventInfo Host: record.Source.Host, Port: record.Source.Port, UserAgent: record.Source.UserAgent, + VersionID: record.S3.Object.VersionID, } } } @@ -1087,6 +1092,7 @@ func (c *S3Client) Put(ctx context.Context, reader io.Reader, size int64, progre } opts := minio.PutObjectOptions{ + Internal: minio.AdvancedPutOptions{SourceVersionID: putOpts.versionId}, UserMetadata: metadata, UserTags: tagsMap, Progress: progress, diff --git a/cmd/client.go b/cmd/client.go index cb81f57348..52379fd7e7 100644 --- a/cmd/client.go +++ b/cmd/client.go @@ -76,6 +76,7 @@ type PutOptions struct { concurrentStream bool ifNotExists bool checksum minio.ChecksumType + versionId string } // StatOptions holds options of the HEAD operation diff --git a/cmd/common-methods.go b/cmd/common-methods.go index 06b9ebeb52..0df8168c1a 100644 --- a/cmd/common-methods.go +++ b/cmd/common-methods.go @@ -507,6 +507,7 @@ func uploadSourceToTargetURL(ctx context.Context, uploadOpts uploadSourceToTarge multipartThreads: uint(multipartThreads), ifNotExists: uploadOpts.ifNotExists, checksum: uploadOpts.urls.checksum, + versionId: uploadOpts.urls.TargetContent.VersionID, } if isReadAt(reader) || length == 0 { diff --git a/cmd/mirror-main.go b/cmd/mirror-main.go index 8ff85a44e0..c57ed67e18 100644 --- a/cmd/mirror-main.go +++ b/cmd/mirror-main.go @@ -100,6 +100,10 @@ var ( Name: "disable-multipart", Usage: "disable multipart upload feature", }, + cli.BoolFlag{ + Name: "with-versioning", + Usage: "upload or remove with versioning enabled on a bucket", + }, cli.StringSliceFlag{ Name: "exclude", Usage: "exclude object(s) that match specified object name pattern", @@ -223,6 +227,9 @@ EXAMPLES: 16. Cross mirror between sites in a active-active deployment. Site-A: {{.Prompt}} {{.HelpName}} --active-active siteA siteB Site-B: {{.Prompt}} {{.HelpName}} --active-active siteB siteA + + 17. Mirror a bucket from MinIO cloud storage to other minIO cloud storage with versioning enabled. + {{.Prompt}} {{.HelpName}} --with-versioning myminio/bucket1 otherminio/bucket2 `, } @@ -367,7 +374,8 @@ func (mj *mirrorJob) doRemove(ctx context.Context, sURLs URLs, event EventInfo) if mj.opts.isFake { return sURLs.WithError(nil) } - if sURLs.SourceContent != nil { + // remote S3 to local file system + if sURLs.SourceContent != nil && sURLs.TargetAlias == "" { // Construct proper path with alias. sourceWithAlias := filepath.Join(sURLs.SourceAlias, sURLs.SourceContent.URL.Path) sourceCient, pErr := newClient(sourceWithAlias) @@ -390,7 +398,7 @@ func (mj *mirrorJob) doRemove(ctx context.Context, sURLs URLs, event EventInfo) clnt.AddUserAgent(uaMirrorAppName, ReleaseTag) } contentCh := make(chan *ClientContent, 1) - contentCh <- &ClientContent{URL: *newClientURL(sURLs.TargetContent.URL.Path)} + contentCh <- &ClientContent{URL: *newClientURL(sURLs.TargetContent.URL.Path), VersionID: sURLs.TargetContent.VersionID} close(contentCh) isRemoveBucket := false resultCh := clnt.Remove(ctx, false, isRemoveBucket, false, false, contentCh) @@ -702,6 +710,10 @@ func (mj *mirrorJob) watchMirrorEvents(ctx context.Context, events []EventInfo) if strings.HasPrefix(string(event.Type), "s3:ObjectCreated:") { sourceModTime, _ := time.Parse(time.RFC3339Nano, event.Time) + targetContent := &ClientContent{URL: *targetURL} + if mj.opts.enableVersion { + targetContent = &ClientContent{URL: *targetURL, VersionID: event.VersionID} + } mirrorURL := URLs{ SourceAlias: sourceAlias, SourceContent: &ClientContent{ @@ -713,7 +725,7 @@ func (mj *mirrorJob) watchMirrorEvents(ctx context.Context, events []EventInfo) Metadata: event.UserMetadata, }, TargetAlias: targetAlias, - TargetContent: &ClientContent{URL: *targetURL}, + TargetContent: targetContent, MD5: mj.opts.md5, checksum: mj.opts.checksum, DisableMultipart: mj.opts.disableMultipart, @@ -737,11 +749,15 @@ func (mj *mirrorJob) watchMirrorEvents(ctx context.Context, events []EventInfo) // Ignore delete cascading delete events if cyclical. continue } + targetContent := &ClientContent{URL: *targetURL} + if mj.opts.enableVersion { + targetContent = &ClientContent{URL: *targetURL, VersionID: event.VersionID} + } mirrorURL := URLs{ SourceAlias: sourceAlias, - SourceContent: nil, + SourceContent: &ClientContent{URL: *sourceURL}, TargetAlias: targetAlias, - TargetContent: &ClientContent{URL: *targetURL}, + TargetContent: targetContent, MD5: mj.opts.md5, checksum: mj.opts.checksum, DisableMultipart: mj.opts.disableMultipart, @@ -1016,6 +1032,7 @@ func runMirror(ctx context.Context, srcURL, dstURL string, cli *cli.Context, enc md5: md5, checksum: checksum, disableMultipart: cli.Bool("disable-multipart"), + enableVersion: cli.Bool("with-versioning"), skipErrors: cli.Bool("skip-errors"), excludeOptions: cli.StringSlice("exclude"), excludeBuckets: cli.StringSlice("exclude-bucket"), @@ -1044,6 +1061,10 @@ func runMirror(ctx context.Context, srcURL, dstURL string, cli *cli.Context, enc mirrorSrcBuckets := srcClt.GetURL().Type == objectStorage && srcClt.GetURL().Path == string(srcClt.GetURL().Separator) mirrorBucketsToBuckets := mirrorSrcBuckets && createDstBuckets + if cli.Bool("with-versioning") && (!checkIfBucketIsVersioned(ctx, srcURL) || !checkIfBucketIsVersioned(ctx, dstURL)) { + fatalIf(errInvalidArgument().Trace(cli.Command.Name), "You cannot specify --with-versioning in versioning not enabled buckets") + } + if mirrorSrcBuckets || createDstBuckets { // Synchronize buckets using dirDifference function for d := range bucketDifference(ctx, srcClt, dstClt, mj.opts) { diff --git a/cmd/mirror-url.go b/cmd/mirror-url.go index 1c71fb8917..4c33908204 100644 --- a/cmd/mirror-url.go +++ b/cmd/mirror-url.go @@ -272,7 +272,7 @@ type mirrorOptions struct { skipErrors bool excludeOptions, excludeStorageClasses, excludeBuckets []string encKeyDB map[string][]prefixSSEPair - md5, disableMultipart bool + md5, disableMultipart, enableVersion bool olderThan, newerThan string storageClass string userMetadata map[string]string diff --git a/cmd/watch.go b/cmd/watch.go index 1da8cd6662..32e676ca40 100644 --- a/cmd/watch.go +++ b/cmd/watch.go @@ -37,6 +37,7 @@ type EventInfo struct { Port string UserAgent string Type notification.EventType + VersionID string } // WatchOptions contains watch configuration options From 50cc4b9c481459b77d0b9927d22e7c3db57d929f Mon Sep 17 00:00:00 2001 From: z30001483 <37854724+dormanze@users.noreply.github.com> Date: Fri, 28 Feb 2025 09:15:44 +0800 Subject: [PATCH 3/3] fix code check --- cmd/client-s3.go | 2 +- cmd/client.go | 2 +- cmd/common-methods.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/client-s3.go b/cmd/client-s3.go index 5254be63f1..c2200cff87 100644 --- a/cmd/client-s3.go +++ b/cmd/client-s3.go @@ -1092,7 +1092,7 @@ func (c *S3Client) Put(ctx context.Context, reader io.Reader, size int64, progre } opts := minio.PutObjectOptions{ - Internal: minio.AdvancedPutOptions{SourceVersionID: putOpts.versionId}, + Internal: minio.AdvancedPutOptions{SourceVersionID: putOpts.versionID}, UserMetadata: metadata, UserTags: tagsMap, Progress: progress, diff --git a/cmd/client.go b/cmd/client.go index 52379fd7e7..19b0572926 100644 --- a/cmd/client.go +++ b/cmd/client.go @@ -76,7 +76,7 @@ type PutOptions struct { concurrentStream bool ifNotExists bool checksum minio.ChecksumType - versionId string + versionID string } // StatOptions holds options of the HEAD operation diff --git a/cmd/common-methods.go b/cmd/common-methods.go index 1df50cbbe4..2775f89556 100644 --- a/cmd/common-methods.go +++ b/cmd/common-methods.go @@ -506,7 +506,7 @@ func uploadSourceToTargetURL(ctx context.Context, uploadOpts uploadSourceToTarge multipartThreads: uint(multipartThreads), ifNotExists: uploadOpts.ifNotExists, checksum: uploadOpts.urls.checksum, - versionId: uploadOpts.urls.TargetContent.VersionID, + versionID: uploadOpts.urls.TargetContent.VersionID, } if isReadAt(reader) || length == 0 {