Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(monitor): set monitors to nil after closing them #3388

Merged
merged 3 commits into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 24 additions & 19 deletions controller/monitor/disk_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
)

const (
NodeMonitorSyncPeriod = 30 * time.Second
DiskMonitorSyncPeriod = 30 * time.Second

volumeMetaData = "volume.meta"
)
Expand All @@ -35,7 +35,7 @@ type DiskServiceClient struct {
err error
}

type NodeMonitor struct {
type DiskMonitor struct {
*baseMonitor

nodeName string
Expand Down Expand Up @@ -69,11 +69,11 @@ type GetDiskConfigHandler func(longhorn.DiskType, string, string, longhorn.DiskD
type GenerateDiskConfigHandler func(longhorn.DiskType, string, string, string, string, *DiskServiceClient) (*util.DiskConfig, error)
type GetReplicaDataStoresHandler func(longhorn.DiskType, *longhorn.Node, string, string, string, string, *DiskServiceClient) (map[string]string, error)

func NewDiskMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName string, syncCallback func(key string)) (*NodeMonitor, error) {
func NewDiskMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName string, syncCallback func(key string)) (*DiskMonitor, error) {
ctx, quit := context.WithCancel(context.Background())

m := &NodeMonitor{
baseMonitor: newBaseMonitor(ctx, quit, logger, ds, NodeMonitorSyncPeriod),
m := &DiskMonitor{
baseMonitor: newBaseMonitor(ctx, quit, logger, ds, DiskMonitorSyncPeriod),

nodeName: nodeName,
checkVolumeMeta: true,
Expand All @@ -94,30 +94,34 @@ func NewDiskMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName
return m, nil
}

func (m *NodeMonitor) Start() {
func (m *DiskMonitor) Start() {
if err := wait.PollUntilContextCancel(m.ctx, m.syncPeriod, false, func(context.Context) (bool, error) {
if err := m.run(struct{}{}); err != nil {
m.logger.WithError(err).Error("Stopped monitoring disks")
}
return false, nil
}); err != nil {
m.logger.WithError(err).Error("Failed to start node monitor")
if errors.Is(err, context.Canceled) {
m.logger.WithError(err).Warning("Disk monitor is stopped")
} else {
m.logger.WithError(err).Error("Failed to start disk monitor")
}
}
}

func (m *NodeMonitor) Stop() {
func (m *DiskMonitor) Stop() {
m.quit()
}

func (m *NodeMonitor) RunOnce() error {
func (m *DiskMonitor) RunOnce() error {
return m.run(struct{}{})
}

func (m *NodeMonitor) UpdateConfiguration(map[string]interface{}) error {
func (m *DiskMonitor) UpdateConfiguration(map[string]interface{}) error {
return nil
}

func (m *NodeMonitor) GetCollectedData() (interface{}, error) {
func (m *DiskMonitor) GetCollectedData() (interface{}, error) {
derekbit marked this conversation as resolved.
Show resolved Hide resolved
m.collectedDataLock.RLock()
defer m.collectedDataLock.RUnlock()

Expand All @@ -129,9 +133,10 @@ func (m *NodeMonitor) GetCollectedData() (interface{}, error) {
return data, nil
}

func (m *NodeMonitor) run(value interface{}) error {
func (m *DiskMonitor) run(value interface{}) error {
node, err := m.ds.GetNode(m.nodeName)
if err != nil {
logrus.WithError(err).Errorf("Failed to get longhorn node %v", m.nodeName)
return errors.Wrapf(err, "failed to get longhorn node %v", m.nodeName)
}

Expand All @@ -150,7 +155,7 @@ func (m *NodeMonitor) run(value interface{}) error {
return nil
}

func (m *NodeMonitor) getRunningInstanceManagerRO(dataEngine longhorn.DataEngineType) (*longhorn.InstanceManager, error) {
func (m *DiskMonitor) getRunningInstanceManagerRO(dataEngine longhorn.DataEngineType) (*longhorn.InstanceManager, error) {
switch dataEngine {
case longhorn.DataEngineTypeV1:
return m.ds.GetDefaultInstanceManagerByNodeRO(m.nodeName, dataEngine)
Expand All @@ -176,7 +181,7 @@ func (m *NodeMonitor) getRunningInstanceManagerRO(dataEngine longhorn.DataEngine
return nil, fmt.Errorf("unknown data engine %v", dataEngine)
}

func (m *NodeMonitor) newDiskServiceClients() map[longhorn.DataEngineType]*DiskServiceClient {
func (m *DiskMonitor) newDiskServiceClients() map[longhorn.DataEngineType]*DiskServiceClient {
clients := map[longhorn.DataEngineType]*DiskServiceClient{}

dataEngines := m.ds.GetDataEngines()
Expand All @@ -203,7 +208,7 @@ func (m *NodeMonitor) newDiskServiceClients() map[longhorn.DataEngineType]*DiskS
return clients
}

func (m *NodeMonitor) closeDiskServiceClients(clients map[longhorn.DataEngineType]*DiskServiceClient) {
func (m *DiskMonitor) closeDiskServiceClients(clients map[longhorn.DataEngineType]*DiskServiceClient) {
for _, client := range clients {
if client.c != nil {
client.c.Close()
Expand All @@ -213,7 +218,7 @@ func (m *NodeMonitor) closeDiskServiceClients(clients map[longhorn.DataEngineTyp
}

// Collect disk data and generate disk UUID blindly.
func (m *NodeMonitor) collectDiskData(node *longhorn.Node) map[string]*CollectedDiskInfo {
func (m *DiskMonitor) collectDiskData(node *longhorn.Node) map[string]*CollectedDiskInfo {
diskInfoMap := make(map[string]*CollectedDiskInfo, 0)

diskServiceClients := m.newDiskServiceClients()
Expand Down Expand Up @@ -388,7 +393,7 @@ func NewDiskInfo(diskName, diskUUID, diskPath string, diskDriver longhorn.DiskDr
return diskInfo
}

func (m *NodeMonitor) getOrphanedReplicaDataStores(diskType longhorn.DiskType, diskUUID, diskPath string, replicaDataStores map[string]string) (map[string]string, error) {
func (m *DiskMonitor) getOrphanedReplicaDataStores(diskType longhorn.DiskType, diskUUID, diskPath string, replicaDataStores map[string]string) (map[string]string, error) {
switch diskType {
case longhorn.DiskTypeFilesystem:
return m.getOrphanedReplicaDirectoryNames(diskUUID, diskPath, replicaDataStores)
Expand All @@ -399,7 +404,7 @@ func (m *NodeMonitor) getOrphanedReplicaDataStores(diskType longhorn.DiskType, d
}
}

func (m *NodeMonitor) getOrphanedReplicaLvolNames(replicaDataStores map[string]string) (map[string]string, error) {
func (m *DiskMonitor) getOrphanedReplicaLvolNames(replicaDataStores map[string]string) (map[string]string, error) {
if len(replicaDataStores) == 0 {
return map[string]string{}, nil
}
Expand All @@ -414,7 +419,7 @@ func (m *NodeMonitor) getOrphanedReplicaLvolNames(replicaDataStores map[string]s
return replicaDataStores, nil
}

func (m *NodeMonitor) getOrphanedReplicaDirectoryNames(diskUUID, diskPath string, replicaDataStores map[string]string) (map[string]string, error) {
func (m *DiskMonitor) getOrphanedReplicaDirectoryNames(diskUUID, diskPath string, replicaDataStores map[string]string) (map[string]string, error) {
if len(replicaDataStores) == 0 {
return map[string]string{}, nil
}
Expand Down
10 changes: 7 additions & 3 deletions controller/monitor/environment_check_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func NewEnvironmentCheckMonitor(logger logrus.FieldLogger, ds *datastore.DataSto
ctx, quit := context.WithCancel(context.Background())

m := &EnvironmentCheckMonitor{
baseMonitor: newBaseMonitor(ctx, quit, logger, ds, NodeMonitorSyncPeriod),
baseMonitor: newBaseMonitor(ctx, quit, logger, ds, EnvironmentCheckMonitorSyncPeriod),

nodeName: nodeName,

Expand All @@ -80,13 +80,17 @@ func NewEnvironmentCheckMonitor(logger logrus.FieldLogger, ds *datastore.DataSto
}

func (m *EnvironmentCheckMonitor) Start() {
if err := wait.PollUntilContextCancel(m.ctx, m.syncPeriod, false, func(context.Context) (bool, error) {
if err := wait.PollUntilContextCancel(m.ctx, m.syncPeriod, true, func(context.Context) (bool, error) {
if err := m.run(struct{}{}); err != nil {
m.logger.WithError(err).Error("Stopped monitoring environment check")
}
return false, nil
}); err != nil {
m.logger.WithError(err).Error("Failed to start monitoring environment check")
if errors.Is(err, context.Canceled) {
m.logger.WithError(err).Warning("Environment check monitor is stopped")
} else {
m.logger.WithError(err).Error("Failed to start environment check monitor")
}
}
}

Expand Down
6 changes: 3 additions & 3 deletions controller/monitor/fake_disk_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ const (
TestOrphanedReplicaDirectoryName = "test-volume-r-000000000"
)

func NewFakeNodeMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName string, syncCallback func(key string)) (*NodeMonitor, error) {
func NewFakeDiskMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName string, syncCallback func(key string)) (*DiskMonitor, error) {
ctx, quit := context.WithCancel(context.Background())

m := &NodeMonitor{
baseMonitor: newBaseMonitor(ctx, quit, logger, ds, NodeMonitorSyncPeriod),
m := &DiskMonitor{
baseMonitor: newBaseMonitor(ctx, quit, logger, ds, DiskMonitorSyncPeriod),

nodeName: nodeName,
checkVolumeMeta: false,
Expand Down
106 changes: 106 additions & 0 deletions controller/monitor/fake_environment_check_monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package monitor

import (
"context"
"reflect"
"sync"

"github.com/jinzhu/copier"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/longhorn/longhorn-manager/datastore"

longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2"
)

type FakeEnvironmentCheckMonitor struct {
*baseMonitor

nodeName string

collectedDataLock sync.RWMutex
collectedData *CollectedEnvironmentCheckInfo

syncCallback func(key string)
}

func NewFakeEnvironmentCheckMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName string, syncCallback func(key string)) (*FakeEnvironmentCheckMonitor, error) {
ctx, quit := context.WithCancel(context.Background())

m := &FakeEnvironmentCheckMonitor{
baseMonitor: newBaseMonitor(ctx, quit, logger, ds, EnvironmentCheckMonitorSyncPeriod),

nodeName: nodeName,

collectedDataLock: sync.RWMutex{},
collectedData: &CollectedEnvironmentCheckInfo{},

syncCallback: syncCallback,
}

return m, nil
}

func (m *FakeEnvironmentCheckMonitor) Start() {
if err := wait.PollUntilContextCancel(m.ctx, m.syncPeriod, true, func(context.Context) (bool, error) {
if err := m.run(struct{}{}); err != nil {
m.logger.WithError(err).Error("Stopped monitoring environment check")
}
return false, nil
}); err != nil {
if errors.Is(err, context.Canceled) {
m.logger.WithError(err).Warning("Environment check monitor is stopped")
} else {
m.logger.WithError(err).Error("Failed to start environment check monitor")
}
}
}

func (m *FakeEnvironmentCheckMonitor) Stop() {
m.quit()
}

func (m *FakeEnvironmentCheckMonitor) RunOnce() error {
return m.run(struct{}{})
}

func (m *FakeEnvironmentCheckMonitor) UpdateConfiguration(map[string]interface{}) error {
return nil
}

func (m *FakeEnvironmentCheckMonitor) GetCollectedData() (interface{}, error) {
m.collectedDataLock.RLock()
defer m.collectedDataLock.RUnlock()

data := []longhorn.Condition{}
if err := copier.CopyWithOption(&data, &m.collectedData.conditions, copier.Option{IgnoreEmpty: true, DeepCopy: true}); err != nil {
return data, errors.Wrap(err, "failed to copy collected data")
}

return data, nil
}

func (m *FakeEnvironmentCheckMonitor) run(value interface{}) error {
node, err := m.ds.GetNode(m.nodeName)
if err != nil {
return errors.Wrapf(err, "failed to get longhorn node %v", m.nodeName)
}

collectedData := &CollectedEnvironmentCheckInfo{
conditions: []longhorn.Condition{},
}
if !reflect.DeepEqual(m.collectedData, collectedData) {
func() {
m.collectedDataLock.Lock()
defer m.collectedDataLock.Unlock()
m.collectedData = collectedData
}()

key := node.Namespace + "/" + m.nodeName
m.syncCallback(key)
}

return nil
}
3 changes: 3 additions & 0 deletions controller/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,14 +378,17 @@ func (nc *NodeController) syncNode(key string) (err error) {

if nc.diskMonitor != nil {
nc.diskMonitor.Stop()
nc.diskMonitor = nil
}

if nc.environmentCheckMonitor != nil {
nc.environmentCheckMonitor.Stop()
nc.environmentCheckMonitor = nil
}

if nc.snapshotMonitor != nil {
nc.snapshotMonitor.Stop()
nc.snapshotMonitor = nil
}

return nc.ds.RemoveFinalizerForNode(node)
Expand Down
Loading
Loading