Skip to content

Commit

Permalink
feat: approve csr action (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
varnastadeus authored Nov 29, 2021
1 parent 3a0cdb5 commit 41d695f
Show file tree
Hide file tree
Showing 7 changed files with 622 additions and 29 deletions.
3 changes: 2 additions & 1 deletion actions/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func NewService(
reflect.TypeOf(&castai.ActionDrainNode{}): newDrainNodeHandler(log, clientset),
reflect.TypeOf(&castai.ActionPatchNode{}): newPatchNodeHandler(log, clientset),
reflect.TypeOf(&castai.ActionCreateEvent{}): newCreateEventHandler(log, clientset),
reflect.TypeOf(&castai.ActionApproveCSR{}): newApproveCSRHandler(log, clientset),
},
}
}
Expand Down Expand Up @@ -87,7 +88,7 @@ func (s *service) doWork(ctx context.Context) error {
return fmt.Errorf("polling actions: %w", err)
}

pollDuration := time.Now().Sub(start)
pollDuration := time.Since(start)
if len(actions) == 0 {
s.log.Infof("no actions returned in %s", pollDuration)
return nil
Expand Down
97 changes: 97 additions & 0 deletions actions/approve_csr_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package actions

import (
"context"
"errors"
"fmt"
"time"

"github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"

"github.com/castai/cluster-controller/castai"
"github.com/castai/cluster-controller/csr"
)

func newApproveCSRHandler(log logrus.FieldLogger, clientset kubernetes.Interface) ActionHandler {
return &approveCSRHandler{
log: log,
clientset: clientset,
}
}

type approveCSRHandler struct {
log logrus.FieldLogger
clientset kubernetes.Interface
csrFetchInterval time.Duration
initialCSRFetchTimeout time.Duration
}

func (h *approveCSRHandler) Handle(ctx context.Context, data interface{}) error {
req, ok := data.(*castai.ActionApproveCSR)
if !ok {
return fmt.Errorf("unexpected type %T for approve csr handler", data)
}

log := h.log.WithField("node_name", req.NodeName)

// First get original csr which is created by kubelet.
log.Debug("getting initial csr")
cert, err := h.getInitialNodeCSR(ctx, req.NodeName)
if err != nil {
return fmt.Errorf("getting initial csr: %w", err)
}

if cert.Approved() {
log.Debug("initial csr is already approved")
return nil
}

// Since this new csr may be denied we need to delete it.
log.Debug("deleting old csr")
if err := csr.DeleteCertificate(ctx, h.clientset, cert); err != nil {
return fmt.Errorf("deleting csr: %w", err)
}

// Create new csr with the same request data as original csr.
log.Debug("requesting new csr")
cert, err = csr.RequestCertificate(
ctx,
h.clientset,
cert,
)
if err != nil {
return fmt.Errorf("requesting new csr: %w", err)
}

// Approve new csr.
log.Debug("approving new csr")
resp, err := csr.ApproveCertificate(ctx, h.clientset, cert)
if err != nil {
return fmt.Errorf("approving csr: %w", err)
}
if resp.Approved() {
return nil
}
return errors.New("certificate signing request was not approved")
}

func (h *approveCSRHandler) getInitialNodeCSR(ctx context.Context, nodeName string) (*csr.Certificate, error) {
csrFetchCtx, cancel := context.WithTimeout(ctx, h.initialCSRFetchTimeout)
defer cancel()

for {
select {
case <-csrFetchCtx.Done():
return nil, csrFetchCtx.Err()
case <-time.After(h.csrFetchInterval):
cert, err := csr.GetCertificateByNodeName(ctx, h.clientset, nodeName)
if err != nil && !errors.Is(err, csr.ErrNodeCertificateNotFound) {
return nil, err
}
if cert != nil {
return cert, nil
}
}
}
}
162 changes: 162 additions & 0 deletions actions/approve_csr_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package actions

import (
"context"
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
certv1 "k8s.io/api/certificates/v1"
certv1beta1 "k8s.io/api/certificates/v1beta1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes/fake"
ktest "k8s.io/client-go/testing"

"github.com/castai/cluster-controller/castai"
)

func TestApproveCSRHandler(t *testing.T) {
r := require.New(t)
log := logrus.New()
log.SetLevel(logrus.DebugLevel)

t.Run("approve v1 csr successfully", func(t *testing.T) {
csr := &certv1.CertificateSigningRequest{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: "node-csr-123",
},
Spec: certv1.CertificateSigningRequestSpec{
Request: []byte(`-----BEGIN CERTIFICATE REQUEST-----
MIIBADCBqAIBADBGMRUwEwYDVQQKEwxzeXN0ZW06bm9kZXMxLTArBgNVBAMTJHN5
c3RlbTpub2RlOmdrZS1hbS1nY3AtY2FzdC01ZGM0ZjRlYzBZMBMGByqGSM49AgEG
CCqGSM49AwEHA0IABF/9p5y4t09Y6yAlhF0OthexpL0CEyNHVnVmmbB4jridyJzW
vrcLKbFat0qvJftODQhEA/lqByJepB4YGqQGhregADAKBggqhkjOPQQDAgNHADBE
AiAHVYZXHxxspoV0hcfn2Pdsl89fIPCOFy/K1PqSUR6QNAIgYdt51ZbQt9rgM2BD
39zKjbxU1t82BlrW9/NrmaadNHQ=
-----END CERTIFICATE REQUEST-----`),
SignerName: "kubelet",
ExpirationSeconds: nil,
Usages: []certv1.KeyUsage{
certv1.KeyUsage("kubelet"),
},
Username: "kubelet",
UID: "",
Groups: nil,
Extra: nil,
},
Status: certv1.CertificateSigningRequestStatus{},
}
client := fake.NewSimpleClientset(csr)
// Return NotFound for all v1beta1 resources.
client.PrependReactor("*", "*", func(action ktest.Action) (handled bool, ret runtime.Object, err error) {
if action.GetResource().Version == "v1beta1" {
err = apierrors.NewNotFound(schema.GroupResource{}, action.GetResource().String())
return true, nil, err
}
return
})
client.PrependReactor("update", "certificatesigningrequests", func(action ktest.Action) (handled bool, ret runtime.Object, err error) {
approved := csr.DeepCopy()
approved.Status.Conditions = []certv1.CertificateSigningRequestCondition{
{
Type: certv1.CertificateApproved,
Reason: "CastaiApprove",
Message: "approved",
LastUpdateTime: metav1.Now(),
Status: v1.ConditionTrue,
},
}
return true, approved, nil
})

h := &approveCSRHandler{
log: log,
clientset: client,
csrFetchInterval: 1 * time.Millisecond,
initialCSRFetchTimeout: 10 * time.Millisecond,
}

ctx := context.Background()
err := h.Handle(ctx, &castai.ActionApproveCSR{NodeName: "gke-am-gcp-cast-5dc4f4ec"})
r.NoError(err)
})

t.Run("approve v1beta1 csr successfully", func(t *testing.T) {
csr := &certv1beta1.CertificateSigningRequest{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: "node-csr-123",
},
Spec: certv1beta1.CertificateSigningRequestSpec{
Request: []byte(`-----BEGIN CERTIFICATE REQUEST-----
MIIBADCBqAIBADBGMRUwEwYDVQQKEwxzeXN0ZW06bm9kZXMxLTArBgNVBAMTJHN5
c3RlbTpub2RlOmdrZS1hbS1nY3AtY2FzdC01ZGM0ZjRlYzBZMBMGByqGSM49AgEG
CCqGSM49AwEHA0IABF/9p5y4t09Y6yAlhF0OthexpL0CEyNHVnVmmbB4jridyJzW
vrcLKbFat0qvJftODQhEA/lqByJepB4YGqQGhregADAKBggqhkjOPQQDAgNHADBE
AiAHVYZXHxxspoV0hcfn2Pdsl89fIPCOFy/K1PqSUR6QNAIgYdt51ZbQt9rgM2BD
39zKjbxU1t82BlrW9/NrmaadNHQ=
-----END CERTIFICATE REQUEST-----`),
ExpirationSeconds: nil,
Username: "kubelet",
UID: "",
Groups: nil,
Extra: nil,
},
Status: certv1beta1.CertificateSigningRequestStatus{},
}
client := fake.NewSimpleClientset(csr)
// Return NotFound for all v1 resources.
client.PrependReactor("*", "*", func(action ktest.Action) (handled bool, ret runtime.Object, err error) {
if action.GetResource().Version == "v1" {
err = apierrors.NewNotFound(schema.GroupResource{}, action.GetResource().String())
return true, nil, err
}
return
})

client.PrependReactor("update", "certificatesigningrequests", func(action ktest.Action) (handled bool, ret runtime.Object, err error) {
approved := csr.DeepCopy()
approved.Status.Conditions = []certv1beta1.CertificateSigningRequestCondition{
{
Type: certv1beta1.CertificateApproved,
Reason: "CastaiApprove",
Message: "approved",
LastUpdateTime: metav1.Now(),
Status: v1.ConditionTrue,
},
}
return true, approved, nil
})

h := &approveCSRHandler{
log: log,
clientset: client,
csrFetchInterval: 1 * time.Millisecond,
initialCSRFetchTimeout: 10 * time.Millisecond,
}

ctx := context.Background()
err := h.Handle(ctx, &castai.ActionApproveCSR{NodeName: "gke-am-gcp-cast-5dc4f4ec"})
r.NoError(err)
})

