Skip to content

Commit

Permalink
WIP: Support multiple flow filter rules using json fmt string
Browse files Browse the repository at this point in the history
Signed-off-by: Mohamed Mahmoud <[email protected]>
  • Loading branch information
msherif1234 committed Dec 5, 2024
1 parent cbe3140 commit d95862c
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 91 deletions.
2 changes: 1 addition & 1 deletion bpf/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ typedef __u64 u64;
#define DSCP_MASK 0x3F
#define MIN_RTT 10000u //10us

#define MAX_FILTER_ENTRIES 1 // we have only one global filter
#define MAX_FILTER_ENTRIES 16
#define MAX_EVENT_MD 8
#define MAX_NETWORK_EVENTS 4

Expand Down
35 changes: 23 additions & 12 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package agent

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -199,7 +200,28 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
if cfg.LogLevel == logrus.TraceLevel.String() || cfg.LogLevel == logrus.DebugLevel.String() {
debug = true
}
filterRules := make([]*tracer.FilterConfig, 0)
if cfg.EnableFlowFilter {
var flowFilters []*FlowFilter
if err := json.Unmarshal([]byte(cfg.FlowFilterRules), &flowFilters); err != nil {
return nil, err
}

for _, r := range flowFilters {
filterRules = append(filterRules, &tracer.FilterConfig{
FilterAction: r.FilterAction,
FilterDirection: r.FilterDirection,
FilterIPCIDR: r.FilterIPCIDR,
FilterProtocol: r.FilterProtocol,
FilterPeerIP: r.FilterPeerIP,
FilterDestinationPort: tracer.ConvertFilterPortsToInstr(r.FilterDestinationPort, r.FilterDestinationPortRange, r.FilterDestinationPorts),
FilterSourcePort: tracer.ConvertFilterPortsToInstr(r.FilterSourcePort, r.FilterSourcePortRange, r.FilterSourcePorts),
FilterPort: tracer.ConvertFilterPortsToInstr(r.FilterPort, r.FilterPortRange, r.FilterPorts),
FilterTCPFLags: r.FilterTCPFlags,
FilterDrops: r.FilterDrops,
})
}
}
ebpfConfig := &tracer.FlowFetcherConfig{
EnableIngress: ingress,
EnableEgress: egress,
Expand All @@ -213,18 +235,7 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
EnableNetworkEventsMonitoring: cfg.EnableNetworkEventsMonitoring,
NetworkEventsMonitoringGroupID: cfg.NetworkEventsMonitoringGroupID,
EnableFlowFilter: cfg.EnableFlowFilter,
FilterConfig: &tracer.FilterConfig{
FilterAction: cfg.FilterAction,
FilterDirection: cfg.FilterDirection,
FilterIPCIDR: cfg.FilterIPCIDR,
FilterProtocol: cfg.FilterProtocol,
FilterPeerIP: cfg.FilterPeerIP,
FilterDestinationPort: tracer.ConvertFilterPortsToInstr(cfg.FilterDestinationPort, cfg.FilterDestinationPortRange, cfg.FilterDestinationPorts),
FilterSourcePort: tracer.ConvertFilterPortsToInstr(cfg.FilterSourcePort, cfg.FilterSourcePortRange, cfg.FilterSourcePorts),
FilterPort: tracer.ConvertFilterPortsToInstr(cfg.FilterPort, cfg.FilterPortRange, cfg.FilterPorts),
FilterTCPFLags: cfg.FilterTCPFlags,
FilterDrops: cfg.FilterDrops,
},
FilterConfig: filterRules,
}

fetcher, err := tracer.NewFlowFetcher(ebpfConfig)
Expand Down
101 changes: 53 additions & 48 deletions pkg/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,57 @@ const (
IPIfaceNamedPrefix = "name:"
)

