Skip to content

Commit

Permalink
[NETPATH-366] Enrich dynamic network path with NPM domain cache
Browse files Browse the repository at this point in the history
  • Loading branch information
pimlu committed Dec 2, 2024
1 parent be4b703 commit 0bc99fd
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 41 deletions.
2 changes: 1 addition & 1 deletion comp/networkpath/npcollector/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ import model "github.com/DataDog/agent-payload/v5/process"

// Component is the component type.
type Component interface {
ScheduleConns(conns []*model.Connection)
ScheduleConns(conns []*model.Connection, dns map[string]*model.DNSEntry)
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,21 @@ import (
"github.com/DataDog/datadog-agent/pkg/networkpath/payload"
)

// PathtestMetadata contains metadata used to annotate the result of a traceroute.
// This data is not used by the traceroute itself.
type PathtestMetadata struct {
// ReverseDNSHostname is an optional hostname which will be used in place of rDNS querying for
// the destination address.
ReverseDNSHostname string
}

// Pathtest details of information necessary to run a traceroute (pathtrace)
type Pathtest struct {
Hostname string
Port uint16
Protocol payload.Protocol
SourceContainerID string
Metadata PathtestMetadata
}

// GetHash returns the hash of the Pathtest
Expand Down
88 changes: 57 additions & 31 deletions comp/networkpath/npcollector/npcollectorimpl/npcollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,26 +127,49 @@ func newNpCollectorImpl(epForwarder eventplatform.Forwarder, collectorConfigs *c
}
}

