Backup requests for replicated storage #7

Open: wants to merge 25 commits into master
Changes from 12 commits
131 changes: 126 additions & 5 deletions client/blb/client.go
@@ -81,6 +81,9 @@ type Options struct {

// How the client decides whether to attempt client-side RS reconstruction.
ReconstructBehavior ReconstructBehavior

// How the client decides whether to send backup reads when the primary read is delayed.
BackupReadBehavior BackupReadBehavior
}
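For reference, backup reads would be opted into through these client options. A minimal configuration sketch (the BackupReadBehavior field names Enabled, MaxNumBackups, and Delay are taken from later hunks in this diff; the surrounding values are illustrative only):

	// Illustrative only: enable backup reads with up to two backups,
	// each issued after the configured delay if the primary read has not completed.
	opts := &Options{
		Cluster:  "example-cluster", // assumed value; Cluster and Instance are existing Options fields
		Instance: "example",
		BackupReadBehavior: BackupReadBehavior{
			Enabled:       true,
			MaxNumBackups: 2,
			Delay:         10 * time.Millisecond,
		},
	}
	cli := newBaseClient(opts)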

// Client exposes a simple interface to Blb users for requesting services and
@@ -115,6 +118,8 @@ type Client struct {

// Reconstruct behavior.
reconstructState reconstructState
// Backup read behavior.
backupReadState backupReadState

// Metrics we collect.
metricOpen prometheus.Observer
@@ -165,6 +170,7 @@ func newBaseClient(options *Options) *Client {
cluster: options.Cluster,
retrier: retrier,
reconstructState: makeReconstructState(options.ReconstructBehavior),
backupReadState: makeBackupReadState(options.BackupReadBehavior),
metricOpen: clientOpLatenciesSet.WithLabelValues("open", options.Instance),
metricCreate: clientOpLatenciesSet.WithLabelValues("create", options.Instance),
metricReadDurations: clientOpLatenciesSet.WithLabelValues("read", options.Instance),
@@ -888,7 +894,8 @@ func (cli *Client) writeOneTract(
sem.Acquire()
defer sem.Release()

*result = cli.tractservers.Write(ctx, host, tract.Tract, tract.Version, thisB, thisOffset)
reqID := core.GenRequestID()
*result = cli.tractservers.Write(ctx, host, reqID, tract.Tract, tract.Version, thisB, thisOffset)

log.V(1).Infof("write %s to %s: %s", tract.Tract, host, *result)
}
@@ -909,7 +916,8 @@ func (cli *Client) createOneTract(
sem.Acquire()
defer sem.Release()

*result = cli.tractservers.Create(ctx, host, tsid, tract.Tract, thisB, thisOffset)
reqID := core.GenRequestID()
*result = cli.tractservers.Create(ctx, host, reqID, tsid, tract.Tract, thisB, thisOffset)

log.V(1).Infof("create %s to %s: %s", tract.Tract, host, *result)
}
@@ -1103,6 +1111,14 @@ func (cli *Client) readOneTract(
}
}

// errorResult carries state to determine if a failed tract read should
// report a bad TS to the curator.
type errorResult struct {
tractID core.TractID
host string
err core.Error
}

func (cli *Client) readOneTractReplicated(
ctx context.Context,
curAddr string,
@@ -1113,18 +1129,122 @@

var badVersionHost string
Collaborator: unused?

reqID := core.GenRequestID()
order := rand.Perm(len(tract.Hosts))
orderCh := make(chan int, len(tract.Hosts))

// This goroutine hands out the order in which to process reads.
// This ensures that when goroutines are done processing the initial backup
// phase, we know which hosts still need to be processed sequentially.
go func() {
for _, n := range order {
orderCh <- n
}
close(orderCh)
}()

err := core.ErrAllocHost // default error if none present
for _, n := range order {
if cli.backupReadState.BackupReadBehavior.Enabled {
ch := make(chan tractResult)

ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Track state for reporting bad TSs in an array of errorResults after the backup request phase
// is done. Each goroutine writes into its own slot. This is simpler than using a channel and
// avoids changing the reporting semantics when deferring ReportBadTS calls in
// helper functions.
idx := 0
reportErrors := make([]errorResult, len(tract.Hosts))
readFunc := func(orderCh chan int) {
err := core.ErrAllocHost // default error if none present
n := <-orderCh
host := tract.Hosts[n]
if host == "" {
log.V(1).Infof("read %s from tsid %d: no host", tract.Tract, tract.TSIDs[n])
ch <- tractResult{0, 0, err, badVersionHost}
return
}

// TODO(eric): fill in otherHosts when ts-ts cancellation is done.
read, err := cli.tractservers.ReadInto(ctx, host, nil, reqID, tract.Tract, tract.Version, thisB, thisOffset)
if err == core.ErrVersionMismatch {
badVersionHost = host
}
if err != core.NoError && err != core.ErrEOF {
// TODO: This probably has a race, pull this into a struct with a lock.
reportErrors[idx] = errorResult{tract.Tract, host, err}
idx++
ch <- tractResult{0, 0, err, badVersionHost}
return
}
log.V(1).Infof("read %s from tractserver at address %s: %s", tract.Tract, host, err)
// In case of a short read, i.e., read < len(thisB), we should
// pad the result with 0's. This doesn't need to be done
// explicitly if the 'thisB' given to us starts out zeroed.
// However, just in case it's not, we ensure that by
// overwriting untouched bytes with 0's.
for i := read; i < len(thisB); i++ {
thisB[i] = 0
}
ch <- tractResult{len(thisB), read, err, badVersionHost}
}

// Backup request logic involves the following:
// 1) Send the first read to the primary tract (item 0 from orderCh) in one goroutine.
// 2) Start another goroutine to issue backup requests. Each subsequent request is spawned after the
// configured backup delay expires.
// 3) Block until the first result is returned from the goroutines servicing the reads. Note that if
// that request fails, we accept it and continue on to the fallback phase of sequential reads.
// We could optimize this later and scan for the first successful backup request, but that may not
// be worth the latency savings.
maxBackups := cli.backupReadState.BackupReadBehavior.MaxNumBackups
delay := cli.backupReadState.BackupReadBehavior.Delay
go readFunc(orderCh)
go func(delay time.Duration, maxNumBackups int) {
for i := 0; i < maxNumBackups; i++ {
select {
case <-cli.backupReadState.backupDelayFunc(delay):
go readFunc(orderCh)
case <-ctx.Done():
log.V(1).Infof("backup read %s-%d to tractserver cancelled", tract.Tract, i+1)
// Return (not break) so no further backup reads are spawned after cancellation.
return
}
}
}(delay, maxBackups)

// Block and take the first result.
*result = <-ch
if result.err == core.NoError {
cancel()

// Report badness amongst tractservers from the backup read phase.
for _, res := range reportErrors {
if res.err != core.NoError && res.err != core.ErrEOF {
log.V(1).Infof("read %s from tractserver at address %s: %s", tract.Tract, res.host, res.err)
// If we failed to read from a TS, report that to the curator. Defer so we can examine
// result.err at the end, to see if we succeeded on another or not.
defer func(id core.TractID, host string, err core.Error) {
couldRecover := result.err == core.NoError || result.err == core.ErrEOF
go cli.curators.ReportBadTS(context.Background(), curAddr, id, host, "read", err, couldRecover)
}(res.tractID, res.host, res.err)
}
}
return
}
}

// Sequentially read from tractserver replicas. This phase runs if backups are disabled in the
// client configuration, or as a fallback when the backup read phase fails.
for n := range orderCh {
host := tract.Hosts[n]
if host == "" {
log.V(1).Infof("read %s from tsid %d: no host", tract.Tract, tract.TSIDs[n])
continue
}

var read int
read, err = cli.tractservers.ReadInto(ctx, host, tract.Tract, tract.Version, thisB, thisOffset)
read, err = cli.tractservers.ReadInto(ctx, host, nil, reqID, tract.Tract, tract.Version, thisB, thisOffset)
if err == core.ErrVersionMismatch {
badVersionHost = host
}
@@ -1166,7 +1286,8 @@ func (cli *Client) readOneTractRS(
rsTract := tract.RS.Chunk.ToTractID()
length := min(len(thisB), int(tract.RS.Length))
offset := int64(tract.RS.Offset) + thisOffset
read, err := cli.tractservers.ReadInto(ctx, tract.RS.Host, rsTract, core.RSChunkVersion, thisB[:length], offset)
reqID := core.GenRequestID()
read, err := cli.tractservers.ReadInto(ctx, tract.RS.Host, nil, reqID, rsTract, core.RSChunkVersion, thisB[:length], offset)

if err != core.NoError && err != core.ErrEOF {
log.V(1).Infof("rs read %s from tractserver at address %s: %s", tract.Tract, tract.RS.Host, err)
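The backupReadState plumbing (makeBackupReadState and backupDelayFunc) is added elsewhere in this PR and is not shown in this hunk. A minimal sketch of what it might look like, inferred from how it is used above and in the tests below (the struct embedding and the time.After default are assumptions):

	// Sketch only; the real definitions live in a file not included in this diff view.
	type BackupReadBehavior struct {
		Enabled       bool          // whether to issue backup reads at all
		MaxNumBackups int           // how many backup reads to send after the primary read
		Delay         time.Duration // how long to wait before each backup read
	}

	type backupReadState struct {
		BackupReadBehavior

		// backupDelayFunc returns a channel that fires after the given delay.
		// Tests replace it to control timing; a natural default is time.After.
		backupDelayFunc func(time.Duration) <-chan time.Time
	}

	func makeBackupReadState(behavior BackupReadBehavior) backupReadState {
		return backupReadState{
			BackupReadBehavior: behavior,
			backupDelayFunc:    time.After,
		}
	}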
117 changes: 115 additions & 2 deletions client/blb/client_test.go
@@ -12,6 +12,7 @@ import (
"strings"
"sync"
"testing"
"time"

"github.com/westerndigitalcorporation/blb/pkg/slices"

@@ -65,9 +66,37 @@ func (log *tsTraceLog) check(t *testing.T, write bool, addr string, version, len
t.Errorf("trace log missing entry %v %v %v %v %v", write, addr, version, length, off)
}

// checkRead checks that the specified entry is present and verifies the read host address.
// The tract id itself is embedded in addr (along with the index of the
// tractserver), so we don't bother checking id.
func (log *tsTraceLog) checkRead(t *testing.T, write bool, addr string, version, length int, off int64) {
for _, e := range log.log {
// Unlike check, backup reads target specific hosts, so we do verify the address here.
if e.write == write &&
e.addr == addr &&
e.version == version &&
e.length == length &&
e.off == off {
return
}
}
t.Errorf("trace log missing entry %v %v %v %v %v", write, addr, version, length, off)
}

// checkReadEntries checks that a group of entries is present and verifies the read host addresses.
// This is useful for checking that a batch of concurrent reads is present in the log.
// The tract id itself is embedded in addr (along with the index of the
// tractserver), so we don't bother checking id.
func (log *tsTraceLog) checkReadEntries(t *testing.T, write bool, entries []tsTraceEntry) {
for _, e := range entries {
log.checkRead(t, write, e.addr, e.version, e.length, e.off)
}
}

// newClient creates a Client suitable for testing. The trace function given
// should return core.NoError for a read/write to proceed, and something else to
// inject an error.
// inject an error. The disableBackupReads parameter allows the backup read feature
// to be turned off when tests only need to send reads to a single tract.
Collaborator: comment change should be reverted
// Note: this mechanism doesn't provide a way to inject an error into a write
// _and_ have the write reflected on the tractserver anyway. We can test that
// later.
@@ -86,7 +115,7 @@ func newClient(trace tsTraceFunc) *Client {
// newTracingClient creates a Client suitable for testing. It's connected to a
// fake master, three fake curators (each responsible for one partition), and as
// many tractservers as there are tract copies (the fake curator places each
// copy on a separate fake tractserver). The trace log returned can be used to
// copy on a separate fake tractserver). The trace log returned can be used to
// check the exact calls made to each tractserver.
func newTracingClient() (*Client, *tsTraceLog) {
traceLog := new(tsTraceLog)
@@ -135,6 +164,13 @@ func testWriteRead(t *testing.T, blob *Blob, length int, off int64) {
}
}

// testWrite does a single write at the given length and offset.
func testWrite(t *testing.T, blob *Blob, length int, off int64) {
blob.Seek(off, os.SEEK_SET)
p1 := makeData(length)
checkWrite(t, blob, p1)
}

// testWriteReadInParts does a series of sequential writes at the given length
// and offset, and then a series of sequential reads, and checks that the data
// matches.
@@ -257,6 +293,83 @@ func TestWriteReadTracedExactlyOneTract(t *testing.T) {
trace.checkLength(t, 7)
}

func (cli *Client) setupBackupClient(maxNumBackups int) chan<- time.Time {
bch := make(chan time.Time)
cli.backupReadState = backupReadState{
BackupReadBehavior: BackupReadBehavior{
Enabled: true,
MaxNumBackups: maxNumBackups,
},
}
cli.backupReadState.backupDelayFunc = func(_ time.Duration) <-chan time.Time {
// send a time value to this channel to unblock.
<-bch
return time.After(0 * time.Second)
}
return bch
}

func TestWriteReadTracedExactlyOneTractWithBackups(t *testing.T) {
cli, trace := newTracingClient()
bch := cli.setupBackupClient(2) // 2 backup requests

// Write a blob and check the writes are logged.
blob := createBlob(t, cli)
blob.Seek(core.TractLength, os.SEEK_SET)
p1 := makeData(core.TractLength)
checkWrite(t, blob, p1)

trace.check(t, true, "ts-0000000100000001:0000-0", 1, 0, 0)
trace.check(t, true, "ts-0000000100000001:0000-1", 1, 0, 0)
trace.check(t, true, "ts-0000000100000001:0000-2", 1, 0, 0)
trace.check(t, true, "ts-0000000100000001:0001-0", 1, core.TractLength, 0)
trace.check(t, true, "ts-0000000100000001:0001-1", 1, core.TractLength, 0)
trace.check(t, true, "ts-0000000100000001:0001-2", 1, core.TractLength, 0)

// Send a read with backups, and synchronize the backup reads using a stubbed delay channel.
go func() {
blob.Seek(core.TractLength, os.SEEK_SET)
p := make([]byte, core.TractLength)
n, err := blob.Read(p)
if err != nil || n != core.TractLength {
// t.Fatal must only be called from the test goroutine; use t.Error here instead.
t.Error("error or short read", n, core.TractLength, err)
}
}()

// TODO(eric): The test was still racy after using the channel approach. If the first
// read completes, the other two are cancelled. Perhaps we need to mock the cancel func
// as well to disable cancellation?
bch <- time.Time{}
Collaborator: yeah, I'm thinking you don't need delayFunc/bch after all
bch <- time.Time{}
time.Sleep(100 * time.Millisecond)

// Check that all three reads (1 primary and 2 backup requests) are logged.
tsIDs := []tsTraceEntry{
{addr: "ts-0000000100000001:0001-0", version: 1, length: core.TractLength, off: 0},
{addr: "ts-0000000100000001:0001-1", version: 1, length: core.TractLength, off: 0},
{addr: "ts-0000000100000001:0001-2", version: 1, length: core.TractLength, off: 0},
}
trace.checkReadEntries(t, false, tsIDs)
trace.checkLength(t, 9)
}

func TestReadFailoverWithBackups(t *testing.T) {
fail := func(e tsTraceEntry) core.Error {
// Reads from the first two tractservers fail.
if !e.write && !strings.HasSuffix(e.addr, "2") {
return core.ErrRPC
}
return core.NoError
}

cli := newClient(fail)
bch := cli.setupBackupClient(1) // 1 backup request
go func() {
bch <- time.Time{}
}()
testWriteRead(t, createBlob(t, cli), 3*core.TractLength+8765, 2*core.TractLength+27)
}

func TestWriteFailSimple(t *testing.T) {
fail := func(e tsTraceEntry) core.Error {
// All writes to last tractserver fail