Skip to content

Commit

Permalink
Ensure Count in GET response is consistent with key filtering by revi…
Browse files Browse the repository at this point in the history
…sion

Signed-off-by: Cristian Ferretti <[email protected]>
  • Loading branch information
jcferretti committed Jul 20, 2024
1 parent 37cbd6c commit bd5ec03
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 22 deletions.
3 changes: 2 additions & 1 deletion etcdctl/ctlv3/ctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ func init() {
rootCmd.PersistentFlags().StringSliceVar(&globalFlags.Endpoints, "endpoints", []string{"127.0.0.1:2379"}, "gRPC endpoints")
rootCmd.PersistentFlags().BoolVar(&globalFlags.Debug, "debug", false, "enable client-side debug logging")

rootCmd.PersistentFlags().StringVarP(&globalFlags.OutputFormat, "write-out", "w", "simple", "set the output format (fields, json, protobuf, simple, table)")
rootCmd.PersistentFlags().StringVarP(&globalFlags.OutputFormat, "write-out", "w", "simple",
"set the output format (fields, json, protobuf, simple, table); note json encodes kvs as base64")
rootCmd.PersistentFlags().BoolVar(&globalFlags.IsHex, "hex", false, "print byte strings as hex encoded strings")
rootCmd.RegisterFlagCompletionFunc("write-out", func(_ *cobra.Command, _ []string, _ string) ([]string, cobra.ShellCompDirective) {
return []string{"fields", "json", "protobuf", "simple", "table"}, cobra.ShellCompDirectiveDefault
Expand Down
3 changes: 2 additions & 1 deletion etcdutl/ctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ var (
)

func init() {
rootCmd.PersistentFlags().StringVarP(&etcdutl.OutputFormat, "write-out", "w", "simple", "set the output format (fields, json, protobuf, simple, table)")
rootCmd.PersistentFlags().StringVarP(&etcdutl.OutputFormat, "write-out", "w", "simple",
"set the output format (fields, json, protobuf, simple, table); note json encodes kvs as base64")
rootCmd.RegisterFlagCompletionFunc("write-out", func(_ *cobra.Command, _ []string, _ string) ([]string, cobra.ShellCompDirective) {
return []string{"fields", "json", "protobuf", "simple", "table"}, cobra.ShellCompDirectiveDefault
})
Expand Down
55 changes: 45 additions & 10 deletions server/etcdserver/txn/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,6 @@ func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, r *pb.RangeRequest)
func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
trace := traceutil.Get(ctx)

resp := &pb.RangeResponse{}
resp.Header = &pb.ResponseHeader{}

limit := r.Limit
if r.SortOrder != pb.RangeRequest_NONE ||
r.MinModRevision != 0 || r.MaxModRevision != 0 ||
Expand All @@ -167,17 +164,52 @@ func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *
limit = limit + 1
}

// Combining r.CountOnly and any of { r.MaxModRevision, r.MinModRevision, r.MaxCreateRevision, r.MinCreateRevision }
// means "give me the count for the result of the filtering I asked for".
filtering := r.MaxModRevision != 0 || r.MinModRevision != 0 || r.MaxCreateRevision != 0 || r.MinCreateRevision != 0
var noKvsToProcess bool
var readLimit int64
if filtering {
// if we are doing filtering, we can't limit our read;
// otherwise we can't calculate what the total count would be.
readLimit = 0
noKvsToProcess = false
} else {
readLimit = limit
noKvsToProcess = r.CountOnly
}

ro := mvcc.RangeOptions{
Limit: limit,
Limit: readLimit,
Rev: r.Revision,
Count: r.CountOnly,
Count: noKvsToProcess,
}

rr, err := txnRead.Range(ctx, r.Key, mkGteRange(r.RangeEnd), ro)
if err != nil {
return nil, err
}
resp := &pb.RangeResponse{Header: &pb.ResponseHeader{Revision: rr.Rev}}

if noKvsToProcess {
resp.Count = int64(rr.Count)
resp.Kvs = make([]*mvccpb.KeyValue, 0)
} else {
processKvsInRange(r, rr, resp, trace, lg)
}
// No more pruning after this point; we can count now.
if filtering {
resp.Count = int64(len(rr.KVs))
} else {
resp.Count = int64(rr.Count)
}

trace.Step("assemble the response")
return resp, nil
}

func processKvsInRange(r *pb.RangeRequest, rr *mvcc.RangeResult, resp *pb.RangeResponse, trace *traceutil.Trace, lg *zap.Logger) {
trace.Step("filter and sort the key-value pairs")
if r.MaxModRevision != 0 {
f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision > r.MaxModRevision }
pruneKVs(rr, f)
Expand All @@ -195,6 +227,14 @@ func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *
pruneKVs(rr, f)
}

// If r.CountOnly was specified:
// * r.SortOrder is useless and ignored.
// * r.Limit is useless and ignored.
if r.CountOnly {
resp.Kvs = make([]*mvccpb.KeyValue, 0)
return
}

sortOrder := r.SortOrder
if r.SortTarget != pb.RangeRequest_KEY && sortOrder == pb.RangeRequest_NONE {
// Since current mvcc.Range implementation returns results
Expand Down Expand Up @@ -235,18 +275,13 @@ func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *
rr.KVs = rr.KVs[:r.Limit]
resp.More = true
}
trace.Step("filter and sort the key-value pairs")
resp.Header.Revision = rr.Rev
resp.Count = int64(rr.Count)
resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs))
for i := range rr.KVs {
if r.KeysOnly {
rr.KVs[i].Value = nil
}
resp.Kvs[i] = &rr.KVs[i]
}
trace.Step("assemble the response")
return resp, nil
}

