Skip to content

Commit

Permalink
feat: agent
Browse files Browse the repository at this point in the history
  • Loading branch information
zenghur committed Jun 10, 2021
1 parent 0abe998 commit 922388b
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 81 deletions.
179 changes: 99 additions & 80 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ import (
"encoding/base64"
"errors"
"fmt"
"log"
"math/rand"
"os"
"runtime"
"runtime/pprof"
"time"

"go.uber.org/zap"

gprofile "github.com/google/pprof/profile"
"github.com/sirupsen/logrus"
"github.com/xiaojiaoyu100/cast"
Expand All @@ -25,10 +26,11 @@ const (
)

type Agent struct {
o *Option
c *cast.Cast
stop chan struct{}
done chan struct{}
o *Option
c *cast.Cast
logger *zap.Logger
stop chan struct{}
done chan struct{}
}

type Setter func(o *Option) error
Expand Down Expand Up @@ -80,11 +82,18 @@ func New(ff ...Setter) (*Agent, error) {
if err != nil {
return nil, fmt.Errorf("create cast err: %w", err)
}

logger, err := zap.NewProduction()
if err != nil {
return nil, fmt.Errorf("fail to create a logger: %w", err)
}

agent := &Agent{
o: option,
c: c,
stop: make(chan struct{}),
done: make(chan struct{}),
o: option,
c: c,
logger: logger,
stop: make(chan struct{}),
done: make(chan struct{}),
}
return agent, nil
}
Expand All @@ -105,12 +114,10 @@ type ReceiveProfileReq struct {
ProfileType string `json:"profileType"`
Profile string `json:"profile"`
SendTime int64 `json:"sendTime"`
CreateTime int64 `json:"create_time"`
CreateTime int64 `json:"createTime"`
}

func (a *Agent) onSchedule(ctx context.Context) {
defer close(a.done)

func (a *Agent) initRing() *ring.Ring {
var ll = []profile.Type{
profile.TypeCPU,
profile.TypeHeap,
Expand All @@ -125,6 +132,82 @@ func (a *Agent) onSchedule(ctx context.Context) {
r.Value = ll[i]
r = r.Next()
}
return r
}

func (a *Agent) collectAndSend(ctx context.Context, buf *bytes.Buffer, r *ring.Ring) error {
profileType := r.Value.(profile.Type)
switch profileType {
case profile.TypeCPU:
if err := pprof.StartCPUProfile(buf); err != nil {
return fmt.Errorf("fail to start cpu profile: %w", err)
}
block(ctx, a.o.CPUProfilingPeriod)
pprof.StopCPUProfile()
case profile.TypeHeap,
profile.TypeAllocs,
profile.TypeBlock,
profile.TypeMutex,
profile.TypeGoroutine,
profile.TypeThreadCreate:
p := pprof.Lookup(profileType.String())
if p == nil {
return fmt.Errorf("fail to look up profile type: %s", profileType.String())
}
if err := p.WriteTo(buf, 0); err != nil {
return fmt.Errorf("fail to write profile: %w", err)
}
}

var body ReceiveProfileReq
body.Service = a.o.Service
body.ServiceVersion = a.o.ServiceVersion
hostname, err := os.Hostname()
if err != nil {
hostname = "unknown"
}
body.Host = hostname
body.GoVersion = a.o.goVersion
body.ProfileType = profileType.String()

pf := base64.StdEncoding.EncodeToString(buf.Bytes())
if len(pf) == 0 {
return fmt.Errorf("fprofile buffer is zero: %s", profileType.String())
}

body.Profile = pf
body.SendTime = time.Now().UnixNano()
pp, err := gprofile.ParseData(buf.Bytes())
if err != nil {
return fmt.Errorf("fail to parse profile data: %w", err)
}
body.CreateTime = pp.TimeNanos

req := a.c.NewRequest().Post().WithPath("/v1/profile").WithJSONBody(&body)
resp, err := a.c.Do(ctx, req)
if err != nil {
return fmt.Errorf("fail to send profile: %w", err)
}

if !resp.StatusOk() {
return fmt.Errorf("response is not ok: %s", resp.String())
}
return nil
}

func (a *Agent) prepareNextRound(t *time.Timer, buf *bytes.Buffer, r *ring.Ring) {
buf.Reset()
r = r.Next()
if r.Value.(profile.Type) == profile.TypeCPU {
t.Reset(adjust(a.o.BreakPeriod))
}
t.Reset(adjust(0))
}

func (a *Agent) onSchedule(ctx context.Context) {
defer close(a.done)

r := a.initRing()

ctx, cancel := context.WithCancel(ctx)
go func() {
Expand All @@ -145,74 +228,10 @@ func (a *Agent) onSchedule(ctx context.Context) {
}
case <-ti.C:
{
fmt.Println("timer fires: ", time.Now())
profileType := r.Value.(profile.Type)
switch profileType {
case profile.TypeCPU:
if err := pprof.StartCPUProfile(&buf); err != nil {
log.Println("fail to start cpu profile: ", err)
return
}
block(ctx, a.o.CPUProfilingPeriod)
pprof.StopCPUProfile()
case profile.TypeHeap,
profile.TypeAllocs,
profile.TypeBlock,
profile.TypeMutex,
profile.TypeGoroutine,
profile.TypeThreadCreate:
p := pprof.Lookup(profileType.String())
if p == nil {
log.Println("fail to look up profile")
return
}
if err := p.WriteTo(&buf, 0); err != nil {
log.Println("fail to write profile: ", err)
return
}
}

var body ReceiveProfileReq
body.Service = a.o.Service
body.ServiceVersion = a.o.ServiceVersion
hostname, err := os.Hostname()
if err != nil {
hostname = "unknown"
}
body.Host = hostname
body.GoVersion = runtime.Version()
body.ProfileType = profileType.String()

pf := base64.StdEncoding.EncodeToString(buf.Bytes())

body.Profile = pf
body.SendTime = time.Now().UnixNano()
pp, err := gprofile.ParseData(buf.Bytes())
if err != nil {
fmt.Println("parse data: ", err, profileType)
continue
}
body.CreateTime = pp.TimeNanos

req := a.c.NewRequest().Post().WithPath("/v1/profile").WithJSONBody(&body)
resp, err := a.c.Do(ctx, req)
if err != nil {
fmt.Printf("send err: %s", err)
continue
}

if !resp.StatusOk() {
fmt.Println("status not ok")
continue
if err := a.collectAndSend(ctx, &buf, r); err != nil {
a.logger.Warn(fmt.Sprintf("fail to collect and send: %v", r.Value), zap.Error(err))
}

buf.Reset()
r = r.Next()
if r.Value.(profile.Type) == profile.TypeCPU {
ti.Reset(adjust(a.o.BreakPeriod))
}

ti.Reset(adjust(0))
a.prepareNextRound(ti, &buf, r)
}

}
Expand Down
2 changes: 1 addition & 1 deletion collector/server/controller/profile/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type ReceiveProfileReq struct {
ProfileType string `json:"profileType"`
Profile string `json:"profile"`
SendTime int64 `json:"sendTime"`
CreateTime int64 `json:"create_time"`
CreateTime int64 `json:"createTime"`
}

func ReceiveProfile(c *gin.Context) {
Expand Down

0 comments on commit 922388b

Please sign in to comment.