Skip to content

Commit

Permalink
Skip first collect in delta mode (#112)
Browse files Browse the repository at this point in the history
  • Loading branch information
anjmao authored Aug 20, 2024
1 parent a79879b commit 9bbb75e
Show file tree
Hide file tree
Showing 18 changed files with 764 additions and 342 deletions.
4 changes: 3 additions & 1 deletion cmd/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ var (
ciliumClockSource = flag.String("cilium-clock-source", string(conntrack.ClockSourceJiffies), "Kernel clock source used in cilium (jiffies or ktime)")
groupPublicIPs = flag.Bool("group-public-ips", false, "Group public ips destinations as 0.0.0.0")
sendTrafficDelta = flag.Bool("send-traffic-delta", false, "Send traffic delta between reads of conntrack entry. Traffic counter is sent by default")
logEntries = flag.Bool("log-entries", false, "Log raw conntrack entries")
ebpfDNSTracerEnabled = flag.Bool("ebpf-dns-tracer-enabled", true, "Enable DNS tracer using eBPF")
ebpfDNSTracerQueueSize = flag.Int("ebpf-dns-tracer-queue-size", 1000, "Size of the queue for DNS tracer")
// Kubernetes requires container to run in privileged mode if Bidirectional mount is used.
Expand Down Expand Up @@ -107,7 +108,7 @@ func run(log logrus.FieldLogger) error {
options.FieldSelector = "spec.nodeName=" + os.Getenv("NODE_NAME")
}))
podsInformer := informersFactory.Core().V1().Pods().Informer()
podsByNodeCache := kube.NewPodsByNodeCache(podsInformer)
podsByNodeCache := kube.NewRunningPodsCache(podsInformer)
informersFactory.Start(wait.NeverStop)
informersFactory.WaitForCacheSync(wait.NeverStop)

