Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mirror:support version #5149

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/client-s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,7 @@ func (c *S3Client) notificationToEventsInfo(ninfo notification.Info) []EventInfo
Host: record.Source.Host,
Port: record.Source.Port,
UserAgent: record.Source.UserAgent,
VersionID: record.S3.Object.VersionID,
}
} else if strings.HasPrefix(record.EventName, "s3:ObjectCreated:PutRetention") {
eventsInfo[i] = EventInfo{
Expand All @@ -750,6 +751,7 @@ func (c *S3Client) notificationToEventsInfo(ninfo notification.Info) []EventInfo
Host: record.Source.Host,
Port: record.Source.Port,
UserAgent: record.Source.UserAgent,
VersionID: record.S3.Object.VersionID,
}
} else if strings.HasPrefix(record.EventName, "s3:ObjectCreated:PutLegalHold") {
eventsInfo[i] = EventInfo{
Expand All @@ -761,6 +763,7 @@ func (c *S3Client) notificationToEventsInfo(ninfo notification.Info) []EventInfo
Host: record.Source.Host,
Port: record.Source.Port,
UserAgent: record.Source.UserAgent,
VersionID: record.S3.Object.VersionID,
}
} else {
eventsInfo[i] = EventInfo{
Expand All @@ -772,6 +775,7 @@ func (c *S3Client) notificationToEventsInfo(ninfo notification.Info) []EventInfo
Host: record.Source.Host,
Port: record.Source.Port,
UserAgent: record.Source.UserAgent,
VersionID: record.S3.Object.VersionID,
}
}
} else {
Expand All @@ -784,6 +788,7 @@ func (c *S3Client) notificationToEventsInfo(ninfo notification.Info) []EventInfo
Host: record.Source.Host,
Port: record.Source.Port,
UserAgent: record.Source.UserAgent,
VersionID: record.S3.Object.VersionID,
}
}
}
Expand Down Expand Up @@ -1087,6 +1092,7 @@ func (c *S3Client) Put(ctx context.Context, reader io.Reader, size int64, progre
}

