Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure Count in GET response is consistent with key filtering by revision #18268

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion etcdctl/ctlv3/ctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ func init() {
rootCmd.PersistentFlags().StringSliceVar(&globalFlags.Endpoints, "endpoints", []string{"127.0.0.1:2379"}, "gRPC endpoints")
rootCmd.PersistentFlags().BoolVar(&globalFlags.Debug, "debug", false, "enable client-side debug logging")

rootCmd.PersistentFlags().StringVarP(&globalFlags.OutputFormat, "write-out", "w", "simple", "set the output format (fields, json, protobuf, simple, table)")
rootCmd.PersistentFlags().StringVarP(&globalFlags.OutputFormat, "write-out", "w", "simple",
"set the output format (fields, json, protobuf, simple, table); note json encodes kvs as base64")
rootCmd.PersistentFlags().BoolVar(&globalFlags.IsHex, "hex", false, "print byte strings as hex encoded strings")
rootCmd.RegisterFlagCompletionFunc("write-out", func(_ *cobra.Command, _ []string, _ string) ([]string, cobra.ShellCompDirective) {
return []string{"fields", "json", "protobuf", "simple", "table"}, cobra.ShellCompDirectiveDefault
Expand Down
3 changes: 2 additions & 1 deletion etcdutl/ctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ var (
)

func init() {
rootCmd.PersistentFlags().StringVarP(&etcdutl.OutputFormat, "write-out", "w", "simple", "set the output format (fields, json, protobuf, simple, table)")
rootCmd.PersistentFlags().StringVarP(&etcdutl.OutputFormat, "write-out", "w", "simple",
"set the output format (fields, json, protobuf, simple, table); note json encodes kvs as base64")
rootCmd.RegisterFlagCompletionFunc("write-out", func(_ *cobra.Command, _ []string, _ string) ([]string, cobra.ShellCompDirective) {
return []string{"fields", "json", "protobuf", "simple", "table"}, cobra.ShellCompDirectiveDefault
})
Expand Down
55 changes: 45 additions & 10 deletions server/etcdserver/txn/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,6 @@ func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, r *pb.RangeRequest)
func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
trace := traceutil.Get(ctx)

resp := &pb.RangeResponse{}
resp.Header = &pb.ResponseHeader{}

limit := r.Limit
if r.SortOrder != pb.RangeRequest_NONE ||
r.MinModRevision != 0 || r.MaxModRevision != 0 ||
Expand All @@ -167,17 +164,52 @@ func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *
limit = limit + 1
}

// Combining r.CountOnly and any of { r.MaxModRevision, r.MinModRevision, r.MaxCreateRevision, r.MinCreateRevision }
// means "give me the count for the result of the filtering I asked for".
filtering := r.MaxModRevision != 0 || r.MinModRevision != 0 || r.MaxCreateRevision != 0 || r.MinCreateRevision != 0
var noKvsToProcess bool
var readLimit int64
if filtering {
// if we are doing filtering, we can't limit our read;
// otherwise we can't calculate what the total count would be.
readLimit = 0
noKvsToProcess = false
} else {
readLimit = limit
noKvsToProcess = r.CountOnly
}

ro := mvcc.RangeOptions{
Limit: limit,
Limit: readLimit,
Rev: r.Revision,
Count: r.CountOnly,
Count: noKvsToProcess,
}

rr, err := txnRead.Range(ctx, r.Key, mkGteRange(r.RangeEnd), ro)
if err != nil {
return nil, err
}
resp := &pb.RangeResponse{Header: &pb.ResponseHeader{Revision: rr.Rev}}

if noKvsToProcess {
resp.Count = int64(rr.Count)
resp.Kvs = make([]*mvccpb.KeyValue, 0)
} else {
processKvsInRange(r, rr, resp, trace, lg)
}
// No more pruning after this point; we can count now.
if filtering {
resp.Count = int64(len(rr.KVs))
} else {
resp.Count = int64(rr.Count)
}

trace.Step("assemble the response")
return resp, nil
}

func processKvsInRange(r *pb.RangeRequest, rr *mvcc.RangeResult, resp *pb.RangeResponse, trace *traceutil.Trace, lg *zap.Logger) {
trace.Step("filter and sort the key-value pairs")
if r.MaxModRevision != 0 {
f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision > r.MaxModRevision }
pruneKVs(rr, f)
Expand All @@ -195,6 +227,14 @@ func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *
pruneKVs(rr, f)
}

// If r.CountOnly was specified:
// * r.SortOrder is useless and ignored.
// * r.Limit is useless and ignored.
if r.CountOnly {
resp.Kvs = make([]*mvccpb.KeyValue, 0)
return
}

sortOrder := r.SortOrder
if r.SortTarget != pb.RangeRequest_KEY && sortOrder == pb.RangeRequest_NONE {
// Since current mvcc.Range implementation returns results
Expand Down Expand Up @@ -235,18 +275,13 @@ func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *
rr.KVs = rr.KVs[:r.Limit]
resp.More = true
}
trace.Step("filter and sort the key-value pairs")
resp.Header.Revision = rr.Rev
resp.Count = int64(rr.Count)
resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs))
for i := range rr.KVs {
if r.KeysOnly {
rr.KVs[i].Value = nil
}
resp.Kvs[i] = &rr.KVs[i]
}
trace.Step("assemble the response")
return resp, nil
}