type FlowFilter struct {
// FilterDirection is the direction of the flow filter.
// Possible values are "Ingress" or "Egress".
FilterDirection string `json:"direction,omitempty"`
// FilterIPCIDR is the IP CIDR to filter flows.
// Example: 10.10.10.0/24 or 100:100:100:100::/64, default is 0.0.0.0/0
FilterIPCIDR string `json:"ip_cidr,omitempty"`
// FilterProtocol is the protocol to filter flows.
// Example: TCP, UDP, SCTP, ICMP, ICMPv6
FilterProtocol string `json:"protocol,omitempty"`
// FilterSourcePort is the source port to filter flows.
FilterSourcePort int32 `json:"source_port,omitempty"`
// FilterDestinationPort is the destination port to filter flows.
FilterDestinationPort int32 `json:"destination_port,omitempty"`
// FilterPort is the port to filter flows, can be use for either source or destination port.
FilterPort int32 `json:"port,omitempty"`
// FilterSourcePortRange is the source port range to filter flows.
// Example: 8000-8010
FilterSourcePortRange string `json:"source_port_range,omitempty"`
// FilterSourcePorts is two source ports to filter flows.
// Example: 8000,8010
FilterSourcePorts string `json:"source_ports,omitempty"`
// FilterDestinationPortRange is the destination port range to filter flows.
// Example: 8000-8010
FilterDestinationPortRange string `json:"destination_port_range,omitempty"`
// FilterDestinationPorts is two destination ports to filter flows.
// Example: 8000,8010
FilterDestinationPorts string `json:"destination_ports,omitempty"`
// FilterPortRange is the port range to filter flows, can be used for either source or destination port.
// Example: 8000-8010
FilterPortRange string `json:"port_range,omitempty"`
// FilterPorts is two ports option to filter flows, can be used for either source or destination port.
// Example: 8000,8010
FilterPorts string `json:"ports,omitempty"`
// FilterICMPType is the ICMP type to filter flows.
FilterICMPType int `json:"icmp_type,omitempty"`
// FilterICMPCode is the ICMP code to filter flows.
FilterICMPCode int `json:"icmp_code,omitempty"`
// FilterPeerIP is the IP to filter flows.
// Example: 10.10.10.10
FilterPeerIP string `json:"peer_ip,omitempty"`
// FilterAction is the action to filter flows.
// Possible values are "Accept" or "Reject".
FilterAction string `json:"action,omitempty"`
// FilterTCPFlags is the TCP flags to filter flows.
// possible values are: SYN, SYN-ACK, ACK, FIN, RST, PSH, URG, ECE, CWR, FIN-ACK, RST-ACK
FilterTCPFlags string `json:"tcp_flags,omitempty"`
// FilterDrops allow filtering flows with packet drops, default is false.
FilterDrops bool `json:"drops,omitempty"`
}

