Skip to content

Commit

Permalink
truststore reload task (#686)
Browse files Browse the repository at this point in the history
* Add reload truststore cassandraTask.

* Make sure we're using POST as method.

* Ensure the reload task is in the FakeExecutorServerWithDetails.

* Add endpoint to the right place in the tests.

* Rename several variables to reflect that we might later have client ts reload too.

* Error handling for when a pod does not support the new ts reload functionality.

* Fix all tests so we test failed features endpoint for a sync feature, then also test that a working features endpoint gives us back the right response too.

* Refer to tsreload using more dsetool like language.

* Ensure that waitForTaskFailed works as expected.
  • Loading branch information
Miles-Garnsey committed Sep 9, 2024
1 parent 9559155 commit cc63a41
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 8 deletions.
1 change: 1 addition & 0 deletions apis/control/v1alpha1/cassandratask_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ const (
CommandGarbageCollect CassandraCommand = "garbagecollect"
CommandFlush CassandraCommand = "flush"
CommandRefresh CassandraCommand = "refresh"
CommandTSReload CassandraCommand = "tsreload"
)

type CassandraJob struct {
Expand Down
23 changes: 16 additions & 7 deletions internal/controllers/control/cassandratask_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ type TaskConfiguration struct {
// Execution functionality per pod
AsyncFeature httphelper.Feature
AsyncFunc AsyncTaskExecutorFunc
SyncFeature httphelper.Feature
SyncFunc SyncTaskExecutorFunc
PodFilter PodFilterFunc

Expand Down Expand Up @@ -317,6 +318,8 @@ JobDefinition:
}
completed = taskConfig.Completed
break JobDefinition
case api.CommandTSReload:
inodeTsReload(taskConfig)
default:
err = fmt.Errorf("unknown job command: %s", job.Command)
return ctrl.Result{}, err
Expand Down Expand Up @@ -647,13 +650,9 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc
if !taskConfig.Filter(&pod) {
continue
}

features := &httphelper.FeatureSet{}
if taskConfig.AsyncFeature != "" {
features, err = nodeMgmtClient.FeatureSet(&pod)
if err != nil {
return ctrl.Result{}, failed, completed, errMsg, err
}
features, err := nodeMgmtClient.FeatureSet(&pod)
if err != nil {
return ctrl.Result{}, failed, completed, errMsg, err
}

if pod.Annotations == nil {
Expand Down Expand Up @@ -786,6 +785,16 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc
return ctrl.Result{}, failed, completed, errMsg, err
}

if taskConfig.SyncFeature != "" {
if !features.Supports(taskConfig.SyncFeature) {
logger.Error(err, "Pod doesn't support this feature", "Pod", pod, "Feature", taskConfig.SyncFeature)
jobStatus.Status = podJobError
failed++
errMsg = fmt.Sprintf("Pod %s doesn't support %s feature", pod.Name, taskConfig.SyncFeature)
return ctrl.Result{}, failed, completed, errMsg, err
}
}

jobId := strconv.Itoa(idx)

// This pod should run next, mark it
Expand Down
53 changes: 53 additions & 0 deletions internal/controllers/control/cassandratask_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,18 @@ func waitForTaskCompletion(taskKey types.NamespacedName) *api.CassandraTask {
return emptyTask
}

func waitForTaskFailed(taskKey types.NamespacedName) *api.CassandraTask {
var emptyTask *api.CassandraTask
Eventually(func() bool {
emptyTask = &api.CassandraTask{}
err := k8sClient.Get(context.TODO(), taskKey, emptyTask)
Expect(err).ToNot(HaveOccurred())

return emptyTask.Status.Failed > 0
}, time.Duration(5*time.Second)).Should(BeTrue())
return emptyTask
}

var _ = Describe("CassandraTask controller tests", func() {
Describe("Execute jobs against all pods", func() {
JobRunningRequeue = time.Duration(1 * time.Millisecond)
Expand Down Expand Up @@ -622,6 +634,47 @@ var _ = Describe("CassandraTask controller tests", func() {
Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(Equal(0))
Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 1))

// verifyPodsHaveAnnotations(testNamespaceName, string(task.UID))
Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1))
})
It("Runs a ts reload task against a pod and fails", func() {
By("Creating a task for tsreload")
taskKey, task := buildTask(api.CommandTSReload, testNamespaceName)
task.Spec.Jobs[0].Arguments.PodName = fmt.Sprintf("%s-%s-r0-sts-0", clusterName, testDatacenterName)
Expect(k8sClient.Create(context.Background(), task)).Should(Succeed())

completedTask := waitForTaskFailed(taskKey)

Expect(callDetails.URLCounts["/api/v0/ops/node/encryption/internode/truststore/reload"]).To(Equal(0)) // This doesn't get called because the test of whether the feature exists doesn't pass. The features endpoint doesn't exist in this mock server.

// verifyPodsHaveAnnotations(testNamespaceName, string(task.UID))
Expect(completedTask.Status.Succeeded).To(BeNumerically("==", 0))
Expect(completedTask.Status.Failed).To(BeNumerically(">", 0))
})
})
Context("successful SyncFeature usage", func() {
var testNamespaceName string
BeforeEach(func() {
By("Creating fake synchronous mgmt-api server")
var err error
callDetails = httphelper.NewCallDetails()
mockServer, err = httphelper.FakeServerWithSyncFeaturesEndpoint(callDetails)
testNamespaceName = fmt.Sprintf("test-sync-task-%d", rand.Int31())
Expect(err).ToNot(HaveOccurred())
mockServer.Start()
By("create datacenter", createDatacenter(testDatacenterName, testNamespaceName))
})
It("Runs a ts reload task against a pod", func() {
By("Creating a task for tsreload")
taskKey, task := buildTask(api.CommandTSReload, testNamespaceName)
task.Spec.Jobs[0].Arguments.PodName = fmt.Sprintf("%s-%s-r0-sts-0", clusterName, testDatacenterName)
Expect(k8sClient.Create(context.Background(), task)).Should(Succeed())

completedTask := waitForTaskCompletion(taskKey)

Expect(callDetails.URLCounts["/api/v0/ops/node/encryption/internode/truststore/reload"]).To(Equal(1))
Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 0)) // This doesn't get called because the test of whether the feature exists doesn't pass.

// verifyPodsHaveAnnotations(testNamespaceName, string(task.UID))
Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1))
})
Expand Down
13 changes: 13 additions & 0 deletions internal/controllers/control/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,19 @@ func (r *CassandraTaskReconciler) refreshDatacenter(ctx context.Context, dc *cas
}
}
return ctrl.Result{RequeueAfter: JobRunningRequeue}, nil

}

