Skip to content

Commit

Permalink
PB-7476: Adding node affinity to kopia backup and restore jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
shkumari-px committed Jul 31, 2024
1 parent 4a8c1c5 commit c320671
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 20 deletions.
11 changes: 11 additions & 0 deletions pkg/controllers/dataexport/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -1885,6 +1885,15 @@ func startTransferJob(
psaJobUid = getAnnotationValue(dataExport, utils.PsaUIDKey)
psaJobGid = getAnnotationValue(dataExport, utils.PsaGIDKey)
}
nodeLabel := make(map[string]string)
kdmpData, err := core.Instance().GetConfigMap(jobConfigMap, jobConfigMapNs)
if err != nil {
return "", err
}
pxbJobNodeLabelValue, ok := kdmpData.Data[drivers.PxbJobNodeLabelKey]
if ok && pxbJobNodeLabelValue != "" {
nodeLabel[drivers.PxbJobNodeLabelKey] = pxbJobNodeLabelValue
}
switch drv.Name() {
case drivers.Rsync:
return drv.StartJob(
Expand Down Expand Up @@ -1929,6 +1938,7 @@ func startTransferJob(
drivers.WithExcludeFileList(excludeFileList),
drivers.WithPodDatapathType(podDataPath),
drivers.WithJobConfigMap(jobConfigMap),
drivers.WithNodeAffinity(nodeLabel),
drivers.WithJobConfigMapNs(jobConfigMapNs),
drivers.WithNfsServer(nfsServerAddr),
drivers.WithNfsExportDir(nfsExportPath),
Expand All @@ -1951,6 +1961,7 @@ func startTransferJob(
drivers.WithCertSecretNamespace(dataExport.Spec.Destination.Namespace),
drivers.WithJobConfigMap(jobConfigMap),
drivers.WithJobConfigMapNs(jobConfigMapNs),
drivers.WithNodeAffinity(nodeLabel),
drivers.WithNfsServer(nfsServerAddr),
drivers.WithNfsExportDir(nfsExportPath),
drivers.WithPodUserId(psaJobUid),
Expand Down
12 changes: 12 additions & 0 deletions pkg/controllers/resourceexport/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,16 @@ func startNfsResourceJob(
logrus.Errorf("failed to create NFS cred secret: %v", err)
return "", fmt.Errorf("failed to create NFS cred secret: %v", err)
}
nodeLabel := make(map[string]string)
kdmpData, err := core.Instance().GetConfigMap(jobConfigMap, jobConfigMapNs)
if err != nil {
return "", err
}
pxbJobNodeLabelValue, ok := kdmpData.Data[drivers.PxbJobNodeLabelKey]
if ok && pxbJobNodeLabelValue != "" {
nodeLabel[drivers.PxbJobNodeLabelKey] = pxbJobNodeLabelValue
}

switch drv.Name() {
case drivers.NFSBackup:
return drv.StartJob(
Expand All @@ -420,6 +430,7 @@ func startNfsResourceJob(
drivers.WithAppCRNamespace(re.Spec.Source.Namespace),
drivers.WithNamespace(re.Namespace),
drivers.WithResoureBackupName(re.Name),
drivers.WithNodeAffinity(nodeLabel),
drivers.WithResoureBackupNamespace(re.Namespace),
drivers.WithNfsMountOption(bl.Location.NFSConfig.MountOptions),
drivers.WithNfsExportDir(bl.Location.NFSConfig.SubPath),
Expand All @@ -438,6 +449,7 @@ func startNfsResourceJob(
drivers.WithAppCRNamespace(re.Spec.Source.Namespace),
drivers.WithNamespace(re.Namespace),
drivers.WithResoureBackupName(re.Name),
drivers.WithNodeAffinity(nodeLabel),
drivers.WithResoureBackupNamespace(re.Namespace),
drivers.WithNfsMountOption(bl.Location.NFSConfig.MountOptions),
drivers.WithNfsExportDir(bl.Location.NFSConfig.SubPath),
Expand Down
3 changes: 2 additions & 1 deletion pkg/drivers/drivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ const (

var (
// ErrJobFailed is a known error for a data transfer job failure.
ErrJobFailed = fmt.Errorf("data transfer job failed")
ErrJobFailed = fmt.Errorf("data transfer job failed")
// PxbJobNodeLabelKey is the key looked up in the job config map's data.
// When its value is non-empty, the key/value pair is handed to the job
// drivers as the node affinity label.
PxbJobNodeLabelKey = "PXB_JOB_NODE_AFFINITY_LABEL"
)

// Interface defines a data export driver behaviour.
Expand Down
49 changes: 30 additions & 19 deletions pkg/drivers/kopiabackup/kopiabackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ func jobFor(
jobName string,
resources corev1.ResourceRequirements,
nodeName string,
live bool,
) (*batchv1.Job, error) {
backupName := jobName

Expand Down Expand Up @@ -410,6 +411,31 @@ func jobFor(
job.Spec.Template.Spec.ImagePullSecrets = utils.ToImagePullSecret(utils.GetImageSecretName(jobName))
}

// Add node affinity to the job spec
if !live && len(jobOption.NodeAffinity) > 0 {
matchExpressions := []corev1.NodeSelectorRequirement{}
for key, val := range jobOption.NodeAffinity {
expression := corev1.NodeSelectorRequirement{
Key: key,
Operator: corev1.NodeSelectorOpIn,
Values: []string{val},
}
matchExpressions = append(matchExpressions, expression)
}

job.Spec.Template.Spec.Affinity = &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: matchExpressions,
},
},
},
},
}
}

if len(jobOption.NfsServer) != 0 {
volumeMount := corev1.VolumeMount{
Name: utils.NfsVolumeName,
Expand Down Expand Up @@ -505,11 +531,12 @@ func buildJob(jobName string, jobOptions drivers.JobOpts) (*batchv1.Job, error)
// Get the nodeName if the pod is in Running state, so that we can schedule
// the kopia job on the same node.
nodeName = pod.Spec.NodeName
live = true
break
}
}
resourceNamespace = jobOptions.Namespace
if err := utils.SetupServiceAccount(jobName, resourceNamespace, roleFor(live)); err != nil {
if err := utils.SetupServiceAccount(jobName, resourceNamespace, roleFor()); err != nil {
errMsg := fmt.Sprintf("error creating service account %s/%s: %v", resourceNamespace, jobName, err)
logrus.Errorf("%s: %v", fn, errMsg)
return nil, fmt.Errorf(errMsg)
Expand All @@ -519,10 +546,11 @@ func buildJob(jobName string, jobOptions drivers.JobOpts) (*batchv1.Job, error)
jobName,
resources,
nodeName,
live,
)
}

func roleFor(live bool) *rbacv1.Role {
func roleFor() *rbacv1.Role {
role := &rbacv1.Role{
Rules: []rbacv1.PolicyRule{
{
Expand All @@ -532,22 +560,5 @@ func roleFor(live bool) *rbacv1.Role {
},
},
}
// Only live backup, we will add the hostaccess and privilege option.
if live {
hostAccessRule := rbacv1.PolicyRule{
APIGroups: []string{"security.openshift.io"},
Resources: []string{"securitycontextconstraints"},
ResourceNames: []string{"hostaccess"},
Verbs: []string{"use"},
}
role.Rules = append(role.Rules, hostAccessRule)
PrivilegedRule := rbacv1.PolicyRule{
APIGroups: []string{"security.openshift.io"},
Resources: []string{"securitycontextconstraints"},
ResourceNames: []string{"privileged"},
Verbs: []string{"use"},
}
role.Rules = append(role.Rules, PrivilegedRule)
}
return role
}
23 changes: 23 additions & 0 deletions pkg/drivers/kopiarestore/kopiarestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,29 @@ func jobFor(
if len(imageRegistrySecret) != 0 {
job.Spec.Template.Spec.ImagePullSecrets = utils.ToImagePullSecret(utils.GetImageSecretName(jobName))
}
if len(jobOption.NodeAffinity) > 0 {
matchExpressions := []corev1.NodeSelectorRequirement{}
for key, val := range jobOption.NodeAffinity {
expression := corev1.NodeSelectorRequirement{
Key: key,
Operator: corev1.NodeSelectorOpIn,
Values: []string{val},
}
matchExpressions = append(matchExpressions, expression)
}

job.Spec.Template.Spec.Affinity = &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: matchExpressions,
},
},
},
},
}
}

if drivers.CertFilePath != "" {
volumeMount := corev1.VolumeMount{
Expand Down
25 changes: 25 additions & 0 deletions pkg/drivers/nfsbackup/nfsbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,31 @@ func jobForBackupResource(
return nil, err
}

// Add node affinity to the job spec
if len(jobOption.NodeAffinity) > 0 {
matchExpressions := []corev1.NodeSelectorRequirement{}
for key, val := range jobOption.NodeAffinity {
expression := corev1.NodeSelectorRequirement{
Key: key,
Operator: corev1.NodeSelectorOpIn,
Values: []string{val},
}
matchExpressions = append(matchExpressions, expression)
}

job.Spec.Template.Spec.Affinity = &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: matchExpressions,
},
},
},
},
}
}

