diff --git a/tests/e2e/watch_delay_test.go b/tests/e2e/watch_test.go similarity index 68% rename from tests/e2e/watch_delay_test.go rename to tests/e2e/watch_test.go index 3aac45c034d..d911736eced 100644 --- a/tests/e2e/watch_delay_test.go +++ b/tests/e2e/watch_test.go @@ -26,6 +26,7 @@ import ( "time" "github.com/stretchr/testify/require" + "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/tests/v3/framework/e2e" "golang.org/x/sync/errgroup" @@ -230,3 +231,92 @@ func continuouslyExecuteGetAll(ctx context.Context, t *testing.T, g *errgroup.Gr return nil }) } + +// TestDeleteEventDrop_Issue18089 is an e2e test to reproduce the issue reported in: https://github.com/etcd-io/etcd/issues/18089 +// +// The goal is to reproduce a DELETE event being dropped in a watch after a compaction +// occurs on the revision where the deletion took place. In order to reproduce this, we +// perform the following sequence (steps for reproduction thanks to @ahrtr): +// - PUT k v2 (assume returned revision = r2) +// - PUT k v3 (assume returned revision = r3) +// - PUT k v4 (assume returned revision = r4) +// - DELETE k (assume returned revision = r5) +// - PUT k v6 (assume returned revision = r6) +// - COMPACT r5 +// - WATCH rev=r5 +// +// We should get the DELETE event (r5) followed by the PUT event (r6). However, currently we only +// get the PUT event with returned revision of r6 (key=k, val=v6). +func TestDeleteEventDrop_Issue18089(t *testing.T) { + e2e.BeforeTest(t) + + cfg := e2e.EtcdProcessClusterConfig{ + ClusterSize: 1, + IsClientAutoTLS: true, + ClientTLS: e2e.ClientTLS, + } + + clus, err := e2e.NewEtcdProcessCluster(t, &cfg) + require.NoError(t, err) + defer clus.Close() + + c := newClient(t, clus.EndpointsGRPC(), cfg.ClientTLS, cfg.IsClientAutoTLS) + defer c.Close() + + ctx := context.Background() + const ( + key = "k" + v2 = "v2" + v3 = "v3" + v4 = "v4" + v6 = "v6" + ) + + t.Logf("PUT key=%s, val=%s", key, v2) + _, err = c.KV.Put(ctx, key, v2) + require.NoError(t, err) + + t.Logf("PUT key=%s, val=%s", key, v3) + _, err = c.KV.Put(ctx, key, v3) + require.NoError(t, err) + + t.Logf("PUT key=%s, val=%s", key, v4) + _, err = c.KV.Put(ctx, key, v4) + require.NoError(t, err) + + t.Logf("DELTE key=%s", key) + deleteResp, err := c.KV.Delete(ctx, key) + require.NoError(t, err) + + t.Logf("PUT key=%s, val=%s", key, v6) + _, err = c.KV.Put(ctx, key, v6) + require.NoError(t, err) + + t.Logf("COMPACT rev=%d", deleteResp.Header.Revision) + _, err = c.KV.Compact(ctx, deleteResp.Header.Revision, clientv3.WithCompactPhysical()) + require.NoError(t, err) + + watchChan := c.Watch(ctx, key, clientv3.WithRev(deleteResp.Header.Revision)) + select { + case watchResp := <-watchChan: + // TODO(MadhavJivrajani): update conditions once https://github.com/etcd-io/etcd/issues/18089 + // is resolved. The existing conditions do not mimic the desired behaviour and are there to + // test and reproduce etcd-io/etcd#18089. + if len(watchResp.Events) != 1 { + t.Fatalf("expected exactly one event in response, got: %d", len(watchResp.Events)) + } + if watchResp.Events[0].Type != mvccpb.PUT { + t.Fatalf("unexpected event type, expected: %s, got: %s", mvccpb.PUT, watchResp.Events[0].Type) + } + if string(watchResp.Events[0].Kv.Key) != key { + t.Fatalf("unexpected key, expected: %s, got: %s", key, string(watchResp.Events[0].Kv.Key)) + } + if string(watchResp.Events[0].Kv.Value) != v6 { + t.Fatalf("unexpected valye, expected: %s, got: %s", v6, string(watchResp.Events[0].Kv.Value)) + } + case <-time.After(100 * time.Millisecond): + // we care only about the first response, but have an + // escape hatch in case the watch response is delayed. + t.Fatal("timed out getting watch response") + } +}