Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backup requests for replicated storage #7

Open
wants to merge 25 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions client/blb/backup_reads.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package blb

import (
"time"
)

// BackupReadBehavior lets clients decide when to enable or disable backup reads. The delay
// field determines how long before the backup request is sent.
type BackupReadBehavior struct {
// Enabled is a flag to enable the backup read feature.
Enabled bool

// Delay is the time duration between successive backup reads.
Delay time.Duration

// MaxNumBackups is the maximum number of requests sent in addition
// to the primary read.
MaxNumBackups int
}

type backupReadState struct {
BackupReadBehavior

// backupDelay is the delay before sending backup reads.
backupDelayFunc func(time.Duration) <-chan time.Time
dnr marked this conversation as resolved.
Show resolved Hide resolved
}

func makeBackupReadState(behavior BackupReadBehavior) backupReadState {
if behavior.Enabled {
if behavior.MaxNumBackups <= 0 {
// Give it a default value of 1.
behavior.MaxNumBackups = 1
}
} else {
// MaxNumBackups isn't used in the read path if this is disabled, clear this.
behavior.MaxNumBackups = 0
}
return backupReadState{
BackupReadBehavior: behavior,
backupDelayFunc: time.After,
}
}
253 changes: 108 additions & 145 deletions client/blb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ type Options struct {
// How the client decides whether to attempt client-side RS reconstruction.
ReconstructBehavior ReconstructBehavior

// Wether client will send backup reads if the primary read is delayed.
DisableBackupReads bool
// How the client decides to send backup reads if the primary read is delayed.
BackupReadBehavior BackupReadBehavior
}

