Backup requests for replicated storage #7

Open
wants to merge 25 commits into base: master
Changes from 18 commits
42 changes: 42 additions & 0 deletions client/blb/backup_reads.go
@@ -0,0 +1,42 @@
package blb

import (
"time"
)

// BackupReadBehavior lets clients enable or disable backup reads. The Delay
// field determines how long to wait before a backup request is sent.
type BackupReadBehavior struct {
// Enabled is a flag to enable the backup read feature.
Enabled bool

// Delay is the time duration between successive backup reads.
Delay time.Duration

// MaxNumBackups is the maximum number of requests sent in addition
// to the primary read.
MaxNumBackups int
}

type backupReadState struct {
BackupReadBehavior

// delayFunc returns a channel that fires after the given delay before a
// backup read is sent; it defaults to time.After.
delayFunc func(time.Duration) <-chan time.Time
}

func makeBackupReadState(behavior BackupReadBehavior) backupReadState {
if behavior.Enabled {
if behavior.MaxNumBackups <= 0 {
// Give it a default value of 1.
behavior.MaxNumBackups = 1
}
} else {
// MaxNumBackups isn't used in the read path when backup reads are disabled, so clear it.
behavior.MaxNumBackups = 0
}
return backupReadState{
BackupReadBehavior: behavior,
delayFunc: time.After,
}
}
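
For illustration, here is a minimal sketch (not part of this PR) of how a caller might turn backup reads on through the client Options added in this change. It assumes the code sits in the client/blb package next to the types above; the helper name and the 10ms/2 values are made up for the example.

// newClientWithBackupReads is a hypothetical helper showing how the
// Options.BackupReadBehavior field added in this PR could be populated.
func newClientWithBackupReads() *Client {
	opts := &Options{
		BackupReadBehavior: BackupReadBehavior{
			Enabled: true,
			// Delay between successive backup reads.
			Delay: 10 * time.Millisecond,
			// At most 2 backup reads in addition to the primary read.
			MaxNumBackups: 2,
		},
	}
	return newBaseClient(opts)
}

Note that if Enabled is set but MaxNumBackups is left at zero or negative, makeBackupReadState above substitutes a default of one backup read.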
149 changes: 120 additions & 29 deletions client/blb/client.go
@@ -81,6 +81,9 @@ type Options struct {

// How the client decides whether to attempt client-side RS reconstruction.
ReconstructBehavior ReconstructBehavior

// How the client decides whether to send backup reads if the primary read is delayed.
BackupReadBehavior BackupReadBehavior
}

// Client exposes a simple interface to Blb users for requesting services and
@@ -115,6 +118,8 @@ type Client struct {

// Reconstruct behavior.
reconstructState reconstructState
// Backup read behavior.
backupReadState backupReadState

// Metrics we collect.
metricOpen prometheus.Observer
@@ -165,6 +170,7 @@ func newBaseClient(options *Options) *Client {
cluster: options.Cluster,
retrier: retrier,
reconstructState: makeReconstructState(options.ReconstructBehavior),
backupReadState: makeBackupReadState(options.BackupReadBehavior),
metricOpen: clientOpLatenciesSet.WithLabelValues("open", options.Instance),
metricCreate: clientOpLatenciesSet.WithLabelValues("create", options.Instance),
metricReadDurations: clientOpLatenciesSet.WithLabelValues("read", options.Instance),
@@ -888,7 +894,8 @@ func (cli *Client) writeOneTract(
sem.Acquire()
defer sem.Release()

*result = cli.tractservers.Write(ctx, host, tract.Tract, tract.Version, thisB, thisOffset)
reqID := core.GenRequestID()
*result = cli.tractservers.Write(ctx, host, reqID, tract.Tract, tract.Version, thisB, thisOffset)

log.V(1).Infof("write %s to %s: %s", tract.Tract, host, *result)
}
@@ -909,7 +916,8 @@ func (cli *Client) createOneTract(
sem.Acquire()
defer sem.Release()

*result = cli.tractservers.Create(ctx, host, tsid, tract.Tract, thisB, thisOffset)
reqID := core.GenRequestID()
*result = cli.tractservers.Create(ctx, host, reqID, tsid, tract.Tract, thisB, thisOffset)

log.V(1).Infof("create %s to %s: %s", tract.Tract, host, *result)
}
@@ -1103,6 +1111,13 @@ func (cli *Client) readOneTract(
}
}

type tractResultRepl struct {
tractResult

badReadHost string // tracks which host had a failed read
thisB []byte
}

func (cli *Client) readOneTractReplicated(
ctx context.Context,
curAddr string,
@@ -1111,43 +1126,123 @@ func (cli *Client) readOneTractReplicated(
thisB []byte,
thisOffset int64) {

// TODO(eric): remove this var when we redo bad ts version reporting
var badVersionHost string
Review comment (Collaborator): unused?


reqID := core.GenRequestID()
order := rand.Perm(len(tract.Hosts))
badReadHosts := make(map[string]bool)

err := core.ErrAllocHost // default error if none present
for _, n := range order {
host := tract.Hosts[n]
if host == "" {
log.V(1).Infof("read %s from tsid %d: no host", tract.Tract, tract.TSIDs[n])
continue
}

var read int
read, err = cli.tractservers.ReadInto(ctx, host, tract.Tract, tract.Version, thisB, thisOffset)
readFunc := func(host string, n int, thisBLoc []byte) tractResult {
// TODO(eric): fill in otherHosts when ts-ts cancellation is done.
read, err := cli.tractservers.ReadInto(ctx, host, nil, reqID, tract.Tract, tract.Version, thisBLoc, thisOffset)
// TODO(eric): redo this check when we fix error reporting.
if err == core.ErrVersionMismatch {
badVersionHost = host
}
if err != core.NoError && err != core.ErrEOF {
log.V(1).Infof("read %s from tractserver at address %s: %s", tract.Tract, host, err)
// If we failed to read from a TS, report that to the curator. Defer so we can examine
// result.err at the end, to see if we succeeded on another or not.
defer func(host string, err core.Error) {
couldRecover := result.err == core.NoError || result.err == core.ErrEOF
go cli.curators.ReportBadTS(context.Background(), curAddr, tract.Tract, host, "read", err, couldRecover)
}(host, err)
continue // try another host
// TODO(eric): redo bad TS reporting mechanism.
return tractResult{0, 0, err, badVersionHost}
}
log.V(1).Infof("read %s from tractserver at address %s: %s", tract.Tract, host, err)
// In case of a short read, i.e., read < len(thisB), we should
// pad the result with 0's. This doesn't need to be done
// explicitly if the 'thisB' given to us starts out empty.
// However, just in case it's not, we ensure that by
// overwriting the untouched bytes with 0's.
for i := read; i < len(thisB); i++ {
thisB[i] = 0
for i := read; i < len(thisBLoc); i++ {
thisBLoc[i] = 0
}
return tractResult{len(thisBLoc), read, err, badVersionHost}
}

if cli.backupReadState.BackupReadBehavior.Enabled {
ch := make(chan tractResultRepl)

ctx, cancel := context.WithCancel(ctx)
defer cancel()

maxNumBackups := cli.backupReadState.BackupReadBehavior.MaxNumBackups
delay := cli.backupReadState.BackupReadBehavior.Delay

// Backup request logic involves the following:
// 1) Send the first read to the first host (index 0 of the shuffled order) in a goroutine.
// 2) Start another goroutine to issue backup requests. Each subsequent request is spawned after the
// configured time.After delay expires.
// 3) Block until read tractResults are available from the goroutines processing the reads. In the result
// collection phase, we mark which hosts returned RPC or other errors so we don't re-send reads in the
// fallback phase.
readFuncBackup := func(wg *sync.WaitGroup, n int, thisBLoc []byte) {
defer wg.Done()
host := tract.Hosts[order[n]]
if host == "" {
log.V(1).Infof("read %s from tsid %d: no host", tract.Tract, tract.TSIDs[n])
ch <- tractResultRepl{
tractResult: tractResult{0, 0, err, badVersionHost},
thisB: thisBLoc,
}
return
}

result := readFunc(host, n, thisBLoc)

ch <- tractResultRepl{
tractResult: result,
badReadHost: host,
thisB: thisBLoc,
}
}

var wg sync.WaitGroup
nb := min(maxNumBackups+1, len(order))
wg.Add(nb)
go func() {
for n := 0; n < nb; n++ {
select {
case <-cli.backupReadState.delayFunc(time.Duration(n) * delay):
thisBLoc := make([]byte, len(thisB))
go readFuncBackup(&wg, n, thisBLoc)
case <-ctx.Done():
log.V(1).Infof("backup read %s-%d to tractserver cancelled", tract.Tract, n)
break
}
}
}()
go func() {
wg.Wait()
close(ch)
}()

// Take the first successful result, or record bad hosts so they are
// skipped during the fallback phase below.
for res := range ch {
if res.err == core.NoError {
Review comment (Collaborator): you probably want this to be if res.err == core.NoError && !done to avoid taking two successes (very unlikely because of the cancel, but just in case)
*result = res.tractResult
copy(thisB, res.thisB)
return
}
badReadHosts[res.badReadHost] = true
}
}

// Sequentially read from tractserver replicas. This phase runs if backup reads are disabled in the
// client config, or as a fallback if the backup read phase fails.
for n := range order {
host := tract.Hosts[n]
if host == "" {
log.V(1).Infof("read %s from tsid %d: no host", tract.Tract, tract.TSIDs[n])
continue
}
if badReadHosts[host] {
log.V(1).Infof("read %s from tsid %d: skipping host due to failed in backup phase", tract.Tract, tract.TSIDs[n])
continue
}
res := readFunc(host, n, thisB)
if res.err != core.NoError && res.err != core.ErrEOF {
continue
}
*result = tractResult{len(thisB), read, err, badVersionHost}
*result = res
return
}
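
To see the staggered-send pattern from the backup-read comment above in isolation, here is a small self-contained sketch: the primary read is sent immediately, each backup after a further delay, the first success wins, and a shared context cancels the rest. It only illustrates the pattern; doRead, hedgedRead, and the host names are hypothetical and none of the client's real types appear.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// doRead stands in for a read RPC against one replica host.
func doRead(ctx context.Context, host string) (string, error) {
	select {
	case <-time.After(30 * time.Millisecond): // pretend the RPC takes 30ms
		return "data from " + host, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// hedgedRead sends the primary read to hosts[0] immediately, then one backup
// read per remaining host, each staggered by delay, and returns the first
// successful result.
func hedgedRead(ctx context.Context, hosts []string, delay time.Duration) (string, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // cancel any in-flight backups once we return

	type result struct {
		data string
		err  error
	}
	// Buffered so late finishers can send without blocking after we return.
	ch := make(chan result, len(hosts))

	for i, host := range hosts {
		go func(i int, host string) {
			select {
			case <-time.After(time.Duration(i) * delay): // 0 for the primary read
				data, err := doRead(ctx, host)
				ch <- result{data, err}
			case <-ctx.Done():
				ch <- result{"", ctx.Err()}
			}
		}(i, host)
	}

	err := errors.New("no hosts")
	for range hosts {
		r := <-ch
		if r.err == nil {
			return r.data, nil
		}
		err = r.err
	}
	return "", err
}

func main() {
	data, err := hedgedRead(context.Background(), []string{"ts1", "ts2", "ts3"}, 10*time.Millisecond)
	fmt.Println(data, err)
}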

@@ -1166,16 +1261,12 @@ func (cli *Client) readOneTractRS(
rsTract := tract.RS.Chunk.ToTractID()
length := min(len(thisB), int(tract.RS.Length))
offset := int64(tract.RS.Offset) + thisOffset
read, err := cli.tractservers.ReadInto(ctx, tract.RS.Host, rsTract, core.RSChunkVersion, thisB[:length], offset)
reqID := core.GenRequestID()
read, err := cli.tractservers.ReadInto(ctx, tract.RS.Host, nil, reqID, rsTract, core.RSChunkVersion, thisB[:length], offset)

if err != core.NoError && err != core.ErrEOF {
log.V(1).Infof("rs read %s from tractserver at address %s: %s", tract.Tract, tract.RS.Host, err)
// If we failed to read from a TS, report that to the curator. Defer so
// we can examine result.err at the end, to see if we recovered or not.
defer func(err core.Error) {
couldRecover := result.err == core.NoError || result.err == core.ErrEOF
go cli.curators.ReportBadTS(context.Background(), curAddr, rsTract, tract.RS.Host, "read", err, couldRecover)
}(err)
// TODO(eric): redo bad TS reporting mechanism.
if !cli.shouldReconstruct(tract) {
*result = tractResult{len(thisB), 0, err, ""}
return