diff --git a/ais/tgtec.go b/ais/tgtec.go index 2df7f2cccb..b704890856 100644 --- a/ais/tgtec.go +++ b/ais/tgtec.go @@ -117,12 +117,20 @@ func (t *target) httpecpost(w http.ResponseWriter, r *http.Request) { // - to be used to recover individual objects and assorted (ranges, lists of) objects // - requires API & CLI // - remove warning when done - nlog.Warningf("%s[%s] not running or finished - proceeding with %s anyway", t, apc.ActECEncode, uuid, lom.Cname()) - go ec.ECM.TryRecoverObj(lom, nil) // free(lom) inside + nlog.Warningf("%s[%s] not running - proceeding to ec-recover %s anyway..", t, apc.ActECEncode, uuid, lom) + + err := ec.ECM.Recover(lom) + cname := lom.Cname() + core.FreeLOM(lom) + if err != nil { + t.writeErr(w, r, cmn.NewErrFailedTo(t, "EC-recover", cname, err)) + } } else { xbenc, ok := xctn.(*ec.XactBckEncode) debug.Assert(ok, xctn.String()) - xbenc.RecvEncodeMD(lom) + + // async, via j.work + xbenc.RecvRecover(lom) } case apc.ActEcOpen: hk.UnregIf(hkname, closeEc) // just in case, a no-op most of the time diff --git a/ais/tgtobj.go b/ais/tgtobj.go index cd80542a5e..7a84577c82 100644 --- a/ais/tgtobj.go +++ b/ais/tgtobj.go @@ -898,8 +898,8 @@ gfn: } } - // restore from existing EC slices, if possible - ecErr := ec.ECM.RestoreObject(goi.lom, nil /*on-finished callback*/) + // restore from existing EC slices + ecErr := ec.ECM.Recover(goi.lom) if ecErr == nil { ecErr = goi.lom.Load(true /*cache it*/, false /*locked*/) // TODO: optimize locking if ecErr == nil { diff --git a/cmd/cli/go.mod b/cmd/cli/go.mod index de1dee4c53..5392879c31 100644 --- a/cmd/cli/go.mod +++ b/cmd/cli/go.mod @@ -3,7 +3,7 @@ module github.com/NVIDIA/aistore/cmd/cli go 1.23.2 require ( - github.com/NVIDIA/aistore v1.3.26-0.20241021220713-b4666d34154a + github.com/NVIDIA/aistore v1.3.26-0.20241022221110-ec9d71b82df4 github.com/fatih/color v1.17.0 github.com/json-iterator/go v1.1.12 github.com/onsi/ginkgo/v2 v2.20.2 @@ -24,6 +24,7 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/dgryski/go-metro v0.0.0-20211217172704-adc40b04c140 // indirect github.com/emicklei/go-restful/v3 v3.12.1 // indirect github.com/fxamacker/cbor/v2 v2.7.0 // indirect github.com/go-logr/logr v1.4.2 // indirect @@ -59,6 +60,7 @@ require ( github.com/prometheus/common v0.60.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect + github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/teris-io/shortid v0.0.0-20220617161101-71ec9f2aa569 // indirect github.com/tidwall/btree v1.7.0 // indirect diff --git a/cmd/cli/go.sum b/cmd/cli/go.sum index 8a8eb60aa4..3dda6e0587 100644 --- a/cmd/cli/go.sum +++ b/cmd/cli/go.sum @@ -1,7 +1,7 @@ code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc= github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= -github.com/NVIDIA/aistore v1.3.26-0.20241021220713-b4666d34154a h1:wqxdIeqExWJdkFbC23tum4rKOXBeuxAeAxNceJI7tSw= -github.com/NVIDIA/aistore v1.3.26-0.20241021220713-b4666d34154a/go.mod h1:Q6J3YIeiL4A6oWga3qCJ8+XI1CUvdde7Gua/HfueGlQ= +github.com/NVIDIA/aistore v1.3.26-0.20241022221110-ec9d71b82df4 h1:bb2bM8SR+E28B+asH0kbWzfp/b+Yd/96NK/BhimfPL0= +github.com/NVIDIA/aistore v1.3.26-0.20241022221110-ec9d71b82df4/go.mod h1:Q6J3YIeiL4A6oWga3qCJ8+XI1CUvdde7Gua/HfueGlQ= github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8= github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA= @@ -21,6 +21,9 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw= +github.com/dgryski/go-metro v0.0.0-20211217172704-adc40b04c140 h1:y7y0Oa6UawqTFPCDw9JG6pdKt4F9pAhHv0B7FMGaGD0= +github.com/dgryski/go-metro v0.0.0-20211217172704-adc40b04c140/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw= github.com/emicklei/go-restful/v3 v3.12.1 h1:PJMDIM/ak7btuL8Ex0iYET9hxM3CI2sjZtzpL63nKAU= github.com/emicklei/go-restful/v3 v3.12.1/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/fatih/color v1.17.0 h1:GlRw1BRJxkpqUCBKzKOw098ed57fEsKeNjpTe3cSjK4= @@ -129,6 +132,8 @@ github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99 github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/schollz/progressbar/v2 v2.13.2/go.mod h1:6YZjqdthH6SCZKv2rqGryrxPtfmRB/DWZxSMfCXPyD8= +github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771 h1:emzAzMZ1L9iaKCTxdy3Em8Wv4ChIAGnfiz18Cda70g4= +github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771/go.mod h1:bR6DqgcAl1zTcOX8/pE2Qkj9XO00eCNqmKb7lXP8EAg= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -136,6 +141,7 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= @@ -246,6 +252,7 @@ gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= k8s.io/api v0.31.1 h1:Xe1hX/fPW3PXYYv8BlozYqw63ytA92snr96zMW9gWTU= diff --git a/ec/bencodex.go b/ec/bencodex.go index 10e9a2f8e4..0a039835ed 100644 --- a/ec/bencodex.go +++ b/ec/bencodex.go @@ -7,6 +7,7 @@ package ec import ( "fmt" "os" + "runtime" "sync" "time" @@ -48,9 +49,10 @@ type ( checkAndRecover bool } rcvyJogger struct { - mi *fs.Mountpath - workCh chan *core.LOM - parent *XactBckEncode + mi *fs.Mountpath + workCh chan *core.LOM + parent *XactBckEncode + chanFull atomic.Int64 } ) @@ -272,7 +274,7 @@ func (r *XactBckEncode) bckEncodeMD(ct *core.CT, _ []byte) error { return core.T.ECRestoreReq(ct, tsi, r.ID()) } -func (r *XactBckEncode) RecvEncodeMD(lom *core.LOM) { +func (r *XactBckEncode) RecvRecover(lom *core.LOM) { r.last.Store(mono.NanoTime()) uname := lom.UnamePtr() @@ -292,13 +294,20 @@ func (r *XactBckEncode) RecvEncodeMD(lom *core.LOM) { j.workCh <- lom } -// TODO: add stats counting recovered slices, etc. func (r *XactBckEncode) setLast(lom *core.LOM, err error) { - if err == nil { + switch { + case err == nil: + r.LomAdd(lom) // TODO: instead, count restored slices, metafiles, possibly - objects r.last.Store(mono.NanoTime()) - } else if err != errSkipped { + case err == ErrorECDisabled: + r.Abort(err) + case err == errSkipped: + // do nothing + default: r.AddErr(err) - _errec(lom, err) + if cmn.Rom.FastV(4, cos.SmoduleEC) { + nlog.Warningln(core.T.String(), "failed to check-and-recover", lom.Cname(), "err:", err) + } } } @@ -312,6 +321,21 @@ func (j *rcvyJogger) run() { if !ok { break } - ECM.TryRecoverObj(lom, j.parent.setLast) // free(lom) inside + if l, c := len(j.workCh), cap(j.workCh); l > (c - c>>2) { + runtime.Gosched() // poor man's throttle + if l == c { + j.chanFull.Inc() + } + } + err := ECM.Recover(lom) + j.parent.setLast(lom, err) + core.FreeLOM(lom) + } + if cnt := j.chanFull.Load(); cnt > 1 { + nlog.Warningln(j.String(), cos.ErrWorkChanFull, "cnt:", cnt) } } + +func (j *rcvyJogger) String() string { + return fmt.Sprint("[ rcvy-j", j.mi.String(), j.parent.Name(), "]") +} diff --git a/ec/getx.go b/ec/getx.go index 8a8893de7c..a4447c75e4 100644 --- a/ec/getx.go +++ b/ec/getx.go @@ -248,7 +248,6 @@ func (r *XactGet) stop() { // a nil value from channel but ecrunner keeps working - it reuploads all missing // slices or copies func (r *XactGet) decode(req *request, lom *core.LOM) { - debug.Assert(req.Action == ActRestore, "invalid action: ", req.Action) r.stats.updateDecode() req.putTime = time.Now() req.tm = req.putTime diff --git a/ec/manager.go b/ec/manager.go index cd24c6c632..948684352e 100644 --- a/ec/manager.go +++ b/ec/manager.go @@ -288,26 +288,6 @@ func (mgr *Manager) CleanupObject(lom *core.LOM) { mgr.RestoreBckPutXact(lom.Bck()).cleanup(req, lom) } -func (mgr *Manager) RestoreObject(lom *core.LOM, cb core.OnFinishObj) error { - if !lom.ECEnabled() { - return ErrorECDisabled - } - cs := fs.Cap() - if err := cs.Err(); err != nil { - return err - } - - debug.Assert(lom.Mountpath() != nil && lom.Mountpath().Path != "") - req := allocateReq(ActRestore, lom.LIF()) - errCh := make(chan error) // unbuffered - req.ErrCh = errCh - req.Callback = cb - mgr.RestoreBckGetXact(lom.Bck()).decode(req, lom) - - // wait for EC completes restoring the object - return <-errCh -} - // disableBck starts to reject new EC requests, rejects pending ones func (mgr *Manager) disableBck(bck *meta.Bck) { mgr.RestoreBckGetXact(bck).ClearRequests() @@ -350,19 +330,20 @@ func (mgr *Manager) BMDChanged() error { return nil } -// TODO -- FIXME: joggers, etc. -func (mgr *Manager) TryRecoverObj(lom *core.LOM, cb core.OnFinishObj) { - go func() { - err := mgr.RestoreObject(lom, cb) - if cb != nil { - cb(lom, err) - } else if err != nil { - _errec(lom, err) - } - core.FreeLOM(lom) - }() -} +func (mgr *Manager) Recover(lom *core.LOM) error { + if !lom.ECEnabled() { + return ErrorECDisabled + } + cs := fs.Cap() + if err := cs.Err(); err != nil { + return err + } -func _errec(lom *core.LOM, err error) { - nlog.Errorln(core.T.String(), "failed to check-and-recover", lom.Cname(), "err:", err) + req := allocateReq(ActRestore, lom.LIF()) + errCh := make(chan error) // unbuffered + req.ErrCh = errCh + mgr.RestoreBckGetXact(lom.Bck()).decode(req, lom) + + // wait + return <-errCh }