diff --git a/client/blb/backup_reads.go b/client/blb/backup_reads.go
new file mode 100644
index 0000000..e6ba3a6
--- /dev/null
+++ b/client/blb/backup_reads.go
@@ -0,0 +1,36 @@
+package blb
+
+import (
+	"time"
+)
+
+// BackupReadBehavior lets clients decide when to enable or disable backup reads. The Delay
+// field determines how long to wait before each backup request is sent.
+type BackupReadBehavior struct {
+	// Enabled is a flag to enable the backup read feature.
+	Enabled bool
+
+	// Delay is the time duration between successive backup reads.
+	Delay time.Duration
+
+	// MaxNumBackups is the maximum number of requests sent in addition
+	// to the primary read.
+	MaxNumBackups int
+}
+
+type backupReadState struct {
+	BackupReadBehavior
+}
+
+func makeBackupReadState(behavior BackupReadBehavior) backupReadState {
+	if behavior.Enabled {
+		if behavior.MaxNumBackups <= 0 {
+			// Give it a default value of 1.
+			behavior.MaxNumBackups = 1
+		}
+	} else {
+		// MaxNumBackups isn't used in the read path if the feature is disabled, so clear it.
+		behavior.MaxNumBackups = 0
+	}
+	return backupReadState{BackupReadBehavior: behavior}
+}
diff --git a/client/blb/client.go b/client/blb/client.go
index 45e80cc..bf2569d 100644
--- a/client/blb/client.go
+++ b/client/blb/client.go
@@ -81,6 +81,9 @@ type Options struct {
 
 	// How the client decides whether to attempt client-side RS reconstruction.
 	ReconstructBehavior ReconstructBehavior
+
+	// How the client decides to send backup reads if the primary read is delayed.
+	BackupReadBehavior BackupReadBehavior
 }
 
 // Client exposes a simple interface to Blb users for requesting services and
@@ -115,6 +118,24 @@ type Client struct {
 	// Reconstruct behavior.
 	reconstructState reconstructState
 
+	// Backup read behavior.
+	backupReadState backupReadState
+
+	// Hooks for testing:
+	// Enforces deterministic ordering for hosts.
+	randOrderFunc func(n int) []int
+
+	// Overrides the read-spawning logic.
+	backupRequestFunc backupReqFunc
+
+	// Called when a read RPC completes.
+	readDoneHookFunc func()
+
+	// Signals that the backup phase is complete.
+	backupPhaseDoneHookFunc func()
+
+	// Allows waiting for cancellation to occur.
+	cancelDoneHookFunc func()
 
 	// Metrics we collect.
metricOpen prometheus.Observer @@ -159,24 +180,30 @@ func newBaseClient(options *Options) *Client { } return &Client{ - lookupCache: lookupCacheNew(LookupCacheSize), - tractCache: tractCacheNew(TractCacheSize), - cacheDisabled: options.DisableCache, - cluster: options.Cluster, - retrier: retrier, - reconstructState: makeReconstructState(options.ReconstructBehavior), - metricOpen: clientOpLatenciesSet.WithLabelValues("open", options.Instance), - metricCreate: clientOpLatenciesSet.WithLabelValues("create", options.Instance), - metricReadDurations: clientOpLatenciesSet.WithLabelValues("read", options.Instance), - metricWriteDurations: clientOpLatenciesSet.WithLabelValues("write", options.Instance), - metricDelete: clientOpLatenciesSet.WithLabelValues("delete", options.Instance), - metricUndelete: clientOpLatenciesSet.WithLabelValues("undelete", options.Instance), - metricReconDuration: clientOpLatenciesSet.WithLabelValues("reconstruct", options.Instance), - metricReadSizes: clientOpSizesSet.WithLabelValues("read", options.Instance), - metricWriteSizes: clientOpSizesSet.WithLabelValues("write", options.Instance), - metricReadBytes: clientOpBytesSet.WithLabelValues("read", options.Instance), - metricWriteBytes: clientOpBytesSet.WithLabelValues("write", options.Instance), - metricReconBytes: clientOpBytesSet.WithLabelValues("reconstruct", options.Instance), + lookupCache: lookupCacheNew(LookupCacheSize), + tractCache: tractCacheNew(TractCacheSize), + cacheDisabled: options.DisableCache, + cluster: options.Cluster, + retrier: retrier, + reconstructState: makeReconstructState(options.ReconstructBehavior), + backupReadState: makeBackupReadState(options.BackupReadBehavior), + randOrderFunc: rand.Perm, + backupRequestFunc: doParallelBackupReads, + readDoneHookFunc: func() {}, + backupPhaseDoneHookFunc: func() {}, + cancelDoneHookFunc: func() {}, + metricOpen: clientOpLatenciesSet.WithLabelValues("open", options.Instance), + metricCreate: clientOpLatenciesSet.WithLabelValues("create", options.Instance), + metricReadDurations: clientOpLatenciesSet.WithLabelValues("read", options.Instance), + metricWriteDurations: clientOpLatenciesSet.WithLabelValues("write", options.Instance), + metricDelete: clientOpLatenciesSet.WithLabelValues("delete", options.Instance), + metricUndelete: clientOpLatenciesSet.WithLabelValues("undelete", options.Instance), + metricReconDuration: clientOpLatenciesSet.WithLabelValues("reconstruct", options.Instance), + metricReadSizes: clientOpSizesSet.WithLabelValues("read", options.Instance), + metricWriteSizes: clientOpSizesSet.WithLabelValues("write", options.Instance), + metricReadBytes: clientOpBytesSet.WithLabelValues("read", options.Instance), + metricWriteBytes: clientOpBytesSet.WithLabelValues("write", options.Instance), + metricReconBytes: clientOpBytesSet.WithLabelValues("reconstruct", options.Instance), } } @@ -888,7 +915,8 @@ func (cli *Client) writeOneTract( sem.Acquire() defer sem.Release() - *result = cli.tractservers.Write(ctx, host, tract.Tract, tract.Version, thisB, thisOffset) + reqID := core.GenRequestID() + *result = cli.tractservers.Write(ctx, host, reqID, tract.Tract, tract.Version, thisB, thisOffset) log.V(1).Infof("write %s to %s: %s", tract.Tract, host, *result) } @@ -909,7 +937,8 @@ func (cli *Client) createOneTract( sem.Acquire() defer sem.Release() - *result = cli.tractservers.Create(ctx, host, tsid, tract.Tract, thisB, thisOffset) + reqID := core.GenRequestID() + *result = cli.tractservers.Create(ctx, host, reqID, tsid, tract.Tract, thisB, thisOffset) 
log.V(1).Infof("create %s to %s: %s", tract.Tract, host, *result) } @@ -1103,6 +1132,107 @@ func (cli *Client) readOneTract( } } +// tractResultRepl carries a tract result and a reference to a local +// thisB. +type tractResultRepl struct { + tractResult + thisB []byte +} + +func (cli *Client) readOneTractWithResult( + ctx context.Context, + host string, + reqID string, + tract *core.TractInfo, + lengthThisB int, + thisOffset int64) tractResultRepl { + var badVersionHost string + // TODO(eric): fill in otherHosts when ts-ts cancellation is done. + thisBLoc, err := cli.tractservers.Read(ctx, host, nil, reqID, tract.Tract, tract.Version, lengthThisB, thisOffset) + defer cli.readDoneHookFunc() + if err == core.ErrVersionMismatch { + // TODO(eric): fix this check when we redo error reporting. + badVersionHost = host + } + if err != core.NoError && err != core.ErrEOF { + // TODO(eric): redo bad TS reporting mechanism. + return tractResultRepl{ + tractResult: tractResult{0, 0, err, badVersionHost}, + } + } + log.V(1).Infof("read %s from tractserver at address %s: %s", tract.Tract, host, err) + + return tractResultRepl{ + tractResult: tractResult{lengthThisB, len(thisBLoc), err, badVersionHost}, + thisB: thisBLoc, + } +} + +// backupRequestFunc is a function type for sending backup requests. +type backupReqFunc func(ctx context.Context, cli *Client, reqID string, tract *core.TractInfo, + thisB []byte, thisOffset int64, nReaders int, order []int) <-chan tractResultRepl + +func (cli *Client) readOneTractBackupReplicated( + ctx context.Context, + reqID string, + tract *core.TractInfo, + thisB []byte, + thisOffset int64, + delayTimer <-chan time.Time, + resultCh chan<- tractResultRepl, + nOrder int) { + err := core.ErrAllocHost // default error if none present + var badVersionHost string + host := tract.Hosts[nOrder] + if host == "" { + log.V(1).Infof("read %s from tsid %d: no host", tract.Tract, tract.TSIDs[nOrder]) + resultCh <- tractResultRepl{ + tractResult: tractResult{0, 0, err, badVersionHost}, + } + return + } + select { + case <-ctx.Done(): + log.V(1).Infof("backup read %s-%d to tractserver cancelled", tract.Tract, nOrder) + resultCh <- tractResultRepl{ + tractResult: tractResult{0, 0, core.ErrCanceled, badVersionHost}, + } + case <-delayTimer: + resultCh <- cli.readOneTractWithResult(ctx, host, reqID, tract, len(thisB), thisOffset) + } + return +} + +func doParallelBackupReads( + ctx context.Context, + cli *Client, + reqID string, + tract *core.TractInfo, + thisB []byte, + thisOffset int64, + nReaders int, + order []int) <-chan tractResultRepl { + resultCh := make(chan tractResultRepl) + for n := 0; n < nReaders; n++ { + delayCh := time.After(time.Duration(n) * cli.backupReadState.BackupReadBehavior.Delay) + go cli.readOneTractBackupReplicated(ctx, reqID, tract, thisB, thisOffset, delayCh, resultCh, order[n]) + } + return resultCh +} + +func copyResult(thisB []byte, srcB []byte, result *tractResult, srcResult tractResult) { + copy(thisB, srcB) + // In case of short read, i.e., len(res.thisB) < len(thisB), we should + // pad thisB result with 0's. This doesn't need to be done + // explicitly if 'thisB' given to us starts to be empty. + // However, just in case it's not, we will ensure that by + // overwritting untouched bytes with 0's. 
+	for i := len(srcB); i < len(thisB); i++ {
+		thisB[i] = 0
+	}
+	*result = srcResult
+}
+
 func (cli *Client) readOneTractReplicated(
 	ctx context.Context,
 	curAddr string,
@@ -1111,46 +1241,70 @@ func (cli *Client) readOneTractReplicated(
 	thisB []byte,
 	thisOffset int64) {
 
+	// TODO(eric): remove this var when we redo bad ts version reporting
 	var badVersionHost string
 
-	order := rand.Perm(len(tract.Hosts))
+	// This counter tracks how many backups have been issued; it marks the host at
+	// which the sequential read fallback phase should start sending reads.
+	var nStart int
+	reqID := core.GenRequestID()
+	order := cli.randOrderFunc(len(tract.Hosts))
+
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
 
 	err := core.ErrAllocHost // default error if none present
-	for _, n := range order {
+	if cli.backupReadState.BackupReadBehavior.Enabled {
+		// Backup request logic:
+		// A single "level" of n reader goroutines is spawned at the same time.
+		// Each reader does {sleep 0ms; read from host[order[0]]}, {sleep 2ms; read from host[order[1]]},
+		// {sleep 4ms; read from host[order[2]]}, and so on. Each reader is guaranteed
+		// to write a result to the resultCh channel, whether or not it has been
+		// cancelled. We then consume exactly n messages, since one message is
+		// produced per reader, scanning until we find a successful read. If we find
+		// one, we copy the bytes it returned into the thisB buffer that was passed
+		// to this function. If all of the backup reads fail, we fall back to a
+		// sequential phase starting at the nth host.
+		maxNumBackups := cli.backupReadState.BackupReadBehavior.MaxNumBackups
+		nReaders := min(maxNumBackups+1, len(tract.Hosts))
+		nStart = nReaders
+
+		// This function call should not block.
+		resultCh := cli.backupRequestFunc(ctx, cli, reqID, tract, thisB, thisOffset, nReaders, order)
+
+		// Copy the first successful read's data into thisB, and keep draining
+		// resultCh so every reader can send. Return early only on a valid read.
+		done := false
+		for n := 0; n < nReaders; n++ {
+			res := <-resultCh
+			if res.tractResult.err == core.NoError && !done {
+				cancel()
+				cli.cancelDoneHookFunc()
+				copyResult(thisB, res.thisB, result, res.tractResult)
+				done = true
+			}
+		}
+		cli.backupPhaseDoneHookFunc()
+		if done {
+			return
+		}
+	}
+
+	// Sequentially read from tractserver replicas. This phase runs if backups are
+	// disabled in the config, or if all of the issued backup reads returned errors.
+	for _, n := range order[nStart:] {
 		host := tract.Hosts[n]
 		if host == "" {
 			log.V(1).Infof("read %s from tsid %d: no host", tract.Tract, tract.TSIDs[n])
 			continue
 		}
-
-		var read int
-		read, err = cli.tractservers.ReadInto(ctx, host, tract.Tract, tract.Version, thisB, thisOffset)
-		if err == core.ErrVersionMismatch {
-			badVersionHost = host
-		}
-		if err != core.NoError && err != core.ErrEOF {
-			log.V(1).Infof("read %s from tractserver at address %s: %s", tract.Tract, host, err)
-			// If we failed to read from a TS, report that to the curator. Defer so we can examine
-			// result.err at the end, to see if we succeeded on another or not.
-			defer func(host string, err core.Error) {
-				couldRecover := result.err == core.NoError || result.err == core.ErrEOF
-				go cli.curators.ReportBadTS(context.Background(), curAddr, tract.Tract, host, "read", err, couldRecover)
-			}(host, err)
-			continue // try another host
-		}
-		log.V(1).Infof("read %s from tractserver at address %s: %s", tract.Tract, host, err)
-		// In case of short read, i.e., read < len(thisB), we should
-		// pad the result with 0's. This doesn't need to be done
-		// explicitly if 'thisB' given to us starts to be empty.
-		// However, just in case it's not, we will ensure that by
-		// overwritting untouched bytes with 0's.
-		for i := read; i < len(thisB); i++ {
-			thisB[i] = 0
+		res := cli.readOneTractWithResult(ctx, host, reqID, tract, len(thisB), thisOffset)
+		if res.err != core.NoError && res.err != core.ErrEOF {
+			continue
 		}
-		*result = tractResult{len(thisB), read, err, badVersionHost}
+		copyResult(thisB, res.thisB, result, res.tractResult)
 		return
 	}
-	log.V(1).Infof("read %s all hosts failed", tract.Tract)
 	*result = tractResult{0, 0, err, badVersionHost}
 }
 
@@ -1166,16 +1320,12 @@ func (cli *Client) readOneTractRS(
 	rsTract := tract.RS.Chunk.ToTractID()
 	length := min(len(thisB), int(tract.RS.Length))
 	offset := int64(tract.RS.Offset) + thisOffset
-	read, err := cli.tractservers.ReadInto(ctx, tract.RS.Host, rsTract, core.RSChunkVersion, thisB[:length], offset)
+	reqID := core.GenRequestID()
+	read, err := cli.tractservers.ReadInto(ctx, tract.RS.Host, nil, reqID, rsTract, core.RSChunkVersion, thisB[:length], offset)
 	if err != core.NoError && err != core.ErrEOF {
 		log.V(1).Infof("rs read %s from tractserver at address %s: %s", tract.Tract, tract.RS.Host, err)
-		// If we failed to read from a TS, report that to the curator. Defer so
-		// we can examine result.err at the end, to see if we recovered or not.
-		defer func(err core.Error) {
-			couldRecover := result.err == core.NoError || result.err == core.ErrEOF
-			go cli.curators.ReportBadTS(context.Background(), curAddr, rsTract, tract.RS.Host, "read", err, couldRecover)
-		}(err)
+		// TODO(eric): redo bad TS reporting mechanism.
 		if !cli.shouldReconstruct(tract) {
 			*result = tractResult{len(thisB), 0, err, ""}
 			return
@@ -1228,7 +1378,7 @@ func (cli *Client) statTract(ctx context.Context, tract core.TractInfo) (int64,
 		return int64(tract.RS.Length), core.NoError
 	}
 
-	order := rand.Perm(len(tract.Hosts))
+	order := cli.randOrderFunc(len(tract.Hosts))
 
 	err := core.ErrAllocHost // default error if none present
 	for _, n := range order {
diff --git a/client/blb/client_test.go b/client/blb/client_test.go
index 07ae999..64f5419 100644
--- a/client/blb/client_test.go
+++ b/client/blb/client_test.go
@@ -12,6 +12,7 @@ import (
 	"strings"
 	"sync"
 	"testing"
+	"time"
 
 	"github.com/westerndigitalcorporation/blb/pkg/slices"
 
@@ -54,6 +55,7 @@ func (log *tsTraceLog) check(t *testing.T, write bool, addr string, version, len
 		}
 	} else {
 		// Reads randomly pick a host to read from so we don't verify the address.
		// This only holds when backup requests are disabled; use check only in that case.
 		if e.write == write &&
 			e.version == version &&
 			e.length == length &&
@@ -65,9 +67,38 @@ func (log *tsTraceLog) check(t *testing.T, write bool, addr string, version, len
 	t.Errorf("trace log missing entry %v %v %v %v %v", write, addr, version, length, off)
 }
 
+// checkRead checks that the specified entry is present for a read.
+// The tract id itself is embedded in addr (along with the index of the
+// tractserver), so we don't bother checking id.
+func (log *tsTraceLog) checkRead(t *testing.T, addr string, version, length int, off int64) {
+	for _, e := range log.log {
+		if !e.write &&
+			e.addr == addr &&
+			e.version == version &&
+			e.length == length &&
+			e.off == off {
+			return
+		}
+	}
+	t.Errorf("trace log missing entry %v %v %v %v", addr, version, length, off)
+}
+
+func (log *tsTraceLog) checkReadAbsence(t *testing.T, addr string, version, length int, off int64) {
+	for _, e := range log.log {
+		if !e.write &&
+			e.addr == addr &&
+			e.version == version &&
+			e.length == length &&
+			e.off == off {
+			t.Errorf("trace must not contain entry %v %v %v %v", addr, version, length, off)
+		}
+	}
+}
+
 // newClient creates a Client suitable for testing. The trace function given
 // should return core.NoError for a read/write to proceed, and something else to
-// inject an error.
+// inject an error. Backup reads are left disabled by default, which keeps tests
+// that expect reads to go to a single tractserver deterministic.
 // Note: this mechanism doesn't provide a way to inject an error into a write
 // _and_ have the write reflected on the tractserver anyway. We can test that
 // later.
@@ -86,7 +117,7 @@ func newClient(trace tsTraceFunc) *Client {
 
 // newTracingClient creates a Client suitable for testing. It's connected to a
 // fake master, three fake curators (each responsible for one partition), and as
 // many tractservers as there are tract copies (the fake curator places each
-// copy on a separate fake tractserver). The trace log returned can be used to
+// copy on a separate fake tractserver). The trace log returned can be used to
 // check the exact calls made to each tractserver.
 func newTracingClient() (*Client, *tsTraceLog) {
 	traceLog := new(tsTraceLog)
@@ -135,6 +166,22 @@ func testWriteRead(t *testing.T, blob *Blob, length int, off int64) {
 	}
 }
 
+// testWrite does a single write at the given length and offset.
+func testWrite(t *testing.T, blob *Blob, length int, off int64) {
+	blob.Seek(off, os.SEEK_SET)
+	p1 := makeData(length)
+	checkWrite(t, blob, p1)
+}
+
+// testRead does a single read of the given length; the caller seeks beforehand and signals done itself (off and done are unused here).
+func testRead(t *testing.T, blob *Blob, length int, off int64, done chan<- bool) {
+	p := make([]byte, length)
+	n, err := blob.Read(p)
+	if err != nil || n != length {
+		t.Fatal("error or short read", n, length, err)
+	}
+}
+
 // testWriteReadInParts does a series of sequential writes at the given length
 // and offset, and then a series of sequential reads, and checks that the data
 // matches.
@@ -257,6 +304,376 @@ func TestWriteReadTracedExactlyOneTract(t *testing.T) {
 	trace.checkLength(t, 7)
 }
 
+// setupBackupRequestFunc overrides the spawning logic for backup reads: it spawns
+// nReaders goroutines that block on their respective delayChans. To release a
+// reader, write to its delayChan.
+func setupBackupRequestFunc(cli *Client, nReaders int) []chan time.Time {
+	delayChans := make([]chan time.Time, nReaders)
+	for i := 0; i < nReaders; i++ {
+		delayChans[i] = make(chan time.Time)
+	}
+
+	resultCh := make(chan tractResultRepl)
+	cli.backupRequestFunc = func(
+		ctx context.Context,
+		cli *Client,
+		reqID string,
+		tract *core.TractInfo,
+		thisB []byte,
+		thisOffset int64,
+		nReaders int,
+		order []int) <-chan tractResultRepl {
+
+		for i, ch := range delayChans {
+			go func(i int, delayCh <-chan time.Time) {
+				cli.readOneTractBackupReplicated(ctx, reqID, tract,
+					thisB, thisOffset, delayCh, resultCh, order[i])
+			}(i, ch)
+		}
+		return resultCh
+	}
+	return delayChans
+}
+
+// setupBackupClient initializes a client that has backup requests enabled.
+// It also overrides synchronization hooks on the client that allow for
+// control over the ordering of read operations.
+func (cli *Client) setupBackupClient(nReaders int, overrideDelay bool) (<-chan bool, <-chan bool, <-chan bool) {
+	behavior := BackupReadBehavior{
+		Enabled:       true,
+		MaxNumBackups: nReaders - 1,
+		Delay:         2 * time.Millisecond,
+	}
+	cli.backupReadState = makeBackupReadState(behavior)
+	rdone := make(chan bool)
+	bdone := make(chan bool)
+	cancelch := make(chan bool)
+	if overrideDelay {
+		cli.readDoneHookFunc = func() { rdone <- true }
+		cli.backupPhaseDoneHookFunc = func() { bdone <- true }
+		cli.cancelDoneHookFunc = func() { cancelch <- true }
+	}
+	cli.randOrderFunc = func(n int) []int {
+		order := make([]int, n)
+		for i := range order {
+			order[i] = i
+		}
+		return order
+	}
+	return rdone, bdone, cancelch
+}
+
+// restoreTestFuncs restores the overrides created in setupBackupClient so other
+// tests not relevant to backups work as-is.
+func (cli *Client) restoreTestFuncs() {
+	cli.randOrderFunc = rand.Perm
+	cli.backupRequestFunc = doParallelBackupReads
+	cli.readDoneHookFunc = func() {}
+	cli.backupPhaseDoneHookFunc = func() {}
+	cli.cancelDoneHookFunc = func() {}
+}
+
+func TestBackupReadFirstFinishes(t *testing.T) {
+	cli, trace := newTracingClient()
+	defer cli.restoreTestFuncs()
+	nReaders := 3
+	// TODO(eric): clean up nReaders and make it consistent with setupBackupRequestFunc.
+	rdone, bdone, cancelCh := cli.setupBackupClient(nReaders, true)
+
+	// Write a blob and check the writes are logged.
+	blob := createBlob(t, cli)
+	testWrite(t, blob, core.TractLength, core.TractLength)
+
+	trace.check(t, true, "ts-0000000100000001:0000-0", 1, 0, 0)
+	trace.check(t, true, "ts-0000000100000001:0000-1", 1, 0, 0)
+	trace.check(t, true, "ts-0000000100000001:0000-2", 1, 0, 0)
+	trace.check(t, true, "ts-0000000100000001:0001-0", 1, core.TractLength, 0)
+	trace.check(t, true, "ts-0000000100000001:0001-1", 1, core.TractLength, 0)
+	trace.check(t, true, "ts-0000000100000001:0001-2", 1, core.TractLength, 0)
+
+	// Set up overrides for backup reads and spawn nReaders readers that wait to fire RPCs.
+	delayChans := setupBackupRequestFunc(cli, nReaders)
+
+	// Does the read succeed when the first request finishes before any backup is sent?
+	readDone := make(chan bool)
+	go func() {
+		blob.Seek(core.TractLength, os.SEEK_SET)
+		testRead(t, blob, core.TractLength, core.TractLength, readDone)
+		readDone <- true
+	}()
+
+	// Let the first request finish and check the first read is there.
+	delayChans[0] <- time.Time{}
+	<-rdone
+	trace.check(t, false, "ts-0000000100000001:0001-0", 1, core.TractLength, 0)
+
+	// Check that the second and third requests are cancelled (observed via the
+	// cancel hook) and that the backup phase completes.
+	<-cancelCh
+	<-bdone
+	<-readDone
+
+	trace.checkReadAbsence(t, "ts-0000000100000001:0001-1", 1, core.TractLength, 0)
+	trace.checkReadAbsence(t, "ts-0000000100000001:0001-2", 1, core.TractLength, 0)
+}
+
+func TestBackupReadSecondFinishes(t *testing.T) {
+	cli, trace := newTracingClient()
+	defer cli.restoreTestFuncs()
+
+	// Write a blob and check the writes are logged.
+	blob := createBlob(t, cli)
+	testWrite(t, blob, core.TractLength, core.TractLength)
+
+	trace.check(t, true, "ts-0000000100000001:0000-0", 1, 0, 0)
+	trace.check(t, true, "ts-0000000100000001:0000-1", 1, 0, 0)
+	trace.check(t, true, "ts-0000000100000001:0000-2", 1, 0, 0)
+	trace.check(t, true, "ts-0000000100000001:0001-0", 1, core.TractLength, 0)
+	trace.check(t, true, "ts-0000000100000001:0001-1", 1, core.TractLength, 0)
+	trace.check(t, true, "ts-0000000100000001:0001-2", 1, core.TractLength, 0)
+
+	rdone, bdone, cancelCh := cli.setupBackupClient(3, true) // 1 primary + 2 backup requests
+	delayChans := setupBackupRequestFunc(cli, 3)
+	readDone := make(chan bool)
+	go func() {
+		blob.Seek(core.TractLength, os.SEEK_SET)
+		testRead(t, blob, core.TractLength, core.TractLength, readDone)
+		readDone <- true
+	}()
+
+	// Let the second reader go first.
+	delayChans[1] <- time.Time{}
+	<-rdone
+	trace.checkRead(t, "ts-0000000100000001:0001-1", 1, core.TractLength, 0)
+
+	// Check that we cancel the others and the read completes.
+	<-cancelCh
+	<-bdone
+	<-readDone
+
+	trace.checkReadAbsence(t, "ts-0000000100000001:0001-0", 1, core.TractLength, 0)
+	trace.checkReadAbsence(t, "ts-0000000100000001:0001-2", 1, core.TractLength, 0)
+}
+
+func TestBackupReadThirdFinishes(t *testing.T) {
+	cli, trace := newTracingClient()
+	defer cli.restoreTestFuncs()
+
+	// Write a blob and check the writes are logged.
+	blob := createBlob(t, cli)
+	testWrite(t, blob, core.TractLength, core.TractLength)
+
+	trace.check(t, true, "ts-0000000100000001:0000-0", 1, 0, 0)
+	trace.check(t, true, "ts-0000000100000001:0000-1", 1, 0, 0)
+	trace.check(t, true, "ts-0000000100000001:0000-2", 1, 0, 0)
+	trace.check(t, true, "ts-0000000100000001:0001-0", 1, core.TractLength, 0)
+	trace.check(t, true, "ts-0000000100000001:0001-1", 1, core.TractLength, 0)
+	trace.check(t, true, "ts-0000000100000001:0001-2", 1, core.TractLength, 0)
+
+	// Test one request per host; the third request finishes first.
+	rdone, bdone, cancelCh := cli.setupBackupClient(3, true) // 1 primary + 2 backup requests
+	delayChans := setupBackupRequestFunc(cli, 3)
+	readDone := make(chan bool)
+	go func() {
+		blob.Seek(core.TractLength, os.SEEK_SET)
+		testRead(t, blob, core.TractLength, core.TractLength, readDone)
+		readDone <- true
+	}()
+
+	// Release the third reader first.
+	delayChans[2] <- time.Time{}
+	<-rdone
+
+	trace.checkRead(t, "ts-0000000100000001:0001-2", 1, core.TractLength, 0)
+
+	<-cancelCh
+	<-bdone
+	<-readDone
+
+	trace.checkReadAbsence(t, "ts-0000000100000001:0001-0", 1, core.TractLength, 0)
+	trace.checkReadAbsence(t, "ts-0000000100000001:0001-1", 1, core.TractLength, 0)
+}
+
+func TestReadFailoverErrOnFirst(t *testing.T) {
+	fail := func(e tsTraceEntry) core.Error {
+		// Reads from the first tractserver fail.
+		if !e.write && strings.HasSuffix(e.addr, "0") {
+			return core.ErrRPC
+		}
+		return core.NoError
+	}
+	cli := newClient(fail)
+	defer cli.restoreTestFuncs()
+
+	// Write a blob.
+	blob := createBlob(t, cli)
+	testWrite(t, blob, core.TractLength, core.TractLength)
+
+	// Test one request per host; reads from the first host return an error.
+	rdone, bdone, cancelCh := cli.setupBackupClient(3, true)
+	delayChans := setupBackupRequestFunc(cli, 3)
+	readDone := make(chan bool)
+	go func() {
+		blob.Seek(core.TractLength, os.SEEK_SET)
+		testRead(t, blob, core.TractLength, core.TractLength, readDone)
+		readDone <- true
+	}()
+
+	// Release the first request first.
+	delayChans[0] <- time.Time{}
+	<-rdone
+
+	// Since the first one returned an error, release the next.
+	delayChans[1] <- time.Time{}
+	<-rdone
+
+	// This one passes, so it will cancel the last request.
+	<-cancelCh
+	<-bdone
+	<-readDone
+}
+
+func TestReadFailoverErrOnSecond(t *testing.T) {
+	fail := func(e tsTraceEntry) core.Error {
+		// Reads from the second tractserver fail.
+		if !e.write && strings.HasSuffix(e.addr, "1") {
+			return core.ErrRPC
+		}
+		return core.NoError
+	}
+	cli := newClient(fail)
+	defer cli.restoreTestFuncs()
+
+	// Write a blob.
+	blob := createBlob(t, cli)
+	testWrite(t, blob, core.TractLength, core.TractLength)
+
+	// Test one request per host; reads from the second host return an error.
+	rdone, bdone, cancelCh := cli.setupBackupClient(3, true) // 1 primary + 2 backup requests
+	delayChans := setupBackupRequestFunc(cli, 3)
+	readDone := make(chan bool)
+	go func() {
+		blob.Seek(core.TractLength, os.SEEK_SET)
+		testRead(t, blob, core.TractLength, core.TractLength, readDone)
+		readDone <- true
+	}()
+
+	// The first backup request returns an error before the primary returns success.
+	delayChans[1] <- time.Time{}
+	<-rdone
+
+	// Since the backup returned an error, release the primary.
+	delayChans[0] <- time.Time{}
+	<-rdone
+
+	// This one passes, so it will cancel the last request.
+	<-cancelCh
+	<-bdone
+	<-readDone
+}
+
+func TestReadFailoverErrOnThird(t *testing.T) {
+	fail := func(e tsTraceEntry) core.Error {
+		// Reads from the third tractserver fail.
+		if !e.write && strings.HasSuffix(e.addr, "2") {
+			return core.ErrRPC
+		}
+		return core.NoError
+	}
+	cli := newClient(fail)
+	defer cli.restoreTestFuncs()
+
+	// Write a blob.
+	blob := createBlob(t, cli)
+	testWrite(t, blob, core.TractLength, core.TractLength)
+
+	// Test one request per host; reads from the third host return an error.
+	rdone, bdone, cancelCh := cli.setupBackupClient(3, true) // 1 primary + 2 backup requests
+	delayChans := setupBackupRequestFunc(cli, 3)
+	readDone := make(chan bool)
+	go func() {
+		blob.Seek(core.TractLength, os.SEEK_SET)
+		testRead(t, blob, core.TractLength, core.TractLength, readDone)
+		readDone <- true
+	}()
+
+	// The second backup request returns an error before the primary returns success.
+	delayChans[2] <- time.Time{}
+	<-rdone
+
+	// Since the backup returned an error, release the primary.
+	delayChans[0] <- time.Time{}
+	<-rdone
+
+	// This one passes, so it will cancel the last request.
+	<-cancelCh
+	<-bdone
+	<-readDone
+}
+
+func TestBackupFailoverFallback(t *testing.T) {
+	fail := func(e tsTraceEntry) core.Error {
+		// Reads from the first two tractservers fail.
+		if !e.write && !strings.HasSuffix(e.addr, "2") {
+			return core.ErrRPC
+		}
+		return core.NoError
+	}
+	cli := newClient(fail)
+	defer cli.restoreTestFuncs()
+
+	// Write a blob.
+	blob := createBlob(t, cli)
+	testWrite(t, blob, core.TractLength, core.TractLength)
+
+	// One primary plus one backup request; both fail, forcing the sequential fallback.
+	rdone, bdone, _ := cli.setupBackupClient(2, true) // 1 primary + 1 backup request
+	delayChans := setupBackupRequestFunc(cli, 2)
+	readDone := make(chan bool)
+	go func() {
+		blob.Seek(core.TractLength, os.SEEK_SET)
+		testRead(t, blob, core.TractLength, core.TractLength, readDone)
+		readDone <- true
+	}()
+
+	// Do we fall back if the primary returns an error before the backup
+	// returns an error?
+	// Release the primary; it returns an error.
+	delayChans[0] <- time.Time{}
+	<-rdone
+
+	// Since the primary returned an error, release the backup; it errors too.
+	delayChans[1] <- time.Time{}
+	<-rdone
+
+	// Omit the cancel wait since none of the backups pass, and check that the backup
+	// phase completes. Also check that a read happens in the sequential fallback phase.
+	<-bdone
+	<-rdone
+	<-readDone
+
+	// Do we fall back if the backup request returns an error before the primary
+	// returns an error?
+	go func() {
+		blob.Seek(core.TractLength, os.SEEK_SET)
+		testRead(t, blob, core.TractLength, core.TractLength, readDone)
+		readDone <- true
+	}()
+	// Release the backup first; it returns an error.
+	delayChans[1] <- time.Time{}
+	<-rdone
+
+	// Since the backup returned an error, release the primary.
+	delayChans[0] <- time.Time{}
+
+	// Omit the cancel wait since none of the backups pass, and check that the backup
+	// phase completes. Also check that a read happens in the sequential fallback phase.
+	<-rdone
+	<-bdone
+	<-rdone
+	<-readDone
+}
+
 func TestWriteFailSimple(t *testing.T) {
 	fail := func(e tsTraceEntry) core.Error {
 		// All writes to last tractserver fail
diff --git a/client/blb/mem_tractserver_talker.go b/client/blb/mem_tractserver_talker.go
index cefdc11..d0e140a 100644
--- a/client/blb/mem_tractserver_talker.go
+++ b/client/blb/mem_tractserver_talker.go
@@ -5,8 +5,9 @@ package blb
 import (
 	"context"
 
-	"github.com/westerndigitalcorporation/blb/internal/core"
 	"sync"
+
+	"github.com/westerndigitalcorporation/blb/internal/core"
 )
 
 // A tsTraceEntry contains the args for a read or write on a tractserver (but
@@ -39,7 +40,7 @@ type memTractserverTalker struct {
 
 // Create creates a new tract on the tractserver and does a write to the newly
 // created tract.
-func (tt *memTractserverTalker) Create(ctx context.Context, addr string, tsid core.TractserverID, id core.TractID, b []byte, off int64) core.Error {
+func (tt *memTractserverTalker) Create(ctx context.Context, addr, reqID string, tsid core.TractserverID, id core.TractID, b []byte, off int64) core.Error {
 	tt.lock.Lock()
 	ts := tt.getTractserver(addr)
 	_, ok := ts.versions[id]
@@ -49,11 +50,11 @@ func (tt *memTractserverTalker) Create(ctx context.Context, addr string, tsid co
 	}
 	ts.versions[id] = 1
 	tt.lock.Unlock()
-	return tt.Write(context.Background(), addr, id, 1, b, off)
+	return tt.Write(context.Background(), addr, reqID, id, 1, b, off)
 }
 
 // Write writes the given data to a tract.
-func (tt *memTractserverTalker) Write(ctx context.Context, addr string, id core.TractID, version int, b []byte, off int64) core.Error {
+func (tt *memTractserverTalker) Write(ctx context.Context, addr, reqID string, id core.TractID, version int, b []byte, off int64) core.Error {
 	tt.lock.Lock()
 	if version < 0 {
 		return core.ErrBadVersion
@@ -86,7 +87,7 @@ func (tt *memTractserverTalker) Write(ctx context.Context, addr string, id core.
 }
 
 // Read reads from a given tract.
-func (tt *memTractserverTalker) Read(ctx context.Context, addr string, id core.TractID, version int, length int, off int64) ([]byte, core.Error) { +func (tt *memTractserverTalker) Read(ctx context.Context, addr string, otherHosts []string, reqID string, id core.TractID, version int, length int, off int64) ([]byte, core.Error) { if version < 0 { return nil, core.ErrBadVersion } @@ -123,8 +124,8 @@ func copySlice(b []byte) []byte { } // ReadInto reads from a given tract. -func (tt *memTractserverTalker) ReadInto(ctx context.Context, addr string, id core.TractID, version int, b []byte, off int64) (int, core.Error) { - r, err := tt.Read(ctx, addr, id, version, len(b), off) +func (tt *memTractserverTalker) ReadInto(ctx context.Context, addr string, otherHosts []string, reqID string, id core.TractID, version int, b []byte, off int64) (int, core.Error) { + r, err := tt.Read(ctx, addr, otherHosts, reqID, id, version, len(b), off) return copy(b, r), err } diff --git a/client/blb/reconstruct.go b/client/blb/reconstruct.go index fc16dfd..90dd681 100644 --- a/client/blb/reconstruct.go +++ b/client/blb/reconstruct.go @@ -119,11 +119,13 @@ func (cli *Client) reconstructOneTract( nctx, cancel := context.WithCancel(ctx) for _, i := range requests { go func(i int) { + // reqID allows ts-ts cancellation of reads. + reqID := core.GenRequestID() rsTract := tract.RS.BaseChunk.Add(i).ToTractID() p := piece{idx: i} // Don't use ReadInto here, because that would require us to pre-allocate all the // memory, instead of having rpc/gob allocate it for us when a response comes in. - p.res, p.err = cli.tractservers.Read(nctx, tract.RS.OtherHosts[i], rsTract, core.RSChunkVersion, length, offset) + p.res, p.err = cli.tractservers.Read(nctx, tract.RS.OtherHosts[i], nil, reqID, rsTract, core.RSChunkVersion, length, offset) // length is already clipped to fall within the RS piece, so short // reads are unexpected here. Treat them as an error. if (p.err == core.NoError || p.err == core.ErrEOF) && len(p.res) != length { diff --git a/client/blb/rpc_tractserver_talker.go b/client/blb/rpc_tractserver_talker.go index 76661c1..396858d 100644 --- a/client/blb/rpc_tractserver_talker.go +++ b/client/blb/rpc_tractserver_talker.go @@ -5,6 +5,7 @@ package blb import ( "context" + log "github.com/golang/glog" "github.com/westerndigitalcorporation/blb/internal/core" @@ -27,7 +28,7 @@ func NewRPCTractserverTalker() TractserverTalker { } // Create creates a new tract on the tractserver and does a write to the newly created tract. -func (r *RPCTractserverTalker) Create(ctx context.Context, addr string, tsid core.TractserverID, id core.TractID, b []byte, off int64) core.Error { +func (r *RPCTractserverTalker) Create(ctx context.Context, addr, reqID string, tsid core.TractserverID, id core.TractID, b []byte, off int64) core.Error { pri := priorityFromContext(ctx) req := core.CreateTractReq{TSID: tsid, ID: id, Off: off, Pri: pri} req.Set(b, false) @@ -43,13 +44,12 @@ func (r *RPCTractserverTalker) Create(ctx context.Context, addr string, tsid cor } // Write does a write to a tract on this tractserver. 
-func (r *RPCTractserverTalker) Write(ctx context.Context, addr string, id core.TractID, version int, b []byte, off int64) core.Error { +func (r *RPCTractserverTalker) Write(ctx context.Context, addr, reqID string, id core.TractID, version int, b []byte, off int64) core.Error { pri := priorityFromContext(ctx) - rpcid := rpc.GenID() - req := core.WriteReq{ID: id, Version: version, Off: off, Pri: pri, ReqID: rpcid} + req := core.WriteReq{ID: id, Version: version, Off: off, Pri: pri, ReqID: reqID} req.Set(b, false) var reply core.Error - cancel := rpc.CancelAction{Method: core.CancelReqMethod, Req: rpcid} + cancel := rpc.CancelAction{Method: core.CancelReqMethod, Req: reqID} if err := r.cc.SendWithCancel(ctx, addr, core.WriteMethod, &req, &reply, &cancel); err != nil { log.Errorf("Write RPC error for tract (id: %s, version: %d, offset: %d) on tractserver @%s: %s", id, version, off, addr, err) return core.ErrRPC @@ -61,12 +61,11 @@ func (r *RPCTractserverTalker) Write(ctx context.Context, addr string, id core.T } // Read reads from a tract. -func (r *RPCTractserverTalker) Read(ctx context.Context, addr string, id core.TractID, version int, len int, off int64) ([]byte, core.Error) { +func (r *RPCTractserverTalker) Read(ctx context.Context, addr string, otherHosts []string, reqID string, id core.TractID, version int, len int, off int64) ([]byte, core.Error) { pri := priorityFromContext(ctx) - rpcid := rpc.GenID() - req := core.ReadReq{ID: id, Version: version, Len: len, Off: off, Pri: pri, ReqID: rpcid} + req := core.ReadReq{ID: id, Version: version, Len: len, Off: off, Pri: pri, ReqID: reqID, OtherHosts: otherHosts} var reply core.ReadReply - cancel := rpc.CancelAction{Method: core.CancelReqMethod, Req: rpcid} + cancel := rpc.CancelAction{Method: core.CancelReqMethod, Req: reqID} if err := r.cc.SendWithCancel(ctx, addr, core.ReadMethod, req, &reply, &cancel); err != nil { log.Errorf("Read RPC error for tract (id: %s, version: %d, length: %d, offset: %d) on tractserver @%s: %s", id, version, len, off, addr, err) return nil, core.ErrRPC @@ -79,15 +78,14 @@ func (r *RPCTractserverTalker) Read(ctx context.Context, addr string, id core.Tr } // ReadInto reads from a tract into a provided slice without copying. -func (r *RPCTractserverTalker) ReadInto(ctx context.Context, addr string, id core.TractID, version int, b []byte, off int64) (int, core.Error) { +func (r *RPCTractserverTalker) ReadInto(ctx context.Context, addr string, otherHosts []string, reqID string, id core.TractID, version int, b []byte, off int64) (int, core.Error) { pri := priorityFromContext(ctx) - rpcid := rpc.GenID() - req := core.ReadReq{ID: id, Version: version, Len: len(b), Off: off, Pri: pri, ReqID: rpcid} + req := core.ReadReq{ID: id, Version: version, Len: len(b), Off: off, Pri: pri, ReqID: reqID, OtherHosts: otherHosts} // Gob will decode into a provided slice if there's enough capacity. Give it b, // but reset the cap to len so it can't go past our segment. 
 	var reply core.ReadReply
 	reply.Set(b[0:0:len(b)], false)
-	cancel := rpc.CancelAction{Method: core.CancelReqMethod, Req: rpcid}
+	cancel := rpc.CancelAction{Method: core.CancelReqMethod, Req: reqID}
 	if err := r.cc.SendWithCancel(ctx, addr, core.ReadMethod, req, &reply, &cancel); err != nil {
 		log.Errorf("Read RPC error for tract (id: %s, version: %d, length: %d, offset: %d) on tractserver @%s: %s", id, version, len(b), off, addr, err)
 		return 0, core.ErrRPC
diff --git a/client/blb/tractserver_talker.go b/client/blb/tractserver_talker.go
index c61a82e..94bef57 100644
--- a/client/blb/tractserver_talker.go
+++ b/client/blb/tractserver_talker.go
@@ -5,6 +5,7 @@ package blb
 
 import (
 	"context"
+
 	"github.com/westerndigitalcorporation/blb/internal/core"
 )
 
@@ -12,17 +13,17 @@ type TractserverTalker interface {
 	// Create creates a new tract on the tractserver and does a write to the
 	// newly created tract.
-	Create(ctx context.Context, addr string, tsid core.TractserverID, id core.TractID, b []byte, off int64) core.Error
+	Create(ctx context.Context, addr, reqID string, tsid core.TractserverID, id core.TractID, b []byte, off int64) core.Error
 
 	// Write does a write to a tract on this tractserver.
-	Write(ctx context.Context, addr string, id core.TractID, version int, b []byte, off int64) core.Error
+	Write(ctx context.Context, addr, reqID string, id core.TractID, version int, b []byte, off int64) core.Error
 
 	// Read reads from a tract.
-	Read(ctx context.Context, addr string, id core.TractID, version int, len int, off int64) ([]byte, core.Error)
+	Read(ctx context.Context, addr string, otherHosts []string, reqID string, id core.TractID, version int, len int, off int64) ([]byte, core.Error)
 
 	// ReadInto reads from a tract. It will try to read up to len(b) bytes and put them in b.
 	// It returns the number of bytes read, as in io.Reader's Read.
-	ReadInto(ctx context.Context, addr string, id core.TractID, version int, b []byte, off int64) (int, core.Error)
+	ReadInto(ctx context.Context, addr string, otherHosts []string, reqID string, id core.TractID, version int, b []byte, off int64) (int, core.Error)
 
 	// StatTract returns the number of bytes in a tract.
 	StatTract(ctx context.Context, addr string, id core.TractID, version int) (int64, core.Error)
diff --git a/internal/core/genreqid.go b/internal/core/genreqid.go
new file mode 100644
index 0000000..d11e3e5
--- /dev/null
+++ b/internal/core/genreqid.go
@@ -0,0 +1,30 @@
+// SPDX-License-Identifier: MIT
+
+package core
+
+import (
+	"crypto/rand"
+	"encoding/base64"
+	"strconv"
+	"sync/atomic"
+)
+
+var (
+	clientIDPrefix = makePrefix()
+	seqNum         uint64
+)
+
+func makePrefix() string {
+	buf := make([]byte, 15)
+	rand.Read(buf)
+	return base64.StdEncoding.EncodeToString(buf)
+}
+
+// GenRequestID returns a unique string to be used as a request id for cancellation. This
+// implementation works by using 120 random bits as a process identifier combined with
+// 64 bits of a sequence number. The values that it produces are printable (though things
+// should work regardless).
+func GenRequestID() string {
+	id := atomic.AddUint64(&seqNum, 1)
+	return clientIDPrefix + strconv.FormatUint(id, 36)
+}
diff --git a/internal/core/tractserver_messages.go b/internal/core/tractserver_messages.go
index 477486b..6837641 100644
--- a/internal/core/tractserver_messages.go
+++ b/internal/core/tractserver_messages.go
@@ -227,6 +227,9 @@ type ReadReq struct {
 
 	// ID for cancellation.
 	ReqID string
+
+	// Other replicas that can serve alternate reads.
+ OtherHosts []string } // ReadReply is the reply for ReadReq. diff --git a/pkg/rpc/genid.go b/pkg/rpc/genid.go deleted file mode 100644 index b5b6df4..0000000 --- a/pkg/rpc/genid.go +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright (c) 2017 Western Digital Corporation or its affiliates. All rights reserved. -// SPDX-License-Identifier: MIT - -package rpc - -import ( - "crypto/rand" - "encoding/base64" - "strconv" - "sync/atomic" -) - -var ( - processIDPrefix = makePrefix() - nextID uint64 -) - -func makePrefix() string { - buf := make([]byte, 15) - rand.Read(buf) - return base64.StdEncoding.EncodeToString(buf) -} - -// GenID returns a unique string to be used as an RPC id for cancellation. This -// implementation works by using 120 random bits as a process identifier -// combined with 64 bits of sequence number. The values that it produces are -// printable (though things should work regardless). -func GenID() string { - id := atomic.AddUint64(&nextID, 1) - return processIDPrefix + strconv.FormatUint(id, 36) -}
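
Usage sketch (reviewer note, not part of the patch): the snippet below shows how a caller might opt into backup reads through the new Options field. It is a minimal sketch under assumptions: only Options.BackupReadBehavior and its fields come from this diff, and how the Options value is handed to a client constructor is elided, since the diff only touches the internal newBaseClient. With these settings, readOneTractReplicated spawns min(MaxNumBackups+1, len(hosts)) readers, and reader n fires after n*Delay (0ms, 2ms, 4ms).

package main

import (
	"time"

	blb "github.com/westerndigitalcorporation/blb/client/blb"
)

func main() {
	opts := blb.Options{
		// Send up to two staggered backup reads if the primary read is slow.
		// If Enabled is true and MaxNumBackups <= 0, makeBackupReadState
		// defaults MaxNumBackups to 1.
		BackupReadBehavior: blb.BackupReadBehavior{
			Enabled:       true,
			Delay:         2 * time.Millisecond,
			MaxNumBackups: 2,
		},
	}
	_ = opts // pass opts to a client constructor in real code
}

Similarly, a quick sanity sketch of the new core.GenRequestID, which replaces rpc.GenID so that the client can choose one request ID and thread it through reads, writes, and cancellation. The assertions are illustrative assumptions: 15 random bytes encode to a 20-character base64 process prefix, shared by all IDs from one process, followed by a base-36 sequence number.

package main

import (
	"fmt"
	"strings"

	"github.com/westerndigitalcorporation/blb/internal/core"
)

func main() {
	a := core.GenRequestID()
	b := core.GenRequestID()
	// Same 20-char process prefix, different sequence suffix.
	fmt.Println(a, b, strings.HasPrefix(b, a[:20]), a != b)
}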