Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Ensure TRG timeouts are obeyed + add polling timeout #630

Merged
merged 1 commit into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion core/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ func setDefaults() error {
viper.SetDefault("ddSchedulerUseSystemProxy", false)
viper.SetDefault("trgServiceEndpoint", "//127.0.0.1:50060")
viper.SetDefault("trgPollingInterval", "3s")
viper.SetDefault("trgPollingTimeout", "3s")
viper.SetDefault("trgReconciliationTimeout", "5s")
viper.SetDefault("odcEndpoint", "//127.0.0.1:50053")
viper.SetDefault("odcPollingInterval", "3s")
viper.SetDefault("odcUseSystemProxy", false)
Expand Down Expand Up @@ -174,7 +176,9 @@ func setFlags() error {
pflag.Duration("ddSchedulergRPCTimeout", viper.GetDuration("ddSchedulergRPCTimeout"), "Timeout for gRPC calls in ddshed plugin")
pflag.Bool("ddSchedulerUseSystemProxy", viper.GetBool("ddSchedulerUseSystemProxy"), "When true the https_proxy, http_proxy and no_proxy environment variables are obeyed")
pflag.String("trgServiceEndpoint", viper.GetString("trgServiceEndpoint"), "Endpoint of the TRG gRPC service (`host:port`)")
pflag.String("trgPollingInterval", viper.GetString("trgPollingInterval"), "How often to query the TRG gRPC service for run status (default: 3s)")
pflag.Duration("trgPollingInterval", viper.GetDuration("trgPollingInterval"), "How often to query the TRG gRPC service for run status (default: 3s)")
pflag.Duration("trgPollingTimeout", viper.GetDuration("trgPollingTimeout"), "Timeout for the query to the TRG gRPC service for run status (default: 3s)")
pflag.Duration("trgReconciliationTimeout", viper.GetDuration("trgReconciliationTimeout"), "Timeout for reconciliation requests to the TRG gRPC service (default: 5s)")
pflag.String("odcEndpoint", viper.GetString("odcEndpoint"), "Endpoint of the ODC gRPC service (`host:port`)")
pflag.String("odcPollingInterval", viper.GetString("odcPollingInterval"), "How often to query the ODC gRPC service for partition status (default: 3s)")
pflag.Bool("odcUseSystemProxy", viper.GetBool("odcUseSystemProxy"), "When true the https_proxy, http_proxy and no_proxy environment variables are obeyed")
Expand Down
77 changes: 53 additions & 24 deletions core/integration/trg/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,14 @@ import (
)

const (
TRG_DIAL_TIMEOUT = 2 * time.Second
TRG_POLLING_INTERVAL = 3 * time.Second
TRG_RECONCILIATION_TIMEOUT = 5 * time.Second
TOPIC = topic.IntegratedService + topic.Separator + "trg"
TRG_DIAL_TIMEOUT = 2 * time.Second
TRG_PFR_TIMEOUT = 5 * time.Second
TRG_LOAD_TIMEOUT = 5 * time.Second
TRG_START_TIMEOUT = 5 * time.Second
TRG_STOP_TIMEOUT = 5 * time.Second
TRG_UNLOAD_TIMEOUT = 5 * time.Second
TRG_CLEANUP_TIMEOUT = 5 * time.Second
TOPIC = topic.IntegratedService + topic.Separator + "trg"
)

type Plugin struct {
Expand Down Expand Up @@ -119,7 +123,10 @@ func (p *Plugin) GetConnectionState() string {
}

func (p *Plugin) queryRunList() {
runReply, err := p.trgClient.RunList(context.Background(), &trgpb.Empty{}, grpc.EmptyCallOption{})
ctx, cancel := context.WithTimeout(context.Background(), viper.GetDuration("trgPollingTimeout"))
defer cancel()

runReply, err := p.trgClient.RunList(ctx, &trgpb.Empty{}, grpc.EmptyCallOption{})
if err != nil {
err = fmt.Errorf("error querying TRG service at %s: %w", viper.GetString("trgServiceEndpoint"), err)
log.WithError(err).
Expand Down Expand Up @@ -260,12 +267,7 @@ func (p *Plugin) Init(instanceId string) error {
var ctx context.Context
ctx, p.cachedStatusCancelFunc = context.WithCancel(context.Background())

trgPollingIntervalStr := viper.GetString("trgPollingInterval")
trgPollingInterval, err := time.ParseDuration(trgPollingIntervalStr)
if err != nil {
trgPollingInterval = TRG_POLLING_INTERVAL
log.Debugf("TRG plugin cannot acquire polling interval, defaulting to %s", TRG_POLLING_INTERVAL.String())
}
trgPollingInterval := viper.GetDuration("trgPollingInterval")

// polling
go func() {
Expand Down Expand Up @@ -321,8 +323,10 @@ func (p *Plugin) reconcile() {
}
}*/

ctx, _ := context.WithTimeout(context.Background(), TRG_RECONCILIATION_TIMEOUT)
ctx, cancel := context.WithTimeout(context.Background(), viper.GetDuration("trgReconciliationTimeout"))
_, err := p.trgClient.RunStop(ctx, &in, grpc.EmptyCallOption{})
cancel()

// TODO: Response's RC should also be checked here
if err != nil {
err = fmt.Errorf("TRG reconciliation failure: %w", err)
Expand All @@ -344,8 +348,9 @@ func (p *Plugin) reconcile() {
}
}
if trgRun.State == CTP_LOADED && trgRun.Cardinality == CTP_GLOBAL {
ctx, _ := context.WithTimeout(context.Background(), TRG_RECONCILIATION_TIMEOUT)
ctx, cancel := context.WithTimeout(context.Background(), viper.GetDuration("trgReconciliationTimeout"))
_, err := p.trgClient.RunUnload(ctx, &in, grpc.EmptyCallOption{})
cancel()
if err != nil {
err = fmt.Errorf("TRG reconciliation failure: %w", err)
log.WithError(err).
Expand Down Expand Up @@ -457,6 +462,10 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
return
}

timeout := callable.AcquireTimeout(TRG_PFR_TIMEOUT, varStack, "PrepareForRun", envId)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

payload := map[string]interface{}{
"trgRequest": &in,
}
Expand All @@ -473,7 +482,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
})

var response *trgpb.RunReply
response, err = p.trgClient.PrepareForRun(context.Background(), &in, grpc.EmptyCallOption{})
response, err = p.trgClient.PrepareForRun(ctx, &in, grpc.EmptyCallOption{})
if err != nil {
log.WithError(err).
WithField("level", infologger.IL_Support).
Expand Down Expand Up @@ -666,6 +675,10 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
return
}

timeout := callable.AcquireTimeout(TRG_LOAD_TIMEOUT, varStack, "RunLoad", envId)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

payload := map[string]interface{}{
"trgRequest": &in,
}
Expand All @@ -682,7 +695,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
})

var response *trgpb.RunReply
response, err = p.trgClient.RunLoad(context.Background(), &in, grpc.EmptyCallOption{})
response, err = p.trgClient.RunLoad(ctx, &in, grpc.EmptyCallOption{})
if err != nil {
log.WithError(err).
WithField("level", infologger.IL_Support).
Expand Down Expand Up @@ -853,6 +866,10 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
return
}

timeout := callable.AcquireTimeout(TRG_START_TIMEOUT, varStack, "RunStart", envId)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are you creating the context with timeout here and not next to the place where it is needed? (line 889)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just did it in the same spot as in ddsched. Shouldn't change much either way, as between here and the call we only have declarations and a Kafka event.

payload := map[string]interface{}{
"trgRequest": &in,
}
Expand All @@ -869,7 +886,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
})

var response *trgpb.RunReply
response, err = p.trgClient.RunStart(context.Background(), &in, grpc.EmptyCallOption{})
response, err = p.trgClient.RunStart(ctx, &in, grpc.EmptyCallOption{})
if err != nil {
log.WithError(err).
WithField("level", infologger.IL_Support).
Expand Down Expand Up @@ -956,7 +973,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {

return
}
runStopFunc := func(runNumber64 int64) (out string) {
runStopFunc := func(ctx context.Context, runNumber64 int64) (out string) {
trgDetectorsParam, ok := varStack["trg_detectors"]
if !ok {
// "" it is a global run
Expand Down Expand Up @@ -1033,7 +1050,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
})

var response *trgpb.RunReply
response, err = p.trgClient.RunStop(context.Background(), &in, grpc.EmptyCallOption{})
response, err = p.trgClient.RunStop(ctx, &in, grpc.EmptyCallOption{})
if err != nil {
log.WithError(err).
WithField("level", infologger.IL_Support).
Expand Down Expand Up @@ -1124,7 +1141,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {

return
}
runUnloadFunc := func(runNumber64 int64) (out string) {
runUnloadFunc := func(ctx context.Context, runNumber64 int64) (out string) {

trgDetectorsParam, ok := varStack["trg_detectors"]
if !ok {
Expand Down Expand Up @@ -1201,7 +1218,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
})

var response *trgpb.RunReply
response, err = p.trgClient.RunUnload(context.Background(), &in, grpc.EmptyCallOption{})
response, err = p.trgClient.RunUnload(ctx, &in, grpc.EmptyCallOption{})
if err != nil {
log.WithError(err).
WithField("level", infologger.IL_Support).
Expand Down Expand Up @@ -1294,7 +1311,11 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
Error("cannot acquire run number for TRG Run Stop")
}

return runStopFunc(runNumber64)
timeout := callable.AcquireTimeout(TRG_STOP_TIMEOUT, varStack, "RunStop", envId)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

return runStopFunc(ctx, runNumber64)
}
stack["RunUnload"] = func() (out string) {
log.WithField("partition", envId).
Expand All @@ -1311,7 +1332,11 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
Error("cannot acquire run number for TRG Run Unload")
}

return runUnloadFunc(runNumber64)
timeout := callable.AcquireTimeout(TRG_UNLOAD_TIMEOUT, varStack, "RunUnload", envId)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

return runUnloadFunc(ctx, runNumber64)
}
stack["Cleanup"] = func() (out string) {
envId, ok := varStack["environment_id"]
Expand All @@ -1322,6 +1347,10 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
return
}

timeout := callable.AcquireTimeout(TRG_CLEANUP_TIMEOUT, varStack, "Cleanup", envId)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

// runStop if found pending
runNumberStop, ok := p.pendingRunStops[envId]
if ok {
Expand All @@ -1331,7 +1360,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
Debug("pending TRG Stop found, performing cleanup")

delete(p.pendingRunStops, envId)
_ = runStopFunc(runNumberStop)
_ = runStopFunc(ctx, runNumberStop)

trgEndTime := strconv.FormatInt(time.Now().UnixMilli(), 10)
parentRole, ok := call.GetParentRole().(callable.ParentRole)
Expand All @@ -1358,7 +1387,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
Debug("pending TRG Unload found, performing cleanup")

delete(p.pendingRunUnloads, envId)
_ = runUnloadFunc(runNumberUnload)
_ = runUnloadFunc(ctx, runNumberUnload)
} else {
log.WithField("partition", envId).
WithField("level", infologger.IL_Devel).
Expand Down
Loading