Backup requests for replicated storage #7

Open · wants to merge 25 commits into base: master (showing changes from 2 commits)
190 changes: 115 additions & 75 deletions client/blb/client.go
@@ -1118,6 +1118,104 @@ type tractResultRepl struct {
thisB []byte
}

func (cli *Client) readOneTractWithResult(
ctx context.Context,
host string,
reqID string,
tract *core.TractInfo,
thisB []byte,
Collaborator: It's weird to pass in a slice without using it at all. I know we may have to pass in the slice later if we want to try removing the allocation/copy, but the code should be as clean as possible at this point in time, so we should pass just the length now, and change it back later if we need to.

thisOffset int64) tractResultRepl {
var badVersionHost string
// TODO(eric): fill in otherHosts when ts-ts cancellation is done.
thisBLoc, err := cli.tractservers.Read(ctx, host, nil, reqID, tract.Tract, tract.Version, len(thisB), thisOffset)
if err == core.ErrVersionMismatch {
// TODO(eric): fix this check when we redo error reporting.
badVersionHost = host
}
readDone()
if err != core.NoError && err != core.ErrEOF {
// TODO(eric): redo bad TS reporting mechanism.
return tractResultRepl{
tractResult: tractResult{0, 0, err, badVersionHost},
}
}
log.V(1).Infof("read %s from tractserver at address %s: %s", tract.Tract, host, err)

return tractResultRepl{
tractResult: tractResult{len(thisB), len(thisBLoc), err, badVersionHost},
thisB: thisBLoc,
}
}