t.Run("return timeout error when no initial csr found for node", func(t *testing.T) {
client := fake.NewSimpleClientset()
h := &approveCSRHandler{
log: log,
clientset: client,
csrFetchInterval: 1 * time.Millisecond,
initialCSRFetchTimeout: 10 * time.Millisecond,
}

ctx := context.Background()
err := h.Handle(ctx, &castai.ActionApproveCSR{NodeName: "node"})
r.EqualError(err, "getting initial csr: context deadline exceeded")
})
}
9 changes: 9 additions & 0 deletions castai/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type ClusterAction struct {
ActionDrainNode *ActionDrainNode `json:"actionDrainNode,omitempty"`
ActionPatchNode *ActionPatchNode `json:"actionPatchNode,omitempty"`
ActionCreateEvent *ActionCreateEvent `json:"actionCreateEvent,omitempty"`
ActionApproveCSR *ActionApproveCSR `json:"actionApproveCsr,omitempty"`
CreatedAt time.Time `json:"createdAt"`
DoneAt *time.Time `json:"doneAt,omitempty"`
Error *string `json:"error,omitempty"`
Expand All @@ -39,6 +40,10 @@ func (c *ClusterAction) Data() interface{} {
if c.ActionCreateEvent != nil {
return c.ActionCreateEvent
}
if c.ActionApproveCSR != nil {
return c.ActionApproveCSR
}

return nil
}

Expand All @@ -59,6 +64,10 @@ type ActionDrainNode struct {
Force bool `json:"force"`
}

type ActionApproveCSR struct {
NodeName string `json:"nodeName"`
}

type ActionPatchNode struct {
NodeName string `json:"nodeName"`
Labels map[string]string `json:"labels"`
Expand Down
Loading

0 comments on commit 41d695f

Please sign in to comment.