func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWithSharedBuffer bool, kv mvcc.KV, lessor lease.Lessor) (*pb.TxnResponse, *traceutil.Trace, error) {
Expand Down
22 changes: 19 additions & 3 deletions tests/e2e/ctl_v3_kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func getMinMaxCreateModRevTest(cx ctlCtx) {

for i, tt := range tests {
if err := ctlV3Get(cx, tt.args, tt.wkv...); err != nil {
cx.t.Errorf("getMinModRevTest #%d: ctlV3Get error (%v)", i, err)
cx.t.Errorf("getMinMaxCreateModRevTest #%d: ctlV3Get error (%v)", i, err)
}
}
}
Expand Down Expand Up @@ -387,15 +387,31 @@ type kv struct {
key, val string
}

// returns -1 if no string in ss with prefix is found, or the index of the first occurrence.
func findWithPrefix(ss []string, prefix string) int {
for i, s := range ss {
if strings.HasPrefix(s, prefix) {
return i
}
}
return -1
}

func ctlV3Get(cx ctlCtx, args []string, kvs ...kv) error {
cmdArgs := append(cx.PrefixArgs(), "get")
cmdArgs := append(cx.PrefixArgs(), "get", "--write-out=fields")
cmdArgs = append(cmdArgs, args...)
if !cx.quorum {
cmdArgs = append(cmdArgs, "--consistency", "s")
}
var lines []expect.ExpectedResponse
for _, elem := range kvs {
lines = append(lines, expect.ExpectedResponse{Value: elem.key}, expect.ExpectedResponse{Value: elem.val})
lines = append(lines,
expect.ExpectedResponse{Value: fmt.Sprintf("\"Key\" : \"%s\"", elem.key)},
expect.ExpectedResponse{Value: fmt.Sprintf("\"Value\" : \"%s\"", elem.val)},
)
}
if findWithPrefix(args, "--limit") == -1 {
lines = append(lines, expect.ExpectedResponse{Value: fmt.Sprintf("\"Count\" : %d", len(kvs))})
}
return e2e.SpawnWithExpects(cmdArgs, cx.envMap, lines...)
}
Expand Down
14 changes: 7 additions & 7 deletions tests/integration/v3_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1471,7 +1471,7 @@ func TestV3RangeRequest(t *testing.T) {
{"rev2", "rev3", "rev4", "rev5", "rev6"},
},
[]bool{false, false, false, false},
[]int64{5, 5, 5, 5},
[]int64{4, 2, 3, 5},
},
{
"min/max create rev",
Expand Down Expand Up @@ -1503,7 +1503,7 @@ func TestV3RangeRequest(t *testing.T) {
{"rev2", "rev3", "rev6"},
},
[]bool{false, false, false, false},
[]int64{3, 3, 3, 3},
[]int64{2, 2, 1, 3},
},
}

Expand All @@ -1527,24 +1527,24 @@ func TestV3RangeRequest(t *testing.T) {
continue
}
if len(resp.Kvs) != len(tt.wresps[j]) {
t.Errorf("#%d.%d: bad len(resp.Kvs). got = %d, want = %d, ", i, j, len(resp.Kvs), len(tt.wresps[j]))
t.Errorf("#%d.%d (%s): bad len(resp.Kvs). got = %d, want = %d, ", i, j, tt.name, len(resp.Kvs), len(tt.wresps[j]))
continue
}
for k, wKey := range tt.wresps[j] {
respKey := string(resp.Kvs[k].Key)
if respKey != wKey {
t.Errorf("#%d.%d: key[%d]. got = %v, want = %v, ", i, j, k, respKey, wKey)
t.Errorf("#%d.%d (%s): key[%d]. got = %v, want = %v, ", i, j, tt.name, k, respKey, wKey)
}
}
if resp.More != tt.wmores[j] {
t.Errorf("#%d.%d: bad more. got = %v, want = %v, ", i, j, resp.More, tt.wmores[j])
t.Errorf("#%d.%d (%s): bad more. got = %v, want = %v, ", i, j, tt.name, resp.More, tt.wmores[j])
}
if resp.GetCount() != tt.wcounts[j] {
t.Errorf("#%d.%d: bad count. got = %v, want = %v, ", i, j, resp.GetCount(), tt.wcounts[j])
t.Errorf("#%d.%d (%s): bad count. got = %v, want = %v, ", i, j, tt.name, resp.GetCount(), tt.wcounts[j])
}
wrev := int64(len(tt.putKeys) + 1)
if resp.Header.Revision != wrev {
t.Errorf("#%d.%d: bad header revision. got = %d. want = %d", i, j, resp.Header.Revision, wrev)
t.Errorf("#%d.%d (%s): bad header revision. got = %d. want = %d", i, j, tt.name, resp.Header.Revision, wrev)
}
}
})
Expand Down

0 comments on commit bd5ec03

Please sign in to comment.