Skip to content

Commit

Permalink
[core] Ensure TRG timeouts are obeyed + add polling timeout
Browse files: browse the repository at this point in the history
  • Loading branch information
teo committed Oct 24, 2024
1 parent 24a8e72 commit 4dfed95
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 25 deletions.
6 changes: 5 additions & 1 deletion core/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ func setDefaults() error {
viper.SetDefault("ddSchedulerUseSystemProxy", false)
viper.SetDefault("trgServiceEndpoint", "//127.0.0.1:50060")
viper.SetDefault("trgPollingInterval", "3s")
viper.SetDefault("trgPollingTimeout", "3s")
viper.SetDefault("trgReconciliationTimeout", "5s")
viper.SetDefault("odcEndpoint", "//127.0.0.1:50053")
viper.SetDefault("odcPollingInterval", "3s")
viper.SetDefault("odcUseSystemProxy", false)
Expand Down Expand Up @@ -174,7 +176,9 @@ func setFlags() error {
pflag.Duration("ddSchedulerStatusTimeout", viper.GetDuration("ddSchedulerStatusTimeout"), "Timeout for status calls in ddshed plugin")
pflag.Bool("ddSchedulerUseSystemProxy", viper.GetBool("ddSchedulerUseSystemProxy"), "When true the https_proxy, http_proxy and no_proxy environment variables are obeyed")
pflag.String("trgServiceEndpoint", viper.GetString("trgServiceEndpoint"), "Endpoint of the TRG gRPC service (`host:port`)")
pflag.String("trgPollingInterval", viper.GetString("trgPollingInterval"), "How often to query the TRG gRPC service for run status (default: 3s)")
pflag.Duration("trgPollingInterval", viper.GetDuration("trgPollingInterval"), "How often to query the TRG gRPC service for run status (default: 3s)")
pflag.Duration("trgPollingTimeout", viper.GetDuration("trgPollingTimeout"), "Timeout for the query to the TRG gRPC service for run status (default: 3s)")
pflag.Duration("trgReconciliationTimeout", viper.GetDuration("trgReconciliationTimeout"), "Timeout for reconciliation requests to the TRG gRPC service (default: 5s)")
pflag.String("odcEndpoint", viper.GetString("odcEndpoint"), "Endpoint of the ODC gRPC service (`host:port`)")
pflag.String("odcPollingInterval", viper.GetString("odcPollingInterval"), "How often to query the ODC gRPC service for partition status (default: 3s)")
pflag.Bool("odcUseSystemProxy", viper.GetBool("odcUseSystemProxy"), "When true the https_proxy, http_proxy and no_proxy environment variables are obeyed")
Expand Down
77 changes: 53 additions & 24 deletions core/integration/trg/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,14 @@ import (
)

// Timeout and topic constants for the TRG integration plugin.
// NOTE(review): TRG_DIAL_TIMEOUT and TOPIC are listed twice below; this span appears to
// show both the removed and the added side of one diff hunk rather than a single
// compilable const block — confirm against the repository.
const (
TRG_DIAL_TIMEOUT = 2 * time.Second
// Fallback polling interval: Init uses it when the "trgPollingInterval" setting
// cannot be parsed with time.ParseDuration.
TRG_POLLING_INTERVAL = 3 * time.Second
// Deadline given to context.WithTimeout for the RunStop and RunUnload calls
// issued by reconcile.
TRG_RECONCILIATION_TIMEOUT = 5 * time.Second
// Built from topic.IntegratedService, topic.Separator and the "trg" suffix.
TOPIC = topic.IntegratedService + topic.Separator + "trg"
TRG_DIAL_TIMEOUT = 2 * time.Second
// The six per-call values below are each passed as the first argument to
// callable.AcquireTimeout for the "PrepareForRun", "RunLoad", "RunStart",
// "RunStop", "RunUnload" and "Cleanup" calls respectively; the returned duration
// bounds that call's context. Presumably they act as defaults that the variable
// stack can override — verify against callable.AcquireTimeout.
TRG_PFR_TIMEOUT = 5 * time.Second
TRG_LOAD_TIMEOUT = 5 * time.Second
TRG_START_TIMEOUT = 5 * time.Second
TRG_STOP_TIMEOUT = 5 * time.Second
TRG_UNLOAD_TIMEOUT = 5 * time.Second
TRG_CLEANUP_TIMEOUT = 5 * time.Second
// Built from topic.IntegratedService, topic.Separator and the "trg" suffix.
TOPIC = topic.IntegratedService + topic.Separator + "trg"
)

type Plugin struct {
Expand Down Expand Up @@ -119,7 +123,10 @@ func (p *Plugin) GetConnectionState() string {
}

func (p *Plugin) queryRunList() {
runReply, err := p.trgClient.RunList(context.Background(), &trgpb.Empty{}, grpc.EmptyCallOption{})
ctx, cancel := context.WithTimeout(context.Background(), viper.GetDuration("trgPollingTimeout"))
defer cancel()

runReply, err := p.trgClient.RunList(ctx, &trgpb.Empty{}, grpc.EmptyCallOption{})
if err != nil {
err = fmt.Errorf("error querying TRG service at %s: %w", viper.GetString("trgServiceEndpoint"), err)
log.WithError(err).
Expand Down Expand Up @@ -260,12 +267,7 @@ func (p *Plugin) Init(instanceId string) error {
var ctx context.Context
ctx, p.cachedStatusCancelFunc = context.WithCancel(context.Background())

trgPollingIntervalStr := viper.GetString("trgPollingInterval")
trgPollingInterval, err := time.ParseDuration(trgPollingIntervalStr)
if err != nil {
trgPollingInterval = TRG_POLLING_INTERVAL
log.Debugf("TRG plugin cannot acquire polling interval, defaulting to %s", TRG_POLLING_INTERVAL.String())
}
trgPollingInterval := viper.GetDuration("trgPollingInterval")

// polling
go func() {
Expand Down Expand Up @@ -321,8 +323,10 @@ func (p *Plugin) reconcile() {
}
}*/

ctx, _ := context.WithTimeout(context.Background(), TRG_RECONCILIATION_TIMEOUT)
ctx, cancel := context.WithTimeout(context.Background(), viper.GetDuration("trgReconciliationTimeout"))
_, err := p.trgClient.RunStop(ctx, &in, grpc.EmptyCallOption{})
cancel()

// TODO: Response's RC should also be checked here
if err != nil {
err = fmt.Errorf("TRG reconciliation failure: %w", err)
Expand All @@ -344,8 +348,9 @@ func (p *Plugin) reconcile() {
}
}
if trgRun.State == CTP_LOADED && trgRun.Cardinality == CTP_GLOBAL {
ctx, _ := context.WithTimeout(context.Background(), TRG_RECONCILIATION_TIMEOUT)
ctx, cancel := context.WithTimeout(context.Background(), viper.GetDuration("trgReconciliationTimeout"))
_, err := p.trgClient.RunUnload(ctx, &in, grpc.EmptyCallOption{})
cancel()
if err != nil {
err = fmt.Errorf("TRG reconciliation failure: %w", err)
log.WithError(err).
Expand Down Expand Up @@ -457,6 +462,10 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
return
}

timeout := callable.AcquireTimeout(TRG_PFR_TIMEOUT, varStack, "PrepareForRun", envId)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

payload := map[string]interface{}{
"trgRequest": &in,
}
Expand All @@ -473,7 +482,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
})

var response *trgpb.RunReply
response, err = p.trgClient.PrepareForRun(context.Background(), &in, grpc.EmptyCallOption{})
response, err = p.trgClient.PrepareForRun(ctx, &in, grpc.EmptyCallOption{})
if err != nil {
log.WithError(err).
WithField("level", infologger.IL_Support).
Expand Down Expand Up @@ -666,6 +675,10 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
return
}

timeout := callable.AcquireTimeout(TRG_LOAD_TIMEOUT, varStack, "RunLoad", envId)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

payload := map[string]interface{}{
"trgRequest": &in,
}
Expand All @@ -682,7 +695,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
})

var response *trgpb.RunReply
response, err = p.trgClient.RunLoad(context.Background(), &in, grpc.EmptyCallOption{})
response, err = p.trgClient.RunLoad(ctx, &in, grpc.EmptyCallOption{})
if err != nil {
log.WithError(err).
WithField("level", infologger.IL_Support).
Expand Down Expand Up @@ -853,6 +866,10 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
return
}

timeout := callable.AcquireTimeout(TRG_START_TIMEOUT, varStack, "RunStart", envId)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

payload := map[string]interface{}{
"trgRequest": &in,
}
Expand All @@ -869,7 +886,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
})

var response *trgpb.RunReply
response, err = p.trgClient.RunStart(context.Background(), &in, grpc.EmptyCallOption{})
response, err = p.trgClient.RunStart(ctx, &in, grpc.EmptyCallOption{})
if err != nil {
log.WithError(err).
WithField("level", infologger.IL_Support).
Expand Down Expand Up @@ -956,7 +973,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {

return
}
runStopFunc := func(runNumber64 int64) (out string) {
runStopFunc := func(ctx context.Context, runNumber64 int64) (out string) {
trgDetectorsParam, ok := varStack["trg_detectors"]
if !ok {
// "" it is a global run
Expand Down Expand Up @@ -1033,7 +1050,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
})

var response *trgpb.RunReply
response, err = p.trgClient.RunStop(context.Background(), &in, grpc.EmptyCallOption{})
response, err = p.trgClient.RunStop(ctx, &in, grpc.EmptyCallOption{})
if err != nil {
log.WithError(err).
WithField("level", infologger.IL_Support).
Expand Down Expand Up @@ -1124,7 +1141,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {

return
}
runUnloadFunc := func(runNumber64 int64) (out string) {
runUnloadFunc := func(ctx context.Context, runNumber64 int64) (out string) {

trgDetectorsParam, ok := varStack["trg_detectors"]
if !ok {
Expand Down Expand Up @@ -1201,7 +1218,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
})

var response *trgpb.RunReply
response, err = p.trgClient.RunUnload(context.Background(), &in, grpc.EmptyCallOption{})
response, err = p.trgClient.RunUnload(ctx, &in, grpc.EmptyCallOption{})
if err != nil {
log.WithError(err).
WithField("level", infologger.IL_Support).
Expand Down Expand Up @@ -1294,7 +1311,11 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
Error("cannot acquire run number for TRG Run Stop")
}

return runStopFunc(runNumber64)
timeout := callable.AcquireTimeout(TRG_STOP_TIMEOUT, varStack, "RunStop", envId)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

return runStopFunc(ctx, runNumber64)
}
stack["RunUnload"] = func() (out string) {
log.WithField("partition", envId).
Expand All @@ -1311,7 +1332,11 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
Error("cannot acquire run number for TRG Run Unload")
}

return runUnloadFunc(runNumber64)
timeout := callable.AcquireTimeout(TRG_UNLOAD_TIMEOUT, varStack, "RunUnload", envId)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

return runUnloadFunc(ctx, runNumber64)
}
stack["Cleanup"] = func() (out string) {
envId, ok := varStack["environment_id"]
Expand All @@ -1322,6 +1347,10 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
return
}

timeout := callable.AcquireTimeout(TRG_CLEANUP_TIMEOUT, varStack, "Cleanup", envId)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

// runStop if found pending
runNumberStop, ok := p.pendingRunStops[envId]
if ok {
Expand All @@ -1331,7 +1360,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
Debug("pending TRG Stop found, performing cleanup")

delete(p.pendingRunStops, envId)
_ = runStopFunc(runNumberStop)
_ = runStopFunc(ctx, runNumberStop)

trgEndTime := strconv.FormatInt(time.Now().UnixMilli(), 10)
parentRole, ok := call.GetParentRole().(callable.ParentRole)
Expand All @@ -1358,7 +1387,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
Debug("pending TRG Unload found, performing cleanup")

delete(p.pendingRunUnloads, envId)
_ = runUnloadFunc(runNumberUnload)
_ = runUnloadFunc(ctx, runNumberUnload)
} else {
log.WithField("partition", envId).
WithField("level", infologger.IL_Devel).
Expand Down

0 comments on commit 4dfed95

Please sign in to comment.