Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
130619: kvserver: enable admitted vector protocol r=sumeerbhola,kvoli a=pav-kv

This commit enables `AdmittedState` annotations in `RaftMessageRequest` and `RaftMessageRequestBatch` protocol. The leader starts receiving admitted vector updates from all replicas.

Part of #129508

131172: cloud/amazon: test kms against mock server r=dt a=dt

Release note: none.
Epic: none.

Co-authored-by: Pavel Kalinnikov <[email protected]>
Co-authored-by: David Taylor <[email protected]>
  • Loading branch information
3 people committed Sep 23, 2024
3 parents e76e9dc + 1ac7e10 + de15077 commit 0b4b121
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 8 deletions.
1 change: 1 addition & 0 deletions pkg/cloud/amazon/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ go_test(
"//pkg/cloud/cloudpb",
"//pkg/cloud/cloudtestutils",
"//pkg/security/username",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/testutils",
"//pkg/testutils/skip",
Expand Down
63 changes: 63 additions & 0 deletions pkg/cloud/amazon/aws_kms_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,22 @@ package amazon

import (
"context"
"encoding/pem"
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
"os"
"path/filepath"
"strings"
"testing"

"github.com/aws/aws-sdk-go-v2/config"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/cloud/cloudtestutils"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
Expand Down Expand Up @@ -231,6 +237,63 @@ func TestEncryptDecryptAWSAssumeRole(t *testing.T) {
})
}

func TestKMSAgainstMockAWS(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()

// Setup a bogus credentials file so it doesn't try to use a metadata server.
tempDir, cleanup := testutils.TempDir(t)
defer cleanup()
credFile := filepath.Join(tempDir, "credentials")
require.NoError(t, os.Setenv("AWS_SHARED_CREDENTIALS_FILE", credFile))
defer func() {
require.NoError(t, os.Unsetenv("AWS_SHARED_CREDENTIALS_FILE"))
}()
require.NoError(t, os.WriteFile(credFile, []byte(`[default]
aws_access_key_id = abc
aws_secret_access_key = xyz
`), 0644))

srv := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
require.NoError(t, err)
defer r.Body.Close()
// Default to replying with static placeholder "ciphertext", unless req is
// a decrypt req, in which case send static "decrypted" plaintext resp.
resp := `{"CiphertextBlob": "dW51c2Vk"}` // "unused".
if strings.Contains(string(body), "Ciphertext") {
resp = `{"Plaintext": "aGVsbG8gd29ybGQ="}` // base64 for 'hello world'
}
_, err = w.Write([]byte(resp))
require.NoError(t, err)
}))
defer srv.Close()

tEnv := &cloud.TestKMSEnv{Settings: cluster.MakeTestingClusterSettings(), ExternalIOConfig: &base.ExternalIODirConfig{}}

// Set the custom CA so testserver is trusted, and defer reset of it.
u := tEnv.Settings.MakeUpdater()
require.NoError(t, u.Set(ctx, "cloudstorage.http.custom_ca", settings.EncodedValue{
Type: "s", Value: string(pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: srv.Certificate().Raw})),
}))

t.Run("implicit", func(t *testing.T) {
q := url.Values{
KMSRegionParam: []string{"r"}, AWSEndpointParam: []string{srv.URL},
cloud.AuthParam: []string{"implicit"},
}
cloud.KMSEncryptDecrypt(t, fmt.Sprintf("aws:///arn?%s", q.Encode()), tEnv)
})

t.Run("specified", func(t *testing.T) {
q := url.Values{
KMSRegionParam: []string{"r"}, AWSEndpointParam: []string{srv.URL},
AWSAccessKeyParam: []string{"k"}, AWSSecretParam: []string{"s"},
}
cloud.KMSEncryptDecrypt(t, fmt.Sprintf("aws:///arn?%s", q.Encode()), tEnv)
})
}

func TestPutAWSKMSEndpoint(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
Expand Down
4 changes: 0 additions & 4 deletions pkg/kv/kvserver/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,10 +749,6 @@ func (t *RaftTransport) processQueue(
maybeAnnotateWithAdmittedStates := func(
batch *kvserverpb.RaftMessageRequestBatch, admitted []kvflowcontrolpb.PiggybackedAdmittedState,
) {
// TODO(pav-kv): send these protos once they are populated correctly.
if true {
return
}
batch.AdmittedStates = append(batch.AdmittedStates, admitted...)
}

Expand Down
10 changes: 6 additions & 4 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1944,10 +1944,12 @@ func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) {
// admitted log indices, by priority.
if msg.Type == raftpb.MsgAppResp && !msg.Reject {
admitted := r.flowControlV2.AdmittedState()
// The admitted state must be in the coordinate system of the leader's log.
if admitted.Term == msg.Term && false {
// TODO(pav-kv): enable this annotation when it is covered with tests, and
// the leader knows how to handle it.
// If admitted.Term is lagging msg.Term, sending the admitted vector has no
// effect on the leader, so skip it.
// If msg.Term is lagging the admitted.Term, this is a message to a stale
// leader. Sending it allows that leader to release all tokens. It would
// otherwise do so soon anyway, upon learning about the new leader.
if admitted.Term >= msg.Term {
req.AdmittedState = kvflowcontrolpb.AdmittedState{
Term: admitted.Term,
Admitted: admitted.Admitted[:],
Expand Down

0 comments on commit 0b4b121

Please sign in to comment.