diff --git a/cluster.go b/cluster.go index fa67eca4..4ef391f1 100644 --- a/cluster.go +++ b/cluster.go @@ -20,27 +20,16 @@ package main import ( - "compress/gzip" - "compress/zlib" "context" - "crypto" "encoding/base64" - "encoding/hex" - "encoding/json" "errors" "fmt" - "io" "net" "net/http" - "net/url" "os" - "path" "path/filepath" "regexp" "runtime" - "sort" - "strconv" - "strings" "sync" "sync/atomic" "time" @@ -49,10 +38,6 @@ import ( "github.com/LiterMC/socket.io/engine.io" "github.com/gorilla/websocket" "github.com/gregjones/httpcache" - "github.com/hamba/avro/v2" - "github.com/klauspost/compress/zstd" - "github.com/vbauerster/mpb/v8" - "github.com/vbauerster/mpb/v8/decor" gocache "github.com/LiterMC/go-openbmclapi/cache" "github.com/LiterMC/go-openbmclapi/database" @@ -63,7 +48,6 @@ import ( "github.com/LiterMC/go-openbmclapi/notify/email" "github.com/LiterMC/go-openbmclapi/notify/webpush" "github.com/LiterMC/go-openbmclapi/storage" - "github.com/LiterMC/go-openbmclapi/update" "github.com/LiterMC/go-openbmclapi/utils" ) @@ -686,906 +670,3 @@ func (cr *Cluster) Disabled() <-chan struct{} { defer cr.mux.RUnlock() return cr.disabled } - -func (cr *Cluster) CloneFileset() map[string]int64 { - cr.filesetMux.RLock() - defer cr.filesetMux.RUnlock() - fileset := make(map[string]int64, len(cr.fileset)) - for k, v := range cr.fileset { - fileset[k] = v - } - return fileset -} - -func (cr *Cluster) CachedFileSize(hash string) (size int64, ok bool) { - cr.filesetMux.RLock() - defer cr.filesetMux.RUnlock() - if size, ok = cr.fileset[hash]; !ok { - return - } - if size < 0 { - size = -size - } - return -} - -type CertKeyPair struct { - Cert string `json:"cert"` - Key string `json:"key"` -} - -func (cr *Cluster) RequestCert(ctx context.Context) (ckp *CertKeyPair, err error) { - resCh, err := cr.socket.EmitWithAck("request-cert") - if err != nil { - return - } - var data []any - select { - case <-ctx.Done(): - return nil, ctx.Err() - case data = <-resCh: - } - if ero := 
data[0]; ero != nil { - err = fmt.Errorf("socket.io remote error: %v", ero) - return - } - pair := data[1].(map[string]any) - ckp = &CertKeyPair{ - Cert: pair["cert"].(string), - Key: pair["key"].(string), - } - return -} - -func (cr *Cluster) makeReq(ctx context.Context, method string, relpath string, query url.Values) (req *http.Request, err error) { - return cr.makeReqWithBody(ctx, method, relpath, query, nil) -} - -func (cr *Cluster) makeReqWithBody( - ctx context.Context, - method string, relpath string, - query url.Values, body io.Reader, -) (req *http.Request, err error) { - var u *url.URL - if u, err = url.Parse(cr.prefix); err != nil { - return - } - u.Path = path.Join(u.Path, relpath) - if query != nil { - u.RawQuery = query.Encode() - } - target := u.String() - - req, err = http.NewRequestWithContext(ctx, method, target, body) - if err != nil { - return - } - req.Header.Set("User-Agent", build.ClusterUserAgent) - return -} - -func (cr *Cluster) makeReqWithAuth(ctx context.Context, method string, relpath string, query url.Values) (req *http.Request, err error) { - req, err = cr.makeReq(ctx, method, relpath, query) - if err != nil { - return - } - token, err := cr.GetAuthToken(ctx) - if err != nil { - return - } - req.Header.Set("Authorization", "Bearer "+token) - return -} - -type FileInfo struct { - Path string `json:"path" avro:"path"` - Hash string `json:"hash" avro:"hash"` - Size int64 `json:"size" avro:"size"` - Mtime int64 `json:"mtime" avro:"mtime"` -} - -// from -var fileListSchema = avro.MustParse(`{ - "type": "array", - "items": { - "type": "record", - "name": "fileinfo", - "fields": [ - {"name": "path", "type": "string"}, - {"name": "hash", "type": "string"}, - {"name": "size", "type": "long"}, - {"name": "mtime", "type": "long"} - ] - } -}`) - -func (cr *Cluster) GetFileList(ctx context.Context, lastMod int64) (files []FileInfo, err error) { - var query url.Values - if lastMod > 0 { - query = url.Values{ - "lastModified": 
{strconv.FormatInt(lastMod, 10)}, - } - } - req, err := cr.makeReqWithAuth(ctx, http.MethodGet, "/openbmclapi/files", query) - if err != nil { - return - } - res, err := cr.cachedCli.Do(req) - if err != nil { - return - } - defer res.Body.Close() - if res.StatusCode != http.StatusOK { - err = utils.NewHTTPStatusErrorFromResponse(res) - return - } - log.Debug("Parsing filelist body ...") - zr, err := zstd.NewReader(res.Body) - if err != nil { - return - } - defer zr.Close() - if err = avro.NewDecoderForSchema(fileListSchema, zr).Decode(&files); err != nil { - return - } - log.Debugf("Filelist parsed, length = %d", len(files)) - return -} - -func (cr *Cluster) GetConfig(ctx context.Context) (cfg *OpenbmclapiAgentConfig, err error) { - req, err := cr.makeReqWithAuth(ctx, http.MethodGet, "/openbmclapi/configuration", nil) - if err != nil { - return - } - res, err := cr.cachedCli.Do(req) - if err != nil { - return - } - defer res.Body.Close() - if res.StatusCode != http.StatusOK { - err = utils.NewHTTPStatusErrorFromResponse(res) - return - } - cfg = new(OpenbmclapiAgentConfig) - if err = json.NewDecoder(res.Body).Decode(cfg); err != nil { - cfg = nil - return - } - return -} - -type syncStats struct { - slots *limited.BufSlots - noOpen bool - - totalSize int64 - okCount, failCount atomic.Int32 - totalFiles int - - pg *mpb.Progress - totalBar *mpb.Bar - lastInc atomic.Int64 -} - -func (cr *Cluster) SyncFiles(ctx context.Context, files []FileInfo, heavyCheck bool) bool { - log.Infof(Tr("info.sync.prepare"), len(files)) - if !cr.issync.CompareAndSwap(false, true) { - log.Warn("Another sync task is running!") - return false - } - defer cr.issync.Store(false) - - sort.Slice(files, func(i, j int) bool { return files[i].Hash < files[j].Hash }) - if cr.syncFiles(ctx, files, heavyCheck) != nil { - return false - } - - cr.filesetMux.Lock() - for _, f := range files { - cr.fileset[f.Hash] = f.Size - } - cr.filesetMux.Unlock() - - return true -} - -type fileInfoWithTargets struct 
{ - FileInfo - tgMux sync.Mutex - targets []storage.Storage -} - -func (cr *Cluster) checkFileFor( - ctx context.Context, - sto storage.Storage, files []FileInfo, - heavy bool, - missing *utils.SyncMap[string, *fileInfoWithTargets], - pg *mpb.Progress, -) { - var missingCount atomic.Int32 - addMissing := func(f FileInfo) { - missingCount.Add(1) - if info, has := missing.GetOrSet(f.Hash, func() *fileInfoWithTargets { - return &fileInfoWithTargets{ - FileInfo: f, - targets: []storage.Storage{sto}, - } - }); has { - info.tgMux.Lock() - info.targets = append(info.targets, sto) - info.tgMux.Unlock() - } - } - - log.Infof(Tr("info.check.start"), sto.String(), heavy) - - var ( - checkingHashMux sync.Mutex - checkingHash string - lastCheckingHash string - slots *limited.BufSlots - ) - - if heavy { - slots = limited.NewBufSlots(runtime.GOMAXPROCS(0) * 2) - } - - bar := pg.AddBar(0, - mpb.BarRemoveOnComplete(), - mpb.PrependDecorators( - decor.Name(Tr("hint.check.checking")), - decor.Name(sto.String()), - decor.OnCondition( - decor.Any(func(decor.Statistics) string { - c, l := slots.Cap(), slots.Len() - return fmt.Sprintf(" (%d / %d)", c-l, c) - }), - heavy, - ), - ), - mpb.AppendDecorators( - decor.CountersNoUnit("%d / %d", decor.WCSyncSpaceR), - decor.NewPercentage("%d", decor.WCSyncSpaceR), - decor.EwmaETA(decor.ET_STYLE_GO, 30), - ), - mpb.BarExtender((mpb.BarFillerFunc)(func(w io.Writer, _ decor.Statistics) (err error) { - if checkingHashMux.TryLock() { - lastCheckingHash = checkingHash - checkingHashMux.Unlock() - } - if lastCheckingHash != "" { - _, err = fmt.Fprintln(w, "\t", lastCheckingHash) - } - return - }), false), - ) - defer bar.Wait() - defer bar.Abort(true) - - bar.SetTotal(0x100, false) - - sizeMap := make(map[string]int64, len(files)) - { - start := time.Now() - var checkedMp [256]bool - sto.WalkDir(func(hash string, size int64) error { - if n := utils.HexTo256(hash); !checkedMp[n] { - checkedMp[n] = true - now := time.Now() - 
bar.EwmaIncrement(now.Sub(start)) - start = now - } - sizeMap[hash] = size - return nil - }) - } - - bar.SetCurrent(0) - bar.SetTotal((int64)(len(files)), false) - for _, f := range files { - if ctx.Err() != nil { - return - } - start := time.Now() - hash := f.Hash - if checkingHashMux.TryLock() { - checkingHash = hash - checkingHashMux.Unlock() - } - name := sto.String() + "/" + hash - if f.Size == 0 { - log.Debugf("Skipped empty file %s", name) - } else if size, ok := sizeMap[hash]; ok { - if size != f.Size { - log.Warnf(Tr("warn.check.modified.size"), name, size, f.Size) - addMissing(f) - } else if heavy { - hashMethod, err := getHashMethod(len(hash)) - if err != nil { - log.Errorf(Tr("error.check.unknown.hash.method"), hash) - } else { - _, buf, free := slots.Alloc(ctx) - if buf == nil { - return - } - go func(f FileInfo, buf []byte, free func()) { - defer log.RecoverPanic(nil) - defer free() - miss := true - r, err := sto.Open(hash) - if err != nil { - log.Errorf(Tr("error.check.open.failed"), name, err) - } else { - hw := hashMethod.New() - _, err = io.CopyBuffer(hw, r, buf[:]) - r.Close() - if err != nil { - log.Errorf(Tr("error.check.hash.failed"), name, err) - } else if hs := hex.EncodeToString(hw.Sum(buf[:0])); hs != hash { - log.Warnf(Tr("warn.check.modified.hash"), name, hs, hash) - } else { - miss = false - } - } - if miss { - addMissing(f) - } - bar.EwmaIncrement(time.Since(start)) - }(f, buf, free) - continue - } - } - } else { - // log.Debugf("Could not found file %q", name) - addMissing(f) - } - bar.EwmaIncrement(time.Since(start)) - } - - checkingHashMux.Lock() - checkingHash = "" - checkingHashMux.Unlock() - - bar.SetTotal(-1, true) - log.Infof(Tr("info.check.done"), sto.String(), missingCount.Load()) - return -} - -func (cr *Cluster) CheckFiles( - ctx context.Context, - files []FileInfo, - heavyCheck bool, - pg *mpb.Progress, -) (map[string]*fileInfoWithTargets, error) { - missingMap := utils.NewSyncMap[string, *fileInfoWithTargets]() - done := 
make(chan struct{}, 0) - - for _, s := range cr.storages { - go func(s storage.Storage) { - defer log.RecordPanic() - defer func() { - select { - case done <- struct{}{}: - case <-ctx.Done(): - } - }() - cr.checkFileFor(ctx, s, files, heavyCheck, missingMap, pg) - }(s) - } - for i := len(cr.storages); i > 0; i-- { - select { - case <-done: - case <-ctx.Done(): - log.Warn(Tr("warn.sync.interrupted")) - return nil, ctx.Err() - } - } - return missingMap.RawMap(), nil -} - -func (cr *Cluster) SetFilesetByExists(ctx context.Context, files []FileInfo) error { - pg := mpb.New(mpb.WithRefreshRate(time.Second/2), mpb.WithAutoRefresh(), mpb.WithWidth(140)) - defer pg.Shutdown() - log.SetLogOutput(pg) - defer log.SetLogOutput(nil) - - missingMap, err := cr.CheckFiles(ctx, files, false, pg) - if err != nil { - return err - } - fileset := make(map[string]int64, len(files)) - stoCount := len(cr.storages) - for _, f := range files { - if t, ok := missingMap[f.Hash]; !ok || len(t.targets) < stoCount { - fileset[f.Hash] = f.Size - } - } - - cr.mux.Lock() - cr.fileset = fileset - cr.mux.Unlock() - return nil -} - -func (cr *Cluster) syncFiles(ctx context.Context, files []FileInfo, heavyCheck bool) error { - pg := mpb.New(mpb.WithRefreshRate(time.Second/2), mpb.WithAutoRefresh(), mpb.WithWidth(140)) - defer pg.Shutdown() - log.SetLogOutput(pg) - defer log.SetLogOutput(nil) - - cr.syncProg.Store(0) - cr.syncTotal.Store(-1) - - missingMap, err := cr.CheckFiles(ctx, files, heavyCheck, pg) - if err != nil { - return err - } - var ( - missing = make([]*fileInfoWithTargets, 0, len(missingMap)) - missingSize int64 = 0 - ) - for _, f := range missingMap { - missing = append(missing, f) - missingSize += f.Size - } - totalFiles := len(missing) - if totalFiles == 0 { - log.Info(Tr("info.sync.none")) - return nil - } - - go cr.notifyManager.OnSyncBegin(len(missing), missingSize) - defer func() { - go cr.notifyManager.OnSyncDone() - }() - - cr.syncTotal.Store((int64)(totalFiles)) - - ccfg, err := 
cr.GetConfig(ctx) - if err != nil { - return err - } - syncCfg := ccfg.Sync - log.Infof(Tr("info.sync.config"), syncCfg) - - done := make(chan struct{}, 0) - - var stats syncStats - stats.pg = pg - stats.noOpen = syncCfg.Source == "center" - stats.slots = limited.NewBufSlots(syncCfg.Concurrency) - stats.totalFiles = totalFiles - for _, f := range missing { - stats.totalSize += f.Size - } - - var barUnit decor.SizeB1024 - stats.lastInc.Store(time.Now().UnixNano()) - stats.totalBar = pg.AddBar(stats.totalSize, - mpb.BarRemoveOnComplete(), - mpb.BarPriority(stats.slots.Cap()), - mpb.PrependDecorators( - decor.Name(Tr("hint.sync.total")), - decor.NewPercentage("%.2f"), - ), - mpb.AppendDecorators( - decor.Any(func(decor.Statistics) string { - return fmt.Sprintf("(%d + %d / %d) ", stats.okCount.Load(), stats.failCount.Load(), stats.totalFiles) - }), - decor.Counters(barUnit, "(%.1f/%.1f) "), - decor.EwmaSpeed(barUnit, "%.1f ", 30), - decor.OnComplete( - decor.EwmaETA(decor.ET_STYLE_GO, 30), "done", - ), - ), - ) - - log.Infof(Tr("hint.sync.start"), totalFiles, utils.BytesToUnit((float64)(stats.totalSize))) - start := time.Now() - - for _, f := range missing { - log.Debugf("File %s is for %v", f.Hash, f.targets) - pathRes, err := cr.fetchFile(ctx, &stats, f.FileInfo) - if err != nil { - log.Warn(Tr("warn.sync.interrupted")) - return err - } - go func(f *fileInfoWithTargets, pathRes <-chan string) { - defer log.RecordPanic() - defer func() { - select { - case done <- struct{}{}: - case <-ctx.Done(): - } - }() - select { - case path := <-pathRes: - cr.syncProg.Add(1) - if path != "" { - defer os.Remove(path) - // acquire slot here - slotId, buf, free := stats.slots.Alloc(ctx) - if buf == nil { - return - } - defer free() - _ = slotId - var srcFd *os.File - if srcFd, err = os.Open(path); err != nil { - return - } - defer srcFd.Close() - for _, target := range f.targets { - if _, err = srcFd.Seek(0, io.SeekStart); err != nil { - log.Errorf("Cannot seek file %q to start: %v", 
path, err) - continue - } - err := target.Create(f.Hash, srcFd) - if err != nil { - log.Errorf(Tr("error.sync.create.failed"), target.String(), f.Hash, err) - continue - } - } - } - case <-ctx.Done(): - return - } - }(f, pathRes) - } - for i := len(missing); i > 0; i-- { - select { - case <-done: - case <-ctx.Done(): - log.Warn(Tr("warn.sync.interrupted")) - return ctx.Err() - } - } - - use := time.Since(start) - stats.totalBar.Abort(true) - pg.Wait() - - log.Infof(Tr("hint.sync.done"), use, utils.BytesToUnit((float64)(stats.totalSize)/use.Seconds())) - return nil -} - -func (cr *Cluster) Gc() { - for _, s := range cr.storages { - cr.gcFor(s) - } -} - -func (cr *Cluster) gcFor(s storage.Storage) { - log.Infof(Tr("info.gc.start"), s.String()) - err := s.WalkDir(func(hash string, _ int64) error { - if cr.issync.Load() { - return context.Canceled - } - if _, ok := cr.CachedFileSize(hash); !ok { - log.Infof(Tr("info.gc.found"), s.String()+"/"+hash) - s.Remove(hash) - } - return nil - }) - if err != nil { - if err == context.Canceled { - log.Warnf(Tr("warn.gc.interrupted"), s.String()) - } else { - log.Errorf(Tr("error.gc.error"), err) - } - return - } - log.Infof(Tr("info.gc.done"), s.String()) -} - -func (cr *Cluster) fetchFile(ctx context.Context, stats *syncStats, f FileInfo) (<-chan string, error) { - const ( - maxRetryCount = 5 - maxTryWithOpen = 3 - ) - - slotId, buf, free := stats.slots.Alloc(ctx) - if buf == nil { - return nil, ctx.Err() - } - - pathRes := make(chan string, 1) - go func() { - defer log.RecordPanic() - defer free() - defer close(pathRes) - - var barUnit decor.SizeB1024 - var trycount atomic.Int32 - trycount.Store(1) - bar := stats.pg.AddBar(f.Size, - mpb.BarRemoveOnComplete(), - mpb.BarPriority(slotId), - mpb.PrependDecorators( - decor.Name(Tr("hint.sync.downloading")), - decor.Any(func(decor.Statistics) string { - tc := trycount.Load() - if tc <= 1 { - return "" - } - return fmt.Sprintf("(%d/%d) ", tc, maxRetryCount) - }), - decor.Name(f.Path, 
decor.WCSyncSpaceR), - ), - mpb.AppendDecorators( - decor.NewPercentage("%d", decor.WCSyncSpace), - decor.Counters(barUnit, "[%.1f / %.1f]", decor.WCSyncSpace), - decor.EwmaSpeed(barUnit, "%.1f", 10, decor.WCSyncSpace), - decor.OnComplete( - decor.EwmaETA(decor.ET_STYLE_GO, 10, decor.WCSyncSpace), "done", - ), - ), - ) - defer bar.Abort(true) - - noOpen := stats.noOpen - interval := time.Second - for { - bar.SetCurrent(0) - hashMethod, err := getHashMethod(len(f.Hash)) - if err == nil { - var path string - if path, err = cr.fetchFileWithBuf(ctx, f, hashMethod, buf, noOpen, func(r io.Reader) io.Reader { - return ProxyReader(r, bar, stats.totalBar, &stats.lastInc) - }); err == nil { - pathRes <- path - stats.okCount.Add(1) - log.Infof(Tr("info.sync.downloaded"), f.Path, - utils.BytesToUnit((float64)(f.Size)), - (float64)(stats.totalBar.Current())/(float64)(stats.totalSize)*100) - return - } - } - bar.SetRefill(bar.Current()) - - log.Errorf(Tr("error.sync.download.failed"), f.Path, err) - c := trycount.Add(1) - if c > maxRetryCount { - break - } - if c > maxTryWithOpen { - noOpen = true - } - select { - case <-time.After(interval): - interval *= 2 - case <-ctx.Done(): - return - } - } - stats.failCount.Add(1) - }() - return pathRes, nil -} - -func (cr *Cluster) fetchFileWithBuf( - ctx context.Context, f FileInfo, - hashMethod crypto.Hash, buf []byte, - noOpen bool, - wrapper func(io.Reader) io.Reader, -) (path string, err error) { - var ( - reqPath = f.Path - req *http.Request - res *http.Response - fd *os.File - r io.Reader - ) - if noOpen { - reqPath = "/openbmclapi/download/" + f.Hash - } - if req, err = cr.makeReqWithAuth(ctx, http.MethodGet, reqPath, nil); err != nil { - return - } - req.Header.Set("Accept-Encoding", "gzip, deflate") - if res, err = cr.client.Do(req); err != nil { - return - } - defer res.Body.Close() - if err = ctx.Err(); err != nil { - return - } - if res.StatusCode != http.StatusOK { - err = 
ErrorFromRedirect(utils.NewHTTPStatusErrorFromResponse(res), res) - return - } - switch ce := strings.ToLower(res.Header.Get("Content-Encoding")); ce { - case "": - r = res.Body - case "gzip": - if r, err = gzip.NewReader(res.Body); err != nil { - err = ErrorFromRedirect(err, res) - return - } - case "deflate": - if r, err = zlib.NewReader(res.Body); err != nil { - err = ErrorFromRedirect(err, res) - return - } - default: - err = ErrorFromRedirect(fmt.Errorf("Unexpected Content-Encoding %q", ce), res) - return - } - if wrapper != nil { - r = wrapper(r) - } - - hw := hashMethod.New() - - if fd, err = os.CreateTemp("", "*.downloading"); err != nil { - return - } - path = fd.Name() - defer func(path string) { - if err != nil { - os.Remove(path) - } - }(path) - - _, err = io.CopyBuffer(io.MultiWriter(hw, fd), r, buf) - stat, err2 := fd.Stat() - fd.Close() - if err != nil { - err = ErrorFromRedirect(err, res) - return - } - if err2 != nil { - err = err2 - return - } - if t := stat.Size(); f.Size >= 0 && t != f.Size { - err = ErrorFromRedirect(fmt.Errorf("File size wrong, got %d, expect %d", t, f.Size), res) - return - } else if hs := hex.EncodeToString(hw.Sum(buf[:0])); hs != f.Hash { - err = ErrorFromRedirect(fmt.Errorf("File hash not match, got %s, expect %s", hs, f.Hash), res) - return - } - return -} - -type downloadingItem struct { - err error - done chan struct{} -} - -func (cr *Cluster) lockDownloading(target string) (*downloadingItem, bool) { - cr.downloadMux.RLock() - item := cr.downloading[target] - cr.downloadMux.RUnlock() - if item != nil { - return item, true - } - - cr.downloadMux.Lock() - defer cr.downloadMux.Unlock() - - if item = cr.downloading[target]; item != nil { - return item, true - } - item = &downloadingItem{ - done: make(chan struct{}, 0), - } - cr.downloading[target] = item - return item, false -} - -func (cr *Cluster) DownloadFile(ctx context.Context, hash string) (err error) { - hashMethod, err := getHashMethod(len(hash)) - if err != nil { - 
return - } - - f := FileInfo{ - Path: "/openbmclapi/download/" + hash, - Hash: hash, - Size: -1, - Mtime: 0, - } - item, ok := cr.lockDownloading(hash) - if !ok { - go func() { - defer log.RecoverPanic(nil) - var err error - defer func() { - if err != nil { - log.Errorf(Tr("error.sync.download.failed"), hash, err) - } - item.err = err - close(item.done) - - cr.downloadMux.Lock() - defer cr.downloadMux.Unlock() - delete(cr.downloading, hash) - }() - - log.Infof(Tr("hint.sync.downloading.handler"), hash) - - ctx, cancel := context.WithCancel(context.Background()) - go func() { - if cr.enabled.Load() { - select { - case <-cr.Disabled(): - cancel() - case <-ctx.Done(): - } - } else { - select { - case <-cr.WaitForEnable(): - cancel() - case <-ctx.Done(): - } - } - }() - defer cancel() - - var buf []byte - _, buf, free := cr.allocBuf(ctx) - if buf == nil { - err = ctx.Err() - return - } - defer free() - - path, err := cr.fetchFileWithBuf(ctx, f, hashMethod, buf, true, nil) - if err != nil { - return - } - defer os.Remove(path) - var srcFd *os.File - if srcFd, err = os.Open(path); err != nil { - return - } - defer srcFd.Close() - var stat os.FileInfo - if stat, err = srcFd.Stat(); err != nil { - return - } - size := stat.Size() - - for _, target := range cr.storages { - if _, err = srcFd.Seek(0, io.SeekStart); err != nil { - log.Errorf("Cannot seek file %q: %v", path, err) - return - } - if err := target.Create(hash, srcFd); err != nil { - log.Errorf(Tr("error.sync.create.failed"), target.String(), hash, err) - continue - } - } - - cr.filesetMux.Lock() - cr.fileset[hash] = -size // negative means that the file was not stored into the database yet - cr.filesetMux.Unlock() - }() - } - select { - case <-item.done: - err = item.err - case <-ctx.Done(): - err = ctx.Err() - case <-cr.Disabled(): - err = context.Canceled - } - return -} - -func (cr *Cluster) checkUpdate() (err error) { - if update.CurrentBuildTag == nil { - return - } - log.Info(Tr("info.update.checking")) - 
release, err := update.Check(cr.cachedCli, config.GithubAPI.Authorization) - if err != nil || release == nil { - return - } - // TODO: print all middle change logs - log.Infof(Tr("info.update.detected"), release.Tag, update.CurrentBuildTag) - log.Infof(Tr("info.update.changelog"), update.CurrentBuildTag, release.Tag, release.Body) - cr.notifyManager.OnUpdateAvaliable(release) - return -} diff --git a/main.go b/main.go index ccb6977b..fc57dce9 100644 --- a/main.go +++ b/main.go @@ -445,7 +445,11 @@ func (r *Runner) InitSynchronizer(ctx context.Context) { log.Info(Tr("info.filelist.fetching")) fl, err := r.cluster.GetFileList(ctx, lastMod) if err != nil { - log.Errorf(Tr("error.cannot.fetch.filelist"), err) + log.Errorf(Tr("error.filelist.fetch.failed"), err) + return + } + if len(fl) == 0 { + log.Infof("No file was updated since %s", time.UnixMilli(lastMod).Format(time.DateTime)) return } for _, f := range fl { diff --git a/sync.go b/sync.go new file mode 100644 index 00000000..e3629029 --- /dev/null +++ b/sync.go @@ -0,0 +1,962 @@ +/** + * OpenBmclAPI (Golang Edition) + * Copyright (C) 2024 Kevin Z + * All rights reserved + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */ + +package main + +import ( + "compress/gzip" + "compress/zlib" + "context" + "crypto" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "os" + "path" + "runtime" + "sort" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/hamba/avro/v2" + "github.com/klauspost/compress/zstd" + "github.com/vbauerster/mpb/v8" + "github.com/vbauerster/mpb/v8/decor" + + "github.com/LiterMC/go-openbmclapi/internal/build" + "github.com/LiterMC/go-openbmclapi/limited" + "github.com/LiterMC/go-openbmclapi/log" + "github.com/LiterMC/go-openbmclapi/storage" + "github.com/LiterMC/go-openbmclapi/update" + "github.com/LiterMC/go-openbmclapi/utils" +) + +func (cr *Cluster) CloneFileset() map[string]int64 { + cr.filesetMux.RLock() + defer cr.filesetMux.RUnlock() + fileset := make(map[string]int64, len(cr.fileset)) + for k, v := range cr.fileset { + fileset[k] = v + } + return fileset +} + +func (cr *Cluster) CachedFileSize(hash string) (size int64, ok bool) { + cr.filesetMux.RLock() + defer cr.filesetMux.RUnlock() + if size, ok = cr.fileset[hash]; !ok { + return + } + if size < 0 { + size = -size + } + return +} + +type CertKeyPair struct { + Cert string `json:"cert"` + Key string `json:"key"` +} + +func (cr *Cluster) RequestCert(ctx context.Context) (ckp *CertKeyPair, err error) { + resCh, err := cr.socket.EmitWithAck("request-cert") + if err != nil { + return + } + var data []any + select { + case <-ctx.Done(): + return nil, ctx.Err() + case data = <-resCh: + } + if ero := data[0]; ero != nil { + err = fmt.Errorf("socket.io remote error: %v", ero) + return + } + pair := data[1].(map[string]any) + ckp = &CertKeyPair{ + Cert: pair["cert"].(string), + Key: pair["key"].(string), + } + return +} + +func (cr *Cluster) makeReq(ctx context.Context, method string, relpath string, query url.Values) (req *http.Request, err error) { + return cr.makeReqWithBody(ctx, method, relpath, query, nil) +} + +func (cr *Cluster) makeReqWithBody( + ctx 
context.Context, + method string, relpath string, + query url.Values, body io.Reader, +) (req *http.Request, err error) { + var u *url.URL + if u, err = url.Parse(cr.prefix); err != nil { + return + } + u.Path = path.Join(u.Path, relpath) + if query != nil { + u.RawQuery = query.Encode() + } + target := u.String() + + req, err = http.NewRequestWithContext(ctx, method, target, body) + if err != nil { + return + } + req.Header.Set("User-Agent", build.ClusterUserAgent) + return +} + +func (cr *Cluster) makeReqWithAuth(ctx context.Context, method string, relpath string, query url.Values) (req *http.Request, err error) { + req, err = cr.makeReq(ctx, method, relpath, query) + if err != nil { + return + } + token, err := cr.GetAuthToken(ctx) + if err != nil { + return + } + req.Header.Set("Authorization", "Bearer "+token) + return +} + +type FileInfo struct { + Path string `json:"path" avro:"path"` + Hash string `json:"hash" avro:"hash"` + Size int64 `json:"size" avro:"size"` + Mtime int64 `json:"mtime" avro:"mtime"` +} + +// from +var fileListSchema = avro.MustParse(`{ + "type": "array", + "items": { + "type": "record", + "name": "fileinfo", + "fields": [ + {"name": "path", "type": "string"}, + {"name": "hash", "type": "string"}, + {"name": "size", "type": "long"}, + {"name": "mtime", "type": "long"} + ] + } +}`) + +func (cr *Cluster) GetFileList(ctx context.Context, lastMod int64) (files []FileInfo, err error) { + var query url.Values + if lastMod > 0 { + query = url.Values{ + "lastModified": {strconv.FormatInt(lastMod, 10)}, + } + } + req, err := cr.makeReqWithAuth(ctx, http.MethodGet, "/openbmclapi/files", query) + if err != nil { + return + } + res, err := cr.cachedCli.Do(req) + if err != nil { + return + } + defer res.Body.Close() + switch res.StatusCode { + case http.StatusOK: + // + case http.StatusNotModified: + return + default: + err = utils.NewHTTPStatusErrorFromResponse(res) + return + } + log.Debug("Parsing filelist body ...") + zr, err := 
zstd.NewReader(res.Body) + if err != nil { + return + } + defer zr.Close() + if err = avro.NewDecoderForSchema(fileListSchema, zr).Decode(&files); err != nil { + return + } + log.Debugf("Filelist parsed, length = %d", len(files)) + return +} + +func (cr *Cluster) GetConfig(ctx context.Context) (cfg *OpenbmclapiAgentConfig, err error) { + req, err := cr.makeReqWithAuth(ctx, http.MethodGet, "/openbmclapi/configuration", nil) + if err != nil { + return + } + res, err := cr.cachedCli.Do(req) + if err != nil { + return + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + err = utils.NewHTTPStatusErrorFromResponse(res) + return + } + cfg = new(OpenbmclapiAgentConfig) + if err = json.NewDecoder(res.Body).Decode(cfg); err != nil { + cfg = nil + return + } + return +} + +type syncStats struct { + slots *limited.BufSlots + noOpen bool + + totalSize int64 + okCount, failCount atomic.Int32 + totalFiles int + + pg *mpb.Progress + totalBar *mpb.Bar + lastInc atomic.Int64 +} + +func (cr *Cluster) SyncFiles(ctx context.Context, files []FileInfo, heavyCheck bool) bool { + log.Infof(Tr("info.sync.prepare"), len(files)) + if !cr.issync.CompareAndSwap(false, true) { + log.Warn("Another sync task is running!") + return false + } + defer cr.issync.Store(false) + + sort.Slice(files, func(i, j int) bool { return files[i].Hash < files[j].Hash }) + if cr.syncFiles(ctx, files, heavyCheck) != nil { + return false + } + + cr.filesetMux.Lock() + for _, f := range files { + cr.fileset[f.Hash] = f.Size + } + cr.filesetMux.Unlock() + + return true +} + +type fileInfoWithTargets struct { + FileInfo + tgMux sync.Mutex + targets []storage.Storage +} + +func (cr *Cluster) checkFileFor( + ctx context.Context, + sto storage.Storage, files []FileInfo, + heavy bool, + missing *utils.SyncMap[string, *fileInfoWithTargets], + pg *mpb.Progress, +) { + var missingCount atomic.Int32 + addMissing := func(f FileInfo) { + missingCount.Add(1) + if info, has := missing.GetOrSet(f.Hash, func() 
*fileInfoWithTargets { + return &fileInfoWithTargets{ + FileInfo: f, + targets: []storage.Storage{sto}, + } + }); has { + info.tgMux.Lock() + info.targets = append(info.targets, sto) + info.tgMux.Unlock() + } + } + + log.Infof(Tr("info.check.start"), sto.String(), heavy) + + var ( + checkingHashMux sync.Mutex + checkingHash string + lastCheckingHash string + slots *limited.BufSlots + ) + + if heavy { + slots = limited.NewBufSlots(runtime.GOMAXPROCS(0) * 2) + } + + bar := pg.AddBar(0, + mpb.BarRemoveOnComplete(), + mpb.PrependDecorators( + decor.Name(Tr("hint.check.checking")), + decor.Name(sto.String()), + decor.OnCondition( + decor.Any(func(decor.Statistics) string { + c, l := slots.Cap(), slots.Len() + return fmt.Sprintf(" (%d / %d)", c-l, c) + }), + heavy, + ), + ), + mpb.AppendDecorators( + decor.CountersNoUnit("%d / %d", decor.WCSyncSpaceR), + decor.NewPercentage("%d", decor.WCSyncSpaceR), + decor.EwmaETA(decor.ET_STYLE_GO, 30), + ), + mpb.BarExtender((mpb.BarFillerFunc)(func(w io.Writer, _ decor.Statistics) (err error) { + if checkingHashMux.TryLock() { + lastCheckingHash = checkingHash + checkingHashMux.Unlock() + } + if lastCheckingHash != "" { + _, err = fmt.Fprintln(w, "\t", lastCheckingHash) + } + return + }), false), + ) + defer bar.Wait() + defer bar.Abort(true) + + bar.SetTotal(0x100, false) + + sizeMap := make(map[string]int64, len(files)) + { + start := time.Now() + var checkedMp [256]bool + sto.WalkDir(func(hash string, size int64) error { + if n := utils.HexTo256(hash); !checkedMp[n] { + checkedMp[n] = true + now := time.Now() + bar.EwmaIncrement(now.Sub(start)) + start = now + } + sizeMap[hash] = size + return nil + }) + } + + bar.SetCurrent(0) + bar.SetTotal((int64)(len(files)), false) + for _, f := range files { + if ctx.Err() != nil { + return + } + start := time.Now() + hash := f.Hash + if checkingHashMux.TryLock() { + checkingHash = hash + checkingHashMux.Unlock() + } + name := sto.String() + "/" + hash + if f.Size == 0 { + 
log.Debugf("Skipped empty file %s", name) + } else if size, ok := sizeMap[hash]; ok { + if size != f.Size { + log.Warnf(Tr("warn.check.modified.size"), name, size, f.Size) + addMissing(f) + } else if heavy { + hashMethod, err := getHashMethod(len(hash)) + if err != nil { + log.Errorf(Tr("error.check.unknown.hash.method"), hash) + } else { + _, buf, free := slots.Alloc(ctx) + if buf == nil { + return + } + go func(f FileInfo, buf []byte, free func()) { + defer log.RecoverPanic(nil) + defer free() + miss := true + r, err := sto.Open(hash) + if err != nil { + log.Errorf(Tr("error.check.open.failed"), name, err) + } else { + hw := hashMethod.New() + _, err = io.CopyBuffer(hw, r, buf[:]) + r.Close() + if err != nil { + log.Errorf(Tr("error.check.hash.failed"), name, err) + } else if hs := hex.EncodeToString(hw.Sum(buf[:0])); hs != hash { + log.Warnf(Tr("warn.check.modified.hash"), name, hs, hash) + } else { + miss = false + } + } + if miss { + addMissing(f) + } + bar.EwmaIncrement(time.Since(start)) + }(f, buf, free) + continue + } + } + } else { + // log.Debugf("Could not found file %q", name) + addMissing(f) + } + bar.EwmaIncrement(time.Since(start)) + } + + checkingHashMux.Lock() + checkingHash = "" + checkingHashMux.Unlock() + + bar.SetTotal(-1, true) + log.Infof(Tr("info.check.done"), sto.String(), missingCount.Load()) + return +} + +func (cr *Cluster) CheckFiles( + ctx context.Context, + files []FileInfo, + heavyCheck bool, + pg *mpb.Progress, +) (map[string]*fileInfoWithTargets, error) { + missingMap := utils.NewSyncMap[string, *fileInfoWithTargets]() + done := make(chan struct{}, 0) + + for _, s := range cr.storages { + go func(s storage.Storage) { + defer log.RecordPanic() + defer func() { + select { + case done <- struct{}{}: + case <-ctx.Done(): + } + }() + cr.checkFileFor(ctx, s, files, heavyCheck, missingMap, pg) + }(s) + } + for i := len(cr.storages); i > 0; i-- { + select { + case <-done: + case <-ctx.Done(): + log.Warn(Tr("warn.sync.interrupted")) + 
return nil, ctx.Err() + } + } + return missingMap.RawMap(), nil +} + +func (cr *Cluster) SetFilesetByExists(ctx context.Context, files []FileInfo) error { + pg := mpb.New(mpb.WithRefreshRate(time.Second/2), mpb.WithAutoRefresh(), mpb.WithWidth(140)) + defer pg.Shutdown() + log.SetLogOutput(pg) + defer log.SetLogOutput(nil) + + missingMap, err := cr.CheckFiles(ctx, files, false, pg) + if err != nil { + return err + } + fileset := make(map[string]int64, len(files)) + stoCount := len(cr.storages) + for _, f := range files { + if t, ok := missingMap[f.Hash]; !ok || len(t.targets) < stoCount { + fileset[f.Hash] = f.Size + } + } + + cr.mux.Lock() + cr.fileset = fileset + cr.mux.Unlock() + return nil +} + +func (cr *Cluster) syncFiles(ctx context.Context, files []FileInfo, heavyCheck bool) error { + pg := mpb.New(mpb.WithRefreshRate(time.Second/2), mpb.WithAutoRefresh(), mpb.WithWidth(140)) + defer pg.Shutdown() + log.SetLogOutput(pg) + defer log.SetLogOutput(nil) + + cr.syncProg.Store(0) + cr.syncTotal.Store(-1) + + missingMap, err := cr.CheckFiles(ctx, files, heavyCheck, pg) + if err != nil { + return err + } + var ( + missing = make([]*fileInfoWithTargets, 0, len(missingMap)) + missingSize int64 = 0 + ) + for _, f := range missingMap { + missing = append(missing, f) + missingSize += f.Size + } + totalFiles := len(missing) + if totalFiles == 0 { + log.Info(Tr("info.sync.none")) + return nil + } + + go cr.notifyManager.OnSyncBegin(len(missing), missingSize) + defer func() { + go cr.notifyManager.OnSyncDone() + }() + + cr.syncTotal.Store((int64)(totalFiles)) + + ccfg, err := cr.GetConfig(ctx) + if err != nil { + return err + } + syncCfg := ccfg.Sync + log.Infof(Tr("info.sync.config"), syncCfg) + + done := make(chan struct{}, 0) + + var stats syncStats + stats.pg = pg + stats.noOpen = syncCfg.Source == "center" + stats.slots = limited.NewBufSlots(syncCfg.Concurrency) + stats.totalFiles = totalFiles + for _, f := range missing { + stats.totalSize += f.Size + } + + var 
barUnit decor.SizeB1024
	stats.lastInc.Store(time.Now().UnixNano())
	stats.totalBar = pg.AddBar(stats.totalSize,
		mpb.BarRemoveOnComplete(),
		mpb.BarPriority(stats.slots.Cap()),
		mpb.PrependDecorators(
			decor.Name(Tr("hint.sync.total")),
			decor.NewPercentage("%.2f"),
		),
		mpb.AppendDecorators(
			decor.Any(func(decor.Statistics) string {
				return fmt.Sprintf("(%d + %d / %d) ", stats.okCount.Load(), stats.failCount.Load(), stats.totalFiles)
			}),
			decor.Counters(barUnit, "(%.1f/%.1f) "),
			decor.EwmaSpeed(barUnit, "%.1f ", 30),
			decor.OnComplete(
				decor.EwmaETA(decor.ET_STYLE_GO, 30), "done",
			),
		),
	)

	log.Infof(Tr("hint.sync.start"), totalFiles, utils.BytesToUnit((float64)(stats.totalSize)))
	start := time.Now()

	for _, f := range missing {
		log.Debugf("File %s is for %v", f.Hash, f.targets)
		// fetchFile blocks until a download slot is free; the returned
		// channel yields the temp-file path once the download finishes.
		pathRes, err := cr.fetchFile(ctx, &stats, f.FileInfo)
		if err != nil {
			log.Warn(Tr("warn.sync.interrupted"))
			return err
		}
		go func(f *fileInfoWithTargets, pathRes <-chan string) {
			defer log.RecordPanic()
			defer func() {
				select {
				case done <- struct{}{}:
				case <-ctx.Done():
				}
			}()
			select {
			case path := <-pathRes:
				cr.syncProg.Add(1)
				if path != "" {
					defer os.Remove(path)
					// Throttle the fan-out with the same slot pool as the
					// downloads; Alloc returns nil buf on cancellation.
					// (The slot id and buffer themselves are unused here —
					// previously kept alive via a dead `_ = slotId`.)
					_, buf, free := stats.slots.Alloc(ctx)
					if buf == nil {
						return
					}
					defer free()
					// BUGFIX(style): use a goroutine-local err instead of
					// writing to the captured loop-scope err, which nothing
					// ever read after this point.
					srcFd, err := os.Open(path)
					if err != nil {
						return
					}
					defer srcFd.Close()
					// Best-effort fan-out: a failing storage is logged and
					// skipped, the rest still receive the file.
					for _, target := range f.targets {
						if _, err := srcFd.Seek(0, io.SeekStart); err != nil {
							log.Errorf("Cannot seek file %q to start: %v", path, err)
							continue
						}
						if err := target.Create(f.Hash, srcFd); err != nil {
							log.Errorf(Tr("error.sync.create.failed"), target.String(), f.Hash, err)
							continue
						}
					}
				}
			case <-ctx.Done():
				return
			}
		}(f, pathRes)
	}
	// One completion signal per scheduled file.
	for i := len(missing); i > 0; i-- {
		select {
		case <-done:
		case <-ctx.Done():
			log.Warn(Tr("warn.sync.interrupted"))
			return ctx.Err()
		}
	}

	use :=
time.Since(start) + stats.totalBar.Abort(true) + pg.Wait() + + log.Infof(Tr("hint.sync.done"), use, utils.BytesToUnit((float64)(stats.totalSize)/use.Seconds())) + return nil +} + +func (cr *Cluster) Gc() { + for _, s := range cr.storages { + cr.gcFor(s) + } +} + +func (cr *Cluster) gcFor(s storage.Storage) { + log.Infof(Tr("info.gc.start"), s.String()) + err := s.WalkDir(func(hash string, _ int64) error { + if cr.issync.Load() { + return context.Canceled + } + if _, ok := cr.CachedFileSize(hash); !ok { + log.Infof(Tr("info.gc.found"), s.String()+"/"+hash) + s.Remove(hash) + } + return nil + }) + if err != nil { + if err == context.Canceled { + log.Warnf(Tr("warn.gc.interrupted"), s.String()) + } else { + log.Errorf(Tr("error.gc.error"), err) + } + return + } + log.Infof(Tr("info.gc.done"), s.String()) +} + +func (cr *Cluster) fetchFile(ctx context.Context, stats *syncStats, f FileInfo) (<-chan string, error) { + const ( + maxRetryCount = 5 + maxTryWithOpen = 3 + ) + + slotId, buf, free := stats.slots.Alloc(ctx) + if buf == nil { + return nil, ctx.Err() + } + + pathRes := make(chan string, 1) + go func() { + defer log.RecordPanic() + defer free() + defer close(pathRes) + + var barUnit decor.SizeB1024 + var trycount atomic.Int32 + trycount.Store(1) + bar := stats.pg.AddBar(f.Size, + mpb.BarRemoveOnComplete(), + mpb.BarPriority(slotId), + mpb.PrependDecorators( + decor.Name(Tr("hint.sync.downloading")), + decor.Any(func(decor.Statistics) string { + tc := trycount.Load() + if tc <= 1 { + return "" + } + return fmt.Sprintf("(%d/%d) ", tc, maxRetryCount) + }), + decor.Name(f.Path, decor.WCSyncSpaceR), + ), + mpb.AppendDecorators( + decor.NewPercentage("%d", decor.WCSyncSpace), + decor.Counters(barUnit, "[%.1f / %.1f]", decor.WCSyncSpace), + decor.EwmaSpeed(barUnit, "%.1f", 10, decor.WCSyncSpace), + decor.OnComplete( + decor.EwmaETA(decor.ET_STYLE_GO, 10, decor.WCSyncSpace), "done", + ), + ), + ) + defer bar.Abort(true) + + noOpen := stats.noOpen + interval := time.Second 
for {
			bar.SetCurrent(0)
			hashMethod, err := getHashMethod(len(f.Hash))
			if err == nil {
				var path string
				if path, err = cr.fetchFileWithBuf(ctx, f, hashMethod, buf, noOpen, func(r io.Reader) io.Reader {
					// Mirror download progress onto both the per-file and total bars.
					return ProxyReader(r, bar, stats.totalBar, &stats.lastInc)
				}); err == nil {
					pathRes <- path
					stats.okCount.Add(1)
					log.Infof(Tr("info.sync.downloaded"), f.Path,
						utils.BytesToUnit((float64)(f.Size)),
						(float64)(stats.totalBar.Current())/(float64)(stats.totalSize)*100)
					return
				}
			}
			// Mark the already-transferred portion as refill before retrying.
			bar.SetRefill(bar.Current())

			log.Errorf(Tr("error.sync.download.failed"), f.Path, err)
			c := trycount.Add(1)
			if c > maxRetryCount {
				break
			}
			// After maxTryWithOpen failures, switch to the fallback
			// ("no open") download endpoint for the remaining attempts.
			if c > maxTryWithOpen {
				noOpen = true
			}
			// Exponential backoff between attempts; abort on cancellation.
			select {
			case <-time.After(interval):
				interval *= 2
			case <-ctx.Done():
				return
			}
		}
		stats.failCount.Add(1)
	}()
	return pathRes, nil
}

// fetchFileWithBuf downloads f into a temporary file, verifying both size
// and content hash, and returns the temp file's path. buf is a scratch
// buffer for copying; wrapper (optional) wraps the response stream, e.g.
// for progress reporting. When noOpen is set the generic
// /openbmclapi/download/<hash> endpoint is used instead of f.Path.
func (cr *Cluster) fetchFileWithBuf(
	ctx context.Context, f FileInfo,
	hashMethod crypto.Hash, buf []byte,
	noOpen bool,
	wrapper func(io.Reader) io.Reader,
) (path string, err error) {
	var (
		reqPath = f.Path
		req     *http.Request
		res     *http.Response
		fd      *os.File
		r       io.Reader
	)
	if noOpen {
		reqPath = "/openbmclapi/download/" + f.Hash
	}
	if req, err = cr.makeReqWithAuth(ctx, http.MethodGet, reqPath, nil); err != nil {
		return
	}
	req.Header.Set("Accept-Encoding", "gzip, deflate")
	if res, err = cr.client.Do(req); err != nil {
		return
	}
	defer res.Body.Close()
	if err = ctx.Err(); err != nil {
		return
	}
	if res.StatusCode != http.StatusOK {
		err = ErrorFromRedirect(utils.NewHTTPStatusErrorFromResponse(res), res)
		return
	}
	// Decompress according to the negotiated Content-Encoding.
	switch ce := strings.ToLower(res.Header.Get("Content-Encoding")); ce {
	case "":
		r = res.Body
	case "gzip":
		if r, err = gzip.NewReader(res.Body); err != nil {
			err = ErrorFromRedirect(err, res)
			return
		}
	case "deflate":
		if r, err = zlib.NewReader(res.Body); err != nil {
			err = ErrorFromRedirect(err, res)
			return
		}
	default:
		err =
ErrorFromRedirect(fmt.Errorf("Unexpected Content-Encoding %q", ce), res) + return + } + if wrapper != nil { + r = wrapper(r) + } + + hw := hashMethod.New() + + if fd, err = os.CreateTemp("", "*.downloading"); err != nil { + return + } + path = fd.Name() + defer func(path string) { + if err != nil { + os.Remove(path) + } + }(path) + + _, err = io.CopyBuffer(io.MultiWriter(hw, fd), r, buf) + stat, err2 := fd.Stat() + fd.Close() + if err != nil { + err = ErrorFromRedirect(err, res) + return + } + if err2 != nil { + err = err2 + return + } + if t := stat.Size(); f.Size >= 0 && t != f.Size { + err = ErrorFromRedirect(fmt.Errorf("File size wrong, got %d, expect %d", t, f.Size), res) + return + } else if hs := hex.EncodeToString(hw.Sum(buf[:0])); hs != f.Hash { + err = ErrorFromRedirect(fmt.Errorf("File hash not match, got %s, expect %s", hs, f.Hash), res) + return + } + return +} + +type downloadingItem struct { + err error + done chan struct{} +} + +func (cr *Cluster) lockDownloading(target string) (*downloadingItem, bool) { + cr.downloadMux.RLock() + item := cr.downloading[target] + cr.downloadMux.RUnlock() + if item != nil { + return item, true + } + + cr.downloadMux.Lock() + defer cr.downloadMux.Unlock() + + if item = cr.downloading[target]; item != nil { + return item, true + } + item = &downloadingItem{ + done: make(chan struct{}, 0), + } + cr.downloading[target] = item + return item, false +} + +func (cr *Cluster) DownloadFile(ctx context.Context, hash string) (err error) { + hashMethod, err := getHashMethod(len(hash)) + if err != nil { + return + } + + f := FileInfo{ + Path: "/openbmclapi/download/" + hash, + Hash: hash, + Size: -1, + Mtime: 0, + } + item, ok := cr.lockDownloading(hash) + if !ok { + go func() { + defer log.RecoverPanic(nil) + var err error + defer func() { + if err != nil { + log.Errorf(Tr("error.sync.download.failed"), hash, err) + } + item.err = err + close(item.done) + + cr.downloadMux.Lock() + defer cr.downloadMux.Unlock() + 
delete(cr.downloading, hash)
			}()

			log.Infof(Tr("hint.sync.downloading.handler"), hash)

			// Detach from the request's ctx: the download should outlive
			// the triggering request, but be cancelled when the cluster's
			// enabled state flips (either direction) while we work.
			// NOTE(review): the disabled->enabled case cancelling the
			// download looks surprising — confirm it is intended.
			ctx, cancel := context.WithCancel(context.Background())
			go func() {
				if cr.enabled.Load() {
					select {
					case <-cr.Disabled():
						cancel()
					case <-ctx.Done():
					}
				} else {
					select {
					case <-cr.WaitForEnable():
						cancel()
					case <-ctx.Done():
					}
				}
			}()
			defer cancel()

			var buf []byte
			_, buf, free := cr.allocBuf(ctx)
			if buf == nil {
				err = ctx.Err()
				return
			}
			defer free()

			// noOpen=true: always use the generic download endpoint here.
			path, err := cr.fetchFileWithBuf(ctx, f, hashMethod, buf, true, nil)
			if err != nil {
				return
			}
			defer os.Remove(path)
			var srcFd *os.File
			if srcFd, err = os.Open(path); err != nil {
				return
			}
			defer srcFd.Close()
			var stat os.FileInfo
			if stat, err = srcFd.Stat(); err != nil {
				return
			}
			size := stat.Size()

			// Best-effort fan-out to every storage; failures are logged
			// and skipped (note the shadowed err in Create's if).
			for _, target := range cr.storages {
				if _, err = srcFd.Seek(0, io.SeekStart); err != nil {
					log.Errorf("Cannot seek file %q: %v", path, err)
					return
				}
				if err := target.Create(hash, srcFd); err != nil {
					log.Errorf(Tr("error.sync.create.failed"), target.String(), hash, err)
					continue
				}
			}

			cr.filesetMux.Lock()
			cr.fileset[hash] = -size // negative means that the file was not stored into the database yet
			cr.filesetMux.Unlock()
		}()
	}
	// Wait for the (shared) download to finish, the caller to give up,
	// or the cluster to be disabled.
	select {
	case <-item.done:
		err = item.err
	case <-ctx.Done():
		err = ctx.Err()
	case <-cr.Disabled():
		err = context.Canceled
	}
	return
}

// checkUpdate queries GitHub for a newer release and notifies via the
// notify manager. It is a no-op for untagged (development) builds.
func (cr *Cluster) checkUpdate() (err error) {
	if update.CurrentBuildTag == nil {
		return
	}
	log.Info(Tr("info.update.checking"))
	release, err := update.Check(cr.cachedCli, config.GithubAPI.Authorization)
	if err != nil || release == nil {
		return
	}
	// TODO: print all middle change logs
	log.Infof(Tr("info.update.detected"), release.Tag, update.CurrentBuildTag)
	log.Infof(Tr("info.update.changelog"), update.CurrentBuildTag, release.Tag, release.Body)
	cr.notifyManager.OnUpdateAvaliable(release)
	return
}