Skip to content

Commit

Permalink
Merge pull request #40 from yuyiguo/upgrade-cmspop
Browse files Browse the repository at this point in the history
Created general traceMaker for cmspop and xroot.
  • Loading branch information
yuyiguo authored Sep 16, 2021
2 parents 2298b69 + 2d8eb06 commit 77bf22d
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 79 deletions.
3 changes: 2 additions & 1 deletion stompserver/stompserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
lbstomp "github.com/vkuznet/lb-stomp"
// prometheus apis
"github.com/prometheus/client_golang/prometheus/promhttp"
// prometheus apis
)

// stompMgr defines the stomp manager for the producer.
Expand Down Expand Up @@ -64,7 +65,7 @@ func main() {
if Config.Producer == "wmarchive" {
fwjrServer()
} else if Config.Producer == "cmsswpop" {
swpopServer()
traceServer("swpop")
} else if Config.Producer == "xrootd" {
xrtdServer()
} else {
Expand Down
172 changes: 94 additions & 78 deletions stompserver/cmsswpopTracer.go → stompserver/traceMaker.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package main

// cmsswpopTracer - Is one of the three RucioTracer. It handles data from
// traceMaker - It handles data from topics that share the same data structure:
// CMSSWPOP: /topic/cms.swpop
// xrootd: /topic/xrootd.cms.aaa.ng
// Process it, then produce a Rucio trace message and send it to the topic:
// /topic/cms.rucio.tracer
//
// Authors: Yuyi Guo
// Created: June 2021
// Created: September 2021

import (
"encoding/json"
Expand All @@ -19,11 +20,29 @@ import (

// stomp library
"github.com/go-stomp/stomp"

// prometheus apis
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

// Define the domain and RSE map file
var domainRSEMap []DomainRSE

// TopicRecord defines the common record structure shared by the source
// topics handled in this file (cms.swpop and xrootd, per the file header).
// DSConsumer fills it by json.Unmarshal-ing the body of a stomp message, so
// each field is populated from the JSON key named in its tag; keys missing
// from the message leave the field at its zero value.
type TopicRecord struct {
// SiteName comes from "site_name". DSConsumer falls back to it as the
// site when ServerDomain is empty or "unknown".
SiteName string `json:"site_name"`
// Usrdn comes from "user_dn".
Usrdn string `json:"user_dn"`
// ClientHost comes from "client_host".
ClientHost string `json:"client_host"`
// ClientDomain comes from "client_domain".
ClientDomain string `json:"client_domain"`
// ServerHost comes from "server_host".
ServerHost string `json:"server_host"`
// ServerDomain comes from "server_domain"; compared case-insensitively
// against "unknown" in DSConsumer.
ServerDomain string `json:"server_domain"`
// ServerSite comes from "server_site". DSConsumer looks it up in Sitemap
// and otherwise uses the raw value as the site name.
ServerSite string `json:"server_site"`
// Lfn comes from "file_lfn"; DSConsumer rejects records without it
// ("No Lfn found").
Lfn string `json:"file_lfn"`
// JobType comes from "app_info".
JobType string `json:"app_info"`
// Ts comes from "start_time".
// NOTE(review): the time unit (seconds vs. milliseconds) is not visible
// here - confirm against the producer of the topic.
Ts int64 `json:"start_time"`
}

// prometheus metrics
var (
Received_swpop = promauto.NewCounter(prometheus.CounterOpts{
Expand All @@ -39,53 +58,41 @@ var (
Help: "The number of traces messages os swpop",
})
)
var Receivedperk_2 uint64

// SWPOPRecord defines the CMSSW POP record structure. Each field is populated
// from the JSON key named in its tag when a message body is unmarshalled.
// NOTE(review): this has the same fields as TopicRecord except that it lacks
// ServerSite; it looks superseded by TopicRecord - confirm before using it.
type SWPOPRecord struct {
// SiteName comes from "site_name".
SiteName string `json:"site_name"`
// Usrdn comes from "user_dn".
Usrdn string `json:"user_dn"`
// ClientHost comes from "client_host".
ClientHost string `json:"client_host"`
// ClientDomain comes from "client_domain".
ClientDomain string `json:"client_domain"`
// ServerHost comes from "server_host".
ServerHost string `json:"server_host"`
// ServerDomain comes from "server_domain".
ServerDomain string `json:"server_domain"`
// Lfn comes from "file_lfn".
Lfn string `json:"file_lfn"`
// JobType comes from "app_info".
JobType string `json:"app_info"`
// Ts comes from "start_time".
// NOTE(review): time unit not visible here - confirm.
Ts int64 `json:"start_time"`
}

// Define the domain and RSE map file
var domainRSEMap []DomainRSE

// Receivedperk_swpop keeps number of messages per 1k
var Receivedperk_swpop uint64

// swpopConsumer consumes for cmssw pop topic
func swpopConsumer(msg *stomp.Message) (string, []string, string, string, int64, string, error) {
// DSConsumer consumes from various topics, such as the aaa/xrootd topic
func DSConsumer(msg *stomp.Message, topic string) (string, []string, string, string, int64, string, error) {
//first to check to make sure there is something in msg,
//otherwise we will get error:
//otherwise we will get error.
//
Received_swpop.Inc()
atomic.AddUint64(&Receivedperk_swpop, 1)
atomic.AddUint64(&Receivedperk_2, 1)
if topic == "xrtd" {
Received_xrtd.Inc()
} else if topic == "swpop" {
Received_swpop.Inc()
} else {
return "", nil, "", "", 0, "", errors.New("topic is not supported.")
}
if msg == nil || msg.Body == nil {
return "", nil, "", "", 0, "", errors.New("Empty message")
}
//
if Config.Verbose > 2 {
log.Println("*****************Source AMQ message of swpop*********************")
log.Printf("*****************Source AMQ message of %s*********************\n", topic)
log.Println("\n" + string(msg.Body))
log.Println("*******************End AMQ message of swpop**********************")
log.Printf("*******************End AMQ message of %s **********************\n", topic)
}

var rec SWPOPRecord
var rec TopicRecord
err := json.Unmarshal(msg.Body, &rec)
if err != nil {
log.Printf("Enable to Unmarchal input message. Error: %v", err)
return "", nil, "", "", 0, "", err
}
if Config.Verbose > 2 {
log.Println(" ******Parsed swpop record******")
log.Printf(" ******Parsed %s record******\n", topic)
log.Println("\n", rec)
log.Println("******End parsed swpop record******")
log.Printf("\n******End parsed %s record******\n", topic)
}
// process received message, e.g. extract some fields
var lfn string
Expand All @@ -101,7 +108,15 @@ func swpopConsumer(msg *stomp.Message) (string, []string, string, string, int64,
} else {
return "", nil, "", "", 0, "", errors.New("No Lfn found")
}
if strings.ToLower(rec.ServerDomain) == "unknown" || len(rec.ServerDomain) <= 0 {
if strings.ToLower(rec.ServerSite) != "unknown" || len(rec.ServerSite) > 0 {
if s, ok := Sitemap[rec.ServerSite]; ok {
site = s
} else {
site = rec.ServerSite
}
// one lfn may have more than one RSE in some cases, such as in2p3.fr
sitename = append(sitename, site)
} else if strings.ToLower(rec.ServerDomain) == "unknown" || len(rec.ServerDomain) <= 0 {
if len(rec.SiteName) == 0 {
return "", nil, "", "", 0, "", errors.New("No RSEs found")
} else {
Expand All @@ -110,6 +125,7 @@ func swpopConsumer(msg *stomp.Message) (string, []string, string, string, int64,
} else {
site = rec.SiteName
}
// one lfn may have more than one RSE in some cases, such as in2p3.fr
sitename = append(sitename, site)
}
} else {
Expand Down Expand Up @@ -151,17 +167,18 @@ func swpopConsumer(msg *stomp.Message) (string, []string, string, string, int64,
return lfn, sitename, usrdn, jobtype, ts, wnname, nil
}

// swpopTrace makes swpop trace and send it to rucio endpoint
func swpopTrace(msg *stomp.Message) ([]string, error) {
// traceSender makes a trace and sends it to the rucio endpoint
func traceSender(msg *stomp.Message, topic string) ([]string, error) {
var dids []string
//get trace data
lfn, sitename, usrdn, jobtype, ts, wnname, err := swpopConsumer(msg)
lfn, sitename, usrdn, jobtype, ts, wnname, err := DSConsumer(msg, topic)
if err != nil {
log.Println("Bad swpop message.")
return nil, errors.New("Bad swpop message")
log.Printf("Bad %s message.\n", topic)

return nil, errors.New(fmt.Sprintf("Bad %s message", topic))
}
for _, s := range sitename {
trc := NewTrace(lfn, s, ts, jobtype, wnname, "swpop", usrdn)
trc := NewTrace(lfn, s, ts, jobtype, wnname, topic, usrdn)
data, err := json.Marshal(trc)
if err != nil {
if Config.Verbose > 1 {
Expand All @@ -172,18 +189,25 @@ func swpopTrace(msg *stomp.Message) ([]string, error) {
dids = append(dids, fmt.Sprintf("%v", trc.DID))
}
if Config.Verbose > 2 {
log.Println("********* Rucio trace record ***************")
log.Printf("********* Rucio trace record from %s ***************\n", topic)
log.Println("\n" + string(data))
log.Println("******** Done Rucio trace record *************")
log.Println("\n******** Done Rucio trace record from %s *************\n", topic)
}
// send data to Stomp endpoint
if Config.EndpointProducer != "" {
err := stompMgr.Send(data, stomp.SendOpt.Header("appversion", "swpopAMQ"))
err := stompMgr.Send(data, stomp.SendOpt.Header("appversion", "xrootdAMQ"))
if err != nil {
dids = append(dids, fmt.Sprintf("%v", trc.DID))
log.Printf("Failed to send %s to stomp.", trc.DID)
} else {
Send_swpop.Inc()
if topic == "xrtd" {
Send_xrtd.Inc()
} else if topic == "swpop" {
Send_swpop.Inc()
} else {
return nil, errors.New("topic is not supported.")
}

}
} else {
log.Fatalln("*** Config.Enpoint is empty, check config file! ***")
Expand All @@ -192,61 +216,54 @@ func swpopTrace(msg *stomp.Message) ([]string, error) {
return dids, nil
}

// server gets messages from consumer AMQ end pointer, make tracers and send to AMQ producer end point.
func swpopServer() {
log.Println("Stomp broker URL: ", Config.StompURIConsumer)
//
// traceServer gets messages from the consumer AMQ endpoint, makes traces and sends them to the AMQ producer endpoint.
func traceServer(topic string) {
log.Println("Consumer Stomp broker URL: ", Config.StompURIConsumer)
//
err2 := parseRSEMap(fdomainmap)
if err2 != nil {
log.Fatalf("Unable to parse rucio doamin RSE map file %s, error: %v \n", fdomainmap, err2)
}

//
err2 = parseSitemap(fsitemap)
if err2 != nil {
log.Fatalf("Unable to parse rucio sitemap file %s, error: %v \n", fsitemap, err2)
}

//
var tc uint64
t1 := time.Now().Unix()
var t2 int64
var ts uint64
var restartSrv uint
smgr := initStomp(Config.EndpointConsumer, Config.StompURIConsumer)
// get connection
sub, err := subscribe(smgr)
if err != nil {
log.Println(err)
sub, err = subscribe(smgr)
if err != nil {
log.Fatalf("Unable to subscribe to all the brokers, fatal error!")
}
// ch for all the listeners to write to
ch := make(chan *stomp.Message)
// defer close executed when the main function is about to exit.
// In this way the channel is to be closed and no resources taken.
defer close(ch)
for _, addr := range smgr.Addresses {
go listener(smgr, addr, ch)
}

//
for {
// check first if subscription is still valid, otherwise get a new one
if sub == nil {
time.Sleep(time.Duration(Config.Interval) * time.Second)
if err != nil {
log.Println("unable to get new subscription", err)
continue
}
}
// get stomp messages from subscriber channel
// get stomp messages from ch
select {
case msg := <-sub.C:
case msg := <-ch:
restartSrv = 0
if msg.Err != nil {
log.Println("receive error message", msg.Err)
sub, err = subscribe(smgr)
if err != nil {
log.Println("unable to subscribe to", Config.EndpointConsumer, err)
}
break
}
// process stomp messages
dids, err := swpopTrace(msg)
dids, err := traceSender(msg, topic)
if err == nil {
Traces_swpop.Inc()
if topic == "xrtd" {
Traces_xrtd.Inc()
} else if topic == "swpop" {
Traces_swpop.Inc()
} else {
log.Fatalf(" Topic %s is not supported. \n", topic)
}
atomic.AddUint64(&tc, 1)
if Config.Verbose > 1 {
log.Println("The number of traces processed in 1000 group: ", atomic.LoadUint64(&tc))
Expand All @@ -257,15 +274,13 @@ func swpopServer() {
atomic.StoreUint64(&tc, 0)
t2 = time.Now().Unix() - t1
t1 = time.Now().Unix()
log.Printf("Processing 1000 messages while total received %d messages.\n", atomic.LoadUint64(&Receivedperk_swpop))
log.Printf("Processing 1000 %s messages while total received %d messages.\n", topic, atomic.LoadUint64(&Receivedperk_2))
log.Printf("Processing 1000 messages took %d seconds.\n", t2)
atomic.StoreUint64(&Receivedperk_swpop, 0)
atomic.StoreUint64(&Receivedperk_2, 0)
}
if err != nil && err.Error() != "Empty message" {
log.Println("SWPOP message processing error", err)
log.Printf("\n %s message processing error: %v\n", topic, err)
}
//got error message "SWPOP message processing error unexpected end of JSON input".
//Code stoped to loop??? YG 6/22/2021
if len(dids) > 0 {
log.Printf("DIDS in Error: %v .\n ", dids)
}
Expand All @@ -274,6 +289,7 @@ func swpopServer() {
// Config.Interval = 1 so each sleeping is 10 ms. We will have to restart the server
// if it cannot get any messages in 5 minutes.
if restartSrv >= 300000 {
//FIXME: We may not exit anymore.
log.Fatalln("No messages in 5 minutes, exit(1)")
}
restartSrv += 1
Expand Down
1 change: 1 addition & 0 deletions stompserver/xrootdTracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ var domainRSEMap2 []DomainRSE
var Receivedperk_xrtd uint64

// xrtdConsumer consumes from the aaa/xrootd topic
// We may use a general Consumer for both xrootd and cmsswpop
func xrtdConsumer(msg *stomp.Message) (string, []string, string, string, int64, string, error) {
//first to check to make sure there is something in msg,
//otherwise we will get error.
Expand Down

0 comments on commit 77bf22d

Please sign in to comment.