type Config struct {
// AgentIP allows overriding the reported Agent IP address on each flow.
AgentIP string `env:"AGENT_IP"`
Expand Down Expand Up @@ -183,54 +234,8 @@ type Config struct {
MetricsPrefix string `env:"METRICS_PREFIX" envDefault:"ebpf_agent_"`
// EnableFlowFilter enables flow filter, default is false.
EnableFlowFilter bool `env:"ENABLE_FLOW_FILTER" envDefault:"false"`
// FilterDirection is the direction of the flow filter.
// Possible values are "Ingress" or "Egress".
FilterDirection string `env:"FILTER_DIRECTION"`
// FilterIPCIDR is the IP CIDR to filter flows.
// Example: 10.10.10.0/24 or 100:100:100:100::/64
FilterIPCIDR string `env:"FILTER_IP_CIDR" envDefault:"0.0.0.0/0"`
// FilterProtocol is the protocol to filter flows.
// Example: TCP, UDP, SCTP, ICMP, ICMPv6
FilterProtocol string `env:"FILTER_PROTOCOL"`
// FilterSourcePort is the source port to filter flows.
FilterSourcePort int32 `env:"FILTER_SOURCE_PORT"`
// FilterDestinationPort is the destination port to filter flows.
FilterDestinationPort int32 `env:"FILTER_DESTINATION_PORT"`
// FilterPort is the port to filter flows, can be use for either source or destination port.
FilterPort int32 `env:"FILTER_PORT"`
// FilterSourcePortRange is the source port range to filter flows.
// Example: 8000-8010
FilterSourcePortRange string `env:"FILTER_SOURCE_PORT_RANGE"`
// FilterSourcePorts is two source ports to filter flows.
// Example: 8000,8010
FilterSourcePorts string `env:"FILTER_SOURCE_PORTS"`
// FilterDestinationPortRange is the destination port range to filter flows.
// Example: 8000-8010
FilterDestinationPortRange string `env:"FILTER_DESTINATION_PORT_RANGE"`
// FilterDestinationPorts is two destination ports to filter flows.
// Example: 8000,8010
FilterDestinationPorts string `env:"FILTER_DESTINATION_PORTS"`
// FilterPortRange is the port range to filter flows, can be used for either source or destination port.
// Example: 8000-8010
FilterPortRange string `env:"FILTER_PORT_RANGE"`
// FilterPorts is two ports option to filter flows, can be used for either source or destination port.
// Example: 8000,8010
FilterPorts string `env:"FILTER_PORTS"`
// FilterICMPType is the ICMP type to filter flows.
FilterICMPType int `env:"FILTER_ICMP_TYPE"`
// FilterICMPCode is the ICMP code to filter flows.
FilterICMPCode int `env:"FILTER_ICMP_CODE"`
// FilterPeerIP is the IP to filter flows.
// Example: 10.10.10.10
FilterPeerIP string `env:"FILTER_PEER_IP"`
// FilterAction is the action to filter flows.
// Possible values are "Accept" or "Reject".
FilterAction string `env:"FILTER_ACTION" envDefault:"Accept"`
// FilterTCPFlags is the TCP flags to filter flows.
// possible values are: SYN, SYN-ACK, ACK, FIN, RST, PSH, URG, ECE, CWR, FIN-ACK, RST-ACK
FilterTCPFlags string `env:"FILTER_TCP_FLAGS"`
// FilterDrops allow filtering flows with packet drops, default is false.
FilterDrops bool `env:"FILTER_DROPS" envDefault:"false"`
// FlowFilterRules list of flow filter rules
FlowFilterRules string `env:"FLOW_FILTER_RULES"`
// EnableNetworkEventsMonitoring enables monitoring network plugin events, default is false.
EnableNetworkEventsMonitoring bool `env:"ENABLE_NETWORK_EVENTS_MONITORING" envDefault:"false"`
// NetworkEventsMonitoringGroupID to allow ebpf hook to process samples for specific groupID and ignore the rest
Expand Down
36 changes: 24 additions & 12 deletions pkg/agent/packets_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package agent

import (
"context"
"encoding/json"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -76,25 +77,36 @@ func PacketsAgent(cfg *Config) (*Packets, error) {
if cfg.LogLevel == logrus.TraceLevel.String() || cfg.LogLevel == logrus.DebugLevel.String() {
debug = true
}
filterRules := make([]*tracer.FilterConfig, 0)
if cfg.EnableFlowFilter {
var flowFilters []*FlowFilter
if err := json.Unmarshal([]byte(cfg.FlowFilterRules), &flowFilters); err != nil {
return nil, err
}

for _, r := range flowFilters {
filterRules = append(filterRules, &tracer.FilterConfig{
FilterAction: r.FilterAction,
FilterDirection: r.FilterDirection,
FilterIPCIDR: r.FilterIPCIDR,
FilterProtocol: r.FilterProtocol,
FilterPeerIP: r.FilterPeerIP,
FilterDestinationPort: tracer.ConvertFilterPortsToInstr(r.FilterDestinationPort, r.FilterDestinationPortRange, r.FilterDestinationPorts),
FilterSourcePort: tracer.ConvertFilterPortsToInstr(r.FilterSourcePort, r.FilterSourcePortRange, r.FilterSourcePorts),
FilterPort: tracer.ConvertFilterPortsToInstr(r.FilterPort, r.FilterPortRange, r.FilterPorts),
FilterTCPFLags: r.FilterTCPFlags,
FilterDrops: r.FilterDrops,
})
}
}
ebpfConfig := &tracer.FlowFetcherConfig{
EnableIngress: ingress,
EnableEgress: egress,
Debug: debug,
Sampling: cfg.Sampling,
CacheMaxSize: cfg.CacheMaxFlows,
EnablePCA: cfg.EnablePCA,
FilterConfig: &tracer.FilterConfig{
FilterAction: cfg.FilterAction,
FilterDirection: cfg.FilterDirection,
FilterIPCIDR: cfg.FilterIPCIDR,
FilterProtocol: cfg.FilterProtocol,
FilterPeerIP: cfg.FilterPeerIP,
FilterDestinationPort: tracer.ConvertFilterPortsToInstr(cfg.FilterDestinationPort, cfg.FilterDestinationPortRange, cfg.FilterDestinationPorts),
FilterSourcePort: tracer.ConvertFilterPortsToInstr(cfg.FilterSourcePort, cfg.FilterSourcePortRange, cfg.FilterSourcePorts),
FilterPort: tracer.ConvertFilterPortsToInstr(cfg.FilterPort, cfg.FilterPortRange, cfg.FilterPorts),
FilterTCPFLags: cfg.FilterTCPFlags,
FilterDrops: cfg.FilterDrops,
},
FilterConfig: filterRules,
}

fetcher, err := tracer.NewPacketFetcher(ebpfConfig)
Expand Down
Binary file modified pkg/ebpf/bpf_arm64_bpfel.o
Binary file not shown.
Binary file modified pkg/ebpf/bpf_powerpc_bpfel.o
Binary file not shown.
Binary file modified pkg/ebpf/bpf_s390_bpfeb.o
Binary file not shown.
Binary file modified pkg/ebpf/bpf_x86_bpfel.o
Binary file not shown.
38 changes: 21 additions & 17 deletions pkg/tracer/flow_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,41 +30,45 @@ type FilterConfig struct {
type Filter struct {
// eBPF objs to create/update eBPF maps
objects *ebpf.BpfObjects
config *FilterConfig
config []*FilterConfig
}

func NewFilter(objects *ebpf.BpfObjects, cfg *FilterConfig) *Filter {
func NewFilter(objects *ebpf.BpfObjects, cfg []*FilterConfig) *Filter {
return &Filter{
objects: objects,
config: cfg,
}
}

func (f *Filter) ProgramFilter() error {
log.Infof("Flow filter config: %v", f.config)
key, err := f.getFilterKey(f.config)
if err != nil {
return fmt.Errorf("failed to get filter key: %w", err)
}

val, err := f.getFilterValue(f.config)
if err != nil {
return fmt.Errorf("failed to get filter value: %w", err)
}
for _, config := range f.config {
log.Infof("Flow filter config: %v", f.config)
key, err := f.getFilterKey(config)
if err != nil {
return fmt.Errorf("failed to get filter key: %w", err)
}

err = f.objects.FilterMap.Update(key, val, cilium.UpdateAny)
if err != nil {
return fmt.Errorf("failed to update filter map: %w", err)
}
val, err := f.getFilterValue(config)
if err != nil {
return fmt.Errorf("failed to get filter value: %w", err)
}

log.Infof("Programmed filter with key: %v, value: %v", key, val)
err = f.objects.FilterMap.Update(key, val, cilium.UpdateAny)
if err != nil {
return fmt.Errorf("failed to update filter map: %w", err)
}

log.Infof("Programmed filter with key: %v, value: %v", key, val)
}
return nil
}

func (f *Filter) getFilterKey(config *FilterConfig) (ebpf.BpfFilterKeyT, error) {
key := ebpf.BpfFilterKeyT{}

if config.FilterIPCIDR == "" {
config.FilterIPCIDR = "0.0.0.0/0"
}
ip, ipNet, err := net.ParseCIDR(config.FilterIPCIDR)
if err != nil {
return key, fmt.Errorf("failed to parse FlowFilterIPCIDR: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/tracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ type FlowFetcherConfig struct {
NetworkEventsMonitoringGroupID int
EnableFlowFilter bool
EnablePCA bool
FilterConfig *FilterConfig
FilterConfig []*FilterConfig
}

// nolint:golint,cyclop
Expand Down

0 comments on commit d95862c

Please sign in to comment.