Skip to content

Commit

Permalink
EC: recover missing or corrupted data and/or metadata
Browse files Browse the repository at this point in the history
* route to specific ec-encoding xaction
* filter out duplicated requests (optimize)
* when recovering, try to wait longer for in-flight requests
* part three, prev. commit: b4666d3

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Oct 22, 2024
1 parent a19c88d commit 8587ca8
Show file tree
Hide file tree
Showing 12 changed files with 98 additions and 39 deletions.
5 changes: 4 additions & 1 deletion ais/prxclu.go
Original file line number Diff line number Diff line change
Expand Up @@ -940,8 +940,11 @@ func (p *proxy) _syncFinal(ctx *smapModifier, clone *smapX) {
debug.Assert(clone._sgl != nil)

config, err := p.ensureConfigURLs()
if err != nil {
debug.Assert(nlog.Stopping(), err)
return
}
if config != nil /*updated*/ {
debug.AssertNoErr(err)
pairs = append(pairs, revsPair{config, aisMsg})
}

Expand Down
9 changes: 5 additions & 4 deletions ais/test/ec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2835,17 +2835,18 @@ func TestECBckEncodeRecover(t *testing.T) {

o := ecOptions{
minTargets: 4,
objCount: 40,
objCount: 128,
concurrency: 8,
pattern: "obj-%04d",
silent: testing.Short(),
}.init(t, proxyURL)
// Damage this number of objects for each case: object and slice

// Damage this number of objects for each test case
objToDamage := o.objCount / 8
initMountpaths(t, proxyURL)

tassert.Fatalf(t, o.objCount > 2*objToDamage, "The total number of objects must be twice as greater as the number of damaged ones")

tassert.Fatalf(t, o.objCount > 2*objToDamage,
"The total number of objects must be twice as greater as the number of damaged ones")
var (
mtx sync.Mutex
objSlicesOrig = make(map[string]map[string]ecSliceMD, o.objCount)
Expand Down
37 changes: 27 additions & 10 deletions ais/tgtec.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ import (
"github.com/NVIDIA/aistore/api/apc"
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/cmn/nlog"
"github.com/NVIDIA/aistore/core"
"github.com/NVIDIA/aistore/core/meta"
"github.com/NVIDIA/aistore/ec"
"github.com/NVIDIA/aistore/hk"
"github.com/NVIDIA/aistore/xact/xreg"
)

var errCloseStreams = errors.New("EC is currently active, cannot close streams")
Expand Down Expand Up @@ -88,29 +90,44 @@ func (t *target) httpecpost(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
objName := query.Get(apc.QparamECObject)
if objName == "" {
err := fmt.Errorf("%s: invalid request to recover an object: name's empty", t)
err := fmt.Errorf("%s: invalid ec-recover request: object name's empty", t)
t.writeErr(w, r, err)
return
}
lom := core.AllocLOM(objName)

mbck, err := newBckFromQuname(query, true)
lom := core.AllocLOM(objName)
bck, err := newBckFromQuname(query, true)
if err != nil {
core.FreeLOM(lom)
err = fmt.Errorf("%s: %v", t, err) // FATAL/unlikely
err := fmt.Errorf("%s: %v", t, err) // (unlikely)
t.writeErr(w, r, err)
return
}

bck := mbck.Bucket()
if err := lom.InitBck(bck); err != nil {
if err := lom.InitBck(bck.Bucket()); err != nil {
core.FreeLOM(lom)
err = fmt.Errorf("%s: %v", t, err)
err := fmt.Errorf("%s: %v", t, err)
t.writeErr(w, r, err)
return
}
ec.ECM.TryRecoverObj(lom) // free LOM inside
return

// [TODO]
// this target's endpoint can also be used to recover individual objects
// and selected (multi)objects
// but right now we only run it as part of a bucket encoding xaction
// (see checkAndRecover)
uuid := query.Get(apc.QparamUUID)
xctn, errN := xreg.GetXact(uuid)
if errN != nil || xctn == nil {
err := fmt.Errorf("%s: failed to find %s[%s] to recover %s: %v",
t, apc.ActECEncode, uuid, lom.Cname(), errN)
t.writeErr(w, r, err)
return
}
xbenc, ok := xctn.(*ec.XactBckEncode)
debug.Assert(ok, xctn.String())

// do
xbenc.RecvEncodeMD(lom)
case apc.ActEcOpen:
hk.UnregIf(hkname, closeEc) // just in case, a no-op most of the time
ec.ECM.OpenStreams(false /*with refc*/)
Expand Down
3 changes: 2 additions & 1 deletion ais/tgtimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,10 +371,11 @@ func (t *target) _promRemote(params *core.PromoteParams, lom *core.LOM, tsi *met
return size, err
}

func (t *target) ECRestoreReq(ct *core.CT, tsi *meta.Snode) error {
func (t *target) ECRestoreReq(ct *core.CT, tsi *meta.Snode, uuid string) error {
q := ct.Bck().NewQuery()
ct.Bck().AddUnameToQuery(q, apc.QparamBckTo)
q.Set(apc.QparamECObject, ct.ObjectName())
q.Set(apc.QparamUUID, uuid)
cargs := allocCargs()
{
cargs.si = tsi
Expand Down
2 changes: 1 addition & 1 deletion ais/tgtobj.go
Original file line number Diff line number Diff line change
Expand Up @@ -899,7 +899,7 @@ gfn:
}

// restore from existing EC slices, if possible
ecErr := ec.ECM.RestoreObject(goi.lom)
ecErr := ec.ECM.RestoreObject(goi.lom, nil /*on-finished callback*/)
if ecErr == nil {
ecErr = goi.lom.Load(true /*cache it*/, false /*locked*/) // TODO: optimize locking
if ecErr == nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/NVIDIA/aistore/cmd/cli
go 1.23.2

require (
github.com/NVIDIA/aistore v1.3.26-0.20241021212206-e65b4c2efcf3
github.com/NVIDIA/aistore v1.3.26-0.20241021220713-b4666d34154a
github.com/fatih/color v1.17.0
github.com/json-iterator/go v1.1.12
github.com/onsi/ginkgo/v2 v2.20.2
Expand Down
4 changes: 2 additions & 2 deletions cmd/cli/go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc=
github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
github.com/NVIDIA/aistore v1.3.26-0.20241021212206-e65b4c2efcf3 h1:apzFZIz80x8HYgFG1PFnNZ1JUVAfiaHhnyhSJjiqriw=
github.com/NVIDIA/aistore v1.3.26-0.20241021212206-e65b4c2efcf3/go.mod h1:Q6J3YIeiL4A6oWga3qCJ8+XI1CUvdde7Gua/HfueGlQ=
github.com/NVIDIA/aistore v1.3.26-0.20241021220713-b4666d34154a h1:wqxdIeqExWJdkFbC23tum4rKOXBeuxAeAxNceJI7tSw=
github.com/NVIDIA/aistore v1.3.26-0.20241021220713-b4666d34154a/go.mod h1:Q6J3YIeiL4A6oWga3qCJ8+XI1CUvdde7Gua/HfueGlQ=
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
Expand Down
2 changes: 1 addition & 1 deletion core/mock/target_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,6 @@ func (*TargetMock) Health(*meta.Snode, time.Duration, url.Values) ([]byte, int,
return nil, 0, nil
}

func (*TargetMock) ECRestoreReq(*core.CT, *meta.Snode) error {
func (*TargetMock) ECRestoreReq(*core.CT, *meta.Snode, string) error {
return nil
}
2 changes: 1 addition & 1 deletion core/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ type (
Promote(params *PromoteParams) (ecode int, err error)
HeadObjT2T(lom *LOM, si *meta.Snode) bool

ECRestoreReq(ct *CT, si *meta.Snode) error
ECRestoreReq(ct *CT, si *meta.Snode, uuid string) error

BMDVersionFixup(r *http.Request, bck ...cmn.Bck)
}
Expand Down
53 changes: 42 additions & 11 deletions ec/bencodex.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ import (
"fmt"
"os"
"sync"
"time"

"github.com/NVIDIA/aistore/api/apc"
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/nlog"
"github.com/NVIDIA/aistore/cmn/prob"
"github.com/NVIDIA/aistore/core"
"github.com/NVIDIA/aistore/core/meta"
"github.com/NVIDIA/aistore/fs"
Expand All @@ -31,6 +34,7 @@ type (
bck *meta.Bck
wg *sync.WaitGroup // to wait for EC finishes all objects
smap *meta.Smap
probFilter *prob.Filter
checkAndRecover bool
}
)
Expand Down Expand Up @@ -80,7 +84,11 @@ func newXactBckEncode(bck *meta.Bck, uuid string, checkAndRecover bool) (r *Xact
bck: bck,
wg: &sync.WaitGroup{},
smap: core.T.Sowner().Get(),
checkAndRecover: checkAndRecover}
checkAndRecover: checkAndRecover,
}
if checkAndRecover {
r.probFilter = prob.NewDefaultFilter()
}
r.InitBase(uuid, apc.ActECEncode, bck)
return
}
Expand Down Expand Up @@ -113,7 +121,8 @@ func (r *XactBckEncode) Run(wg *sync.WaitGroup) {
}

opts.Bck.Copy(r.bck.Bucket())
jg := mpather.NewJoggerGroup(opts, cmn.GCO.Get(), nil)
config := cmn.GCO.Get()
jg := mpather.NewJoggerGroup(opts, config, nil)
jg.Run()

select {
Expand All @@ -125,6 +134,10 @@ func (r *XactBckEncode) Run(wg *sync.WaitGroup) {
r.AddErr(err)
}
}
if r.checkAndRecover {
// TODO -- FIXME: rm sleep; may still terminate prematurely vs RecvEncodeMD flow
time.Sleep(config.Timeout.MaxKeepalive.D() << 1)
}
r.wg.Wait() // Need to wait for all async actions to finish.

r.Finish()
Expand All @@ -136,9 +149,13 @@ func (r *XactBckEncode) afterECObj(lom *core.LOM, err error) {
if err == nil {
r.LomAdd(lom)
} else if err != errSkipped {
nlog.Errorf("failed to erasure-code %s: %v", lom.Cname(), err)
r.AddErr(err)
if r.checkAndRecover {
nlog.Errorln("failed to check-and-recover", lom.Cname(), "err:", err)
} else {
nlog.Errorln("failed to erasure-code", lom.Cname(), "err:", err)
}
}

r.wg.Done()
}

Expand All @@ -148,18 +165,18 @@ func (r *XactBckEncode) afterECObj(lom *core.LOM, err error) {
func (r *XactBckEncode) bckEncode(lom *core.LOM, _ []byte) error {
_, local, err := lom.HrwTarget(r.smap)
if err != nil {
nlog.Errorf("%s: %s", lom, err)
return nil
return err
}
// An object replica - skip EC.
if !local {
return nil
}
mdFQN, _, err := core.HrwFQN(lom.Bck().Bucket(), fs.ECMetaType, lom.ObjName)
if err != nil {
nlog.Warningln("metadata FQN generation failed", lom, ":", err)
return nil
nlog.Warningln("failed to generate md FQN for", lom.Cname(), "err:", err)
return err
}

md, err := LoadMetadata(mdFQN)
// If metafile exists, the object has been already encoded. But for
// replicated objects we have to fall through. Otherwise, bencode
Expand All @@ -168,9 +185,9 @@ func (r *XactBckEncode) bckEncode(lom *core.LOM, _ []byte) error {
return nil
}
if err != nil && !os.IsNotExist(err) {
nlog.Warningln("failed to stat ", mdFQN, ":", err, "Deleting...")
nlog.Warningln("failed to fstat", mdFQN, "err:", err)
if errDel := os.Remove(mdFQN); errDel != nil {
nlog.Warningln("failed to delete broken metafile ", mdFQN, ":", errDel)
nlog.Warningln("nested err: failed to delete broken metafile:", errDel)
return nil
}
}
Expand Down Expand Up @@ -209,5 +226,19 @@ func (r *XactBckEncode) bckEncodeMD(ct *core.CT, _ []byte) error {
if tsi.ID() == core.T.SID() {
return nil
}
return core.T.ECRestoreReq(ct, tsi)
return core.T.ECRestoreReq(ct, tsi, r.ID())
}

func (r *XactBckEncode) RecvEncodeMD(lom *core.LOM) {
r.beforeECObj()

uname := lom.UnamePtr()
bname := cos.UnsafeBptr(uname)
if r.probFilter.Lookup(*bname) {
r.afterECObj(lom, nil)
return
}

r.probFilter.Insert(*bname)
ECM.TryRecoverObj(lom, r.afterECObj) // free LOM inside
}
9 changes: 6 additions & 3 deletions ec/getx.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,13 +248,16 @@ func (r *XactGet) stop() {
// a nil value from channel but ecrunner keeps working - it reuploads all missing
// slices or copies
func (r *XactGet) decode(req *request, lom *core.LOM) {
debug.Assert(req.Action == ActRestore, "invalid action for restore: "+req.Action)
debug.Assert(req.Action == ActRestore, "invalid action: ", req.Action)
r.stats.updateDecode()
req.putTime = time.Now()
req.tm = time.Now()
req.tm = req.putTime

if err := r.dispatchRequest(req, lom); err != nil {
nlog.Errorf("failed to restore %s: %v", lom, err)
if req.Callback != nil {
req.Callback(lom, err)
}
nlog.Errorln("failed to restore", lom.Cname(), "err:", err)
freeReq(req)
}
}
Expand Down
9 changes: 6 additions & 3 deletions ec/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (mgr *Manager) CleanupObject(lom *core.LOM) {
mgr.RestoreBckPutXact(lom.Bck()).cleanup(req, lom)
}

func (mgr *Manager) RestoreObject(lom *core.LOM) error {
func (mgr *Manager) RestoreObject(lom *core.LOM, cb core.OnFinishObj) error {
if !lom.ECEnabled() {
return ErrorECDisabled
}
Expand All @@ -301,6 +301,7 @@ func (mgr *Manager) RestoreObject(lom *core.LOM) error {
req := allocateReq(ActRestore, lom.LIF())
errCh := make(chan error) // unbuffered
req.ErrCh = errCh
req.Callback = cb
mgr.RestoreBckGetXact(lom.Bck()).decode(req, lom)

// wait for EC completes restoring the object
Expand Down Expand Up @@ -350,11 +351,13 @@ func (mgr *Manager) BMDChanged() error {
}

// TODO -- FIXME: joggers, etc.
func (mgr *Manager) TryRecoverObj(lom *core.LOM) {
func (mgr *Manager) TryRecoverObj(lom *core.LOM, cb core.OnFinishObj) {
go func() {
if err := mgr.RestoreObject(lom); err != nil {
err := mgr.RestoreObject(lom, cb)
if err != nil {
nlog.Errorln(core.T.String(), "failed to recover", lom.Cname(), "err:", err)
}
cb(lom, err)
core.FreeLOM(lom)
}()
}

0 comments on commit 8587ca8

Please sign in to comment.