From b682a455750b220279e37e8d3474d9b3a70c8eda Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Mon, 2 Oct 2023 13:48:40 +0100 Subject: [PATCH] test: add test cases to verify consistent reading right after writing Signed-off-by: Benjamin Wang --- tests/e2e/ctl_v3_kv_test.go | 93 +++++++++++++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/tests/e2e/ctl_v3_kv_test.go b/tests/e2e/ctl_v3_kv_test.go index a24103e57a27..c3a7e88d6cee 100644 --- a/tests/e2e/ctl_v3_kv_test.go +++ b/tests/e2e/ctl_v3_kv_test.go @@ -20,12 +20,105 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/pkg/v3/expect" "go.etcd.io/etcd/tests/v3/framework/e2e" ) +func TestCtlV3SerializableRead(t *testing.T) { + testCtlV3ReadAfterWrite(t, clientv3.WithSerializable()) +} + +func TestCtlV3LinearizableRead(t *testing.T) { + testCtlV3ReadAfterWrite(t) +} + +func testCtlV3ReadAfterWrite(t *testing.T, ops ...clientv3.OpOption) { + e2e.BeforeTest(t) + + ctx := context.Background() + + epc, err := e2e.NewEtcdProcessCluster(ctx, t, + e2e.WithClusterSize(1), + e2e.WithWaitClusterReadyTimeout(1*time.Nanosecond), + e2e.WithEnvVars(map[string]string{"GOFAIL_FAILPOINTS": `raftBeforeSave=sleep("200ms");beforeCommit=sleep("200ms")`}), + ) + require.NoError(t, err, "failed to start etcd cluster: %v", err) + defer func() { + derr := epc.Close() + require.NoError(t, derr, "failed to close etcd cluster: %v", derr) + }() + + cc, err := clientv3.New(clientv3.Config{ + Endpoints: epc.EndpointsGRPC(), + DialKeepAliveTime: 5 * time.Second, + DialKeepAliveTimeout: 1 * time.Second, + }) + require.NoError(t, err) + defer func() { + derr := cc.Close() + require.NoError(t, derr) + }() + + _, err = cc.Put(ctx, "foo", "bar") + require.NoError(t, err) + + // Refer to https://github.com/etcd-io/etcd/pull/16658#discussion_r1341346778 + t.Log("Restarting the etcd process to ensure all data is persisted") + err = epc.Procs[0].Restart(ctx) + require.NoError(t, err) + epc.WaitLeader(t) + + _, err = cc.Put(ctx, "foo", "bar2") + require.NoError(t, err) + + t.Log("Killing the etcd process right after successfully writing a new key/value") + err = epc.Procs[0].Kill() + require.NoError(t, err) + err = epc.Procs[0].Wait(ctx) + require.NoError(t, err) + + stopc := make(chan struct{}, 1) + donec := make(chan struct{}, 1) + + t.Log("Starting a goroutine to repeatedly read the key/value") + count := 0 + go func() { + defer func() { + donec <- struct{}{} + }() + for { + select { + case <-stopc: + return + default: + } + + resp, rerr := cc.Get(ctx, "foo", ops...) + if rerr != nil { + continue + } + + count++ + require.Equal(t, "bar2", string(resp.Kvs[0].Value)) + } + }() + + t.Log("Starting the etcd process again") + err = epc.Procs[0].Start(ctx) + require.NoError(t, err) + + time.Sleep(3 * time.Second) + stopc <- struct{}{} + + <-donec + assert.Greater(t, count, 0) + t.Logf("Checked the key/value %d times", count) +} + func TestCtlV3PutTimeout(t *testing.T) { testCtl(t, putTest, withDialTimeout(0)) } func TestCtlV3PutClientTLSFlagByEnv(t *testing.T) { testCtl(t, putTest, withCfg(*e2e.NewConfigClientTLS()), withFlagByEnv())