// Add the image secret in job spec only if it is present in the stork deployment.
if len(imageRegistrySecret) != 0 {
job.Spec.Template.Spec.ImagePullSecrets = utils.ToImagePullSecret(utils.GetImageSecretName(jobOption.RestoreExportName))
Expand Down
26 changes: 26 additions & 0 deletions pkg/drivers/nfsrestore/nfsrestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,32 @@ func jobForRestoreResource(
if err != nil {
return nil, err
}

// Add node affinity to the job spec
if len(jobOption.NodeAffinity) > 0 {
matchExpressions := []corev1.NodeSelectorRequirement{}
for key, val := range jobOption.NodeAffinity {
expression := corev1.NodeSelectorRequirement{
Key: key,
Operator: corev1.NodeSelectorOpIn,
Values: []string{val},
}
matchExpressions = append(matchExpressions, expression)
}

job.Spec.Template.Spec.Affinity = &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: matchExpressions,
},
},
},
},
}
}

// Add the image secret in job spec only if it is present in the stork deployment.
if len(imageRegistrySecret) != 0 {
job.Spec.Template.Spec.ImagePullSecrets = utils.ToImagePullSecret(utils.GetImageSecretName(jobOption.RestoreExportName))
Expand Down

0 comments on commit c320671

Please sign in to comment.