Skip to content

Commit

Permalink
enable mqtt auth for e2e testing.
Browse files Browse the repository at this point in the history
Signed-off-by: morvencao <[email protected]>
  • Loading branch information
morvencao committed Jun 17, 2024
1 parent dac8306 commit a46b394
Show file tree
Hide file tree
Showing 5 changed files with 282 additions and 9 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
# Ignore temporary *.swp files
*.swp

# Ignore certificate and key files
*.crt
*.key

# Ignore test databases
/db.sqlite.test.*

Expand Down
13 changes: 11 additions & 2 deletions cmd/maestro/agent/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"os"
"time"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
Expand All @@ -16,11 +17,13 @@ import (
commonoptions "open-cluster-management.io/ocm/pkg/common/options"
"open-cluster-management.io/ocm/pkg/features"
"open-cluster-management.io/ocm/pkg/work/spoke"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/cert"
)

var (
commonOptions = commonoptions.NewAgentOptions()
agentOption = spoke.NewWorkloadAgentOptions()
commonOptions = commonoptions.NewAgentOptions()
agentOption = spoke.NewWorkloadAgentOptions()
certRefreshDuration = 5 * time.Minute
)

// by default uses 1M as the limit for state feedback
Expand All @@ -37,6 +40,10 @@ func NewAgentCommand() *cobra.Command {
cmd.Use = "agent"
cmd.Short = "Start the Maestro Agent"
cmd.Long = "Start the Maestro Agent"
cmd.PreRun = func(cmd *cobra.Command, args []string) {
// set the certificate refresh duration for the MQTT broker
cert.CertCallbackRefreshDuration = certRefreshDuration
}

// check if the flag is already registered to avoid duplicate flag define error
if flag.CommandLine.Lookup("alsologtostderr") != nil {
Expand Down Expand Up @@ -70,4 +77,6 @@ func addFlags(fs *pflag.FlagSet) {
commonOptions.SpokeClusterName, "Name of the consumer")
fs.BoolVar(&commonOptions.CommoOpts.CmdConfig.DisableLeaderElection, "disable-leader-election",
true, "Disable leader election.")
fs.DurationVar(&certRefreshDuration, "cert-refresh-duration",
certRefreshDuration, "Client certificate refresh duration for MQTT broker.")
}
168 changes: 168 additions & 0 deletions test/e2e/pkg/certificate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package e2e_test

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"strings"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/openshift-online/maestro/pkg/api/openapi"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var _ = Describe("Certificate rotation", Ordered, Label("e2e-tests-spec-resync"), func() {

var resource *openapi.Resource
var validSecretData map[string][]byte

Context("Resource resync resource spec for agent reconnect with validate certificate", func() {

It("post the nginx resource to the maestro api", func() {

res := helper.NewAPIResource(consumer_name, 1)
var resp *http.Response
var err error
resource, resp, err = apiClient.DefaultApi.ApiMaestroV1ResourcesPost(context.Background()).Resource(res).Execute()
Expect(err).ShouldNot(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusCreated))
Expect(*resource.Id).ShouldNot(BeEmpty())

Eventually(func() error {
deploy, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{})
if err != nil {
return err
}
if *deploy.Spec.Replicas != 1 {
return fmt.Errorf("unexpected replicas, expected 1, got %d", *deploy.Spec.Replicas)
}
return nil
}, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred())
})

It("replace the agent client certificates to mqtt broker", func() {

validSecret, err := kubeClient.CoreV1().Secrets("maestro-agent").Get(context.Background(), "maestro-agent-certs", metav1.GetOptions{})
Expect(err).ShouldNot(HaveOccurred())
invalidSecret, err := kubeClient.CoreV1().Secrets("maestro-agent").Get(context.Background(), "maestro-agent-invalid-certs", metav1.GetOptions{})
Expect(err).ShouldNot(HaveOccurred())

validSecretData = validSecret.Data
validSecret.Data = invalidSecret.Data

// update the secret with invalid data
_, err = kubeClient.CoreV1().Secrets("maestro-agent").Update(context.Background(), validSecret, metav1.UpdateOptions{})
Expect(err).ShouldNot(HaveOccurred())

// ensure maestro-agent logs have "expired certificate"
Eventually(func() error {
pods, err := kubeClient.CoreV1().Pods("maestro-agent").List(context.Background(), metav1.ListOptions{
LabelSelector: "app=maestro-agent",
})
if err != nil {
return fmt.Errorf("error in listing maestro-agent pods")
}
if len(pods.Items) == 0 {
return fmt.Errorf("maestro-agent pod not found")
}

for _, pod := range pods.Items {
req := kubeClient.CoreV1().Pods("maestro-agent").GetLogs(pod.Name, &corev1.PodLogOptions{})
podLogs, err := req.Stream(context.Background())
if err != nil {
return fmt.Errorf("error in opening pod logs stream")
}
buf := new(bytes.Buffer)
_, err = io.Copy(buf, podLogs)
if err != nil {
podLogs.Close()
return fmt.Errorf("error in copy information from podLogs to buf")
}
if strings.Contains(buf.String(), "expired certificate") {
podLogs.Close()
return nil
}
podLogs.Close()
}

return fmt.Errorf("maestro-agent logs does not have 'expired certificate'")
}, 3*time.Minute, 3*time.Second).ShouldNot(HaveOccurred())
})

It("patch the nginx resource", func() {

newRes := helper.NewAPIResource(consumer_name, 2)
patchedResource, resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdPatch(context.Background(), *resource.Id).
ResourcePatchRequest(openapi.ResourcePatchRequest{Version: resource.Version, Manifest: newRes.Manifest}).Execute()
Expect(err).ShouldNot(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusOK))
Expect(*patchedResource.Version).To(Equal(*resource.Version + 1))

})

