EC: restore missing or corrupted slices
* fixes and usability; testing
* throttle 10ms
* docs: update storage_svcs.md
* part seven, prev. commit: 1f2f155

Signed-off-by: Alex Aizman <[email protected]>
alex-aizman committed Oct 25, 2024
1 parent 315e8d1 commit be7185a
Showing 6 changed files with 142 additions and 50 deletions.
4 changes: 2 additions & 2 deletions ais/tgtec.go
@@ -125,11 +125,11 @@ func (t *target) httpecpost(w http.ResponseWriter, r *http.Request) {
 		if err != nil {
 			t.writeErr(w, r, cmn.NewErrFailedTo(t, "EC-recover", cname, err))
 		}
-	} else {
+	} else if !xctn.Finished() {
 		xbenc, ok := xctn.(*ec.XactBckEncode)
 		debug.Assert(ok, xctn.String())

-		// async, via j.work
+		// async, via j.workCh
 		xbenc.RecvRecover(lom)
 	}
 case apc.ActEcOpen:
53 changes: 34 additions & 19 deletions cmd/cli/cli/bencodeway_hdlr.go
@@ -10,6 +10,7 @@ import (

 	"github.com/NVIDIA/aistore/api"
 	"github.com/NVIDIA/aistore/cmn"
+	"github.com/NVIDIA/aistore/cmn/cos"
 	"github.com/urfave/cli"
 )

@@ -19,8 +20,10 @@ const mirrorUsage = "configure (or unconfigure) bucket as n-way mirror, and run
 	indent1 + "(see also: 'ais start ec-encode')"

 const bencodeUsage = "erasure code entire bucket, e.g.:\n" +
-	indent1 + "\t- 'ais start ec-encode ais://m -d 8 -p 2'\t- erasure-code ais://m for (D=8, P=2).\n" +
-	indent1 + "(see also: 'ais start mirror')"
+	indent1 + "\t- 'ais start ec-encode ais://nnn -d 8 -p 2'\t- erasure-code ais://nnn for 8 data and 2 parity slices;\n" +
+	indent1 + "\t- 'ais start ec-encode ais://nnn --data-slices 8 --parity-slices 2'\t- same as above;\n" +
+	indent1 + "\t- 'ais start ec-encode ais://nnn --recover'\t- check and make sure that every ais://nnn object is properly erasure-coded.\n" +
+	indent1 + "see also: 'ais start mirror'"

 var (
 	storageSvcCmdsFlags = map[string][]cli.Flag{
@@ -115,8 +118,13 @@ func ecEncodeHandler(c *cli.Context) error {
 	if bprops, err = headBucket(bck, false /* don't add */); err != nil {
 		return err
 	}
-	numd = c.Int(fl1n(dataSlicesFlag.Name))
-	nump = c.Int(fl1n(paritySlicesFlag.Name))
+	numd = parseIntFlag(c, dataSlicesFlag)
+	nump = parseIntFlag(c, paritySlicesFlag)
+
+	if bprops.EC.Enabled {
+		numd = cos.NonZero(numd, bprops.EC.DataSlices)
+		nump = cos.NonZero(nump, bprops.EC.ParitySlices)
+	}

 	// compare with ECConf.Validate
 	if numd < cmn.MinSliceCount || numd > cmn.MaxSliceCount {
@@ -126,29 +134,31 @@
 		return fmt.Errorf("invalid number %d of parity slices (valid range: [%d, %d])", nump, cmn.MinSliceCount, cmn.MaxSliceCount)
 	}

+	checkAndRecover := flagIsSet(c, checkAndRecoverFlag)
 	if bprops.EC.Enabled {
 		if bprops.EC.DataSlices != numd || bprops.EC.ParitySlices != nump {
 			// not supported yet:
-			warn := fmt.Sprintf("%s is already erasure-coded, cannot change existing (D=%d, P=%d) configuration to (D=%d, P=%d)",
+			err := fmt.Errorf("%s is already (D=%d, P=%d) erasure-coded - cannot change this existing configuration to (D=%d, P=%d)",
 				bck.Cname(""), bprops.EC.DataSlices, bprops.EC.ParitySlices, numd, nump)
-			actionWarn(c, warn)
-			return nil
+			return err
 		}
-		var warn string
-		if bprops.EC.ObjSizeLimit == cmn.ObjSizeToAlwaysReplicate {
-			warn = fmt.Sprintf("%s is already configured for (P + 1 = %d copies)", bck.Cname(""), bprops.EC.ParitySlices+1)
-		} else {
-			warn = fmt.Sprintf("%s is already erasure-coded for (D=%d, P=%d)", bck.Cname(""), numd, nump)
-		}
-		actionWarn(c, warn+" - proceeding to run anyway")
-		warned = true
+		if !checkAndRecover {
+			var warn string
+			if bprops.EC.ObjSizeLimit == cmn.ObjSizeToAlwaysReplicate {
+				warn = fmt.Sprintf("%s is already configured for (P + 1 = %d copies)", bck.Cname(""), bprops.EC.ParitySlices+1)
+			} else {
+				warn = fmt.Sprintf("%s is already erasure-coded for (D=%d, P=%d)", bck.Cname(""), numd, nump)
+			}
+			actionWarn(c, warn+" - proceeding to run anyway")
+			warned = true
+		}
 	}

-	return ecEncode(c, bck, bprops, numd, nump, warned)
+	return ecEncode(c, bck, bprops, numd, nump, warned, checkAndRecover)
 }

-func ecEncode(c *cli.Context, bck cmn.Bck, bprops *cmn.Bprops, data, parity int, warned bool) error {
-	xid, err := api.ECEncodeBucket(apiBP, bck, data, parity, flagIsSet(c, checkAndRecoverFlag))
+func ecEncode(c *cli.Context, bck cmn.Bck, bprops *cmn.Bprops, data, parity int, warned, checkAndRecover bool) error {
+	xid, err := api.ECEncodeBucket(apiBP, bck, data, parity, checkAndRecover)
 	if err != nil {
 		return err
 	}
@@ -161,9 +171,14 @@ func ecEncode(c *cli.Context, bck cmn.Bck, bprops *cmn.Bprops, data, parity int,
 	} else {
 		var msg string
 		if bprops.EC.ObjSizeLimit == cmn.ObjSizeToAlwaysReplicate {
-			msg = fmt.Sprintf("Erasure-coding %s for (P + 1 = %d copies). ", bck.Cname(""), bprops.EC.ParitySlices+1)
+			msg = fmt.Sprintf("Erasure-coding %s for (P + 1 = %d copies)", bck.Cname(""), bprops.EC.ParitySlices+1)
 		} else {
-			msg = fmt.Sprintf("Erasure-coding %s for (D=%d, P=%d). ", bck.Cname(""), data, parity)
+			msg = fmt.Sprintf("Erasure-coding %s for (D=%d, P=%d)", bck.Cname(""), data, parity)
 		}
+		if checkAndRecover {
+			msg += ". Running in recovery mode. "
+		} else {
+			msg += ". "
+		}
 		actionDone(c, msg+toMonitorMsg(c, xid, ""))
 	}
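The `cos.NonZero` defaulting above is the crux of the usability change: the `-d`/`-p` flags are no longer `Required`, an explicit flag value wins, and an already-EC-enabled bucket falls back to its configured slice counts. A minimal sketch of that precedence, assuming `parseIntFlag` yields zero when a flag isn't set and `cos.NonZero` returns its first non-zero argument (illustrative stand-ins, not the actual aistore helpers):

```go
package main

import "fmt"

// nonZero stands in for the assumed cos.NonZero semantics:
// return v if non-zero, otherwise the fallback.
func nonZero(v, fallback int) int {
	if v != 0 {
		return v
	}
	return fallback
}

func main() {
	// Bucket already erasure-coded as (D=8, P=2).
	const bucketD, bucketP = 8, 2

	// No flags given (parsed values are zero): inherit the bucket's EC config.
	numd, nump := nonZero(0, bucketD), nonZero(0, bucketP)
	fmt.Println(numd, nump) // 8 2

	// Explicit '-d 4 -p 3': the flags win - and the handler then rejects
	// the (D, P) mismatch with the "cannot change" error shown above.
	numd, nump = nonZero(4, bucketD), nonZero(3, bucketP)
	fmt.Println(numd, nump) // 4 3
}
```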
6 changes: 3 additions & 3 deletions cmd/cli/cli/const.go
@@ -592,12 +592,12 @@

 	copiesFlag = cli.IntFlag{Name: "copies", Usage: "number of object replicas", Value: 1, Required: true}

-	dataSlicesFlag   = cli.IntFlag{Name: "data-slices,data,d", Value: 2, Usage: "number of data slices", Required: true}
-	paritySlicesFlag = cli.IntFlag{Name: "parity-slices,parity,p", Value: 2, Usage: "number of parity slices", Required: true}
+	dataSlicesFlag   = cli.IntFlag{Name: "data-slices,d", Value: 2, Usage: "number of data slices"}
+	paritySlicesFlag = cli.IntFlag{Name: "parity-slices,p", Value: 2, Usage: "number of parity slices"}

 	checkAndRecoverFlag = cli.BoolFlag{
 		Name: "recover",
-		Usage: "check and recover missing or corrupted EC metadata and/or slices, if any",
+		Usage: "check and make sure that each and every object is properly erasure coded",
 	}

 	compactPropFlag = cli.BoolFlag{Name: "compact,c", Usage: "display properties grouped in human-readable mode"}
98 changes: 83 additions & 15 deletions docs/storage_svcs.md
@@ -18,10 +18,14 @@ redirect_from:
   - [Example setting space properties](#example-setting-space-properties)
   - [Example enabling LRU eviction for a given bucket](#example-enabling-lru-eviction-for-a-given-bucket)
 - [Erasure coding](#erasure-coding)
+  - [Example setting bucket properties](#example-setting-bucket-properties)
   - [Limitations](#limitations)
 - [N-way mirror](#n-way-mirror)
   - [Read load balancing](#read-load-balancing)
-  - [More examples](#more-examples)
+- [Another n-way example](#another-n-way-example)
 - [Data redundancy: summary of the available options (and considerations)](#data-redundancy-summary-of-the-available-options-and-considerations)
+- [Erasure-coding: with and without recovery](#erasure-coding-with-and-without-recovery)
+  - [Example recovering lost or damaged slices and/or objects](#example-recovering-lost-or-damaged-slices-and-objects)

## Storage Services

@@ -134,29 +138,51 @@ Bucket props successfully updated.

AIStore provides data protection that comes in several flavors: [end-to-end checksumming](#checksumming), [n-way mirroring](#n-way-mirror), replication (for *small* objects), and erasure coding.

-Erasure coding, or EC, is a well-known storage technique that protects user data by dividing it into N fragments or slices, computing K redundant (parity) slices, and then storing the resulting (N+K) slices on (N+K) storage servers - one slice per target server.
+Erasure coding, or EC, is a well-known storage technique that protects user data by dividing it into D fragments or slices, computing P redundant (parity) slices, and then storing the resulting (D+P) slices on (D+P) storage servers - one slice per target server.

-EC schemas are flexible and user-configurable: users can select the N and the K (above), thus ensuring that user data remains available even if the cluster loses **any** (emphasis on the **any**) of its K servers.
+EC schemas are flexible and user-configurable: users can select the D and the P (above), thus ensuring that user data remains available even if the cluster loses **any** (emphasis on the **any**) of its P servers.

A bucket inherits EC settings from the global configuration, but they can be overridden on a per-bucket basis.

+```console
+$ ais start ec-encode --help
+NAME:
+   ais start ec-encode - erasure code entire bucket, e.g.:
+     - 'ais start ec-encode ais://nnn -d 8 -p 2'   - erasure-code ais://nnn for 8 data and 2 parity slices;
+     - 'ais start ec-encode ais://nnn --data-slices 8 --parity-slices 2'   - same as above;
+     - 'ais start ec-encode ais://nnn --recover'   - check and make sure that every ais://nnn object is properly erasure-coded.
+   see also: 'ais start mirror'
+
+USAGE:
+   ais start ec-encode [command options] BUCKET
+
+OPTIONS:
+   --data-slices value, -d value    number of data slices (default: 2)
+   --parity-slices value, -p value  number of parity slices (default: 2)
+   --non-verbose, --nv              non-verbose (quiet) output, minimized reporting, fewer warnings
+   --recover                        check and make sure that each and every object is properly erasure coded
+   --help, -h                       show help
+```

* `ec.enabled`: bool - enables or disables data protection for the bucket
* `ec.data_slices`: integer in the range [2, 100], representing the number of fragments the object is broken into
* `ec.parity_slices`: integer in the range [2, 32], representing the number of redundant fragments to provide protection from failures. The value defines the maximum number of storage targets a cluster can lose while still being able to restore the original object
* `ec.objsize_limit`: integer indicating the minimum size of an object that is erasure encoded. Smaller objects are just replicated.
* `ec.compression`: string that contains rules for LZ4 compression used by EC when it sends its fragments and replicas over the network. The value "never" disables compression. Other values enable compression: it can be "always" - use compression for all transfers - or a list of compression options, like "ratio=1.5", meaning "disable compression automatically when the compression ratio drops below 1.5"

-Choose the number data and parity slices depending on the required level of protection and the cluster configuration. The number of storage targets must be greater than the sum of the number of data and parity slices. If the cluster uses only replication (by setting `objsize_limit` to a very high value), the number of storage targets must exceed the number of parity slices.
+Choose the number of data and parity slices depending on the required level of protection and the cluster configuration.

-Rebalance supports erasure-coded buckets.
+The number of storage targets must be greater than the sum of the number of data and parity slices. If the cluster uses only replication (by setting `objsize_limit` to a very high value), the number of storage targets must exceed the number of parity slices.

-Notes:
+Global rebalance supports erasure-coded buckets.

-- Every data and parity slice is stored on a separate storage target. To reconstruct a damaged object, AIStore requires at least `ec.data_slices` slices in total out of data and parity sets
-- Small objects are replicated `ec.parity_slices` times to have the same level of data protection that big objects do
-- Increasing the number of parity slices improves data protection level, but it may hit performance: doubling the number of slices approximately increases the time to encode the object by a factor of two
+**Notes**:
+
+> Every data and parity slice is stored on a separate storage target. To reconstruct a damaged object, AIStore requires at least `ec.data_slices` slices in total out of data and parity sets.
+> Small objects are replicated `ec.parity_slices` times to have the same level of data protection that big objects do.
+> Increasing the number of parity slices improves data protection level, but it may hit performance: doubling the number of slices approximately increases the time to encode the object by a factor of two.

-Example of setting bucket properties:
+### Example setting bucket properties

```console
$ ais bucket props ais://<bucket-name> lru.lowwm=1 lru.highwm=90 ec.enabled=true ec.data_slices=4 ec.parity_slices=2
Expand Down Expand Up @@ -226,11 +252,11 @@ With respect to n-way mirrors, the usual pros-and-cons consideration boils down

Since object replicas are end-to-end protected by [checksums](#checksumming), all of them, and any one in particular, can be used interchangeably to satisfy a GET request, thus providing for multiple possible choices of local filesystems and, ultimately, local drives. Given n > 1, AIS will utilize the least loaded drive(s).

-### More examples
+## Another n-way example
The following sequence creates a bucket named `abc`, PUTs an object into it and then converts it into a 3-way mirror:

```console
-$ ais create abc
+$ ais create ais://abc
$ ais put /tmp/obj1 ais://abc/obj1
$ ais start mirror --copies 3 ais://abc
```
Expand Down Expand Up @@ -262,14 +288,56 @@ This option won't protect from node failures but it will provide a fairly good p
Further, you could at some point in time decide to associate a given AIS bucket with a Cloud (backend) bucket, thus making sure that your data is stored in one of the AIS-supported Clouds: Amazon S3, Google Cloud Storage, Azure Blob Storage.

-Finally, you could erasure code (EC) a given bucket for `D + P` redundancy, where `D` and `P` are, respectively, the numbers of data and parity slices. For example:
+Finally, you could erasure code (EC) a given bucket for `D + P` redundancy, where `D` and `P` are, respectively, the numbers of data and parity slices:

 ```console
-$ ais start ec-encode -d 6 -p 4 abc
+$ ais start ec-encode --help
+NAME:
+   ais start ec-encode - erasure code entire bucket, e.g.:
+     - 'ais start ec-encode ais://nnn -d 8 -p 2'   - erasure-code ais://nnn for 8 data and 2 parity slices;
+     - 'ais start ec-encode ais://nnn --data-slices 8 --parity-slices 2'   - same as above;
+     - 'ais start ec-encode ais://nnn --recover'   - check and make sure that every ais://nnn object is properly erasure-coded.
+   see also: 'ais start mirror'
+
+USAGE:
+   ais start ec-encode [command options] BUCKET
+
+OPTIONS:
+   --data-slices value, -d value    number of data slices (default: 2)
+   --parity-slices value, -p value  number of parity slices (default: 2)
+   --non-verbose, --nv              non-verbose (quiet) output, minimized reporting, fewer warnings
+   --recover                        check and make sure that each and every object is properly erasure coded
+   --help, -h                       show help
 ```

-will erasure-code all objects in the `abc` bucket for the total of 10 slices stored on different AIS targets, plus 1 (one) full replica. In other words, this example requires at least `10 + 1 = 11` targets in the cluster.
+## Erasure-coding: with and without recovery
+
+Assuming ais://abc is not erasure-coded (or its erasure-coding property is disabled):
+
+```console
+$ ais start ec-encode ais://abc -d 8 -p 2
+
+## or, same:
+##
+$ ais start ec-encode ais://abc --data-slices 8 --parity-slices 2
+```
+
+This will erasure-code all objects in the `ais://abc` bucket for the total of 10 slices stored on different AIS targets, plus 1 (one) full replica. In other words, this example requires at least `10 + 1 = 11` targets in the cluster.
+
+> Generally, `D + P` erasure coding requires that AIS cluster has `D + P + 1` targets, or more.
+> In addition to Reed-Solomon encoded slices, we currently always store a full replica - the strategy that uses available capacity but pays back with read performance.
+
+### Example recovering lost or damaged slices and objects
+
+But what if there's an accident that involves corrupted or deleted data, lost disks, and/or entire nodes?
+
+Well, erasure-coding supports a special _recovery_ mode to "check and make sure that each and every object is properly erasure coded."
+
+```console
+$ ais start ec-encode ais://abc --recover
+
+## or same, assuming the bucket is (D=8, P=2) erasure-coded:
+##
+$ ais start ec-encode ais://abc --data-slices 8 --parity-slices 2
+```
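As a back-of-the-envelope check of the example above, here is the arithmetic for a (D=8, P=2) bucket, given that a full replica is always stored in addition to the Reed-Solomon slices (per the note above):

```
D = 8 (data slices), P = 2 (parity slices)

slices per object      = D + P          = 10           (one slice per target)
full replicas          = 1                             (always stored, per the note)
minimum cluster size   = D + P + 1      = 11 targets
space overhead         = (D + P)/D + 1  = 1.25 + 1 = 2.25x
tolerated target loss  = any P          = 2
```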
20 changes: 11 additions & 9 deletions ec/bencodex.go
@@ -46,6 +46,7 @@ type (
 		probFilter      *prob.Filter
 		rcvyJG          map[string]*rcvyJogger
 		last            atomic.Int64
+		done            atomic.Bool
 		checkAndRecover bool
 	}
 	rcvyJogger struct {
@@ -142,6 +143,7 @@ func (r *XactBckEncode) Run(wg *sync.WaitGroup) {
 		r.Finish()
 		return
 	}
+	r.last.Store(mono.NanoTime())
 	// run recovery joggers
 	r.rcvyJG = make(map[string]*rcvyJogger, len(avail))
 	for _, mi := range avail {
@@ -171,7 +173,8 @@ func (r *XactBckEncode) Run(wg *sync.WaitGroup) {
 	}
 	if r.checkAndRecover {
 		// wait for in-flight and pending recovery
-		r.Quiesce(config.Timeout.MaxKeepalive.D(), r._quiesce)
+		r.Quiesce(time.Minute, r._quiesce)
+		r.done.Store(true)
 	}
 	r.wg.Wait() // wait for before/afterEncode

@@ -182,13 +185,10 @@ func (r *XactBckEncode) Run(wg *sync.WaitGroup) {
 	r.Finish()
 }

-func (r *XactBckEncode) _quiesce(elapsed time.Duration) core.QuiRes {
-	if mono.Since(r.last.Load()) > cmn.Rom.MaxKeepalive()-(cmn.Rom.MaxKeepalive()>>2) {
+func (r *XactBckEncode) _quiesce(time.Duration) core.QuiRes {
+	if mono.Since(r.last.Load()) > cmn.Rom.MaxKeepalive() {
 		return core.QuiDone
 	}
-	if elapsed > time.Minute {
-		return core.QuiTimeout
-	}
 	return core.QuiInactiveCB
 }

@@ -285,7 +285,9 @@ func (r *XactBckEncode) RecvRecover(lom *core.LOM) {
 		r.Abort(err)
 		return
 	}
-	j.workCh <- lom
+	if !r.done.Load() {
+		j.workCh <- lom
+	}
 }

func (r *XactBckEncode) setLast(lom *core.LOM, err error) {
@@ -327,7 +329,7 @@ func (j *rcvyJogger) run() {
 		if err == nil && (n&throttleBatch == throttleBatch) {
 			pct, _, _ := _throttlePct()
 			if pct >= maxThreashold {
-				runtime.Gosched() // ditto
+				time.Sleep(fs.Throttle10ms)
 			}
 		}
 	}

@@ -337,7 +339,7 @@
}

func (j *rcvyJogger) String() string {
-	return fmt.Sprint("rcvy[ ", j.mi.String(), " ", j.parent.ID(), " ]")
+	return fmt.Sprintf("j-rcvy %s[%s/%s]", j.parent.ID(), j.mi, j.parent.Bck())
}

//
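A note on the new `done` flag: together with the guard in `RecvRecover` (and the `!xctn.Finished()` check in `ais/tgtec.go` above), it implements a drain pattern - once the xaction has quiesced, late recovery requests are dropped rather than pushed onto jogger channels that no longer have readers. A minimal standalone sketch of the pattern (illustrative names, not the actual `ec` package API):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type recoverer struct {
	done   atomic.Bool
	workCh chan int
	wg     sync.WaitGroup
}

// submit mirrors RecvRecover: once quiesced, drop the work item
// instead of blocking on (or panicking over) a closed channel.
func (r *recoverer) submit(item int) {
	if !r.done.Load() {
		r.workCh <- item
	}
}

func main() {
	r := &recoverer{workCh: make(chan int, 16)}

	r.wg.Add(1)
	go func() { // the jogger
		defer r.wg.Done()
		for item := range r.workCh {
			fmt.Println("recovering item", item)
		}
	}()

	for i := 0; i < 3; i++ {
		r.submit(i)
	}

	r.done.Store(true) // quiesced: stop accepting new work
	close(r.workCh)
	r.wg.Wait()

	r.submit(99) // late arrival: silently dropped
}
```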
11 changes: 9 additions & 2 deletions ec/getjogger.go
@@ -850,6 +850,9 @@ func (c *getJogger) requestMeta(ctx *restoreCtx) error {
 	ctx.nodes = make(map[string]*Metadata, len(nodes))
 	for _, node := range nodes {
+		if node.InMaintOrDecomm() {
+			continue
+		}
 		wg.Add(1)
 		go func(si *meta.Snode, c *getJogger, mtx *sync.Mutex, mdExists bool) {
 			ctx.requestMeta(si, c, mtx, mdExists)
 			wg.Done()
@@ -862,6 +865,9 @@
 		if node.ID() == core.T.SID() {
 			continue
 		}
+		if node.InMaintOrDecomm() {
+			continue
+		}
 		wg.Add(1)
 		go func(si *meta.Snode, c *getJogger, mtx *sync.Mutex, mdExists bool) {
 			ctx.requestMeta(si, c, mtx, mdExists)
@@ -895,10 +901,11 @@ func (c *getJogger) requestMeta(ctx *restoreCtx) error {
 func (ctx *restoreCtx) requestMeta(si *meta.Snode, c *getJogger, mtx *sync.Mutex, mdExists bool) {
 	md, err := RequestECMeta(ctx.lom.Bucket(), ctx.lom.ObjName, si, c.client)
 	if err != nil {
+		warn := fmt.Sprintf("%s: %s failed request-meta(%s) request: %v", core.T, ctx.lom.Cname(), si, err)
 		if mdExists {
-			nlog.Errorf("No EC meta %s from %s: %v", ctx.lom.Cname(), si, err)
+			nlog.Warningln(warn)
 		} else if cmn.Rom.FastV(4, cos.SmoduleEC) {
-			nlog.Infof("No EC meta %s from %s: %v", ctx.lom.Cname(), si, err)
+			nlog.Infoln(warn)
 		}
 		return
 	}
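One detail worth noting in the hunks above: the maintenance check has to run before `wg.Add(1)`, since skipping a node after incrementing the counter would leave the WaitGroup waiting for a `Done` that never comes. A standalone sketch of the skip-then-add ordering (hypothetical node type, not aistore's `meta.Snode` API):

```go
package main

import (
	"fmt"
	"sync"
)

type node struct {
	id      string
	inMaint bool // stands in for meta.Snode.InMaintOrDecomm()
}

func main() {
	nodes := []node{{"t1", false}, {"t2", true}, {"t3", false}}

	var wg sync.WaitGroup
	for _, n := range nodes {
		if n.inMaint {
			continue // skip BEFORE Add: a skipped node must not bump the counter
		}
		wg.Add(1)
		go func(n node) {
			defer wg.Done()
			fmt.Println("requesting EC metadata from", n.id)
		}(n)
	}
	wg.Wait() // returns; Add-before-skip would deadlock here
}
```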
