diff --git a/controller/monitor/disk_monitor.go b/controller/monitor/disk_monitor.go
index 50e48171d6..aed2946407 100644
--- a/controller/monitor/disk_monitor.go
+++ b/controller/monitor/disk_monitor.go
@@ -25,7 +25,7 @@ import (
 )
 
 const (
-	NodeMonitorSyncPeriod = 30 * time.Second
+	DiskMonitorSyncPeriod = 30 * time.Second
 
 	volumeMetaData = "volume.meta"
 )
@@ -35,7 +35,7 @@ type DiskServiceClient struct {
 	err error
 }
 
-type NodeMonitor struct {
+type DiskMonitor struct {
 	*baseMonitor
 
 	nodeName string
@@ -69,11 +69,11 @@ type GetDiskConfigHandler func(longhorn.DiskType, string, string, longhorn.DiskD
 type GenerateDiskConfigHandler func(longhorn.DiskType, string, string, string, string, *DiskServiceClient) (*util.DiskConfig, error)
 type GetReplicaDataStoresHandler func(longhorn.DiskType, *longhorn.Node, string, string, string, string, *DiskServiceClient) (map[string]string, error)
 
-func NewDiskMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName string, syncCallback func(key string)) (*NodeMonitor, error) {
+func NewDiskMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName string, syncCallback func(key string)) (*DiskMonitor, error) {
 	ctx, quit := context.WithCancel(context.Background())
 
-	m := &NodeMonitor{
-		baseMonitor: newBaseMonitor(ctx, quit, logger, ds, NodeMonitorSyncPeriod),
+	m := &DiskMonitor{
+		baseMonitor: newBaseMonitor(ctx, quit, logger, ds, DiskMonitorSyncPeriod),
 
 		nodeName:        nodeName,
 		checkVolumeMeta: true,
@@ -94,7 +94,7 @@ func NewDiskMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName
 	return m, nil
 }
 
-func (m *NodeMonitor) Start() {
+func (m *DiskMonitor) Start() {
 	if err := wait.PollUntilContextCancel(m.ctx, m.syncPeriod, false, func(context.Context) (bool, error) {
 		if err := m.run(struct{}{}); err != nil {
 			m.logger.WithError(err).Error("Stopped monitoring disks")
@@ -109,19 +109,19 @@
 	}
 }
 
-func (m *NodeMonitor) Stop() {
+func (m *DiskMonitor) Stop() {
 	m.quit()
 }
 
-func (m *NodeMonitor) RunOnce() error {
+func (m *DiskMonitor) RunOnce() error {
 	return m.run(struct{}{})
 }
 
-func (m *NodeMonitor) UpdateConfiguration(map[string]interface{}) error {
+func (m *DiskMonitor) UpdateConfiguration(map[string]interface{}) error {
 	return nil
 }
 
-func (m *NodeMonitor) GetCollectedData() (interface{}, error) {
+func (m *DiskMonitor) GetCollectedData() (interface{}, error) {
 	m.collectedDataLock.RLock()
 	defer m.collectedDataLock.RUnlock()
 
@@ -133,7 +133,7 @@ func (m *NodeMonitor) GetCollectedData() (interface{}, error) {
 	return data, nil
 }
 
-func (m *NodeMonitor) run(value interface{}) error {
+func (m *DiskMonitor) run(value interface{}) error {
 	node, err := m.ds.GetNode(m.nodeName)
 	if err != nil {
 		logrus.WithError(err).Errorf("Failed to get longhorn node %v", m.nodeName)
@@ -155,7 +155,7 @@ func (m *NodeMonitor) run(value interface{}) error {
 	return nil
 }
 
-func (m *NodeMonitor) getRunningInstanceManagerRO(dataEngine longhorn.DataEngineType) (*longhorn.InstanceManager, error) {
+func (m *DiskMonitor) getRunningInstanceManagerRO(dataEngine longhorn.DataEngineType) (*longhorn.InstanceManager, error) {
 	switch dataEngine {
 	case longhorn.DataEngineTypeV1:
 		return m.ds.GetDefaultInstanceManagerByNodeRO(m.nodeName, dataEngine)
@@ -181,7 +181,7 @@ func (m *NodeMonitor) getRunningInstanceManagerRO(dataEngine longhorn.DataEngine
 	return nil, fmt.Errorf("unknown data engine %v", dataEngine)
 }
 
-func (m *NodeMonitor) newDiskServiceClients() map[longhorn.DataEngineType]*DiskServiceClient {
+func (m *DiskMonitor) newDiskServiceClients() map[longhorn.DataEngineType]*DiskServiceClient {
 	clients := map[longhorn.DataEngineType]*DiskServiceClient{}
 
 	dataEngines := m.ds.GetDataEngines()
@@ -208,7 +208,7 @@ func (m *NodeMonitor) newDiskServiceClients() map[longhorn.DataEngineType]*DiskS
 	return clients
 }
 
-func (m *NodeMonitor) closeDiskServiceClients(clients map[longhorn.DataEngineType]*DiskServiceClient) {
+func (m *DiskMonitor) closeDiskServiceClients(clients map[longhorn.DataEngineType]*DiskServiceClient) {
 	for _, client := range clients {
 		if client.c != nil {
 			client.c.Close()
@@ -218,7 +218,7 @@ func (m *NodeMonitor) closeDiskServiceClients(clients map[longhorn.DataEngineTyp
 }
 
 // Collect disk data and generate disk UUID blindly.
-func (m *NodeMonitor) collectDiskData(node *longhorn.Node) map[string]*CollectedDiskInfo {
+func (m *DiskMonitor) collectDiskData(node *longhorn.Node) map[string]*CollectedDiskInfo {
 	diskInfoMap := make(map[string]*CollectedDiskInfo, 0)
 
 	diskServiceClients := m.newDiskServiceClients()
@@ -393,7 +393,7 @@ func NewDiskInfo(diskName, diskUUID, diskPath string, diskDriver longhorn.DiskDr
 	return diskInfo
 }
 
-func (m *NodeMonitor) getOrphanedReplicaDataStores(diskType longhorn.DiskType, diskUUID, diskPath string, replicaDataStores map[string]string) (map[string]string, error) {
+func (m *DiskMonitor) getOrphanedReplicaDataStores(diskType longhorn.DiskType, diskUUID, diskPath string, replicaDataStores map[string]string) (map[string]string, error) {
 	switch diskType {
 	case longhorn.DiskTypeFilesystem:
 		return m.getOrphanedReplicaDirectoryNames(diskUUID, diskPath, replicaDataStores)
@@ -404,7 +404,7 @@ func (m *NodeMonitor) getOrphanedReplicaDataStores(diskType longhorn.DiskType, d
 	}
 }
 
-func (m *NodeMonitor) getOrphanedReplicaLvolNames(replicaDataStores map[string]string) (map[string]string, error) {
+func (m *DiskMonitor) getOrphanedReplicaLvolNames(replicaDataStores map[string]string) (map[string]string, error) {
 	if len(replicaDataStores) == 0 {
 		return map[string]string{}, nil
 	}
@@ -419,7 +419,7 @@ func (m *NodeMonitor) getOrphanedReplicaLvolNames(replicaDataStores map[string]s
 	return replicaDataStores, nil
 }
 
-func (m *NodeMonitor) getOrphanedReplicaDirectoryNames(diskUUID, diskPath string, replicaDataStores map[string]string) (map[string]string, error) {
+func (m *DiskMonitor) getOrphanedReplicaDirectoryNames(diskUUID, diskPath string, replicaDataStores map[string]string) (map[string]string, error) {
 	if len(replicaDataStores) == 0 {
 		return map[string]string{}, nil
 	}
diff --git a/controller/monitor/environment_check_monitor.go b/controller/monitor/environment_check_monitor.go
index f93715c83c..6c906628a6 100644
--- a/controller/monitor/environment_check_monitor.go
+++ b/controller/monitor/environment_check_monitor.go
@@ -64,7 +64,7 @@ func NewEnvironmentCheckMonitor(logger logrus.FieldLogger, ds *datastore.DataSto
 	ctx, quit := context.WithCancel(context.Background())
 
 	m := &EnvironmentCheckMonitor{
-		baseMonitor: newBaseMonitor(ctx, quit, logger, ds, NodeMonitorSyncPeriod),
+		baseMonitor: newBaseMonitor(ctx, quit, logger, ds, DiskMonitorSyncPeriod),
 
 		nodeName: nodeName,
 
diff --git a/controller/monitor/fake_disk_monitor.go b/controller/monitor/fake_disk_monitor.go
index 2fd1062485..6f55c51c03 100644
--- a/controller/monitor/fake_disk_monitor.go
+++ b/controller/monitor/fake_disk_monitor.go
@@ -21,11 +21,11 @@ const (
 	TestOrphanedReplicaDirectoryName = "test-volume-r-000000000"
 )
 
-func NewFakeNodeMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName string, syncCallback func(key string)) (*NodeMonitor, error) {
+func NewFakeDiskMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName string, syncCallback func(key string)) (*DiskMonitor, error) {
 	ctx, quit := context.WithCancel(context.Background())
 
-	m := &NodeMonitor{
-		baseMonitor: newBaseMonitor(ctx, quit, logger, ds, NodeMonitorSyncPeriod),
+	m := &DiskMonitor{
+		baseMonitor: newBaseMonitor(ctx, quit, logger, ds, DiskMonitorSyncPeriod),
 
 		nodeName:        nodeName,
 		checkVolumeMeta: false,
diff --git a/controller/monitor/fake_environment_check_monitor.go b/controller/monitor/fake_environment_check_monitor.go
new file mode 100644
index 0000000000..be7dc72a94
--- /dev/null
+++ b/controller/monitor/fake_environment_check_monitor.go
@@ -0,0 +1,106 @@
+package monitor
+
+import (
+	"context"
+	"reflect"
+	"sync"
+
+	"github.com/jinzhu/copier"
+	"github.com/pkg/errors"
+	"github.com/sirupsen/logrus"
+	"k8s.io/apimachinery/pkg/util/wait"
+
+	"github.com/longhorn/longhorn-manager/datastore"
+
+	longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2"
+)
+
+type FakeEnvironmentCheckMonitor struct {
+	*baseMonitor
+
+	nodeName string
+
+	collectedDataLock sync.RWMutex
+	collectedData     *CollectedEnvironmentCheckInfo
+
+	syncCallback func(key string)
+}
+
+func NewFakeEnvironmentCheckMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName string, syncCallback func(key string)) (*FakeEnvironmentCheckMonitor, error) {
+	ctx, quit := context.WithCancel(context.Background())
+
+	m := &FakeEnvironmentCheckMonitor{
+		baseMonitor: newBaseMonitor(ctx, quit, logger, ds, DiskMonitorSyncPeriod),
+
+		nodeName: nodeName,
+
+		collectedDataLock: sync.RWMutex{},
+		collectedData:     &CollectedEnvironmentCheckInfo{},
+
+		syncCallback: syncCallback,
+	}
+
+	return m, nil
+}
+
+func (m *FakeEnvironmentCheckMonitor) Start() {
+	if err := wait.PollUntilContextCancel(m.ctx, m.syncPeriod, true, func(context.Context) (bool, error) {
+		if err := m.run(struct{}{}); err != nil {
+			m.logger.WithError(err).Error("Stopped monitoring environment check")
+		}
+		return false, nil
+	}); err != nil {
+		if errors.Is(err, context.Canceled) {
+			m.logger.WithError(err).Warning("Environment check monitor is stopped")
+		} else {
+			m.logger.WithError(err).Error("Failed to start environment check monitor")
+		}
+	}
+}
+
+func (m *FakeEnvironmentCheckMonitor) Stop() {
+	m.quit()
+}
+
+func (m *FakeEnvironmentCheckMonitor) RunOnce() error {
+	return m.run(struct{}{})
+}
+
+func (m *FakeEnvironmentCheckMonitor) UpdateConfiguration(map[string]interface{}) error {
+	return nil
+}
+
+func (m *FakeEnvironmentCheckMonitor) GetCollectedData() (interface{}, error) {
+	m.collectedDataLock.RLock()
+	defer m.collectedDataLock.RUnlock()
+
+	data := []longhorn.Condition{}
+	if err := copier.CopyWithOption(&data, &m.collectedData.conditions, copier.Option{IgnoreEmpty: true, DeepCopy: true}); err != nil {
+		return data, errors.Wrap(err, "failed to copy collected data")
+	}
+
+	return data, nil
+}
+
+func (m *FakeEnvironmentCheckMonitor) run(value interface{}) error {
+	node, err := m.ds.GetNode(m.nodeName)
+	if err != nil {
+		return errors.Wrapf(err, "failed to get longhorn node %v", m.nodeName)
+	}
+
+	collectedData := &CollectedEnvironmentCheckInfo{
+		conditions: []longhorn.Condition{},
+	}
+	if !reflect.DeepEqual(m.collectedData, collectedData) {
+		func() {
+			m.collectedDataLock.Lock()
+			defer m.collectedDataLock.Unlock()
+			m.collectedData = collectedData
+		}()
+
+		key := node.Namespace + "/" + m.nodeName
+		m.syncCallback(key)
+	}
+
+	return nil
+}
diff --git a/controller/node_controller.go b/controller/node_controller.go
index 202b3ea225..047ff17728 100644
--- a/controller/node_controller.go
+++ b/controller/node_controller.go
@@ -378,14 +378,17 @@ func (nc *NodeController) syncNode(key string) (err error) {
 
 		if nc.diskMonitor != nil {
 			nc.diskMonitor.Stop()
+			nc.diskMonitor = nil
 		}
 
 		if nc.environmentCheckMonitor != nil {
 			nc.environmentCheckMonitor.Stop()
+			nc.environmentCheckMonitor = nil
 		}
 
 		if nc.snapshotMonitor != nil {
 			nc.snapshotMonitor.Stop()
+			nc.snapshotMonitor = nil
 		}
 
 		return nc.ds.RemoveFinalizerForNode(node)
diff --git a/controller/node_controller_test.go b/controller/node_controller_test.go
index b27cab5bd1..9db6f06334 100644
--- a/controller/node_controller_test.go
+++ b/controller/node_controller_test.go
@@ -9,16 +9,17 @@ import (
 	"github.com/sirupsen/logrus"
 	. "gopkg.in/check.v1"
 
+	"k8s.io/client-go/kubernetes/scheme"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/record"
+	"k8s.io/kubernetes/pkg/controller"
+
 	corev1 "k8s.io/api/core/v1"
 	apiextensionsfake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	clientset "k8s.io/client-go/kubernetes"
 	fake "k8s.io/client-go/kubernetes/fake"
-	"k8s.io/client-go/kubernetes/scheme"
-	"k8s.io/client-go/tools/cache"
-	"k8s.io/client-go/tools/record"
-	"k8s.io/kubernetes/pkg/controller"
 
 	"github.com/longhorn/longhorn-manager/datastore"
 	"github.com/longhorn/longhorn-manager/types"
@@ -207,6 +208,8 @@ func (s *NodeControllerSuite) TestManagerPodUp(c *C) {
 	if s.controller.controllerID == node.Name {
 		err = s.controller.diskMonitor.RunOnce()
 		c.Assert(err, IsNil)
+		err = s.controller.environmentCheckMonitor.RunOnce()
+		c.Assert(err, IsNil)
 	}
 
 	err = s.controller.syncNode(getKey(node, c))
@@ -291,6 +294,8 @@ func (s *NodeControllerSuite) TestManagerPodDown(c *C) {
 	if s.controller.controllerID == node.Name {
 		err = s.controller.diskMonitor.RunOnce()
 		c.Assert(err, IsNil)
+		err = s.controller.environmentCheckMonitor.RunOnce()
+		c.Assert(err, IsNil)
 	}
 
 	err = s.controller.syncNode(getKey(node, c))
@@ -375,6 +380,8 @@ func (s *NodeControllerSuite) TestKubeNodeDown(c *C) {
 	if s.controller.controllerID == node.Name {
 		err = s.controller.diskMonitor.RunOnce()
 		c.Assert(err, IsNil)
+		err = s.controller.environmentCheckMonitor.RunOnce()
+		c.Assert(err, IsNil)
 	}
 
 	err = s.controller.syncNode(getKey(node, c))
@@ -459,6 +466,8 @@ func (s *NodeControllerSuite) TestKubeNodePressure(c *C) {
 	if s.controller.controllerID == node.Name {
 		err = s.controller.diskMonitor.RunOnce()
 		c.Assert(err, IsNil)
+		err = s.controller.environmentCheckMonitor.RunOnce()
+		c.Assert(err, IsNil)
 	}
 
 	err = s.controller.syncNode(getKey(node, c))
@@ -611,6 +620,8 @@ func (s *NodeControllerSuite) TestUpdateDiskStatus(c *C) {
 	if s.controller.controllerID == node.Name {
 		err = s.controller.diskMonitor.RunOnce()
 		c.Assert(err, IsNil)
+		err = s.controller.environmentCheckMonitor.RunOnce()
+		c.Assert(err, IsNil)
 	}
 
 	err = s.controller.syncNode(getKey(node, c))
@@ -756,6 +767,8 @@ func (s *NodeControllerSuite) TestCleanDiskStatus(c *C) {
 	if s.controller.controllerID == node.Name {
 		err = s.controller.diskMonitor.RunOnce()
 		c.Assert(err, IsNil)
+		err = s.controller.environmentCheckMonitor.RunOnce()
+		c.Assert(err, IsNil)
 	}
 
 	err = s.controller.syncNode(getKey(node, c))
@@ -911,6 +924,8 @@ func (s *NodeControllerSuite) TestDisableDiskOnFilesystemChange(c *C) {
 	if s.controller.controllerID == node.Name {
 		err = s.controller.diskMonitor.RunOnce()
 		c.Assert(err, IsNil)
+		err = s.controller.environmentCheckMonitor.RunOnce()
+		c.Assert(err, IsNil)
 	}
 
 	err = s.controller.syncNode(getKey(node, c))
@@ -1031,6 +1046,8 @@ func (s *NodeControllerSuite) TestCreateDefaultInstanceManager(c *C) {
 	if s.controller.controllerID == node.Name {
 		err = s.controller.diskMonitor.RunOnce()
 		c.Assert(err, IsNil)
+		err = s.controller.environmentCheckMonitor.RunOnce()
+		c.Assert(err, IsNil)
 	}
 
 	err = s.controller.syncNode(getKey(node, c))
@@ -1175,6 +1192,8 @@ func (s *NodeControllerSuite) TestCleanupRedundantInstanceManagers(c *C) {
 	if s.controller.controllerID == node.Name {
 		err = s.controller.diskMonitor.RunOnce()
 		c.Assert(err, IsNil)
+		err = s.controller.environmentCheckMonitor.RunOnce()
+		c.Assert(err, IsNil)
 	}
 
 	err = s.controller.syncNode(getKey(node, c))
@@ -1267,6 +1286,8 @@ func (s *NodeControllerSuite) TestCleanupAllInstanceManagers(c *C) {
 	if s.controller.controllerID == node.Name {
 		err = s.controller.diskMonitor.RunOnce()
 		c.Assert(err, IsNil)
+		err = s.controller.environmentCheckMonitor.RunOnce()
+		c.Assert(err, IsNil)
 	}
 
 	err = s.controller.syncNode(getKey(node, c))
@@ -1359,6 +1380,8 @@ func (s *NodeControllerSuite) TestEventOnNotReady(c *C) {
 	if s.controller.controllerID == node.Name {
 		err = s.controller.diskMonitor.RunOnce()
 		c.Assert(err, IsNil)
+		err = s.controller.environmentCheckMonitor.RunOnce()
+		c.Assert(err, IsNil)
 	}
 
 	err = s.controller.syncNode(getKey(node, c))
@@ -1446,6 +1469,8 @@ func (s *NodeControllerSuite) TestEventOnDiskPressure(c *C) {
 	if s.controller.controllerID == node.Name {
 		err = s.controller.diskMonitor.RunOnce()
 		c.Assert(err, IsNil)
+		err = s.controller.environmentCheckMonitor.RunOnce()
+		c.Assert(err, IsNil)
 	}
 
 	err = s.controller.syncNode(getKey(node, c))
@@ -1533,6 +1558,8 @@ func (s *NodeControllerSuite) TestEventOnMemoryPressure(c *C) {
 	if s.controller.controllerID == node.Name {
 		err = s.controller.diskMonitor.RunOnce()
 		c.Assert(err, IsNil)
+		err = s.controller.environmentCheckMonitor.RunOnce()
+		c.Assert(err, IsNil)
 	}
 
 	err = s.controller.syncNode(getKey(node, c))
@@ -1620,6 +1647,8 @@ func (s *NodeControllerSuite) TestEventOnPidPressure(c *C) {
 	if s.controller.controllerID == node.Name {
 		err = s.controller.diskMonitor.RunOnce()
 		c.Assert(err, IsNil)
+		err = s.controller.environmentCheckMonitor.RunOnce()
+		c.Assert(err, IsNil)
 	}
 
 	err = s.controller.syncNode(getKey(node, c))
@@ -1707,6 +1736,8 @@ func (s *NodeControllerSuite) TestEventOnNetworkPressure(c *C) {
 	if s.controller.controllerID == node.Name {
 		err = s.controller.diskMonitor.RunOnce()
 		c.Assert(err, IsNil)
+		err = s.controller.environmentCheckMonitor.RunOnce()
+		c.Assert(err, IsNil)
 	}
 
 	err = s.controller.syncNode(getKey(node, c))
@@ -1800,6 +1831,8 @@ func (s *NodeControllerSuite) TestNoEventOnUnknownTrueNodeCondition(c *C) {
 	if s.controller.controllerID == node.Name {
 		err = s.controller.diskMonitor.RunOnce()
 		c.Assert(err, IsNil)
+		err = s.controller.environmentCheckMonitor.RunOnce()
+		c.Assert(err, IsNil)
 	}
 
 	err = s.controller.syncNode(getKey(node, c))
@@ -2095,6 +2128,8 @@ CONFIG_NFS_V4_2=y`
 	if s.controller.controllerID == node.Name {
 		err = s.controller.diskMonitor.RunOnce()
 		c.Assert(err, IsNil)
+		err = s.controller.environmentCheckMonitor.RunOnce()
+		c.Assert(err, IsNil)
 	}
 
 	err = s.controller.syncNode(getKey(node, c))
@@ -2296,11 +2331,17 @@ func newTestNodeController(lhClient *lhfake.Clientset, kubeClient *fake.Clientse
 	enqueueNodeForMonitor := func(key string) {
 		nc.queue.Add(key)
 	}
-	mon, err := monitor.NewFakeNodeMonitor(nc.logger, nc.ds, controllerID, enqueueNodeForMonitor)
+	diskMonitor, err := monitor.NewFakeDiskMonitor(nc.logger, nc.ds, controllerID, enqueueNodeForMonitor)
+	if err != nil {
+		return nil, err
+	}
+	nc.diskMonitor = diskMonitor
+
+	environmentCheckMonitor, err := monitor.NewFakeEnvironmentCheckMonitor(nc.logger, nc.ds, controllerID, enqueueNodeForMonitor)
 	if err != nil {
 		return nil, err
 	}
-	nc.diskMonitor = mon
+	nc.environmentCheckMonitor = environmentCheckMonitor
 
 	for index := range nc.cacheSyncs {
 		nc.cacheSyncs[index] = alwaysReady