Skip to content

Commit

Permalink
EC: an option to check and recover missing/corrupted
Browse files Browse the repository at this point in the history
* add an option to check and recover missing or corrupted metadata and/or slices,
  if any
* part two, prev. commit: d2feb38

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Oct 21, 2024
1 parent 4761060 commit b4666d3
Show file tree
Hide file tree
Showing 14 changed files with 58 additions and 47 deletions.
5 changes: 4 additions & 1 deletion ais/test/ec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2915,8 +2915,11 @@ func TestECBckEncodeRecover(t *testing.T) {
}
touched = damaged
}
xid, err := api.ECEncodeBucket(tools.BaseAPIParams(proxyURL), bck, test.data, test.parity, true)

bp := tools.BaseAPIParams(tools.RandomProxyURL())
xid, err := api.ECEncodeBucket(bp, bck, test.data, test.parity, true /*recover missing ...*/)
tassert.CheckFatal(t, err)

// First, wait for EC-encode xaction to complete
reqArgs := xact.ArgsMsg{ID: xid, Kind: apc.ActECEncode, Bck: bck}
api.WaitForXactionIdle(tools.BaseAPIParams(proxyURL), &reqArgs)
Expand Down
31 changes: 14 additions & 17 deletions ais/tgtec.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,33 +86,30 @@ func (t *target) httpecpost(w http.ResponseWriter, r *http.Request) {
switch action {
case apc.ActEcRecover:
query := r.URL.Query()
if isRecover := cos.IsParseBool(query.Get(apc.QparamECRecover)); !isRecover {
nlog.Errorf("EC: invalid request to recover an object. '%s' is not set", apc.QparamECRecover)
objName := query.Get(apc.QparamECObject)
if objName == "" {
err := fmt.Errorf("%s: invalid request to recover an object: name's empty", t)
t.writeErr(w, r, err)
return
}
objPath := query.Get(apc.QparamECObject)
if objPath == "" {
nlog.Errorf("EC: invalid request to recover an object. Object name(%s) is undefined", apc.QparamECRecover)
return
}
objName := objPath
lom := core.AllocLOM(objName)
mbck, err := newBckFromQuname(r.URL.Query(), true)

mbck, err := newBckFromQuname(query, true)
if err != nil {
nlog.Errorln("Failed to recover object:", err)
core.FreeLOM(lom)
err = fmt.Errorf("%s: %v", t, err) // FATAL/unlikely
t.writeErr(w, r, err)
return
}
bck := mbck.Clone()
if err := lom.InitBck(&bck); err != nil {
nlog.Errorln("LOM init failed:", err)

bck := mbck.Bucket()
if err := lom.InitBck(bck); err != nil {
core.FreeLOM(lom)
err = fmt.Errorf("%s: %v", t, err)
t.writeErr(w, r, err)
return
}
if err := ec.ECM.TryRecoverObj(lom); err != nil {
nlog.Errorln("Failed to recover object:", err)
core.FreeLOM(lom)
}
ec.ECM.TryRecoverObj(lom) // free LOM inside
return
case apc.ActEcOpen:
hk.UnregIf(hkname, closeEc) // just in case, a no-op most of the time
Expand Down
1 change: 0 additions & 1 deletion ais/tgtimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,6 @@ func (t *target) _promRemote(params *core.PromoteParams, lom *core.LOM, tsi *met
func (t *target) ECRestoreReq(ct *core.CT, tsi *meta.Snode) error {
q := ct.Bck().NewQuery()
ct.Bck().AddUnameToQuery(q, apc.QparamBckTo)
q.Set(apc.QparamECRecover, "true")
q.Set(apc.QparamECObject, ct.ObjectName())
cargs := allocCargs()
{
Expand Down
7 changes: 5 additions & 2 deletions ais/tgttxn.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,9 @@ func (t *target) setBprops(c *txnSrv) (string, error) {
if _, reec := _reEC(bprops, nprops, c.bck, nil /*smap*/); reec {
flt := xreg.Flt{Kind: apc.ActECEncode, Bck: c.bck}
xreg.DoAbort(flt, errors.New("re-ec"))
rns := xreg.RenewECEncode(c.bck, c.uuid, apc.ActCommit, false)

// checkAndRecover always false (compare w/ ecEncode below)
rns := xreg.RenewECEncode(c.bck, c.uuid, apc.ActCommit, false /*check & recover missing/corrupted*/)
if rns.Err != nil {
return "", rns.Err
}
Expand Down Expand Up @@ -758,7 +760,8 @@ func (t *target) ecEncode(c *txnSrv) (string, error) {
if err = t.transactions.wait(txn, c.timeout.netw, c.timeout.host); err != nil {
return "", cmn.NewErrFailedTo(t, "commit", txn, err)
}
rns := xreg.RenewECEncode(c.bck, c.uuid, apc.ActCommit, c.msg.Name == apc.ActEcRecover)
checkAndRecover := c.msg.Name == apc.ActEcRecover
rns := xreg.RenewECEncode(c.bck, c.uuid, apc.ActCommit, checkAndRecover /*missing/corrupted slices, etc.*/)
if rns.Err != nil {
nlog.Errorf("%s: %s %v", t, txn, rns.Err)
return "", rns.Err
Expand Down
2 changes: 1 addition & 1 deletion api/apc/actmsg.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ const (
const (
ActEcOpen = "open-ec-streams"
ActEcClose = "close-ec-streams"
ActEcRecover = "obj-recover"
ActEcRecover = "recover" // check and recover missing or corrupted EC metadata and/or slices, if any
)

// ActMsg is a JSON-formatted control structures used in a majority of API calls
Expand Down
3 changes: 1 addition & 2 deletions api/apc/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,7 @@ const (
QparamMpathLabel = "mountpath_label"

// Request to restore an object
QparamECRecover = "recover"
QparamECObject = "object"
QparamECObject = "object"
)

// QparamFltPresence enum.
Expand Down
4 changes: 2 additions & 2 deletions api/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func MakeNCopies(bp BaseParams, bck cmn.Bck, copies int) (xid string, err error)
// Erasure-code entire `bck` bucket at a given `data`:`parity` redundancy.
// The operation requires at least (`data + `parity` + 1) storage targets in the cluster.
// Returns xaction ID if successful, an error otherwise.
func ECEncodeBucket(bp BaseParams, bck cmn.Bck, data, parity int, doRecover ...bool) (xid string, err error) {
func ECEncodeBucket(bp BaseParams, bck cmn.Bck, data, parity int, checkAndRecover bool) (xid string, err error) {
bp.Method = http.MethodPost
// Without `string` conversion it makes base64 from []byte in `Body`.
ecConf := string(cos.MustMarshal(&cmn.ECConfToSet{
Expand All @@ -280,7 +280,7 @@ func ECEncodeBucket(bp BaseParams, bck cmn.Bck, data, parity int, doRecover ...b
reqParams.BaseParams = bp
reqParams.Path = apc.URLPathBuckets.Join(bck.Name)
msg := apc.ActMsg{Action: apc.ActECEncode, Value: ecConf}
if len(doRecover) > 0 && doRecover[0] {
if checkAndRecover {
msg.Name = apc.ActEcRecover
}
reqParams.Body = cos.MustMarshal(msg)
Expand Down
3 changes: 2 additions & 1 deletion cmd/cli/cli/bencodeway_hdlr.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var (
dataSlicesFlag,
paritySlicesFlag,
nonverboseFlag,
checkAndRecoverFlag,
},
}

Expand Down Expand Up @@ -147,7 +148,7 @@ func ecEncodeHandler(c *cli.Context) error {
}

func ecEncode(c *cli.Context, bck cmn.Bck, bprops *cmn.Bprops, data, parity int, warned bool) error {
xid, err := api.ECEncodeBucket(apiBP, bck, data, parity)
xid, err := api.ECEncodeBucket(apiBP, bck, data, parity, flagIsSet(c, checkAndRecoverFlag))
if err != nil {
return err
}
Expand Down
5 changes: 5 additions & 0 deletions cmd/cli/cli/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,11 @@ var (
dataSlicesFlag = cli.IntFlag{Name: "data-slices,data,d", Value: 2, Usage: "number of data slices", Required: true}
paritySlicesFlag = cli.IntFlag{Name: "parity-slices,parity,p", Value: 2, Usage: "number of parity slices", Required: true}

checkAndRecoverFlag = cli.BoolFlag{
Name: "recover",
Usage: "check and recover missing or corrupted EC metadata and/or slices, if any",
}

compactPropFlag = cli.BoolFlag{Name: "compact,c", Usage: "display properties grouped in human-readable mode"}

nameOnlyFlag = cli.BoolFlag{
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/NVIDIA/aistore/cmd/cli
go 1.23.2

require (
github.com/NVIDIA/aistore v1.3.26-0.20241018191536-5a7b43c14253
github.com/NVIDIA/aistore v1.3.26-0.20241021212206-e65b4c2efcf3
github.com/fatih/color v1.17.0
github.com/json-iterator/go v1.1.12
github.com/onsi/ginkgo/v2 v2.20.2
Expand Down
4 changes: 2 additions & 2 deletions cmd/cli/go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc=
github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
github.com/NVIDIA/aistore v1.3.26-0.20241018191536-5a7b43c14253 h1:4tyDXe2OssOS6Gi9dM6dX3n7Y6jHU2YQ+bguw/C05xM=
github.com/NVIDIA/aistore v1.3.26-0.20241018191536-5a7b43c14253/go.mod h1:Q6J3YIeiL4A6oWga3qCJ8+XI1CUvdde7Gua/HfueGlQ=
github.com/NVIDIA/aistore v1.3.26-0.20241021212206-e65b4c2efcf3 h1:apzFZIz80x8HYgFG1PFnNZ1JUVAfiaHhnyhSJjiqriw=
github.com/NVIDIA/aistore v1.3.26-0.20241021212206-e65b4c2efcf3/go.mod h1:Q6J3YIeiL4A6oWga3qCJ8+XI1CUvdde7Gua/HfueGlQ=
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
Expand Down
25 changes: 15 additions & 10 deletions ec/bencodex.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ type (
}
XactBckEncode struct {
xact.Base
bck *meta.Bck
wg *sync.WaitGroup // to wait for EC finishes all objects
smap *meta.Smap
doRecover bool
bck *meta.Bck
wg *sync.WaitGroup // to wait for EC finishes all objects
smap *meta.Smap
checkAndRecover bool
}
)

Expand Down Expand Up @@ -75,8 +75,12 @@ func (p *encFactory) WhenPrevIsRunning(prevEntry xreg.Renewable) (wpr xreg.WPR,
// XactBckEncode //
///////////////////

func newXactBckEncode(bck *meta.Bck, uuid string, doRecover bool) (r *XactBckEncode) {
r = &XactBckEncode{bck: bck, wg: &sync.WaitGroup{}, smap: core.T.Sowner().Get(), doRecover: doRecover}
func newXactBckEncode(bck *meta.Bck, uuid string, checkAndRecover bool) (r *XactBckEncode) {
r = &XactBckEncode{
bck: bck,
wg: &sync.WaitGroup{},
smap: core.T.Sowner().Get(),
checkAndRecover: checkAndRecover}
r.InitBase(uuid, apc.ActECEncode, bck)
return
}
Expand All @@ -98,15 +102,16 @@ func (r *XactBckEncode) Run(wg *sync.WaitGroup) {
ECM.incActive(r)

ctList := []string{fs.ObjectType}
if r.doRecover {
ctList = []string{fs.ObjectType, fs.ECMetaType, fs.ECSliceType}
}
opts := &mpather.JgroupOpts{
CTs: ctList,
VisitObj: r.bckEncode,
VisitCT: r.bckEncodeMD,
DoLoad: mpather.LoadUnsafe,
}
if r.checkAndRecover {
opts.CTs = []string{fs.ObjectType, fs.ECMetaType, fs.ECSliceType}
opts.VisitCT = r.bckEncodeMD
}

opts.Bck.Copy(r.bck.Bucket())
jg := mpather.NewJoggerGroup(opts, cmn.GCO.Get(), nil)
jg.Run()
Expand Down
8 changes: 3 additions & 5 deletions ec/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,14 +349,12 @@ func (mgr *Manager) BMDChanged() error {
return nil
}

func (mgr *Manager) TryRecoverObj(lom *core.LOM) error {
// RestoreObject is blocking: it waits for the object is recovered
// TODO -- FIXME: joggers, etc.
func (mgr *Manager) TryRecoverObj(lom *core.LOM) {
go func() {
if err := mgr.RestoreObject(lom); err != nil {
nlog.Errorf("Failed to recover an object '%s': %v", lom, err)
nlog.Errorln(core.T.String(), "failed to recover", lom.Cname(), "err:", err)
}
core.FreeLOM(lom)
}()

return nil
}
5 changes: 3 additions & 2 deletions xact/xreg/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ func RenewBucketXact(kind string, bck *meta.Bck, args Args, buckets ...*meta.Bck
return dreg.renew(e, bck, buckets...)
}

func RenewECEncode(bck *meta.Bck, uuid, phase string, doRecover bool) RenewRes {
return RenewBucketXact(apc.ActECEncode, bck, Args{Custom: &ECEncodeArgs{Phase: phase, Recover: doRecover}, UUID: uuid})
func RenewECEncode(bck *meta.Bck, uuid, phase string, checkAndRecover bool) RenewRes {
args := Args{Custom: &ECEncodeArgs{Phase: phase, Recover: checkAndRecover}, UUID: uuid}
return RenewBucketXact(apc.ActECEncode, bck, args)
}

func RenewMakeNCopies(uuid, tag string) {
Expand Down

0 comments on commit b4666d3

Please sign in to comment.