It("ensure the resource is not updated", func() {

// ensure the "nginx" deployment in the "default" namespace is not updated
Consistently(func() error {
deploy, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{})
if err != nil {
return nil
}
if *deploy.Spec.Replicas != 1 {
return fmt.Errorf("unexpected replicas, expected 1, got %d", *deploy.Spec.Replicas)
}
return nil
}, 30*time.Second, 2*time.Second).ShouldNot(HaveOccurred())
})

It("recover the agent client certificates to mqtt broker", func() {

certSecret, err := kubeClient.CoreV1().Secrets("maestro-agent").Get(context.Background(), "maestro-agent-certs", metav1.GetOptions{})
Expect(err).ShouldNot(HaveOccurred())
certSecret.Data = validSecretData

// update the secret with invalid data
_, err = kubeClient.CoreV1().Secrets("maestro-agent").Update(context.Background(), certSecret, metav1.UpdateOptions{})
Expect(err).ShouldNot(HaveOccurred())
})

It("ensure the resource is updated", func() {

Eventually(func() error {
deploy, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{})
if err != nil {
return err
}
if *deploy.Spec.Replicas != 2 {
return fmt.Errorf("unexpected replicas, expected 2, got %d", *deploy.Spec.Replicas)
}
return nil
}, 3*time.Minute, 3*time.Second).ShouldNot(HaveOccurred())
})

It("delete the nginx resource", func() {

resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdDelete(context.Background(), *resource.Id).Execute()
Expect(err).ShouldNot(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusNoContent))

Eventually(func() error {
_, err := kubeClient.AppsV1().Deployments("default").Get(context.Background(), "nginx", metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}
return fmt.Errorf("nginx deployment still exists")
}, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred())
})

})
})
2 changes: 1 addition & 1 deletion test/e2e/pkg/status_resync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ var _ = Describe("Status resync", Ordered, Label("e2e-tests-status-resync"), fun
return fmt.Errorf("unexpected status, expected testKubeClient, got %s", string(statusJSON))
}
return nil
}, 3*time.Minute, 3*time.Second).ShouldNot(HaveOccurred())
}, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred())
})

