Skip to content

Commit

Permalink
Added volume config parameters for RunJob utility (#327)
Browse files Browse the repository at this point in the history
  • Loading branch information
powerfooI authored Apr 24, 2024
1 parent 22c9018 commit 76c2044
Show file tree
Hide file tree
Showing 7 changed files with 632 additions and 467 deletions.
61 changes: 60 additions & 1 deletion internal/resource/obcluster/obcluster_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,66 @@ func WaitOBZoneRunning(m *OBClusterManager) tasktypes.TaskError {
}

func CheckEnvironment(m *OBClusterManager) tasktypes.TaskError {
_, exitCode, err := resourceutils.RunJob(m.Ctx, m.Client, m.Logger, m.OBCluster.Namespace, "check-fs", m.OBCluster.Spec.OBServerTemplate.Image, "/home/admin/oceanbase/bin/oceanbase-helper env-check storage "+oceanbaseconst.ClogPath)
volumeName := m.OBCluster.Name + "check-clog-volume-" + rand.String(6)
claimName := m.OBCluster.Name + "check-clog-claim-" + rand.String(6)
jobName := m.OBCluster.Name + "-check-fs-" + rand.String(6)
// Create PVC
storageSpec := m.OBCluster.Spec.OBServerTemplate.Storage.RedoLogStorage
requestsResources := corev1.ResourceList{}
// Try fallocate to check if the filesystem meet the requirement.
// The checker requires 4Mi space, we set the request to 64Mi for safety.
requestsResources["storage"] = storageSpec.Size
pvc := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: claimName,
Namespace: m.OBCluster.Namespace,
OwnerReferences: []metav1.OwnerReference{{
APIVersion: m.OBCluster.APIVersion,
Kind: m.OBCluster.Kind,
Name: m.OBCluster.Name,
UID: m.OBCluster.UID,
}},
},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
Resources: corev1.ResourceRequirements{
Requests: requestsResources,
},
StorageClassName: &storageSpec.StorageClass,
},
}
err := m.Client.Create(m.Ctx, pvc)
if err != nil {
return errors.Wrap(err, "Create pvc for checking storage")
}
defer func() {
err = m.Client.Delete(m.Ctx, pvc)
if err != nil {
m.Logger.Info("Failed to delete pvc for checking storage")
}
}()
// Assemble volumeConfigs
volumeConfigs := resourceutils.JobContainerVolumes{
Volumes: []corev1.Volume{{
Name: volumeName,
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: claimName,
},
},
}},
VolumeMounts: []corev1.VolumeMount{{
Name: volumeName,
MountPath: oceanbaseconst.ClogPath,
}},
}
_, exitCode, err := resourceutils.RunJob(
m.Ctx, m.Client, m.Logger, m.OBCluster.Namespace,
jobName,
m.OBCluster.Spec.OBServerTemplate.Image,
"/home/admin/oceanbase/bin/oceanbase-helper env-check storage "+oceanbaseconst.ClogPath,
volumeConfigs,
)
// exit code 1 means the image version does not support the env-check command, just ignore it and try
if err != nil && exitCode != 1 {
return errors.Wrap(err, "Check filesystem")
Expand Down
37 changes: 37 additions & 0 deletions internal/resource/utils/annotations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
Copyright (c) 2023 OceanBase
ob-operator is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/

package utils

import (
corev1 "k8s.io/api/core/v1"

oceanbaseconst "github.com/oceanbase/ob-operator/internal/const/oceanbase"
)

func GetCNIFromAnnotation(pod *corev1.Pod) string {
_, found := pod.Annotations[oceanbaseconst.AnnotationCalicoValidate]
if found {
return oceanbaseconst.CNICalico
}
return oceanbaseconst.CNIUnknown
}

func NeedAnnotation(pod *corev1.Pod, cni string) bool {
switch cni {
case oceanbaseconst.CNICalico:
_, found := pod.Annotations[oceanbaseconst.AnnotationCalicoIpAddrs]
return !found
default:
return false
}
}
160 changes: 160 additions & 0 deletions internal/resource/utils/connections.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
Copyright (c) 2023 OceanBase
ob-operator is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/

package utils

import (
"context"
"strconv"
"strings"

"github.com/go-logr/logr"
"github.com/pkg/errors"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/oceanbase/ob-operator/api/v1alpha1"
oceanbaseconst "github.com/oceanbase/ob-operator/internal/const/oceanbase"
clusterstatus "github.com/oceanbase/ob-operator/internal/const/status/obcluster"
"github.com/oceanbase/ob-operator/pkg/oceanbase-sdk/connector"
"github.com/oceanbase/ob-operator/pkg/oceanbase-sdk/operation"
)

func GetSysOperationClient(c client.Client, logger *logr.Logger, obcluster *v1alpha1.OBCluster) (*operation.OceanbaseOperationManager, error) {
logger.V(oceanbaseconst.LogLevelTrace).Info("Get cluster sys client", "obCluster", obcluster)
var manager *operation.OceanbaseOperationManager
var err error
_, migrateAnnoExist := GetAnnotationField(obcluster, oceanbaseconst.AnnotationsSourceClusterAddress)
if migrateAnnoExist && obcluster.Status.Status == clusterstatus.MigrateFromExisting {
manager, err = getSysClientFromSourceCluster(c, logger, obcluster, oceanbaseconst.RootUser, oceanbaseconst.SysTenant, obcluster.Spec.UserSecrets.Root)
} else {
manager, err = getSysClient(c, logger, obcluster, oceanbaseconst.OperatorUser, oceanbaseconst.SysTenant, obcluster.Spec.UserSecrets.Operator)
}
return manager, err
}

func GetTenantRootOperationClient(c client.Client, logger *logr.Logger, obcluster *v1alpha1.OBCluster, tenantName, credential string) (*operation.OceanbaseOperationManager, error) {
logger.V(oceanbaseconst.LogLevelTrace).Info("Get tenant root client", "obCluster", obcluster, "tenantName", tenantName, "credential", credential)
observerList := &v1alpha1.OBServerList{}
err := c.List(context.Background(), observerList, client.MatchingLabels{
oceanbaseconst.LabelRefOBCluster: obcluster.Name,
}, client.InNamespace(obcluster.Namespace))
if err != nil {
return nil, errors.Wrap(err, "Get observer list")
}
if len(observerList.Items) == 0 {
return nil, errors.Errorf("No observer belongs to cluster %s", obcluster.Name)
}
var password string
if credential != "" {
password, err = ReadPassword(c, obcluster.Namespace, credential)
if err != nil {
return nil, errors.Wrapf(err, "Read password to get oceanbase operation manager of cluster %s", obcluster.Name)
}
}

var s *connector.OceanBaseDataSource
for _, observer := range observerList.Items {
address := observer.Status.GetConnectAddr()
switch obcluster.Status.Status {
case clusterstatus.New:
return nil, errors.New("Cluster is not bootstrapped")
case clusterstatus.Bootstrapped:
return nil, errors.New("Cluster is not initialized")
default:
s = connector.NewOceanBaseDataSource(address, oceanbaseconst.SqlPort, oceanbaseconst.RootUser, tenantName, password, oceanbaseconst.DefaultDatabase)
}
// if err is nil, db connection is already checked available
rootClient, err := operation.GetOceanbaseOperationManager(s)
if err == nil && rootClient != nil {
rootClient.Logger = logger
return rootClient, nil
}
// err is not nil, try to use empty password
s = connector.NewOceanBaseDataSource(address, oceanbaseconst.SqlPort, oceanbaseconst.RootUser, tenantName, "", oceanbaseconst.DefaultDatabase)
rootClient, err = operation.GetOceanbaseOperationManager(s)
if err == nil && rootClient != nil {
rootClient.Logger = logger
return rootClient, nil
}
}
return nil, errors.Errorf("Can not get root operation client of tenant %s in obcluster %s after checked all servers", tenantName, obcluster.Name)
}

func getSysClientFromSourceCluster(c client.Client, logger *logr.Logger, obcluster *v1alpha1.OBCluster, userName, tenantName, secretName string) (*operation.OceanbaseOperationManager, error) {
sysClient, err := getSysClient(c, logger, obcluster, userName, tenantName, secretName)
if err == nil {
return sysClient, nil
}
password, err := ReadPassword(c, obcluster.Namespace, secretName)
if err != nil {
return nil, errors.Wrapf(err, "Read password to get oceanbase operation manager of cluster %s", obcluster.Name)
}
// when obcluster is under migrating, use address from annotation
migrateAnnoVal, _ := GetAnnotationField(obcluster, oceanbaseconst.AnnotationsSourceClusterAddress)
servers := strings.Split(migrateAnnoVal, ";")
for _, server := range servers {
addressParts := strings.Split(server, ":")
if len(addressParts) != 2 {
return nil, errors.New("Parse oceanbase cluster connect address failed")
}
sqlPort, err := strconv.ParseInt(addressParts[1], 10, 64)
if err != nil {
return nil, errors.New("Parse sql port of obcluster failed")
}
s := connector.NewOceanBaseDataSource(addressParts[0], sqlPort, userName, tenantName, password, oceanbaseconst.DefaultDatabase)
// if err is nil, db connection is already checked available
sysClient, err := operation.GetOceanbaseOperationManager(s)
if err == nil && sysClient != nil {
sysClient.Logger = logger
return sysClient, nil
}
logger.Error(err, "Get operation manager from existing obcluster")
}
return nil, errors.Errorf("Failed to get sys client from existing obcluster, address: %s", migrateAnnoVal)
}

func getSysClient(c client.Client, logger *logr.Logger, obcluster *v1alpha1.OBCluster, userName, tenantName, secretName string) (*operation.OceanbaseOperationManager, error) {
observerList := &v1alpha1.OBServerList{}
err := c.List(context.Background(), observerList, client.MatchingLabels{
oceanbaseconst.LabelRefOBCluster: obcluster.Name,
}, client.InNamespace(obcluster.Namespace))
if err != nil {
return nil, errors.Wrap(err, "Get observer list")
}
if len(observerList.Items) == 0 {
return nil, errors.Errorf("No observer belongs to cluster %s", obcluster.Name)
}

var s *connector.OceanBaseDataSource
password, err := ReadPassword(c, obcluster.Namespace, secretName)
if err != nil {
return nil, errors.Wrapf(err, "Read password to get oceanbase operation manager of cluster %s", obcluster.Name)
}
for _, observer := range observerList.Items {
address := observer.Status.GetConnectAddr()
switch obcluster.Status.Status {
case clusterstatus.New:
s = connector.NewOceanBaseDataSource(address, oceanbaseconst.SqlPort, oceanbaseconst.RootUser, tenantName, "", "")
case clusterstatus.Bootstrapped:
s = connector.NewOceanBaseDataSource(address, oceanbaseconst.SqlPort, oceanbaseconst.RootUser, tenantName, "", oceanbaseconst.DefaultDatabase)
default:
s = connector.NewOceanBaseDataSource(address, oceanbaseconst.SqlPort, userName, tenantName, password, oceanbaseconst.DefaultDatabase)
}
// if err is nil, db connection is already checked available
sysClient, err := operation.GetOceanbaseOperationManager(s)
if err == nil && sysClient != nil {
sysClient.Logger = logger
return sysClient, nil
}
}
return nil, errors.Errorf("Can not get oceanbase operation manager of obcluster %s after checked all servers", obcluster.Name)
}
Loading

0 comments on commit 76c2044

Please sign in to comment.