From 1124de5261082e109477550d19bf4e81d2e423c4 Mon Sep 17 00:00:00 2001 From: Teo Mrnjavac Date: Tue, 4 Jun 2024 14:50:35 +0200 Subject: [PATCH] [core] Fix issue with ECS state reverting to UNKNOWN --- core/integration/odc/plugin.go | 38 +++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/core/integration/odc/plugin.go b/core/integration/odc/plugin.go index efbf0159..b1fd1fee 100644 --- a/core/integration/odc/plugin.go +++ b/core/integration/odc/plugin.go @@ -85,7 +85,7 @@ type Plugin struct { } type OdcStatus struct { - Partitions map[uid.ID]OdcPartitionInfo + Partitions map[uid.ID]*OdcPartitionInfo Status odc.ReplyStatus Message string Error *odc.Error @@ -98,14 +98,14 @@ func (o OdcDeviceId) MarshalJSON() ([]byte, error) { } type OdcPartitionInfo struct { - PartitionId uid.ID `json:"-"` - RunNumber uint32 `json:"runNumber"` - State string `json:"state"` - EcsState sm.State `json:"ecsState"` - DdsSessionId string `json:"ddsSessionId"` - DdsSessionStatus string `json:"ddsSessionStatus"` - Devices map[OdcDeviceId]OdcDevice `json:"devices"` - Hosts []string `json:"hosts"` + PartitionId uid.ID `json:"-"` + RunNumber uint32 `json:"runNumber"` + State string `json:"state"` + EcsState sm.State `json:"ecsState"` + DdsSessionId string `json:"ddsSessionId"` + DdsSessionStatus string `json:"ddsSessionStatus"` + Devices map[OdcDeviceId]*OdcDevice `json:"devices"` + Hosts []string `json:"hosts"` } type OdcDevice struct { @@ -199,7 +199,7 @@ func (p *Plugin) queryPartitionStatus() { Status: statusRep.Status, Message: statusRep.Msg, Error: statusRep.Error, - Partitions: make(map[uid.ID]OdcPartitionInfo), + Partitions: make(map[uid.ID]*OdcPartitionInfo), } odcPartInfoSlice := make([]*OdcPartitionInfo, len(statusRep.Partitions)) @@ -254,9 +254,9 @@ func (p *Plugin) queryPartitionStatus() { } odcPartInfoSlice[idx].Hosts = odcPartStateRep.Reply.Hosts - odcPartInfoSlice[idx].Devices = make(map[OdcDeviceId]OdcDevice, len(odcPartStateRep.Devices)) + odcPartInfoSlice[idx].Devices = make(map[OdcDeviceId]*OdcDevice, len(odcPartStateRep.Devices)) for _, device := range odcPartStateRep.Devices { - odcPartInfoSlice[idx].Devices[OdcDeviceId(device.Id)] = OdcDevice{ + odcPartInfoSlice[idx].Devices[OdcDeviceId(device.Id)] = &OdcDevice{ TaskId: strconv.FormatUint(device.Id, 10), State: device.State, Path: device.Path, @@ -273,7 +273,7 @@ func (p *Plugin) queryPartitionStatus() { // The partition wasn't found in the ODC response continue } - response.Partitions[odcPartSt.PartitionId] = *odcPartSt + response.Partitions[odcPartSt.PartitionId] = odcPartSt } p.cachedStatusMu.Lock() @@ -281,7 +281,6 @@ func (p *Plugin) queryPartitionStatus() { // state change detection if p.cachedStatus != nil && p.cachedStatus.Status == odc.ReplyStatus_SUCCESS { for id, partitionInfo := range response.Partitions { - // do we have the given partition on record already? if not, no state change detection is possible if existingPartition, ok := p.cachedStatus.Partitions[id]; ok { @@ -298,7 +297,8 @@ func (p *Plugin) queryPartitionStatus() { // if the state has changed, we take note of the previous state oldEcsState = existingDevice.EcsState } else { - // if the state hasn't changed, we bail + // if the state hasn't changed, we set the old ECS state and bail + device.EcsState = existingDevice.EcsState continue } } @@ -328,14 +328,16 @@ func (p *Plugin) queryPartitionStatus() { // detection of env (ODC partition) state change + event publication if existingPartition.State != partitionInfo.State { + partitionInfo.EcsState = fairmq.ToEcsState(partitionInfo.State, existingPartition.EcsState) + log.WithField("level", infologger.IL_Support). WithField("partition", id.String()). WithField("oldState", existingPartition.State). + WithField("oldEcsState", existingPartition.EcsState). + WithField("ecsState", partitionInfo.EcsState). WithField("state", partitionInfo.State). Info("ODC Partition state changed") - partitionInfo.EcsState = fairmq.ToEcsState(partitionInfo.State, existingPartition.EcsState) - payload := partitionStateChangedEventPayload{ PartitionId: partitionInfo.PartitionId, DdsSessionId: partitionInfo.DdsSessionId, @@ -367,6 +369,8 @@ func (p *Plugin) queryPartitionStatus() { WithField("state", partitionInfo.State). Warn("could not notify environment manager of ODC partition state change event") } + } else { + partitionInfo.EcsState = existingPartition.EcsState } } }