From 85f25e1bdc4d0e6059541e55cfd5fb58184ee59f Mon Sep 17 00:00:00 2001 From: abyss Date: Sat, 11 May 2024 17:19:59 +0800 Subject: [PATCH] feat(fs/qingstor): Added gbk encoding support --- constants/model.go | 8 ++++ endpoint/fs/base.go | 15 +++++-- endpoint/fs/client.go | 67 +++++++++++++++++++++++++++++++- endpoint/fs/destination.go | 5 ++- endpoint/qingstor/base.go | 15 +++++-- endpoint/qingstor/client.go | 64 ++++++++++++++++++++++++++++++ endpoint/qingstor/destination.go | 25 +++++++++--- go.mod | 1 + migrate/object.go | 2 +- model/task.go | 6 +-- 10 files changed, 190 insertions(+), 18 deletions(-) diff --git a/constants/model.go b/constants/model.go index 28bc0d3..90bcd36 100644 --- a/constants/model.go +++ b/constants/model.go @@ -25,6 +25,14 @@ const ( TaskIgnoreExistingMD5Sum = "md5sum" ) +// Constants for task encoding config. +const ( + GBK = "gbk" + HZGB2312 = "gb2312" + Big5 = "big5" + Windows1252 = "cp1252" +) + // Constants for object types. const ( ObjectTypeDirectory = "directory" diff --git a/endpoint/fs/base.go b/endpoint/fs/base.go index 80ec8fc..50c35d4 100644 --- a/endpoint/fs/base.go +++ b/endpoint/fs/base.go @@ -16,7 +16,10 @@ func (c *Client) Name(ctx context.Context) (name string) { // Read implement source.Read func (c *Client) Read(ctx context.Context, p string, _ bool) (r io.Reader, err error) { - cp := filepath.Join(c.AbsPath, p) + cp, err := c.Encode(filepath.Join(c.AbsPath, p)) + if err != nil { + return + } r, err = os.Open(cp) if err != nil { @@ -29,7 +32,10 @@ func (c *Client) Read(ctx context.Context, p string, _ bool) (r io.Reader, err e func (c *Client) ReadRange( ctx context.Context, p string, offset, size int64, ) (r io.Reader, err error) { - cp := filepath.Join(c.AbsPath, p) + cp, err := c.Encode(filepath.Join(c.AbsPath, p)) + if err != nil { + return + } f, err := os.Open(cp) if err != nil { @@ -42,7 +48,10 @@ func (c *Client) ReadRange( // Stat implement source.Stat and destination.Stat func (c *Client) Stat(ctx context.Context, p string, _ bool) (o *model.SingleObject, err error) { - cp := filepath.Join(c.AbsPath, p) + cp, err := c.Encode(filepath.Join(c.AbsPath, p)) + if err != nil { + return + } fi, err := os.Stat(cp) if err != nil { diff --git a/endpoint/fs/client.go b/endpoint/fs/client.go index 8374c6b..bf2f83c 100644 --- a/endpoint/fs/client.go +++ b/endpoint/fs/client.go @@ -2,7 +2,16 @@ package fs import ( "context" + "fmt" + "github.com/sirupsen/logrus" + "golang.org/x/text/encoding" + "golang.org/x/text/encoding/charmap" + "golang.org/x/text/encoding/simplifiedchinese" + "golang.org/x/text/encoding/traditionalchinese" + "golang.org/x/text/transform" + "io/ioutil" "path/filepath" + "strings" "gopkg.in/yaml.v2" @@ -19,7 +28,23 @@ type Client struct { // Options is the struct for fs options type Options struct { - EnableLinkFollow bool `yaml:"enable_link_follow"` + EnableLinkFollow bool `yaml:"enable_link_follow"` + Encoding string `yaml:"encoding"` +} + +func (o *Options) Check() error { + switch o.Encoding { + case "": + case constants.GBK: + case constants.HZGB2312: + case constants.Big5: + case constants.Windows1252: + default: + logrus.Errorf("%s is not a valid value for task encoding", o.Encoding) + return constants.ErrTaskInvalid + } + + return nil } // New will create a Fs. @@ -53,6 +78,46 @@ func New(ctx context.Context, et uint8) (c *Client, err error) { return } + err = opt.Check() + if err != nil { + return + } + c.Options = opt return } + +func (c *Client) Encode(key string) (string, error) { + if c.Options.Encoding != "" { + utf8, err := encode(key, c.Options.Encoding) + if err != nil { + return "", err + } + return utf8, nil + } + + return key, nil +} + +func encode(input, encodingName string) (string, error) { + var enc encoding.Encoding + switch strings.ToLower(encodingName) { + case constants.GBK: + enc = simplifiedchinese.GBK + case constants.HZGB2312: + enc = simplifiedchinese.HZGB2312 + case constants.Big5: + enc = traditionalchinese.Big5 + case constants.Windows1252: + enc = charmap.Windows1252 + default: + return "", fmt.Errorf("unsupported encoding: %s", encodingName) + } + + reader := transform.NewReader(strings.NewReader(input), enc.NewEncoder()) + resBytes, err := ioutil.ReadAll(reader) + if err != nil { + return "", err + } + return string(resBytes), nil +} diff --git a/endpoint/fs/destination.go b/endpoint/fs/destination.go index b74060f..0b540f4 100644 --- a/endpoint/fs/destination.go +++ b/endpoint/fs/destination.go @@ -42,7 +42,10 @@ func (c *Client) Delete(ctx context.Context, p string) (err error) { // Write implement destination.Write func (c *Client) Write(ctx context.Context, p string, _ int64, r io.Reader, _ bool, _ map[string]string) (err error) { - cp := filepath.Join(c.AbsPath, p) + cp, err := c.Encode(filepath.Join(c.AbsPath, p)) + if err != nil { + return + } _, err = os.Stat(filepath.Dir(cp)) if os.IsNotExist(err) { diff --git a/endpoint/qingstor/base.go b/endpoint/qingstor/base.go index b4d0885..f4f5807 100644 --- a/endpoint/qingstor/base.go +++ b/endpoint/qingstor/base.go @@ -24,7 +24,10 @@ func (c *Client) Read(ctx context.Context, p string, isDir bool) (r io.Reader, e if isDir { return nil, nil } - cp := utils.Join(c.Path, p) + cp, err := c.Decode(utils.Join(c.Path, p)) + if err != nil { + return + } resp, err := c.client.GetObject(cp, nil) if err != nil { @@ -39,7 +42,10 @@ func (c *Client) Read(ctx context.Context, p string, isDir bool) (r io.Reader, e func (c *Client) ReadRange( ctx context.Context, p string, offset, size int64, ) (r io.Reader, err error) { - cp := utils.Join(c.Path, p) + cp, err := c.Decode(utils.Join(c.Path, p)) + if err != nil { + return + } resp, err := c.client.GetObject(cp, &service.GetObjectInput{ Range: convert.String(fmt.Sprintf("bytes=%d-%d", offset, offset+size-1)), @@ -54,7 +60,10 @@ func (c *Client) ReadRange( // Stat implement source.Stat and destination.Stat func (c *Client) Stat(ctx context.Context, p string, isDir bool) (o *model.SingleObject, err error) { - cp := utils.Join(c.Path, p) + cp, err := c.Decode(utils.Join(c.Path, p)) + if err != nil { + return + } if isDir { cp += "/" } diff --git a/endpoint/qingstor/client.go b/endpoint/qingstor/client.go index bb5feef..7323aed 100644 --- a/endpoint/qingstor/client.go +++ b/endpoint/qingstor/client.go @@ -2,12 +2,20 @@ package qingstor import ( "context" + "fmt" + "io/ioutil" "net/http" + "strings" "time" "github.com/qingstor/qingstor-sdk-go/v4/config" "github.com/qingstor/qingstor-sdk-go/v4/service" "github.com/sirupsen/logrus" + "golang.org/x/text/encoding" + "golang.org/x/text/encoding/charmap" + "golang.org/x/text/encoding/simplifiedchinese" + "golang.org/x/text/encoding/traditionalchinese" + "golang.org/x/text/transform" "gopkg.in/yaml.v2" "github.com/yunify/qscamel/constants" @@ -29,6 +37,7 @@ type Client struct { StorageClass string `yaml:"storage_class"` DisableURICleaning bool `yaml:"disable_uri_cleaning"` EnableVirtualStyle bool `yaml:"enable_virtual_style"` + Decoding string `yaml:"decoding" msgpack:"d"` // Whether to migrate custom metadata UserDefineMeta bool `yaml:"user_define_meta"` @@ -40,6 +49,21 @@ type Client struct { client *service.Bucket } +func (c *Client) Check() error { + switch c.Decoding { + case "": + case constants.GBK: + case constants.HZGB2312: + case constants.Big5: + case constants.Windows1252: + default: + logrus.Errorf("%s is not a valid value for qingstor decoding", c.Decoding) + return constants.ErrTaskInvalid + } + + return nil +} + type TimeoutConfig struct { ConnectTimeout int64 `yaml:"connect_timeout"` ReadTimeout int64 `yaml:"read_timeout"` @@ -69,6 +93,11 @@ func New(ctx context.Context, et uint8, hc *http.Client) (c *Client, err error) return } + err = c.Check() + if err != nil { + return + } + // Set protocol. if c.Protocol == "" { c.Protocol = "https" @@ -168,3 +197,38 @@ func New(ctx context.Context, et uint8, hc *http.Client) (c *Client, err error) c.client, _ = qs.Bucket(c.BucketName, c.Zone) return } + +func (c *Client) Decode(key string) (string, error) { + if c.Decoding != "" { + utf8, err := decode(key, c.Decoding) + if err != nil { + return "", err + } + return utf8, nil + } + + return key, nil +} + +func decode(input, decodingName string) (string, error) { + var enc encoding.Encoding + switch strings.ToLower(decodingName) { + case constants.GBK: + enc = simplifiedchinese.GBK + case constants.HZGB2312: + enc = simplifiedchinese.HZGB2312 + case constants.Big5: + enc = traditionalchinese.Big5 + case constants.Windows1252: + enc = charmap.Windows1252 + default: + return "", fmt.Errorf("unsupported decoding: %s", decodingName) + } + + reader := transform.NewReader(strings.NewReader(input), enc.NewDecoder()) + utf8Bytes, err := ioutil.ReadAll(reader) + if err != nil { + return "", err + } + return string(utf8Bytes), nil +} diff --git a/endpoint/qingstor/destination.go b/endpoint/qingstor/destination.go index 30c9eb7..a1f25ca 100644 --- a/endpoint/qingstor/destination.go +++ b/endpoint/qingstor/destination.go @@ -43,7 +43,10 @@ func (c *Client) Delete(ctx context.Context, p string) (err error) { // Write implement destination.Write func (c *Client) Write(ctx context.Context, p string, size int64, r io.Reader, isDir bool, meta map[string]string) (err error) { - cp := utils.Join(c.Path, p) + cp, err := c.Decode(utils.Join(c.Path, p)) + if err != nil { + return + } var input *service.PutObjectInput if isDir { cp += "/" @@ -108,7 +111,10 @@ func (c *Client) Partable() bool { // InitPart implement destination.InitPart func (c *Client) InitPart(ctx context.Context, p string, size int64, meta map[string]string) (uploadID string, partSize int64, partNumbers int, err error) { - cp := utils.Join(c.Path, p) + cp, err := c.Decode(utils.Join(c.Path, p)) + if err != nil { + return + } input := &service.InitiateMultipartUploadInput{ XQSStorageClass: convert.String(c.StorageClass), @@ -153,7 +159,10 @@ func (c *Client) InitPart(ctx context.Context, p string, size int64, meta map[st // UploadPart implement destination.UploadPart func (c *Client) UploadPart(ctx context.Context, o *model.PartialObject, r io.Reader) (err error) { - cp := utils.Join(c.Path, o.Key) + cp, err := c.Decode(utils.Join(c.Path, o.Key)) + if err != nil { + return + } _, err = c.client.UploadMultipart(cp, &service.UploadMultipartInput{ // wrap by limitReader to keep body consistent with size @@ -172,7 +181,10 @@ func (c *Client) UploadPart(ctx context.Context, o *model.PartialObject, r io.Re } func (c *Client) CompleteParts(ctx context.Context, path string, uploadId string, totalNumber int) (err error) { - cp := utils.Join(c.Path, path) + cp, err := c.Decode(utils.Join(c.Path, path)) + if err != nil { + return + } logrus.Infof("Object %s start completing part", path) @@ -196,7 +208,10 @@ func (c *Client) CompleteParts(ctx context.Context, path string, uploadId string } func (c *Client) AbortUploads(ctx context.Context, path string, uploadId string) (err error) { - cp := utils.Join(c.Path, path) + cp, err := c.Decode(utils.Join(c.Path, path)) + if err != nil { + return + } logrus.Infof("Object %s start abort part", path) diff --git a/go.mod b/go.mod index a3440ee..b78662c 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/upyun/go-sdk v2.1.0+incompatible github.com/vmihailenco/msgpack v3.3.3+incompatible go.uber.org/ratelimit v0.2.0 + golang.org/x/text v0.3.3 google.golang.org/api v0.32.0 gopkg.in/natefinch/lumberjack.v2 v2.0.0-20170531160350-a96e63847dc3 gopkg.in/yaml.v2 v2.4.0 diff --git a/migrate/object.go b/migrate/object.go index ce27042..3e60487 100644 --- a/migrate/object.go +++ b/migrate/object.go @@ -5,12 +5,12 @@ import ( "crypto/md5" "encoding/hex" "fmt" - "github.com/sirupsen/logrus" "io" "strings" "sync" "time" + "github.com/sirupsen/logrus" "github.com/yunify/qscamel/constants" "github.com/yunify/qscamel/endpoint" "github.com/yunify/qscamel/model" diff --git a/model/task.go b/model/task.go index a1505dc..3a581df 100644 --- a/model/task.go +++ b/model/task.go @@ -28,12 +28,10 @@ type Task struct { CheckMD5 bool `yaml:"check_md5" msgpack:"cm"` IgnoreExisting string `yaml:"ignore_existing" msgpack:"ie"` MultipartBoundarySize int64 `yaml:"multipart_boundary_size" msgpack:"mbs"` - // Format: 2006-01-02 15:04:05 - IgnoreBefore string `yaml:"ignore_before" msgpack:"ib"` + IgnoreBefore string `yaml:"ignore_before" msgpack:"ib"` // Format: 2006-01-02 15:04:05 IgnoreBeforeTimestamp int64 `yaml:"-" msgpack:"ibt"` RateLimit int `yaml:"rate_limit" msgpack:"rl"` - // The number of workers for multipart uploads, default 100. - Workers int `yaml:"workers" msgpack:"wk"` + Workers int `yaml:"workers" msgpack:"wk"` // The number of workers for multipart uploads, default 100. // Statistical Information SuccessCount int64 `yaml:"-" msgpack:"sc"`