Refactor #71

Draft: wants to merge 37 commits into master

Changes from 1 commit

Commits (37)
72e0118  start refactor (zyxkad, Jun 11, 2024)
e081599  refactoring cluster (zyxkad, Jun 18, 2024)
5e89ede  add socket logic (zyxkad, Jun 20, 2024)
103c480  migrated more cluster (zyxkad, Jun 26, 2024)
451a665  abstract subscription, token, and user api (zyxkad, Jun 27, 2024)
8cbd7c5  fix wrong jwt subject usage (zyxkad, Jun 27, 2024)
e17d8c7  fix format (zyxkad, Jun 27, 2024)
f2dbcbf  add permHandler (zyxkad, Jun 28, 2024)
b7b6cc6  complete api (zyxkad, Jun 29, 2024)
a498e11  go fmt (zyxkad, Jul 1, 2024)
0ab1240  Merge branch 'master' into refactor (zyxkad, Aug 6, 2024)
10eee06  update dockerfile (zyxkad, Aug 6, 2024)
8dca5cb  seperate config (zyxkad, Aug 8, 2024)
9d44901  fix notifier error (zyxkad, Aug 8, 2024)
caae4f7  add webhook (zyxkad, Aug 8, 2024)
74c811d  add license header to installer.sh (zyxkad, Aug 9, 2024)
18ee907  update cluster handler (zyxkad, Aug 10, 2024)
4fe7a79  start to reforge storage sync (zyxkad, Aug 11, 2024)
3c20fd8  run go fmt (zyxkad, Aug 11, 2024)
173aced  refactored most stuffs (zyxkad, Aug 11, 2024)
650b0c3  refactored all errors (zyxkad, Aug 12, 2024)
1d399d9  add gc (zyxkad, Aug 13, 2024)
e723847  fix certificate request logic (zyxkad, Aug 13, 2024)
5465766  add report API (zyxkad, Aug 14, 2024)
16e9db4  seperate runner (zyxkad, Aug 17, 2024)
88b902b  refactor file download (zyxkad, Aug 17, 2024)
8cf633b  fill more manager (zyxkad, Aug 17, 2024)
f2a883d  implemented singleUserManager (zyxkad, Aug 17, 2024)
a09e758  run go fmt (zyxkad, Aug 17, 2024)
29265d0  bump go version to 1.23 (zyxkad, Aug 17, 2024)
d436d88  use report API (zyxkad, Aug 17, 2024)
8698d11  fix nil pointer config (zyxkad, Aug 17, 2024)
774a388  fix typo (zyxkad, Aug 17, 2024)
9308d58  fix http client (zyxkad, Aug 17, 2024)
0ec5189  fix report API, and a few translations (zyxkad, Aug 18, 2024)
07bf29d  fix unused import (zyxkad, Aug 18, 2024)
62c50ae  update config (zyxkad, Oct 9, 2024)
refactor file download
now we download first, then calculate the hash, which supports zero copy
zyxkad committed Aug 17, 2024

commit 88b902b573ffe1cb8b780cda54a68d8a60c36bf1
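
The commit switches fetchFile to a download-first, hash-second flow: the response body is streamed into a pre-allocated temporary file, the hash is verified in a second pass over that file, and the still-open *os.File is handed to the storage writers over a channel. A minimal standalone sketch of that pattern follows; the helper name downloadToTemp and the sha1 hash are illustrative assumptions, not the PR's API.

package example

import (
	"crypto/sha1"
	"encoding/hex"
	"fmt"
	"io"
	"net/http"
	"os"
)

// downloadToTemp streams the response body into a temporary file first and
// only then rewinds and hashes the file, so the bytes are written exactly once
// and the still-open *os.File can be reused by every storage target.
func downloadToTemp(url string, size int64, wantHash string) (_ *os.File, err error) {
	fd, err := os.CreateTemp("", "*.downloading")
	if err != nil {
		return nil, err
	}
	defer func() {
		if err != nil { // clean up on any failure; keep the open fd on success
			fd.Close()
			os.Remove(fd.Name())
		}
	}()

	res, err := http.Get(url)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	// First pass: stream the body straight into the file.
	if n, cpErr := io.Copy(fd, res.Body); cpErr != nil {
		return nil, cpErr
	} else if n != size {
		return nil, fmt.Errorf("file size wrong, got %d, expect %d", n, size)
	}

	// Second pass: rewind and hash what was actually written.
	if _, err = fd.Seek(0, io.SeekStart); err != nil {
		return nil, err
	}
	hw := sha1.New()
	if _, err = io.Copy(hw, fd); err != nil {
		return nil, err
	}
	if got := hex.EncodeToString(hw.Sum(nil)); got != wantHash {
		return nil, fmt.Errorf("file hash not match, got %s, expect %s", got, wantHash)
	}
	return fd, nil // caller seeks back to the start before each storage write
}

Writing into a seekable file first is what lets each storage target reuse the same descriptor with a cheap Seek back to the start, instead of re-downloading or holding the whole file in memory.
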
10 changes: 10 additions & 0 deletions cluster/http.go
@@ -32,6 +32,7 @@ import (

gocache "github.com/LiterMC/go-openbmclapi/cache"
"github.com/LiterMC/go-openbmclapi/internal/build"
"github.com/LiterMC/go-openbmclapi/utils"
)

type HTTPClient struct {
@@ -45,6 +46,7 @@ func NewHTTPClient(dialer *net.Dialer, cache gocache.Cache) *HTTPClient {
DialContext: dialer.DialContext,
}
}
transport = utils.NewRoundTripRedirectErrorWrapper(transport)
cachedTransport := transport
if cache != gocache.NoCache {
cachedTransport = &httpcache.Transport{
@@ -72,6 +74,14 @@ func (c *HTTPClient) DoUseCache(req *http.Request) (*http.Response, error) {
return c.cachedCli.Do(req)
}

func (c *HTTPClient) Client() *http.Client {
return c.cli
}

func (c *HTTPClient) CachedClient() *http.Client {
return c.cachedCli
}

func redirectChecker(req *http.Request, via []*http.Request) error {
req.Header.Del("Referer")
if len(via) > 10 {
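
The http.go change above wraps the transport with utils.NewRoundTripRedirectErrorWrapper before it reaches the caching transport; the wrapper's body is not part of this diff. The following is only a hypothetical sketch of the kind of RoundTripper decorator such a constructor could return, shown to illustrate the decorator pattern; the names and behavior are assumptions, not the actual utils implementation.

package example

import (
	"fmt"
	"net/http"
)

// redirectErrorWrapper forwards the request to the wrapped RoundTripper and
// tags any transport error with the URL that actually failed, which is the
// redirected URL once a redirect hop has already been followed by the client.
type redirectErrorWrapper struct {
	inner http.RoundTripper
}

func (w redirectErrorWrapper) RoundTrip(req *http.Request) (*http.Response, error) {
	res, err := w.inner.RoundTrip(req)
	if err != nil {
		return nil, fmt.Errorf("request to %s failed: %w", req.URL.Redacted(), err)
	}
	return res, nil
}

func newRoundTripRedirectErrorWrapper(inner http.RoundTripper) http.RoundTripper {
	return redirectErrorWrapper{inner: inner}
}
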
166 changes: 91 additions & 75 deletions cluster/storage.go
@@ -454,40 +454,36 @@ func (c *HTTPClient) SyncFiles(

for _, info := range missingMap {
log.Debugf("File %s is for %s", info.Hash, joinStorageIDs(info.Storages))
pathRes, err := c.fetchFile(ctx, &stats, info)
fileRes, err := c.fetchFile(ctx, &stats, info)
if err != nil {
log.TrWarnf("warn.sync.interrupted")
return err
}
go func(info *StorageFileInfo, pathRes <-chan string) {
go func(info *StorageFileInfo, fileRes <-chan *os.File) {
defer log.RecordPanic()
select {
case path := <-pathRes:
case srcFd := <-fileRes:
// cr.syncProg.Add(1)
if path == "" {
if srcFd == nil {
select {
case done <- nil: // TODO: or all storage?
case <-ctx.Done():
}
return
}
defer os.Remove(path)
defer os.Remove(srcFd.Name())
defer srcFd.Close()
// acquire slot here
slotId, buf, free := stats.slots.Alloc(ctx)
if buf == nil {
return
}
defer free()
_ = slotId
var srcFd *os.File
if srcFd, err = os.Open(path); err != nil {
return
}
defer srcFd.Close()
var failed []storage.Storage
for _, target := range info.Storages {
if _, err = srcFd.Seek(0, io.SeekStart); err != nil {
log.Errorf("Cannot seek file %q to start: %v", path, err)
log.Errorf("Cannot seek file %q to start: %v", srcFd.Name(), err)
continue
}
if err = target.Create(info.Hash, srcFd); err != nil {
@@ -498,18 +494,18 @@ func (c *HTTPClient) SyncFiles(
}
free()
srcFd.Close()
os.Remove(path)
os.Remove(srcFd.Name())
select {
case done <- failed:
case <-ctx.Done():
}
case <-ctx.Done():
return
}
}(info, pathRes)
}(info, fileRes)
}

for i := len(missingMap); i > 0; i-- {
for range len(missingMap) {
select {
case failed := <-done:
for _, s := range failed {
@@ -538,26 +534,35 @@ func (c *HTTPClient) SyncFiles(
return nil
}

func (c *HTTPClient) fetchFile(ctx context.Context, stats *syncStats, f *StorageFileInfo) (<-chan string, error) {
func (c *HTTPClient) fetchFile(ctx context.Context, stats *syncStats, f *StorageFileInfo) (<-chan *os.File, error) {
const maxRetryCount = 10

slotId, buf, free := stats.slots.Alloc(ctx)
if buf == nil {
return nil, ctx.Err()
}

pathRes := make(chan string, 1)
hashMethod, err := getHashMethod(len(f.Hash))
if err != nil {
return nil, err
}

reqInd := 0
reqs := make([]*http.Request, 0, len(f.URLs))
for _, rq := range f.URLs {
reqs = append(reqs, rq.Request)
}

fileRes := make(chan *os.File, 1)
go func() {
defer log.RecordPanic()
defer free()
defer close(pathRes)
defer close(fileRes)

var barUnit decor.SizeB1024
var tried atomic.Int32
tried.Store(1)

fPath := f.Hash // TODO: show downloading URL instead? Will it be too long?

bar := stats.pg.AddBar(f.Size,
mpb.BarRemoveOnComplete(),
mpb.BarPriority(slotId),
@@ -570,7 +575,7 @@ func (c *HTTPClient) fetchFile(ctx context.Context, stats *syncStats, f *Storage
}
return fmt.Sprintf("(%d/%d) ", tc, maxRetryCount)
}),
decor.Name(fPath, decor.WCSyncSpaceR),
decor.Name(f.Hash, decor.WCSyncSpaceR),
),
mpb.AppendDecorators(
decor.NewPercentage("%d", decor.WCSyncSpace),
@@ -583,58 +588,81 @@ func (c *HTTPClient) fetchFile(ctx context.Context, stats *syncStats, f *Storage
)
defer bar.Abort(true)

fd, err := os.CreateTemp("", "*.downloading")
if err != nil {
log.Errorf("Cannot create temporary file: %s", err)
stats.failCount.Add(1)
return
}
successed := false
defer func(fd *os.File) {
if !successed {
fd.Close()
os.Remove(fd.Name())
}
}(fd)
// prealloc space
if err := fd.Truncate(f.Size); err != nil {
log.Warnf("File space pre-alloc failed: %v", err)
}

downloadOnce := func() error {
if _, err := fd.Seek(io.SeekStart, 0); err != nil {
return err
}
if err := c.fetchFileWithBuf(ctx, reqs[reqInd], f.Size, hashMethod, f.Hash, fd, buf, func(r io.Reader) io.Reader {
return utils.ProxyPBReader(r, bar, stats.totalBar, &stats.lastInc)
}); err != nil {
reqInd = (reqInd + 1) % len(reqs)
return err
}
return nil
}

interval := time.Second
for {
bar.SetCurrent(0)
hashMethod, err := getHashMethod(len(f.Hash))
err := downloadOnce()
if err == nil {
var path string
if path, err = c.fetchFileWithBuf(ctx, f, hashMethod, buf, func(r io.Reader) io.Reader {
return utils.ProxyPBReader(r, bar, stats.totalBar, &stats.lastInc)
}); err == nil {
pathRes <- path
stats.okCount.Add(1)
log.Infof(lang.Tr("info.sync.downloaded"), fPath,
utils.BytesToUnit((float64)(f.Size)),
(float64)(stats.totalBar.Current())/(float64)(stats.totalSize)*100)
return
}
break
}
bar.SetRefill(bar.Current())

c := tried.Add(1)
if c > maxRetryCount {
log.TrErrorf("error.sync.download.failed", fPath, err)
break
log.TrErrorf("error.sync.download.failed", f.Hash, err)
stats.failCount.Add(1)
return
}
log.TrErrorf("error.sync.download.failed.retry", fPath, interval, err)
log.TrErrorf("error.sync.download.failed.retry", f.Hash, interval, err)
select {
case <-time.After(interval):
interval *= 2
interval = min(interval*2, time.Minute*10)
case <-ctx.Done():
stats.failCount.Add(1)
return
}
}
stats.failCount.Add(1)
successed = true
fileRes <- fd
stats.okCount.Add(1)
log.Infof(lang.Tr("info.sync.downloaded"), f.Hash,
utils.BytesToUnit((float64)(f.Size)),
(float64)(stats.totalBar.Current())/(float64)(stats.totalSize)*100)
}()
return pathRes, nil
return fileRes, nil
}

func (c *HTTPClient) fetchFileWithBuf(
ctx context.Context, f *StorageFileInfo,
hashMethod crypto.Hash, buf []byte,
ctx context.Context, req *http.Request,
size int64, hashMethod crypto.Hash, hash string,
rw io.ReadWriteSeeker, buf []byte,
wrapper func(io.Reader) io.Reader,
) (path string, err error) {
) (err error) {
var (
req *http.Request
res *http.Response
fd *os.File
r io.Reader
)
for _, rq := range f.URLs {
req = rq.Request
break
}
req = req.Clone(ctx)
req.Header.Set("Accept-Encoding", "gzip, deflate")
if res, err = c.Do(req); err != nil {
@@ -643,10 +671,13 @@ func (c *HTTPClient) fetchFileWithBuf(
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
err = utils.NewHTTPStatusErrorFromResponse(res)
}else {
} else {
switch ce := strings.ToLower(res.Header.Get("Content-Encoding")); ce {
case "":
r = res.Body
if res.ContentLength >= 0 && res.ContentLength != size {
err = fmt.Errorf("File size wrong, got %d, expect %d", res.ContentLength, size)
}
case "gzip":
r, err = gzip.NewReader(res.Body)
case "deflate":
@@ -656,41 +687,26 @@ func (c *HTTPClient) fetchFileWithBuf(
}
}
if err != nil {
return "", utils.ErrorFromRedirect(err, res)
return utils.ErrorFromRedirect(err, res)
}
if wrapper != nil {
r = wrapper(r)
}

hw := hashMethod.New()

if fd, err = os.CreateTemp("", "*.downloading"); err != nil {
return
if n, err := io.CopyBuffer(rw, r, buf); err != nil {
return utils.ErrorFromRedirect(err, res)
} else if n != size {
return utils.ErrorFromRedirect(fmt.Errorf("File size wrong, got %d, expect %d", n, size), res)
}
path = fd.Name()
defer func(path string) {
if err != nil {
os.Remove(path)
}
}(path)

_, err = io.CopyBuffer(io.MultiWriter(hw, fd), r, buf)
stat, err2 := fd.Stat()
fd.Close()
if err != nil {
err = utils.ErrorFromRedirect(err, res)
return
if _, err := rw.Seek(io.SeekStart, 0); err != nil {
return err
}
if err2 != nil {
err = err2
return
hw := hashMethod.New()
if _, err := io.CopyBuffer(hw, rw, buf); err != nil {
return err
}
if t := stat.Size(); f.Size >= 0 && t != f.Size {
err = utils.ErrorFromRedirect(fmt.Errorf("File size wrong, got %d, expect %d", t, f.Size), res)
return
} else if hs := hex.EncodeToString(hw.Sum(buf[:0])); hs != f.Hash {
err = utils.ErrorFromRedirect(fmt.Errorf("File hash not match, got %s, expect %s", hs, f.Hash), res)
return
if hs := hex.EncodeToString(hw.Sum(buf[:0])); hs != hash {
return utils.ErrorFromRedirect(fmt.Errorf("File hash not match, got %s, expect %s", hs, hash), res)
}
return
}
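
The retry loop in fetchFile above rotates through the candidate URLs and now caps the backoff at ten minutes instead of doubling without bound. Below is a standalone sketch of that capped-exponential-backoff pattern; retryWithBackoff is an illustrative name, not part of the PR, and the min builtin needs Go 1.21+, which the PR's bump to Go 1.23 covers.

package example

import (
	"context"
	"time"
)

// retryWithBackoff mirrors the loop in fetchFile: retry up to maxRetry
// attempts, doubling the wait between attempts but capping it at ten minutes,
// and stop early when the context is cancelled.
func retryWithBackoff(ctx context.Context, maxRetry int, attempt func() error) error {
	interval := time.Second
	for i := 1; ; i++ {
		err := attempt()
		if err == nil {
			return nil
		}
		if i >= maxRetry {
			return err
		}
		select {
		case <-time.After(interval):
			interval = min(interval*2, 10*time.Minute)
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
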