diff --git a/stompserver/stompserver.go b/stompserver/stompserver.go index 418c02b..eb0ac05 100644 --- a/stompserver/stompserver.go +++ b/stompserver/stompserver.go @@ -19,6 +19,7 @@ import ( lbstomp "github.com/vkuznet/lb-stomp" // prometheus apis "github.com/prometheus/client_golang/prometheus/promhttp" + // prometheus apis ) // stompMgr defines the stomp manager for the producer. @@ -64,7 +65,7 @@ func main() { if Config.Producer == "wmarchive" { fwjrServer() } else if Config.Producer == "cmsswpop" { - swpopServer() + traceServer("swpop") } else if Config.Producer == "xrootd" { xrtdServer() } else { diff --git a/stompserver/cmsswpopTracer.go b/stompserver/traceMaker.go similarity index 63% rename from stompserver/cmsswpopTracer.go rename to stompserver/traceMaker.go index 65e6767..10bccc4 100644 --- a/stompserver/cmsswpopTracer.go +++ b/stompserver/traceMaker.go @@ -1,12 +1,13 @@ package main -// cmsswpopTracer - Is one of the three RucioTracer. It handles data from +// tracerMaker - It handles data that share the same data structure: // CMSSWPOP: /topic/cms.swpop +// xrootd: /topic/xrootd.cms.aaa.ng // Process it, then produce a Ruci trace message and then it to topic: // /topic/cms.rucio.tracer // // Authors: Yuyi Guo -// Created: June 2021 +// Created: September 2021 import ( "encoding/json" @@ -19,11 +20,29 @@ import ( // stomp library "github.com/go-stomp/stomp" + // prometheus apis "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" ) +// Define the domain and RSE map file +var domainRSEMap []DomainRSE + +// TopicRecord defines the common record structure. +type TopicRecord struct { + SiteName string `json:"site_name"` + Usrdn string `json:"user_dn"` + ClientHost string `json:"client_host"` + ClientDomain string `json:"client_domain"` + ServerHost string `json:"server_host"` + ServerDomain string `json:"server_domain"` + ServerSite string `json:"server_site"` + Lfn string `json:"file_lfn"` + JobType string `json:"app_info"` + Ts int64 `json:"start_time"` +} + // prometheus metrics var ( Received_swpop = promauto.NewCounter(prometheus.CounterOpts{ @@ -39,53 +58,41 @@ var ( Help: "The number of traces messages os swpop", }) ) +var Receivedperk_2 uint64 -// SWPOPRecord defines CMSSW POP record structure. -type SWPOPRecord struct { - SiteName string `json:"site_name"` - Usrdn string `json:"user_dn"` - ClientHost string `json:"client_host"` - ClientDomain string `json:"client_domain"` - ServerHost string `json:"server_host"` - ServerDomain string `json:"server_domain"` - Lfn string `json:"file_lfn"` - JobType string `json:"app_info"` - Ts int64 `json:"start_time"` -} - -// Define the domain and RSE map file -var domainRSEMap []DomainRSE - -// Receivedperk_swpop keeps number of messages per 1k -var Receivedperk_swpop uint64 - -// swpopConsumer consumes for cmssw pop topic -func swpopConsumer(msg *stomp.Message) (string, []string, string, string, int64, string, error) { +// DSConsumer consumes from various topics, such as aaa/xrtood topic +func DSConsumer(msg *stomp.Message, topic string) (string, []string, string, string, int64, string, error) { //first to check to make sure there is something in msg, - //otherwise we will get error: + //otherwise we will get error. // - Received_swpop.Inc() - atomic.AddUint64(&Receivedperk_swpop, 1) + atomic.AddUint64(&Receivedperk_2, 1) + if topic == "xrtd" { + Received_xrtd.Inc() + } else if topic == "swpop" { + Received_swpop.Inc() + } else { + return "", nil, "", "", 0, "", errors.New("topic is not supported.") + } if msg == nil || msg.Body == nil { return "", nil, "", "", 0, "", errors.New("Empty message") } // if Config.Verbose > 2 { - log.Println("*****************Source AMQ message of swpop*********************") + log.Printf("*****************Source AMQ message of %s*********************\n", topic) log.Println("\n" + string(msg.Body)) - log.Println("*******************End AMQ message of swpop**********************") + log.Printf("*******************End AMQ message of %s **********************\n", topic) } - var rec SWPOPRecord + var rec TopicRecord err := json.Unmarshal(msg.Body, &rec) if err != nil { log.Printf("Enable to Unmarchal input message. Error: %v", err) return "", nil, "", "", 0, "", err } if Config.Verbose > 2 { - log.Println(" ******Parsed swpop record******") + log.Printf(" ******Parsed %s record******\n", topic) log.Println("\n", rec) - log.Println("******End parsed swpop record******") + log.Printf("\n******End parsed %s record******\n", topic) } // process received message, e.g. extract some fields var lfn string @@ -101,7 +108,15 @@ func swpopConsumer(msg *stomp.Message) (string, []string, string, string, int64, } else { return "", nil, "", "", 0, "", errors.New("No Lfn found") } - if strings.ToLower(rec.ServerDomain) == "unknown" || len(rec.ServerDomain) <= 0 { + if strings.ToLower(rec.ServerSite) != "unknown" || len(rec.ServerSite) > 0 { + if s, ok := Sitemap[rec.ServerSite]; ok { + site = s + } else { + site = rec.ServerSite + } + // one lfn may have more than one RSE in some cases, such as in2p3.fr + sitename = append(sitename, site) + } else if strings.ToLower(rec.ServerDomain) == "unknown" || len(rec.ServerDomain) <= 0 { if len(rec.SiteName) == 0 { return "", nil, "", "", 0, "", errors.New("No RSEs found") } else { @@ -110,6 +125,7 @@ func swpopConsumer(msg *stomp.Message) (string, []string, string, string, int64, } else { site = rec.SiteName } + // one lfn may have more than one RSE in some cases, such as in2p3.fr sitename = append(sitename, site) } } else { @@ -151,17 +167,18 @@ func swpopConsumer(msg *stomp.Message) (string, []string, string, string, int64, return lfn, sitename, usrdn, jobtype, ts, wnname, nil } -// swpopTrace makes swpop trace and send it to rucio endpoint -func swpopTrace(msg *stomp.Message) ([]string, error) { +// TraceSender makes a trace and send it to rucio endpoint +func traceSender(msg *stomp.Message, topic string) ([]string, error) { var dids []string //get trace data - lfn, sitename, usrdn, jobtype, ts, wnname, err := swpopConsumer(msg) + lfn, sitename, usrdn, jobtype, ts, wnname, err := DSConsumer(msg, topic) if err != nil { - log.Println("Bad swpop message.") - return nil, errors.New("Bad swpop message") + log.Printf("Bad %s message.\n", topic) + + return nil, errors.New(fmt.Sprintf("Bad %s message", topic)) } for _, s := range sitename { - trc := NewTrace(lfn, s, ts, jobtype, wnname, "swpop", usrdn) + trc := NewTrace(lfn, s, ts, jobtype, wnname, topic, usrdn) data, err := json.Marshal(trc) if err != nil { if Config.Verbose > 1 { @@ -172,18 +189,25 @@ func swpopTrace(msg *stomp.Message) ([]string, error) { dids = append(dids, fmt.Sprintf("%v", trc.DID)) } if Config.Verbose > 2 { - log.Println("********* Rucio trace record ***************") + log.Printf("********* Rucio trace record from %s ***************\n", topic) log.Println("\n" + string(data)) - log.Println("******** Done Rucio trace record *************") + log.Println("\n******** Done Rucio trace record from %s *************\n", topic) } // send data to Stomp endpoint if Config.EndpointProducer != "" { - err := stompMgr.Send(data, stomp.SendOpt.Header("appversion", "swpopAMQ")) + err := stompMgr.Send(data, stomp.SendOpt.Header("appversion", "xrootdAMQ")) if err != nil { dids = append(dids, fmt.Sprintf("%v", trc.DID)) log.Printf("Failed to send %s to stomp.", trc.DID) } else { - Send_swpop.Inc() + if topic == "xrtd" { + Send_xrtd.Inc() + } else if topic == "swpop" { + Send_swpop.Inc() + } else { + return nil, errors.New("topic is not supported.") + } + } } else { log.Fatalln("*** Config.Enpoint is empty, check config file! ***") @@ -192,61 +216,54 @@ func swpopTrace(msg *stomp.Message) ([]string, error) { return dids, nil } -// server gets messages from consumer AMQ end pointer, make tracers and send to AMQ producer end point. -func swpopServer() { - log.Println("Stomp broker URL: ", Config.StompURIConsumer) +// +// TraceServer gets messages from consumer AMQ end pointer, make tracers and send to AMQ producer end point. +func traceServer(topic string) { + log.Println("Consumer Stomp broker URL: ", Config.StompURIConsumer) // err2 := parseRSEMap(fdomainmap) if err2 != nil { log.Fatalf("Unable to parse rucio doamin RSE map file %s, error: %v \n", fdomainmap, err2) } - + // err2 = parseSitemap(fsitemap) if err2 != nil { log.Fatalf("Unable to parse rucio sitemap file %s, error: %v \n", fsitemap, err2) } - + // var tc uint64 t1 := time.Now().Unix() var t2 int64 var ts uint64 var restartSrv uint smgr := initStomp(Config.EndpointConsumer, Config.StompURIConsumer) - // get connection - sub, err := subscribe(smgr) - if err != nil { - log.Println(err) - sub, err = subscribe(smgr) - if err != nil { - log.Fatalf("Unable to subscribe to all the brokers, fatal error!") - } + // ch for all the listeners to write to + ch := make(chan *stomp.Message) + // defer close executed when the main function is about to exit. + // In this way the channel is to be closed and no resources taken. + defer close(ch) + for _, addr := range smgr.Addresses { + go listener(smgr, addr, ch) } - + // for { - // check first if subscription is still valid, otherwise get a new one - if sub == nil { - time.Sleep(time.Duration(Config.Interval) * time.Second) - if err != nil { - log.Println("unable to get new subscription", err) - continue - } - } - // get stomp messages from subscriber channel + // get stomp messages from ch select { - case msg := <-sub.C: + case msg := <-ch: restartSrv = 0 if msg.Err != nil { - log.Println("receive error message", msg.Err) - sub, err = subscribe(smgr) - if err != nil { - log.Println("unable to subscribe to", Config.EndpointConsumer, err) - } break } // process stomp messages - dids, err := swpopTrace(msg) + dids, err := traceSender(msg, topic) if err == nil { - Traces_swpop.Inc() + if topic == "xrtd" { + Traces_xrtd.Inc() + } else if topic == "swpop" { + Traces_swpop.Inc() + } else { + log.Fatalf(" Topic %s is not supported. \n", topic) + } atomic.AddUint64(&tc, 1) if Config.Verbose > 1 { log.Println("The number of traces processed in 1000 group: ", atomic.LoadUint64(&tc)) @@ -257,15 +274,13 @@ func swpopServer() { atomic.StoreUint64(&tc, 0) t2 = time.Now().Unix() - t1 t1 = time.Now().Unix() - log.Printf("Processing 1000 messages while total received %d messages.\n", atomic.LoadUint64(&Receivedperk_swpop)) + log.Printf("Processing 1000 %s messages while total received %d messages.\n", topic, atomic.LoadUint64(&Receivedperk_2)) log.Printf("Processing 1000 messages took %d seconds.\n", t2) - atomic.StoreUint64(&Receivedperk_swpop, 0) + atomic.StoreUint64(&Receivedperk_2, 0) } if err != nil && err.Error() != "Empty message" { - log.Println("SWPOP message processing error", err) + log.Printf("\n %s message processing error: %v\n", topic, err) } - //got error message "SWPOP message processing error unexpected end of JSON input". - //Code stoped to loop??? YG 6/22/2021 if len(dids) > 0 { log.Printf("DIDS in Error: %v .\n ", dids) } @@ -274,6 +289,7 @@ func swpopServer() { // Config.Interval = 1 so each sleeping is 10 ms. We will have to restart the server // if it cannot get any messages in 5 minutes. if restartSrv >= 300000 { + //FIXME: We may not exit anymore. log.Fatalln("No messages in 5 minutes, exit(1)") } restartSrv += 1 diff --git a/stompserver/xrootdTracer.go b/stompserver/xrootdTracer.go index 585488d..e712d33 100644 --- a/stompserver/xrootdTracer.go +++ b/stompserver/xrootdTracer.go @@ -62,6 +62,7 @@ var domainRSEMap2 []DomainRSE var Receivedperk_xrtd uint64 // xrtdConsumer consumes for aaa/xrtood topic +// We may use a general Consumer for both xrootd and cmsswpop func xrtdConsumer(msg *stomp.Message) (string, []string, string, string, int64, string, error) { //first to check to make sure there is something in msg, //otherwise we will get error.