Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add PID as an attribute in each sample #212

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
39 changes: 27 additions & 12 deletions reporter/otlp_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type traceAndMetaKey struct {
apmServiceName string
// containerID is annotated based on PID information
containerID string
pid int64
}

// traceEvents holds known information about a trace.
Expand All @@ -84,9 +85,9 @@ type traceEvents struct {
}

// attrKeyValue is a helper to populate Profile.attribute_table.
type attrKeyValue struct {
type attrKeyValue[T string | int64] struct {
key string
value string
value T
}

// OTLPReporter receives and transforms information to be OTLP/profiles compliant.
Expand Down Expand Up @@ -169,6 +170,7 @@ func (r *OTLPReporter) ReportTraceEvent(trace *libpf.Trace, meta *TraceEventMeta
comm: meta.Comm,
apmServiceName: meta.APMServiceName,
containerID: containerID,
pid: int64(meta.PID),
}

if events, exists := (*traceEventsMap)[key]; exists {
Expand Down Expand Up @@ -550,7 +552,7 @@ func (r *OTLPReporter) getProfile() (profile *profiles.Profile, startTS, endTS u

// Walk every frame of the trace.
for i := range traceInfo.frameTypes {
frameAttributes := addProfileAttributes(profile, []attrKeyValue{
frameAttributes := addProfileAttributes(profile, []attrKeyValue[string]{
{key: "profile.frame.type", value: traceInfo.frameTypes[i].String()},
}, attributeMap)

Expand Down Expand Up @@ -583,7 +585,7 @@ func (r *OTLPReporter) getProfile() (profile *profiles.Profile, startTS, endTS u
fileName = execInfo.fileName
}

mappingAttributes := addProfileAttributes(profile, []attrKeyValue{
mappingAttributes := addProfileAttributes(profile, []attrKeyValue[string]{
// Once SemConv and its Go package is released with the new
// semantic convention for build_id, replace these hard coded
// strings.
Expand Down Expand Up @@ -648,11 +650,13 @@ func (r *OTLPReporter) getProfile() (profile *profiles.Profile, startTS, endTS u
profile.Location = append(profile.Location, loc)
}

sample.Attributes = addProfileAttributes(profile, []attrKeyValue{
sample.Attributes = append(addProfileAttributes(profile, []attrKeyValue[string]{
{key: string(semconv.ContainerIDKey), value: traceKey.containerID},
{key: string(semconv.ThreadNameKey), value: traceKey.comm},
{key: string(semconv.ServiceNameKey), value: traceKey.apmServiceName},
}, attributeMap)
}, attributeMap), addProfileAttributes(profile, []attrKeyValue[int64]{
{key: string(semconv.ProcessPIDKey), value: traceKey.pid},
}, attributeMap)...)
sample.LocationsLength = uint64(len(traceInfo.frameTypes))
locationIndex += sample.LocationsLength

Expand Down Expand Up @@ -723,15 +727,26 @@ func createFunctionEntry(funcMap map[funcInfo]uint64,

// addProfileAttributes adds attributes to Profile.attribute_table and returns
// the indices to these attributes.
func addProfileAttributes(profile *profiles.Profile,
attributes []attrKeyValue, attributeMap map[string]uint64) []uint64 {
func addProfileAttributes[T string | int64](profile *profiles.Profile,
attributes []attrKeyValue[T], attributeMap map[string]uint64) []uint64 {
indices := make([]uint64, 0, len(attributes))

addAttr := func(attr attrKeyValue) {
if attr.value == "" {
addAttr := func(attr attrKeyValue[T]) {
var attributeCompositeKey string
var attributeValue common.AnyValue

switch val := any(attr.value).(type) {
case string:
attributeCompositeKey = attr.key + "_" + val
attributeValue = common.AnyValue{Value: &common.AnyValue_StringValue{StringValue: val}}
case int64:
attributeCompositeKey = attr.key + "_" + strconv.Itoa(int(val))
attributeValue = common.AnyValue{Value: &common.AnyValue_IntValue{IntValue: val}}
default:
log.Error("Unsupported attribute value type. Only string and int64 are supported.")
return
florianl marked this conversation as resolved.
Show resolved Hide resolved
rockdaboot marked this conversation as resolved.
Show resolved Hide resolved
}
attributeCompositeKey := attr.key + "_" + attr.value

if attributeIndex, exists := attributeMap[attributeCompositeKey]; exists {
indices = append(indices, attributeIndex)
return
Expand All @@ -740,7 +755,7 @@ func addProfileAttributes(profile *profiles.Profile,
indices = append(indices, newIndex)
profile.AttributeTable = append(profile.AttributeTable, &common.KeyValue{
Key: attr.key,
Value: &common.AnyValue{Value: &common.AnyValue_StringValue{StringValue: attr.value}},
Value: &attributeValue,
})
attributeMap[attributeCompositeKey] = newIndex
}
Expand Down
73 changes: 63 additions & 10 deletions reporter/otlp_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,37 @@ func TestGetSampleAttributes(t *testing.T) {
comm: "",
apmServiceName: "",
containerID: "",
pid: 0,
},
},
attributeMap: make(map[string]uint64),
expectedIndices: [][]uint64{{0, 1, 2, 3}},
expectedAttributeTable: []*common.KeyValue{
{
Key: "container.id",
Value: &common.AnyValue{
Value: &common.AnyValue_StringValue{StringValue: ""},
},
},
{
Key: "thread.name",
Value: &common.AnyValue{
Value: &common.AnyValue_StringValue{StringValue: ""},
},
},
{
Key: "service.name",
Value: &common.AnyValue{
Value: &common.AnyValue_StringValue{StringValue: ""},
},
},
{
Key: "process.pid",
Value: &common.AnyValue{
Value: &common.AnyValue_IntValue{IntValue: 0},
},
},
},
attributeMap: make(map[string]uint64),
expectedIndices: [][]uint64{make([]uint64, 0, 4)},
expectedAttributeTable: nil,
},
"duplicate": {
profile: &profiles.Profile{},
Expand All @@ -40,16 +66,18 @@ func TestGetSampleAttributes(t *testing.T) {
comm: "comm1",
apmServiceName: "apmServiceName1",
containerID: "containerID1",
pid: 1234,
},
{
hash: libpf.TraceHash{},
comm: "comm1",
apmServiceName: "apmServiceName1",
containerID: "containerID1",
pid: 1234,
},
},
attributeMap: make(map[string]uint64),
expectedIndices: [][]uint64{{0, 1, 2}, {0, 1, 2}},
expectedIndices: [][]uint64{{0, 1, 2, 3}, {0, 1, 2, 3}},
expectedAttributeTable: []*common.KeyValue{
{
Key: "container.id",
Expand All @@ -69,6 +97,12 @@ func TestGetSampleAttributes(t *testing.T) {
Value: &common.AnyValue_StringValue{StringValue: "apmServiceName1"},
},
},
{
Key: "process.pid",
Value: &common.AnyValue{
Value: &common.AnyValue_IntValue{IntValue: 1234},
},
},
},
},
"different": {
Expand All @@ -79,16 +113,18 @@ func TestGetSampleAttributes(t *testing.T) {
comm: "comm1",
apmServiceName: "apmServiceName1",
containerID: "containerID1",
pid: 1234,
},
{
hash: libpf.TraceHash{},
comm: "comm2",
apmServiceName: "apmServiceName2",
containerID: "containerID2",
pid: 6789,
},
},
attributeMap: make(map[string]uint64),
expectedIndices: [][]uint64{{0, 1, 2}, {3, 4, 5}},
expectedIndices: [][]uint64{{0, 1, 2, 3}, {4, 5, 6, 7}},
expectedAttributeTable: []*common.KeyValue{
{
Key: "container.id",
Expand All @@ -108,6 +144,12 @@ func TestGetSampleAttributes(t *testing.T) {
Value: &common.AnyValue_StringValue{StringValue: "apmServiceName1"},
},
},
{
Key: "process.pid",
Value: &common.AnyValue{
Value: &common.AnyValue_IntValue{IntValue: 1234},
},
},
{
Key: "container.id",
Value: &common.AnyValue{
Expand All @@ -126,6 +168,12 @@ func TestGetSampleAttributes(t *testing.T) {
Value: &common.AnyValue_StringValue{StringValue: "apmServiceName2"},
},
},
{
Key: "process.pid",
Value: &common.AnyValue{
Value: &common.AnyValue_IntValue{IntValue: 6789},
},
},
},
},
}
Expand All @@ -136,11 +184,16 @@ func TestGetSampleAttributes(t *testing.T) {
t.Run(name, func(t *testing.T) {
indices := make([][]uint64, 0)
for _, k := range tc.k {
indices = append(indices, addProfileAttributes(tc.profile, []attrKeyValue{
{key: string(semconv.ContainerIDKey), value: k.containerID},
{key: string(semconv.ThreadNameKey), value: k.comm},
{key: string(semconv.ServiceNameKey), value: k.apmServiceName},
}, tc.attributeMap))
indices = append(indices, append(addProfileAttributes(tc.profile,
[]attrKeyValue[string]{
{key: string(semconv.ContainerIDKey), value: k.containerID},
{key: string(semconv.ThreadNameKey), value: k.comm},
{key: string(semconv.ServiceNameKey), value: k.apmServiceName},
}, tc.attributeMap),
addProfileAttributes(tc.profile,
[]attrKeyValue[int64]{
{key: string(semconv.ProcessPIDKey), value: k.pid},
}, tc.attributeMap)...))
}
require.Equal(t, tc.expectedIndices, indices)
require.Equal(t, tc.expectedAttributeTable, tc.profile.AttributeTable)
Expand Down