From 8587ca80918e8965946afff6541bb89998c314dd Mon Sep 17 00:00:00 2001 From: Alex Aizman Date: Tue, 22 Oct 2024 11:57:50 -0400 Subject: [PATCH] EC: recover missing or corrupted data and/or metadata * route to specific ec-encoding xaction * filter out duplicated requests (optimize) * when recovering, try to wait longer for in-flight requests * part three, prev. commit: b4666d34154a9 Signed-off-by: Alex Aizman --- ais/prxclu.go | 5 +++- ais/test/ec_test.go | 9 ++++--- ais/tgtec.go | 37 ++++++++++++++++++++-------- ais/tgtimpl.go | 3 ++- ais/tgtobj.go | 2 +- cmd/cli/go.mod | 2 +- cmd/cli/go.sum | 4 +-- core/mock/target_mock.go | 2 +- core/target.go | 2 +- ec/bencodex.go | 53 +++++++++++++++++++++++++++++++--------- ec/getx.go | 9 ++++--- ec/manager.go | 9 ++++--- 12 files changed, 98 insertions(+), 39 deletions(-) diff --git a/ais/prxclu.go b/ais/prxclu.go index 457ad68a8e..424c907930 100644 --- a/ais/prxclu.go +++ b/ais/prxclu.go @@ -940,8 +940,11 @@ func (p *proxy) _syncFinal(ctx *smapModifier, clone *smapX) { debug.Assert(clone._sgl != nil) config, err := p.ensureConfigURLs() + if err != nil { + debug.Assert(nlog.Stopping(), err) + return + } if config != nil /*updated*/ { - debug.AssertNoErr(err) pairs = append(pairs, revsPair{config, aisMsg}) } diff --git a/ais/test/ec_test.go b/ais/test/ec_test.go index bd3377be4c..db70771a24 100644 --- a/ais/test/ec_test.go +++ b/ais/test/ec_test.go @@ -2835,17 +2835,18 @@ func TestECBckEncodeRecover(t *testing.T) { o := ecOptions{ minTargets: 4, - objCount: 40, + objCount: 128, concurrency: 8, pattern: "obj-%04d", silent: testing.Short(), }.init(t, proxyURL) - // Damage this number of objects for each case: object and slice + + // Damage this number of objects for each test case objToDamage := o.objCount / 8 initMountpaths(t, proxyURL) - tassert.Fatalf(t, o.objCount > 2*objToDamage, "The total number of objects must be twice as greater as the number of damaged ones") - + tassert.Fatalf(t, o.objCount > 2*objToDamage, + "The total number of objects must be twice as greater as the number of damaged ones") var ( mtx sync.Mutex objSlicesOrig = make(map[string]map[string]ecSliceMD, o.objCount) diff --git a/ais/tgtec.go b/ais/tgtec.go index c4f69f1845..ecdc17f44b 100644 --- a/ais/tgtec.go +++ b/ais/tgtec.go @@ -15,11 +15,13 @@ import ( "github.com/NVIDIA/aistore/api/apc" "github.com/NVIDIA/aistore/cmn" "github.com/NVIDIA/aistore/cmn/cos" + "github.com/NVIDIA/aistore/cmn/debug" "github.com/NVIDIA/aistore/cmn/nlog" "github.com/NVIDIA/aistore/core" "github.com/NVIDIA/aistore/core/meta" "github.com/NVIDIA/aistore/ec" "github.com/NVIDIA/aistore/hk" + "github.com/NVIDIA/aistore/xact/xreg" ) var errCloseStreams = errors.New("EC is currently active, cannot close streams") @@ -88,29 +90,44 @@ func (t *target) httpecpost(w http.ResponseWriter, r *http.Request) { query := r.URL.Query() objName := query.Get(apc.QparamECObject) if objName == "" { - err := fmt.Errorf("%s: invalid request to recover an object: name's empty", t) + err := fmt.Errorf("%s: invalid ec-recover request: object name's empty", t) t.writeErr(w, r, err) return } - lom := core.AllocLOM(objName) - mbck, err := newBckFromQuname(query, true) + lom := core.AllocLOM(objName) + bck, err := newBckFromQuname(query, true) if err != nil { core.FreeLOM(lom) - err = fmt.Errorf("%s: %v", t, err) // FATAL/unlikely + err := fmt.Errorf("%s: %v", t, err) // (unlikely) t.writeErr(w, r, err) return } - - bck := mbck.Bucket() - if err := lom.InitBck(bck); err != nil { + if err := lom.InitBck(bck.Bucket()); err != nil { core.FreeLOM(lom) - err = fmt.Errorf("%s: %v", t, err) + err := fmt.Errorf("%s: %v", t, err) t.writeErr(w, r, err) return } - ec.ECM.TryRecoverObj(lom) // free LOM inside - return + + // [TODO] + // this target's endpoint can also be used to recover individual objects + // and selected (multi)objects + // but right now we only run it as part of a bucket encoding xaction + // (see checkAndRecover) + uuid := query.Get(apc.QparamUUID) + xctn, errN := xreg.GetXact(uuid) + if errN != nil || xctn == nil { + err := fmt.Errorf("%s: failed to find %s[%s] to recover %s: %v", + t, apc.ActECEncode, uuid, lom.Cname(), errN) + t.writeErr(w, r, err) + return + } + xbenc, ok := xctn.(*ec.XactBckEncode) + debug.Assert(ok, xctn.String()) + + // do + xbenc.RecvEncodeMD(lom) case apc.ActEcOpen: hk.UnregIf(hkname, closeEc) // just in case, a no-op most of the time ec.ECM.OpenStreams(false /*with refc*/) diff --git a/ais/tgtimpl.go b/ais/tgtimpl.go index befaec8ff9..92a9f3e3f4 100644 --- a/ais/tgtimpl.go +++ b/ais/tgtimpl.go @@ -371,10 +371,11 @@ func (t *target) _promRemote(params *core.PromoteParams, lom *core.LOM, tsi *met return size, err } -func (t *target) ECRestoreReq(ct *core.CT, tsi *meta.Snode) error { +func (t *target) ECRestoreReq(ct *core.CT, tsi *meta.Snode, uuid string) error { q := ct.Bck().NewQuery() ct.Bck().AddUnameToQuery(q, apc.QparamBckTo) q.Set(apc.QparamECObject, ct.ObjectName()) + q.Set(apc.QparamUUID, uuid) cargs := allocCargs() { cargs.si = tsi diff --git a/ais/tgtobj.go b/ais/tgtobj.go index d0c1ec0f7a..cd80542a5e 100644 --- a/ais/tgtobj.go +++ b/ais/tgtobj.go @@ -899,7 +899,7 @@ gfn: } // restore from existing EC slices, if possible - ecErr := ec.ECM.RestoreObject(goi.lom) + ecErr := ec.ECM.RestoreObject(goi.lom, nil /*on-finished callback*/) if ecErr == nil { ecErr = goi.lom.Load(true /*cache it*/, false /*locked*/) // TODO: optimize locking if ecErr == nil { diff --git a/cmd/cli/go.mod b/cmd/cli/go.mod index 705965c735..de1dee4c53 100644 --- a/cmd/cli/go.mod +++ b/cmd/cli/go.mod @@ -3,7 +3,7 @@ module github.com/NVIDIA/aistore/cmd/cli go 1.23.2 require ( - github.com/NVIDIA/aistore v1.3.26-0.20241021212206-e65b4c2efcf3 + github.com/NVIDIA/aistore v1.3.26-0.20241021220713-b4666d34154a github.com/fatih/color v1.17.0 github.com/json-iterator/go v1.1.12 github.com/onsi/ginkgo/v2 v2.20.2 diff --git a/cmd/cli/go.sum b/cmd/cli/go.sum index 9cc2df043d..8a8eb60aa4 100644 --- a/cmd/cli/go.sum +++ b/cmd/cli/go.sum @@ -1,7 +1,7 @@ code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc= github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= -github.com/NVIDIA/aistore v1.3.26-0.20241021212206-e65b4c2efcf3 h1:apzFZIz80x8HYgFG1PFnNZ1JUVAfiaHhnyhSJjiqriw= -github.com/NVIDIA/aistore v1.3.26-0.20241021212206-e65b4c2efcf3/go.mod h1:Q6J3YIeiL4A6oWga3qCJ8+XI1CUvdde7Gua/HfueGlQ= +github.com/NVIDIA/aistore v1.3.26-0.20241021220713-b4666d34154a h1:wqxdIeqExWJdkFbC23tum4rKOXBeuxAeAxNceJI7tSw= +github.com/NVIDIA/aistore v1.3.26-0.20241021220713-b4666d34154a/go.mod h1:Q6J3YIeiL4A6oWga3qCJ8+XI1CUvdde7Gua/HfueGlQ= github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8= github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA= diff --git a/core/mock/target_mock.go b/core/mock/target_mock.go index f23d31cfd5..cadc0310d2 100644 --- a/core/mock/target_mock.go +++ b/core/mock/target_mock.go @@ -86,6 +86,6 @@ func (*TargetMock) Health(*meta.Snode, time.Duration, url.Values) ([]byte, int, return nil, 0, nil } -func (*TargetMock) ECRestoreReq(*core.CT, *meta.Snode) error { +func (*TargetMock) ECRestoreReq(*core.CT, *meta.Snode, string) error { return nil } diff --git a/core/target.go b/core/target.go index d917bbc552..d616b4e6b5 100644 --- a/core/target.go +++ b/core/target.go @@ -128,7 +128,7 @@ type ( Promote(params *PromoteParams) (ecode int, err error) HeadObjT2T(lom *LOM, si *meta.Snode) bool - ECRestoreReq(ct *CT, si *meta.Snode) error + ECRestoreReq(ct *CT, si *meta.Snode, uuid string) error BMDVersionFixup(r *http.Request, bck ...cmn.Bck) } diff --git a/ec/bencodex.go b/ec/bencodex.go index e597de8590..20a714c5fb 100644 --- a/ec/bencodex.go +++ b/ec/bencodex.go @@ -8,10 +8,13 @@ import ( "fmt" "os" "sync" + "time" "github.com/NVIDIA/aistore/api/apc" "github.com/NVIDIA/aistore/cmn" + "github.com/NVIDIA/aistore/cmn/cos" "github.com/NVIDIA/aistore/cmn/nlog" + "github.com/NVIDIA/aistore/cmn/prob" "github.com/NVIDIA/aistore/core" "github.com/NVIDIA/aistore/core/meta" "github.com/NVIDIA/aistore/fs" @@ -31,6 +34,7 @@ type ( bck *meta.Bck wg *sync.WaitGroup // to wait for EC finishes all objects smap *meta.Smap + probFilter *prob.Filter checkAndRecover bool } ) @@ -80,7 +84,11 @@ func newXactBckEncode(bck *meta.Bck, uuid string, checkAndRecover bool) (r *Xact bck: bck, wg: &sync.WaitGroup{}, smap: core.T.Sowner().Get(), - checkAndRecover: checkAndRecover} + checkAndRecover: checkAndRecover, + } + if checkAndRecover { + r.probFilter = prob.NewDefaultFilter() + } r.InitBase(uuid, apc.ActECEncode, bck) return } @@ -113,7 +121,8 @@ func (r *XactBckEncode) Run(wg *sync.WaitGroup) { } opts.Bck.Copy(r.bck.Bucket()) - jg := mpather.NewJoggerGroup(opts, cmn.GCO.Get(), nil) + config := cmn.GCO.Get() + jg := mpather.NewJoggerGroup(opts, config, nil) jg.Run() select { @@ -125,6 +134,10 @@ func (r *XactBckEncode) Run(wg *sync.WaitGroup) { r.AddErr(err) } } + if r.checkAndRecover { + // TODO -- FIXME: rm sleep; may still terminate prematurely vs RecvEncodeMD flow + time.Sleep(config.Timeout.MaxKeepalive.D() << 1) + } r.wg.Wait() // Need to wait for all async actions to finish. r.Finish() @@ -136,9 +149,13 @@ func (r *XactBckEncode) afterECObj(lom *core.LOM, err error) { if err == nil { r.LomAdd(lom) } else if err != errSkipped { - nlog.Errorf("failed to erasure-code %s: %v", lom.Cname(), err) + r.AddErr(err) + if r.checkAndRecover { + nlog.Errorln("failed to check-and-recover", lom.Cname(), "err:", err) + } else { + nlog.Errorln("failed to erasure-code", lom.Cname(), "err:", err) + } } - r.wg.Done() } @@ -148,8 +165,7 @@ func (r *XactBckEncode) afterECObj(lom *core.LOM, err error) { func (r *XactBckEncode) bckEncode(lom *core.LOM, _ []byte) error { _, local, err := lom.HrwTarget(r.smap) if err != nil { - nlog.Errorf("%s: %s", lom, err) - return nil + return err } // An object replica - skip EC. if !local { @@ -157,9 +173,10 @@ func (r *XactBckEncode) bckEncode(lom *core.LOM, _ []byte) error { } mdFQN, _, err := core.HrwFQN(lom.Bck().Bucket(), fs.ECMetaType, lom.ObjName) if err != nil { - nlog.Warningln("metadata FQN generation failed", lom, ":", err) - return nil + nlog.Warningln("failed to generate md FQN for", lom.Cname(), "err:", err) + return err } + md, err := LoadMetadata(mdFQN) // If metafile exists, the object has been already encoded. But for // replicated objects we have to fall through. Otherwise, bencode @@ -168,9 +185,9 @@ func (r *XactBckEncode) bckEncode(lom *core.LOM, _ []byte) error { return nil } if err != nil && !os.IsNotExist(err) { - nlog.Warningln("failed to stat ", mdFQN, ":", err, "Deleting...") + nlog.Warningln("failed to fstat", mdFQN, "err:", err) if errDel := os.Remove(mdFQN); errDel != nil { - nlog.Warningln("failed to delete broken metafile ", mdFQN, ":", errDel) + nlog.Warningln("nested err: failed to delete broken metafile:", errDel) return nil } } @@ -209,5 +226,19 @@ func (r *XactBckEncode) bckEncodeMD(ct *core.CT, _ []byte) error { if tsi.ID() == core.T.SID() { return nil } - return core.T.ECRestoreReq(ct, tsi) + return core.T.ECRestoreReq(ct, tsi, r.ID()) +} + +func (r *XactBckEncode) RecvEncodeMD(lom *core.LOM) { + r.beforeECObj() + + uname := lom.UnamePtr() + bname := cos.UnsafeBptr(uname) + if r.probFilter.Lookup(*bname) { + r.afterECObj(lom, nil) + return + } + + r.probFilter.Insert(*bname) + ECM.TryRecoverObj(lom, r.afterECObj) // free LOM inside } diff --git a/ec/getx.go b/ec/getx.go index fe2ec4bfd6..8a8893de7c 100644 --- a/ec/getx.go +++ b/ec/getx.go @@ -248,13 +248,16 @@ func (r *XactGet) stop() { // a nil value from channel but ecrunner keeps working - it reuploads all missing // slices or copies func (r *XactGet) decode(req *request, lom *core.LOM) { - debug.Assert(req.Action == ActRestore, "invalid action for restore: "+req.Action) + debug.Assert(req.Action == ActRestore, "invalid action: ", req.Action) r.stats.updateDecode() req.putTime = time.Now() - req.tm = time.Now() + req.tm = req.putTime if err := r.dispatchRequest(req, lom); err != nil { - nlog.Errorf("failed to restore %s: %v", lom, err) + if req.Callback != nil { + req.Callback(lom, err) + } + nlog.Errorln("failed to restore", lom.Cname(), "err:", err) freeReq(req) } } diff --git a/ec/manager.go b/ec/manager.go index 6134096b5f..ba2e73a2e8 100644 --- a/ec/manager.go +++ b/ec/manager.go @@ -288,7 +288,7 @@ func (mgr *Manager) CleanupObject(lom *core.LOM) { mgr.RestoreBckPutXact(lom.Bck()).cleanup(req, lom) } -func (mgr *Manager) RestoreObject(lom *core.LOM) error { +func (mgr *Manager) RestoreObject(lom *core.LOM, cb core.OnFinishObj) error { if !lom.ECEnabled() { return ErrorECDisabled } @@ -301,6 +301,7 @@ func (mgr *Manager) RestoreObject(lom *core.LOM) error { req := allocateReq(ActRestore, lom.LIF()) errCh := make(chan error) // unbuffered req.ErrCh = errCh + req.Callback = cb mgr.RestoreBckGetXact(lom.Bck()).decode(req, lom) // wait for EC completes restoring the object @@ -350,11 +351,13 @@ func (mgr *Manager) BMDChanged() error { } // TODO -- FIXME: joggers, etc. -func (mgr *Manager) TryRecoverObj(lom *core.LOM) { +func (mgr *Manager) TryRecoverObj(lom *core.LOM, cb core.OnFinishObj) { go func() { - if err := mgr.RestoreObject(lom); err != nil { + err := mgr.RestoreObject(lom, cb) + if err != nil { nlog.Errorln(core.T.String(), "failed to recover", lom.Cname(), "err:", err) } + cb(lom, err) core.FreeLOM(lom) }() }