Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor #71

Draft
wants to merge 37 commits into
base: master
Choose a base branch
from
Draft
Changes from 1 commit
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
72e0118
start refactor
zyxkad Jun 11, 2024
e081599
refactoring cluster
zyxkad Jun 18, 2024
5e89ede
add socket logic
zyxkad Jun 20, 2024
103c480
migrated more cluster
zyxkad Jun 26, 2024
451a665
abstract subscription, token, and user api
zyxkad Jun 27, 2024
8cbd7c5
fix wrong jwt subject usage
zyxkad Jun 27, 2024
e17d8c7
fix format
zyxkad Jun 27, 2024
f2dbcbf
add permHandler
zyxkad Jun 28, 2024
b7b6cc6
complete api
zyxkad Jun 29, 2024
a498e11
go fmt
zyxkad Jul 1, 2024
0ab1240
Merge branch 'master' into refactor
zyxkad Aug 6, 2024
10eee06
update dockerfile
zyxkad Aug 6, 2024
8dca5cb
separate config
zyxkad Aug 8, 2024
9d44901
fix notifier error
zyxkad Aug 8, 2024
caae4f7
add webhook
zyxkad Aug 8, 2024
74c811d
add license header to installer.sh
zyxkad Aug 9, 2024
18ee907
update cluster handler
zyxkad Aug 10, 2024
4fe7a79
start to reforge storage sync
zyxkad Aug 11, 2024
3c20fd8
run go fmt
zyxkad Aug 11, 2024
173aced
refactored most stuffs
zyxkad Aug 11, 2024
650b0c3
refactored all errors
zyxkad Aug 12, 2024
1d399d9
add gc
zyxkad Aug 13, 2024
e723847
fix certificate request logic
zyxkad Aug 13, 2024
5465766
add report API
zyxkad Aug 14, 2024
16e9db4
separate runner
zyxkad Aug 17, 2024
88b902b
refactor file download
zyxkad Aug 17, 2024
8cf633b
fill more manager
zyxkad Aug 17, 2024
f2a883d
implemented singleUserManager
zyxkad Aug 17, 2024
a09e758
run go fmt
zyxkad Aug 17, 2024
29265d0
bump go version to 1.23
zyxkad Aug 17, 2024
d436d88
use report API
zyxkad Aug 17, 2024
8698d11
fix nil pointer config
zyxkad Aug 17, 2024
774a388
fix typo
zyxkad Aug 17, 2024
9308d58
fix http client
zyxkad Aug 17, 2024
0ec5189
fix report API, and a few translations
zyxkad Aug 18, 2024
07bf29d
fix unused import
zyxkad Aug 18, 2024
62c50ae
update config
zyxkad Oct 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactoring cluster
zyxkad committed Jun 18, 2024

Verified

This commit was signed with the committer’s verified signature.
commit e0815997026e9b164c19b2e745fceab0dee397b1
309 changes: 8 additions & 301 deletions cluster.go
Original file line number Diff line number Diff line change
@@ -29,7 +29,6 @@
"net/http"
"os"
"path/filepath"
"regexp"
"runtime"
"sync"
"sync/atomic"
@@ -52,10 +51,6 @@
"github.com/LiterMC/go-openbmclapi/utils"
)

var (
reFileHashMismatchError = regexp.MustCompile(` hash mismatch, expected ([0-9a-f]+), got ([0-9a-f]+)`)
)

type Cluster struct {
host string // not the public access host, but maybe a public IP, or a host that will be resolved to the IP
publicHosts []string // should not contains port, can be nil
@@ -74,7 +69,7 @@
storageTotalWeight uint
cache gocache.Cache
apiHmacKey []byte
hijackProxy *HjProxy

Check failure on line 72 in cluster.go

GitHub Actions / test

undefined: HjProxy

stats notify.Stats
lastHits, statOnlyHits atomic.Int32
@@ -96,7 +91,7 @@
filesetMux sync.RWMutex
fileset map[string]int64
authTokenMux sync.RWMutex
authToken *ClusterToken

Check failure on line 94 in cluster.go

GitHub Actions / test

undefined: ClusterToken

client *http.Client
cachedCli *http.Client
@@ -145,7 +140,7 @@
clusterSecret: clusterSecret,
prefix: prefix,
byoc: byoc,
jwtIssuer: jwtIssuerPrefix + "#" + clusterId,

Check failure on line 143 in cluster.go

GitHub Actions / test

undefined: jwtIssuerPrefix

dataDir: filepath.Join(baseDir, "data"),
maxConn: config.DownloadMaxConn,
@@ -204,7 +199,7 @@
}

