Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 38 additions & 17 deletions .github/workflows/betterstack-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,33 @@ jobs:
with:
go-version-file: go.mod

- name: Apply memory leak fix
- name: Verify memory spike fixes are present
run: |
echo "Applying BetterStack memory leak fix..."
patch -p1 < fix-memory-leak-minimal.patch
echo "Verifying our memory spike fixes are in the source code..."
if ! grep -q "BEYLA_MAX_CONCURRENT_ELF" vendor/go.opentelemetry.io/obi/pkg/components/discover/typer.go; then
echo "❌ FAIL: BEYLA_MAX_CONCURRENT_ELF not found in typer.go"
echo "This means our memory spike fixes are missing from the source!"
exit 1
fi
if ! grep -q "elfParseSem" vendor/go.opentelemetry.io/obi/pkg/components/discover/typer.go; then
echo "❌ FAIL: elfParseSem variable not found in typer.go"
echo "This means our semaphore code is missing!"
exit 1
fi
if ! grep -q "Acquire semaphore to limit concurrent ELF parsing" vendor/go.opentelemetry.io/obi/pkg/components/discover/typer.go; then
echo "❌ FAIL: Semaphore comment not found in typer.go"
echo "This means our semaphore implementation is missing!"
exit 1
fi
echo "✅ SUCCESS: All memory spike fix markers found in source code"
echo "✅ BEYLA_MAX_CONCURRENT_ELF environment variable: FOUND"
echo "✅ elfParseSem semaphore variable: FOUND"
echo "✅ Semaphore implementation: FOUND"

- name: Set up QEMU
uses: docker/setup-qemu-action@v3

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Set up Depot
uses: depot/setup-action@v1

- name: Log in to DockerHub
if: github.event_name != 'pull_request'
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
Expand All @@ -60,29 +74,36 @@ jobs:
type=raw,value=latest,enable={{is_default_branch}}
# For version tags, use the version
type=ref,event=tag
# For branches, use branch name
# For pull requests, use pr-X format
type=ref,event=pr,prefix=pr-
# For branches, use branch name (sanitized)
type=ref,event=branch
# SHA prefix for all builds
type=sha,prefix={{branch}}-,format=short
# SHA prefix - use pr-X for PRs, branch for others
type=sha,prefix=pr-{{pr}}-,format=short,enable=${{ github.event_name == 'pull_request' }}
type=sha,prefix={{branch}}-,format=short,enable=${{ github.event_name != 'pull_request' }}

- name: Build and test
run: |
make build
# Everything is already vendored and committed, just compile
make compile
# Install test prerequisites
make prereqs
make test

- name: Build and push Docker image
uses: docker/build-push-action@v5
uses: depot/build-push-action@v1
with:
project: ${{ secrets.DEPOT_PROJECT_ID }}
token: ${{ secrets.DEPOT_API_TOKEN }}
context: .
platforms: linux/amd64,linux/arm64
push: ${{ github.event_name != 'pull_request' }}
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
cache-to: type=gha,mode=max
build-args: |
VERSION=${{ github.ref_name }}
COMMIT=${{ github.sha }}
DEV_OBI=1

- name: Update Docker Hub description
if: github.event_name == 'push' && github.ref == 'refs/heads/main'
Expand Down
13 changes: 13 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,23 @@ COPY third_party_licenses.csv third_party_licenses.csv
ENV TOOLS_DIR=/go/bin

# Build
RUN echo "=== Verifying patches before build ===" && \
grep -q "BEYLA_MAX_CONCURRENT_ELF" vendor/go.opentelemetry.io/obi/pkg/components/discover/typer.go && \
echo "✅ Patches present before build steps" || (echo "❌ Patches missing before build steps" && exit 1)

RUN if [ -z "${DEV_OBI}" ]; then \
make generate && \
make copy-obi-vendor \
; fi

RUN echo "=== Verifying patches after generate/vendor steps ===" && \
grep -q "BEYLA_MAX_CONCURRENT_ELF" vendor/go.opentelemetry.io/obi/pkg/components/discover/typer.go && \
echo "✅ Patches still present after generate/vendor steps" || (echo "❌ Patches overwritten by generate/vendor steps" && exit 1)

RUN echo "=== FINAL VERIFICATION: Verifying patches immediately before compilation ===" && \
grep -q "BEYLA_MAX_CONCURRENT_ELF" vendor/go.opentelemetry.io/obi/pkg/components/discover/typer.go && \
echo "✅ CONFIRMED: Patches present, proceeding with compilation" || (echo "❌ CRITICAL: Patches missing before compilation!" && exit 1)

RUN make compile

# Create final image from minimal + built binary
Expand Down
18 changes: 18 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -197,15 +197,33 @@ copy-obi-vendor:
go get go.opentelemetry.io/obi
go mod vendor

