Skip to content

Commit

Permalink
Add pkg/client/dqlite package (#144)
Browse files Browse the repository at this point in the history
* Add pkg/client/dqlite package

* Add Snap.K8sDqliteClient() method

* Use pkg/client/dqlite to remove cluster nodes

* Remove obsolete dqlite code
  • Loading branch information
neoaggelos authored Feb 22, 2024
1 parent a24f932 commit 95ef873
Show file tree
Hide file tree
Showing 13 changed files with 295 additions and 110 deletions.
61 changes: 61 additions & 0 deletions src/k8s/pkg/client/dqlite/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package dqlite

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"os"

"github.com/canonical/go-dqlite/app"
"github.com/canonical/go-dqlite/client"
)

type ClientOpts struct {
// ClusterYAML is the path cluster.yaml, containing the list of known dqlite nodes.
ClusterYAML string
// ClusterCert is the path to cluster.crt, containing the dqlite cluster certificate.
ClusterCert string
// ClusterKey is the path to cluster.key, containing the dqlite cluster private key.
ClusterKey string
}

type Client struct {
// clientGetter dynamically creates a dqlite client. This is because the dqlite client
// must dynamically connect to the leader node of the cluster.
clientGetter func(context.Context) (*client.Client, error)
}

// NewClient creates a new client connected to the leader of the dqlite cluster.
func NewClient(ctx context.Context, opts ClientOpts) (*Client, error) {
var options []client.Option
if opts.ClusterCert != "" && opts.ClusterKey != "" {
cert, err := tls.LoadX509KeyPair(opts.ClusterCert, opts.ClusterKey)
if err != nil {
return nil, fmt.Errorf("failed to load x509 keypair from certificate %q and key %q: %w", opts.ClusterCert, opts.ClusterKey, err)
}
b, err := os.ReadFile(opts.ClusterCert)
if err != nil {
return nil, fmt.Errorf("failed to read x509 certificate: %w", err)
}
pool := x509.NewCertPool()
if !pool.AppendCertsFromPEM(b) {
return nil, fmt.Errorf("bad certificate in %q", opts.ClusterCert)
}
options = append(options, client.WithDialFunc(client.DialFuncWithTLS(client.DefaultDialFunc, app.SimpleDialTLSConfig(cert, pool))))
}

return &Client{
clientGetter: func(ctx context.Context) (*client.Client, error) {
store, err := client.NewYamlNodeStore(opts.ClusterYAML)
if err != nil {
return nil, fmt.Errorf("failed to open node store from %q: %w", opts.ClusterYAML, err)
}
c, err := client.FindLeader(ctx, store, options...)
if err != nil {
return nil, fmt.Errorf("failed to connect to dqlite leader: %w", err)
}
return c, nil
},
}, nil
}
14 changes: 14 additions & 0 deletions src/k8s/pkg/client/dqlite/list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package dqlite

import (
"context"
"fmt"
)

func (c *Client) ListMembers(ctx context.Context) ([]NodeInfo, error) {
client, err := c.clientGetter(ctx)
if err != nil {
return nil, fmt.Errorf("failed to create dqlite client: %w", err)
}
return client.Cluster(ctx)
}
47 changes: 47 additions & 0 deletions src/k8s/pkg/client/dqlite/remove.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package dqlite

import (
"context"
"fmt"
)

func (c *Client) RemoveNodeByAddress(ctx context.Context, address string) error {
client, err := c.clientGetter(ctx)
if err != nil {
return fmt.Errorf("failed to create dqlite client: %w", err)
}
members, err := client.Cluster(ctx)
if err != nil {
return fmt.Errorf("failed to retrieve cluster nodes")
}

var (
memberExists, clusterHasOtherVoters bool
memberToRemove NodeInfo
)
for _, member := range members {
switch {
case member.Address == address:
memberToRemove = member
memberExists = true

case member.Address != address && member.Role == Voter:
clusterHasOtherVoters = true
}
}

if !memberExists {
return fmt.Errorf("cluster does not have a node with address %v", address)
}

// TODO: consider using client.Transfer() for a different node to become leader
if !clusterHasOtherVoters {
return fmt.Errorf("not removing node because there are no other voter members")
}

if err := client.Remove(ctx, memberToRemove.ID); err != nil {
return fmt.Errorf("failed to remove node %#v from dqlite cluster: %w", memberToRemove, err)
}

return nil
}
64 changes: 64 additions & 0 deletions src/k8s/pkg/client/dqlite/remove_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package dqlite_test

