Skip to content

Commit 3d232ce

Browse files
committed
feature: 增加多线程分片上传和断续上传
1 parent 33b2b3a commit 3d232ce

File tree

12 files changed

+610
-51
lines changed

12 files changed

+610
-51
lines changed

cache/cache.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package cache
2+
3+
import (
4+
"os"
5+
"path/filepath"
6+
"runtime"
7+
8+
"github.com/syndtr/goleveldb/leveldb"
9+
"github.com/syndtr/goleveldb/leveldb/util"
10+
)
11+
12+
// 存储本地数据库连接
13+
var db *leveldb.DB
14+
15+
func GetDBName() string {
16+
if runtime.GOOS == "windows" {
17+
return filepath.Join(os.Getenv("USERPROFILE"), ".upx.db")
18+
}
19+
return filepath.Join(os.Getenv("HOME"), ".upx.db")
20+
}
21+
22+
func GetClient() (*leveldb.DB, error) {
23+
var err error
24+
if db == nil {
25+
db, err = leveldb.OpenFile(GetDBName(), nil)
26+
}
27+
return db, err
28+
}
29+
30+
func Delete(key string) error {
31+
db, err := GetClient()
32+
if err != nil {
33+
return err
34+
}
35+
return db.Delete([]byte(key), nil)
36+
}
37+
38+
func Range(scoop string, fn func(key []byte, data []byte)) error {
39+
db, err := GetClient()
40+
if err != nil {
41+
return err
42+
}
43+
44+
iter := db.NewIterator(
45+
util.BytesPrefix([]byte(scoop)),
46+
nil,
47+
)
48+
49+
for iter.Next() {
50+
fn(iter.Key(), iter.Value())
51+
}
52+
53+
iter.Release()
54+
return iter.Error()
55+
}

cache/upload.go

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
package cache
2+
3+
import (
4+
"crypto/md5"
5+
"encoding/json"
6+
"fmt"
7+
"time"
8+
)
9+
10+
// 分片上传任务
11+
type MutUpload struct {
12+
UploadID string
13+
14+
// 文件总计大小
15+
Size int64
16+
17+
// 分块大小
18+
PartSize int64
19+
20+
// 本都文件路径
21+
Path string
22+
23+
// 云端文件路径
24+
UpPath string
25+
26+
// 上传时间
27+
CreateAt time.Time
28+
}
29+
30+
func (p *MutUpload) Key() string {
31+
fingerprint := fmt.Sprintf(
32+
"%s:%s:%d:%d",
33+
p.Path,
34+
p.UpPath,
35+
p.Size,
36+
p.PartSize,
37+
)
38+
39+
return fmt.Sprintf(
40+
"mutupload-%x",
41+
md5.Sum([]byte(fingerprint)),
42+
)
43+
}
44+
45+
// 查询分片上传任务
46+
func FindMutUpload(fn func(key string, entity *MutUpload) bool) ([]*MutUpload, error) {
47+
var result []*MutUpload
48+
err := Range("mutupload-", func(key []byte, value []byte) {
49+
var item = &MutUpload{}
50+
if err := json.Unmarshal(value, item); err != nil {
51+
db.Delete(key, nil)
52+
return
53+
}
54+
55+
// 删除过期的分片上传记录
56+
if time.Since(item.CreateAt).Hours() > 12 {
57+
FindMutUploadPart(func(key string, part *MutUploadPart) bool {
58+
if part.UploadID == item.UploadID {
59+
db.Delete([]byte(key), nil)
60+
}
61+
return false
62+
})
63+
db.Delete(key, nil)
64+
}
65+
66+
if fn(string(key), item) {
67+
result = append(result, item)
68+
}
69+
})
70+
return result, err
71+
}
72+
73+
// 添加分片上传
74+
func AddMutUpload(entity *MutUpload) error {
75+
db, err := GetClient()
76+
if err != nil {
77+
return err
78+
}
79+
80+
data, err := json.Marshal(entity)
81+
if err != nil {
82+
return err
83+
}
84+
85+
return db.Put([]byte(entity.Key()), data, nil)
86+
}
87+
88+
// 分片上传任务下的具体分片信息
89+
type MutUploadPart struct {
90+
UploadID string
91+
PartId int64
92+
Len int64
93+
}
94+
95+
func (p *MutUploadPart) Key() string {
96+
return fmt.Sprintf("part-%s-%d", p.UploadID, p.PartId)
97+
}
98+
99+
// 获取已经上传的分片
100+
func FindMutUploadPart(fn func(key string, entity *MutUploadPart) bool) ([]*MutUploadPart, error) {
101+
var result []*MutUploadPart
102+
err := Range("part-", func(key []byte, value []byte) {
103+
var item = &MutUploadPart{}
104+
if err := json.Unmarshal(value, item); err != nil {
105+
db.Delete(key, nil)
106+
return
107+
}
108+
109+
if fn(string(key), item) {
110+
result = append(result, item)
111+
}
112+
})
113+
return result, err
114+
}
115+
116+
// 记录已经上传的分片
117+
func AddMutUploadPart(entity *MutUploadPart) error {
118+
db, err := GetClient()
119+
if err != nil {
120+
return err
121+
}
122+
123+
data, err := json.Marshal(entity)
124+
if err != nil {
125+
return err
126+
}
127+
128+
return db.Put([]byte(entity.Key()), data, nil)
129+
}
130+
131+
func DeleteByUploadID(uploadID string) error {
132+
FindMutUpload(func(key string, entity *MutUpload) bool {
133+
if entity.UploadID == uploadID {
134+
Delete(key)
135+
}
136+
return false
137+
})
138+
FindMutUploadPart(func(key string, entity *MutUploadPart) bool {
139+
if entity.UploadID == uploadID {
140+
Delete(key)
141+
}
142+
return false
143+
})
144+
return nil
145+
}