.PHONY: copy-generated-files-only
copy-generated-files-only:
@echo "### Copying generated files without overwriting vendor patches..."
@if [ -d ".obi-src" ]; then \
cp -f .obi-src/pkg/internal/otelsdk/grafana-opentelemetry-java.jar vendor/go.opentelemetry.io/obi/pkg/internal/otelsdk/ 2>/dev/null || true; \
find .obi-src -name "*bpfel.go" -exec cp {} vendor/go.opentelemetry.io/obi/pkg/internal/ebpf/ \; 2>/dev/null || true; \
find .obi-src -name "*bpfeb.go" -exec cp {} vendor/go.opentelemetry.io/obi/pkg/internal/ebpf/ \; 2>/dev/null || true; \
echo "Generated files copied successfully"; \
else \
echo "No .obi-src directory found, skipping generated file copy"; \
fi

.PHONY: vendor-obi
vendor-obi: obi-submodule docker-generate copy-obi-vendor

.PHONY: vendor-obi-preserve-patches
vendor-obi-preserve-patches: obi-submodule docker-generate copy-generated-files-only

.PHONY: verify
verify: prereqs lint-dashboard vendor-obi lint test

.PHONY: build
build: vendor-obi verify compile

.PHONY: build-preserve-patches
build-preserve-patches: vendor-obi-preserve-patches compile

.PHONY: all
all: vendor-obi build

