Skip to content

Commit

Permalink
support prometheus
Browse files Browse the repository at this point in the history
  • Loading branch information
wz14 committed Sep 24, 2021
1 parent 95e0f66 commit 968faa2
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 3 deletions.
11 changes: 10 additions & 1 deletion dtmsvr/dtmsvr.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,41 @@ import (
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmgrpc"

"github.com/grpc-ecosystem/go-grpc-middleware"
"google.golang.org/grpc"

"github.com/yedf/dtm/examples"
)

var dtmsvrPort = 8080
var dtmsvrGrpcPort = 58080
var metricsPort = 8889

// StartSvr StartSvr
func StartSvr() {
dtmcli.Logf("start dtmsvr")
app := common.GetGinApp()
app = HTTP_metrics(app)
addRoute(app)
dtmcli.Logf("dtmsvr listen at: %d", dtmsvrPort)
go app.Run(fmt.Sprintf(":%d", dtmsvrPort))

lis, err := net.Listen("tcp", fmt.Sprintf(":%d", dtmsvrGrpcPort))
dtmcli.FatalIfError(err)
s := grpc.NewServer(grpc.UnaryInterceptor(dtmgrpc.GrpcServerLog))
s := grpc.NewServer(
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
grpc.UnaryServerInterceptor(GRPC_metrics), grpc.UnaryServerInterceptor(dtmgrpc.GrpcServerLog)),
))
dtmgrpc.RegisterDtmServer(s, &dtmServer{})
dtmcli.Logf("grpc listening at %v", lis.Addr())
go func() {
err := s.Serve(lis)
dtmcli.FatalIfError(err)
}()

// prometheus exporter
dtmcli.Logf("prometheus exporter listen at: %d", metricsPort)
PrometheusHttpRun(fmt.Sprintf("%d", metricsPort))
time.Sleep(100 * time.Millisecond)
}

Expand Down
101 changes: 101 additions & 0 deletions dtmsvr/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package dtmsvr

import (
"context"
"github.com/gin-gonic/gin"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc"
"net/http"
"strings"
)

var (
processTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "dtm_server_process_total",
Help: "All request received by dtm",
},
[]string{"type", "api", "status"})

responseTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "dtm_server_response_duration",
Help: "The request durations of a dtm server api",
},
[]string{"type", "api"})

transactionTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "dtm_transaction_process_total",
Help: "All transactions processed by dtm",
},
[]string{"model", "gid", "status"})

branchTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "dtm_branch_process_total",
Help: "All branches processed by dtm",
},
[]string{"model", "gid", "branchid", "branchtype", "status"})
)

func PrometheusHttpRun(port string) {
go func() {
http.Handle("/metrics", promhttp.Handler())
http.ListenAndServe(":"+port, nil)
}()
}

func HTTP_metrics(app *gin.Engine) *gin.Engine {
app.Use(func(c *gin.Context) {
api := extractFromPath(c.Request.RequestURI)
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
responseTime.WithLabelValues("http", api).Observe(v)
}))
defer timer.ObserveDuration()
c.Next()
status := c.Writer.Status()
if status >= 500 {
processTotal.WithLabelValues("http", api, "fail").Inc()
} else if status == 200 {
processTotal.WithLabelValues("http", api, "ok").Inc()
} else {
panic("undefined status")
}
})
return app
}

func GRPC_metrics(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
api := extractFromPath(info.FullMethod)
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
responseTime.WithLabelValues("grpc", api).Observe(v)
}))
defer timer.ObserveDuration()
m, err := handler(ctx, req)
if err != nil {
processTotal.WithLabelValues("grpc", api, "fail").Inc()
} else {
processTotal.WithLabelValues("grpc", api, "ok").Inc()
}
return m, err
}

func TransactionMetrics(global *TransGlobal, status bool) {
if status {
transactionTotal.WithLabelValues(global.TransType, global.Gid, "ok").Inc()
} else {
transactionTotal.WithLabelValues(global.TransType, global.Gid, "fail").Inc()
}
}

func BranchMetrics(global *TransGlobal, branch *TransBranch, status bool) {
if status {
branchTotal.WithLabelValues(global.TransType, global.Gid, branch.BranchID, branch.BranchType, "ok").Inc()
} else {
branchTotal.WithLabelValues(global.TransType, global.Gid, branch.BranchID, branch.BranchType, "fail").Inc()
}
}

func extractFromPath(val string) string {
strs := strings.Split(val, "/")
return strings.ToLower(strs[len(strs)-1])
}
6 changes: 6 additions & 0 deletions dtmsvr/trans.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,16 +120,20 @@ func (t *TransGlobal) getProcessor() transProcessor {
func (t *TransGlobal) Process(db *common.DB, waitResult bool) dtmcli.M {
if !waitResult {
go t.processInner(db)
TransactionMetrics(t, true)
return dtmcli.MapSuccess
}
submitting := t.Status == dtmcli.StatusSubmitted
err := t.processInner(db)
if err != nil {
TransactionMetrics(t, false)
return dtmcli.M{"dtm_result": dtmcli.ResultFailure, "message": err.Error()}
}
if submitting && t.Status != dtmcli.StatusSucceed {
TransactionMetrics(t, false)
return dtmcli.M{"dtm_result": dtmcli.ResultFailure, "message": "trans failed by user"}
}
TransactionMetrics(t, true)
return dtmcli.MapSuccess
}

Expand Down Expand Up @@ -203,9 +207,11 @@ func (t *TransGlobal) execBranch(db *common.DB, branch *TransBranch) {
if strings.Contains(body, dtmcli.ResultSuccess) {
t.touch(db, config.TransCronInterval)
branch.changeStatus(db, dtmcli.StatusSucceed)
BranchMetrics(t, branch, true)
} else if t.TransType == "saga" && branch.BranchType == dtmcli.BranchAction && strings.Contains(body, dtmcli.ResultFailure) {
t.touch(db, config.TransCronInterval)
branch.changeStatus(db, dtmcli.StatusFailed)
BranchMetrics(t, branch, false)
} else {
panic(fmt.Errorf("http result should contains SUCCESS|FAILURE. grpc error should return nil|Aborted. \nrefer to: https://dtm.pub/summary/arch.html#http\nunkown result will be retried: %s", body))
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ require (
github.com/go-playground/assert/v2 v2.0.1
github.com/go-resty/resty/v2 v2.6.0
github.com/go-sql-driver/mysql v1.5.0
github.com/json-iterator/go v1.1.10 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/kr/pretty v0.1.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/prometheus/client_golang v1.11.0
github.com/stretchr/testify v1.7.0
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d // indirect
golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069 // indirect
google.golang.org/genproto v0.0.0-20210805201207-89edb61ffb67 // indirect
google.golang.org/grpc v1.39.1
google.golang.org/protobuf v1.27.1
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/yaml.v2 v2.3.0
gorm.io/driver/mysql v1.0.3
gorm.io/gorm v1.21.12
Expand Down
Loading

0 comments on commit 968faa2

Please sign in to comment.