Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NETPATH-366] Enrich dynamic network path with NPM domain cache #31685

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion comp/networkpath/npcollector/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ import model "github.com/DataDog/agent-payload/v5/process"

// Component is the component type.
type Component interface {
ScheduleConns(conns []*model.Connection)
ScheduleConns(conns []*model.Connection, dns map[string]*model.DNSEntry)
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,21 @@ import (
"github.com/DataDog/datadog-agent/pkg/networkpath/payload"
)

// PathtestMetadata contains metadata used to annotate the result of a traceroute.
// This data is not used by the traceroute itself.
type PathtestMetadata struct {
// ReverseDNSHostname is an optional hostname which will be used in place of rDNS querying for
// the destination address. An empty string means no hostname is known, in which case the
// destination hostname is looked up through reverse DNS instead.
ReverseDNSHostname string
}

// Pathtest holds the information necessary to run a traceroute (pathtrace).
type Pathtest struct {
// Hostname is the traceroute target. For pathtests built from connections it is the
// remote IP address of the connection.
Hostname string
// Port is the target port. Pathtests built from UDP connections leave it at zero.
Port uint16
// Protocol is derived from the connection type for pathtests built from connections.
Protocol payload.Protocol
// SourceContainerID is the container ID taken from the connection's local address
// for pathtests built from connections.
SourceContainerID string
// Metadata annotates the traceroute result; it is not used by the traceroute itself.
Metadata PathtestMetadata
}

// GetHash returns the hash of the Pathtest
Expand Down
88 changes: 57 additions & 31 deletions comp/networkpath/npcollector/npcollectorimpl/npcollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,26 +127,49 @@ func newNpCollectorImpl(epForwarder eventplatform.Forwarder, collectorConfigs *c
}
}

func (s *npCollectorImpl) ScheduleConns(conns []*model.Connection) {
// makePathtest extracts pathtest information using a single connection and the connection check's reverse dns map.
// The dns map is indexed by the connection's remote IP; when the matching entry has at least
// one name, the first name is stored as the pathtest's known reverse DNS hostname.
func makePathtest(conn *model.Connection, dns map[string]*model.DNSEntry) common.Pathtest {
protocol := convertProtocol(conn.GetType())

// a missing entry (or a nil map) yields a nil rDNSEntry, which leaves the hostname empty
rDNSEntry := dns[conn.Raddr.GetIp()]
reverseDNSHostname := ""
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

var reverseDNSHostname string

if rDNSEntry != nil && len(rDNSEntry.Names) > 0 {
// only the first name is used, even when the entry lists several
reverseDNSHostname = rDNSEntry.Names[0]
}

var remotePort uint16
// UDP traces should not be done to the active port, so the port stays at zero for UDP
if protocol != payload.ProtocolUDP {
remotePort = uint16(conn.Raddr.GetPort())
}

sourceContainer := conn.Laddr.GetContainerId()

return common.Pathtest{
Hostname: conn.Raddr.GetIp(),
Port: remotePort,
Protocol: protocol,
SourceContainerID: sourceContainer,
Metadata: common.PathtestMetadata{
ReverseDNSHostname: reverseDNSHostname,
},
}
}

func (s *npCollectorImpl) ScheduleConns(conns []*model.Connection, dns map[string]*model.DNSEntry) {
if !s.collectorConfigs.connectionsMonitoringEnabled {
return
}
startTime := s.TimeNowFn()
for _, conn := range conns {
remoteAddr := conn.Raddr
protocol := convertProtocol(conn.GetType())
var remotePort uint16
// UDP traces should not be done to the active
// port
if protocol != payload.ProtocolUDP {
remotePort = uint16(conn.Raddr.GetPort())
}
if !shouldScheduleNetworkPathForConn(conn) {
s.logger.Tracef("Skipped connection: addr=%s, port=%d, protocol=%s", remoteAddr, remotePort, protocol)
protocol := convertProtocol(conn.GetType())
s.logger.Tracef("Skipped connection: addr=%s, protocol=%s", conn.Raddr, protocol)
continue
}
sourceContainer := conn.Laddr.GetContainerId()
err := s.scheduleOne(remoteAddr.GetIp(), remotePort, protocol, sourceContainer)
pathtest := makePathtest(conn, dns)

err := s.scheduleOne(&pathtest)
if err != nil {
s.logger.Errorf("Error scheduling pathtests: %s", err)
}
Expand All @@ -158,20 +181,14 @@ func (s *npCollectorImpl) ScheduleConns(conns []*model.Connection) {

// scheduleOne schedules a single pathtest.
// It never blocks: if the input channel is full, an error is returned instead.
func (s *npCollectorImpl) scheduleOne(hostname string, port uint16, protocol payload.Protocol, sourceContainerID string) error {
func (s *npCollectorImpl) scheduleOne(pathtest *common.Pathtest) error {
if s.pathtestInputChan == nil {
return errors.New("no input channel, please check that network path is enabled")
}
s.logger.Debugf("Schedule traceroute for: hostname=%s port=%d", hostname, port)
s.logger.Debugf("Schedule traceroute for: hostname=%s port=%d", pathtest.Hostname, pathtest.Port)

ptest := &common.Pathtest{
Hostname: hostname,
Port: port,
Protocol: protocol,
SourceContainerID: sourceContainerID,
}
select {
case s.pathtestInputChan <- ptest:
case s.pathtestInputChan <- pathtest:
return nil
default:
return fmt.Errorf("collector input channel is full (channel capacity is %d)", cap(s.pathtestInputChan))
Expand Down Expand Up @@ -246,7 +263,7 @@ func (s *npCollectorImpl) runTracerouteForPath(ptest *pathteststore.PathtestCont
path.Origin = payload.PathOriginNetworkTraffic

// Perform reverse DNS lookup on destination and hop IPs
s.enrichPathWithRDNS(&path)
s.enrichPathWithRDNS(&path, ptest.Pathtest.Metadata.ReverseDNSHostname)

s.sendTelemetry(path, startTime, ptest)

Expand Down Expand Up @@ -326,14 +343,19 @@ func (s *npCollectorImpl) flush() {
}
}

func (s *npCollectorImpl) enrichPathWithRDNS(path *payload.NetworkPath) {
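// enrichPathWithRDNS populates a NetworkPath with reverse-DNS queried hostnames.
// If knownDestHostname is non-empty, it is used as the destination's reverse DNS hostname
// and the destination IP is not queried. It does nothing when reverse DNS is disabled.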
func (s *npCollectorImpl) enrichPathWithRDNS(path *payload.NetworkPath, knownDestHostname string) {
if !s.collectorConfigs.reverseDNSEnabled {
return
}

// collect unique IP addresses from destination and hops
ipSet := make(map[string]struct{}, len(path.Hops)+1) // +1 for destination
ipSet[path.Destination.IPAddress] = struct{}{}

// only look up the destination hostname if we need to
if knownDestHostname == "" {
ipSet[path.Destination.IPAddress] = struct{}{}
}
for _, hop := range path.Hops {
if !hop.Reachable {
continue
Expand All @@ -356,13 +378,17 @@ func (s *npCollectorImpl) enrichPathWithRDNS(path *payload.NetworkPath) {
}

// assign resolved hostnames to destination and hops
hostname := s.getReverseDNSResult(path.Destination.IPAddress, results)
// if hostname is blank, use what's given by traceroute
// TODO: would it be better to move the logic up from the traceroute command?
// benefit to the current approach is having consistent behavior for all paths
// both static and dynamic
if hostname != "" {
path.Destination.ReverseDNSHostname = hostname
if knownDestHostname != "" {
path.Destination.ReverseDNSHostname = knownDestHostname
} else {
hostname := s.getReverseDNSResult(path.Destination.IPAddress, results)
// if hostname is blank, use what's given by traceroute
// TODO: would it be better to move the logic up from the traceroute command?
// benefit to the current approach is having consistent behavior for all paths
// both static and dynamic
if hostname != "" {
path.Destination.ReverseDNSHostname = hostname
}
}

for i, hop := range path.Hops {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func MockModule() fxutil.Module {

type npCollectorMock struct{}

func (s *npCollectorMock) ScheduleConns(_ []*model.Connection) {
func (s *npCollectorMock) ScheduleConns(_ []*model.Connection, _ map[string]*model.DNSEntry) {
panic("implement me")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func Test_NpCollector_runningAndProcessing(t *testing.T) {
Type: model.ConnectionType_udp,
},
}
npCollector.ScheduleConns(conns)
npCollector.ScheduleConns(conns, make(map[string]*model.DNSEntry))

waitForProcessedPathtests(npCollector, 5*time.Second, 2)

Expand Down Expand Up @@ -281,7 +281,7 @@ func Test_NpCollector_ScheduleConns_ScheduleDurationMetric(t *testing.T) {
}

// WHEN
npCollector.ScheduleConns(conns)
npCollector.ScheduleConns(conns, make(map[string]*model.DNSEntry))

// THEN
calls := stats.GaugeCalls
Expand Down Expand Up @@ -340,6 +340,7 @@ func Test_npCollectorImpl_ScheduleConns(t *testing.T) {
tests := []struct {
name string
conns []*model.Connection
dns map[string]*model.DNSEntry
noInputChan bool
agentConfigs map[string]any
expectedPathtests []*common.Pathtest
Expand Down Expand Up @@ -480,6 +481,25 @@ func Test_npCollectorImpl_ScheduleConns(t *testing.T) {
},
expectedLogs: []logCount{},
},
{
name: "one outgoing TCP conn with known hostname (DNS)",
agentConfigs: defaultagentConfigs,
conns: []*model.Connection{
{
Laddr: &model.Addr{Ip: "10.0.0.3", Port: int32(30000), ContainerId: "testId1"},
Raddr: &model.Addr{Ip: "10.0.0.4", Port: int32(80)},
Direction: model.ConnectionDirection_outgoing,
Type: model.ConnectionType_tcp,
},
},
expectedPathtests: []*common.Pathtest{
{Hostname: "10.0.0.4", Port: uint16(80), Protocol: payload.ProtocolTCP, SourceContainerID: "testId1",
Metadata: common.PathtestMetadata{ReverseDNSHostname: "known-hostname"}},
},
dns: map[string]*model.DNSEntry{
"10.0.0.4": {Names: []string{"known-hostname"}},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -497,7 +517,7 @@ func Test_npCollectorImpl_ScheduleConns(t *testing.T) {
stats := &teststatsd.Client{}
npCollector.statsdClient = stats

npCollector.ScheduleConns(tt.conns)
npCollector.ScheduleConns(tt.conns, tt.dns)

actualPathtests := []*common.Pathtest{}
for i := 0; i < len(tt.expectedPathtests); i++ {
Expand Down Expand Up @@ -758,7 +778,7 @@ func Benchmark_npCollectorImpl_ScheduleConns(b *testing.B) {
for i := 0; i < b.N; i++ {
// add line to avoid linter error
_ = i
npCollector.ScheduleConns(connections)
npCollector.ScheduleConns(connections, make(map[string]*model.DNSEntry))

waitForProcessedPathtests(npCollector, 60*time.Second, 50)
}
Expand Down Expand Up @@ -791,7 +811,7 @@ func Test_npCollectorImpl_enrichPathWithRDNS(t *testing.T) {
},
}

npCollector.enrichPathWithRDNS(&path)
npCollector.enrichPathWithRDNS(&path, "")

// THEN
assert.Equal(t, "hostname-10.0.0.41", path.Destination.ReverseDNSHostname) // private IP should be resolved
Expand All @@ -812,7 +832,7 @@ func Test_npCollectorImpl_enrichPathWithRDNS(t *testing.T) {
},
}

npCollector.enrichPathWithRDNS(&path)
npCollector.enrichPathWithRDNS(&path, "")

// THEN
assert.Equal(t, "", path.Destination.ReverseDNSHostname)
Expand Down Expand Up @@ -843,7 +863,7 @@ func Test_npCollectorImpl_enrichPathWithRDNS(t *testing.T) {
},
}

npCollector.enrichPathWithRDNS(&path)
npCollector.enrichPathWithRDNS(&path, "")

// THEN - no resolution should happen
assert.Equal(t, "", path.Destination.ReverseDNSHostname)
Expand All @@ -853,6 +873,30 @@ func Test_npCollectorImpl_enrichPathWithRDNS(t *testing.T) {
assert.Equal(t, "dest-hostname", path.Hops[3].Hostname)
}

// Test_npCollectorImpl_enrichPathWithRDNSKnownHostName checks that a known destination
// hostname passed to enrichPathWithRDNS ends up as the destination's reverse DNS hostname.
func Test_npCollectorImpl_enrichPathWithRDNSKnownHostName(t *testing.T) {
	// GIVEN a collector with connections monitoring enabled and a statsd test client
	_, npCollector := newTestNpCollector(t, map[string]any{
		"network_path.connections_monitoring.enabled": true,
	})

	statsdClient := &teststatsd.Client{}
	npCollector.statsdClient = statsdClient
	npCollector.metricSender = metricsender.NewMetricSenderStatsd(statsdClient)

	// AND a path that has a destination but no hops
	networkPath := payload.NetworkPath{
		Destination: payload.NetworkPathDestination{
			IPAddress: "10.0.0.41",
			Hostname:  "dest-hostname",
		},
	}

	// WHEN
	npCollector.enrichPathWithRDNS(&networkPath, "known-dest-hostname")

	// THEN - destination hostname should resolve to known hostname
	assert.Equal(t, "known-dest-hostname", networkPath.Destination.ReverseDNSHostname)
	assert.Empty(t, networkPath.Hops)
}

func Test_npCollectorImpl_getReverseDNSResult(t *testing.T) {
// GIVEN
agentConfigs := map[string]any{
Expand Down
2 changes: 1 addition & 1 deletion pkg/process/checks/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (c *ConnectionsCheck) Run(nextGroupID func() int32, _ *RunOptions) (RunResu

log.Debugf("collected connections in %s", time.Since(start))

c.npCollector.ScheduleConns(conns.Conns)
c.npCollector.ScheduleConns(conns.Conns, conns.Dns)

groupID := nextGroupID()
messages := batchConnections(c.hostInfo, c.maxConnsPerMessage, groupID, conns.Conns, conns.Dns, c.networkID, conns.ConnTelemetryMap, conns.CompilationTelemetryByAsset, conns.KernelHeaderFetchResult, conns.CORETelemetryByAsset, conns.PrebuiltEBPFAssets, conns.Domains, conns.Routes, conns.Tags, conns.AgentConfiguration, c.serviceExtractor)
Expand Down
10 changes: 10 additions & 0 deletions releasenotes/notes/network-path-local-dns-41303d691d58a2b4.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Each section from every release note is combined when the
# CHANGELOG.rst is rendered. So the text needs to be worded so that
# it does not depend on any information only available in another
# section. This may mean repeating some details, but each section
# must be readable independently of the other.
#
# Each section note must be formatted as reStructuredText.
---
enhancements:
- Network Path now uses recent DNS lookups, when available, to infer the destination hostname. If no matching DNS lookup is found, it falls back to a reverse DNS query, as before.
Loading