Skip to content

fix(halalcloud): switch to new upload method #8511

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 163 additions & 27 deletions drivers/halalcloud/driver.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package halalcloud

import (
"bytes"
"context"
"crypto/sha1"
"encoding/json"
"fmt"
"io"
"math"
"net/http"
"net/url"
"path"
"strconv"
Expand All @@ -15,14 +19,12 @@ import (
"github.com/alist-org/alist/v3/internal/model"
"github.com/alist-org/alist/v3/internal/op"
"github.com/alist-org/alist/v3/pkg/http_range"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/city404/v6-public-rpc-proto/go/v6/common"
pbPublicUser "github.com/city404/v6-public-rpc-proto/go/v6/user"
pubUserFile "github.com/city404/v6-public-rpc-proto/go/v6/userfile"
"github.com/ipfs/go-cid"
"github.com/rclone/rclone/lib/readers"
log "github.com/sirupsen/logrus"
"github.com/zzzhr1990/go-common-entity/userfile"
)

Expand Down Expand Up @@ -367,39 +369,173 @@ func (d *HalalCloud) put(ctx context.Context, dstDir model.Obj, fileStream model

newDir := path.Join(dstDir.GetPath(), fileStream.GetName())

result, err := pubUserFile.NewPubUserFileClient(d.HalalCommon.serv.GetGrpcConnection()).CreateUploadToken(ctx, &pubUserFile.File{
// https://github.com/city404/v6-public-rpc-proto/wiki/0.100.000-%E6%96%87%E4%BB%B6%E4%B8%8A%E4%BC%A0

// https://github.com/halalcloud/golang-sdk/blob/652cd8d99c8329b6a975b608d094944cb006d757/cmd/disk/upload.go#L73
result, err := pubUserFile.NewPubUserFileClient(d.HalalCommon.serv.GetGrpcConnection()).CreateUploadTask(ctx, &pubUserFile.File{
// Parent: &pubUserFile.File{Path: currentDir},
Path: newDir,
//ContentIdentity: args[1],
Size: fileStream.GetSize(),
})
if err != nil {
return nil, err
}
u, _ := url.Parse(result.Endpoint)
u.Host = "s3." + u.Host
result.Endpoint = u.String()
s, err := session.NewSession(&aws.Config{
HTTPClient: base.HttpClient,
Credentials: credentials.NewStaticCredentials(result.AccessKey, result.SecretKey, result.Token),
Region: aws.String(result.Region),
Endpoint: aws.String(result.Endpoint),
S3ForcePathStyle: aws.Bool(true),
})
if err != nil {
return nil, err
if result.Created {
return nil, fmt.Errorf("upload file has been created")
}
uploader := s3manager.NewUploader(s, func(u *s3manager.Uploader) {
u.Concurrency = d.uploadThread
})
if fileStream.GetSize() > s3manager.MaxUploadParts*s3manager.DefaultUploadPartSize {
uploader.PartSize = fileStream.GetSize() / (s3manager.MaxUploadParts - 1)
log.Debugf("[halalcloud] Upload task started, total size: %d, block size: %d -> %s\n", fileStream.GetSize(), result.BlockSize, result.Task)
slicesCount := int(math.Ceil(float64(fileStream.GetSize()) / float64(result.BlockSize)))
bufferSize := int(result.BlockSize)
buffer := make([]byte, bufferSize)
slicesList := make([]string, 0)
codec := uint64(0x55)
if result.BlockCodec > 0 {
codec = uint64(result.BlockCodec)
}
mhType := uint64(0x12)
if result.BlockHashType > 0 {
mhType = uint64(result.BlockHashType)
}
prefix := cid.Prefix{
Codec: codec,
MhLength: -1,
MhType: mhType,
Version: 1,
}
// read file
reader := driver.NewLimitedUploadStream(ctx, fileStream)
_, err = uploader.UploadWithContext(ctx, &s3manager.UploadInput{
Bucket: aws.String(result.Bucket),
Key: aws.String(result.Key),
Body: io.TeeReader(reader, driver.NewProgress(fileStream.GetSize(), up)),
})
for {
n, err := io.ReadFull(reader, buffer)
if n > 0 {
data := buffer[:n]
uploadCid, err := postFileSlice(data, result.Task, result.UploadAddress, prefix)
if err != nil {
return nil, err
}
slicesList = append(slicesList, uploadCid.String())
up(float64(len(slicesList)) * 90 / float64(slicesCount))
}
if err == io.EOF || n == 0 {
break
}
}
up(95.0)
newFile, err := makeFile(slicesList, result.Task, result.UploadAddress)
if err != nil {
return nil, err
}
log.Debugf("[halalcloud] File uploaded, cid: %s\n", newFile.ContentIdentity)
return nil, err

}

func makeFile(fileSlice []string, taskID string, uploadAddress string) (*pubUserFile.File, error) {
accessUrl := uploadAddress + "/" + taskID
u, err := url.Parse(accessUrl)
if err != nil {
return nil, err
}
n, _ := json.Marshal(fileSlice)
httpRequest := http.Request{
Method: http.MethodPost,
URL: u,
Header: map[string][]string{
"Accept": {"application/json"},
"Content-Type": {"application/json"},
//"Content-Length": {fmt.Sprintf("%d", len(n))},
},
Body: io.NopCloser(bytes.NewReader(n)),
}
httpResponse, err := base.HttpClient.Do(&httpRequest)
if err != nil {
return nil, err
}
defer httpResponse.Body.Close()
if httpResponse.StatusCode != http.StatusOK && httpResponse.StatusCode != http.StatusCreated {
b, _ := io.ReadAll(httpResponse.Body)
fmt.Println(string(b))
return nil, fmt.Errorf("mk file slice failed, status code: %d", httpResponse.StatusCode)
}
b, _ := io.ReadAll(httpResponse.Body)
var result *pubUserFile.File
err = json.Unmarshal(b, &result)
if err != nil {
return nil, err
}
return result, nil
}

func postFileSlice(fileSlice []byte, taskID string, uploadAddress string, preix cid.Prefix) (cid.Cid, error) {
// 1. sum file slice
newCid, err := preix.Sum(fileSlice)
if err != nil {
return cid.Undef, err
}
// 2. post file slice
sliceCidString := newCid.String()
// /{taskID}/{sliceID}
accessUrl := uploadAddress + "/" + taskID + "/" + sliceCidString
// get {accessUrl} in {getTimeOut}
u, err := url.Parse(accessUrl)
if err != nil {
return cid.Undef, err
}
// header: accept: application/json
// header: content-type: application/octet-stream
// header: content-length: {fileSlice.length}
// header: x-content-cid: {sliceCidString}
// header: x-task-id: {taskID}
httpRequest := http.Request{
Method: http.MethodGet,
URL: u,
Header: map[string][]string{
"Accept": {"application/json"},
},
}
httpResponse, err := base.HttpClient.Do(&httpRequest)
if err != nil {
return cid.Undef, err
}
if httpResponse.StatusCode != http.StatusOK {
return cid.Undef, fmt.Errorf("check file slice failed, status code: %d", httpResponse.StatusCode)
}
var result bool
b, err := io.ReadAll(httpResponse.Body)
if err != nil {
return cid.Undef, err
}
err = json.Unmarshal(b, &result)
if err != nil {
return cid.Undef, err
}
if result {
log.Debugf("[halalcloud] Slice exists, cid: %s\n", newCid)
return newCid, nil
}

httpRequest = http.Request{
Method: http.MethodPost,
URL: u,
Header: map[string][]string{
"Accept": {"application/json"},
"Content-Type": {"application/octet-stream"},
// "Content-Length": {fmt.Sprintf("%d", len(fileSlice))},
},
Body: io.NopCloser(bytes.NewReader(fileSlice)),
}
httpResponse, err = base.HttpClient.Do(&httpRequest)
if err != nil {
return cid.Undef, err
}
defer httpResponse.Body.Close()
if httpResponse.StatusCode != http.StatusOK && httpResponse.StatusCode != http.StatusCreated {
b, _ := io.ReadAll(httpResponse.Body)
fmt.Println(string(b))
return cid.Undef, fmt.Errorf("upload file slice failed, status code: %d", httpResponse.StatusCode)
}
log.Debugf("[halalcloud] Slice uploaded, cid: %s\n", newCid)
return newCid, nil
}

var _ driver.Driver = (*HalalCloud)(nil)
34 changes: 16 additions & 18 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
module github.com/alist-org/alist/v3

go 1.23.4
go 1.24.1

require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.0
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.0
github.com/KirCute/ftpserverlib-pasvportmap v1.25.0
github.com/KirCute/sftpd-alist v0.0.12
github.com/ProtonMail/go-crypto v1.0.0
Expand All @@ -19,7 +21,7 @@ require (
github.com/charmbracelet/bubbles v0.20.0
github.com/charmbracelet/bubbletea v1.1.0
github.com/charmbracelet/lipgloss v0.13.0
github.com/city404/v6-public-rpc-proto/go v0.0.0-20240817070657-90f8e24b653e
github.com/city404/v6-public-rpc-proto/go v0.0.0-20250417084940-1e27e1f2834d
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/deckarep/golang-set/v2 v2.6.0
github.com/dhowden/tag v0.0.0-20240417053706-3d75831295e8
Expand Down Expand Up @@ -65,11 +67,11 @@ require (
github.com/xhofe/wopan-sdk-go v0.1.3
github.com/yeka/zip v0.0.0-20231116150916-03d6312748a9
github.com/zzzhr1990/go-common-entity v0.0.0-20221216044934-fd1c571e3a22
golang.org/x/crypto v0.36.0
golang.org/x/crypto v0.37.0
golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e
golang.org/x/image v0.19.0
golang.org/x/net v0.38.0
golang.org/x/oauth2 v0.22.0
golang.org/x/net v0.39.0
golang.org/x/oauth2 v0.25.0
golang.org/x/time v0.8.0
google.golang.org/appengine v1.6.8
gopkg.in/ldap.v3 v3.1.0
Expand All @@ -79,11 +81,7 @@ require (
gorm.io/gorm v1.25.11
)

require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.0 // indirect
)
require github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect

require (
github.com/STARRY-S/zip v0.2.1 // indirect
Expand All @@ -109,7 +107,6 @@ require (
github.com/ipfs/boxo v0.12.0 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/klauspost/pgzip v1.2.6 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/matoous/go-nanoid/v2 v2.1.0 // indirect
github.com/microcosm-cc/bluemonday v1.0.27
github.com/nwaples/rardecode/v2 v2.0.0-beta.4.0.20241112120701-034e449c6e78
Expand Down Expand Up @@ -252,15 +249,15 @@ require (
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.etcd.io/bbolt v1.3.8 // indirect
golang.org/x/arch v0.8.0 // indirect
golang.org/x/sync v0.12.0
golang.org/x/sys v0.31.0 // indirect
golang.org/x/term v0.30.0 // indirect
golang.org/x/text v0.23.0
golang.org/x/sync v0.13.0
golang.org/x/sys v0.32.0 // indirect
golang.org/x/term v0.31.0 // indirect
golang.org/x/text v0.24.0
golang.org/x/tools v0.24.0 // indirect
google.golang.org/api v0.169.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117 // indirect
google.golang.org/grpc v1.66.0
google.golang.org/protobuf v1.34.2 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect
google.golang.org/grpc v1.71.1
google.golang.org/protobuf v1.36.6 // indirect
gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
Expand All @@ -269,3 +266,4 @@ require (
)

// replace github.com/xhofe/115-sdk-go => ../../xhofe/115-sdk-go
replace nhooyr.io/websocket => github.com/coder/websocket v1.8.7
Loading