import (
"context"
"path"
"testing"

"github.com/canonical/k8s/pkg/client/dqlite"
. "github.com/onsi/gomega"
)

func TestRemoveNodeByAddress(t *testing.T) {
t.Run("Spare", func(t *testing.T) {
withDqliteCluster(t, 2, func(ctx context.Context, dirs []string) {
g := NewWithT(t)
client, err := dqlite.NewClient(ctx, dqlite.ClientOpts{
ClusterYAML: path.Join(dirs[0], "cluster.yaml"),
})
g.Expect(err).To(BeNil())
g.Expect(client).NotTo(BeNil())

members, err := client.ListMembers(ctx)
g.Expect(err).To(BeNil())
g.Expect(members).To(HaveLen(2))

memberToRemove := members[0].Address
if members[0].Role == dqlite.Voter {
memberToRemove = members[1].Address
}
g.Expect(client.RemoveNodeByAddress(ctx, memberToRemove)).To(BeNil())

members, err = client.ListMembers(ctx)
g.Expect(err).To(BeNil())
g.Expect(members).To(HaveLen(1))
})
})

t.Run("LastVoterFails", func(t *testing.T) {
withDqliteCluster(t, 2, func(ctx context.Context, dirs []string) {
g := NewWithT(t)
client, err := dqlite.NewClient(ctx, dqlite.ClientOpts{
ClusterYAML: path.Join(dirs[0], "cluster.yaml"),
})
g.Expect(err).To(BeNil())
g.Expect(client).NotTo(BeNil())

members, err := client.ListMembers(ctx)
g.Expect(err).To(BeNil())
g.Expect(members).To(HaveLen(2))

memberToRemove := members[0].Address
if members[0].Role != dqlite.Voter {
memberToRemove = members[1].Address
}

// Removing the last Voter should fail
g.Expect(client.RemoveNodeByAddress(ctx, memberToRemove)).ToNot(BeNil())

members, err = client.ListMembers(ctx)
g.Expect(err).To(BeNil())
g.Expect(members).To(HaveLen(2))
})
})
}
11 changes: 11 additions & 0 deletions src/k8s/pkg/client/dqlite/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package dqlite

import (
"github.com/canonical/go-dqlite/client"
)

// NodeInfo is information about a node in the dqlite cluster.
type NodeInfo = client.NodeInfo

// Voter is the role for nodes that participate in the Raft quorum.
var Voter = client.Voter
59 changes: 59 additions & 0 deletions src/k8s/pkg/client/dqlite/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package dqlite_test

import (
"context"
"fmt"
"testing"

"github.com/canonical/go-dqlite/app"
)

// nextDqlitePort is used in withDqliteCluster() to pick unique port numbers for the dqlite nodes.
var nextDqlitePort = 37312

// withDqliteCluster creates a temporary dqlite cluster of the desired size, and is meant to be
// used in tests for *dqlite.Client.
//
// Example usage:
//
// ```
//
// func TestDqliteSomething(t *testing.T) {
// withDqliteCluster(t, 3, func(ctx context.Context, dirs []string) {
// fmt.Println("I have 3 nodes, directories are in %v", dirs)
//
// // ...
// })
// }
//
// ```
func withDqliteCluster(t *testing.T, size int, f func(ctx context.Context, dirs []string)) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if size < 1 {
panic(fmt.Sprintf("dqlite cluster size %v must be at least 1", size))
}

var dirs []string
firstPort := nextDqlitePort
for i := 0; i < size; i++ {
dir := t.TempDir()
options := []app.Option{app.WithAddress(fmt.Sprintf("127.0.0.1:%d", nextDqlitePort))}
nextDqlitePort++
if i > 0 {
options = append(options, app.WithCluster([]string{fmt.Sprintf("127.0.0.1:%d", firstPort)}))
}
node, err := app.New(dir, options...)
if err != nil {
t.Fatalf("Failed to create dqlite node %d: %v", i, err)
}
if err := node.Ready(ctx); err != nil {
t.Fatalf("Failed to start dqlite node %d: %v", i, err)
}

dirs = append(dirs, dir)
}

