diff --git a/core/config.go b/core/config.go index e181174f..a6a891f0 100644 --- a/core/config.go +++ b/core/config.go @@ -107,6 +107,8 @@ func setDefaults() error { viper.SetDefault("ddSchedulerUseSystemProxy", false) viper.SetDefault("trgServiceEndpoint", "//127.0.0.1:50060") viper.SetDefault("trgPollingInterval", "3s") + viper.SetDefault("trgPollingTimeout", "3s") + viper.SetDefault("trgReconciliationTimeout", "5s") viper.SetDefault("odcEndpoint", "//127.0.0.1:50053") viper.SetDefault("odcPollingInterval", "3s") viper.SetDefault("odcUseSystemProxy", false) @@ -174,7 +176,9 @@ func setFlags() error { pflag.Duration("ddSchedulerStatusTimeout", viper.GetDuration("ddSchedulerStatusTimeout"), "Timeout for status calls in ddshed plugin") pflag.Bool("ddSchedulerUseSystemProxy", viper.GetBool("ddSchedulerUseSystemProxy"), "When true the https_proxy, http_proxy and no_proxy environment variables are obeyed") pflag.String("trgServiceEndpoint", viper.GetString("trgServiceEndpoint"), "Endpoint of the TRG gRPC service (`host:port`)") - pflag.String("trgPollingInterval", viper.GetString("trgPollingInterval"), "How often to query the TRG gRPC service for run status (default: 3s)") + pflag.Duration("trgPollingInterval", viper.GetDuration("trgPollingInterval"), "How often to query the TRG gRPC service for run status (default: 3s)") + pflag.Duration("trgPollingTimeout", viper.GetDuration("trgPollingTimeout"), "Timeout for the query to the TRG gRPC service for run status (default: 3s)") + pflag.Duration("trgReconciliationTimeout", viper.GetDuration("trgReconciliationTimeout"), "Timeout for reconciliation requests to the TRG gRPC service (default: 5s)") pflag.String("odcEndpoint", viper.GetString("odcEndpoint"), "Endpoint of the ODC gRPC service (`host:port`)") pflag.String("odcPollingInterval", viper.GetString("odcPollingInterval"), "How often to query the ODC gRPC service for partition status (default: 3s)") pflag.Bool("odcUseSystemProxy", viper.GetBool("odcUseSystemProxy"), "When true the https_proxy, http_proxy and no_proxy environment variables are obeyed") diff --git a/core/integration/trg/plugin.go b/core/integration/trg/plugin.go index d636ec55..7983378f 100644 --- a/core/integration/trg/plugin.go +++ b/core/integration/trg/plugin.go @@ -52,10 +52,14 @@ import ( ) const ( - TRG_DIAL_TIMEOUT = 2 * time.Second - TRG_POLLING_INTERVAL = 3 * time.Second - TRG_RECONCILIATION_TIMEOUT = 5 * time.Second - TOPIC = topic.IntegratedService + topic.Separator + "trg" + TRG_DIAL_TIMEOUT = 2 * time.Second + TRG_PFR_TIMEOUT = 5 * time.Second + TRG_LOAD_TIMEOUT = 5 * time.Second + TRG_START_TIMEOUT = 5 * time.Second + TRG_STOP_TIMEOUT = 5 * time.Second + TRG_UNLOAD_TIMEOUT = 5 * time.Second + TRG_CLEANUP_TIMEOUT = 5 * time.Second + TOPIC = topic.IntegratedService + topic.Separator + "trg" ) type Plugin struct { @@ -119,7 +123,10 @@ func (p *Plugin) GetConnectionState() string { } func (p *Plugin) queryRunList() { - runReply, err := p.trgClient.RunList(context.Background(), &trgpb.Empty{}, grpc.EmptyCallOption{}) + ctx, cancel := context.WithTimeout(context.Background(), viper.GetDuration("trgPollingTimeout")) + defer cancel() + + runReply, err := p.trgClient.RunList(ctx, &trgpb.Empty{}, grpc.EmptyCallOption{}) if err != nil { err = fmt.Errorf("error querying TRG service at %s: %w", viper.GetString("trgServiceEndpoint"), err) log.WithError(err). @@ -260,12 +267,7 @@ func (p *Plugin) Init(instanceId string) error { var ctx context.Context ctx, p.cachedStatusCancelFunc = context.WithCancel(context.Background()) - trgPollingIntervalStr := viper.GetString("trgPollingInterval") - trgPollingInterval, err := time.ParseDuration(trgPollingIntervalStr) - if err != nil { - trgPollingInterval = TRG_POLLING_INTERVAL - log.Debugf("TRG plugin cannot acquire polling interval, defaulting to %s", TRG_POLLING_INTERVAL.String()) - } + trgPollingInterval := viper.GetDuration("trgPollingInterval") // polling go func() { @@ -321,8 +323,10 @@ func (p *Plugin) reconcile() { } }*/ - ctx, _ := context.WithTimeout(context.Background(), TRG_RECONCILIATION_TIMEOUT) + ctx, cancel := context.WithTimeout(context.Background(), viper.GetDuration("trgReconciliationTimeout")) _, err := p.trgClient.RunStop(ctx, &in, grpc.EmptyCallOption{}) + cancel() + // TODO: Response's RC should also be checked here if err != nil { err = fmt.Errorf("TRG reconciliation failure: %w", err) @@ -344,8 +348,9 @@ func (p *Plugin) reconcile() { } } if trgRun.State == CTP_LOADED && trgRun.Cardinality == CTP_GLOBAL { - ctx, _ := context.WithTimeout(context.Background(), TRG_RECONCILIATION_TIMEOUT) + ctx, cancel := context.WithTimeout(context.Background(), viper.GetDuration("trgReconciliationTimeout")) _, err := p.trgClient.RunUnload(ctx, &in, grpc.EmptyCallOption{}) + cancel() if err != nil { err = fmt.Errorf("TRG reconciliation failure: %w", err) log.WithError(err). @@ -457,6 +462,10 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { return } + timeout := callable.AcquireTimeout(TRG_PFR_TIMEOUT, varStack, "PrepareForRun", envId) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + payload := map[string]interface{}{ "trgRequest": &in, } @@ -473,7 +482,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { }) var response *trgpb.RunReply - response, err = p.trgClient.PrepareForRun(context.Background(), &in, grpc.EmptyCallOption{}) + response, err = p.trgClient.PrepareForRun(ctx, &in, grpc.EmptyCallOption{}) if err != nil { log.WithError(err). WithField("level", infologger.IL_Support). @@ -666,6 +675,10 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { return } + timeout := callable.AcquireTimeout(TRG_LOAD_TIMEOUT, varStack, "RunLoad", envId) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + payload := map[string]interface{}{ "trgRequest": &in, } @@ -682,7 +695,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { }) var response *trgpb.RunReply - response, err = p.trgClient.RunLoad(context.Background(), &in, grpc.EmptyCallOption{}) + response, err = p.trgClient.RunLoad(ctx, &in, grpc.EmptyCallOption{}) if err != nil { log.WithError(err). WithField("level", infologger.IL_Support). @@ -853,6 +866,10 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { return } + timeout := callable.AcquireTimeout(TRG_START_TIMEOUT, varStack, "RunStart", envId) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + payload := map[string]interface{}{ "trgRequest": &in, } @@ -869,7 +886,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { }) var response *trgpb.RunReply - response, err = p.trgClient.RunStart(context.Background(), &in, grpc.EmptyCallOption{}) + response, err = p.trgClient.RunStart(ctx, &in, grpc.EmptyCallOption{}) if err != nil { log.WithError(err). WithField("level", infologger.IL_Support). @@ -956,7 +973,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { return } - runStopFunc := func(runNumber64 int64) (out string) { + runStopFunc := func(ctx context.Context, runNumber64 int64) (out string) { trgDetectorsParam, ok := varStack["trg_detectors"] if !ok { // "" it is a global run @@ -1033,7 +1050,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { }) var response *trgpb.RunReply - response, err = p.trgClient.RunStop(context.Background(), &in, grpc.EmptyCallOption{}) + response, err = p.trgClient.RunStop(ctx, &in, grpc.EmptyCallOption{}) if err != nil { log.WithError(err). WithField("level", infologger.IL_Support). @@ -1124,7 +1141,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { return } - runUnloadFunc := func(runNumber64 int64) (out string) { + runUnloadFunc := func(ctx context.Context, runNumber64 int64) (out string) { trgDetectorsParam, ok := varStack["trg_detectors"] if !ok { @@ -1201,7 +1218,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { }) var response *trgpb.RunReply - response, err = p.trgClient.RunUnload(context.Background(), &in, grpc.EmptyCallOption{}) + response, err = p.trgClient.RunUnload(ctx, &in, grpc.EmptyCallOption{}) if err != nil { log.WithError(err). WithField("level", infologger.IL_Support). @@ -1294,7 +1311,11 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { Error("cannot acquire run number for TRG Run Stop") } - return runStopFunc(runNumber64) + timeout := callable.AcquireTimeout(TRG_STOP_TIMEOUT, varStack, "RunStop", envId) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + return runStopFunc(ctx, runNumber64) } stack["RunUnload"] = func() (out string) { log.WithField("partition", envId). @@ -1311,7 +1332,11 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { Error("cannot acquire run number for TRG Run Unload") } - return runUnloadFunc(runNumber64) + timeout := callable.AcquireTimeout(TRG_UNLOAD_TIMEOUT, varStack, "RunUnload", envId) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + return runUnloadFunc(ctx, runNumber64) } stack["Cleanup"] = func() (out string) { envId, ok := varStack["environment_id"] @@ -1322,6 +1347,10 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { return } + timeout := callable.AcquireTimeout(TRG_CLEANUP_TIMEOUT, varStack, "Cleanup", envId) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + // runStop if found pending runNumberStop, ok := p.pendingRunStops[envId] if ok { @@ -1331,7 +1360,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { Debug("pending TRG Stop found, performing cleanup") delete(p.pendingRunStops, envId) - _ = runStopFunc(runNumberStop) + _ = runStopFunc(ctx, runNumberStop) trgEndTime := strconv.FormatInt(time.Now().UnixMilli(), 10) parentRole, ok := call.GetParentRole().(callable.ParentRole) @@ -1358,7 +1387,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) { Debug("pending TRG Unload found, performing cleanup") delete(p.pendingRunUnloads, envId) - _ = runUnloadFunc(runNumberUnload) + _ = runUnloadFunc(ctx, runNumberUnload) } else { log.WithField("partition", envId). WithField("level", infologger.IL_Devel).