Skip to content

Commit

Permalink
fix(monitor): set monitors to nil after closing them
Browse files Browse the repository at this point in the history
Set monitors to nil after closing them, so the controller can recreate
them when the node is back.

Longhorn 10035

Signed-off-by: Derek Su <[email protected]>
  • Loading branch information
derekbit committed Dec 23, 2024
1 parent f22c93c commit fb73905
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 28 deletions.
36 changes: 18 additions & 18 deletions controller/monitor/disk_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
)

const (
NodeMonitorSyncPeriod = 30 * time.Second
DiskMonitorSyncPeriod = 30 * time.Second

volumeMetaData = "volume.meta"
)
Expand All @@ -35,7 +35,7 @@ type DiskServiceClient struct {
err error
}

type NodeMonitor struct {
type DiskMonitor struct {
*baseMonitor

nodeName string
Expand Down Expand Up @@ -69,11 +69,11 @@ type GetDiskConfigHandler func(longhorn.DiskType, string, string, longhorn.DiskD
type GenerateDiskConfigHandler func(longhorn.DiskType, string, string, string, string, *DiskServiceClient) (*util.DiskConfig, error)
type GetReplicaDataStoresHandler func(longhorn.DiskType, *longhorn.Node, string, string, string, string, *DiskServiceClient) (map[string]string, error)

func NewDiskMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName string, syncCallback func(key string)) (*NodeMonitor, error) {
func NewDiskMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName string, syncCallback func(key string)) (*DiskMonitor, error) {
ctx, quit := context.WithCancel(context.Background())

m := &NodeMonitor{
baseMonitor: newBaseMonitor(ctx, quit, logger, ds, NodeMonitorSyncPeriod),
m := &DiskMonitor{
baseMonitor: newBaseMonitor(ctx, quit, logger, ds, DiskMonitorSyncPeriod),

nodeName: nodeName,
checkVolumeMeta: true,
Expand All @@ -94,7 +94,7 @@ func NewDiskMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName
return m, nil
}

func (m *NodeMonitor) Start() {
func (m *DiskMonitor) Start() {
if err := wait.PollUntilContextCancel(m.ctx, m.syncPeriod, false, func(context.Context) (bool, error) {
if err := m.run(struct{}{}); err != nil {
m.logger.WithError(err).Error("Stopped monitoring disks")
Expand All @@ -109,19 +109,19 @@ func (m *NodeMonitor) Start() {
}
}

func (m *NodeMonitor) Stop() {
func (m *DiskMonitor) Stop() {
m.quit()
}

func (m *NodeMonitor) RunOnce() error {
func (m *DiskMonitor) RunOnce() error {
return m.run(struct{}{})
}

func (m *NodeMonitor) UpdateConfiguration(map[string]interface{}) error {
func (m *DiskMonitor) UpdateConfiguration(map[string]interface{}) error {
return nil
}

func (m *NodeMonitor) GetCollectedData() (interface{}, error) {
func (m *DiskMonitor) GetCollectedData() (interface{}, error) {
m.collectedDataLock.RLock()
defer m.collectedDataLock.RUnlock()

Expand All @@ -133,7 +133,7 @@ func (m *NodeMonitor) GetCollectedData() (interface{}, error) {
return data, nil
}

func (m *NodeMonitor) run(value interface{}) error {
func (m *DiskMonitor) run(value interface{}) error {
node, err := m.ds.GetNode(m.nodeName)
if err != nil {
logrus.WithError(err).Errorf("Failed to get longhorn node %v", m.nodeName)
Expand All @@ -155,7 +155,7 @@ func (m *NodeMonitor) run(value interface{}) error {
return nil
}

func (m *NodeMonitor) getRunningInstanceManagerRO(dataEngine longhorn.DataEngineType) (*longhorn.InstanceManager, error) {
func (m *DiskMonitor) getRunningInstanceManagerRO(dataEngine longhorn.DataEngineType) (*longhorn.InstanceManager, error) {
switch dataEngine {
case longhorn.DataEngineTypeV1:
return m.ds.GetDefaultInstanceManagerByNodeRO(m.nodeName, dataEngine)
Expand All @@ -181,7 +181,7 @@ func (m *NodeMonitor) getRunningInstanceManagerRO(dataEngine longhorn.DataEngine
return nil, fmt.Errorf("unknown data engine %v", dataEngine)
}

func (m *NodeMonitor) newDiskServiceClients() map[longhorn.DataEngineType]*DiskServiceClient {
func (m *DiskMonitor) newDiskServiceClients() map[longhorn.DataEngineType]*DiskServiceClient {
clients := map[longhorn.DataEngineType]*DiskServiceClient{}

dataEngines := m.ds.GetDataEngines()
Expand All @@ -208,7 +208,7 @@ func (m *NodeMonitor) newDiskServiceClients() map[longhorn.DataEngineType]*DiskS
return clients
}

func (m *NodeMonitor) closeDiskServiceClients(clients map[longhorn.DataEngineType]*DiskServiceClient) {
func (m *DiskMonitor) closeDiskServiceClients(clients map[longhorn.DataEngineType]*DiskServiceClient) {
for _, client := range clients {
if client.c != nil {
client.c.Close()
Expand All @@ -218,7 +218,7 @@ func (m *NodeMonitor) closeDiskServiceClients(clients map[longhorn.DataEngineTyp
}

// Collect disk data and generate disk UUID blindly.
func (m *NodeMonitor) collectDiskData(node *longhorn.Node) map[string]*CollectedDiskInfo {
func (m *DiskMonitor) collectDiskData(node *longhorn.Node) map[string]*CollectedDiskInfo {
diskInfoMap := make(map[string]*CollectedDiskInfo, 0)

diskServiceClients := m.newDiskServiceClients()
Expand Down Expand Up @@ -393,7 +393,7 @@ func NewDiskInfo(diskName, diskUUID, diskPath string, diskDriver longhorn.DiskDr
return diskInfo
}

func (m *NodeMonitor) getOrphanedReplicaDataStores(diskType longhorn.DiskType, diskUUID, diskPath string, replicaDataStores map[string]string) (map[string]string, error) {
func (m *DiskMonitor) getOrphanedReplicaDataStores(diskType longhorn.DiskType, diskUUID, diskPath string, replicaDataStores map[string]string) (map[string]string, error) {
switch diskType {
case longhorn.DiskTypeFilesystem:
return m.getOrphanedReplicaDirectoryNames(diskUUID, diskPath, replicaDataStores)
Expand All @@ -404,7 +404,7 @@ func (m *NodeMonitor) getOrphanedReplicaDataStores(diskType longhorn.DiskType, d
}
}

func (m *NodeMonitor) getOrphanedReplicaLvolNames(replicaDataStores map[string]string) (map[string]string, error) {
func (m *DiskMonitor) getOrphanedReplicaLvolNames(replicaDataStores map[string]string) (map[string]string, error) {
if len(replicaDataStores) == 0 {
return map[string]string{}, nil
}
Expand All @@ -419,7 +419,7 @@ func (m *NodeMonitor) getOrphanedReplicaLvolNames(replicaDataStores map[string]s
return replicaDataStores, nil
}

func (m *NodeMonitor) getOrphanedReplicaDirectoryNames(diskUUID, diskPath string, replicaDataStores map[string]string) (map[string]string, error) {
func (m *DiskMonitor) getOrphanedReplicaDirectoryNames(diskUUID, diskPath string, replicaDataStores map[string]string) (map[string]string, error) {
if len(replicaDataStores) == 0 {
return map[string]string{}, nil
}
Expand Down
2 changes: 1 addition & 1 deletion controller/monitor/environment_check_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func NewEnvironmentCheckMonitor(logger logrus.FieldLogger, ds *datastore.DataSto
ctx, quit := context.WithCancel(context.Background())

m := &EnvironmentCheckMonitor{
baseMonitor: newBaseMonitor(ctx, quit, logger, ds, NodeMonitorSyncPeriod),
baseMonitor: newBaseMonitor(ctx, quit, logger, ds, DiskMonitorSyncPeriod),

nodeName: nodeName,

Expand Down
6 changes: 3 additions & 3 deletions controller/monitor/fake_disk_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ const (
TestOrphanedReplicaDirectoryName = "test-volume-r-000000000"
)

func NewFakeNodeMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName string, syncCallback func(key string)) (*NodeMonitor, error) {
func NewFakeDiskMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName string, syncCallback func(key string)) (*DiskMonitor, error) {
ctx, quit := context.WithCancel(context.Background())

m := &NodeMonitor{
baseMonitor: newBaseMonitor(ctx, quit, logger, ds, NodeMonitorSyncPeriod),
m := &DiskMonitor{
baseMonitor: newBaseMonitor(ctx, quit, logger, ds, DiskMonitorSyncPeriod),

nodeName: nodeName,
checkVolumeMeta: false,
Expand Down
106 changes: 106 additions & 0 deletions controller/monitor/fake_environment_check_monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package monitor

import (
"context"
"reflect"
"sync"

"github.com/jinzhu/copier"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/longhorn/longhorn-manager/datastore"

longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2"
)

type FakeEnvironmentCheckMonitor struct {
*baseMonitor

nodeName string

collectedDataLock sync.RWMutex
collectedData *CollectedEnvironmentCheckInfo

syncCallback func(key string)
}

func NewFakeEnvironmentCheckMonitor(logger logrus.FieldLogger, ds *datastore.DataStore, nodeName string, syncCallback func(key string)) (*FakeEnvironmentCheckMonitor, error) {
ctx, quit := context.WithCancel(context.Background())

m := &FakeEnvironmentCheckMonitor{
baseMonitor: newBaseMonitor(ctx, quit, logger, ds, DiskMonitorSyncPeriod),

nodeName: nodeName,

collectedDataLock: sync.RWMutex{},
collectedData: &CollectedEnvironmentCheckInfo{},

syncCallback: syncCallback,
}

return m, nil
}

func (m *FakeEnvironmentCheckMonitor) Start() {
if err := wait.PollUntilContextCancel(m.ctx, m.syncPeriod, true, func(context.Context) (bool, error) {
if err := m.run(struct{}{}); err != nil {
m.logger.WithError(err).Error("Stopped monitoring environment check")
}
return false, nil
}); err != nil {
if errors.Is(err, context.Canceled) {
m.logger.WithError(err).Warning("Environment check monitor is stopped")
} else {
m.logger.WithError(err).Error("Failed to start environment check monitor")
}
}
}

func (m *FakeEnvironmentCheckMonitor) Stop() {
m.quit()
}

func (m *FakeEnvironmentCheckMonitor) RunOnce() error {
return m.run(struct{}{})
}

func (m *FakeEnvironmentCheckMonitor) UpdateConfiguration(map[string]interface{}) error {
return nil
}

func (m *FakeEnvironmentCheckMonitor) GetCollectedData() (interface{}, error) {
m.collectedDataLock.RLock()
defer m.collectedDataLock.RUnlock()

data := []longhorn.Condition{}
if err := copier.CopyWithOption(&data, &m.collectedData.conditions, copier.Option{IgnoreEmpty: true, DeepCopy: true}); err != nil {
return data, errors.Wrap(err, "failed to copy collected data")
}

return data, nil
}

func (m *FakeEnvironmentCheckMonitor) run(value interface{}) error {
node, err := m.ds.GetNode(m.nodeName)
if err != nil {
return errors.Wrapf(err, "failed to get longhorn node %v", m.nodeName)
}

collectedData := &CollectedEnvironmentCheckInfo{
conditions: []longhorn.Condition{},
}
if !reflect.DeepEqual(m.collectedData, collectedData) {
func() {
m.collectedDataLock.Lock()
defer m.collectedDataLock.Unlock()
m.collectedData = collectedData
}()

key := node.Namespace + "/" + m.nodeName
m.syncCallback(key)
}

return nil
}
3 changes: 3 additions & 0 deletions controller/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,14 +378,17 @@ func (nc *NodeController) syncNode(key string) (err error) {

if nc.diskMonitor != nil {
nc.diskMonitor.Stop()
nc.diskMonitor = nil
}

if nc.environmentCheckMonitor != nil {
nc.environmentCheckMonitor.Stop()
nc.environmentCheckMonitor = nil
}

if nc.snapshotMonitor != nil {
nc.snapshotMonitor.Stop()
nc.snapshotMonitor = nil
}

return nc.ds.RemoveFinalizerForNode(node)
Expand Down
Loading

0 comments on commit fb73905

Please sign in to comment.