Skip to content

Commit

Permalink
[core] New cleanup facilities for ODC
Browse files Browse the repository at this point in the history
  • Loading branch information
teo committed Oct 13, 2021
1 parent 55aadad commit 7b47a4b
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 7 deletions.
102 changes: 95 additions & 7 deletions core/integration/odc/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

"github.com/AliceO2Group/Control/common/logger/infologger"
"github.com/AliceO2Group/Control/common/utils"
"github.com/AliceO2Group/Control/core/environment"
"github.com/AliceO2Group/Control/core/integration/odc/odcutils"
odcpb "github.com/AliceO2Group/Control/core/integration/odc/protos"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -243,7 +244,7 @@ func handleReset(ctx context.Context, odcClient *RpcClient, arguments map[string
return nil
}

func handleCleanup(ctx context.Context, odcClient *RpcClient, arguments map[string]string, envId string) error {
func handleCleanupLegacy(ctx context.Context, odcClient *RpcClient, arguments map[string]string, envId string) error {
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("odcclient"))
if envId == "" {
return errors.New("cannot proceed with empty environment id")
Expand Down Expand Up @@ -277,6 +278,93 @@ func handleCleanup(ctx context.Context, odcClient *RpcClient, arguments map[stri
return nil // We clobber the error because nothing can be done for a failed cleanup
}

func handleCleanup(ctx context.Context, odcClient *RpcClient, arguments map[string]string, envId string) error {
defer utils.TimeTrackFunction(time.Now(), log.WithPrefix("odcclient"))

// First we query ODC for the full list of active partitions
req := &odcpb.StatusRequest{}

var err error = nil
var rep *odcpb.StatusReply

rep, err = odcClient.Status(ctx, req, grpc.EmptyCallOption{})
if err != nil {
return printGrpcError(err)
}

if rep == nil || rep.GetStatus() == odcpb.ReplyStatus_UNKNOWN {
// We got a nil response with nil error, this should never happen
return errors.New("nil response error")
}

if odcErr := rep.GetError(); odcErr != nil {
return fmt.Errorf("code %d from ODC: %s", odcErr.GetCode(), odcErr.GetMsg())
}
if replyStatus := rep.GetStatus(); replyStatus != odcpb.ReplyStatus_SUCCESS {
return fmt.Errorf("status %s from ODC", replyStatus.String())
}
log.WithFields(logrus.Fields{
"odcMsg": rep.GetMsg(),
"odcStatus": rep.GetStatus().String(),
"odcExectime": rep.GetExectime(),
}).
Trace("call to ODC complete")

knownEnvs := environment.ManagerInstance().Ids()
partitionsToClean := make(map[string]struct{})
for _, odcPartition := range rep.GetPartitions() {
isOrphan := true
for _, knownEnv := range knownEnvs {
if odcPartition.Partitionid == knownEnv.String() { // found a matching env
isOrphan = false
break
}
}
if isOrphan { // no env was found for the given ODC partition
partitionsToClean[odcPartition.Partitionid] = struct{}{}
}
}

// The present function can in principle be called with envId = "", if the cleanup is triggered from
// outside of an active environment.
// If an envId is passed, we append it to the list of partitions to clean up just in case, otherwise we
// ignore it.
if envId != "" {
partitionsToClean[envId] = struct {}{}
}

// Then the actual cleanup calls begin, one partition at a time...
for odcPartitionId, _ := range partitionsToClean {
// This block tries to perform the regular teardown sequence.
// Since Shutdown is supposed to work in any state, we don't bail on error.
err := doReset(ctx, odcClient, arguments, odcPartitionId)
if err != nil {
log.WithError(printGrpcError(err)).
WithField("level", infologger.IL_Devel).
WithField("partition", odcPartitionId).
Warn("ODC Reset call failed")
}

err = doTerminate(ctx, odcClient, arguments, odcPartitionId)
if err != nil {
log.WithError(printGrpcError(err)).
WithField("level", infologger.IL_Devel).
WithField("partition", odcPartitionId).
Warn("ODC Terminate call failed")
}

err = doShutdown(ctx, odcClient, arguments, odcPartitionId)
if err != nil {
log.WithError(printGrpcError(err)).
WithField("level", infologger.IL_Devel).
WithField("partition", odcPartitionId).
Warn("ODC Shutdown call failed")
}
}

return nil // We clobber the error because nothing can be done for a failed cleanup
}

func doReset(ctx context.Context, odcClient *RpcClient, arguments map[string]string, envId string) error {
// RESET
req := &odcpb.ResetRequest{
Expand Down Expand Up @@ -307,12 +395,12 @@ func doReset(ctx context.Context, odcClient *RpcClient, arguments map[string]str
return fmt.Errorf("status %s from ODC", replyStatus.String())
}
log.WithFields(logrus.Fields{
"odcMsg": rep.Reply.Msg,
"odcStatus": rep.Reply.Status.String(),
"odcExectime": rep.Reply.Exectime,
"odcRunid": rep.Reply.Partitionid,
"odcSessionid": rep.Reply.Sessionid,
}).
"odcMsg": rep.Reply.Msg,
"odcStatus": rep.Reply.Status.String(),
"odcExectime": rep.Reply.Exectime,
"odcRunid": rep.Reply.Partitionid,
"odcSessionid": rep.Reply.Sessionid,
}).
Debug("call to ODC complete")
return err
}
Expand Down
51 changes: 51 additions & 0 deletions core/integration/odc/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {

stack = make(map[string]interface{})
stack["Configure"] = func() (out string) {
// ODC Run + SetProperties + Configure

var topology, plugin, resources string
ok := false
topology, ok = varStack["odc_topology"]
Expand Down Expand Up @@ -209,6 +211,8 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
return
}
stack["Start"] = func() (out string) { // must formally return string even when we return nothing
// ODC SetProperties + Start

rn, ok := varStack["run_number"]
if !ok {
log.WithField("partition", envId).
Expand Down Expand Up @@ -238,6 +242,8 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
return
}
stack["Stop"] = func() (out string) {
// ODC Stop

timeout := callable.AcquireTimeout(ODC_STOP_TIMEOUT, varStack, "Stop", envId)

ctx, cancel := context.WithTimeout(context.Background(), timeout)
Expand All @@ -256,6 +262,8 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
return
}
stack["Reset"] = func() (out string) {
// ODC Reset + Terminate + Shutdown

timeout := callable.AcquireTimeout(ODC_RESET_TIMEOUT, varStack, "Reset", envId)

ctx, cancel := context.WithTimeout(context.Background(), timeout)
Expand All @@ -273,7 +281,30 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
}
return
}
stack["EnsureCleanupLegacy"] = func() (out string) {
// ODC Reset + Terminate + Shutdown for current env

timeout := callable.AcquireTimeout(ODC_GENERAL_OP_TIMEOUT, varStack, "EnsureCleanup", envId)

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
err := handleCleanupLegacy(ctx, p.odcClient, nil, envId)
if err != nil {
log.WithError(err).
WithField("level", infologger.IL_Support).
WithField("partition", envId).
WithField("call", "EnsureCleanupLegacy").

Error("ODC error")
log.WithField("partition", envId).
WithField("call", "EnsureCleanupLegacy").
Error("EPN Cleanup sequence failed")
}
return
}
stack["EnsureCleanup"] = func() (out string) {
// ODC Reset + Terminate + Shutdown for currend env + all orphans

timeout := callable.AcquireTimeout(ODC_GENERAL_OP_TIMEOUT, varStack, "EnsureCleanup", envId)

ctx, cancel := context.WithTimeout(context.Background(), timeout)
Expand All @@ -292,7 +323,27 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
}
return
}
stack["PreDeploymentCleanup"] = func() (out string) {
// ODC Reset + Terminate + Shutdown for all orphans

timeout := callable.AcquireTimeout(ODC_GENERAL_OP_TIMEOUT, varStack, "EnsureCleanup", envId)

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
err := handleCleanup(ctx, p.odcClient, nil, "")
if err != nil {
log.WithError(err).
WithField("level", infologger.IL_Support).
WithField("partition", envId).
WithField("call", "PreDeploymentCleanup").

Error("ODC error")
log.WithField("partition", envId).
WithField("call", "PreDeploymentCleanup").
Error("EPN PreDeploymentCleanup sequence failed")
}
return
}
return
}

Expand Down

0 comments on commit 7b47a4b

Please sign in to comment.