Expand Down
128 changes: 128 additions & 0 deletions fix-memory-spikes-test.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
diff --git a/vendor/go.opentelemetry.io/obi/pkg/components/discover/typer_test.go b/vendor/go.opentelemetry.io/obi/pkg/components/discover/typer_test.go
new file mode 100644
index 0000000..1234567
--- /dev/null
+++ b/vendor/go.opentelemetry.io/obi/pkg/components/discover/typer_test.go
@@ -0,0 +1,120 @@
+package discover
+
+import (
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ lru "github.com/hashicorp/golang-lru/v2"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ "go.opentelemetry.io/obi/pkg/components/exec"
+ "go.opentelemetry.io/obi/pkg/components/svc"
+ "go.opentelemetry.io/obi/pkg/internal/goexec"
+)
+
+// TestConcurrentELFParsing verifies that the semaphore correctly limits
+// concurrent ELF parsing operations
+func TestConcurrentELFParsing(t *testing.T) {
+ // Save original semaphore and restore after test
+ originalSem := elfParseSem
+ defer func() { elfParseSem = originalSem }()
+
+ // Create a semaphore with limit of 2
+ elfParseSem = make(chan struct{}, 2)
+
+ // Create test typer with empty cache
+ cache, _ := lru.New[uint64, InstrumentedExecutable](10)
+ typer := &typer{
+ instrumentableCache: cache,
+ }
+
+ // Mock inspectOffsets to simulate slow ELF parsing
+ var activeParsers int32
+ var maxActiveParsers int32
+
+ typer.inspectOffsets = func(execElf *exec.FileInfo) (*goexec.Offsets, bool, error) {
+ // Increment active parsers counter
+ current := atomic.AddInt32(&activeParsers, 1)
+
+ // Track maximum concurrent parsers
+ for {
+ max := atomic.LoadInt32(&maxActiveParsers)
+ if current <= max || atomic.CompareAndSwapInt32(&maxActiveParsers, max, current) {
+ break
+ }
+ }
+
+ // Simulate parsing work
+ time.Sleep(50 * time.Millisecond)
+
+ // Decrement active parsers
+ atomic.AddInt32(&activeParsers, -1)
+
+ return &goexec.Offsets{}, true, nil
+ }
+
+ // Launch 5 concurrent parsing attempts
+ var wg sync.WaitGroup
+ for i := 0; i < 5; i++ {
+ wg.Add(1)
+ go func(pid int32) {
+ defer wg.Done()
+
+ execElf := &exec.FileInfo{
+ Pid: pid,
+ CmdExePath: "/test/binary",
+ Ino: uint64(pid), // Different inode for each to avoid cache
+ }
+
+ _ = typer.asInstrumentable(execElf)
+ }(int32(i))
+ }
+
+ wg.Wait()
+
+ // Verify that no more than 2 parsers ran concurrently
+ assert.LessOrEqual(t, maxActiveParsers, int32(2),
+ "Expected at most 2 concurrent parsers, but had %d", maxActiveParsers)
+}
+
+// TestCacheHitAvoidsParsing verifies that cache hits skip ELF parsing entirely
+func TestCacheHitAvoidsParsing(t *testing.T) {
+ cache, _ := lru.New[uint64, InstrumentedExecutable](10)
+
+ // Pre-populate cache
+ cached := InstrumentedExecutable{
+ Type: svc.InstrumentableGolang,
+ Offsets: &goexec.Offsets{},
+ }
+ cache.Add(uint64(123), cached)
+
+ typer := &typer{
+ instrumentableCache: cache,
+ }
+
+ parseCount := 0
+ typer.inspectOffsets = func(execElf *exec.FileInfo) (*goexec.Offsets, bool, error) {
+ parseCount++
+ return nil, false, nil
+ }
+
+ // First request - should hit cache
+ execElf1 := &exec.FileInfo{
+ Pid: 1,
+ Ino: 123,
+ }
+ result1 := typer.asInstrumentable(execElf1)
+
+ // Second request with same inode - should also hit cache
+ execElf2 := &exec.FileInfo{
+ Pid: 2,
+ Ino: 123,
+ }
+ result2 := typer.asInstrumentable(execElf2)
+
+ // Verify no parsing occurred
+ assert.Equal(t, 0, parseCount, "Expected no parsing for cached entries")
+ assert.Equal(t, svc.InstrumentableGolang, result1.Type)
+ assert.Equal(t, svc.InstrumentableGolang, result2.Type)
+}
78 changes: 78 additions & 0 deletions fix-memory-spikes.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
diff --git a/vendor/go.opentelemetry.io/obi/pkg/components/discover/typer.go b/vendor/go.opentelemetry.io/obi/pkg/components/discover/typer.go
index original..modified 100644
--- a/vendor/go.opentelemetry.io/obi/pkg/components/discover/typer.go
+++ b/vendor/go.opentelemetry.io/obi/pkg/components/discover/typer.go
@@ -8,6 +8,8 @@ import (
"errors"
"fmt"
"log/slog"
+ "os"
+ "strconv"
"strings"

lru "github.com/hashicorp/golang-lru/v2"
@@ -27,6 +29,21 @@ import (
"go.opentelemetry.io/obi/pkg/services"
)

+var (
+ // elfParseSem limits concurrent ELF symbol parsing to prevent memory spikes
+ // when multiple processes of the same binary start simultaneously
+ elfParseSem chan struct{}
+)
+
+func init() {
+ maxConcurrent := 2
+ if val := os.Getenv("BEYLA_MAX_CONCURRENT_ELF"); val != "" {
+ if n, err := strconv.Atoi(val); err == nil && n > 0 {
+ maxConcurrent = n
+ }
+ }
+ elfParseSem = make(chan struct{}, maxConcurrent)
+}
+
type InstrumentedExecutable struct {
Type svc.InstrumentableType
Offsets *goexec.Offsets
@@ -180,11 +197,40 @@ func (t *typer) FilterClassify(evs []Event[ProcessMatch]) []Event[ebpf.Instrumen
// in case of belonging to a forked process, returns its parent.
func (t *typer) asInstrumentable(execElf *exec.FileInfo) ebpf.Instrumentable {
log := t.log.With("pid", execElf.Pid, "comm", execElf.CmdExePath)
+
+ // Check cache first to avoid expensive ELF symbol parsing
if ic, ok := t.instrumentableCache.Get(execElf.Ino); ok {
log.Debug("new instance of existing executable", "type", ic.Type)
return ebpf.Instrumentable{Type: ic.Type, FileInfo: execElf, Offsets: ic.Offsets, InstrumentationError: ic.InstrumentationError}
}

+ // Acquire semaphore to limit concurrent ELF parsing
+ // This prevents memory spikes when multiple processes start simultaneously
+ select {
+ case elfParseSem <- struct{}{}:
+ // Got a token, proceed
+ log.Debug("acquired ELF parse semaphore")
+ default:
+ // Semaphore is full, log and wait
+ log.Debug("waiting for ELF parse semaphore",
+ "queue_length", len(elfParseSem),
+ "max_concurrent", cap(elfParseSem))
+ elfParseSem <- struct{}{} // This will block
+ log.Debug("acquired ELF parse semaphore after waiting")
+ }
+
+ // Always release the semaphore when done
+ defer func() {
+ <-elfParseSem
+ log.Debug("released ELF parse semaphore")
+ }()
+
+ // Double-check cache after acquiring semaphore
+ // Another goroutine may have populated it while we waited
+ if ic, ok := t.instrumentableCache.Get(execElf.Ino); ok {
+ log.Debug("cache hit after semaphore acquisition", "type", ic.Type)
+ return ebpf.Instrumentable{Type: ic.Type, FileInfo: execElf, Offsets: ic.Offsets, InstrumentationError: ic.InstrumentationError}
+ }
+
log.Debug("getting instrumentable information")
// look for suitable Go application first
offsets, ok, err := t.inspectOffsets(execElf)
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ require (
go.opentelemetry.io/collector/exporter/otlpexporter v0.136.0
go.opentelemetry.io/collector/exporter/otlphttpexporter v0.136.0
go.opentelemetry.io/collector/pdata v1.42.0
go.opentelemetry.io/obi v0.0.0-20251013143511-10fd81bc8389
go.opentelemetry.io/obi v0.0.0-20251023073643-d71d07a6a4d2
go.opentelemetry.io/otel v1.38.0
go.opentelemetry.io/otel/sdk v1.38.0
go.opentelemetry.io/otel/sdk/metric v1.38.0
Expand Down
Loading
Loading