Skip to content

Commit

Permalink
gateway: set xattr before rename to ensure consistency (#5257)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhijian-pro authored Oct 28, 2024
1 parent 54dad73 commit ae411fd
Showing 1 changed file with 47 additions and 46 deletions.
93 changes: 47 additions & 46 deletions pkg/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,27 @@ func (n *jfsObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBu
logger.Errorf("copy %s to %s: %s", src, tmp, err)
return
}

var etag []byte
if n.gConf.KeepEtag {
etag, _ = n.fs.GetXattr(mctx, src, s3Etag)
if len(etag) != 0 {
eno = n.fs.SetXattr(mctx, tmp, s3Etag, etag, 0)
if eno != 0 {
logger.Warnf("set xattr error, path: %s,xattr: %s,value: %s,flags: %d", tmp, s3Etag, etag, 0)
}
}
}

var tagStr string
if n.gConf.ObjTag && srcInfo.UserDefined != nil {
if tagStr = srcInfo.UserDefined[xhttp.AmzObjectTagging]; tagStr != "" {
if eno := n.fs.SetXattr(mctx, tmp, s3Tags, []byte(tagStr), 0); eno != 0 {
logger.Errorf("set object tags error, path: %s,value: %s error %s", tmp, tagStr, eno)
}
}
}

eno = n.fs.Rename(mctx, tmp, dst, 0)
if eno == syscall.ENOENT {
if err = n.mkdirAll(ctx, path.Dir(dst)); err != nil {
Expand All @@ -577,26 +598,6 @@ func (n *jfsObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBu
return
}

var etag []byte
if n.gConf.KeepEtag {
etag, _ = n.fs.GetXattr(mctx, src, s3Etag)
if len(etag) != 0 {
eno = n.fs.SetXattr(mctx, dst, s3Etag, etag, 0)
if eno != 0 {
logger.Warnf("set xattr error, path: %s,xattr: %s,value: %s,flags: %d", dst, s3Etag, etag, 0)
}
}
}

var tagStr string
if n.gConf.ObjTag && srcInfo.UserDefined != nil {
if tagStr = srcInfo.UserDefined[xhttp.AmzObjectTagging]; tagStr != "" {
if eno := n.fs.SetXattr(mctx, dst, s3Tags, []byte(tagStr), 0); eno != 0 {
logger.Errorf("set object tags error, path: %s,value: %s error %s", dst, tagStr, eno)
}
}
}

return minio.ObjectInfo{
Bucket: dstBucket,
Name: dstObject,
Expand Down Expand Up @@ -725,7 +726,7 @@ func (n *jfsObjects) mkdirAll(ctx context.Context, p string) error {
return eno
}

func (n *jfsObjects) putObject(ctx context.Context, bucket, object string, r *minio.PutObjReader, opts minio.ObjectOptions) (err error) {
func (n *jfsObjects) putObject(ctx context.Context, bucket, object string, r *minio.PutObjReader, opts minio.ObjectOptions, objTagging map[string]string) (err error) {
tmpname := n.tpath(bucket, "tmp", minio.MustGetUUID())
f, eno := n.fs.Create(mctx, tmpname, 0666, n.gConf.Umask)
if eno == syscall.ENOENT {
Expand Down Expand Up @@ -766,6 +767,12 @@ func (n *jfsObjects) putObject(ctx context.Context, bucket, object string, r *mi
if err != nil {
return
}
for k, v := range objTagging {
if errno := n.fs.SetXattr(mctx, tmpname, k, []byte(v), 0); errno != 0 {
logger.Errorf("set xattr error, path: %s(%s),xattr: %s,value: %s,flags: %d", tmpname, object, k, v, 0)
}
}

eno = n.fs.Rename(mctx, tmpname, object, 0)
if eno == syscall.ENOENT {
if err = n.mkdirAll(ctx, path.Dir(object)); err != nil {
Expand All @@ -785,7 +792,8 @@ func (n *jfsObjects) PutObject(ctx context.Context, bucket string, object string
if err = n.checkBucket(ctx, bucket); err != nil {
return
}

var tagStr string
var etag string
p := n.path(bucket, object)
if strings.HasSuffix(object, sep) {
if err = n.mkdirAll(ctx, p); err != nil {
Expand All @@ -802,30 +810,26 @@ func (n *jfsObjects) PutObject(ctx context.Context, bucket string, object string
}
// if the put object is a directory, set its atime to 0
n.setFileAtime(p, 0)
} else if err = n.putObject(ctx, bucket, p, r, opts); err != nil {
return
} else {
var objTagging = make(map[string]string)
etag = r.MD5CurrentHexString()
if n.gConf.KeepEtag && !strings.HasSuffix(object, sep) {
objTagging[s3Etag] = etag
}
// tags: key1=value1&key2=value2&key3=value3
if n.gConf.ObjTag && opts.UserDefined != nil {
if tagStr = opts.UserDefined[xhttp.AmzObjectTagging]; tagStr != "" {
objTagging[s3Tags] = tagStr
}
}
if err = n.putObject(ctx, bucket, p, r, opts, objTagging); err != nil {
return
}
}
fi, eno := n.fs.Stat(mctx, p)
if eno != 0 {
return objInfo, jfsToObjectErr(ctx, eno, bucket, object)
}
etag := r.MD5CurrentHexString()
if n.gConf.KeepEtag && !strings.HasSuffix(object, sep) {
eno = n.fs.SetXattr(mctx, p, s3Etag, []byte(etag), 0)
if eno != 0 {
logger.Errorf("set xattr error, path: %s,xattr: %s,value: %s,flags: %d", p, s3Etag, etag, 0)
}
}
// tags: key1=value1&key2=value2&key3=value3
var tagStr string
if n.gConf.ObjTag && opts.UserDefined != nil {
if tagStr = opts.UserDefined[xhttp.AmzObjectTagging]; tagStr != "" {
if eno := n.fs.SetXattr(mctx, p, s3Tags, []byte(tagStr), 0); eno != 0 {
logger.Errorf("set object tags error, path: %s,value: %s error: %s", p, tagStr, eno)
}
}
}

return minio.ObjectInfo{
Bucket: bucket,
Name: object,
Expand Down Expand Up @@ -1021,14 +1025,11 @@ func (n *jfsObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID
return
}
p := n.ppath(bucket, uploadID, strconv.Itoa(partID))
if err = n.putObject(ctx, bucket, p, r, opts); err != nil {
etag := r.MD5CurrentHexString()
if err = n.putObject(ctx, bucket, p, r, opts, map[string]string{s3Etag: etag}); err != nil {
err = jfsToObjectErr(ctx, err, bucket, object)
return
}
etag := r.MD5CurrentHexString()
if n.fs.SetXattr(mctx, p, s3Etag, []byte(etag), 0) != 0 {
logger.Warnf("set xattr error, path: %s,xattr: %s,value: %s,flags: %d", p, s3Etag, etag, 0)
}
info.PartNumber = partID
info.ETag = etag
info.LastModified = minio.UTCNow()
Expand Down

0 comments on commit ae411fd

Please sign in to comment.