Skip to content

Commit

Permalink
test: add test cases to verify consistent reading right after writing
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Wang <[email protected]>
  • Loading branch information
ahrtr committed Oct 2, 2023
1 parent df9a814 commit 7a812fa
Showing 1 changed file with 95 additions and 0 deletions.
95 changes: 95 additions & 0 deletions tests/e2e/ctl_v3_kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,107 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/pkg/v3/expect"
"go.etcd.io/etcd/tests/v3/framework/e2e"
)

func TestCtlV3SerializableRead(t *testing.T) {
testCtlV3ReadAfterWrite(t, clientv3.WithSerializable())
}

func TestCtlV3LinearizableRead(t *testing.T) {
testCtlV3ReadAfterWrite(t)
}

func testCtlV3ReadAfterWrite(t *testing.T, ops ...clientv3.OpOption) {
e2e.BeforeTest(t)

ctx := context.Background()

epc, err := e2e.NewEtcdProcessCluster(ctx, t,
e2e.WithClusterSize(1),
e2e.WithWaitClusterReadyTimeout(1*time.Nanosecond),
e2e.WithEnvVars(map[string]string{"GOFAIL_FAILPOINTS": `raftBeforeSave=sleep("200ms");beforeCommit=sleep("200ms")`}),
)
require.NoError(t, err, "failed to start etcd cluster: %v", err)
defer func() {
derr := epc.Close()
require.NoError(t, derr, "failed to close etcd cluster: %v", derr)
}()

cc, err := clientv3.New(clientv3.Config{
Endpoints: epc.EndpointsGRPC(),
DialKeepAliveTime: 5 * time.Second,
DialKeepAliveTimeout: 1 * time.Second,
})
require.NoError(t, err)
defer func() {
derr := cc.Close()
require.NoError(t, derr)
}()

_, err = cc.Put(ctx, "foo", "bar")
require.NoError(t, err)

// Refer to https://github.com/etcd-io/etcd/pull/16658#discussion_r1341346778
t.Log("Restarting the etcd process to ensure all data is persisted")
err = epc.Procs[0].Restart(ctx)
require.NoError(t, err)
epc.WaitLeader(t)

_, err = cc.Put(ctx, "foo", "bar2")
require.NoError(t, err)

t.Log("Killing the etcd process right after successfully writing a new key/value")
err = epc.Procs[0].Kill()
require.NoError(t, err)
err = epc.Procs[0].Wait(ctx)
require.NoError(t, err)

stopc := make(chan struct{}, 1)
donec := make(chan struct{}, 1)

t.Log("Starting a goroutine to repeatedly read the key/value")
count := 0
go func() {
defer func() {
donec <- struct{}{}
}()
for {
select {
case <-stopc:
return
default:
}

rctx, cancel := context.WithTimeout(ctx, 2*time.Second)
resp, rerr := cc.Get(rctx, "foo", ops...)
cancel()
if rerr != nil {
continue
}

count++
require.Equal(t, "bar2", string(resp.Kvs[0].Value))
}
}()

t.Log("Starting the etcd process again")
err = epc.Procs[0].Start(ctx)
require.NoError(t, err)

time.Sleep(3 * time.Second)
stopc <- struct{}{}

<-donec
assert.Greater(t, count, 0)
t.Logf("Checked the key/value %d times", count)
}

func TestCtlV3PutTimeout(t *testing.T) { testCtl(t, putTest, withDialTimeout(0)) }
func TestCtlV3PutClientTLSFlagByEnv(t *testing.T) {
testCtl(t, putTest, withCfg(*e2e.NewConfigClientTLS()), withFlagByEnv())
Expand Down

0 comments on commit 7a812fa

Please sign in to comment.