cache/upload_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package cache
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/stretchr/testify/assert"
8+
)
9+
10+
func TestMutUpload(t *testing.T) {
11+
mutUpload := &MutUpload{
12+
UploadID: "1",
13+
Size: 100 * 12,
14+
PartSize: 100,
15+
Path: "a.jpg",
16+
UpPath: "b.jpg",
17+
CreateAt: time.Now(),
18+
}
19+
assert.NoError(t, AddMutUpload(mutUpload))
20+
assert.NoError(t, AddMutUpload(&MutUpload{
21+
UploadID: "2",
22+
Size: 100 * 12,
23+
PartSize: 100,
24+
Path: "/c/a.jpg",
25+
UpPath: "b.jpg",
26+
CreateAt: time.Now(),
27+
}))
28+
results, err := FindMutUpload(func(key string, entity *MutUpload) bool {
29+
return key == mutUpload.Key()
30+
})
31+
32+
assert.NoError(t, err)
33+
assert.Equal(t, len(results), 1)
34+
assert.Equal(
35+
t,
36+
results[0].Key(),
37+
mutUpload.Key(),
38+
)
39+
}
40+
41+
func TestMutUploadPart(t *testing.T) {
42+
part1s := []int64{}
43+
for i := 0; i < 100; i++ {
44+
part1s = append(part1s, int64(i))
45+
}
46+
47+
for _, v := range part1s {
48+
err := AddMutUploadPart(&MutUploadPart{
49+
UploadID: "1",
50+
PartId: v,
51+
Len: 100,
52+
})
53+
assert.NoError(t, err)
54+
}
55+
56+
part2s := []int64{}
57+
records, err := FindMutUploadPart(func(key string, entity *MutUploadPart) bool {
58+
return entity.UploadID == "1"
59+
})
60+
assert.NoError(t, err)
61+
for _, v := range records {
62+
part2s = append(part2s, v.PartId)
63+
}
64+
65+
assert.ElementsMatch(t, part1s, part2s)
66+
}

