Skip to content

Commit

Permalink
Reduce DNS allocations (#432)
Browse files Browse the repository at this point in the history
  • Loading branch information
anjmao authored Dec 9, 2024
1 parent fe49281 commit c7a57c2
Show file tree
Hide file tree
Showing 13 changed files with 353 additions and 277 deletions.
15 changes: 10 additions & 5 deletions cmd/agent/daemon/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,11 +398,16 @@ func buildEBPFPolicy(log *logging.Logger, cfg *Config, exporters *state.Exporter
}

dnsEventPolicy := &ebpftracer.EventPolicy{
ID: events.NetPacketDNSBase,
FilterGenerator: ebpftracer.FilterAnd(
ebpftracer.FilterEmptyDnsAnswers(log),
ebpftracer.DeduplicateDnsEvents(log, 100, 60*time.Second),
),
ID: events.NetPacketDNSBase,
PreFilterGenerator: ebpftracer.DeduplicateDNSEventsPreFilter(log, 100, 60*time.Second),
KernelFilters: []ebpftracer.KernelEventFilter{
{
Name: "Skip emtpy dns answers",
Description: `Helper net_l7_empty_dns_answer is used to check if dns header answers field is non zero.
Currently we care only care about dns responses with valid answers.
`,
},
},
}

if cfg.ProcessTree.Enabled {
Expand Down
23 changes: 23 additions & 0 deletions pkg/ebpftracer/c/tracee.bpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -2256,6 +2256,20 @@ statfunc bool net_l7_is_ssh(struct __sk_buff *skb, u32 l7_off)
return false;
}

// Reports whether the DNS message starting at l7_off carries zero answer records.
// ANCOUNT is the 16-bit big-endian field at byte offset 6 of the DNS header, see
// https://datatracker.ietf.org/doc/html/rfc1035#section-4.1.1 for the header layout.
// Returns false when the field cannot be read, so unreadable packets are not
// reported as empty.
statfunc bool net_l7_empty_dns_answer(struct __sk_buff *skb, u32 l7_off)
{
    u16 answers_be = 0;

    // The L7 offset must lie inside the packet.
    if (l7_off > skb->len)
        return false;

    // Load ANCOUNT in network byte order.
    if (bpf_skb_load_bytes(skb, l7_off + 6, &answers_be, sizeof(answers_be)) < 0)
        return false;

    return bpf_ntohs(answers_be) == 0;
}

//
// SUPPORTED L4 NETWORK PROTOCOL (tcp, udp, icmp) HANDLERS
//
Expand Down Expand Up @@ -2360,6 +2374,11 @@ CGROUP_SKB_HANDLE_FUNCTION(proto_udp)

CGROUP_SKB_HANDLE_FUNCTION(proto_tcp_dns)
{
// Skip the 2-byte length prefix for dns over tcp.
if (net_l7_empty_dns_answer(ctx, md.header_size + 2)) {
return 1;
}

// submit DNS base event if needed (full packet)
if (should_submit_net_event(neteventctx, SUB_NET_PACKET_DNS))
cgroup_skb_submit_event(ctx, md, neteventctx, NET_PACKET_DNS, FULL);
Expand All @@ -2369,6 +2388,10 @@ CGROUP_SKB_HANDLE_FUNCTION(proto_tcp_dns)

CGROUP_SKB_HANDLE_FUNCTION(proto_udp_dns)
{
if (net_l7_empty_dns_answer(ctx, md.header_size)) {
return 1;
}

// submit DNS base event if needed (full packet)
if (should_submit_net_event(neteventctx, SUB_NET_PACKET_DNS))
cgroup_skb_submit_event(ctx, md, neteventctx, NET_PACKET_DNS, FULL);
Expand Down
53 changes: 51 additions & 2 deletions pkg/ebpftracer/decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,8 +519,54 @@ func (decoder *Decoder) ReadAddrTuple() (types.AddrTuple, error) {

var errDNSMessageNotComplete = errors.New("received dns packet not complete")

// dnsPacketParser is a single package-level layers.DNS value that is reused across decode calls.
// NOTE: This is not thread safe. DecodeDNSLayer decodes into this value and returns this same
// pointer, so a returned result is overwritten by the next decode. Since currently only a single
// goroutine reads the data, this is fine.
var dnsPacketParser = &layers.DNS{}

// DecodeDNSLayer parses the DNS message held in details.Payload.
//
// For DNS over TCP the two octet length prefix is validated and stripped from
// details.Payload (the passed details value is modified) before parsing.
// errDNSMessageNotComplete is returned when the payload does not yet contain the
// whole message announced by the prefix.
//
// The returned *layers.DNS is the shared package-level dnsPacketParser, so it is
// overwritten by the next call.
func (decoder *Decoder) DecodeDNSLayer(details *packet.PacketDetails) (*layers.DNS, error) {
	if details.Proto == packet.SubProtocolTCP {
		if len(details.Payload) < 2 {
			return nil, errDNSMessageNotComplete
		}

		// DNS over TCP prefixes the DNS message with a two octet length field. If the payload is not as big as this specified length,
		// then we cannot parse the packet, as part of the DNS message will be sent in a later one.
		// For more information see https://datatracker.ietf.org/doc/html/rfc1035.html#section-4.2.2
		length := int(binary.BigEndian.Uint16(details.Payload[:2]))
		// The payload still includes the two prefix octets here, so only
		// len(payload)-2 bytes belong to the DNS message itself.
		if len(details.Payload)-2 < length {
			return nil, errDNSMessageNotComplete
		}
		details.Payload = details.Payload[2:]
	}
	if err := dnsPacketParser.DecodeFromBytes(details.Payload, gopacket.NilDecodeFeedback); err != nil {
		return nil, err
	}
	return dnsPacketParser, nil
}

// DecodeDNSAndDetails reads a network packet event from the decoder buffer and
// returns the parsed DNS layer together with the extracted packet details.
//
// Any read, extraction or DNS decode failure is returned as the error, with a
// nil DNS layer and zero-value details.
func (decoder *Decoder) DecodeDNSAndDetails() (*layers.DNS, packet.PacketDetails, error) {
	var discard uint8
	// Read the first two bytes and discard them. They are mapped to argsnum and index.
	// For network events in most cases there is only 1 argument (payload).
	// A failed read is reported instead of being silently ignored.
	if err := decoder.DecodeUint8(&discard); err != nil {
		return nil, packet.PacketDetails{}, err
	}
	if err := decoder.DecodeUint8(&discard); err != nil {
		return nil, packet.PacketDetails{}, err
	}

	packetData, err := decoder.ReadMaxByteSliceFromBuff(-1)
	if err != nil {
		return nil, packet.PacketDetails{}, err
	}

	details, err := packet.ExtractPacketDetails(packetData)
	if err != nil {
		return nil, packet.PacketDetails{}, err
	}

	dns, err := decoder.DecodeDNSLayer(&details)
	if err != nil {
		return nil, packet.PacketDetails{}, err
	}
	return dns, details, nil
}

func (decoder *Decoder) ReadProtoDNS() (*types.ProtoDNS, error) {
data, err := decoder.ReadMaxByteSliceFromBuff(eventMaxByteSliceBufferSize(events.NetPacketDNSBase))
if err != nil {
Expand Down Expand Up @@ -550,6 +596,10 @@ func (decoder *Decoder) ReadProtoDNS() (*types.ProtoDNS, error) {
return nil, err
}

return ToProtoDNS(&details, dnsPacketParser), nil
}

func ToProtoDNS(details *packet.PacketDetails, dnsPacketParser *layers.DNS) *castpb.DNS {
pbDNS := &castpb.DNS{
Answers: make([]*castpb.DNSAnswers, len(dnsPacketParser.Answers)),
Tuple: &castpb.Tuple{
Expand All @@ -575,8 +625,7 @@ func (decoder *Decoder) ReadProtoDNS() (*types.ProtoDNS, error) {
Cname: string(v.CNAME),
}
}

return pbDNS, nil
return pbDNS
}

var ErrWrongSSHVersionPrefix = errors.New("got wrong ssh version prefix")
Expand Down
34 changes: 14 additions & 20 deletions pkg/ebpftracer/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ebpftracer
import (
"time"

"github.com/castai/kvisor/pkg/ebpftracer/decoder"
"github.com/castai/kvisor/pkg/ebpftracer/events"
"github.com/castai/kvisor/pkg/ebpftracer/types"
)
Expand All @@ -14,8 +15,9 @@ type Policy struct {
Output PolicyOutputConfig
}

// PreEventFilter allows for filtering of events coming from the kernel before they are decoded
type PreEventFilter func(ctx *types.EventContext) error
// PreEventFilter allows for filtering of events coming from the kernel before they are decoded.
// Parsed args should be returned if filter passes.
type PreEventFilter func(ctx *types.EventContext, decoder *decoder.Decoder) (types.Args, error)

// PreEventFilterGenerator produces a pre event filter for each call.
type PreEventFilterGenerator func() PreEventFilter
Expand All @@ -26,10 +28,18 @@ type EventFilter func(event *types.Event) error
// EventFilterGenerator produces an event filter for each call.
type EventFilterGenerator func() EventFilter

// KernelEventFilter is a placeholder and currently used for documentation purposes only.
// Each used filter is described with an explanation of how it's implemented in the kernel.
type KernelEventFilter struct {
	// Name is a short label for the filter.
	Name string
	// Description explains how the filter is implemented in the kernel.
	Description string
}

// EventPolicy groups the filters configured for a single event ID.
type EventPolicy struct {
	// ID is the event this policy applies to.
	ID events.ID
	// PreFilterGenerator produces the filter that runs before the event is decoded (see PreEventFilter).
	PreFilterGenerator PreEventFilterGenerator
	// FilterGenerator produces the filter that runs on the event (see EventFilter).
	FilterGenerator EventFilterGenerator
	// KernelFilters describes filters implemented in the kernel; documentation only (see KernelEventFilter).
	KernelFilters []KernelEventFilter
}

// RateLimitPolicy allows to configure event rate limiting.
Expand All @@ -47,8 +57,8 @@ type LRUPolicy struct {
}

type PolicyOutputConfig struct {
RelativeTime bool
ExecHash bool
RelativeTime bool
ExecHash bool

ParseArguments bool
ParseArgumentsFDs bool
Expand All @@ -74,19 +84,3 @@ type cgroupEventPolicy struct {
preFilter PreEventFilter
filter EventFilter
}

// allowPre runs the configured pre filter against ctx and returns its result.
// A policy without a pre filter lets every event through (nil error).
func (c *cgroupEventPolicy) allowPre(ctx *types.EventContext) error {
	if c.preFilter == nil {
		return nil
	}
	return c.preFilter(ctx)
}

// allow runs the configured event filter against event and returns its result.
// A policy without a filter lets every event through (nil error).
func (c *cgroupEventPolicy) allow(event *types.Event) error {
	if c.filter == nil {
		return nil
	}
	return c.filter(event)
}
90 changes: 21 additions & 69 deletions pkg/ebpftracer/policy_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/netip"
"time"

"github.com/castai/kvisor/pkg/ebpftracer/decoder"
"github.com/castai/kvisor/pkg/ebpftracer/events"
"github.com/castai/kvisor/pkg/ebpftracer/types"
"github.com/castai/kvisor/pkg/logging"
Expand Down Expand Up @@ -56,22 +57,6 @@ func FilterAnd(filtersGenerators ...EventFilterGenerator) EventFilterGenerator {
}
}

// PreRateLimit returns a generator of pre event filters. Every generated filter
// owns its own rate limiter built from spec and answers FilterErrRateLimit once
// that limiter refuses an event, FilterPass otherwise.
func PreRateLimit(spec RateLimitPolicy) PreEventFilterGenerator {
	return func() PreEventFilter {
		limiter := newRateLimiter(spec)

		return func(_ *types.EventContext) error {
			if !limiter.Allow() {
				return FilterErrRateLimit
			}

			return FilterPass
		}
	}
}

func RateLimit(spec RateLimitPolicy) EventFilterGenerator {
return func() EventFilter {
rateLimiter := newRateLimiter(spec)
Expand Down Expand Up @@ -140,44 +125,14 @@ func newRateLimiter(spec RateLimitPolicy) *rate.Limiter {
return rateLimiter
}

// FilterEmptyDnsAnswers returns a generator of filters that reject DNS events
// whose payload has no answers with FilterErrEmptyDNSResponse. Events of any
// other ID, events whose args are not DNS args, and DNS events with a nil
// payload (logged as a warning) all pass.
func FilterEmptyDnsAnswers(l *logging.Logger) EventFilterGenerator {
	return func() EventFilter {
		return func(event *types.Event) error {
			if event.Context.EventID != events.NetPacketDNSBase {
				return FilterPass
			}

			args, isDNS := event.Args.(types.NetPacketDNSBaseArgs)
			switch {
			case !isDNS:
				return FilterPass
			case args.Payload == nil:
				l.Warn("retreived invalid event for event type dns")
				return FilterPass
			case len(args.Payload.Answers) == 0:
				return FilterErrEmptyDNSResponse
			default:
				return FilterPass
			}
		}
	}
}

// hashStringXXHASH hashes s with xxhash and keeps the low 32 bits of the 64-bit digest.
// More hash functions: https://github.com/elastic/go-freelru/blob/main/bench/hash.go
func hashStringXXHASH(s string) uint32 {
	digest := xxhash.Sum64String(s)
	return uint32(digest) // nolint:gosec
}

// DeduplicateDnsEvents creates a filter that will drop any DNS event with questions already seen in `ttl` time
func DeduplicateDnsEvents(l *logging.Logger, size uint32, ttl time.Duration) EventFilterGenerator {
// DeduplicateDNSEventsPreFilter skips sending dns events which are already in the local per cgroup cache.
func DeduplicateDNSEventsPreFilter(log *logging.Logger, size uint32, ttl time.Duration) PreEventFilterGenerator {
type cacheValue struct{}

return func() EventFilter {
cache, err := freelru.New[string, cacheValue](size, hashStringXXHASH)
return func() PreEventFilter {
cache, err := freelru.New[uint64, cacheValue](size, func(key uint64) uint32 {
return uint32(key) //nolint:gosec
})
// err is only ever returned on configuration issues. There is nothing we can really do here, besides
// panicing and surfacing the error to the user.
if err != nil {
Expand All @@ -186,32 +141,29 @@ func DeduplicateDnsEvents(l *logging.Logger, size uint32, ttl time.Duration) Eve

cache.SetLifetime(ttl)

return func(event *types.Event) error {
if event.Context.EventID != events.NetPacketDNSBase {
return FilterPass
return func(ctx *types.EventContext, dec *decoder.Decoder) (types.Args, error) {
if ctx.EventID != events.NetPacketDNSBase {
return nil, FilterPass
}

dnsEventArgs, ok := event.Args.(types.NetPacketDNSBaseArgs)
if !ok {
return FilterPass
dns, details, err := dec.DecodeDNSAndDetails()
if err != nil {
return nil, err
}

if dnsEventArgs.Payload == nil {
l.Warn("received invalid event for event type dns")
return FilterPass
}

cacheKey := dnsEventArgs.Payload.DNSQuestionDomain
// Cache dns by dns question. Cached records are dropped.
cacheKey := xxhash.Sum64(dns.Questions[0].Name)
if cache.Contains(cacheKey) {
if l.IsEnabled(slog.LevelDebug) {
l.WithField("cachekey", cacheKey).Debug("dropping DNS event")
if log.IsEnabled(slog.LevelDebug) {
log.WithField("cachekey", string(dns.Questions[0].Name)).Debug("dropping DNS event")
}
return FilterErrDNSDuplicateDetected
return nil, FilterErrDNSDuplicateDetected
}

cache.Add(cacheKey, cacheValue{})

return FilterPass
return types.NetPacketDNSBaseArgs{
Payload: decoder.ToProtoDNS(&details, dns),
}, FilterPass
}
}
}
Expand Down
Loading

0 comments on commit c7a57c2

Please sign in to comment.