func (s *npCollectorImpl) ScheduleConns(conns []*model.Connection) {
// makePathtest extracts pathtest information using a single connection and the connection check's reverse dns map
func makePathtest(conn *model.Connection, dns map[string]*model.DNSEntry) common.Pathtest {
protocol := convertProtocol(conn.GetType())

rDNSEntry := dns[conn.Raddr.GetIp()]
reverseDNSHostname := ""
if rDNSEntry != nil && len(rDNSEntry.Names) > 0 {
reverseDNSHostname = rDNSEntry.Names[0]
}

var remotePort uint16
// UDP traces should not be done to the active port
if protocol != payload.ProtocolUDP {
remotePort = uint16(conn.Raddr.GetPort())
}

sourceContainer := conn.Laddr.GetContainerId()

return common.Pathtest{
Hostname: conn.Raddr.GetIp(),
Port: remotePort,
Protocol: protocol,
SourceContainerID: sourceContainer,
Metadata: common.PathtestMetadata{
ReverseDNSHostname: reverseDNSHostname,
},
}
}

func (s *npCollectorImpl) ScheduleConns(conns []*model.Connection, dns map[string]*model.DNSEntry) {
if !s.collectorConfigs.connectionsMonitoringEnabled {
return
}
startTime := s.TimeNowFn()
for _, conn := range conns {
remoteAddr := conn.Raddr
protocol := convertProtocol(conn.GetType())
var remotePort uint16
// UDP traces should not be done to the active
// port
if protocol != payload.ProtocolUDP {
remotePort = uint16(conn.Raddr.GetPort())
}
if !shouldScheduleNetworkPathForConn(conn) {
s.logger.Tracef("Skipped connection: addr=%s, port=%d, protocol=%s", remoteAddr, remotePort, protocol)
protocol := convertProtocol(conn.GetType())
s.logger.Tracef("Skipped connection: addr=%s, protocol=%s", conn.Raddr, protocol)
continue
}
sourceContainer := conn.Laddr.GetContainerId()
err := s.scheduleOne(remoteAddr.GetIp(), remotePort, protocol, sourceContainer)
pathtest := makePathtest(conn, dns)

err := s.scheduleOne(&pathtest)
if err != nil {
s.logger.Errorf("Error scheduling pathtests: %s", err)
}
Expand All @@ -158,20 +181,14 @@ func (s *npCollectorImpl) ScheduleConns(conns []*model.Connection) {

// scheduleOne schedules pathtests.
// It shouldn't block, if the input channel is full, an error is returned.
func (s *npCollectorImpl) scheduleOne(hostname string, port uint16, protocol payload.Protocol, sourceContainerID string) error {
func (s *npCollectorImpl) scheduleOne(pathtest *common.Pathtest) error {
if s.pathtestInputChan == nil {
return errors.New("no input channel, please check that network path is enabled")
}
s.logger.Debugf("Schedule traceroute for: hostname=%s port=%d", hostname, port)
s.logger.Debugf("Schedule traceroute for: hostname=%s port=%d", pathtest.Hostname, pathtest.Port)

ptest := &common.Pathtest{
Hostname: hostname,
Port: port,
Protocol: protocol,
SourceContainerID: sourceContainerID,
}
select {
case s.pathtestInputChan <- ptest:
case s.pathtestInputChan <- pathtest:
return nil
default:
return fmt.Errorf("collector input channel is full (channel capacity is %d)", cap(s.pathtestInputChan))
Expand Down Expand Up @@ -246,7 +263,7 @@ func (s *npCollectorImpl) runTracerouteForPath(ptest *pathteststore.PathtestCont
path.Origin = payload.PathOriginNetworkTraffic

// Perform reverse DNS lookup on destination and hop IPs
s.enrichPathWithRDNS(&path)
s.enrichPathWithRDNS(&path, ptest.Pathtest.Metadata.ReverseDNSHostname)

s.sendTelemetry(path, startTime, ptest)

Expand Down Expand Up @@ -326,14 +343,19 @@ func (s *npCollectorImpl) flush() {
}
}

func (s *npCollectorImpl) enrichPathWithRDNS(path *payload.NetworkPath) {
// enrichPathWithRDNS populates a NetworkPath with reverse-DNS queried hostnames.
func (s *npCollectorImpl) enrichPathWithRDNS(path *payload.NetworkPath, knownDestHostname string) {
if !s.collectorConfigs.reverseDNSEnabled {
return
}

// collect unique IP addresses from destination and hops
ipSet := make(map[string]struct{}, len(path.Hops)+1) // +1 for destination
ipSet[path.Destination.IPAddress] = struct{}{}

// only look up the destination hostname if we need to
if knownDestHostname == "" {
ipSet[path.Destination.IPAddress] = struct{}{}
}
for _, hop := range path.Hops {
if !hop.Reachable {
continue
Expand All @@ -356,13 +378,17 @@ func (s *npCollectorImpl) enrichPathWithRDNS(path *payload.NetworkPath) {
}

// assign resolved hostnames to destination and hops
hostname := s.getReverseDNSResult(path.Destination.IPAddress, results)
// if hostname is blank, use what's given by traceroute
// TODO: would it be better to move the logic up from the traceroute command?
// benefit to the current approach is having consistent behavior for all paths
// both static and dynamic
if hostname != "" {
path.Destination.ReverseDNSHostname = hostname
if knownDestHostname != "" {
path.Destination.ReverseDNSHostname = knownDestHostname
} else {
hostname := s.getReverseDNSResult(path.Destination.IPAddress, results)
// if hostname is blank, use what's given by traceroute
// TODO: would it be better to move the logic up from the traceroute command?
// benefit to the current approach is having consistent behavior for all paths
// both static and dynamic
if hostname != "" {
path.Destination.ReverseDNSHostname = hostname
}
}

for i, hop := range path.Hops {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func MockModule() fxutil.Module {

type npCollectorMock struct{}

func (s *npCollectorMock) ScheduleConns(_ []*model.Connection) {
func (s *npCollectorMock) ScheduleConns(_ []*model.Connection, _ map[string]*model.DNSEntry) {
panic("implement me")
}

Expand Down
58 changes: 51 additions & 7 deletions comp/networkpath/npcollector/npcollectorimpl/npcollector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func Test_NpCollector_runningAndProcessing(t *testing.T) {
Type: model.ConnectionType_udp,
},
}
npCollector.ScheduleConns(conns)
npCollector.ScheduleConns(conns, make(map[string]*model.DNSEntry))

waitForProcessedPathtests(npCollector, 5*time.Second, 2)

Expand Down Expand Up @@ -281,7 +281,7 @@ func Test_NpCollector_ScheduleConns_ScheduleDurationMetric(t *testing.T) {
}

// WHEN
npCollector.ScheduleConns(conns)
npCollector.ScheduleConns(conns, make(map[string]*model.DNSEntry))

// THEN
calls := stats.GaugeCalls
Expand Down Expand Up @@ -340,6 +340,7 @@ func Test_npCollectorImpl_ScheduleConns(t *testing.T) {
tests := []struct {
name string
conns []*model.Connection
dns map[string]*model.DNSEntry
noInputChan bool
agentConfigs map[string]any
expectedPathtests []*common.Pathtest
Expand Down Expand Up @@ -480,6 +481,25 @@ func Test_npCollectorImpl_ScheduleConns(t *testing.T) {
},
expectedLogs: []logCount{},
},
{
name: "one outgoing TCP conn with known hostname (DNS)",
agentConfigs: defaultagentConfigs,
conns: []*model.Connection{
{
Laddr: &model.Addr{Ip: "10.0.0.3", Port: int32(30000), ContainerId: "testId1"},
Raddr: &model.Addr{Ip: "10.0.0.4", Port: int32(80)},
Direction: model.ConnectionDirection_outgoing,
Type: model.ConnectionType_tcp,
},
},
expectedPathtests: []*common.Pathtest{
{Hostname: "10.0.0.4", Port: uint16(80), Protocol: payload.ProtocolTCP, SourceContainerID: "testId1",
Metadata: common.PathtestMetadata{ReverseDNSHostname: "known-hostname"}},
},
dns: map[string]*model.DNSEntry{
"10.0.0.4": {Names: []string{"known-hostname"}},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -497,7 +517,7 @@ func Test_npCollectorImpl_ScheduleConns(t *testing.T) {
stats := &teststatsd.Client{}
npCollector.statsdClient = stats

npCollector.ScheduleConns(tt.conns)
npCollector.ScheduleConns(tt.conns, tt.dns)

actualPathtests := []*common.Pathtest{}
for i := 0; i < len(tt.expectedPathtests); i++ {
Expand Down Expand Up @@ -758,7 +778,7 @@ func Benchmark_npCollectorImpl_ScheduleConns(b *testing.B) {
for i := 0; i < b.N; i++ {
// add line to avoid linter error
_ = i
npCollector.ScheduleConns(connections)
npCollector.ScheduleConns(connections, make(map[string]*model.DNSEntry))

waitForProcessedPathtests(npCollector, 60*time.Second, 50)
}
Expand Down Expand Up @@ -791,7 +811,7 @@ func Test_npCollectorImpl_enrichPathWithRDNS(t *testing.T) {
},
}

npCollector.enrichPathWithRDNS(&path)
npCollector.enrichPathWithRDNS(&path, "")

// THEN
assert.Equal(t, "hostname-10.0.0.41", path.Destination.ReverseDNSHostname) // private IP should be resolved
Expand All @@ -812,7 +832,7 @@ func Test_npCollectorImpl_enrichPathWithRDNS(t *testing.T) {
},
}

npCollector.enrichPathWithRDNS(&path)
npCollector.enrichPathWithRDNS(&path, "")

// THEN
assert.Equal(t, "", path.Destination.ReverseDNSHostname)
Expand Down Expand Up @@ -843,7 +863,7 @@ func Test_npCollectorImpl_enrichPathWithRDNS(t *testing.T) {
},
}

npCollector.enrichPathWithRDNS(&path)
npCollector.enrichPathWithRDNS(&path, "")

// THEN - no resolution should happen
assert.Equal(t, "", path.Destination.ReverseDNSHostname)
Expand All @@ -853,6 +873,30 @@ func Test_npCollectorImpl_enrichPathWithRDNS(t *testing.T) {
assert.Equal(t, "dest-hostname", path.Hops[3].Hostname)
}

func Test_npCollectorImpl_enrichPathWithRDNSKnownHostName(t *testing.T) {
// GIVEN
agentConfigs := map[string]any{
"network_path.connections_monitoring.enabled": true,
}
_, npCollector := newTestNpCollector(t, agentConfigs)

stats := &teststatsd.Client{}
npCollector.statsdClient = stats
npCollector.metricSender = metricsender.NewMetricSenderStatsd(stats)

// WHEN
path := payload.NetworkPath{
Destination: payload.NetworkPathDestination{IPAddress: "10.0.0.41", Hostname: "dest-hostname"},
Hops: nil,
}

npCollector.enrichPathWithRDNS(&path, "known-dest-hostname")

// THEN - destination hostname should resolve to known hostname
assert.Equal(t, "known-dest-hostname", path.Destination.ReverseDNSHostname)
assert.Empty(t, path.Hops)
}

func Test_npCollectorImpl_getReverseDNSResult(t *testing.T) {
// GIVEN
agentConfigs := map[string]any{
Expand Down
2 changes: 1 addition & 1 deletion pkg/process/checks/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (c *ConnectionsCheck) Run(nextGroupID func() int32, _ *RunOptions) (RunResu

log.Debugf("collected connections in %s", time.Since(start))

c.npCollector.ScheduleConns(conns.Conns)
c.npCollector.ScheduleConns(conns.Conns, conns.Dns)

groupID := nextGroupID()
messages := batchConnections(c.hostInfo, c.maxConnsPerMessage, groupID, conns.Conns, conns.Dns, c.networkID, conns.ConnTelemetryMap, conns.CompilationTelemetryByAsset, conns.KernelHeaderFetchResult, conns.CORETelemetryByAsset, conns.PrebuiltEBPFAssets, conns.Domains, conns.Routes, conns.Tags, conns.AgentConfiguration, c.serviceExtractor)
Expand Down

0 comments on commit 0bc99fd

Please sign in to comment.