Skip to content

Commit

Permalink
Add support for syncing block volumes
Browse files Browse the repository at this point in the history
Right now we only support syncing file volumes.
This adds support for block volumes using the
blockrsync utility code here:
https://github.com/awels/blockrsync

This also modifies the transport code to allow a
second transport to be used. This is needed
because we start a second blockrsync server that
needs a different transport to get the routing of
data correct.

Added unit tests for the transport and rsync code

Signed-off-by: Alexander Wels <[email protected]>
  • Loading branch information
awels committed May 17, 2024
1 parent 75a3d60 commit 7c503ac
Show file tree
Hide file tree
Showing 27 changed files with 1,729 additions and 183 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/Luzifer/go-dhparam v1.1.0
github.com/evanphx/json-patch v4.11.0+incompatible
github.com/evanphx/json-patch/v5 v5.5.0
github.com/go-logr/logr v0.4.0
github.com/openshift/api v0.0.0-20210625082935-ad54d363d274
github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.8.1
Expand All @@ -18,7 +19,6 @@ require (

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v0.4.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.5 // indirect
Expand Down
3 changes: 2 additions & 1 deletion state_transfer/endpoint/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/konveyor/crane-lib/state_transfer/endpoint"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
Expand Down Expand Up @@ -137,7 +138,7 @@ func (s *ServiceEndpoint) createService(c client.Client) error {
}

err := c.Create(context.TODO(), &service, &client.CreateOptions{})
if err != nil {
if err != nil && !k8serrors.IsAlreadyExists(err) {
return err
}

Expand Down
137 changes: 89 additions & 48 deletions state_transfer/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,31 @@ package state_transfer_test

import (
"context"
"fmt"
"log"
"testing"
"time"

"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2/klogr"

"github.com/konveyor/crane-lib/state_transfer"
"github.com/konveyor/crane-lib/state_transfer/endpoint"
"github.com/konveyor/crane-lib/state_transfer/endpoint/route"
"github.com/konveyor/crane-lib/state_transfer/meta"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

statetransfermeta "github.com/konveyor/crane-lib/state_transfer/meta"
"github.com/konveyor/crane-lib/state_transfer/transfer"
"github.com/konveyor/crane-lib/state_transfer/transfer/rclone"
"github.com/konveyor/crane-lib/state_transfer/transfer/rsync"
"github.com/konveyor/crane-lib/state_transfer/transport"
"github.com/konveyor/crane-lib/state_transfer/transport/stunnel"
routev1 "github.com/openshift/api/route/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand All @@ -33,28 +40,15 @@ var (

// This example shows how to wire up the components of the lib to
// transfer data from one PVC to another
func Example_basicTransfer() {
srcClient, err := client.New(srcCfg, client.Options{Scheme: runtime.NewScheme()})
if err != nil {
log.Fatal(err, "unable to create source client")
}

destClient, err := client.New(destCfg, client.Options{Scheme: runtime.NewScheme()})
if err != nil {
log.Fatal(err, "unable to create destination client")
}

// quiesce the applications if needed on the source side
err = state_transfer.QuiesceApplications(srcCfg, srcNamespace)
if err != nil {
log.Fatal(err, "unable to quiesce application on source cluster")
}
func TestExample_basicTransfer(t *testing.T) {
srcClient := buildTestClient(createPvc(srcPVC, srcNamespace))
destClient := buildTestClient()

// set up the PVC on destination to receive the data
pvc := &corev1.PersistentVolumeClaim{}
err = srcClient.Get(context.TODO(), client.ObjectKey{Namespace: srcNamespace, Name: srcPVC}, pvc)
err := srcClient.Get(context.TODO(), client.ObjectKey{Namespace: srcNamespace, Name: srcPVC}, pvc)
if err != nil {
log.Fatal(err, "unable to get source PVC")
t.Fatalf("unable to get source PVC: %v", err)
}

destPVC := pvc.DeepCopy()
Expand All @@ -64,35 +58,58 @@ func Example_basicTransfer() {
pvc.Annotations = map[string]string{}
err = destClient.Create(context.TODO(), destPVC, &client.CreateOptions{})
if err != nil {
log.Fatal(err, "unable to create destination PVC")
t.Fatalf("unable to create destination PVC: %v", err)
}

pvcList, err := transfer.NewPVCPairList(
pvcList, err := transfer.NewFilesystemPVCPairList(
transfer.NewPVCPair(pvc, destPVC),
)
if err != nil {
log.Fatal(err, "invalid pvc list")
t.Fatalf("invalid pvc list: %v", err)
}

// create a route for data transfer
r := route.NewEndpoint(
types.NamespacedName{
Namespace: pvc.Name,
Name: pvc.Namespace,
}, route.EndpointTypePassthrough, statetransfermeta.Labels, "")
Namespace: pvc.Namespace,
Name: pvc.Name,
}, route.EndpointTypePassthrough, statetransfermeta.Labels, "test.domain")
e, err := endpoint.Create(r, destClient)
if err != nil {
log.Fatal(err, "unable to create route endpoint")
t.Fatalf("unable to create route endpoint: %v", err)
}

_ = wait.PollUntil(time.Second*5, func() (done bool, err error) {
ready, err := e.IsHealthy(destClient)
if err != nil {
log.Println(err, "unable to check route health, retrying...")
return false, nil
}
return ready, nil
}, make(<-chan struct{}))
route := &routev1.Route{}
// Mark the route as admitted.
err = destClient.Get(context.TODO(), client.ObjectKey{Namespace: destPVC.Namespace, Name: destPVC.Name}, route)
if err != nil {
t.Fatalf("unable to get route: %v, %s/%s", err, destPVC.Namespace, destPVC.Name)
}
route.Status = routev1.RouteStatus{
Ingress: []routev1.RouteIngress{
{
Conditions: []routev1.RouteIngressCondition{
{
Type: routev1.RouteAdmitted,
Status: corev1.ConditionTrue,
},
},
},
},
}
err = destClient.Status().Update(context.TODO(), route)
if err != nil {
t.Fatalf("unable to update route status: %v", err)
}

// _ = wait.PollUntil(time.Second*5, func() (done bool, err error) {
ready, err := e.IsHealthy(destClient)
if err != nil {
t.Fatalf("unable to check route health: %v", err)
}
if !ready {
t.Fatalf("route is not ready")
}

// create an stunnel transport to carry the data over the route
s := stunnel.NewTransport(statetransfermeta.NewNamespacedPair(
Expand All @@ -101,25 +118,25 @@ func Example_basicTransfer() {
types.NamespacedName{
Name: destPVC.Name, Namespace: destPVC.Namespace},
), &transport.Options{})
_, err = transport.CreateServer(s, destClient, e)
_, err = transport.CreateServer(s, destClient, "fs", e)
if err != nil {
log.Fatal(err, "error creating stunnel server")
t.Fatalf("error creating stunnel server: %v", err)
}

_, err = transport.CreateClient(s, srcClient, e)
s, err = transport.CreateClient(s, srcClient, "fs", e)
if err != nil {
log.Fatal(err, "error creating stunnel client")
t.Fatalf("error creating stunnel client: %v", err)
}

// Create Rclone Transfer Pod
t, err := rclone.NewTransfer(s, r, srcCfg, destCfg, pvcList)
tr, err := rclone.NewTransfer(s, r, srcClient, destClient, pvcList)
if err != nil {
log.Fatal(err, "errror creating rclone transfer")
t.Fatalf("errror creating rclone transfer: %v", err)
}

err = transfer.CreateServer(t)
err = transfer.CreateServer(tr)
if err != nil {
log.Fatal(err, "error creating rclone server")
t.Fatalf("error creating rclone server: %v", err)
}

// Rsync Example
Expand All @@ -130,15 +147,15 @@ func Example_basicTransfer() {
}
rsyncTransferOptions = append(rsyncTransferOptions, customTransferOptions...)

rsyncTransfer, err := rsync.NewTransfer(s, r, srcCfg, destCfg, pvcList, rsyncTransferOptions...)
rsyncTransfer, err := rsync.NewTransfer(s, r, srcClient, destClient, pvcList, klogr.New(), rsyncTransferOptions...)
if err != nil {
log.Fatal(err, "error creating rsync transfer")
} else {
log.Printf("rsync transfer created for pvc %s\n", rsyncTransfer.PVCs()[0].Source().Claim().Name)
}

// Create Rclone Client Pod
err = transfer.CreateClient(t)
err = transfer.CreateClient(tr)
if err != nil {
log.Fatal(err, "error creating rclone client")
}
Expand Down Expand Up @@ -168,7 +185,7 @@ func Example_getFromCreatedObjects() {

destPVC := pvc.DeepCopy()

pvcList, err := transfer.NewPVCPairList(
pvcList, err := transfer.NewFilesystemPVCPairList(
transfer.NewPVCPair(pvc, destPVC),
)
if err != nil {
Expand All @@ -184,20 +201,20 @@ func Example_getFromCreatedObjects() {
types.NamespacedName{Namespace: srcNamespace, Name: srcPVC},
types.NamespacedName{Namespace: srcNamespace, Name: srcPVC},
)
s, err := stunnel.GetTransportFromKubeObjects(srcClient, destClient, nnPair, e, &transport.Options{})
s, err := stunnel.GetTransportFromKubeObjects(srcClient, destClient, nnPair, []endpoint.Endpoint{e}, &transport.Options{})
if err != nil {
log.Fatal(err, "error getting stunnel transport")
}

pvcList, err = transfer.NewPVCPairList(
pvcList, err = transfer.NewFilesystemPVCPairList(
transfer.NewPVCPair(pvc, nil),
)
if err != nil {
log.Fatal(err, "invalid pvc list")
}

// Create Rclone Transfer Pod
t, err := rclone.NewTransfer(s, e, srcCfg, destCfg, pvcList)
t, err := rclone.NewTransfer(s[0], e, srcClient, destClient, pvcList)
if err != nil {
log.Fatal(err, "errror creating rclone transfer")
}
Expand All @@ -224,3 +241,27 @@ func Example_getFromCreatedObjects() {

// TODO: check if the client is completed
}

func buildTestClient(objects ...runtime.Object) client.Client {
s := scheme.Scheme
schemeInitFuncs := []func(*runtime.Scheme) error{
corev1.AddToScheme,
routev1.AddToScheme,
}
for _, f := range schemeInitFuncs {
if err := f(s); err != nil {
panic(fmt.Errorf("failed to initiate the scheme %w", err))
}
}

return fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objects...).Build()
}

func createPvc(name, namespace string) *corev1.PersistentVolumeClaim {
return &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
}
}
74 changes: 74 additions & 0 deletions state_transfer/transfer/blockrsync/blockrsync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package blockrsync

import (
"github.com/go-logr/logr"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/konveyor/crane-lib/state_transfer/endpoint"
"github.com/konveyor/crane-lib/state_transfer/transfer"
"github.com/konveyor/crane-lib/state_transfer/transport"
)

const (
blockrsyncImage = "quay.io/awels/blockrsync:latest"
volumeName = "volume"
BlockRsyncContainer = "blockrsync"
Proxy = "proxy"
)

type BlockrsyncTransfer struct {
log logr.Logger
username string
password string
source client.Client
destination client.Client
pvcList transfer.PVCPairList
transport transport.Transport
endpoint endpoint.Endpoint
transferOptions *TransferOptions
}

func NewTransfer(t transport.Transport, e endpoint.Endpoint, src client.Client,
dest client.Client, pvcList transfer.PVCPairList, log logr.Logger, options *TransferOptions) (transfer.Transfer, error) {
err := validatePVCList(pvcList)
if err != nil {
return nil, err
}
return &BlockrsyncTransfer{
log: log,
transport: t,
endpoint: e,
source: src,
destination: dest,
pvcList: pvcList,
transferOptions: options,
}, nil
}

func (r *BlockrsyncTransfer) PVCs() transfer.PVCPairList {
return r.pvcList
}

func (r *BlockrsyncTransfer) Endpoint() endpoint.Endpoint {
return r.endpoint
}

func (r *BlockrsyncTransfer) Transport() transport.Transport {
return r.transport
}

func (r *BlockrsyncTransfer) Source() client.Client {
return r.source
}

func (r *BlockrsyncTransfer) Destination() client.Client {
return r.destination
}

func (r *BlockrsyncTransfer) Username() string {
return r.username
}

func (r *BlockrsyncTransfer) Password() string {
return r.password
}
Loading

0 comments on commit 7c503ac

Please sign in to comment.