Skip to content

Commit

Permalink
[core] Fix issue with ECS state reverting to UNKNOWN
Browse files Browse the repository at this point in the history
  • Loading branch information
teo committed Jun 4, 2024
1 parent f96763b commit 1124de5
Showing 1 changed file with 21 additions and 17 deletions.
38 changes: 21 additions & 17 deletions core/integration/odc/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type Plugin struct {
}

type OdcStatus struct {
Partitions map[uid.ID]OdcPartitionInfo
Partitions map[uid.ID]*OdcPartitionInfo
Status odc.ReplyStatus
Message string
Error *odc.Error
Expand All @@ -98,14 +98,14 @@ func (o OdcDeviceId) MarshalJSON() ([]byte, error) {
}

type OdcPartitionInfo struct {
PartitionId uid.ID `json:"-"`
RunNumber uint32 `json:"runNumber"`
State string `json:"state"`
EcsState sm.State `json:"ecsState"`
DdsSessionId string `json:"ddsSessionId"`
DdsSessionStatus string `json:"ddsSessionStatus"`
Devices map[OdcDeviceId]OdcDevice `json:"devices"`
Hosts []string `json:"hosts"`
PartitionId uid.ID `json:"-"`
RunNumber uint32 `json:"runNumber"`
State string `json:"state"`
EcsState sm.State `json:"ecsState"`
DdsSessionId string `json:"ddsSessionId"`
DdsSessionStatus string `json:"ddsSessionStatus"`
Devices map[OdcDeviceId]*OdcDevice `json:"devices"`
Hosts []string `json:"hosts"`
}

type OdcDevice struct {
Expand Down Expand Up @@ -199,7 +199,7 @@ func (p *Plugin) queryPartitionStatus() {
Status: statusRep.Status,
Message: statusRep.Msg,
Error: statusRep.Error,
Partitions: make(map[uid.ID]OdcPartitionInfo),
Partitions: make(map[uid.ID]*OdcPartitionInfo),
}

odcPartInfoSlice := make([]*OdcPartitionInfo, len(statusRep.Partitions))
Expand Down Expand Up @@ -254,9 +254,9 @@ func (p *Plugin) queryPartitionStatus() {
}

odcPartInfoSlice[idx].Hosts = odcPartStateRep.Reply.Hosts
odcPartInfoSlice[idx].Devices = make(map[OdcDeviceId]OdcDevice, len(odcPartStateRep.Devices))
odcPartInfoSlice[idx].Devices = make(map[OdcDeviceId]*OdcDevice, len(odcPartStateRep.Devices))
for _, device := range odcPartStateRep.Devices {
odcPartInfoSlice[idx].Devices[OdcDeviceId(device.Id)] = OdcDevice{
odcPartInfoSlice[idx].Devices[OdcDeviceId(device.Id)] = &OdcDevice{
TaskId: strconv.FormatUint(device.Id, 10),
State: device.State,
Path: device.Path,
Expand All @@ -273,15 +273,14 @@ func (p *Plugin) queryPartitionStatus() {
// The partition wasn't found in the ODC response
continue
}
response.Partitions[odcPartSt.PartitionId] = *odcPartSt
response.Partitions[odcPartSt.PartitionId] = odcPartSt
}

p.cachedStatusMu.Lock()

// state change detection
if p.cachedStatus != nil && p.cachedStatus.Status == odc.ReplyStatus_SUCCESS {
for id, partitionInfo := range response.Partitions {

// do we have the given partition on record already? if not, no state change detection is possible
if existingPartition, ok := p.cachedStatus.Partitions[id]; ok {

Expand All @@ -298,7 +297,8 @@ func (p *Plugin) queryPartitionStatus() {
// if the state has changed, we take note of the previous state
oldEcsState = existingDevice.EcsState
} else {
// if the state hasn't changed, we bail
// if the state hasn't changed, we set the old ECS state and bail
device.EcsState = existingDevice.EcsState
continue
}
}
Expand Down Expand Up @@ -328,14 +328,16 @@ func (p *Plugin) queryPartitionStatus() {

// detection of env (ODC partition) state change + event publication
if existingPartition.State != partitionInfo.State {
partitionInfo.EcsState = fairmq.ToEcsState(partitionInfo.State, existingPartition.EcsState)

log.WithField("level", infologger.IL_Support).
WithField("partition", id.String()).
WithField("oldState", existingPartition.State).
WithField("oldEcsState", existingPartition.EcsState).
WithField("ecsState", partitionInfo.EcsState).
WithField("state", partitionInfo.State).
Info("ODC Partition state changed")

partitionInfo.EcsState = fairmq.ToEcsState(partitionInfo.State, existingPartition.EcsState)

payload := partitionStateChangedEventPayload{
PartitionId: partitionInfo.PartitionId,
DdsSessionId: partitionInfo.DdsSessionId,
Expand Down Expand Up @@ -367,6 +369,8 @@ func (p *Plugin) queryPartitionStatus() {
WithField("state", partitionInfo.State).
Warn("could not notify environment manager of ODC partition state change event")
}
} else {
partitionInfo.EcsState = existingPartition.EcsState
}
}
}
Expand Down

0 comments on commit 1124de5

Please sign in to comment.