// Client exposes a simple interface to Blb users for requesting services and
Expand Down Expand Up @@ -118,15 +118,8 @@ type Client struct {

// Reconstruct behavior.
reconstructState reconstructState

// Wether to use backup reads or not.
backupReadsDisabled bool

// backupDelay is the delay before sending backup reads.
backupDelay func(time.Duration) <-chan time.Time

// done checks if the client is done processing requests.
done func() <-chan struct{}
// Backup read behavior.
backupReadState backupReadState

// Metrics we collect.
metricOpen prometheus.Observer
Expand Down Expand Up @@ -177,8 +170,7 @@ func newBaseClient(options *Options) *Client {
cluster: options.Cluster,
retrier: retrier,
reconstructState: makeReconstructState(options.ReconstructBehavior),
backupReadsDisabled: options.DisableBackupReads,
backupDelay: func(d time.Duration) <-chan time.Time { return time.After(d) },
backupReadState: makeBackupReadState(options.BackupReadBehavior),
metricOpen: clientOpLatenciesSet.WithLabelValues("open", options.Instance),
metricCreate: clientOpLatenciesSet.WithLabelValues("create", options.Instance),
metricReadDurations: clientOpLatenciesSet.WithLabelValues("read", options.Instance),
Expand Down Expand Up @@ -902,7 +894,8 @@ func (cli *Client) writeOneTract(
sem.Acquire()
defer sem.Release()

*result = cli.tractservers.Write(ctx, host, tract.Tract, tract.Version, thisB, thisOffset)
reqID := core.GenRequestID()
*result = cli.tractservers.Write(ctx, host, reqID, tract.Tract, tract.Version, thisB, thisOffset)

log.V(1).Infof("write %s to %s: %s", tract.Tract, host, *result)
}
Expand All @@ -923,7 +916,8 @@ func (cli *Client) createOneTract(
sem.Acquire()
defer sem.Release()

*result = cli.tractservers.Create(ctx, host, tsid, tract.Tract, thisB, thisOffset)
reqID := core.GenRequestID()
*result = cli.tractservers.Create(ctx, host, reqID, tsid, tract.Tract, thisB, thisOffset)

log.V(1).Infof("create %s to %s: %s", tract.Tract, host, *result)
}
Expand Down Expand Up @@ -1109,18 +1103,22 @@ func (cli *Client) readOneTract(
defer sem.Release()

if len(tract.Hosts) > 0 {
if cli.backupReadsDisabled {
cli.readOneTractReplicated(ctx, curAddr, result, tract, thisB, thisOffset)
} else {
cli.readOneTractWithBackupReplicated(ctx, curAddr, result, tract, thisB, thisOffset)
}
cli.readOneTractReplicated(ctx, curAddr, result, tract, thisB, thisOffset)
} else if tract.RS.Present() {
cli.readOneTractRS(ctx, curAddr, result, tract, thisB, thisOffset)
} else {
*result = tractResult{err: core.ErrInvalidState}
}
}

// errorResult carries state to deterime if a failed tract read should
// report a bad TS to the curator.
type errorResult struct {
tractID core.TractID
host string
err core.Error
}

func (cli *Client) readOneTractReplicated(
ctx context.Context,
curAddr string,
Expand All @@ -1130,136 +1128,113 @@ func (cli *Client) readOneTractReplicated(
thisOffset int64) {

var badVersionHost string
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused?

var skipHost bool

reqID := core.GenRequestID()
order := rand.Perm(len(tract.Hosts))

err := core.ErrAllocHost // default error if none present
for _, n := range order {
host := tract.Hosts[n]
if host == "" {
log.V(1).Infof("read %s from tsid %d: no host", tract.Tract, tract.TSIDs[n])
continue
}
skipHost, badVersionHost, err = cli.readIntoReportBadTS(ctx, curAddr, host, nil, "", result, tract, thisB, thisOffset)
if skipHost {
continue
}
return // if skipHost is false, we have hit a short read or there was no error returned.
}
log.V(1).Infof("read %s all hosts failed", tract.Tract)
*result = tractResult{0, 0, err, badVersionHost}
}

func (cli *Client) readIntoReportBadTS(
ctx context.Context,
curAddr string,
host string,
otherHosts []string,
reqID string,
result *tractResult,
tract *core.TractInfo,
thisB []byte,
thisOffset int64) (skipHost bool, badHost string, err core.Error) {

var read int
var badVersionHost string
read, err = cli.tractservers.ReadInto(ctx, host, otherHosts, reqID, tract.Tract, tract.Version, thisB, thisOffset)
if err == core.ErrVersionMismatch {
badVersionHost = host
}
if err != core.NoError && err != core.ErrEOF {
log.V(1).Infof("read %s from tractserver at address %s: %s", tract.Tract, host, err)
// If we failed to read from a TS, report that to the curator. Defer so we can examine
// result.err at the end, to see if we succeeded on another or not.
defer func(host string, err core.Error) {
couldRecover := result.err == core.NoError || result.err == core.ErrEOF
go cli.curators.ReportBadTS(context.Background(), curAddr, tract.Tract, host, "read", err, couldRecover)
}(host, err)
return true, badVersionHost, err // try another host

}
log.V(1).Infof("read %s from tractserver at address %s: %s", tract.Tract, host, err)
// In case of short read, i.e., read < len(thisB), we should
// pad the result with 0's. This doesn't need to be done
// explicitly if 'thisB' given to us starts to be empty.
// However, just in case it's not, we will ensure that by
// overwritting untouched bytes with 0's.
for i := read; i < len(thisB); i++ {
thisB[i] = 0
}
*result = tractResult{len(thisB), read, err, badVersionHost}
return false, badVersionHost, err
}

func (cli *Client) readOneTractWithBackupReplicated(
ctx context.Context,
curAddr string,
result *tractResult,
tract *core.TractInfo,
thisB []byte,
thisOffset int64) {

var badVersionHost string
if cli.backupReadState.BackupReadBehavior.Enabled {
ch := make(chan tractResult)

order := rand.Perm(len(tract.Hosts))
ctx, cancel := context.WithCancel(ctx)
dnr marked this conversation as resolved.
Show resolved Hide resolved
defer cancel()

// Generate a request ID to use for cancelling alternate requests.
reqID := core.GenRequestID()
maxNumBackups := cli.backupReadState.BackupReadBehavior.MaxNumBackups
delay := cli.backupReadState.BackupReadBehavior.Delay

ch := make(chan *tractResult)
errc := make(chan core.Error)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

err := core.ErrAllocHost // default error if none present
for i, n := range order {
go func(i, j int) {
select {
case <-ctx.Done():
errc <- core.ErrCanceled
readFunc := func(n int) {
host := tract.Hosts[n]
if host == "" {
log.V(1).Infof("read %s from tsid %d: no host", tract.Tract, tract.TSIDs[n])
ch <- tractResult{0, 0, err, badVersionHost}
return
case <-cli.backupDelay(time.Duration(2*i) * time.Millisecond):
// sleep for the backup delay to prevent tied backup requests cancelling,
// each other's work.
}

host := tract.Hosts[j]
if host == "" {
log.V(1).Infof("read %s from tsid %d: no host", tract.Tract, tract.TSIDs[j])
// TODO(eric): fill in otherHosts when ts-ts cancellation is done.
read, err := cli.tractservers.ReadInto(ctx, host, nil, reqID, tract.Tract, tract.Version, thisB, thisOffset)
dnr marked this conversation as resolved.
Show resolved Hide resolved
if err == core.ErrVersionMismatch {
badVersionHost = host
dnr marked this conversation as resolved.
Show resolved Hide resolved
}
if err != core.NoError && err != core.ErrEOF {
// TODO(eric): redo bad TS reporting mechanism.
ch <- tractResult{0, 0, err, badVersionHost}
return
}
log.V(1).Infof("read %s from tractserver at address %s: %s", tract.Tract, host, err)
// In case of short read, i.e., read < len(thisB), we should
// pad the result with 0's. This doesn't need to be done
// explicitly if 'thisB' given to us starts to be empty.
// However, just in case it's not, we will ensure that by
// overwritting untouched bytes with 0's.
for i := read; i < len(thisB); i++ {
thisB[i] = 0
}
ch <- tractResult{len(thisB), read, err, badVersionHost}
return
}

// Compute alternate hosts that will receive backup requests. Pass them along
// to the tracserver rpc layer to enable cancellation messages to be sent between
// servers. TODO(eric): come up with a better name for this.
otherHosts := cutSlice(tract.Hosts, j)

_, badVersionHost, err = cli.readIntoReportBadTS(ctx, curAddr, host, otherHosts, reqID, result, tract, thisB, thisOffset)
if err != core.NoError {
errc <- err
return
// Backup request logic involves the following:
// 1) Send the first read to the first host (item 0 available from the orderCh) in a goroutine.
// 2) Start another goroutine to issue backup requests. Each subsequent request is spawned after the
// configured backup time.After delay time expires.
// 3) Block until the first read is returned from the goroutines servicing the reads. Note that if the
// the request returned fails, we accept that and continume on to the fallback phase for sequential reads.
// We could optimize this later and scan for the first successful backup request, but that may not be
// worth the latency savings.
go func() {
nb := min(maxNumBackups+1, len(order))
for i := 0; i < nb; i++ {
select {
case <-cli.backupReadState.backupDelayFunc(time.Duration(i) * delay):
go readFunc(order[i])
case <-ctx.Done():
log.V(1).Infof("backup read %s-%d to tractserver cancelled", tract.Tract, i)
break
}
}
ch <- result
}()

// take the first result.
dnr marked this conversation as resolved.
Show resolved Hide resolved
*result = <-ch
if result.err == core.NoError {
return
}(i, n)
}
}
// block until one goroutine finishes, or all fail due to errors.
nerr := 0
for {
select {
case result = <-ch:
return
case err = <-errc:
nerr++
if nerr == len(order) {
log.V(1).Infof("read %s all hosts failed", tract.Tract)
*result = tractResult{0, 0, err, badVersionHost}
return
}

// Sequentailly read from tract server replicas. This phase will run if backups are disabled from config state, or
dnr marked this conversation as resolved.
Show resolved Hide resolved
// the backup read phase fails and we use this as a fallback.
dnr marked this conversation as resolved.
Show resolved Hide resolved
for n := range order {
host := tract.Hosts[n]
dnr marked this conversation as resolved.
Show resolved Hide resolved
if host == "" {
log.V(1).Infof("read %s from tsid %d: no host", tract.Tract, tract.TSIDs[n])
continue
}

var read int
read, err = cli.tractservers.ReadInto(ctx, host, nil, reqID, tract.Tract, tract.Version, thisB, thisOffset)
if err == core.ErrVersionMismatch {
badVersionHost = host
}
if err != core.NoError && err != core.ErrEOF {
log.V(1).Infof("read %s from tractserver at address %s: %s", tract.Tract, host, err)
// TODO(eric): redo bad TS reporting mechanism.
continue // try another host
}
log.V(1).Infof("read %s from tractserver at address %s: %s", tract.Tract, host, err)
// In case of short read, i.e., read < len(thisB), we should
// pad the result with 0's. This doesn't need to be done
// explicitly if 'thisB' given to us starts to be empty.
// However, just in case it's not, we will ensure that by
// overwritting untouched bytes with 0's.
for i := read; i < len(thisB); i++ {
thisB[i] = 0
}
*result = tractResult{len(thisB), read, err, badVersionHost}
return
}
return // unreachable

log.V(1).Infof("read %s all hosts failed", tract.Tract)
*result = tractResult{0, 0, err, badVersionHost}
}

func (cli *Client) readOneTractRS(
Expand All @@ -1278,12 +1253,7 @@ func (cli *Client) readOneTractRS(

if err != core.NoError && err != core.ErrEOF {
log.V(1).Infof("rs read %s from tractserver at address %s: %s", tract.Tract, tract.RS.Host, err)
// If we failed to read from a TS, report that to the curator. Defer so
// we can examine result.err at the end, to see if we recovered or not.
defer func(err core.Error) {
couldRecover := result.err == core.NoError || result.err == core.ErrEOF
go cli.curators.ReportBadTS(context.Background(), curAddr, rsTract, tract.RS.Host, "read", err, couldRecover)
}(err)
// TODO(eric): redo bad TS reporting mechanism.
if !cli.shouldReconstruct(tract) {
*result = tractResult{len(thisB), 0, err, ""}
return
Expand Down Expand Up @@ -1540,10 +1510,3 @@ func priorityFromContext(ctx context.Context) core.Priority {
}
return core.Priority_TSDEFAULT
}

func cutSlice(s []string, i int) []string {
r := make([]string, len(s))
copy(r, s)
r[i], r[len(r)-1] = r[len(r)-1], r[i]
return r[:len(r)-1]
}
Loading