if config.Hijack.Enable {
cr.hijackProxy = NewHjProxy(cr.client, cr.database, cr.handleDownload)

Check failure on line 202 in cluster.go

GitHub Actions / test

undefined: NewHjProxy
if config.Hijack.EnableLocalCache {
os.MkdirAll(config.Hijack.LocalCachePath, 0755)
}
@@ -252,11 +247,11 @@
defer ticker.Stop()

if err := cr.checkUpdate(); err != nil {
log.Errorf(Tr("error.update.check.failed"), err)
log.TrErrorf("error.update.check.failed", err)
}
for range ticker.C {
if err := cr.checkUpdate(); err != nil {
log.Errorf(Tr("error.update.check.failed"), err)
log.TrErrorf("error.update.check.failed", err)
}
}
}(cr.updateChecker)
@@ -286,10 +281,10 @@
return true
}

_, err := cr.GetAuthToken(ctx)

Check failure on line 284 in cluster.go

GitHub Actions / test

cr.GetAuthToken undefined (type *Cluster has no field or method GetAuthToken)
if err != nil {
log.Errorf(Tr("error.cluster.auth.failed"), err)
log.TrErrorf("error.cluster.auth.failed", err)
osExit(CodeClientOrServerError)

Check failure on line 287 in cluster.go

GitHub Actions / test

undefined: CodeClientOrServerError
}

engio, err := engine.NewSocket(engine.Options{
@@ -303,7 +298,7 @@
})
if err != nil {
log.Errorf("Could not parse Engine.IO options: %v; exit.", err)
osExit(CodeClientUnexpectedError)

Check failure on line 301 in cluster.go

GitHub Actions / test

undefined: CodeClientUnexpectedError
}