// ts reload functionality

func inodeTsReload(taskConfig *TaskConfiguration) {
taskConfig.PodFilter = genericPodFilter
taskConfig.SyncFunc = inodeTsReloadSync
taskConfig.SyncFeature = httphelper.ReloadInodeTruststore
}

func inodeTsReloadSync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) error {
return nodeMgmtClient.CallinodeTsReloadEndpoint(pod)
}

// Common functions
Expand Down
2 changes: 2 additions & 0 deletions internal/envtest/fake_mgmtapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ func FakeServer(cli client.Client, logger logr.Logger, podKey types.NamespacedNa
w.WriteHeader(http.StatusOK)
case "/api/v0/ops/keyspace/cleanup":
w.WriteHeader(http.StatusOK)
case "/api/v0/ops/node/encryption/internode/truststore/reload":
w.WriteHeader(http.StatusOK)
default:
w.WriteHeader(http.StatusNotFound)
}
Expand Down
28 changes: 28 additions & 0 deletions pkg/httphelper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ const (
Move Feature = "async_move_task"
AsyncGarbageCollect Feature = "async_gc_task"
AsyncFlush Feature = "async_flush_task"
ReloadInodeTruststore Feature = "reload_internode_truststore"
)

func (f *FeatureSet) UnmarshalJSON(b []byte) error {
Expand Down Expand Up @@ -585,6 +586,33 @@ func (client *NodeMgmtClient) CallCompactionEndpoint(pod *corev1.Pod, compactReq
return nil
}

// CallTSReloadEndpoint calls the async version of TSReload
func (client *NodeMgmtClient) CallinodeTsReloadEndpoint(pod *corev1.Pod) error {
client.Log.Info(
"calling Management API TS REload endpoint - POST /api/v0/node/encryption/internode/truststore/reload",
"pod", pod.Name,
)
podHost, podPort, err := BuildPodHostFromPod(pod)
if err != nil {
return err
}

request := nodeMgmtRequest{
endpoint: "/api/v0/ops/node/encryption/internode/truststore/reload",
host: podHost,
port: podPort,
method: http.MethodPost,
timeout: 60 * time.Second,
}

_, err = callNodeMgmtEndpoint(client, request, "application/json")
if err != nil {
return err
}

return nil
}

type ScrubRequest struct {
DisableSnapshot bool `json:"disable_snapshot"`
SkipCorrupted bool `json:"skip_corrupted"`
Expand Down
25 changes: 24 additions & 1 deletion pkg/httphelper/server_test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ var featuresReply = `{
"async_gc_task",
"async_flush_task",
"async_scrub_task",
"async_compaction_task"
"async_compaction_task",
"reload_internode_truststore"
]
}`

Expand Down Expand Up @@ -128,10 +129,32 @@ func FakeExecutorServerWithDetailsFails(callDetails *CallDetails) (*httptest.Ser
}))
}

func FakeServerWithSyncFeaturesEndpoint(callDetails *CallDetails) (*httptest.Server, error) {
return FakeMgmtApiServer(callDetails, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, err := url.ParseQuery(r.URL.RawQuery)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
}
if r.Method == http.MethodGet && r.RequestURI == "/api/v0/metadata/versions/features" {
w.WriteHeader(http.StatusOK)
_, err = w.Write([]byte(featuresReply))
} else if r.Method == http.MethodPost && r.URL.Path == "/api/v0/ops/node/encryption/internode/truststore/reload" {
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusNotFound)
}
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
}
}))
}

func FakeServerWithoutFeaturesEndpoint(callDetails *CallDetails) (*httptest.Server, error) {
return FakeMgmtApiServer(callDetails, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodPost && (r.URL.Path == "/api/v0/ops/keyspace/cleanup" || r.URL.Path == "/api/v0/ops/tables/sstables/upgrade" || r.URL.Path == "/api/v0/ops/node/drain" || r.URL.Path == "/api/v0/ops/tables/flush" || r.URL.Path == "/api/v0/ops/tables/garbagecollect" || r.URL.Path == "/api/v0/ops/tables/compact") {
w.WriteHeader(http.StatusOK)
} else if r.Method == http.MethodPost && r.URL.Path == "/api/v0/ops/node/encryption/internode/truststore/reload" {
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusNotFound)
}
Expand Down

0 comments on commit cc63a41

Please sign in to comment.