Skip to content

Commit

Permalink
Refactor: Remove epoll, migrate to perf_reader_poll
Browse files Browse the repository at this point in the history
Signed-off-by: Viet Anh Duong <[email protected]>
  • Loading branch information
vietanhduong committed Nov 23, 2023
1 parent 8bb74fd commit f71600f
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 79 deletions.
6 changes: 3 additions & 3 deletions bcc/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type Module struct {
tracepoints map[string]int
rawTracepoints map[string]int
perfEvents map[string][]int
perfBuffers map[string]*PerfBuffer
perfBuffers map[string]*PerfEvent

symCacheMu sync.Mutex
symCaches map[int]*SymbolCache
Expand Down Expand Up @@ -126,7 +126,7 @@ func newModule(code string, opts *ModuleOptions) *Module {
tracepoints: make(map[string]int),
rawTracepoints: make(map[string]int),
perfEvents: make(map[string][]int),
perfBuffers: make(map[string]*PerfBuffer),
perfBuffers: make(map[string]*PerfEvent),
symCaches: make(map[int]*SymbolCache),
}
}
Expand Down Expand Up @@ -681,7 +681,7 @@ func (bpf *Module) ClosePerfBuffer(name string) error {
return perfBuf.CloseAllCpu()
}

func (bpf *Module) GetPerfBuffer(name string) *PerfBuffer {
func (bpf *Module) GetPerfBuffer(name string) *PerfEvent {
return bpf.perfBuffers[name]
}

Expand Down
105 changes: 32 additions & 73 deletions bcc/perf.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package bcc

import (
"fmt"
"log"
"runtime"
"runtime/cgo"
"time"
Expand All @@ -27,42 +26,28 @@ import (
extern void rawCallback(void*, void*, int);
// typedef void (*perf_reader_lost_cb)(void *cb_cookie, uint64_t lost);
extern void lostCallback(void*, uint64_t);
struct epoll_event create_ptr_event(int event_type, void* ptr) {
struct epoll_event event = { .events = event_type };
event.data.ptr = ptr;
return event;
}
void* get_event_data_ptr(struct epoll_event event) { return event.data.ptr; }
*/
import "C"

type PerfBuffer struct {
table *Table
epfd C.int
readers map[int]*C.struct_perf_reader
handler cgo.Handle
epEvents []C.struct_epoll_event
// TODO(vietanhduong): Implement open perf event
type PerfEvent struct {
table *Table
readers map[int]*C.struct_perf_reader
handler cgo.Handle
}

func CreatePerfBuffer(table *Table) *PerfBuffer {
return &PerfBuffer{
func CreatePerfBuffer(table *Table) *PerfEvent {
return &PerfEvent{
table: table,
epfd: -1,
readers: make(map[int]*C.struct_perf_reader),
}
}

func (perf *PerfBuffer) Close() error {
func (perf *PerfEvent) Close() error {
return perf.CloseAllCpu()
}

func (perf *PerfBuffer) OpenAllCpu(cb Callback, pageCnt int) error {
if len(perf.readers) != 0 || perf.epfd != -1 {
return fmt.Errorf("perviously opened perf buffer not cleaned")
}

func (perf *PerfEvent) OpenAllCpu(cb Callback, pageCnt int) error {
cpus, err := cpuonline.Get()
if err != nil {
return fmt.Errorf("get online cpu: %v", err)
Expand All @@ -72,43 +57,22 @@ func (perf *PerfBuffer) OpenAllCpu(cb Callback, pageCnt int) error {
cb = &emptyCallback{}
}

perf.epEvents = make([]C.struct_epoll_event, len(cpus))
perf.epfd, err = C.epoll_create1(C.EPOLL_CLOEXEC)
if err != nil {
return fmt.Errorf("failed to create epoll: %v", err)
}

perf.handler = cgo.NewHandle(cb)
runtime.SetFinalizer(perf, (*PerfBuffer).Close)
runtime.SetFinalizer(perf, (*PerfEvent).Close)

for _, cpu := range cpus {
opts := &C.struct_bcc_perf_buffer_opts{
pid: -1,
cpu: C.int(cpu),
wakeup_events: 1,
}

if err := perf.openOnCpu(pageCnt, opts); err != nil {
if err := perf.openOnCpu(int(cpu), pageCnt, 1); err != nil {
_ = perf.CloseAllCpu()
return err
}
}
return nil
}

func (perf *PerfBuffer) CloseAllCpu() error {
func (perf *PerfEvent) CloseAllCpu() error {
var errStr string
perf.handler.Delete()

if int(perf.epfd) >= 0 {
_, err := C.close(perf.epfd)
perf.epfd = -1
perf.epEvents = perf.epEvents[:0]
if err != nil {
errStr += fmt.Sprintf("close epoll: %v\n", err)
}
}

for cpu := range perf.readers {
if err := perf.closeOnCpu(cpu); err != nil {
errStr += fmt.Sprintf("cpu %d: %v\n", cpu, err)
Expand All @@ -121,40 +85,42 @@ func (perf *PerfBuffer) CloseAllCpu() error {
return nil
}

func (perf *PerfBuffer) Poll(timeout time.Duration) int {
if perf.epfd < 0 {
return -1
}

timeoutMs := C.int(timeout.Milliseconds())
cnt, err := C.epoll_wait(perf.epfd, &perf.epEvents[0], C.int(len(perf.readers)), timeoutMs)
if err != nil {
log.Printf("epoll_wait: %v", err)
}
func (perf *PerfEvent) Poll(timeout time.Duration) int {
ctimeout := C.int(timeout.Milliseconds())

for i := 0; i < int(cnt); i++ {
C.perf_reader_event_read((*C.struct_perf_reader)(unsafe.Pointer(C.get_event_data_ptr(perf.epEvents[i]))))
var readers []*C.struct_perf_reader
for _, reader := range perf.readers {
readers = append(readers, reader)
}
return int(cnt)
res := C.perf_reader_poll(C.int(len(readers)), &readers[0], ctimeout)
return int(res)
}

func (perf *PerfBuffer) openOnCpu(pageCnt int, opts *C.struct_bcc_perf_buffer_opts) error {
if _, ok := perf.readers[int(opts.cpu)]; ok {
return fmt.Errorf("perf buffer already open on CPU %d", opts.cpu)
func (perf *PerfEvent) openOnCpu(cpu, pageCnt, weakupEvents int) error {
if _, ok := perf.readers[cpu]; ok {
return fmt.Errorf("perf buffer already open on CPU %d", cpu)
}
if (pageCnt & (pageCnt - 1)) != 0 {
return fmt.Errorf("pageCnt must be a power of 2: %d", pageCnt)
}

opts := &C.struct_bcc_perf_buffer_opts{
pid: -1,
cpu: C.int(cpu),
wakeup_events: C.int(weakupEvents),
}

reader, err := C.bpf_open_perf_buffer_opts(
// Raw callback
(C.perf_reader_raw_cb)(unsafe.Pointer(C.rawCallback)),
// Lost callback
(C.perf_reader_lost_cb)(unsafe.Pointer(C.lostCallback)),
// Callback Cookie
unsafe.Pointer(&perf.handler),
C.int(pageCnt), opts,
C.int(pageCnt),
opts,
)

if reader == nil {
return fmt.Errorf("unable to open perf buffer: %v", err)
}
Expand All @@ -165,17 +131,11 @@ func (perf *PerfBuffer) openOnCpu(pageCnt int, opts *C.struct_bcc_perf_buffer_op
return fmt.Errorf("unable to open perf buffer on CPU %d: %v", opts.cpu, err)
}

event := C.create_ptr_event(C.EPOLLIN, unsafe.Pointer(reader))
if _, err = C.epoll_ctl(perf.epfd, C.EPOLL_CTL_ADD, readerFd, &event); err != nil {
C.perf_reader_free(unsafe.Pointer(reader))
return fmt.Errorf("unable to add perf buffer FD to epoll: %v", err)
}

perf.readers[int(opts.cpu)] = ((*C.struct_perf_reader)(reader))
return nil
}

func (perf *PerfBuffer) closeOnCpu(cpu int) error {
func (perf *PerfEvent) closeOnCpu(cpu int) error {
reader := perf.readers[cpu]
if reader == nil {
return nil
Expand All @@ -185,7 +145,6 @@ func (perf *PerfBuffer) closeOnCpu(cpu int) error {
if err := perf.table.Remove(unsafe.Pointer(&cpuC)); err != nil {
return fmt.Errorf("unable to close perf buffer on CPU: %d, %v", cpu, err)
}

delete(perf.readers, cpu)
return nil
}
6 changes: 3 additions & 3 deletions examples/bcc/perf/perf.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func main() {
return
default:
}
time.Sleep(500 * time.Millisecond)
time.Sleep(5 * time.Second)
m.PollPerfBuffer("chown_events", 0)
}
}
Expand All @@ -145,11 +145,11 @@ func (cb *callback) RawSample(raw []byte, size int32) {
var event chownEvent
err := binary.Read(bytes.NewBuffer(raw), binary.LittleEndian, &event)
if err != nil {
fmt.Printf("failed to decode received data: %s\n", err)
log.Printf("failed to decode received data: %s\n", err)
return
}
filename := (*C.char)(unsafe.Pointer(&event.Filename))
fmt.Printf("uid %d gid %d pid %d called fchownat(2) on %s (return value: %d)\n",
log.Printf("uid %d gid %d pid %d called fchownat(2) on %s (return value: %d)\n",
event.Uid, event.Gid, event.Pid, C.GoString(filename), event.ReturnValue)
}

Expand Down

0 comments on commit f71600f

Please sign in to comment.