// vars for test injection
var (
randOrder = getRandomOrder
Collaborator: can just write randOrder = rand.Perm

backupRequestFunc = doParallelBackupReads
readDone = func() {} // call when read/readAt rpcs return.
backupPhaseDone = func() {} // call when the entire backup read phase is done.
Collaborator: These are only used in tests, but it still feels better to put them in the Client rather than globals. Also maybe put "Hook" in the name so the semantics are more clear (that they're just hooks for injecting test synchronization).

)

func (cli *Client) readOneTractBackupReplicated(
ctx context.Context,
reqID string,
tract *core.TractInfo,
thisB []byte,
thisOffset int64,
order []int,
n int,
Collaborator: could pass in order[n] instead of both of them?

resultCh chan tractResultRepl) {
Collaborator: put a direction restriction on the chan type if you can

Author: Tried to put a direction restriction where the compiler would allow.

err := core.ErrAllocHost // default error if none present
Collaborator: this is only used in one place below, you can just inline it

var badVersionHost string
Collaborator: unused here?

host := tract.Hosts[order[n]]
if host == "" {
log.V(1).Infof("read %s from tsid %d: no host", tract.Tract, tract.TSIDs[n])
Collaborator: tract.Hosts and tract.TSIDs are parallel, right? so using n here is wrong, it should be order[n]?

resultCh <- tractResultRepl{
tractResult: tractResult{0, 0, err, badVersionHost},
}
return
}
delay := cli.backupReadState.BackupReadBehavior.Delay
select {
case <-ctx.Done():
log.V(1).Infof("backup read %s-%d to tractserver cancelled", tract.Tract, n)
resultCh <- tractResultRepl{
tractResult: tractResult{0, 0, core.ErrCanceled, badVersionHost},
}
case <-cli.backupReadState.delayFunc(time.Duration(n) * delay):
Collaborator: could pass in the delay value instead of n? or possibly even the timer channel itself? then you might not need delayFunc at all, since you're going to override the spawning logic.

Author (@ericox, Apr 16, 2019): Are you thinking that we override spawning logic at the layer below by putting readOneTractWithResult calls in different goroutines? Or if we get rid of the bch channel in the test overrides, we could pass in a time.After(0) to each and control execution by outer channels as I wrote in the tests.

Collaborator: I was thinking the latter: pass in 0 or time.After(0) for all the calls in your backupRequestFunc implementation and control things by the order that you spawn them.

Author: Passing the delay channel seemed to work better for testing. Went with that over time.After(0).

resultCh <- cli.readOneTractWithResult(ctx, host, reqID, tract, thisB, thisOffset)
}
return
}

func doParallelBackupReads(
ctx context.Context,
cli *Client,
reqID string,
tract *core.TractInfo,
thisB []byte,
thisOffset int64,
nReaders int,
order []int) <-chan tractResultRepl {
resultCh := make(chan tractResultRepl)
for n := 0; n < nReaders; n++ {
go cli.readOneTractBackupReplicated(ctx, reqID, tract, thisB, thisOffset, order, n, resultCh)
}
return resultCh
}

func copyResult(thisB []byte, srcB []byte, result *tractResult, srcResult tractResult) {
copy(thisB, srcB)
// In case of a short read, i.e., len(srcB) < len(thisB), we should
// pad the rest of thisB with 0's. This doesn't need to be done
// explicitly if the 'thisB' given to us starts out zeroed.
// However, just in case it's not, we ensure that by
// overwriting the untouched bytes with 0's.
for i := len(srcB); i < len(thisB); i++ {
thisB[i] = 0
}
*result = srcResult
}

func (cli *Client) readOneTractReplicated(
ctx context.Context,
curAddr string,
@@ -1133,33 +1231,12 @@ func (cli *Client) readOneTractReplicated(
// to start sending reads to during the sequential read fallback phase.
var nStart int
reqID := core.GenRequestID()
- order := rand.Perm(len(tract.Hosts))
+ order := randOrder(len(tract.Hosts))

ctx, cancel := context.WithCancel(ctx)
defer cancel()

err := core.ErrAllocHost // default error if none present
readFunc := func(host string, n int) tractResultRepl {
// TODO(eric): fill in otherHosts when ts-ts cancellation is done.
thisBLoc, err := cli.tractservers.Read(ctx, host, nil, reqID, tract.Tract, tract.Version, len(thisB), thisOffset)
if err == core.ErrVersionMismatch {
// TODO(eric): fix this check when we redo error reporting.
badVersionHost = host
}
if err != core.NoError && err != core.ErrEOF {
// TODO(eric): redo bad TS reporting mechanism.
return tractResultRepl{
tractResult: tractResult{0, 0, err, badVersionHost},
}
}
log.V(1).Infof("read %s from tractserver at address %s: %s", tract.Tract, host, err)

return tractResultRepl{
tractResult: tractResult{len(thisB), len(thisBLoc), err, badVersionHost},
thisB: thisBLoc,
}
}

if cli.backupReadState.BackupReadBehavior.Enabled {
// Backup request logic:
// A single "level" of n reader goroutines will be spawned at the same time.
@@ -1171,58 +1248,25 @@ func (cli *Client) readOneTractReplicated(
// successful read, we copy the resulting bytes returned from the read operation into
// thisB passed in to this function. If all of the backup reads fail, we fallback to the
// a sequential phase starting at the nth host.
resultCh := make(chan tractResultRepl)
delay := cli.backupReadState.BackupReadBehavior.Delay

readFuncBackup := func(n int) {
host := tract.Hosts[n]
if host == "" {
log.V(1).Infof("read %s from tsid %d: no host", tract.Tract, tract.TSIDs[n])
resultCh <- tractResultRepl{
tractResult: tractResult{0, 0, err, badVersionHost},
}
return
}
select {
case <-cli.backupReadState.delayFunc(time.Duration(n) * delay):
resultCh <- readFunc(host, n)
case <-ctx.Done():
log.V(1).Infof("backup read %s-%d to tractserver cancelled", tract.Tract, n)
resultCh <- tractResultRepl{
tractResult: tractResult{0, 0, core.ErrCanceled, badVersionHost},
}
}

}

maxNumBackups := cli.backupReadState.BackupReadBehavior.MaxNumBackups
- nReaders := min(maxNumBackups+1, len(order))
+ nReaders := min(maxNumBackups+1, len(tract.Hosts))
nStart = nReaders - 1
for n := 0; n < nReaders; n++ {
go readFuncBackup(n)
}

// This function should not block.
resultCh := backupRequestFunc(ctx, cli, reqID, tract, thisB, thisOffset, nReaders, order)

// Copy the first successful read data into thisB and continue to unblock resultCh.
// Return early only if we have a valid read.
done := false
var res tractResultRepl
for n := 0; n < nReaders; n++ {
- res = <-resultCh
- if res.err == core.NoError {
+ res := <-resultCh
+ if res.tractResult.err == core.NoError && !done {
cancel()
*result = res.tractResult
- copy(thisB[:len(res.thisB)], res.thisB)
- // In case of short read, i.e., len(res.thisB) < len(thisB), we should
- // pad thisB result with 0's. This doesn't need to be done
- // explicitly if 'thisB' given to us starts to be empty.
- // However, just in case it's not, we will ensure that by
- // overwritting untouched bytes with 0's.
- for i := len(res.thisB); i < len(thisB); i++ {
- thisB[i] = 0
- }
+ copyResult(thisB, res.thisB, result, res.tractResult)
done = true
}
}
backupPhaseDone()
if done {
return
}
@@ -1236,20 +1280,11 @@ func (cli *Client) readOneTractReplicated(
log.V(1).Infof("read %s from tsid %d: no host", tract.Tract, tract.TSIDs[n])
continue
}
- res := readFunc(host, n)
+ res := cli.readOneTractWithResult(ctx, host, reqID, tract, thisB, thisOffset)
if res.err != core.NoError && res.err != core.ErrEOF {
continue
}
*result = res.tractResult
- copy(thisB[:len(res.thisB)], res.thisB)
- // In case of short read, i.e., len(thisBLoc) < len(thisB), we should
- // pad thisB result with 0's. This doesn't need to be done
- // explicitly if 'thisB' given to us starts to be empty.
- // However, just in case it's not, we will ensure that by
- // overwritting untouched bytes with 0's.
- for i := len(res.thisB); i < len(thisB); i++ {
- thisB[i] = 0
- }
+ copyResult(thisB, res.thisB, result, res.tractResult)
return
}
log.V(1).Infof("read %s all hosts failed", tract.Tract)
@@ -1325,7 +1360,7 @@ func (cli *Client) statTract(ctx context.Context, tract core.TractInfo) (int64,
return int64(tract.RS.Length), core.NoError
}

- order := rand.Perm(len(tract.Hosts))
+ order := randOrder(len(tract.Hosts))

err := core.ErrAllocHost // default error if none present
for _, n := range order {
Expand Down Expand Up @@ -1423,6 +1458,11 @@ func (cli *Client) getTracts(ctx context.Context, addr string, id core.BlobID, s
return
}

// getRandomOrder returns a slice of random ints on [0, n).
func getRandomOrder(n int) []int {
return rand.Perm(n)
}

type blobIterator struct {
cli *Client
thisPartition core.PartitionID