commands.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,10 +348,12 @@ func NewPutCommand() cli.Command {
348348
upPath,
349349
c.Int("w"),
350350
c.Bool("all"),
351+
c.Bool("c"),
351352
)
352353
return nil
353354
},
354355
Flags: []cli.Flag{
356+
cli.BoolFlag{Name: "c", Usage: "continue put, resume broken put"},
355357
cli.IntFlag{Name: "w", Usage: "max concurrent threads", Value: 5},
356358
cli.BoolFlag{Name: "all", Usage: "upload all files including hidden files"},
357359
},
@@ -373,10 +375,12 @@ func NewUploadCommand() cli.Command {
373375
c.String("remote"),
374376
c.Int("w"),
375377
c.Bool("all"),
378+
c.Bool("c"),
376379
)
377380
return nil
378381
},
379382
Flags: []cli.Flag{
383+
cli.BoolFlag{Name: "c", Usage: "continue put, resume broken put"},
380384
cli.BoolFlag{Name: "all", Usage: "upload all files including hidden files"},
381385
cli.IntFlag{Name: "w", Usage: "max concurrent threads", Value: 5},
382386
cli.StringFlag{Name: "remote", Usage: "remote path", Value: "./"},

db.go

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@ import (
66
"os"
77
"path"
88
"path/filepath"
9-
"runtime"
109
"strings"
1110

1211
"github.com/syndtr/goleveldb/leveldb"
12+
"github.com/syndtr/goleveldb/leveldb/util"
13+
"github.com/upyun/upx/cache"
1314
)
1415

1516
var db *leveldb.DB
@@ -31,20 +32,22 @@ type dbValue struct {
3132
Items []*fileMeta `json:"items"`
3233
}
3334

34-
func getDBName() string {
35-
if runtime.GOOS == "windows" {
36-
return filepath.Join(os.Getenv("USERPROFILE"), ".upx.db")
37-
}
38-
return filepath.Join(os.Getenv("HOME"), ".upx.db")
39-
}
40-
4135
func makeDBKey(src, dst string) ([]byte, error) {
4236
return json.Marshal(&dbKey{
4337
SrcPath: src,
4438
DstPath: path.Join(session.Bucket, dst),
4539
})
4640
}
4741

42+
func parseDBKey(key []byte) (*dbKey, error) {
43+
dbkey := &dbKey{}
44+
err := json.Unmarshal(
45+
key,
46+
dbkey,
47+
)
48+
return dbkey, err
49+
}
50+
4851
func makeDBValue(filename string, md5 bool) (*dbValue, error) {
4952
finfo, err := os.Stat(filename)
5053
if err != nil {
@@ -120,16 +123,17 @@ func delDBValue(src, dst string) error {
120123

121124
func delDBValues(srcPrefix, dstPrefix string) {
122125
dstPrefix = path.Join(session.Bucket, dstPrefix)
123-
iter := db.NewIterator(nil, nil)
126+
iter := db.NewIterator(
127+
util.BytesPrefix([]byte("{")),
128+
nil,
129+
)
124130
if ok := iter.First(); !ok {
125131
return
126132
}
127133
for {
128-
k := new(dbKey)
129-
key := iter.Key()
130-
err := json.Unmarshal(key, k)
134+
k, err := parseDBKey(iter.Key())
131135
if err != nil {
132-
PrintError("decode %s: %v", string(key), err)
136+
PrintError("decode %s: %v", string(iter.Key()), err)
133137
}
134138
if strings.HasPrefix(k.SrcPath, srcPrefix) && strings.HasPrefix(k.DstPath, dstPrefix) {
135139
PrintOnlyVerbose("found %s => %s to delete", k.SrcPath, k.DstPath)
@@ -182,9 +186,9 @@ func diffFileMetas(src []*fileMeta, dst []*fileMeta) []*fileMeta {
182186
}
183187

184188
func initDB() (err error) {
185-
db, err = leveldb.OpenFile(getDBName(), nil)
189+
db, err = cache.GetClient()
186190
if err != nil {
187-
Print("db %v %s", err, getDBName())
191+
Print("db %v %s", err, cache.GetDBName())
188192
}
189193
return err
190194
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ require (
2424
github.com/rivo/uniseg v0.4.4 // indirect
2525
github.com/russross/blackfriday/v2 v2.1.0 // indirect
2626
golang.org/x/net v0.8.0 // indirect
27+
golang.org/x/sync v0.2.0 // indirect
2728
golang.org/x/sys v0.8.0 // indirect
2829
gopkg.in/yaml.v3 v3.0.1 // indirect
2930
)

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ
6666
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
6767
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
6868
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
69+
golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI=
70+
golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
6971
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
7072
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
7173
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

0 commit comments

Comments
 (0)