Skip to content

Commit

Permalink
fix(env-check): added volume config parameters for RunJob utility
Browse files Browse the repository at this point in the history
  • Loading branch information
powerfooI committed Apr 23, 2024
1 parent 89709c1 commit 4cfaf28
Show file tree
Hide file tree
Showing 6 changed files with 608 additions and 467 deletions.
55 changes: 54 additions & 1 deletion internal/resource/obcluster/obcluster_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
kubeerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -989,7 +990,59 @@ func WaitOBZoneRunning(m *OBClusterManager) tasktypes.TaskError {
}

func CheckEnvironment(m *OBClusterManager) tasktypes.TaskError {
_, exitCode, err := resourceutils.RunJob(m.Ctx, m.Client, m.Logger, m.OBCluster.Namespace, "check-fs", m.OBCluster.Spec.OBServerTemplate.Image, "/home/admin/oceanbase/bin/oceanbase-helper env-check storage "+oceanbaseconst.ClogPath)
// Create PVC
volumeName := "check-clog-volume"
claimName := "check-clog-claim"
storageSpec := m.OBCluster.Spec.OBServerTemplate.Storage.RedoLogStorage
requestsResources := corev1.ResourceList{}
// Try fallocate to check if the filesystem meet the requirement.
// The checker requires 4Mi space, we set the request to 64Mi for safety.
requestsResources["storage"] = resource.MustParse("64Mi")
pvc := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: claimName,
Namespace: m.OBCluster.Namespace,
OwnerReferences: []metav1.OwnerReference{{
APIVersion: m.OBCluster.APIVersion,
Kind: m.OBCluster.Kind,
Name: m.OBCluster.Name,
UID: m.OBCluster.UID,
}},
},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
Resources: corev1.ResourceRequirements{
Requests: requestsResources,
},
StorageClassName: &storageSpec.StorageClass,
},
}
err := m.Client.Create(m.Ctx, pvc)
if err != nil {
return errors.Wrap(err, "Create pvc for checking storage")
}
// Assemble volumeConfigs
volumeConfigs := resourceutils.JobContainerVolumes{
Volumes: []corev1.Volume{{
Name: volumeName,
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: claimName,
},
},
}},
VolumeMounts: []corev1.VolumeMount{{
Name: volumeName,
MountPath: oceanbaseconst.ClogPath,
}},
}
_, exitCode, err := resourceutils.RunJob(
m.Ctx, m.Client, m.Logger, m.OBCluster.Namespace,
"check-fs",
m.OBCluster.Spec.OBServerTemplate.Image,
"/home/admin/oceanbase/bin/oceanbase-helper env-check storage "+oceanbaseconst.ClogPath,
volumeConfigs,
)
// exit code 1 means the image version does not support the env-check command, just ignore it and try
if err != nil && exitCode != 1 {
return errors.Wrap(err, "Check filesystem")
Expand Down
101 changes: 101 additions & 0 deletions internal/resource/utils/annotations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
Copyright (c) 2023 OceanBase
ob-operator is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/

package utils

import (
"context"
"fmt"
"strings"

"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/oceanbase/ob-operator/api/v1alpha1"
oceanbaseconst "github.com/oceanbase/ob-operator/internal/const/oceanbase"
)

func GetCNIFromAnnotation(pod *corev1.Pod) string {
_, found := pod.Annotations[oceanbaseconst.AnnotationCalicoValidate]
if found {
return oceanbaseconst.CNICalico
}
return oceanbaseconst.CNIUnknown
}

func NeedAnnotation(pod *corev1.Pod, cni string) bool {
switch cni {
case oceanbaseconst.CNICalico:
_, found := pod.Annotations[oceanbaseconst.AnnotationCalicoIpAddrs]
return !found
default:
return false
}
}

// GetTenantRestoreSource gets restore source from tenant CR. If tenantCR is in form of ns/name, the parameter ns is ignored.
func GetTenantRestoreSource(ctx context.Context, clt client.Client, logger *logr.Logger, ns, tenantCR string) (string, error) {
finalNs := ns
finalTenantCR := tenantCR
splits := strings.Split(tenantCR, "/")
if len(splits) == 2 {
finalNs = splits[0]
finalTenantCR = splits[1]
}
var restoreSource string
var err error

primary := &v1alpha1.OBTenant{}
err = clt.Get(ctx, types.NamespacedName{
Namespace: finalNs,
Name: finalTenantCR,
}, primary)
if err != nil {
if client.IgnoreNotFound(err) != nil {
return "", err
}
} else {
obcluster := &v1alpha1.OBCluster{}
err := clt.Get(ctx, types.NamespacedName{
Namespace: finalNs,
Name: primary.Spec.ClusterName,
}, obcluster)
if err != nil {
return "", errors.Wrap(err, "get obcluster")
}
con, err := GetSysOperationClient(clt, logger, obcluster)
if err != nil {
return "", errors.Wrap(err, "get oceanbase operation manager")
}
// Get ip_list from primary tenant
aps, err := con.ListTenantAccessPoints(primary.Spec.TenantName)
if err != nil {
return "", err
}
ipList := make([]string, 0)
for _, ap := range aps {
ipList = append(ipList, fmt.Sprintf("%s:%d", ap.SvrIP, ap.SqlPort))
}
standbyRoPwd, err := ReadPassword(clt, ns, primary.Status.Credentials.StandbyRO)
if err != nil {
logger.Error(err, "Failed to read standby ro password")
return "", err
}
// Set restore source
restoreSource = fmt.Sprintf("SERVICE=%s USER=%s@%s PASSWORD=%s", strings.Join(ipList, ";"), oceanbaseconst.StandbyROUser, primary.Spec.TenantName, standbyRoPwd)
}

return restoreSource, nil
}
160 changes: 160 additions & 0 deletions internal/resource/utils/connections.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
Copyright (c) 2023 OceanBase
ob-operator is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/

package utils

import (
"context"
"strconv"
"strings"

"github.com/go-logr/logr"
"github.com/pkg/errors"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/oceanbase/ob-operator/api/v1alpha1"
oceanbaseconst "github.com/oceanbase/ob-operator/internal/const/oceanbase"
clusterstatus "github.com/oceanbase/ob-operator/internal/const/status/obcluster"
"github.com/oceanbase/ob-operator/pkg/oceanbase-sdk/connector"
"github.com/oceanbase/ob-operator/pkg/oceanbase-sdk/operation"
)

func GetSysOperationClient(c client.Client, logger *logr.Logger, obcluster *v1alpha1.OBCluster) (*operation.OceanbaseOperationManager, error) {
logger.V(oceanbaseconst.LogLevelTrace).Info("Get cluster sys client", "obCluster", obcluster)
var manager *operation.OceanbaseOperationManager
var err error
_, migrateAnnoExist := GetAnnotationField(obcluster, oceanbaseconst.AnnotationsSourceClusterAddress)
if migrateAnnoExist && obcluster.Status.Status == clusterstatus.MigrateFromExisting {
manager, err = getSysClientFromSourceCluster(c, logger, obcluster, oceanbaseconst.RootUser, oceanbaseconst.SysTenant, obcluster.Spec.UserSecrets.Root)
} else {
manager, err = getSysClient(c, logger, obcluster, oceanbaseconst.OperatorUser, oceanbaseconst.SysTenant, obcluster.Spec.UserSecrets.Operator)
}
return manager, err
}

func GetTenantRootOperationClient(c client.Client, logger *logr.Logger, obcluster *v1alpha1.OBCluster, tenantName, credential string) (*operation.OceanbaseOperationManager, error) {
logger.V(oceanbaseconst.LogLevelTrace).Info("Get tenant root client", "obCluster", obcluster, "tenantName", tenantName, "credential", credential)
observerList := &v1alpha1.OBServerList{}
err := c.List(context.Background(), observerList, client.MatchingLabels{
oceanbaseconst.LabelRefOBCluster: obcluster.Name,
}, client.InNamespace(obcluster.Namespace))
if err != nil {
return nil, errors.Wrap(err, "Get observer list")
}
if len(observerList.Items) == 0 {
return nil, errors.Errorf("No observer belongs to cluster %s", obcluster.Name)
}
var password string
if credential != "" {
password, err = ReadPassword(c, obcluster.Namespace, credential)
if err != nil {
return nil, errors.Wrapf(err, "Read password to get oceanbase operation manager of cluster %s", obcluster.Name)
}
}

var s *connector.OceanBaseDataSource
for _, observer := range observerList.Items {
address := observer.Status.GetConnectAddr()
switch obcluster.Status.Status {
case clusterstatus.New:
return nil, errors.New("Cluster is not bootstrapped")
case clusterstatus.Bootstrapped:
return nil, errors.New("Cluster is not initialized")
default:
s = connector.NewOceanBaseDataSource(address, oceanbaseconst.SqlPort, oceanbaseconst.RootUser, tenantName, password, oceanbaseconst.DefaultDatabase)
}
// if err is nil, db connection is already checked available
rootClient, err := operation.GetOceanbaseOperationManager(s)
if err == nil && rootClient != nil {
rootClient.Logger = logger
return rootClient, nil
}
// err is not nil, try to use empty password
s = connector.NewOceanBaseDataSource(address, oceanbaseconst.SqlPort, oceanbaseconst.RootUser, tenantName, "", oceanbaseconst.DefaultDatabase)
rootClient, err = operation.GetOceanbaseOperationManager(s)
if err == nil && rootClient != nil {
rootClient.Logger = logger
return rootClient, nil
}
}
return nil, errors.Errorf("Can not get root operation client of tenant %s in obcluster %s after checked all servers", tenantName, obcluster.Name)
}

func getSysClientFromSourceCluster(c client.Client, logger *logr.Logger, obcluster *v1alpha1.OBCluster, userName, tenantName, secretName string) (*operation.OceanbaseOperationManager, error) {
sysClient, err := getSysClient(c, logger, obcluster, userName, tenantName, secretName)
if err == nil {
return sysClient, nil
}
password, err := ReadPassword(c, obcluster.Namespace, secretName)
if err != nil {
return nil, errors.Wrapf(err, "Read password to get oceanbase operation manager of cluster %s", obcluster.Name)
}
// when obcluster is under migrating, use address from annotation
migrateAnnoVal, _ := GetAnnotationField(obcluster, oceanbaseconst.AnnotationsSourceClusterAddress)
servers := strings.Split(migrateAnnoVal, ";")
for _, server := range servers {
addressParts := strings.Split(server, ":")
if len(addressParts) != 2 {
return nil, errors.New("Parse oceanbase cluster connect address failed")
}
sqlPort, err := strconv.ParseInt(addressParts[1], 10, 64)
if err != nil {
return nil, errors.New("Parse sql port of obcluster failed")
}
s := connector.NewOceanBaseDataSource(addressParts[0], sqlPort, userName, tenantName, password, oceanbaseconst.DefaultDatabase)
// if err is nil, db connection is already checked available
sysClient, err := operation.GetOceanbaseOperationManager(s)
if err == nil && sysClient != nil {
sysClient.Logger = logger
return sysClient, nil
}
logger.Error(err, "Get operation manager from existing obcluster")
}
return nil, errors.Errorf("Failed to get sys client from existing obcluster, address: %s", migrateAnnoVal)
}

func getSysClient(c client.Client, logger *logr.Logger, obcluster *v1alpha1.OBCluster, userName, tenantName, secretName string) (*operation.OceanbaseOperationManager, error) {
observerList := &v1alpha1.OBServerList{}
err := c.List(context.Background(), observerList, client.MatchingLabels{
oceanbaseconst.LabelRefOBCluster: obcluster.Name,
}, client.InNamespace(obcluster.Namespace))
if err != nil {
return nil, errors.Wrap(err, "Get observer list")
}
if len(observerList.Items) == 0 {
return nil, errors.Errorf("No observer belongs to cluster %s", obcluster.Name)
}

var s *connector.OceanBaseDataSource
password, err := ReadPassword(c, obcluster.Namespace, secretName)
if err != nil {
return nil, errors.Wrapf(err, "Read password to get oceanbase operation manager of cluster %s", obcluster.Name)
}
for _, observer := range observerList.Items {
address := observer.Status.GetConnectAddr()
switch obcluster.Status.Status {
case clusterstatus.New:
s = connector.NewOceanBaseDataSource(address, oceanbaseconst.SqlPort, oceanbaseconst.RootUser, tenantName, "", "")
case clusterstatus.Bootstrapped:
s = connector.NewOceanBaseDataSource(address, oceanbaseconst.SqlPort, oceanbaseconst.RootUser, tenantName, "", oceanbaseconst.DefaultDatabase)
default:
s = connector.NewOceanBaseDataSource(address, oceanbaseconst.SqlPort, userName, tenantName, password, oceanbaseconst.DefaultDatabase)
}
// if err is nil, db connection is already checked available
sysClient, err := operation.GetOceanbaseOperationManager(s)
if err == nil && sysClient != nil {
sysClient.Logger = logger
return sysClient, nil
}
}
return nil, errors.Errorf("Can not get oceanbase operation manager of obcluster %s after checked all servers", obcluster.Name)
}
Loading

0 comments on commit 4cfaf28

Please sign in to comment.