diff --git a/config/default/kustomization.yaml b/config/default/kustomization.yaml index bc13e3dc0..749e98ef9 100644 --- a/config/default/kustomization.yaml +++ b/config/default/kustomization.yaml @@ -31,7 +31,7 @@ patchesStrategicMerge: # If you want your controller-manager to expose the /metrics # endpoint w/o any authn/z, please comment the following line. - manager_auth_proxy_patch.yaml - +- manager_config_patch.yaml # [WEBHOOK] To enable webhook, uncomment all the sections with [WEBHOOK] prefix including the one in diff --git a/config/default/manager_auth_proxy_patch.yaml b/config/default/manager_auth_proxy_patch.yaml index f06445fee..14f6112f3 100644 --- a/config/default/manager_auth_proxy_patch.yaml +++ b/config/default/manager_auth_proxy_patch.yaml @@ -8,22 +8,6 @@ metadata: spec: template: spec: - affinity: - nodeAffinity: - requiredDuringSchedulingIgnoredDuringExecution: - nodeSelectorTerms: - - matchExpressions: - - key: kubernetes.io/arch - operator: In - values: - - amd64 - - arm64 - - ppc64le - - s390x - - key: kubernetes.io/os - operator: In - values: - - linux containers: - name: kube-rbac-proxy securityContext: diff --git a/config/manager/kustomization.yaml b/config/manager/kustomization.yaml index bbe1337ed..7f0a32564 100644 --- a/config/manager/kustomization.yaml +++ b/config/manager/kustomization.yaml @@ -5,4 +5,4 @@ kind: Kustomization images: - name: controller newName: oceanbasedev/ob-operator - newTag: 2.0.1-alpha.1 + newTag: 2.0.1-alpha.2 diff --git a/config/manager/manager.yaml b/config/manager/manager.yaml index f6fc5cb01..c16d052b2 100644 --- a/config/manager/manager.yaml +++ b/config/manager/manager.yaml @@ -40,22 +40,28 @@ spec: # according to the platforms which are supported by your solution. # It is considered best practice to support multiple architectures. You can # build your manager image using the makefile target docker-buildx. - # affinity: - # nodeAffinity: - # requiredDuringSchedulingIgnoredDuringExecution: - # nodeSelectorTerms: - # - matchExpressions: - # - key: kubernetes.io/arch - # operator: In - # values: - # - amd64 - # - arm64 - # - ppc64le - # - s390x - # - key: kubernetes.io/os - # operator: In - # values: - # - linux + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: kubernetes.io/arch + operator: In + values: + - amd64 + - arm64 + - ppc64le + - s390x + - key: kubernetes.io/os + operator: In + values: + - linux + - key: kubernetes.io/hostname + operator: In + values: + - sqaappnoxdnv62s2011161204053.sa128 + - key: node-role.kubernetes.io/master + operator: Exists securityContext: runAsNonRoot: true # TODO(user): For common cases that do not require escalating privileges @@ -72,6 +78,11 @@ spec: - --leader-elect image: controller:latest name: manager + env: + # - name: TELEMETRY_DEBUG + # value: "true" + - name: TELEMETRY_SIGNATURE + value: "dbe97393a695335d67de91dd4049ba" securityContext: allowPrivilegeEscalation: false capabilities: @@ -93,7 +104,7 @@ spec: # More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ resources: limits: - cpu: 1 + cpu: "1" memory: 1Gi requests: cpu: 10m diff --git a/deploy/operator.yaml b/deploy/operator.yaml index 8a2fd498d..1339e1a90 100644 --- a/deploy/operator.yaml +++ b/deploy/operator.yaml @@ -11963,6 +11963,14 @@ rules: verbs: - create - patch +- apiGroups: + - "" + resources: + - nodes + verbs: + - get + - list + - watch - apiGroups: - "" resources: @@ -12614,6 +12622,12 @@ spec: operator: In values: - linux + - key: kubernetes.io/hostname + operator: In + values: + - sqaappnoxdnv62s2011161204053.sa128 + - key: node-role.kubernetes.io/master + operator: Exists containers: - args: - --health-probe-bind-address=:8081 @@ -12622,7 +12636,10 @@ spec: - --manager-namespace=oceanbase-system command: - /manager - image: oceanbasedev/ob-operator:2.0.1-alpha.1 + env: + - name: TELEMETRY_SIGNATURE + value: dbe97393a695335d67de91dd4049ba + image: oceanbasedev/ob-operator:2.0.1-alpha.2 livenessProbe: httpGet: path: /healthz @@ -12642,7 +12659,7 @@ spec: periodSeconds: 10 resources: limits: - cpu: 1 + cpu: "1" memory: 1Gi requests: cpu: 10m diff --git a/deploy/tenant.yaml b/deploy/tenant.yaml index d7f89fac8..f966da02e 100644 --- a/deploy/tenant.yaml +++ b/deploy/tenant.yaml @@ -2,7 +2,7 @@ apiVersion: oceanbase.oceanbase.com/v1alpha1 kind: OBTenant metadata: name: t1 - namespace: oceanbase + # namespace: oceanbase spec: obcluster: test tenantName: t1 diff --git a/deploy/tenant_restore.yaml b/deploy/tenant_restore.yaml index 536e3b51b..450eec094 100644 --- a/deploy/tenant_restore.yaml +++ b/deploy/tenant_restore.yaml @@ -16,8 +16,6 @@ spec: standbyRo: t1s-ro source: restore: - # sourceUri: "file:///ob-backup/t1/data_backup_custom1,file:///ob-backup/t1/log_archive_custom1" - # path in xxxSource should archiveSource: type: NFS path: "t1/log_archive_custom" diff --git a/pkg/controller/obcluster_controller.go b/pkg/controller/obcluster_controller.go index 2c6f64305..f16385306 100644 --- a/pkg/controller/obcluster_controller.go +++ b/pkg/controller/obcluster_controller.go @@ -29,6 +29,7 @@ import ( v1alpha1 "github.com/oceanbase/ob-operator/api/v1alpha1" "github.com/oceanbase/ob-operator/pkg/resource" + "github.com/oceanbase/ob-operator/pkg/telemetry" ) // OBClusterReconciler reconciles a OBCluster object @@ -84,6 +85,7 @@ func (r *OBClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( Client: r.Client, Recorder: r.Recorder, Logger: &logger, + Telemetry: telemetry.NewTelemetry(ctx, r.Recorder), } coordinator := resource.NewCoordinator(obclusterManager, &logger) return coordinator.Coordinate() diff --git a/pkg/controller/obparameter_controller.go b/pkg/controller/obparameter_controller.go index d77e2af8f..667042ab0 100644 --- a/pkg/controller/obparameter_controller.go +++ b/pkg/controller/obparameter_controller.go @@ -28,6 +28,7 @@ import ( v1alpha1 "github.com/oceanbase/ob-operator/api/v1alpha1" "github.com/oceanbase/ob-operator/pkg/resource" + "github.com/oceanbase/ob-operator/pkg/telemetry" ) // OBParameterReconciler reconciles a OBParameter object @@ -72,6 +73,7 @@ func (r *OBParameterReconciler) Reconcile(ctx context.Context, req ctrl.Request) Client: r.Client, Recorder: r.Recorder, Logger: &logger, + Telemetry: telemetry.NewTelemetry(ctx, r.Recorder), } coordinator := resource.NewCoordinator(obparameterManager, &logger) return coordinator.Coordinate() diff --git a/pkg/controller/observer_controller.go b/pkg/controller/observer_controller.go index 731c2c67a..943265858 100644 --- a/pkg/controller/observer_controller.go +++ b/pkg/controller/observer_controller.go @@ -32,6 +32,7 @@ import ( v1alpha1 "github.com/oceanbase/ob-operator/api/v1alpha1" "github.com/oceanbase/ob-operator/pkg/resource" + "github.com/oceanbase/ob-operator/pkg/telemetry" ) // OBServerReconciler reconciles a OBServer object @@ -77,11 +78,12 @@ func (r *OBServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c // create observer manager observerManager := &resource.OBServerManager{ - Ctx: ctx, - OBServer: observer, - Client: r.Client, - Recorder: r.Recorder, - Logger: &logger, + Ctx: ctx, + OBServer: observer, + Client: r.Client, + Recorder: r.Recorder, + Logger: &logger, + Telemetry: telemetry.NewTelemetry(ctx, r.Recorder), } // execute finalizers diff --git a/pkg/controller/obtenant_controller.go b/pkg/controller/obtenant_controller.go index d8b02d4df..8a64c8b2c 100644 --- a/pkg/controller/obtenant_controller.go +++ b/pkg/controller/obtenant_controller.go @@ -27,6 +27,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" "github.com/oceanbase/ob-operator/pkg/resource" + "github.com/oceanbase/ob-operator/pkg/telemetry" "github.com/oceanbase/ob-operator/pkg/util/codec" v1alpha1 "github.com/oceanbase/ob-operator/api/v1alpha1" @@ -84,11 +85,12 @@ func (r *OBTenantReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c // create observer manager obtenantManager := &resource.OBTenantManager{ - Ctx: ctx, - OBTenant: obtenant, - Client: r.Client, - Recorder: r.Recorder, - Logger: &logger, + Ctx: ctx, + OBTenant: obtenant, + Client: r.Client, + Recorder: r.Recorder, + Logger: &logger, + Telemetry: telemetry.NewTelemetry(ctx, r.Recorder), } coordinator := resource.NewCoordinator(obtenantManager, &logger) diff --git a/pkg/controller/obtenantbackup_controller.go b/pkg/controller/obtenantbackup_controller.go index 2162746f7..54f4772de 100644 --- a/pkg/controller/obtenantbackup_controller.go +++ b/pkg/controller/obtenantbackup_controller.go @@ -19,16 +19,19 @@ package controller import ( "context" "fmt" + "sync" "time" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/retry" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" "github.com/oceanbase/ob-operator/pkg/resource" + "github.com/oceanbase/ob-operator/pkg/telemetry" "github.com/pkg/errors" @@ -41,10 +44,12 @@ import ( // OBTenantBackupReconciler reconciles a OBTenantBackup object type OBTenantBackupReconciler struct { client.Client - Scheme *runtime.Scheme - Recorder record.EventRecorder + Scheme *runtime.Scheme + Recorder record.EventRecorder + Telemetry telemetry.Telemetry - con *operation.OceanbaseOperationManager + telemetryOnce sync.Once + con *operation.OceanbaseOperationManager } //+kubebuilder:rbac:groups=oceanbase.oceanbase.com,resources=obtenantbackups,verbs=get;list;watch;create;update;patch;delete @@ -124,23 +129,31 @@ func (r *OBTenantBackupReconciler) createBackupJobInOB(ctx context.Context, job password, err := resource.ReadPassword(r.Client, job.Namespace, job.Spec.EncryptionSecret) if err != nil { logger.Error(err, "failed to read backup encryption secret") - r.Recorder.Event(job, "Warning", "ReadBackupEncryptionSecretFailed", err.Error()) + r.getTelemetry(ctx).Event(job, "Warning", "ReadBackupEncryptionSecretFailed", err.Error()) } else if password != "" { err = con.SetBackupPassword(password) if err != nil { logger.Error(err, "failed to set backup password") - r.Recorder.Event(job, "Warning", "SetBackupPasswordFailed", err.Error()) + r.getTelemetry(ctx).Event(job, "Warning", "SetBackupPasswordFailed", err.Error()) } } } latest, err := con.CreateAndReturnBackupJob(job.Spec.Type) if err != nil { logger.Error(err, "failed to create and return backup job") + r.getTelemetry(ctx).Event(job, "Warning", "CreateAndReturnBackupJobFailed", err.Error()) return err } job.Status.BackupJob = latest - return r.Status().Update(ctx, job) + err = r.retryUpdateStatus(ctx, job) + if err != nil { + logger.Error(err, "failed to update status") + r.getTelemetry(ctx).Event(job, "Warning", "UpdateStatusFailed", err.Error()) + return err + } + r.getTelemetry(ctx).Event(job, "Create", "", "create backup job successfully") + return nil } // TODO: Calculate the progress of running jobs @@ -187,7 +200,7 @@ func (r *OBTenantBackupReconciler) maintainRunningBackupJob(ctx context.Context, case "CANCELED": job.Status.Status = constants.BackupJobStatusCanceled } - return r.Client.Status().Update(ctx, job) + return r.retryUpdateStatus(ctx, job) } func (r *OBTenantBackupReconciler) maintainRunningBackupCleanJob(ctx context.Context, job *v1alpha1.OBTenantBackup) error { @@ -219,7 +232,7 @@ func (r *OBTenantBackupReconciler) maintainRunningBackupCleanJob(ctx context.Con case "DOING": job.Status.Status = constants.BackupJobStatusRunning } - return r.Client.Status().Update(ctx, job) + return r.retryUpdateStatus(ctx, job) } return nil @@ -252,7 +265,7 @@ func (r *OBTenantBackupReconciler) maintainRunningArchiveLogJob(ctx context.Cont case "SUSPEND": job.Status.Status = constants.BackupJobStatusSuspend } - return r.Client.Status().Update(ctx, job) + return r.retryUpdateStatus(ctx, job) } return nil @@ -279,3 +292,28 @@ func (r *OBTenantBackupReconciler) getObOperationClient(ctx context.Context, job r.con = con return con, nil } + +func (r *OBTenantBackupReconciler) retryUpdateStatus(ctx context.Context, job *v1alpha1.OBTenantBackup) error { + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + newestJob := &v1alpha1.OBTenantBackup{} + err := r.Get(ctx, types.NamespacedName{ + Namespace: job.GetNamespace(), + Name: job.GetName(), + }, newestJob) + if err != nil { + return client.IgnoreNotFound(err) + } + newestJob.Status = job.Status + return r.Status().Update(ctx, newestJob) + }) +} + +func (r *OBTenantBackupReconciler) getTelemetry(ctx context.Context) telemetry.Telemetry { + if r.Telemetry != nil { + return r.Telemetry + } + r.telemetryOnce.Do(func() { + r.Telemetry = telemetry.NewTelemetry(ctx, r.Recorder) + }) + return r.Telemetry +} diff --git a/pkg/controller/obtenantbackuppolicy_controller.go b/pkg/controller/obtenantbackuppolicy_controller.go index a0a80cdd8..73cf81d0e 100644 --- a/pkg/controller/obtenantbackuppolicy_controller.go +++ b/pkg/controller/obtenantbackuppolicy_controller.go @@ -27,6 +27,7 @@ import ( "github.com/oceanbase/ob-operator/api/v1alpha1" "github.com/oceanbase/ob-operator/pkg/resource" + "github.com/oceanbase/ob-operator/pkg/telemetry" ) // OBTenantBackupPolicyReconciler reconciles a OBTenantBackupPolicy object @@ -74,6 +75,7 @@ func (r *OBTenantBackupPolicyReconciler) Reconcile(ctx context.Context, req ctrl Client: r.Client, Recorder: r.Recorder, Logger: &logger, + Telemetry: telemetry.NewTelemetry(ctx, r.Recorder), } coordinator := resource.NewCoordinator(mgr, &logger) diff --git a/pkg/controller/obtenantoperation_controller.go b/pkg/controller/obtenantoperation_controller.go index a15805790..475f2374b 100644 --- a/pkg/controller/obtenantoperation_controller.go +++ b/pkg/controller/obtenantoperation_controller.go @@ -27,6 +27,7 @@ import ( v1alpha1 "github.com/oceanbase/ob-operator/api/v1alpha1" "github.com/oceanbase/ob-operator/pkg/resource" + "github.com/oceanbase/ob-operator/pkg/telemetry" ) // OBTenantOperationReconciler reconciles a OBTenantOperation object @@ -57,11 +58,12 @@ func (r *OBTenantOperationReconciler) Reconcile(ctx context.Context, req ctrl.Re } mgr := &resource.ObTenantOperationManager{ - Ctx: ctx, - Resource: operation, - Client: r.Client, - Recorder: r.Recorder, - Logger: &logger, + Ctx: ctx, + Resource: operation, + Client: r.Client, + Recorder: r.Recorder, + Logger: &logger, + Telemetry: telemetry.NewTelemetry(ctx, r.Recorder), } coordinator := resource.NewCoordinator(mgr, &logger) diff --git a/pkg/controller/obtenantrestore_controller.go b/pkg/controller/obtenantrestore_controller.go index 5acd83ded..aa90967f0 100644 --- a/pkg/controller/obtenantrestore_controller.go +++ b/pkg/controller/obtenantrestore_controller.go @@ -28,6 +28,7 @@ import ( v1alpha1 "github.com/oceanbase/ob-operator/api/v1alpha1" "github.com/oceanbase/ob-operator/pkg/resource" + "github.com/oceanbase/ob-operator/pkg/telemetry" ) // OBTenantRestoreReconciler reconciles a OBTenantRestore object @@ -58,23 +59,13 @@ func (r *OBTenantRestoreReconciler) Reconcile(ctx context.Context, req ctrl.Requ return ctrl.Result{}, client.IgnoreNotFound(err) } - // finalizerName := "obtenantrestore.finalizers.oceanbase.com" - // // examine DeletionTimestamp to determine if the policy is under deletion - // if restore.ObjectMeta.DeletionTimestamp.IsZero() { - // if !controllerutil.ContainsFinalizer(restore, finalizerName) { - // controllerutil.AddFinalizer(restore, finalizerName) - // if err := r.Update(ctx, restore); err != nil { - // return ctrl.Result{}, err - // } - // } - // } - mgr := &resource.ObTenantRestoreManager{ - Ctx: ctx, - Resource: restore, - Client: r.Client, - Recorder: r.Recorder, - Logger: &logger, + Ctx: ctx, + Resource: restore, + Client: r.Client, + Recorder: r.Recorder, + Logger: &logger, + Telemetry: telemetry.NewTelemetry(ctx, r.Recorder), } coordinator := resource.NewCoordinator(mgr, &logger) diff --git a/pkg/controller/obzone_controller.go b/pkg/controller/obzone_controller.go index 23a8d243b..818947be1 100644 --- a/pkg/controller/obzone_controller.go +++ b/pkg/controller/obzone_controller.go @@ -28,6 +28,7 @@ import ( v1alpha1 "github.com/oceanbase/ob-operator/api/v1alpha1" "github.com/oceanbase/ob-operator/pkg/resource" + "github.com/oceanbase/ob-operator/pkg/telemetry" ) // OBZoneReconciler reconciles a OBZone object @@ -71,11 +72,12 @@ func (r *OBZoneReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr // create zone manager obzoneManager := &resource.OBZoneManager{ - Ctx: ctx, - OBZone: obzone, - Client: r.Client, - Recorder: r.Recorder, - Logger: &logger, + Ctx: ctx, + OBZone: obzone, + Client: r.Client, + Recorder: r.Recorder, + Logger: &logger, + Telemetry: telemetry.NewTelemetry(ctx, r.Recorder), } coordinator := resource.NewCoordinator(obzoneManager, &logger) return coordinator.Coordinate() diff --git a/pkg/resource/obcluster_manager.go b/pkg/resource/obcluster_manager.go index 863d556fc..7892d4f89 100644 --- a/pkg/resource/obcluster_manager.go +++ b/pkg/resource/obcluster_manager.go @@ -31,6 +31,7 @@ import ( taskname "github.com/oceanbase/ob-operator/pkg/task/const/task/name" taskstatus "github.com/oceanbase/ob-operator/pkg/task/const/task/status" "github.com/oceanbase/ob-operator/pkg/task/strategy" + "github.com/oceanbase/ob-operator/pkg/telemetry" ) type OBClusterManager struct { @@ -39,6 +40,7 @@ type OBClusterManager struct { OBCluster *v1alpha1.OBCluster Client client.Client Recorder record.EventRecorder + Telemetry telemetry.Telemetry Logger *logr.Logger } @@ -48,6 +50,7 @@ func (m *OBClusterManager) IsNewResource() bool { func (m *OBClusterManager) InitStatus() { m.Logger.Info("newly created cluster, init status") + m.Telemetry.Event(m.OBCluster, "Init", "", "newly created cluster, init status") status := v1alpha1.OBClusterStatus{ Image: m.OBCluster.Spec.OBServerTemplate.Image, Status: clusterstatus.New, @@ -309,7 +312,7 @@ func (m *OBClusterManager) GetTaskFunc(name string) (func() error, error) { } func (m *OBClusterManager) PrintErrEvent(err error) { - m.Recorder.Event(m.OBCluster, corev1.EventTypeWarning, "task exec failed", err.Error()) + m.Telemetry.Event(m.OBCluster, corev1.EventTypeWarning, "task exec failed", err.Error()) } func (m *OBClusterManager) listOBZones() (*v1alpha1.OBZoneList, error) { diff --git a/pkg/resource/obcluster_task.go b/pkg/resource/obcluster_task.go index ae33f06f1..22d374de8 100644 --- a/pkg/resource/obcluster_task.go +++ b/pkg/resource/obcluster_task.go @@ -292,6 +292,8 @@ func (m *OBClusterManager) Bootstrap() error { err = manager.Bootstrap(bootstrapServers) if err != nil { m.Logger.Error(err, "bootstrap failed") + } else { + m.Telemetry.Event(m.OBCluster, "Bootstrap", "", "Bootstrap successfully") } return err } @@ -780,7 +782,12 @@ func (m *OBClusterManager) CreateServiceForMonitor() error { Type: corev1.ServiceTypeClusterIP, }, } - return m.Client.Create(m.Ctx, &monitorService) + err := m.Client.Create(m.Ctx, &monitorService) + if err != nil { + return errors.Wrap(err, "Create monitor service") + } + m.Telemetry.Event(m.OBCluster, "MaintainedAfterBootstrap", "", "Create monitor service successfully") + return nil } func (m *OBClusterManager) RestoreEssentialParameters() error { @@ -798,9 +805,9 @@ func (m *OBClusterManager) RestoreEssentialParameters() error { }, contextSecret) if err != nil { m.Logger.Error(err, "Failed to get context secret") - return nil // parameter can be set manually, just return here and emit an event - // TODO: emit an event + m.Telemetry.Event(m.OBCluster, "Warning", "Restore essential parameters failed", err.Error()) + return nil } encodedParameters := string(contextSecret.Data[oceanbaseconst.EssentialParametersKey]) @@ -820,5 +827,6 @@ func (m *OBClusterManager) RestoreEssentialParameters() error { } } _ = m.Client.Delete(m.Ctx, contextSecret) + m.Telemetry.Event(m.OBCluster, "Upgrade", "", "Restore essential parameters successfully") return nil } diff --git a/pkg/resource/obparameter_manager.go b/pkg/resource/obparameter_manager.go index 16c3dc771..103f76539 100644 --- a/pkg/resource/obparameter_manager.go +++ b/pkg/resource/obparameter_manager.go @@ -27,6 +27,7 @@ import ( "github.com/oceanbase/ob-operator/pkg/oceanbase/operation" taskstatus "github.com/oceanbase/ob-operator/pkg/task/const/task/status" "github.com/oceanbase/ob-operator/pkg/task/strategy" + "github.com/oceanbase/ob-operator/pkg/telemetry" v1alpha1 "github.com/oceanbase/ob-operator/api/v1alpha1" oceanbaseconst "github.com/oceanbase/ob-operator/pkg/const/oceanbase" @@ -43,6 +44,7 @@ type OBParameterManager struct { OBParameter *v1alpha1.OBParameter Client client.Client Recorder record.EventRecorder + Telemetry telemetry.Telemetry Logger *logr.Logger } diff --git a/pkg/resource/observer_manager.go b/pkg/resource/observer_manager.go index 00feb563f..c181db930 100644 --- a/pkg/resource/observer_manager.go +++ b/pkg/resource/observer_manager.go @@ -18,6 +18,7 @@ import ( "github.com/oceanbase/ob-operator/pkg/oceanbase/model" taskstatus "github.com/oceanbase/ob-operator/pkg/task/const/task/status" "github.com/oceanbase/ob-operator/pkg/task/strategy" + "github.com/oceanbase/ob-operator/pkg/telemetry" corev1 "k8s.io/api/core/v1" kubeerrors "k8s.io/apimachinery/pkg/api/errors" @@ -42,11 +43,12 @@ import ( type OBServerManager struct { ResourceManager - Ctx context.Context - OBServer *v1alpha1.OBServer - Client client.Client - Recorder record.EventRecorder - Logger *logr.Logger + Ctx context.Context + OBServer *v1alpha1.OBServer + Client client.Client + Recorder record.EventRecorder + Telemetry telemetry.Telemetry + Logger *logr.Logger } func (m *OBServerManager) GetTaskFunc(name string) (func() error, error) { diff --git a/pkg/resource/obtenant_manager.go b/pkg/resource/obtenant_manager.go index 1575cc6da..6ed21f4c0 100644 --- a/pkg/resource/obtenant_manager.go +++ b/pkg/resource/obtenant_manager.go @@ -39,15 +39,17 @@ import ( taskname "github.com/oceanbase/ob-operator/pkg/task/const/task/name" taskstatus "github.com/oceanbase/ob-operator/pkg/task/const/task/status" "github.com/oceanbase/ob-operator/pkg/task/strategy" + "github.com/oceanbase/ob-operator/pkg/telemetry" ) type OBTenantManager struct { ResourceManager - OBTenant *v1alpha1.OBTenant - Ctx context.Context - Client client.Client - Recorder record.EventRecorder - Logger *logr.Logger + OBTenant *v1alpha1.OBTenant + Ctx context.Context + Client client.Client + Recorder record.EventRecorder + Telemetry telemetry.Telemetry + Logger *logr.Logger } // TODO add lock to be thread safe, and read/write whitelist from/to DB @@ -88,9 +90,12 @@ func (m *OBTenantManager) InitStatus() { if m.OBTenant.Spec.Source != nil && m.OBTenant.Spec.Source.Restore != nil { m.OBTenant.Status.Status = tenantstatus.Restoring + m.Telemetry.Event(m.OBTenant, "InitRestore", "", "start restoring") } else if m.OBTenant.Spec.Source != nil && m.OBTenant.Spec.Source.Tenant != nil { + m.Telemetry.Event(m.OBTenant, "InitEmptyStandby", "", "start creating empty standby") m.OBTenant.Status.Status = tenantstatus.CreatingEmptyStandby } else { + m.Telemetry.Event(m.OBTenant, "Init", "", "start creating") m.OBTenant.Status.Status = tenantstatus.CreatingTenant } } @@ -328,7 +333,7 @@ func (m *OBTenantManager) GetTaskFlow() (*task.TaskFlow, error) { } func (m *OBTenantManager) PrintErrEvent(err error) { - m.Recorder.Event(m.OBTenant, corev1.EventTypeWarning, "task exec failed", err.Error()) + m.Telemetry.Event(m.OBTenant, corev1.EventTypeWarning, "task exec failed", err.Error()) } // ---------- K8S API Helper ---------- diff --git a/pkg/resource/obtenant_task.go b/pkg/resource/obtenant_task.go index a286a96ee..827432899 100644 --- a/pkg/resource/obtenant_task.go +++ b/pkg/resource/obtenant_task.go @@ -386,10 +386,12 @@ func (m *OBTenantManager) createTenant() error { err = oceanbaseOperationManager.AddTenant(tenantSQLParam) if err != nil { + m.Telemetry.Event(m.OBTenant, corev1.EventTypeWarning, "failed to create OBTenant", err.Error()) return err } GlobalWhiteListMap[tenantName] = m.OBTenant.Spec.ConnectWhiteList // Create user or change password of root, do not return error + m.Telemetry.Event(m.OBTenant, "Create", "", "create OBTenant successfully") return nil } @@ -1086,7 +1088,7 @@ func (m *OBTenantManager) CreateUserWithCredentialSecrets() error { } err := m.createUserWithCredentials() if err != nil { - m.Recorder.Event(m.OBTenant, corev1.EventTypeWarning, "Failed to create user or change password", err.Error()) + m.Telemetry.Event(m.OBTenant, corev1.EventTypeWarning, "Failed to create user or change password", err.Error()) m.Logger.Error(err, "Failed to create user or change password, please check the credential secrets") } @@ -1179,6 +1181,7 @@ func (m *OBTenantManager) CreateEmptyStandbyTenant() error { if err != nil { return err } + m.Telemetry.Event(m.OBTenant, "CreateEmptyStandby", "", "Succeed to create empty standby tenant") return nil } diff --git a/pkg/resource/obtenantbackuppolicy_manager.go b/pkg/resource/obtenantbackuppolicy_manager.go index 9ef4e7b22..151647506 100644 --- a/pkg/resource/obtenantbackuppolicy_manager.go +++ b/pkg/resource/obtenantbackuppolicy_manager.go @@ -37,6 +37,7 @@ import ( taskname "github.com/oceanbase/ob-operator/pkg/task/const/task/name" taskstatus "github.com/oceanbase/ob-operator/pkg/task/const/task/status" "github.com/oceanbase/ob-operator/pkg/task/strategy" + "github.com/oceanbase/ob-operator/pkg/telemetry" ) type ObTenantBackupPolicyManager struct { @@ -45,6 +46,7 @@ type ObTenantBackupPolicyManager struct { BackupPolicy *v1alpha1.OBTenantBackupPolicy Client client.Client Recorder record.EventRecorder + Telemetry telemetry.Telemetry Logger *logr.Logger con *operation.OceanbaseOperationManager @@ -115,6 +117,7 @@ func (m *ObTenantBackupPolicyManager) InitStatus() { m.BackupPolicy.Status = v1alpha1.OBTenantBackupPolicyStatus{ Status: constants.BackupPolicyStatusPreparing, } + m.Telemetry.Event(m.BackupPolicy, "Init", "", "init status") err = m.syncTenantInformation() if err != nil { m.PrintErrEvent(err) diff --git a/pkg/resource/obtenantoperation_manager.go b/pkg/resource/obtenantoperation_manager.go index a30501617..f1e02a2be 100644 --- a/pkg/resource/obtenantoperation_manager.go +++ b/pkg/resource/obtenantoperation_manager.go @@ -33,16 +33,18 @@ import ( taskname "github.com/oceanbase/ob-operator/pkg/task/const/task/name" taskstatus "github.com/oceanbase/ob-operator/pkg/task/const/task/status" "github.com/oceanbase/ob-operator/pkg/task/strategy" + "github.com/oceanbase/ob-operator/pkg/telemetry" ) type ObTenantOperationManager struct { ResourceManager - Ctx context.Context - Resource *v1alpha1.OBTenantOperation - Client client.Client - Recorder record.EventRecorder - Logger *logr.Logger + Ctx context.Context + Resource *v1alpha1.OBTenantOperation + Client client.Client + Recorder record.EventRecorder + Telemetry telemetry.Telemetry + Logger *logr.Logger con *operation.OceanbaseOperationManager } diff --git a/pkg/resource/obtenantrestore_manager.go b/pkg/resource/obtenantrestore_manager.go index d294042a4..e4d3c92e5 100644 --- a/pkg/resource/obtenantrestore_manager.go +++ b/pkg/resource/obtenantrestore_manager.go @@ -33,16 +33,18 @@ import ( taskname "github.com/oceanbase/ob-operator/pkg/task/const/task/name" taskstatus "github.com/oceanbase/ob-operator/pkg/task/const/task/status" "github.com/oceanbase/ob-operator/pkg/task/strategy" + "github.com/oceanbase/ob-operator/pkg/telemetry" ) type ObTenantRestoreManager struct { ResourceManager - Ctx context.Context - Resource *v1alpha1.OBTenantRestore - Client client.Client - Recorder record.EventRecorder - Logger *logr.Logger + Ctx context.Context + Resource *v1alpha1.OBTenantRestore + Client client.Client + Recorder record.EventRecorder + Telemetry telemetry.Telemetry + Logger *logr.Logger con *operation.OceanbaseOperationManager } diff --git a/pkg/resource/obtenantrestore_task.go b/pkg/resource/obtenantrestore_task.go index 2f6a8f1b6..e729d2069 100644 --- a/pkg/resource/obtenantrestore_task.go +++ b/pkg/resource/obtenantrestore_task.go @@ -112,11 +112,13 @@ func (m *OBTenantManager) WatchRestoreJobToFinish() error { if runningRestore.Status.Status == constants.RestoreJobSuccessful { break } else if runningRestore.Status.Status == constants.RestoreJobFailed { + m.Telemetry.Event(m.OBTenant, "RestoreJobFailed", "", "restore job failed") return errors.New("Restore job failed") } time.Sleep(5 * time.Second) } GlobalWhiteListMap[m.OBTenant.Spec.TenantName] = m.OBTenant.Spec.ConnectWhiteList + m.Telemetry.Event(m.OBTenant, "RestoreJobFinished", "", "restore job finished successfully") return nil } diff --git a/pkg/resource/obzone_manager.go b/pkg/resource/obzone_manager.go index 84a94a9fd..b4a5a4de3 100644 --- a/pkg/resource/obzone_manager.go +++ b/pkg/resource/obzone_manager.go @@ -35,15 +35,17 @@ import ( taskname "github.com/oceanbase/ob-operator/pkg/task/const/task/name" taskstatus "github.com/oceanbase/ob-operator/pkg/task/const/task/status" "github.com/oceanbase/ob-operator/pkg/task/strategy" + "github.com/oceanbase/ob-operator/pkg/telemetry" ) type OBZoneManager struct { ResourceManager - Ctx context.Context - OBZone *v1alpha1.OBZone - Client client.Client - Recorder record.EventRecorder - Logger *logr.Logger + Ctx context.Context + OBZone *v1alpha1.OBZone + Client client.Client + Recorder record.EventRecorder + Telemetry telemetry.Telemetry + Logger *logr.Logger } func (m *OBZoneManager) IsNewResource() bool { diff --git a/pkg/resource/template_manager.go b/pkg/resource/template_manager.go index 3f6bdf6ab..eb0453f41 100644 --- a/pkg/resource/template_manager.go +++ b/pkg/resource/template_manager.go @@ -23,16 +23,18 @@ import ( v1alpha1 "github.com/oceanbase/ob-operator/api/v1alpha1" "github.com/oceanbase/ob-operator/pkg/oceanbase/operation" "github.com/oceanbase/ob-operator/pkg/task" + "github.com/oceanbase/ob-operator/pkg/telemetry" ) type ObResourceManager[T client.Object] struct { ResourceManager - Ctx context.Context - Resource T - Client client.Client - Recorder record.EventRecorder - Logger *logr.Logger + Ctx context.Context + Resource T + Client client.Client + Recorder record.EventRecorder + Logger *logr.Logger + Telemetry telemetry.Telemetry con *operation.OceanbaseOperationManager } diff --git a/pkg/telemetry/logging.go b/pkg/telemetry/logging.go new file mode 100644 index 000000000..85ec7eead --- /dev/null +++ b/pkg/telemetry/logging.go @@ -0,0 +1,44 @@ +/* +Copyright (c) 2023 OceanBase +ob-operator is licensed under Mulan PSL v2. +You can use this software according to the terms and conditions of the Mulan PSL v2. +You may obtain a copy of Mulan PSL v2 at: + http://license.coscl.org.cn/MulanPSL2 +THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +See the Mulan PSL v2 for more details. +*/ + +package telemetry + +import ( + "io" + "log" + "os" + "sync" +) + +var lg *log.Logger +var loggerOnce sync.Once +var debugMode = os.Getenv(TelemetryDebugEnvName) == "true" + +func configLogger() { + if debugMode { + file, err := os.OpenFile("/tmp/telemetry.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if err != nil { + // log.Println("Failed to open log file:", err) + lg = log.New(io.Discard, "[Telemetry] ", log.LstdFlags|log.Lshortfile) + } else { + lg = log.New(file, "[Telemetry] ", log.LstdFlags|log.Lshortfile) + } + } else { + // if not in debug mode, discard all logs + lg = log.New(io.Discard, "[Telemetry] ", log.LstdFlags|log.Lshortfile) + } +} + +func getLogger() *log.Logger { + loggerOnce.Do(configLogger) + return lg +} diff --git a/pkg/telemetry/sentry.go b/pkg/telemetry/sentry.go index 3bf09d940..42dee82f4 100644 --- a/pkg/telemetry/sentry.go +++ b/pkg/telemetry/sentry.go @@ -13,7 +13,7 @@ See the Mulan PSL v2 for more details. package telemetry import ( - "fmt" + "k8s.io/apimachinery/pkg/runtime" "github.com/oceanbase/ob-operator/api/v1alpha1" ) @@ -23,33 +23,28 @@ func objectSentry(object any) { return } if cluster, ok := object.(*v1alpha1.OBCluster); ok { - processOBCluster(cluster) + debugWrapper(processOBCluster, cluster, "OBCluster") } else if tenant, ok := object.(*v1alpha1.OBTenant); ok { - processOBTenant(tenant) + debugWrapper(processOBTenant, tenant, "OBTenant") } else if server, ok := object.(*v1alpha1.OBServer); ok { - processOBServer(server) + debugWrapper(processOBServer, server, "OBServer") } else if zone, ok := object.(*v1alpha1.OBZone); ok { - processOBZone(zone) + debugWrapper(processOBZone, zone, "OBZone") } } func processOBCluster(cluster *v1alpha1.OBCluster) { - _, _ = fmt.Printf("[OBCluster Before] %+v\n", cluster) if cluster.Spec.BackupVolume != nil && cluster.Spec.BackupVolume.Volume != nil && cluster.Spec.BackupVolume.Volume.NFS != nil { cluster.Spec.BackupVolume.Volume.NFS.Server = md5Hash(cluster.Spec.BackupVolume.Volume.NFS.Server) } - _, _ = fmt.Printf("[OBCluster After] %+v\n", cluster) } func processOBServer(server *v1alpha1.OBServer) { - _, _ = fmt.Printf("[OBServer Before] %+v\n", server) server.Status.PodIp = md5Hash(server.Status.PodIp) server.Status.NodeIp = md5Hash(server.Status.NodeIp) - _, _ = fmt.Printf("[OBServer After] %+v\n", server) } func processOBTenant(tenant *v1alpha1.OBTenant) { - _, _ = fmt.Printf("[OBTenant After] %+v\n", tenant) for i := range tenant.Status.Pools { for j := range tenant.Status.Pools[i].Units { tenant.Status.Pools[i].Units[j].ServerIP = md5Hash(tenant.Status.Pools[i].Units[j].ServerIP) @@ -58,16 +53,19 @@ func processOBTenant(tenant *v1alpha1.OBTenant) { } } } - _, _ = fmt.Printf("[OBTenant After] %+v\n", tenant) } func processOBZone(zone *v1alpha1.OBZone) { - _, _ = fmt.Printf("[OBZone Before] %+v\n", zone) for i := range zone.Status.OBServerStatus { zone.Status.OBServerStatus[i].Server = md5Hash(zone.Status.OBServerStatus[i].Server) } if zone.Spec.BackupVolume != nil && zone.Spec.BackupVolume.Volume != nil && zone.Spec.BackupVolume.Volume.NFS != nil { zone.Spec.BackupVolume.Volume.NFS.Server = md5Hash(zone.Spec.BackupVolume.Volume.NFS.Server) } - _, _ = fmt.Printf("[OBZone After] %+v\n", zone) +} + +func debugWrapper[T runtime.Object](processor func(T), object T, objectType string) { + getLogger().Printf("[%s Before] %+v\n", objectType, object) + processor(object) + getLogger().Printf("[%s After] %+v\n", objectType, object) } diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index 322deb4ae..acd832ba8 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -18,6 +18,7 @@ import ( "os" "time" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" record "k8s.io/client-go/tools/record" @@ -36,11 +37,13 @@ type telemetry struct { *hostMetrics record.EventRecorder + ctx context.Context telemetryDisabled bool } -func NewTelemetry(recorder record.EventRecorder) Telemetry { +func NewTelemetry(ctx context.Context, recorder record.EventRecorder) Telemetry { clt := &telemetry{ + ctx: ctx, EventRecorder: recorder, } @@ -61,19 +64,19 @@ func NewTelemetry(recorder record.EventRecorder) Telemetry { // Implement record.EventRecorder interface func (t *telemetry) Event(object runtime.Object, eventType, reason, message string) { - t.EventRecorder.Event(object, eventType, reason, message) + t.EventRecorder.Event(object, t.transformEventType(eventType), reason, message) t.generateFromEvent(object, nil, eventType, reason, message) } // Implement record.EventRecorder interface func (t *telemetry) Eventf(object runtime.Object, eventType, reason, messageFmt string, args ...any) { - t.EventRecorder.Eventf(object, eventType, reason, messageFmt, args...) + t.EventRecorder.Eventf(object, t.transformEventType(eventType), reason, messageFmt, args...) t.generateFromEvent(object, nil, eventType, reason, messageFmt, args...) } // Implement record.EventRecorder interface func (t *telemetry) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventType, reason, messageFmt string, args ...any) { - t.EventRecorder.AnnotatedEventf(object, annotations, eventType, reason, messageFmt, args...) + t.EventRecorder.AnnotatedEventf(object, annotations, t.transformEventType(eventType), reason, messageFmt, args...) t.generateFromEvent(object, annotations, eventType, reason, messageFmt, args...) } @@ -112,3 +115,12 @@ func (t *telemetry) generateFromEvent(object runtime.Object, annotations map[str t.GenerateTelemetryRecord(object.DeepCopyObject(), object.GetObjectKind().GroupVersionKind().Kind, eventType, reason, fmt.Sprintf(messageFmt, args...), annotations) } } + +func (t *telemetry) transformEventType(eventType string) string { + switch eventType { + case "Error", "Warning", "error": + return corev1.EventTypeWarning + default: + return corev1.EventTypeNormal + } +} diff --git a/pkg/telemetry/telemetry_test.go b/pkg/telemetry/telemetry_test.go index ba0b27790..338ca9ccc 100644 --- a/pkg/telemetry/telemetry_test.go +++ b/pkg/telemetry/telemetry_test.go @@ -13,6 +13,7 @@ See the Mulan PSL v2 for more details. package telemetry import ( + "context" "time" . "github.com/onsi/ginkgo/v2" @@ -42,7 +43,7 @@ var _ = Describe("Telemetry", Label("telemetry"), Ordered, func() { } BeforeAll(func() { - telemetry = NewTelemetry(&fakeEventRecorder{}) + telemetry = NewTelemetry(context.TODO(), &fakeEventRecorder{}) Expect(telemetry).ShouldNot(BeNil()) }) diff --git a/pkg/telemetry/throttler.go b/pkg/telemetry/throttler.go index 58d1cd10f..a4d1d627a 100644 --- a/pkg/telemetry/throttler.go +++ b/pkg/telemetry/throttler.go @@ -14,18 +14,15 @@ package telemetry import ( "context" - "fmt" "io" "net/http" "net/url" - "os" "sync" "github.com/oceanbase/ob-operator/pkg/telemetry/models" ) type throttler struct { - debug bool client http.Client ctx context.Context cancel context.CancelFunc @@ -41,13 +38,13 @@ func getThrottler() *throttler { recordChan: make(chan *models.TelemetryRecord, DefaultThrottlerBufferSize), } - throttlerSingleton.debug = os.Getenv(TelemetryDebugEnvName) == "true" ctx, cancel := context.WithCancel(context.Background()) throttlerSingleton.ctx = ctx throttlerSingleton.cancel = cancel throttlerSingleton.client = *http.DefaultClient throttlerSingleton.startWorkers() + getLogger().Println("telemetry throttler started", "#worker:", DefaultThrottlerWorkerCount) }) return throttlerSingleton } @@ -96,18 +93,18 @@ func (t *throttler) startWorkers() { return } res, err := t.sendTelemetryRecord(record) - if t.debug { + if debugMode { if err != nil { - _, _ = fmt.Printf("send telemetry record error: %v\n", err) + getLogger().Printf("send telemetry record error: %v\n", err) } bts, err := io.ReadAll(res.Body) if err != nil { - _, _ = fmt.Printf("read response body error: %v\n", err) + getLogger().Printf("read response body error: %v\n", err) } - _, _ = fmt.Printf("[Event %s.%s] %s\n", record.ResourceType, record.EventType, string(bts)) + getLogger().Printf("[Event %s.%s] %s\n", record.ResourceType, record.EventType, string(bts)) } case <-ctx.Done(): - _, _ = fmt.Println(ctx.Err()) + getLogger().Println(ctx.Err()) return } }