Skip to content

Commit

Permalink
v1.10.0
Browse files Browse the repository at this point in the history
  • Loading branch information
zyxkad committed Mar 27, 2024
1 parent fc2a2eb commit ca7e839
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 22 deletions.
43 changes: 31 additions & 12 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"regexp"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -686,6 +687,16 @@ func (cr *Cluster) Disabled() <-chan struct{} {
return cr.disabled
}

func (cr *Cluster) CloneFileset() map[string]int64 {
cr.filesetMux.RLock()
defer cr.filesetMux.RUnlock()
fileset := make(map[string]int64, len(cr.fileset))
for k, v := range cr.fileset {
fileset[k] = v
}
return fileset
}

func (cr *Cluster) CachedFileSize(hash string) (size int64, ok bool) {
cr.filesetMux.RLock()
defer cr.filesetMux.RUnlock()
Expand Down Expand Up @@ -767,12 +778,13 @@ func (cr *Cluster) makeReqWithAuth(ctx context.Context, method string, relpath s
}

type FileInfo struct {
Path string `json:"path" avro:"path"`
Hash string `json:"hash" avro:"hash"`
Size int64 `json:"size" avro:"size"`
Path string `json:"path" avro:"path"`
Hash string `json:"hash" avro:"hash"`
Size int64 `json:"size" avro:"size"`
Mtime int64 `json:"mtime" avro:"mtime"`
}

// from <https://github.com/bangbang93/openbmclapi/blob/master/src/cluster.ts>
// from <https://github.com/bangbang93/openbmclapi/blob/master/src/constants.ts>
var fileListSchema = avro.MustParse(`{
"type": "array",
"items": {
Expand All @@ -781,13 +793,20 @@ var fileListSchema = avro.MustParse(`{
"fields": [
{"name": "path", "type": "string"},
{"name": "hash", "type": "string"},
{"name": "size", "type": "long"}
{"name": "size", "type": "long"},
{"name": "mtime", "type": "long"}
]
}
}`)

func (cr *Cluster) GetFileList(ctx context.Context) (files []FileInfo, err error) {
req, err := cr.makeReqWithAuth(ctx, http.MethodGet, "/openbmclapi/files", nil)
func (cr *Cluster) GetFileList(ctx context.Context, lastMod int64) (files []FileInfo, err error) {
var query url.Values
if lastMod > 0 {
query = url.Values{
"lastModified": {strconv.FormatInt(lastMod, 10)},
}
}
req, err := cr.makeReqWithAuth(ctx, http.MethodGet, "/openbmclapi/files", query)
if err != nil {
return
}
Expand All @@ -809,6 +828,7 @@ func (cr *Cluster) GetFileList(ctx context.Context) (files []FileInfo, err error
if err = avro.NewDecoderForSchema(fileListSchema, zr).Decode(&files); err != nil {
return
}
log.Debugf("Filelist parsed, length = %d", len(files))
return
}

Expand Down Expand Up @@ -860,12 +880,10 @@ func (cr *Cluster) SyncFiles(ctx context.Context, files []FileInfo, heavyCheck b
return false
}

fileset := make(map[string]int64, len(files))
cr.filesetMux.Lock()
for _, f := range files {
fileset[f.Hash] = f.Size
cr.fileset[f.Hash] = f.Size
}
cr.filesetMux.Lock()
cr.fileset = fileset
cr.filesetMux.Unlock()

return true
Expand Down Expand Up @@ -1466,6 +1484,7 @@ func (cr *Cluster) DownloadFile(ctx context.Context, hash string) (err error) {
Path: "/openbmclapi/download/" + hash,
Hash: hash,
Size: -1,
Mtime: 0,
}
item, ok := cr.lockDownloading(hash)
if !ok {
Expand Down Expand Up @@ -1540,7 +1559,7 @@ func (cr *Cluster) DownloadFile(ctx context.Context, hash string) (err error) {
}

cr.filesetMux.Lock()
cr.fileset[hash] = -size // negative means that the file was not stored into the databse yet
cr.fileset[hash] = -size // negative means that the file was not stored into the database yet
cr.filesetMux.Unlock()
}()
}
Expand Down
2 changes: 1 addition & 1 deletion handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func (cr *Cluster) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
case "PojavLauncher":
chance = 512 * 10
case "FCL":
fallthrough
chance = 512 * 2
}
if chance > 0 {
if randIntn(1024000) < chance/2 {
Expand Down
2 changes: 1 addition & 1 deletion internal/build/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"fmt"
)

const ClusterVersion = "1.9.8"
const ClusterVersion = "1.10.0"

var BuildVersion string = "dev"

Expand Down
21 changes: 21 additions & 0 deletions limited/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,27 @@ func (s *Semaphore) Release() {
<-s.c
}

func (s *Semaphore) Wait() {
if s == nil {
panic("Cannot wait on nil Semaphore")
}
for i := s.Cap(); i > 0; i-- {
s.Acquire()
}
}

func (s *Semaphore) WaitWithContext(ctx context.Context) bool {
if s == nil {
panic("Cannot wait on nil Semaphore")
}
for i := s.Cap(); i > 0; i-- {
if !s.AcquireWithContext(ctx) {
return false
}
}
return true
}

type spProxyReader struct {
io.Reader
released atomic.Bool
Expand Down
27 changes: 19 additions & 8 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ func (r *Runner) UpdateFileRecords(files []FileInfo, oldfileset map[string]int64
return
}
defer r.updating.Store(false)

sem := limited.NewSemaphore(12)
log.Info("Begin to update file records")
for _, f := range files {
Expand All @@ -393,15 +394,13 @@ func (r *Runner) UpdateFileRecords(files []FileInfo, oldfileset map[string]int64
Size: f.Size,
})
}
for i := sem.Cap(); i > 0; i-- {
sem.Acquire()
}
sem.Wait()
log.Info("All file records are updated")
}

func (r *Runner) InitSynchronizer(ctx context.Context) {
log.Info(Tr("info.filelist.fetching"))
fl, err := r.cluster.GetFileList(ctx)
fl, err := r.cluster.GetFileList(ctx, 0)
if err != nil {
log.Errorf(Tr("error.filelist.fetch.failed"), err)
if errors.Is(err, context.Canceled) {
Expand Down Expand Up @@ -434,17 +433,29 @@ func (r *Runner) InitSynchronizer(ctx context.Context) {
return
}
}

var lastMod int64
for _, f := range fl {
if f.Mtime > lastMod {
lastMod = f.Mtime
}
}

createInterval(ctx, func() {
log.Info(Tr("info.filelist.fetching"))
fl, err := r.cluster.GetFileList(ctx)
fl, err := r.cluster.GetFileList(ctx, lastMod)
if err != nil {
log.Errorf(Tr("error.cannot.fetch.filelist"), err)
return
}
for _, f := range fl {
if f.Mtime > lastMod {
lastMod = f.Mtime
}
}

checkCount = (checkCount + 1) % heavyCheckInterval
r.cluster.mux.RLock()
oldfileset := r.cluster.fileset
r.cluster.mux.RUnlock()
oldfileset := r.cluster.CloneFileset()
r.cluster.SyncFiles(ctx, fl, heavyCheck && checkCount == 0)
go r.UpdateFileRecords(fl, oldfileset)
if !config.Advanced.NoGC && !config.OnlyGcWhenStart {
Expand Down

0 comments on commit ca7e839

Please sign in to comment.