Skip to content

Commit

Permalink
Ensure Count in GET response is consistent with key filtering by revi…
Browse files Browse the repository at this point in the history
…sion

Signed-off-by: Cristian Ferretti <[email protected]>
  • Loading branch information
jcferretti committed Jul 20, 2024
1 parent 37cbd6c commit 042d0d3
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 14 deletions.
3 changes: 2 additions & 1 deletion etcdctl/ctlv3/ctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ func init() {
rootCmd.PersistentFlags().StringSliceVar(&globalFlags.Endpoints, "endpoints", []string{"127.0.0.1:2379"}, "gRPC endpoints")
rootCmd.PersistentFlags().BoolVar(&globalFlags.Debug, "debug", false, "enable client-side debug logging")

rootCmd.PersistentFlags().StringVarP(&globalFlags.OutputFormat, "write-out", "w", "simple", "set the output format (fields, json, protobuf, simple, table)")
rootCmd.PersistentFlags().StringVarP(&globalFlags.OutputFormat, "write-out", "w", "simple",
"set the output format (fields, json, protobuf, simple, table); note json encodes kvs as base64")
rootCmd.PersistentFlags().BoolVar(&globalFlags.IsHex, "hex", false, "print byte strings as hex encoded strings")
rootCmd.RegisterFlagCompletionFunc("write-out", func(_ *cobra.Command, _ []string, _ string) ([]string, cobra.ShellCompDirective) {
return []string{"fields", "json", "protobuf", "simple", "table"}, cobra.ShellCompDirectiveDefault
Expand Down
3 changes: 2 additions & 1 deletion etcdutl/ctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ var (
)

func init() {
rootCmd.PersistentFlags().StringVarP(&etcdutl.OutputFormat, "write-out", "w", "simple", "set the output format (fields, json, protobuf, simple, table)")
rootCmd.PersistentFlags().StringVarP(&etcdutl.OutputFormat, "write-out", "w", "simple",
"set the output format (fields, json, protobuf, simple, table); note json encodes kvs as base64")
rootCmd.RegisterFlagCompletionFunc("write-out", func(_ *cobra.Command, _ []string, _ string) ([]string, cobra.ShellCompDirective) {
return []string{"fields", "json", "protobuf", "simple", "table"}, cobra.ShellCompDirectiveDefault
})
Expand Down
44 changes: 35 additions & 9 deletions server/etcdserver/txn/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,6 @@ func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, r *pb.RangeRequest)
func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
trace := traceutil.Get(ctx)

resp := &pb.RangeResponse{}
resp.Header = &pb.ResponseHeader{}

limit := r.Limit
if r.SortOrder != pb.RangeRequest_NONE ||
r.MinModRevision != 0 || r.MaxModRevision != 0 ||
Expand All @@ -167,17 +164,41 @@ func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *
limit = limit + 1
}

// Combining r.CountOnly and any of { r.MaxModRevision, r.MinModRevision, r.MaxCreateRevision, r.MinCreateRevision }
// means "give me the count for the result of the filtering I asked for".
filtering := r.MaxModRevision != 0 || r.MinModRevision != 0 || r.MaxCreateRevision != 0 || r.MinCreateRevision != 0
var noKvsToProcess bool
if filtering {
noKvsToProcess = false
} else {
noKvsToProcess = r.CountOnly
}

ro := mvcc.RangeOptions{
Limit: limit,
Rev: r.Revision,
Count: r.CountOnly,
Count: noKvsToProcess,
}

rr, err := txnRead.Range(ctx, r.Key, mkGteRange(r.RangeEnd), ro)
if err != nil {
return nil, err
}
resp := &pb.RangeResponse{Header: &pb.ResponseHeader{Revision: rr.Rev}}

if noKvsToProcess {
resp.Count = int64(rr.Count)
resp.Kvs = make([]*mvccpb.KeyValue, 0)
} else {
processKvsInRange(r, rr, resp, trace, lg)
}

trace.Step("assemble the response")
return resp, nil
}

func processKvsInRange(r *pb.RangeRequest, rr *mvcc.RangeResult, resp *pb.RangeResponse, trace *traceutil.Trace, lg *zap.Logger) {
trace.Step("filter and sort the key-value pairs")
if r.MaxModRevision != 0 {
f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision > r.MaxModRevision }
pruneKVs(rr, f)
Expand All @@ -195,6 +216,16 @@ func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *
pruneKVs(rr, f)
}

// No more prunning after this point; we can count now.
resp.Count = int64(len(rr.KVs))
// If r.CountOnly was specified:
// * r.SortOrder is useless and ignored.
// * r.Limit is useless and ignored.
if r.CountOnly {
resp.Kvs = make([]*mvccpb.KeyValue, 0)
return
}

sortOrder := r.SortOrder
if r.SortTarget != pb.RangeRequest_KEY && sortOrder == pb.RangeRequest_NONE {
// Since current mvcc.Range implementation returns results
Expand Down Expand Up @@ -235,18 +266,13 @@ func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *
rr.KVs = rr.KVs[:r.Limit]
resp.More = true
}
trace.Step("filter and sort the key-value pairs")
resp.Header.Revision = rr.Rev
resp.Count = int64(rr.Count)
resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs))
for i := range rr.KVs {
if r.KeysOnly {
rr.KVs[i].Value = nil
}
resp.Kvs[i] = &rr.KVs[i]
}
trace.Step("assemble the response")
return resp, nil
}

func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWithSharedBuffer bool, kv mvcc.KV, lessor lease.Lessor) (*pb.TxnResponse, *traceutil.Trace, error) {
Expand Down
22 changes: 19 additions & 3 deletions tests/e2e/ctl_v3_kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func getMinMaxCreateModRevTest(cx ctlCtx) {

for i, tt := range tests {
if err := ctlV3Get(cx, tt.args, tt.wkv...); err != nil {
cx.t.Errorf("getMinModRevTest #%d: ctlV3Get error (%v)", i, err)
cx.t.Errorf("getMinMaxCreateModRevTest #%d: ctlV3Get error (%v)", i, err)
}
}
}
Expand Down Expand Up @@ -387,15 +387,31 @@ type kv struct {
key, val string
}

// returns -1 if searchArg is not found, or the index when found.
func findArg(cx ctlCtx, searchArg string) int {
for i, arg := range cx.PrefixArgs() {
if arg == searchArg {
return i
}
}
return -1
}

func ctlV3Get(cx ctlCtx, args []string, kvs ...kv) error {
cmdArgs := append(cx.PrefixArgs(), "get")
cmdArgs := append(cx.PrefixArgs(), "get", "--write-out=fields")
cmdArgs = append(cmdArgs, args...)
if !cx.quorum {
cmdArgs = append(cmdArgs, "--consistency", "s")
}
var lines []expect.ExpectedResponse
for _, elem := range kvs {
lines = append(lines, expect.ExpectedResponse{Value: elem.key}, expect.ExpectedResponse{Value: elem.val})
lines = append(lines,
expect.ExpectedResponse{Value: fmt.Sprintf("\"Key\" : \"%s\"", elem.key)},
expect.ExpectedResponse{Value: fmt.Sprintf("\"Value\" : \"%s\"", elem.val)},
)
}
if findArg(cx, "--limit") == -1 {
lines = append(lines, expect.ExpectedResponse{Value: fmt.Sprintf("\"Count\" : %d", len(kvs))})
}
return e2e.SpawnWithExpects(cmdArgs, cx.envMap, lines...)
}
Expand Down

0 comments on commit 042d0d3

Please sign in to comment.