Backup requests for replicated storage #7

Open · wants to merge 25 commits into master

Changes from 23 commits
42 changes: 42 additions & 0 deletions client/blb/backup_reads.go
@@ -0,0 +1,42 @@
package blb

import (
"time"
)

// BackupReadBehavior lets clients enable or disable backup reads. The Delay
// field determines how long to wait before each backup request is sent.
type BackupReadBehavior struct {
// Enabled is a flag to enable the backup read feature.
Enabled bool

// Delay is the time duration between successive backup reads.
Delay time.Duration

// MaxNumBackups is the maximum number of requests sent in addition
// to the primary read.
MaxNumBackups int
}

type backupReadState struct {
BackupReadBehavior

// delayFunc returns a channel that fires after the given delay. It
// defaults to time.After and can be overridden in tests.
delayFunc func(time.Duration) <-chan time.Time
}

func makeBackupReadState(behavior BackupReadBehavior) backupReadState {
if behavior.Enabled {
if behavior.MaxNumBackups <= 0 {
// Give it a default value of 1.
behavior.MaxNumBackups = 1
}
} else {
// MaxNumBackups isn't used in the read path if the feature is disabled, so clear it.
behavior.MaxNumBackups = 0
}
return backupReadState{
BackupReadBehavior: behavior,
delayFunc: time.After,
}
}
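
For context, a minimal sketch of how a caller might enable this feature through the client Options (the BackupReadBehavior fields come from this diff; the import path and the way the Options value is handed to a client constructor are assumptions for illustration):

package main

import (
	"time"

	"github.com/westerndigitalcorporation/blb/client/blb" // assumed import path
)

func main() {
	// Enable backup reads: wait 10ms before each successive backup read,
	// and send at most 2 backup reads in addition to the primary read.
	opts := blb.Options{
		BackupReadBehavior: blb.BackupReadBehavior{
			Enabled:       true,
			Delay:         10 * time.Millisecond,
			MaxNumBackups: 2,
		},
	}
	_ = opts // pass opts to whatever client constructor your application uses
}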
213 changes: 175 additions & 38 deletions client/blb/client.go
@@ -81,6 +81,9 @@ type Options struct {

// How the client decides whether to attempt client-side RS reconstruction.
ReconstructBehavior ReconstructBehavior

// How the client decides to send backup reads if the primary read is delayed.
BackupReadBehavior BackupReadBehavior
}

// Client exposes a simple interface to Blb users for requesting services and
@@ -115,6 +118,8 @@ type Client struct {

// Reconstruct behavior.
reconstructState reconstructState
// Backup read behavior.
backupReadState backupReadState

// Metrics we collect.
metricOpen prometheus.Observer
@@ -165,6 +170,7 @@ func newBaseClient(options *Options) *Client {
cluster: options.Cluster,
retrier: retrier,
reconstructState: makeReconstructState(options.ReconstructBehavior),
backupReadState: makeBackupReadState(options.BackupReadBehavior),
metricOpen: clientOpLatenciesSet.WithLabelValues("open", options.Instance),
metricCreate: clientOpLatenciesSet.WithLabelValues("create", options.Instance),
metricReadDurations: clientOpLatenciesSet.WithLabelValues("read", options.Instance),
@@ -888,7 +894,8 @@ func (cli *Client) writeOneTract(
sem.Acquire()
defer sem.Release()

-*result = cli.tractservers.Write(ctx, host, tract.Tract, tract.Version, thisB, thisOffset)
reqID := core.GenRequestID()
*result = cli.tractservers.Write(ctx, host, reqID, tract.Tract, tract.Version, thisB, thisOffset)

log.V(1).Infof("write %s to %s: %s", tract.Tract, host, *result)
}
@@ -909,7 +916,8 @@ func (cli *Client) createOneTract(
sem.Acquire()
defer sem.Release()

-*result = cli.tractservers.Create(ctx, host, tsid, tract.Tract, thisB, thisOffset)
reqID := core.GenRequestID()
*result = cli.tractservers.Create(ctx, host, reqID, tsid, tract.Tract, thisB, thisOffset)

log.V(1).Infof("create %s to %s: %s", tract.Tract, host, *result)
}
@@ -1103,6 +1111,111 @@ func (cli *Client) readOneTract(
}
}

// tractResultRepl carries a tract result along with the buffer that holds
// the bytes read from a replica.
type tractResultRepl struct {
tractResult
thisB []byte
}

func (cli *Client) readOneTractWithResult(
ctx context.Context,
host string,
reqID string,
tract *core.TractInfo,
thisB []byte,
Collaborator: It's weird to pass in a slice without using it at all. I know we may have to pass in the slice later if we want to try removing the allocation/copy, but the code should be as clean as possible at this point in time, so we should pass just the length now and change it back later if we need to.

thisOffset int64) tractResultRepl {
var badVersionHost string
// TODO(eric): fill in otherHosts when ts-ts cancellation is done.
thisBLoc, err := cli.tractservers.Read(ctx, host, nil, reqID, tract.Tract, tract.Version, len(thisB), thisOffset)
if err == core.ErrVersionMismatch {
// TODO(eric): fix this check when we redo error reporting.
badVersionHost = host
}
readDone()
if err != core.NoError && err != core.ErrEOF {
// TODO(eric): redo bad TS reporting mechanism.
return tractResultRepl{
tractResult: tractResult{0, 0, err, badVersionHost},
}
}
log.V(1).Infof("read %s from tractserver at address %s: %s", tract.Tract, host, err)

return tractResultRepl{
tractResult: tractResult{len(thisB), len(thisBLoc), err, badVersionHost},
thisB: thisBLoc,
}
}

// vars for test injection
var (
randOrder = getRandomOrder
Collaborator: can just write randOrder = rand.Perm

backupRequestFunc = doParallelBackupReads
readDone = func() {} // called when read/readAt RPCs return.
backupPhaseDone = func() {} // called when the entire backup read phase is done.
Collaborator: These are only used in tests, but it still feels better to put them in the Client rather than globals. Also, maybe put "Hook" in the name so the semantics are clearer (that they're just hooks for injecting test synchronization).

)
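
To illustrate what these hooks are for, here is a minimal sketch (hypothetical, same-package test code; only delayFunc, readDone, and backupPhaseDone are real names from this diff) of how a test could make the backup schedule deterministic:

func exampleTestInjection(cli *Client) {
	// Replace real timers with a channel the test controls: every backup
	// "timer" fires once released is closed, regardless of the delay value.
	released := make(chan time.Time)
	cli.backupReadState.delayFunc = func(time.Duration) <-chan time.Time {
		return released
	}

	readRPCs := make(chan struct{}, 16)
	readDone = func() { readRPCs <- struct{}{} } // observe each completed read RPC

	phaseDone := make(chan struct{})
	backupPhaseDone = func() { close(phaseDone) } // observe the end of the backup phase

	close(released) // let every reader goroutine proceed at once
}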

func (cli *Client) readOneTractBackupReplicated(
ctx context.Context,
reqID string,
tract *core.TractInfo,
thisB []byte,
thisOffset int64,
order []int,
n int,
Collaborator: could pass in order[n] instead of both of them?

resultCh chan tractResultRepl) {
Collaborator: put a direction restriction on the chan type if you can
Collaborator Author: Tried to put a direction restriction where the compiler would allow.

err := core.ErrAllocHost // default error if none present
Collaborator: this is only used in one place below, you can just inline it

var badVersionHost string
Collaborator: unused here?

host := tract.Hosts[order[n]]
if host == "" {
log.V(1).Infof("read %s from tsid %d: no host", tract.Tract, tract.TSIDs[n])
Collaborator: tract.Hosts and tract.TSIDs are parallel, right? So using n here is wrong; it should be order[n]?

resultCh <- tractResultRepl{
tractResult: tractResult{0, 0, err, badVersionHost},
}
return
}
delay := cli.backupReadState.BackupReadBehavior.Delay
select {
case <-ctx.Done():
log.V(1).Infof("backup read %s-%d to tractserver cancelled", tract.Tract, n)
resultCh <- tractResultRepl{
tractResult: tractResult{0, 0, core.ErrCanceled, badVersionHost},
}
case <-cli.backupReadState.delayFunc(time.Duration(n) * delay):
Collaborator: could pass in the delay value instead of n? Or possibly even the timer channel itself? Then you might not need delayFunc at all, since you're going to override the spawning logic.
Collaborator Author (ericox, Apr 16, 2019): Are you thinking that we override the spawning logic at the layer below by putting readOneTractWithResult calls in different goroutines? Or, if we get rid of the bch channel in the test overrides, we could pass in time.After(0) to each and control execution by outer channels, as I wrote in the tests.
Collaborator: I was thinking the latter: pass in 0 or time.After(0) for all the calls in your backupRequestFunc implementation and control things by the order that you spawn them.
Collaborator Author: Passing the delay channel seemed to work better for testing. Went with that over time.After(0).

resultCh <- cli.readOneTractWithResult(ctx, host, reqID, tract, thisB, thisOffset)
}
return
}

func doParallelBackupReads(
ctx context.Context,
cli *Client,
reqID string,
tract *core.TractInfo,
thisB []byte,
thisOffset int64,
nReaders int,
order []int) <-chan tractResultRepl {
resultCh := make(chan tractResultRepl)
for n := 0; n < nReaders; n++ {
go cli.readOneTractBackupReplicated(ctx, reqID, tract, thisB, thisOffset, order, n, resultCh)
}
return resultCh
}

func copyResult(thisB []byte, srcB []byte, result *tractResult, srcResult tractResult) {
copy(thisB, srcB)
// In case of a short read, i.e., len(srcB) < len(thisB), we should
// pad the thisB result with 0's. This doesn't need to be done
// explicitly if the 'thisB' given to us starts out empty.
// However, just in case it's not, we ensure that by
// overwriting untouched bytes with 0's.
for i := len(srcB); i < len(thisB); i++ {
thisB[i] = 0
}
*result = srcResult
}
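
A small usage sketch of copyResult's padding behavior (hypothetical values): a short read of 2 bytes into a 6-byte destination leaves the tail zeroed.

func exampleCopyResult() {
	dst := []byte{1, 2, 3, 4, 5, 6}
	src := []byte{9, 9} // short read: only 2 of 6 bytes returned
	var result tractResult
	copyResult(dst, src, &result, tractResult{})
	// dst is now [9 9 0 0 0 0]: untouched bytes were overwritten with 0's.
}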

func (cli *Client) readOneTractReplicated(
ctx context.Context,
curAddr string,
@@ -1111,46 +1224,69 @@ func (cli *Client) readOneTractReplicated(
thisB []byte,
thisOffset int64) {

// TODO(eric): remove this var when we redo bad ts version reporting
var badVersionHost string
Collaborator: unused?

-order := rand.Perm(len(tract.Hosts))
// This counter tracks how many backup reads have been issued; it marks where in
// the host order the sequential fallback phase starts reading.
var nStart int
reqID := core.GenRequestID()
order := randOrder(len(tract.Hosts))

ctx, cancel := context.WithCancel(ctx)
defer cancel()

err := core.ErrAllocHost // default error if none present
-for _, n := range order {
if cli.backupReadState.BackupReadBehavior.Enabled {
// Backup request logic:
// A single "level" of n reader goroutines is spawned at the same time.
// With Delay = 2ms, for example, each reader does {sleep 0ms; read from tract.Hosts[order[0]]},
// {sleep 2ms; read from tract.Hosts[order[1]]}, {sleep 4ms; read from tract.Hosts[order[2]]},
// and so on. Each reader is guaranteed to write a result to the resultCh channel,
// whether or not it has been cancelled. We then consume exactly n messages, since
// one message is produced per reader, scanning until we find a successful read.
// If we find one, we copy the bytes returned by the read operation into the
// thisB passed in to this function. If all of the backup reads fail, we fall
// back to a sequential phase starting at the nth host.
maxNumBackups := cli.backupReadState.BackupReadBehavior.MaxNumBackups
nReaders := min(maxNumBackups+1, len(tract.Hosts))
nStart = nReaders - 1

// This function should not block.
resultCh := backupRequestFunc(ctx, cli, reqID, tract, thisB, thisOffset, nReaders, order)

// Copy the first successful read data into thisB and continue to unblock resultCh.
// Return early only if we have a valid read.
done := false
for n := 0; n < nReaders; n++ {
res := <-resultCh
if res.tractResult.err == core.NoError && !done {
cancel()
copyResult(thisB, res.thisB, result, res.tractResult)
done = true
}
}
backupPhaseDone()
if done {
return
}
}

// Sequentially read from tractserver replicas. This phase runs if backups are
// disabled in the config, or if all of the issued backup reads return errors.
for _, n := range order[nStart:] {
host := tract.Hosts[n]
if host == "" {
log.V(1).Infof("read %s from tsid %d: no host", tract.Tract, tract.TSIDs[n])
continue
}

-var read int
-read, err = cli.tractservers.ReadInto(ctx, host, tract.Tract, tract.Version, thisB, thisOffset)
-if err == core.ErrVersionMismatch {
-badVersionHost = host
-}
-if err != core.NoError && err != core.ErrEOF {
-log.V(1).Infof("read %s from tractserver at address %s: %s", tract.Tract, host, err)
-// If we failed to read from a TS, report that to the curator. Defer so we can examine
-// result.err at the end, to see if we succeeded on another or not.
-defer func(host string, err core.Error) {
-couldRecover := result.err == core.NoError || result.err == core.ErrEOF
-go cli.curators.ReportBadTS(context.Background(), curAddr, tract.Tract, host, "read", err, couldRecover)
-}(host, err)
-continue // try another host
-}
-log.V(1).Infof("read %s from tractserver at address %s: %s", tract.Tract, host, err)
-// In case of short read, i.e., read < len(thisB), we should
-// pad the result with 0's. This doesn't need to be done
-// explicitly if 'thisB' given to us starts to be empty.
-// However, just in case it's not, we will ensure that by
-// overwritting untouched bytes with 0's.
-for i := read; i < len(thisB); i++ {
-thisB[i] = 0
-}
-*result = tractResult{len(thisB), read, err, badVersionHost}
res := cli.readOneTractWithResult(ctx, host, reqID, tract, thisB, thisOffset)
if res.err != core.NoError && res.err != core.ErrEOF {
continue
}
copyResult(thisB, res.thisB, result, res.tractResult)
return
}

log.V(1).Infof("read %s all hosts failed", tract.Tract)
*result = tractResult{0, 0, err, badVersionHost}
}
@@ -1166,16 +1302,12 @@ func (cli *Client) readOneTractRS(
rsTract := tract.RS.Chunk.ToTractID()
length := min(len(thisB), int(tract.RS.Length))
offset := int64(tract.RS.Offset) + thisOffset
-read, err := cli.tractservers.ReadInto(ctx, tract.RS.Host, rsTract, core.RSChunkVersion, thisB[:length], offset)
reqID := core.GenRequestID()
read, err := cli.tractservers.ReadInto(ctx, tract.RS.Host, nil, reqID, rsTract, core.RSChunkVersion, thisB[:length], offset)

if err != core.NoError && err != core.ErrEOF {
log.V(1).Infof("rs read %s from tractserver at address %s: %s", tract.Tract, tract.RS.Host, err)
-// If we failed to read from a TS, report that to the curator. Defer so
-// we can examine result.err at the end, to see if we recovered or not.
-defer func(err core.Error) {
-couldRecover := result.err == core.NoError || result.err == core.ErrEOF
-go cli.curators.ReportBadTS(context.Background(), curAddr, rsTract, tract.RS.Host, "read", err, couldRecover)
-}(err)
// TODO(eric): redo bad TS reporting mechanism.
if !cli.shouldReconstruct(tract) {
*result = tractResult{len(thisB), 0, err, ""}
return
@@ -1228,7 +1360,7 @@ func (cli *Client) statTract(ctx context.Context, tract core.TractInfo) (int64,
return int64(tract.RS.Length), core.NoError
}

-order := rand.Perm(len(tract.Hosts))
order := randOrder(len(tract.Hosts))

err := core.ErrAllocHost // default error if none present
for _, n := range order {
@@ -1326,6 +1458,11 @@ func (cli *Client) getTracts(ctx context.Context, addr string, id core.BlobID, s
return
}

// getRandomOrder returns a random permutation of the integers in [0, n).
func getRandomOrder(n int) []int {
return rand.Perm(n)
}

type blobIterator struct {
cli *Client
thisPartition core.PartitionID