diff --git a/tests/e2e/corrupt_test.go b/tests/e2e/corrupt_test.go index 52023bc80d6..55b31336ba3 100644 --- a/tests/e2e/corrupt_test.go +++ b/tests/e2e/corrupt_test.go @@ -364,3 +364,95 @@ func TestCompactHashCheckDetectCorruptionInterrupt(t *testing.T) { } t.Log("no corruption detected.") } + +func TestCtlV3SerializableRead(t *testing.T) { + testCtlV3ReadAfterWrite(t, clientv3.WithSerializable()) +} + +func TestCtlV3LinearizableRead(t *testing.T) { + testCtlV3ReadAfterWrite(t) +} + +func testCtlV3ReadAfterWrite(t *testing.T, ops ...clientv3.OpOption) { + e2e.BeforeTest(t) + + ctx := context.Background() + + epc, err := e2e.NewEtcdProcessCluster(ctx, t, + e2e.WithClusterSize(1), + e2e.WithEnvVars(map[string]string{"GOFAIL_FAILPOINTS": `raftBeforeSave=sleep("200ms");beforeCommit=sleep("200ms")`}), + ) + require.NoError(t, err, "failed to start etcd cluster: %v", err) + defer func() { + derr := epc.Close() + require.NoError(t, derr, "failed to close etcd cluster: %v", derr) + }() + + cc, err := clientv3.New(clientv3.Config{ + Endpoints: epc.EndpointsGRPC(), + DialKeepAliveTime: 5 * time.Second, + DialKeepAliveTimeout: 1 * time.Second, + }) + require.NoError(t, err) + defer func() { + derr := cc.Close() + require.NoError(t, derr) + }() + + _, err = cc.Put(ctx, "foo", "bar") + require.NoError(t, err) + + // Refer to https://github.com/etcd-io/etcd/pull/16658#discussion_r1341346778 + t.Log("Restarting the etcd process to ensure all data is persisted") + err = epc.Procs[0].Restart(ctx) + require.NoError(t, err) + epc.WaitLeader(t) + + _, err = cc.Put(ctx, "foo", "bar2") + require.NoError(t, err) + + t.Log("Killing the etcd process right after successfully writing a new key/value") + err = epc.Procs[0].Kill() + require.NoError(t, err) + err = epc.Procs[0].Wait(ctx) + require.NoError(t, err) + + stopc := make(chan struct{}, 1) + donec := make(chan struct{}, 1) + + t.Log("Starting a goroutine to repeatedly read the key/value") + count := 0 + go func() { + defer func() { + donec <- struct{}{} + }() + for { + select { + case <-stopc: + return + default: + } + + rctx, cancel := context.WithTimeout(ctx, 2*time.Second) + resp, rerr := cc.Get(rctx, "foo", ops...) + cancel() + if rerr != nil { + continue + } + + count++ + require.Equal(t, "bar2", string(resp.Kvs[0].Value)) + } + }() + + t.Log("Starting the etcd process again") + err = epc.Procs[0].Start(ctx) + require.NoError(t, err) + + time.Sleep(3 * time.Second) + stopc <- struct{}{} + + <-donec + assert.Greater(t, count, 0) + t.Logf("Checked the key/value %d times", count) +}