Expand Down Expand Up @@ -139,6 +140,7 @@ func run(log logrus.FieldLogger) error {
ExcludeNamespaces: *excludeNamespaces,
GroupPublicIPs: *groupPublicIPs,
SendTrafficDelta: *sendTrafficDelta,
LogEntries: *logEntries,
}
coll := collector.New(
cfg,
Expand Down
10 changes: 2 additions & 8 deletions cmd/exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,14 @@ func run(log logrus.FieldLogger) error {
informersFactory := informers.NewSharedInformerFactoryWithOptions(clientset, 30*time.Second)
podsInformer := informersFactory.Core().V1().Pods().Informer()
nodesInformer := informersFactory.Core().V1().Nodes().Informer()
podsByNodeCache := kube.NewPodsByNodeCache(podsInformer)
podByIPCache := kube.NewPodByIPCache(ctx, podsInformer, log)
podByIPCache := kube.NewPodByIPCache(podsInformer, log)
go podByIPCache.Run(ctx)
nodeByNameCache := kube.NewNodeByNameCache(nodesInformer)
nodeByIPCache := kube.NewNodeByIPCache(nodesInformer)
informersFactory.Start(wait.NeverStop)
informersFactory.WaitForCacheSync(wait.NeverStop)

kw := &kubeWatcher{
podsByNode: podsByNodeCache,
podByIP: podByIPCache,
nodeByName: nodeByNameCache,
nodeByIP: nodeByIPCache,
Expand Down Expand Up @@ -186,16 +185,11 @@ func healthHandler(w http.ResponseWriter, req *http.Request) {
}

type kubeWatcher struct {
podsByNode *kube.PodsByNodeCache
podByIP *kube.PodByIPCache
nodeByName *kube.NodeByNameCache
nodeByIP *kube.NodeByIPCache
}

func (k *kubeWatcher) GetPodsByNode(nodeName string) ([]*v1.Pod, error) {
return k.podsByNode.Get(nodeName)
}

func (k *kubeWatcher) GetPodByIP(ip string) (*v1.Pod, error) {
return k.podByIP.Get(ip)
}
Expand Down
58 changes: 46 additions & 12 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ type Config struct {
// SendTrafficDelta used to determines if traffic should be sent as delta of 2 consecutive conntrack entries
// or as the constantly growing counter value
SendTrafficDelta bool
LogEntries bool
}

type podsWatcher interface {
Get(nodeName string) ([]*corev1.Pod, error)
Get() ([]*corev1.Pod, error)
}

type rawNetworkMetric struct {
Expand Down Expand Up @@ -109,6 +110,8 @@ type Collector struct {
currentTimeGetter func() time.Time
exporterClient *http.Client
mu sync.Mutex

firstCollectDone bool
}

func (c *Collector) Start(ctx context.Context) error {
Expand Down Expand Up @@ -219,9 +222,6 @@ func (c *Collector) collect() error {
defer c.mu.Unlock()

for _, conn := range conns {
if c.cfg.GroupPublicIPs && !conn.Dst.IP().IsPrivate() {
conn.Dst = netaddr.IPPortFrom(netaddr.IPv4(0, 0, 0, 0), 0)
}
connKey := conntrackEntryKey(conn)
txBytes := conn.TxBytes
txPackets := conn.TxPackets
Expand All @@ -238,6 +238,27 @@ func (c *Collector) collect() error {
}
c.entriesCache[connKey] = conn

if c.cfg.LogEntries && (rxBytes > 0 || txBytes > 0) {
c.log.WithFields(map[string]any{
"src_ip": conn.Src.IP().String(),
"src_port": conn.Src.Port(),
"dst_ip": conn.Dst.IP().String(),
"dst_port": conn.Dst.Port(),
"tx_bytes": txBytes,
"rx_bytes": rxBytes,
"proto": conn.Proto,
}).Debug("ct")
}

// In delta mode we need to have initial conntrack connections so next collect can calculate only new deltas.
if c.cfg.SendTrafficDelta && !c.firstCollectDone {
continue
}

if c.cfg.GroupPublicIPs && !isPrivateNetwork(conn.Dst.IP()) {
conn.Dst = netaddr.IPPortFrom(netaddr.IPv4(0, 0, 0, 0), 0)
}

groupKey := entryGroupKey(conn)
if pm, found := c.podMetrics[groupKey]; found {
pm.TxBytes += int64(txBytes)
Expand All @@ -250,19 +271,23 @@ func (c *Collector) collect() error {
} else {
c.podMetrics[groupKey] = &rawNetworkMetric{
RawNetworkMetric: &pb.RawNetworkMetric{
SrcIp: dns.ToIPint32(conn.Src.IP()),
DstIp: dns.ToIPint32(conn.Dst.IP()),
TxBytes: int64(conn.TxBytes),
TxPackets: int64(conn.TxPackets),
RxBytes: int64(conn.RxBytes),
RxPackets: int64(conn.RxPackets),
Proto: int32(conn.Proto),
SrcIp: dns.ToIPint32(conn.Src.IP()),
DstIp: dns.ToIPint32(conn.Dst.IP()),
TxBytes: int64(txBytes),
TxPackets: int64(txPackets),
RxBytes: int64(rxBytes),
RxPackets: int64(rxPackets),
Proto: int32(conn.Proto),
},
lifetime: conn.Lifetime,
}
}
}

if !c.firstCollectDone {
c.firstCollectDone = true
}

c.log.Debugf("collection done in %s, pods=%d, conntrack=%d, conntrack_cache=%d", time.Since(start), len(pods), len(conns), len(c.entriesCache))
return nil
}
Expand Down Expand Up @@ -294,7 +319,7 @@ func (c *Collector) cleanup() {
}

func (c *Collector) getNodePods() ([]*corev1.Pod, error) {
pods, err := c.podsWatcher.Get(c.cfg.NodeName)
pods, err := c.podsWatcher.Get()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -359,3 +384,12 @@ func conntrackEntryKey(conn *conntrack.Entry) uint64 {
conntrackEntryHash.Reset()
return res
}

func isPrivateNetwork(ip netaddr.IP) bool {
return ip.IsPrivate() ||
ip.IsLoopback() ||
ip.IsMulticast() ||
ip.IsLinkLocalUnicast() ||
ip.IsLinkLocalMulticast() ||
ip.IsInterfaceLocalMulticast()
}
Loading

0 comments on commit 9bbb75e

Please sign in to comment.