f(ctx, dirs)
}
34 changes: 0 additions & 34 deletions src/k8s/pkg/k8s/setup/k8s_dqlite.go

This file was deleted.

26 changes: 18 additions & 8 deletions src/k8s/pkg/k8sd/app/hooks_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ package app

import (
"fmt"
"log"
"net"
"path"

old_setup "github.com/canonical/k8s/pkg/k8s/setup"
"github.com/canonical/k8s/pkg/k8sd/api/impl"
"github.com/canonical/k8s/pkg/k8sd/pki"
"github.com/canonical/k8s/pkg/k8sd/setup"
Expand Down Expand Up @@ -147,17 +145,29 @@ func onPostJoin(s *state.State, initConfig map[string]string) error {
func onPreRemove(s *state.State, force bool) error {
snap := snap.SnapFromContext(s.Context)

// Remove k8s dqlite node from cluster.
// Fails if the k8s-dqlite cluster would not have a leader afterwards.
log.Println("Leave k8s-dqlite cluster")
err := old_setup.LeaveK8sDqliteCluster(s.Context, snap, s)
cfg, err := utils.GetClusterConfig(s.Context, s)
if err != nil {
return fmt.Errorf("failed to leave k8s-dqlite cluster: %w", err)
return fmt.Errorf("failed to retrieve k8sd cluster config: %w", err)
}

// configure datastore
switch cfg.APIServer.Datastore {
case "k8s-dqlite":
client, err := snap.K8sDqliteClient(s.Context)
if err != nil {
return fmt.Errorf("failed to create k8s-dqlite client: %w", err)
}

nodeAddress := net.JoinHostPort(s.Address().Hostname(), fmt.Sprintf("%d", cfg.K8sDqlite.Port))
if err := client.RemoveNodeByAddress(s.Context, nodeAddress); err != nil {
return fmt.Errorf("failed to remove node with address %s from k8s-dqlite cluster: %w", nodeAddress, err)
}
default:
}

c, err := k8s.NewClient(snap)
if err != nil {
return fmt.Errorf("failed to create k8s client: %w", err)
return fmt.Errorf("failed to create Kubernetes client: %w", err)
}

if err := c.DeleteNode(s.Context, s.Name()); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions src/k8s/pkg/snap/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package snap
import (
"context"

"github.com/canonical/k8s/pkg/client/dqlite"
"github.com/canonical/k8s/pkg/k8sd/types"
"k8s.io/cli-runtime/pkg/genericclioptions"
)
Expand Down Expand Up @@ -44,4 +45,6 @@ type Snap interface {
Components() map[string]types.Component // available components

KubernetesRESTClientGetter(namespace string) genericclioptions.RESTClientGetter // admin kubernetes client

K8sDqliteClient(ctx context.Context) (*dqlite.Client, error) // go-dqlite client for k8s-dqlite
}
5 changes: 5 additions & 0 deletions src/k8s/pkg/snap/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mock
import (
"context"

"github.com/canonical/k8s/pkg/client/dqlite"
"github.com/canonical/k8s/pkg/k8sd/types"
"github.com/canonical/k8s/pkg/snap"
"k8s.io/cli-runtime/pkg/genericclioptions"
Expand Down Expand Up @@ -32,6 +33,7 @@ type Mock struct {
LockFilesDir string
Components map[string]types.Component
KubernetesRESTClientGetter genericclioptions.RESTClientGetter
K8sDqliteClient *dqlite.Client
}

// Snap is a mock implementation for snap.Snap.
Expand Down Expand Up @@ -140,5 +142,8 @@ func (s *Snap) Components() map[string]types.Component {
func (s *Snap) KubernetesRESTClientGetter(namespace string) genericclioptions.RESTClientGetter {
return s.Mock.KubernetesRESTClientGetter
}
func (s *Snap) K8sDqliteClient(context.Context) (*dqlite.Client, error) {
return s.Mock.K8sDqliteClient, nil
}

var _ snap.Snap = &Snap{}
Loading

0 comments on commit 95ef873

Please sign in to comment.