It("delete the nginx resource", func() {
Expand Down
104 changes: 98 additions & 6 deletions test/e2e/setup/e2e_setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,25 @@
# See the License for the specific language governing permissions and
# limitations under the License.

if ! command -v kind >/dev/null 2>&1; then
kind_version=0.12.0
step_version=0.26.2

if ! command -v kind >/dev/null 2>&1; then
echo "This script will install kind (https://kind.sigs.k8s.io/) on your machine."
curl -Lo ./kind-amd64 "https://kind.sigs.k8s.io/dl/v0.12.0/kind-$(uname)-amd64"
curl -Lo ./kind-amd64 "https://kind.sigs.k8s.io/dl/v${kind_version}/kind-$(uname)-amd64"
chmod +x ./kind-amd64
sudo mv ./kind-amd64 /usr/local/bin/kind
fi

if ! command -v step >/dev/null 2>&1; then
echo "This script will install step (https://smallstep.com/docs/step-cli/) on your machine."
curl -Lo ./step_${step_version}_amd64.tar.gz "https://dl.smallstep.com/gh-release/cli/gh-release-header/v${step_version}/step_$(uname | tr '[:upper:]' '[:lower:]')_${step_version}_amd64.tar.gz"
tar -xzvf step_${step_version}_amd64.tar.gz
chmod +x ./step_${step_version}/bin/step
sudo mv ./step_${step_version}/bin/step /usr/local/bin/step
rm -rf ./step_${step_version}_amd64.tar.gz ./step_${step_version}
fi

# 1. create KinD cluster
cat << EOF | kind create cluster --name maestro --kubeconfig ./test/e2e/.kubeconfig --config=-
kind: Cluster
Expand Down Expand Up @@ -55,6 +67,7 @@ fi

# 3. deploy service-ca
kubectl label node maestro-control-plane node-role.kubernetes.io/master=
kubectl get pod -A
kubectl apply -f ./test/e2e/setup/service-ca-crds
kubectl $1 create ns openshift-config-managed
kubectl $1 apply -f ./test/e2e/setup/service-ca/
Expand All @@ -76,19 +89,98 @@ kubectl patch service maestro -n $namespace -p '{"spec":{"type":"NodePort", "por
# expose the maestro grpc server via nodeport
kubectl patch service maestro-grpc -n $namespace -p '{"spec":{"type":"NodePort", "ports": [{"nodePort": 30090, "port": 8090, "targetPort": 8090}]}}' --type merge

# 5. create a consumer
# 5. create a self-signed certificate for mqtt
certDir=$(mktemp -d)
step certificate create "maestro-mqtt-ca" ${certDir}/ca.crt ${certDir}/ca.key --profile root-ca --no-password --insecure
step certificate create "maestro-mqtt-broker" ${certDir}/server.crt ${certDir}/server.key -san maestro-mqtt -san maestro-mqtt.maestro --profile leaf --ca ${certDir}/ca.crt --ca-key ${certDir}/ca.key --no-password --insecure
step certificate create "maestro-server-client" ${certDir}/server-client.crt ${certDir}/server-client.key --profile leaf --ca ${certDir}/ca.crt --ca-key ${certDir}/ca.key --no-password --insecure
step certificate create "maestro-agent-client" ${certDir}/agent-client.crt ${certDir}/agent-client.key --profile leaf --ca ${certDir}/ca.crt --ca-key ${certDir}/ca.key --no-password --insecure
step certificate create "maestro-invalid-client" ${certDir}/invalid-client.crt ${certDir}/invalid-client.key --profile leaf --not-after 1m --ca ${certDir}/ca.crt --ca-key ${certDir}/ca.key --no-password --insecure

# apply the mosquitto configmap
cat << EOF | kubectl -n $namespace apply -f -
apiVersion: v1
kind: ConfigMap
metadata:
name: maestro-mqtt
data:
mosquitto.conf: |
listener 1883 0.0.0.0
allow_anonymous false
use_identity_as_username true
cafile /mosquitto/certs/ca.crt
keyfile /mosquitto/certs/server.key
certfile /mosquitto/certs/server.crt
tls_version tlsv1.2
require_certificate true
EOF

# create secret containing the mqtt certs and patch the maestro-mqtt deployment
kubectl create secret generic maestro-mqtt-certs -n $namespace --from-file=ca.crt=${certDir}/ca.crt --from-file=server.crt=${certDir}/server.crt --from-file=server.key=${certDir}/server.key
kubectl patch deploy/maestro-mqtt -n $namespace --type='json' -p='[{"op":"add","path":"/spec/template/spec/volumes/-","value":{"name":"mosquitto-certs","secret":{"secretName":"maestro-mqtt-certs"}}},{"op":"add","path":"/spec/template/spec/containers/0/volumeMounts/-","value":{"name":"mosquitto-certs","mountPath":"/mosquitto/certs"}}]'
kubectl wait deploy/maestro-mqtt -n $namespace --for condition=Available=True --timeout=200s

# apply the maestro-mqtt secret
cat << EOF | kubectl -n $namespace apply -f -
apiVersion: v1
kind: Secret
metadata:
name: maestro-mqtt
stringData:
config.yaml: |
brokerHost: maestro-mqtt.maestro:1883
caFile: /secrets/mqtt-certs/ca.crt
clientCertFile: /secrets/mqtt-certs/client.crt
clientKeyFile: /secrets/mqtt-certs/client.key
topics:
sourceEvents: sources/maestro/consumers/+/sourceevents
agentEvents: \$share/statussubscribers/sources/maestro/consumers/+/agentevents
EOF

# create secret containing the client certs to mqtt broker and patch the maestro deployment
kubectl create secret generic maestro-server-certs -n $namespace --from-file=ca.crt=${certDir}/ca.crt --from-file=client.crt=${certDir}/server-client.crt --from-file=client.key=${certDir}/server-client.key
kubectl create secret generic maestro-server-invalid-certs -n $namespace --from-file=ca.crt=${certDir}/ca.crt --from-file=client.crt=${certDir}/invalid-client.crt --from-file=client.key=${certDir}/invalid-client.key
kubectl patch deploy/maestro -n $namespace --type='json' -p='[{"op": "add","path":"/spec/template/spec/volumes/-","value":{"name":"mqtt-certs","secret":{"secretName":"maestro-server-certs"}}},{"op":"add","path":"/spec/template/spec/containers/0/volumeMounts/-","value":{"name":"mqtt-certs","mountPath":"/secrets/mqtt-certs"}},{"op":"replace","path":"/spec/template/spec/containers/0/livenessProbe/initialDelaySeconds","value":1},{"op":"replace","path":"/spec/template/spec/containers/0/readinessProbe/initialDelaySeconds","value":1}]'
kubectl wait deploy/maestro -n $namespace --for condition=Available=True --timeout=200s

# 6. create a consumer
export external_host_ip="127.0.0.1"
echo $external_host_ip > ./test/e2e/.external_host_ip
kubectl wait deployment maestro -n $namespace --for condition=Available=True --timeout=200s

sleep 5 # wait 5 seconds for the service ready

# the consumer name is not specified, the consumer id will be used as the consumer name
export consumer_name=$(curl -k -X POST -H "Content-Type: application/json" https://${external_host_ip}:30080/api/maestro/v1/consumers -d '{}' | jq -r '.id')
echo $consumer_name > ./test/e2e/.consumer_name

# 6. deploy maestro agent into maestro-agent namespace
# 7. deploy maestro agent into maestro-agent namespace
export agent_namespace=maestro-agent
kubectl create namespace $agent_namespace || true
kubectl create namespace ${agent_namespace} || true
make agent-template
kubectl apply -n ${agent_namespace} --filename="templates/agent-template.json" | egrep --color=auto 'configured|$$'

# apply the maestro-mqtt secret
cat << EOF | kubectl -n ${agent_namespace} apply -f -
apiVersion: v1
kind: Secret
metadata:
name: maestro-agent-mqtt
stringData:
config.yaml: |
brokerHost: maestro-mqtt.maestro:1883
caFile: /secrets/mqtt-certs/ca.crt
clientCertFile: /secrets/mqtt-certs/client.crt
clientKeyFile: /secrets/mqtt-certs/client.key
topics:
sourceEvents: sources/maestro/consumers/${consumer_name}/sourceevents
agentEvents: sources/maestro/consumers/${consumer_name}/agentevents
EOF

# create secret containing the client certs to mqtt broker and patch the maestro-agent deployment
kubectl create secret generic maestro-agent-certs -n ${agent_namespace} --from-file=ca.crt=${certDir}/ca.crt --from-file=client.crt=${certDir}/agent-client.crt --from-file=client.key=${certDir}/agent-client.key
kubectl create secret generic maestro-agent-invalid-certs -n ${agent_namespace} --from-file=ca.crt=${certDir}/ca.crt --from-file=client.crt=${certDir}/invalid-client.crt --from-file=client.key=${certDir}/invalid-client.key
kubectl patch deploy/maestro-agent -n ${agent_namespace} --type='json' -p='[{"op":"add","path":"/spec/template/spec/volumes/-","value":{"name":"mqtt-certs","secret":{"secretName":"maestro-agent-certs"}}},{"op":"add","path":"/spec/template/spec/containers/0/volumeMounts/-","value":{"name":"mqtt-certs","mountPath":"/secrets/mqtt-certs"}},{"op":"add","path":"/spec/template/spec/containers/0/command/-","value":"--cert-refresh-duration=5s"}]'
kubectl wait deploy/maestro-agent -n ${agent_namespace} --for condition=Available=True --timeout=200s

# remove the certs
rm -rf ${certDir}

0 comments on commit a46b394

Please sign in to comment.