From 449d1ecd18a8675a3f058273a7c3265354cee55f Mon Sep 17 00:00:00 2001 From: Zespre Chang Date: Thu, 15 Aug 2024 11:56:21 +0800 Subject: [PATCH] fix(upgradelog): adapt to new extravolume mechanics As for logging-operator v4.4.0, the way ExtraVolume is handled is different. It's impossible to make the fluentd Pod associate with a pre-created PVC. The PVC has now been created along with the fluentd StatefulSet. The UpgradeLog mechanics adapt the new upstream behavior to reconcile the StatefulSet populated PVC instead of creating a new one. An UpgradeLog OwnerReference will be added to the PVC to make it live "longer" than its original owner/creator, i.e., the Logging object. Signed-off-by: Zespre Chang --- pkg/api/upgradelog/handler.go | 2 +- pkg/controller/master/upgradelog/common.go | 80 ++++++++----- .../master/upgradelog/controller.go | 75 +++++++++++- .../master/upgradelog/controller_test.go | 107 +++++++++++++++++- pkg/controller/master/upgradelog/register.go | 2 + 5 files changed, 225 insertions(+), 41 deletions(-) diff --git a/pkg/api/upgradelog/handler.go b/pkg/api/upgradelog/handler.go index 72e9feab63..f0302e38da 100644 --- a/pkg/api/upgradelog/handler.go +++ b/pkg/api/upgradelog/handler.go @@ -334,7 +334,7 @@ func prepareLogPackager(upgradeLog *harvesterv1.UpgradeLog, imageVersion, archiv Name: "log-archive", VolumeSource: corev1.VolumeSource{ PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: name.SafeConcatName(upgradeLog.Name, util.UpgradeLogArchiveComponent), + ClaimName: ctlupgradelog.GetUpgradeLogPvcName(upgradeLog), }, }, }, diff --git a/pkg/controller/master/upgradelog/common.go b/pkg/controller/master/upgradelog/common.go index cba099364a..43ffef323d 100644 --- a/pkg/controller/master/upgradelog/common.go +++ b/pkg/controller/master/upgradelog/common.go @@ -2,6 +2,7 @@ package upgradelog import ( "fmt" + "strings" "github.com/cisco-open/operator-tools/pkg/volume" loggingv1 "github.com/kube-logging/logging-operator/pkg/sdk/logging/api/v1beta1" @@ -15,6 +16,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" harvesterv1 "github.com/harvester/harvester/pkg/apis/harvesterhci.io/v1beta1" @@ -35,35 +37,6 @@ func upgradeLogReference(upgradeLog *harvesterv1.UpgradeLog) metav1.OwnerReferen } } -func preparePvc(upgradeLog *harvesterv1.UpgradeLog) *corev1.PersistentVolumeClaim { - volumeMode := corev1.PersistentVolumeFilesystem - - return &corev1.PersistentVolumeClaim{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{ - util.LabelUpgradeLog: upgradeLog.Name, - util.LabelUpgradeLogComponent: util.UpgradeLogArchiveComponent, - }, - Name: name.SafeConcatName(upgradeLog.Name, util.UpgradeLogArchiveComponent), - Namespace: util.HarvesterSystemNamespaceName, - OwnerReferences: []metav1.OwnerReference{ - upgradeLogReference(upgradeLog), - }, - }, - Spec: corev1.PersistentVolumeClaimSpec{ - AccessModes: []corev1.PersistentVolumeAccessMode{ - corev1.ReadWriteOnce, - }, - Resources: corev1.VolumeResourceRequirements{ - Requests: corev1.ResourceList{ - "storage": resource.MustParse(defaultLogArchiveVolumeSize), - }, - }, - VolumeMode: &volumeMode, - }, - } -} - func prepareOperator(upgradeLog *harvesterv1.UpgradeLog) *mgmtv3.ManagedChart { operatorName := name.SafeConcatName(upgradeLog.Name, util.UpgradeLogOperatorComponent) return &mgmtv3.ManagedChart{ @@ -98,6 +71,8 @@ func prepareOperator(upgradeLog *harvesterv1.UpgradeLog) *mgmtv3.ManagedChart { } func prepareLogging(upgradeLog *harvesterv1.UpgradeLog, images map[string]Image) *loggingv1.Logging { + volumeMode := corev1.PersistentVolumeFilesystem + return &loggingv1.Logging{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ @@ -144,9 +119,20 @@ func prepareLogging(upgradeLog *harvesterv1.UpgradeLog, images map[string]Image) Volume: &volume.KubernetesVolume{ PersistentVolumeClaim: &volume.PersistentVolumeClaim{ PersistentVolumeSource: corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: name.SafeConcatName(upgradeLog.Name, util.UpgradeLogArchiveComponent), + ClaimName: util.UpgradeLogArchiveComponent, ReadOnly: false, }, + PersistentVolumeClaimSpec: corev1.PersistentVolumeClaimSpec{ + AccessModes: []corev1.PersistentVolumeAccessMode{ + corev1.ReadWriteOnce, + }, + Resources: corev1.VolumeResourceRequirements{ + Requests: corev1.ResourceList{ + "storage": resource.MustParse(defaultLogArchiveVolumeSize), + }, + }, + VolumeMode: &volumeMode, + }, }, }, }, @@ -372,7 +358,7 @@ func prepareLogDownloader(upgradeLog *harvesterv1.UpgradeLog, imageVersion strin Name: "log-archive", VolumeSource: corev1.VolumeSource{ PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ - ClaimName: name.SafeConcatName(upgradeLog.Name, util.UpgradeLogArchiveComponent), + ClaimName: GetUpgradeLogPvcName(upgradeLog), ReadOnly: true, }, }, @@ -416,6 +402,23 @@ func prepareLogDownloaderSvc(upgradeLog *harvesterv1.UpgradeLog) *corev1.Service } } +// Returns the name of the log-archive PVC, which is created by the fluentd StatefulSet. +// +// The name will look like: -infra-log-archive--infra-fluentd-0 +// For instance: hvst-upgrade-bczl4-upgradelog-infra-log-archive-hvst-upgrade-bczl4-upgradelog-infra-fluentd-0. +// +// TODO: As of rancher-logging v4.4.0, we use the PVC created by the fluentd StatefulSet, not making it ourselves. After v4.6.0, we need to revisit here and perhaps update the implementation because the upstream behavior changes. +func GetUpgradeLogPvcName(upgradeLog *harvesterv1.UpgradeLog) string { + return strings.Join([]string{ + upgradeLog.Name, + util.UpgradeLogInfraComponent, + util.UpgradeLogArchiveComponent, + upgradeLog.Name, + util.UpgradeLogInfraComponent, + "fluentd-0", + }, "-") +} + func setOperatorDeployedCondition(upgradeLog *harvesterv1.UpgradeLog, status corev1.ConditionStatus, reason, message string) { harvesterv1.LoggingOperatorDeployed.SetStatus(upgradeLog, string(status)) harvesterv1.LoggingOperatorDeployed.Reason(upgradeLog, reason) @@ -836,6 +839,21 @@ func (p *pvcBuilder) WithLabel(key, value string) *pvcBuilder { return p } +func (p *pvcBuilder) OwnerReference(name, uid string) *pvcBuilder { + newOwnerReferences := []metav1.OwnerReference{{ + Name: name, + UID: types.UID(uid), + }} + + if len(p.pvc.OwnerReferences) == 0 { + p.pvc.OwnerReferences = newOwnerReferences + } else { + p.pvc.OwnerReferences = append(p.pvc.OwnerReferences, newOwnerReferences...) + } + + return p +} + func (p *pvcBuilder) Build() *corev1.PersistentVolumeClaim { return p.pvc } diff --git a/pkg/controller/master/upgradelog/controller.go b/pkg/controller/master/upgradelog/controller.go index ea81934b3d..49057d6542 100644 --- a/pkg/controller/master/upgradelog/controller.go +++ b/pkg/controller/master/upgradelog/controller.go @@ -19,8 +19,10 @@ import ( appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" harvesterv1 "github.com/harvester/harvester/pkg/apis/harvesterhci.io/v1beta1" ctlharvesterv1 "github.com/harvester/harvester/pkg/generated/controllers/harvesterhci.io/v1beta1" @@ -43,6 +45,15 @@ const ( upgradeLogStateAnnotation = "harvesterhci.io/upgradeLogState" upgradeLogStateCollecting = "Collecting" upgradeLogStateStopped = "Stopped" + + appLabelName = "app.kubernetes.io/name" +) + +var ( + logArchiveMatchingLabels = labels.Set{ + appLabelName: "fluentd", + util.LabelUpgradeLogComponent: util.UpgradeLogAggregatorComponent, + } ) type handler struct { @@ -146,10 +157,6 @@ func (h *handler) OnUpgradeLogChange(_ string, upgradeLog *harvesterv1.UpgradeLo toUpdate := upgradeLog.DeepCopy() - // The volume acts as a central log storage for fluentd - if _, err := h.pvcClient.Create(preparePvc(upgradeLog)); err != nil && !apierrors.IsAlreadyExists(err) { - return nil, err - } // The creation of the Logging resource will indirectly bring up fluent-bit DaemonSet and fluentd StatefulSet candidateImages, err := h.getConsolidatedLoggingImageList(name.SafeConcatName(upgradeLog.Name, util.UpgradeLogOperatorComponent)) if err != nil { @@ -581,6 +588,66 @@ func (h *handler) OnStatefulSetChange(_ string, statefulSet *appsv1.StatefulSet) return statefulSet, err } +func (h *handler) OnPvcChange(_ string, pvc *corev1.PersistentVolumeClaim) (*corev1.PersistentVolumeClaim, error) { + if pvc == nil || pvc.DeletionTimestamp != nil || pvc.Namespace != util.HarvesterSystemNamespaceName { + return pvc, nil + } + + // We only care about the log-archive PVC created by the fluentd StatefulSet + pvcLabels := labels.Set(pvc.Labels) + if !logArchiveMatchingLabels.AsSelector().Matches(pvcLabels) { + return pvc, nil + } + upgradeLogName, ok := pvc.Labels[util.LabelUpgradeLog] + if !ok { + return pvc, nil + } + + upgradeLog, err := h.upgradeLogCache.Get(util.HarvesterSystemNamespaceName, upgradeLogName) + if err != nil { + if !errors.IsNotFound(err) { + return pvc, err + } + logrus.WithFields(logrus.Fields{ + "namespace": pvc.Namespace, + "name": pvc.Name, + "kind": pvc.Kind, + }).Warn("upgradelog not found, skip it") + return pvc, nil + } + + newOwnerRef := metav1.OwnerReference{ + Name: upgradeLog.Name, + APIVersion: upgradeLog.APIVersion, + UID: upgradeLog.UID, + Kind: upgradeLog.Kind, + } + + // Check if the OwnerReference already exists + for _, ownerRef := range pvc.OwnerReferences { + if ownerRef.UID == newOwnerRef.UID { + return pvc, nil + } + } + + // Add UpgradeLog as an owner of the log-archive PVC because we want it to + // live longer than its original owner, i.e., Logging, so pods like the + // downloader and packager can still access the log-archive volume. + toUpdate := pvc.DeepCopy() + toUpdate.OwnerReferences = append(toUpdate.OwnerReferences, newOwnerRef) + + if !reflect.DeepEqual(pvc, toUpdate) { + logrus.WithFields(logrus.Fields{ + "namespace": pvc.Namespace, + "name": pvc.Name, + "kind": pvc.Kind, + }).Info("updating ownerReference") + return h.pvcClient.Update(toUpdate) + } + + return pvc, nil +} + func (h *handler) OnUpgradeChange(_ string, upgrade *harvesterv1.Upgrade) (*harvesterv1.Upgrade, error) { if upgrade == nil || upgrade.DeletionTimestamp != nil || upgrade.Labels == nil || upgrade.Namespace != util.HarvesterSystemNamespaceName { return upgrade, nil diff --git a/pkg/controller/master/upgradelog/controller_test.go b/pkg/controller/master/upgradelog/controller_test.go index 985ba12d6d..1b572ef1e9 100644 --- a/pkg/controller/master/upgradelog/controller_test.go +++ b/pkg/controller/master/upgradelog/controller_test.go @@ -26,12 +26,14 @@ const ( testUpgradeName = "test-upgrade" testUpgradeLogName = "test-upgrade-upgradelog" + testUpgradeLogUID = "test-upgradelog-uid" testClusterFlowName = "test-upgrade-upgradelog-clusterflow" testClusterOutputName = "test-upgrade-upgradelog-clusteroutput" testDaemonSetName = "test-upgrade-upgradelog-fluentbit" testDeploymentName = "test-upgrade-upgradelog-log-downloader" testJobName = "test-upgrade-upgradelog-log-packager" testLoggingName = "test-upgrade-upgradelog-infra" + testLoggingUID = "test-logging-uid" testManagedChartName = "test-upgrade-upgradelog-operator" testPvcName = "test-upgrade-upgradelog-log-archive" testStatefulSetName = "test-upgrade-upgradelog-fluentd" @@ -124,8 +126,7 @@ func newTestManagedChartBuilder() *managedChartBuilder { } func newTestPvcBuilder() *pvcBuilder { - return newPvcBuilder(testPvcName). - WithLabel(util.LabelUpgradeLog, testUpgradeLogName) + return newPvcBuilder(testPvcName) } func newTestStatefulSetBuilder() *statefulSetBuilder { @@ -516,6 +517,105 @@ func TestHandler_OnStatefulSetChange(t *testing.T) { } } +func TestHandler_OnPvcChange(t *testing.T) { + type input struct { + key string + pvc *corev1.PersistentVolumeClaim + upgradeLog *harvesterv1.UpgradeLog + } + type output struct { + pvc *corev1.PersistentVolumeClaim + err error + } + var testCases = []struct { + name string + given input + expected output + }{ + { + name: "The UpgradeLog will be added as an owner of the log-archive PVC created by the fluentd StatefulSet", + given: input{ + key: testPvcName, + upgradeLog: newTestUpgradeLogBuilder().Build(), + pvc: newTestPvcBuilder(). + WithLabel(appLabelName, "fluentd"). + WithLabel(util.LabelUpgradeLogComponent, util.UpgradeLogAggregatorComponent). + WithLabel(util.LabelUpgradeLog, testUpgradeLogName). + OwnerReference(testLoggingName, testLoggingUID). + Build(), + }, + expected: output{ + pvc: newTestPvcBuilder(). + OwnerReference(testLoggingName, testLoggingUID). + OwnerReference(testUpgradeLogName, ""). + Build(), + }, + }, + { + name: "The log-archive PVC is owned by exactly one Logging and one UpgradeLog", + given: input{ + key: testPvcName, + upgradeLog: newTestUpgradeLogBuilder().Build(), + pvc: newTestPvcBuilder(). + WithLabel(appLabelName, "fluentd"). + OwnerReference(testLoggingName, testLoggingUID). + OwnerReference(testUpgradeLogName, testUpgradeLogUID). + Build(), + }, + expected: output{ + pvc: newTestPvcBuilder(). + OwnerReference(testLoggingName, testLoggingUID). + OwnerReference(testUpgradeLogName, testUpgradeLogUID). + Build(), + }, + }, + { + name: "The irrelevant PVC will be intact", + given: input{ + key: testPvcName, + upgradeLog: newTestUpgradeLogBuilder().Build(), + pvc: newTestPvcBuilder(). + WithLabel(appLabelName, "fluentd"). + OwnerReference(testLoggingName, testLoggingUID). + Build(), + }, + expected: output{ + pvc: newTestPvcBuilder(). + OwnerReference(testLoggingName, testLoggingUID). + Build(), + }, + }, + } + + for _, tc := range testCases { + var clientset = fake.NewSimpleClientset(tc.given.upgradeLog) + + var k8sclientset = k8sfake.NewSimpleClientset() + if tc.given.pvc != nil { + var err = k8sclientset.Tracker().Add(tc.given.pvc) + assert.Nil(t, err, "mock resource should add into k8s fake controller tracker") + } + + var handler = &handler{ + namespace: util.HarvesterSystemNamespaceName, + pvcClient: fakeclients.PersistentVolumeClaimClient(k8sclientset.CoreV1().PersistentVolumeClaims), + upgradeLogCache: fakeclients.UpgradeLogCache(clientset.HarvesterhciV1beta1().UpgradeLogs), + } + + var actual output + actual.pvc, actual.err = handler.OnPvcChange(tc.given.key, tc.given.pvc) + if tc.expected.err != nil { + assert.Equal(t, tc.expected.err, actual.err, tc.name) + } else { + assert.Nil(t, actual.err) + } + + if tc.expected.pvc != nil { + assert.Equal(t, tc.expected.pvc.OwnerReferences, actual.pvc.OwnerReferences, "case %q", tc.name) + } + } +} + func TestHandler_OnUpgradeChange(t *testing.T) { type input struct { key string @@ -690,7 +790,6 @@ func TestHandler_OnUpgradeLogChange(t *testing.T) { }, expected: output{ logging: prepareLogging(newTestUpgradeLogBuilder().Build(), testImages), - pvc: preparePvc(newTestUpgradeLogBuilder().Build()), upgradeLog: newTestUpgradeLogBuilder(). UpgradeLogReadyCondition(corev1.ConditionUnknown, "", ""). OperatorDeployedCondition(corev1.ConditionTrue, "", ""). @@ -859,7 +958,6 @@ func TestHandler_OnUpgradeLogChange(t *testing.T) { clusterFlow: newTestClusterFlowBuilder().Build(), clusterOutput: newTestClusterOutputBuilder().Build(), logging: newTestLoggingBuilder().Build(), - pvc: preparePvc(newTestUpgradeLogBuilder().Build()), upgradeLog: newTestUpgradeLogBuilder(). WithAnnotation(upgradeLogStateAnnotation, upgradeLogStateCollecting). UpgradeLogReadyCondition(corev1.ConditionTrue, "", ""). @@ -869,7 +967,6 @@ func TestHandler_OnUpgradeLogChange(t *testing.T) { DownloadReadyCondition(corev1.ConditionTrue, "", "").Build(), }, expected: output{ - pvc: preparePvc(newTestUpgradeLogBuilder().Build()), upgradeLog: newTestUpgradeLogBuilder(). WithAnnotation(upgradeLogStateAnnotation, upgradeLogStateStopped). UpgradeLogReadyCondition(corev1.ConditionTrue, "", ""). diff --git a/pkg/controller/master/upgradelog/register.go b/pkg/controller/master/upgradelog/register.go index 5386805310..082e918886 100644 --- a/pkg/controller/master/upgradelog/register.go +++ b/pkg/controller/master/upgradelog/register.go @@ -16,6 +16,7 @@ const ( loggingControllerName = "harvester-upgradelog-logging-controller" statefulSetControllerName = "harvester-upgradelog-statefulset-controller" managedChartControllerName = "harvester-upgradelog-managedchart-controller" + pvcControllerName = "harvester-upgradelog-pvc-controller" upgradeControllerName = "harvester-upgradelog-upgrade-controller" ) @@ -69,6 +70,7 @@ func Register(ctx context.Context, management *config.Management, options config jobController.OnChange(ctx, jobControllerName, handler.OnJobChange) statefulSetController.OnChange(ctx, statefulSetControllerName, handler.OnStatefulSetChange) managedChartController.OnChange(ctx, managedChartControllerName, handler.OnManagedChartChange) + pvcController.OnChange(ctx, pvcControllerName, handler.OnPvcChange) upgradeController.OnChange(ctx, upgradeControllerName, handler.OnUpgradeChange) return nil