opts := minio.PutObjectOptions{
Internal: minio.AdvancedPutOptions{SourceVersionID: putOpts.versionID},
UserMetadata: metadata,
UserTags: tagsMap,
Progress: progress,
Expand Down
1 change: 1 addition & 0 deletions cmd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type PutOptions struct {
concurrentStream bool
ifNotExists bool
checksum minio.ChecksumType
versionID string
}

// StatOptions holds options of the HEAD operation
Expand Down
1 change: 1 addition & 0 deletions cmd/common-methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ func uploadSourceToTargetURL(ctx context.Context, uploadOpts uploadSourceToTarge
multipartThreads: uint(multipartThreads),
ifNotExists: uploadOpts.ifNotExists,
checksum: uploadOpts.urls.checksum,
versionID: uploadOpts.urls.TargetContent.VersionID,
}

if isReadAt(reader) || length == 0 {
Expand Down
41 changes: 36 additions & 5 deletions cmd/mirror-main.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ var (
Name: "disable-multipart",
Usage: "disable multipart upload feature",
},
cli.BoolFlag{
Name: "with-versioning",
Usage: "upload or remove with versioning enabled on a bucket",
},
cli.StringSliceFlag{
Name: "exclude",
Usage: "exclude object(s) that match specified object name pattern",
Expand Down Expand Up @@ -223,6 +227,9 @@ EXAMPLES:
16. Cross mirror between sites in a active-active deployment.
Site-A: {{.Prompt}} {{.HelpName}} --active-active siteA siteB
Site-B: {{.Prompt}} {{.HelpName}} --active-active siteB siteA

17. Mirror a bucket from MinIO cloud storage to other minIO cloud storage with versioning enabled.
{{.Prompt}} {{.HelpName}} --with-versioning myminio/bucket1 otherminio/bucket2
`,
}

Expand Down Expand Up @@ -372,7 +379,18 @@ func (mj *mirrorJob) doRemove(ctx context.Context, sURLs URLs, event EventInfo)
if mj.opts.isFake {
return sURLs.WithError(nil)
}

// remote S3 to local file system
if sURLs.SourceContent != nil && sURLs.TargetAlias == "" {
// Construct proper path with alias.
sourceWithAlias := filepath.Join(sURLs.SourceAlias, sURLs.SourceContent.URL.Path)
sourceCient, pErr := newClient(sourceWithAlias)
if pErr != nil {
return sURLs.WithError(pErr)
}
if _, err := sourceCient.Stat(ctx, StatOptions{headOnly: true}); err == nil {
return sURLs.WithError(nil)
}
}
// Construct proper path with alias.
targetWithAlias := filepath.Join(sURLs.TargetAlias, sURLs.TargetContent.URL.Path)
clnt, pErr := newClient(targetWithAlias)
Expand All @@ -385,7 +403,7 @@ func (mj *mirrorJob) doRemove(ctx context.Context, sURLs URLs, event EventInfo)
clnt.AddUserAgent(uaMirrorAppName, ReleaseTag)
}
contentCh := make(chan *ClientContent, 1)
contentCh <- &ClientContent{URL: *newClientURL(sURLs.TargetContent.URL.Path)}
contentCh <- &ClientContent{URL: *newClientURL(sURLs.TargetContent.URL.Path), VersionID: sURLs.TargetContent.VersionID}
close(contentCh)
isRemoveBucket := false
resultCh := clnt.Remove(ctx, false, isRemoveBucket, false, false, contentCh)
Expand Down Expand Up @@ -697,6 +715,10 @@ func (mj *mirrorJob) watchMirrorEvents(ctx context.Context, events []EventInfo)

if strings.HasPrefix(string(event.Type), "s3:ObjectCreated:") {
sourceModTime, _ := time.Parse(time.RFC3339Nano, event.Time)
targetContent := &ClientContent{URL: *targetURL}
if mj.opts.enableVersion {
targetContent = &ClientContent{URL: *targetURL, VersionID: event.VersionID}
}
mirrorURL := URLs{
SourceAlias: sourceAlias,
SourceContent: &ClientContent{
Expand All @@ -708,7 +730,7 @@ func (mj *mirrorJob) watchMirrorEvents(ctx context.Context, events []EventInfo)
Metadata: event.UserMetadata,
},
TargetAlias: targetAlias,
TargetContent: &ClientContent{URL: *targetURL},
TargetContent: targetContent,
MD5: mj.opts.md5,
checksum: mj.opts.checksum,
DisableMultipart: mj.opts.disableMultipart,
Expand All @@ -734,11 +756,15 @@ func (mj *mirrorJob) watchMirrorEvents(ctx context.Context, events []EventInfo)
// Ignore delete cascading delete events if cyclical.
continue
}
targetContent := &ClientContent{URL: *targetURL}
if mj.opts.enableVersion {
targetContent = &ClientContent{URL: *targetURL, VersionID: event.VersionID}
}
mirrorURL := URLs{
SourceAlias: sourceAlias,
SourceContent: nil,
SourceContent: &ClientContent{URL: *sourceURL},
TargetAlias: targetAlias,
TargetContent: &ClientContent{URL: *targetURL},
TargetContent: targetContent,
MD5: mj.opts.md5,
checksum: mj.opts.checksum,
DisableMultipart: mj.opts.disableMultipart,
Expand Down Expand Up @@ -1013,6 +1039,7 @@ func runMirror(ctx context.Context, srcURL, dstURL string, cli *cli.Context, enc
md5: md5,
checksum: checksum,
disableMultipart: cli.Bool("disable-multipart"),
enableVersion: cli.Bool("with-versioning"),
skipErrors: cli.Bool("skip-errors"),
excludeOptions: cli.StringSlice("exclude"),
excludeBuckets: cli.StringSlice("exclude-bucket"),
Expand Down Expand Up @@ -1041,6 +1068,10 @@ func runMirror(ctx context.Context, srcURL, dstURL string, cli *cli.Context, enc
mirrorSrcBuckets := srcClt.GetURL().Type == objectStorage && srcClt.GetURL().Path == string(srcClt.GetURL().Separator)
mirrorBucketsToBuckets := mirrorSrcBuckets && createDstBuckets

if cli.Bool("with-versioning") && (!checkIfBucketIsVersioned(ctx, srcURL) || !checkIfBucketIsVersioned(ctx, dstURL)) {
fatalIf(errInvalidArgument().Trace(cli.Command.Name), "You cannot specify --with-versioning in versioning not enabled buckets")
}

if mirrorSrcBuckets || createDstBuckets {
// Synchronize buckets using dirDifference function
for d := range bucketDifference(ctx, srcClt, dstClt, mj.opts) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/mirror-url.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ type mirrorOptions struct {
skipErrors bool
excludeOptions, excludeStorageClasses, excludeBuckets []string
encKeyDB map[string][]prefixSSEPair
md5, disableMultipart bool
md5, disableMultipart, enableVersion bool
olderThan, newerThan string
storageClass string
userMetadata map[string]string
Expand Down
1 change: 1 addition & 0 deletions cmd/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type EventInfo struct {
Port string
UserAgent string
Type notification.EventType
VersionID string
}

// WatchOptions contains watch configuration options
Expand Down