Skip to content

Commit

Permalink
[core] TRG asynchronous GetData + RunList polling + reconciliation
Browse files Browse the repository at this point in the history
  • Loading branch information
teo committed Apr 1, 2022
1 parent eae90a7 commit 9bddeea
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 30 deletions.
4 changes: 4 additions & 0 deletions core/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ func setDefaults() error {
viper.SetDefault("dcsServiceUseSystemProxy", false)
viper.SetDefault("ddSchedulerEndpoint", "//127.0.0.1:50052")
viper.SetDefault("ddSchedulerUseSystemProxy", false)
viper.SetDefault("trgServiceEndpoint", "//127.0.0.1:50060")
viper.SetDefault("trgPollingInterval", "3s")
viper.SetDefault("odcEndpoint", "//127.0.0.1:50053")
viper.SetDefault("odcPollingInterval", "3s")
viper.SetDefault("odcUseSystemProxy", false)
Expand Down Expand Up @@ -156,6 +158,8 @@ func setFlags() error {
pflag.Bool("dcsServiceUseSystemProxy", viper.GetBool("dcsServiceUseSystemProxy"), "When true the https_proxy, http_proxy and no_proxy environment variables are obeyed")
pflag.String("ddSchedulerEndpoint", viper.GetString("ddSchedulerEndpoint"), "Endpoint of the DD scheduler gRPC service (`host:port`)")
pflag.Bool("ddSchedulerUseSystemProxy", viper.GetBool("ddSchedulerUseSystemProxy"), "When true the https_proxy, http_proxy and no_proxy environment variables are obeyed")
pflag.String("trgServiceEndpoint", viper.GetString("trgServiceEndpoint"), "Endpoint of the TRG gRPC service (`host:port`)")
pflag.String("trgPollingInterval", viper.GetString("trgPollingInterval"), "How often to query the TRG gRPC service for run status (default: 3s)")
pflag.String("odcEndpoint", viper.GetString("odcEndpoint"), "Endpoint of the ODC gRPC service (`host:port`)")
pflag.String("odcPollingInterval", viper.GetString("odcPollingInterval"), "How often to query the ODC gRPC service for partition status (default: 3s)")
pflag.Bool("odcUseSystemProxy", viper.GetBool("odcUseSystemProxy"), "When true the https_proxy, http_proxy and no_proxy environment variables are obeyed")
Expand Down
198 changes: 169 additions & 29 deletions core/integration/trg/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,25 @@ import (
"net/url"
"strconv"
"strings"
"sync"
"time"

"github.com/AliceO2Group/Control/common/logger/infologger"
"github.com/AliceO2Group/Control/common/utils/uid"
"github.com/AliceO2Group/Control/core/environment"
"github.com/AliceO2Group/Control/core/integration"
trgecspb "github.com/AliceO2Group/Control/core/integration/trg/protos"
trgpb "github.com/AliceO2Group/Control/core/integration/trg/protos"
"github.com/AliceO2Group/Control/core/workflow/callable"
"github.com/spf13/viper"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
)

const TRG_DIAL_TIMEOUT = 2 * time.Second
const (
TRG_DIAL_TIMEOUT = 2 * time.Second
TRG_POLLING_INTERVAL = 3 * time.Second
TRG_RECONCILIATION_TIMEOUT = 5 * time.Second
)

type Plugin struct {
trgHost string
Expand All @@ -56,6 +62,16 @@ type Plugin struct {

pendingRunStops map[string] /*envId*/ int64
pendingRunUnloads map[string] /*envId*/ int64

cachedStatus *TrgStatus
cachedStatusMu sync.RWMutex
cachedStatusCancelFunc context.CancelFunc
}

type TrgStatus struct {
RunCount int `json:"runCount,omitempty"`
Lines []string `json:"lines,omitempty"`
Structured Runs `json:"structured,omitempty"`
}

func NewPlugin(endpoint string) integration.Plugin {
Expand Down Expand Up @@ -97,50 +113,69 @@ func (p *Plugin) GetConnectionState() string {
return p.trgClient.conn.GetState().String()
}

func (p *Plugin) GetData(environmentIds []uid.ID) string {
if p == nil || p.trgClient == nil {
return ""
}

runReply, err := p.trgClient.RunList(context.Background(), &trgecspb.Empty{}, grpc.EmptyCallOption{})
func (p *Plugin) queryRunList() {
runReply, err := p.trgClient.RunList(context.Background(), &trgpb.Empty{}, grpc.EmptyCallOption{})
if err != nil {
err = fmt.Errorf("error querying TRG service at %s: %w", viper.GetString("trgServiceEndpoint"), err)
log.WithError(err).
WithField("level", infologger.IL_Devel).
WithField("endpoint", viper.GetString("trgServiceEndpoint")).
WithField("call", "RunList").
Error("TRG error")

return fmt.Sprintf("error querying TRG service at %s: %s", viper.GetString("trgServiceEndpoint"), err.Error())
}
if runReply == nil {
log.WithField("level", infologger.IL_Support).
WithField("endpoint", viper.GetString("trgServiceEndpoint")).
WithField("call", "RunList").
WithError(fmt.Errorf("TRG RunList response is nil")).
Error("TRG error")
runReply = &trgpb.RunReply{}
}

structured, err := parseRunList(int(runReply.Rc), runReply.Msg)
if err != nil {
err = fmt.Errorf("error parsing response from TRG service at %s: %w", viper.GetString("trgServiceEndpoint"), err)
log.WithError(err).
WithField("level", infologger.IL_Devel).
WithField("endpoint", viper.GetString("trgServiceEndpoint")).
WithField("call", "RunList").
Error("TRG error")

return fmt.Sprintf("error parsing response from TRG service at %s: %s", viper.GetString("trgServiceEndpoint"), err.Error())
}
if structured == nil {
structured = make(Runs, 0)
}

out := struct {
RunCount int `json:"runCount,omitempty"`
Lines []string `json:"lines,omitempty"`
Structured Runs `json:"structured,omitempty"`
}{
out := &TrgStatus{
RunCount: int(runReply.Rc),
Lines: strings.Split(runReply.Msg, "\n"),
Structured: structured,
}

var js []byte
js, err = json.Marshal(out)
p.cachedStatusMu.Lock()
p.cachedStatus = out
p.cachedStatusMu.Unlock()
}

func (p *Plugin) GetData(_ []uid.ID) string {
if p == nil || p.trgClient == nil {
return ""
}

p.cachedStatusMu.RLock()
r := p.cachedStatus
if r == nil {
p.cachedStatusMu.RUnlock()
return ""
}

out, err := json.Marshal(r)
p.cachedStatusMu.RUnlock()

if err != nil {
return fmt.Sprintf("error marshaling TRG service response from %s: %s", viper.GetString("trgServiceEndpoint"), err.Error())
return ""
}
return string(out[:])

return string(js[:])
}

func (p *Plugin) Init(instanceId string) error {
Expand All @@ -155,9 +190,113 @@ func (p *Plugin) Init(instanceId string) error {
if p.trgClient == nil {
return fmt.Errorf("failed to start TRG client on %s", viper.GetString("trgServiceEndpoint"))
}

var ctx context.Context
ctx, p.cachedStatusCancelFunc = context.WithCancel(context.Background())

trgPollingIntervalStr := viper.GetString("trgPollingInterval")
trgPollingInterval, err := time.ParseDuration(trgPollingIntervalStr)
if err != nil {
trgPollingInterval = TRG_POLLING_INTERVAL
log.Debugf("TRG plugin cannot acquire polling interval, defaulting to %s", TRG_POLLING_INTERVAL.String())
}

// polling
go func() {
for {
select {
case <-ctx.Done():
return
case <-time.After(trgPollingInterval):
p.queryRunList()
p.reconcile()
}
}
}()
return nil
}

func (p *Plugin) reconcile() {
envMan := environment.ManagerInstance()
envIds := envMan.Ids()
activeEcsRuns := make(map[uint32]struct{})
for _, envId := range envIds {
env, err := envMan.Environment(envId)
if err != nil || env == nil {
continue
}
rn := env.GetCurrentRunNumber()
if rn != 0 {
activeEcsRuns[rn] = struct{}{}
}
}

p.cachedStatusMu.RLock()
if p.cachedStatus != nil && len(p.cachedStatus.Structured) > 0 {
for _, trgRun := range p.cachedStatus.Structured {
if _, contains := activeEcsRuns[trgRun.RunNumber]; contains { // if activeEcsRuns contains a RN known to TRG
continue
} else { // found TRG run unknown to AliECS
if p.trgClient == nil {
continue
}
in := trgpb.RunStopRequest{ // applies to both RunStop and RunUnload
Runn: trgRun.RunNumber,
Detector: "",
}

if trgRun.State == CTP_RUNNING { // both STANDALONE and GLOBAL
if trgRun.Cardinality == CTP_STANDALONE { // if global run, we send no detector
if len(trgRun.Detectors) == 1 {
in.Detector = trgRun.Detectors[0].String()
}
}
ctx, _ := context.WithTimeout(context.Background(), TRG_RECONCILIATION_TIMEOUT)
_, err := p.trgClient.RunStop(ctx, &in, grpc.EmptyCallOption{})
if err != nil {
err = fmt.Errorf("TRG reconciliation failure: %w", err)
log.WithError(err).
WithField("level", infologger.IL_Support).
WithField("endpoint", viper.GetString("trgServiceEndpoint")).
WithField("runNumber", trgRun.RunNumber).
WithField("call", "RunStop").
Error("TRG error")
} else {
if trgRun.Cardinality == CTP_GLOBAL {
trgRun.State = CTP_LOADED // must be loaded if GLOBAL && RunStop successful
} else {
log.WithField("level", infologger.IL_Support).
WithField("endpoint", viper.GetString("trgServiceEndpoint")).
WithField("runNumber", trgRun.RunNumber).
Info("TRG STANDALONE reconciliation complete")
}
}
}
if trgRun.State == CTP_LOADED && trgRun.Cardinality == CTP_GLOBAL {
ctx, _ := context.WithTimeout(context.Background(), TRG_RECONCILIATION_TIMEOUT)
_, err := p.trgClient.RunUnload(ctx, &in, grpc.EmptyCallOption{})
if err != nil {
err = fmt.Errorf("TRG reconciliation failure: %w", err)
log.WithError(err).
WithField("level", infologger.IL_Support).
WithField("endpoint", viper.GetString("trgServiceEndpoint")).
WithField("runNumber", trgRun.RunNumber).
WithField("call", "RunUnload").
Error("TRG error")
} else {
log.WithField("level", infologger.IL_Support).
WithField("endpoint", viper.GetString("trgServiceEndpoint")).
WithField("runNumber", trgRun.RunNumber).
Info("TRG GLOBAL reconciliation complete")
}
}
}
}
}
p.cachedStatusMu.RUnlock()

}

func (p *Plugin) ObjectStack(_ map[string]string) (stack map[string]interface{}) {
stack = make(map[string]interface{})
return stack
Expand Down Expand Up @@ -235,7 +374,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
return
}

in := trgecspb.RunLoadRequest{
in := trgpb.RunLoadRequest{
Runn: uint32(runNumber64),
Detectors: detectors,
Config: globalConfig,
Expand Down Expand Up @@ -274,7 +413,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
return
}

var response *trgecspb.RunReply
var response *trgpb.RunReply
response, err = p.trgClient.RunLoad(context.Background(), &in, grpc.EmptyCallOption{})
if err != nil {
log.WithError(err).
Expand Down Expand Up @@ -371,7 +510,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
detectors = ""
}

in := trgecspb.RunStartRequest{
in := trgpb.RunStartRequest{
Runn: uint32(runNumber64),
Detector: detectors,
Config: runtimeConfig,
Expand Down Expand Up @@ -410,7 +549,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
return
}

var response *trgecspb.RunReply
var response *trgpb.RunReply

response, err = p.trgClient.RunStart(context.Background(), &in, grpc.EmptyCallOption{})
if err != nil {
Expand Down Expand Up @@ -475,7 +614,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
detectors = ""
}

in := trgecspb.RunStopRequest{
in := trgpb.RunStopRequest{
Runn: uint32(runNumber64),
Detector: detectors,
}
Expand Down Expand Up @@ -515,7 +654,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
return
}

var response *trgecspb.RunReply
var response *trgpb.RunReply
response, err = p.trgClient.RunStop(context.Background(), &in, grpc.EmptyCallOption{})
if err != nil {
log.WithError(err).
Expand Down Expand Up @@ -578,7 +717,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
return
}

in := trgecspb.RunStopRequest{
in := trgpb.RunStopRequest{
Runn: uint32(runNumber64),
// "" when unloading global run
Detector: "",
Expand Down Expand Up @@ -619,7 +758,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
return
}

var response *trgecspb.RunReply
var response *trgpb.RunReply
response, err = p.trgClient.RunUnload(context.Background(), &in, grpc.EmptyCallOption{})
if err != nil {
log.WithError(err).
Expand Down Expand Up @@ -756,5 +895,6 @@ func (p *Plugin) parseDetectors(ctsDetectorsParam string) (detectors string, err
}

func (p *Plugin) Destroy() error {
p.cachedStatusCancelFunc()
return p.trgClient.Close()
}
7 changes: 6 additions & 1 deletion core/integration/trg/trgutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,12 @@ const (
//G 2224 L its, tpc run2224

func parseRunList(runCount int, payload string) (runs Runs, err error) {
lines := strings.Split(strings.TrimSpace(payload), "\n")
cleanPayload := strings.TrimSpace(payload)
if len(cleanPayload) == 0 {
err = fmt.Errorf("empty RunList response payload")
return
}
lines := strings.Split(strings.TrimSpace(cleanPayload), "\n")
if len(lines) != runCount {
err = fmt.Errorf("cannot parse run count mismatch in payload: %s", payload)
return
Expand Down

0 comments on commit 9bddeea

Please sign in to comment.