cr.reconnectCount = 0
@@ -331,15 +326,15 @@
if config.MaxReconnectCount == 0 {
if cr.shouldEnable.Load() {
log.Errorf("Cluster disconnected from remote; exit.")
osExit(CodeServerOrEnvionmentError)

Check failure on line 329 in cluster.go

GitHub Actions / test

undefined: CodeServerOrEnvionmentError
}
}
if !connected {
cr.reconnectCount++
if config.MaxReconnectCount > 0 && cr.reconnectCount >= config.MaxReconnectCount {
if cr.shouldEnable.Load() {
log.Error(Tr("error.cluster.connect.failed.toomuch"))
log.TrErrorf("error.cluster.connect.failed.toomuch")
osExit(CodeServerOrEnvionmentError)

Check failure on line 337 in cluster.go

GitHub Actions / test

undefined: CodeServerOrEnvionmentError
}
}
}
@@ -348,11 +343,11 @@
})
engio.OnDialError(func(_ *engine.Socket, err error) {
cr.reconnectCount++
log.Errorf(Tr("error.cluster.connect.failed"), cr.reconnectCount, config.MaxReconnectCount, err)
log.TrErrorf("error.cluster.connect.failed", cr.reconnectCount, config.MaxReconnectCount, err)
if config.MaxReconnectCount >= 0 && cr.reconnectCount >= config.MaxReconnectCount {
if cr.shouldEnable.Load() {
log.Error(Tr("error.cluster.connect.failed.toomuch"))
log.TrErrorf("error.cluster.connect.failed.toomuch")
osExit(CodeServerOrEnvionmentError)

Check failure on line 350 in cluster.go

GitHub Actions / test

undefined: CodeServerOrEnvionmentError
}
}
})
@@ -360,7 +355,7 @@
cr.socket = socket.NewSocket(engio, socket.WithAuthTokenFn(func() string {
token, err := cr.GetAuthToken(ctx)
if err != nil {
log.Errorf(Tr("error.cluster.auth.failed"), err)
log.TrErrorf("error.cluster.auth.failed", err)
osExit(CodeServerOrEnvionmentError)
}
return token
@@ -373,7 +368,7 @@
log.Debugf("shouldEnable is %v", cr.shouldEnable.Load())
if cr.shouldEnable.Load() {
if err := cr.Enable(ctx); err != nil {
log.Errorf(Tr("error.cluster.enable.failed"), err)
log.TrErrorf("error.cluster.enable.failed", err)
osExit(CodeClientOrEnvionmentError)
}
}
@@ -406,229 +401,6 @@
return true
}

// WaitForEnable returns a channel that is closed once the cluster
// becomes enabled. If the cluster is already enabled, an
// already-closed channel is returned immediately.
func (cr *Cluster) WaitForEnable() <-chan struct{} {
	// Fast path: skip the lock when we are already enabled.
	if cr.enabled.Load() {
		return closedCh
	}

	cr.mux.Lock()
	defer cr.mux.Unlock()

	// Re-check under the lock in case Enable completed in between.
	if cr.enabled.Load() {
		return closedCh
	}

	waiter := make(chan struct{})
	cr.waitEnable = append(cr.waitEnable, waiter)
	return waiter
}

// EnableData is the JSON payload emitted with the "enable" socket.io
// event (see Cluster.Enable) to bring this node online.
type EnableData struct {
	Host         string       `json:"host"`         // host reported to the server; filled from cr.host
	Port         uint16       `json:"port"`         // public port reported to the server
	Version      string       `json:"version"`      // cluster version (build.ClusterVersion)
	Byoc         bool         `json:"byoc"`         // "bring your own certificate" flag
	NoFastEnable bool         `json:"noFastEnable"` // mirrors config.Advanced.NoFastEnable
	Flavor       ConfigFlavor `json:"flavor"`       // runtime/storage flavor summary
}

// ConfigFlavor describes this node's runtime and storage flavor,
// reported to the central server inside EnableData.
type ConfigFlavor struct {
	Runtime string `json:"runtime"` // e.g. "golang/linux-amd64"
	Storage string `json:"storage"` // storage kinds joined with "+", e.g. "file+alist"
}

// Enable registers this node with the central server by emitting the
// "enable" socket.io event, then starts the periodic keep-alive loop.
// It is a no-op when the cluster is already enabled.
//
// On an unrecoverable condition (disconnected with reconnects
// disabled, kicked by the server, repeated keep-alive failure) it may
// terminate the process via osExit.
func (cr *Cluster) Enable(ctx context.Context) (err error) {
	cr.mux.Lock()
	defer cr.mux.Unlock()

	if cr.enabled.Load() {
		log.Debug("Extra enable")
		return
	}

	if cr.socket != nil && !cr.socket.IO().Connected() && config.MaxReconnectCount == 0 {
		log.Error(Tr("error.cluster.disconnected"))
		osExit(CodeServerOrEnvionmentError)
		return
	}

	cr.shouldEnable.Store(true)

	// Summarize the configured storage kinds (e.g. "file+alist") for
	// the flavor report. Note: map iteration order is unspecified, so
	// the join order may vary between runs.
	storagesCount := make(map[string]int, 2)
	for _, s := range cr.storageOpts {
		switch s.Type {
		case storage.StorageLocal:
			storagesCount["file"]++
		case storage.StorageMount, storage.StorageWebdav:
			storagesCount["alist"]++
		default:
			log.Errorf("Unknown storage type %q", s.Type)
		}
	}
	storageStr := ""
	for s := range storagesCount {
		if len(storageStr) > 0 {
			storageStr += "+"
		}
		storageStr += s
	}

	log.Info(Tr("info.cluster.enable.sending"))
	resCh, err := cr.socket.EmitWithAck("enable", EnableData{
		Host:         cr.host,
		Port:         cr.publicPort,
		Version:      build.ClusterVersion,
		Byoc:         cr.byoc,
		NoFastEnable: config.Advanced.NoFastEnable,
		Flavor: ConfigFlavor{
			Runtime: "golang/" + runtime.GOOS + "-" + runtime.GOARCH,
			Storage: storageStr,
		},
	})
	if err != nil {
		return
	}
	var data []any
	tctx, cancel := context.WithTimeout(ctx, time.Minute*6)
	select {
	case <-tctx.Done():
		cancel()
		return tctx.Err()
	case data = <-resCh:
		cancel()
	}
	log.Debug("got enable ack:", data)
	// The ack is expected to be [error, ok]; guard the length before
	// indexing so a short or malformed reply cannot panic.
	if len(data) < 2 {
		return fmt.Errorf("Enable failed: unexpected ack %v", data)
	}
	if ero := data[0]; ero != nil {
		if ero, ok := ero.(map[string]any); ok {
			if msg, ok := ero["message"].(string); ok {
				// Special-case the server's "hash mismatch" error:
				// drop the offending file from every storage so the
				// next sync re-downloads it.
				if hashMismatch := reFileHashMismatchError.FindStringSubmatch(msg); hashMismatch != nil {
					hash := hashMismatch[1]
					log.Warnf("Detected hash mismatch error, removing bad file %s", hash)
					for _, s := range cr.storages {
						s.Remove(hash)
					}
				}
				return fmt.Errorf("Enable failed: %v", msg)
			}
		}
		return fmt.Errorf("Enable failed: %v", ero)
	}
	// Safe type assertion: a non-bool second element is treated the
	// same as a false ack instead of panicking.
	if ok, isBool := data[1].(bool); !isBool || !ok {
		return errors.New("Enable ack non true value")
	}
	log.Info(Tr("info.cluster.enabled"))
	cr.reconnectCount = 0
	cr.disabled = make(chan struct{})
	cr.enabled.Store(true)
	// Wake every goroutine blocked in WaitForEnable.
	for _, ch := range cr.waitEnable {
		close(ch)
	}
	cr.waitEnable = cr.waitEnable[:0]
	go cr.notifyManager.OnEnabled()

	const maxFailCount = 3
	var (
		keepaliveCtx context.Context
		failedCount  = 0
	)
	keepaliveCtx, cr.cancelKeepalive = context.WithCancel(ctx)
	createInterval(keepaliveCtx, func() {
		tctx, cancel := context.WithTimeout(keepaliveCtx, KeepAliveInterval/2)
		status := cr.KeepAlive(tctx)
		cancel()
		if status == 0 {
			failedCount = 0
			return
		}
		if status == -1 {
			log.Errorf("Kicked by remote server!!!")
			osExit(CodeEnvironmentError)
			return
		}
		if keepaliveCtx.Err() == nil {
			// Tolerate up to maxFailCount consecutive timeouts before
			// tearing down and reconnecting.
			if tctx.Err() != nil {
				failedCount++
				log.Warnf("keep-alive failed (%d/%d)", failedCount, maxFailCount)
				if failedCount < maxFailCount {
					return
				}
			}
			log.Info(Tr("info.cluster.reconnect.keepalive"))
			cr.disable(ctx)
			log.Info(Tr("info.cluster.reconnecting"))
			if !cr.Connect(ctx) {
				log.Error(Tr("error.cluster.reconnect.failed"))
				if ctx.Err() != nil {
					return
				}
				osExit(CodeServerOrEnvionmentError)
			}
			if err := cr.Enable(ctx); err != nil {
				log.Errorf(Tr("error.cluster.enable.failed"), err)
				if ctx.Err() != nil {
					return
				}
				osExit(CodeClientOrEnvionmentError)
			}
		}
	}, KeepAliveInterval)
	return
}

// KeepAlive will fresh hits & hit bytes data and send the keep-alive packet.
//
// Return values: 0 on success, 1 on a (possibly transient) failure,
// -1 when the remote server answered the keep-alive with false
// (i.e. this node was kicked).
func (cr *Cluster) KeepAlive(ctx context.Context) (status int) {
	hits, hbts := cr.stats.GetTmpHits()
	lhits, lhbts := cr.lastHits.Load(), cr.lastHbts.Load()
	hits2, hbts2 := cr.statOnlyHits.Load(), cr.statOnlyHbts.Load()
	// Only report the delta since the last successful keep-alive,
	// excluding stat-only hits.
	ahits, ahbts := hits-lhits-hits2, hbts-lhbts-hbts2
	resCh, err := cr.socket.EmitWithAck("keep-alive", Map{
		"time":  time.Now().UTC().Format("2006-01-02T15:04:05Z"),
		"hits":  ahits,
		"bytes": ahbts,
	})
	go cr.notifyManager.OnReportStatus(&cr.stats)

	if e := cr.stats.Save(cr.dataDir); e != nil {
		log.Errorf(Tr("error.cluster.stat.save.failed"), e)
	}
	if err != nil {
		log.Errorf(Tr("error.cluster.keepalive.send.failed"), err)
		return 1
	}
	var data []any
	select {
	case <-ctx.Done():
		return 1
	case data = <-resCh:
	}
	log.Debugf("Keep-alive response: %v", data)
	// The ack is expected to be [error, accepted]. Check the length
	// BEFORE indexing data[0] so an empty reply cannot panic (the old
	// code evaluated data[0] in the if-initializer ahead of the
	// len(data) guard).
	if len(data) <= 1 {
		var ero any
		if len(data) == 1 {
			ero = data[0]
		}
		log.Errorf(Tr("error.cluster.keepalive.failed"), ero)
		return 1
	}
	if ero := data[0]; ero != nil {
		if ero, ok := ero.(map[string]any); ok {
			if msg, ok := ero["message"].(string); ok {
				// Same hash-mismatch recovery as in Enable: purge the
				// bad file so it gets re-synced.
				if hashMismatch := reFileHashMismatchError.FindStringSubmatch(msg); hashMismatch != nil {
					hash := hashMismatch[1]
					log.Warnf("Detected hash mismatch error, removing bad file %s", hash)
					for _, s := range cr.storages {
						s.Remove(hash)
					}
				}
				log.Errorf(Tr("error.cluster.keepalive.failed"), msg)
				return 1
			}
		}
		log.Errorf(Tr("error.cluster.keepalive.failed"), ero)
		return 1
	}
	log.Infof(Tr("info.cluster.keepalive.success"), ahits, utils.BytesToUnit((float64)(ahbts)), data[1])
	cr.lastHits.Store(hits)
	cr.lastHbts.Store(hbts)
	cr.statOnlyHits.Add(-hits2)
	cr.statOnlyHbts.Add(-hbts2)
	if data[1] == false {
		return -1
	}
	return 0
}

func (cr *Cluster) disconnected() bool {
cr.mux.Lock()
defer cr.mux.Unlock()
@@ -644,11 +416,6 @@
return true
}

// Disable shuts the cluster down and clears the should-enable flag so
// background logic will not try to re-enable it.
func (cr *Cluster) Disable(ctx context.Context) (ok bool) {
	cr.shouldEnable.Store(false)
	ok = cr.disable(ctx)
	return
}

func (cr *Cluster) disable(ctx context.Context) (ok bool) {
cr.mux.Lock()
defer cr.mux.Unlock()
@@ -698,63 +465,3 @@
log.Warn(Tr("warn.cluster.disabled"))
return
}

// Enabled reports whether the cluster is currently enabled.
func (cr *Cluster) Enabled() bool {
	return cr.enabled.Load()
}

// Disabled returns the channel that signals cluster shutdown.
// NOTE(review): the close site is not visible in this view —
// presumably disable() closes it; confirm. A fresh channel is created
// on each Enable, so callers should re-fetch it after a reconnect.
func (cr *Cluster) Disabled() <-chan struct{} {
	cr.mux.RLock()
	defer cr.mux.RUnlock()
	return cr.disabled
}

// CertKeyPair holds the certificate and private-key strings returned
// by the central server's "request-cert" event (see RequestCert).
type CertKeyPair struct {
	Cert string `json:"cert"`
	Key  string `json:"key"`
}

// RequestCert asks the central server for a certificate/key pair via
// the "request-cert" socket.io event.
//
// The ack is expected to be [error, {cert, key}]; malformed replies
// are returned as errors instead of panicking (the old code used
// unchecked data[1].(map[string]any) and pair[...](string) assertions).
func (cr *Cluster) RequestCert(ctx context.Context) (ckp *CertKeyPair, err error) {
	resCh, err := cr.socket.EmitWithAck("request-cert")
	if err != nil {
		return
	}
	var data []any
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case data = <-resCh:
	}
	if len(data) >= 1 {
		if ero := data[0]; ero != nil {
			err = fmt.Errorf("socket.io remote error: %v", ero)
			return
		}
	}
	if len(data) < 2 {
		err = fmt.Errorf("request-cert: unexpected ack %v", data)
		return
	}
	pair, ok := data[1].(map[string]any)
	if !ok {
		err = fmt.Errorf("request-cert: unexpected payload %v", data[1])
		return
	}
	cert, certOk := pair["cert"].(string)
	key, keyOk := pair["key"].(string)
	if !certOk || !keyOk {
		err = fmt.Errorf("request-cert: missing cert or key in %v", pair)
		return
	}
	ckp = &CertKeyPair{
		Cert: cert,
		Key:  key,
	}
	return
}

// GetConfig fetches the agent configuration from the remote
// /openbmclapi/configuration endpoint using the cached HTTP client.
func (cr *Cluster) GetConfig(ctx context.Context) (cfg *OpenbmclapiAgentConfig, err error) {
	var req *http.Request
	if req, err = cr.makeReqWithAuth(ctx, http.MethodGet, "/openbmclapi/configuration", nil); err != nil {
		return
	}
	var res *http.Response
	if res, err = cr.cachedCli.Do(req); err != nil {
		return
	}
	defer res.Body.Close()
	if res.StatusCode != http.StatusOK {
		return nil, utils.NewHTTPStatusErrorFromResponse(res)
	}
	parsed := new(OpenbmclapiAgentConfig)
	if err = json.NewDecoder(res.Body).Decode(parsed); err != nil {
		return nil, err
	}
	return parsed, nil
}
Loading