func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWithSharedBuffer bool, kv mvcc.KV, lessor lease.Lessor) (*pb.TxnResponse, *traceutil.Trace, error) {
Expand Down
22 changes: 19 additions & 3 deletions tests/e2e/ctl_v3_kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func getMinMaxCreateModRevTest(cx ctlCtx) {

for i, tt := range tests {
if err := ctlV3Get(cx, tt.args, tt.wkv...); err != nil {
cx.t.Errorf("getMinModRevTest #%d: ctlV3Get error (%v)", i, err)
cx.t.Errorf("getMinMaxCreateModRevTest #%d: ctlV3Get error (%v)", i, err)
}
}
}
Expand Down Expand Up @@ -387,15 +387,31 @@ type kv struct {
key, val string
}

// returns -1 if no string in ss with prefix is found, or the index of the first occurrence.
func findWithPrefix(ss []string, prefix string) int {
for i, s := range ss {
if strings.HasPrefix(s, prefix) {
return i
}
}
return -1
}

func ctlV3Get(cx ctlCtx, args []string, kvs ...kv) error {
cmdArgs := append(cx.PrefixArgs(), "get")
cmdArgs := append(cx.PrefixArgs(), "get", "--write-out=fields")
cmdArgs = append(cmdArgs, args...)
if !cx.quorum {
cmdArgs = append(cmdArgs, "--consistency", "s")
}
var lines []expect.ExpectedResponse
for _, elem := range kvs {
lines = append(lines, expect.ExpectedResponse{Value: elem.key}, expect.ExpectedResponse{Value: elem.val})
lines = append(lines,
expect.ExpectedResponse{Value: fmt.Sprintf("\"Key\" : \"%s\"", elem.key)},
expect.ExpectedResponse{Value: fmt.Sprintf("\"Value\" : \"%s\"", elem.val)},
)
}
if findWithPrefix(args, "--limit") == -1 {
lines = append(lines, expect.ExpectedResponse{Value: fmt.Sprintf("\"Count\" : %d", len(kvs))})
}
return e2e.SpawnWithExpects(cmdArgs, cx.envMap, lines...)
}
Expand Down
14 changes: 7 additions & 7 deletions tests/integration/v3_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1471,7 +1471,7 @@ func TestV3RangeRequest(t *testing.T) {
{"rev2", "rev3", "rev4", "rev5", "rev6"},
},
[]bool{false, false, false, false},
[]int64{5, 5, 5, 5},
[]int64{4, 2, 3, 5},
},
{
"min/max create rev",
Expand Down Expand Up @@ -1503,7 +1503,7 @@ func TestV3RangeRequest(t *testing.T) {
{"rev2", "rev3", "rev6"},
},
[]bool{false, false, false, false},
[]int64{3, 3, 3, 3},
[]int64{2, 2, 1, 3},
},
}

Expand All @@ -1527,24 +1527,24 @@ func TestV3RangeRequest(t *testing.T) {
continue
}
if len(resp.Kvs) != len(tt.wresps[j]) {
t.Errorf("#%d.%d: bad len(resp.Kvs). got = %d, want = %d, ", i, j, len(resp.Kvs), len(tt.wresps[j]))
t.Errorf("#%d.%d (%s): bad len(resp.Kvs). got = %d, want = %d, ", i, j, tt.name, len(resp.Kvs), len(tt.wresps[j]))
continue
}
for k, wKey := range tt.wresps[j] {
respKey := string(resp.Kvs[k].Key)
if respKey != wKey {
t.Errorf("#%d.%d: key[%d]. got = %v, want = %v, ", i, j, k, respKey, wKey)
t.Errorf("#%d.%d (%s): key[%d]. got = %v, want = %v, ", i, j, tt.name, k, respKey, wKey)
}
}
if resp.More != tt.wmores[j] {
t.Errorf("#%d.%d: bad more. got = %v, want = %v, ", i, j, resp.More, tt.wmores[j])
t.Errorf("#%d.%d (%s): bad more. got = %v, want = %v, ", i, j, tt.name, resp.More, tt.wmores[j])
}
if resp.GetCount() != tt.wcounts[j] {
t.Errorf("#%d.%d: bad count. got = %v, want = %v, ", i, j, resp.GetCount(), tt.wcounts[j])
t.Errorf("#%d.%d (%s): bad count. got = %v, want = %v, ", i, j, tt.name, resp.GetCount(), tt.wcounts[j])
}
wrev := int64(len(tt.putKeys) + 1)
if resp.Header.Revision != wrev {
t.Errorf("#%d.%d: bad header revision. got = %d. want = %d", i, j, resp.Header.Revision, wrev)
t.Errorf("#%d.%d (%s): bad header revision. got = %d. want = %d", i, j, tt.name, resp.Header.Revision, wrev)
}
}
})
Expand Down
Loading