Skip to content

Commit

Permalink
Implement otlploggrpc exporter (#5582)
Browse files Browse the repository at this point in the history
Part of #5056

It also abstracts some test help functions from the client and adjusts
the indent of `UploadLogs` and `PartialSuccess` in client tests.

For full usage of this exporter, check
#5522

---------

Co-authored-by: Tyler Yahn <[email protected]>
  • Loading branch information
XSAM and MrAlias authored Jul 9, 2024
1 parent fa00fc5 commit 4816927
Show file tree
Hide file tree
Showing 4 changed files with 295 additions and 97 deletions.
10 changes: 10 additions & 0 deletions exporters/otlp/otlplog/otlploggrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,16 @@ func (c *client) exportContext(parent context.Context) (context.Context, context
return ctx, cancel
}

type noopClient struct{}

func newNoopClient() *noopClient {
return &noopClient{}
}

func (c *noopClient) UploadLogs(context.Context, []*logpb.ResourceLogs) error { return nil }

func (c *noopClient) Shutdown(context.Context) error { return nil }

// retryable returns if err identifies a request that can be retried and a
// duration to wait for if an explicit throttle time is included in err.
func retryable(err error) (bool, time.Duration) {
Expand Down
175 changes: 88 additions & 87 deletions exporters/otlp/otlplog/otlploggrpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,113 +450,114 @@ func (c *grpcCollector) Collect() *storage {
return c.storage
}

func TestClient(t *testing.T) {
factory := func(rCh <-chan exportResult) (*client, *grpcCollector) {
coll, err := newGRPCCollector("", rCh)
require.NoError(t, err)

addr := coll.listener.Addr().String()
opts := []Option{WithEndpoint(addr), WithInsecure()}
cfg := newConfig(opts)
client, err := newClient(cfg)
require.NoError(t, err)
return client, coll
func clientFactory(t *testing.T, rCh <-chan exportResult) (*client, *grpcCollector) {
t.Helper()
coll, err := newGRPCCollector("", rCh)
require.NoError(t, err)

addr := coll.listener.Addr().String()
opts := []Option{WithEndpoint(addr), WithInsecure()}
cfg := newConfig(opts)
client, err := newClient(cfg)
require.NoError(t, err)
return client, coll
}

func testCtxErrs(factory func() func(context.Context) error) func(t *testing.T) {
return func(t *testing.T) {
t.Helper()
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

t.Run("DeadlineExceeded", func(t *testing.T) {
innerCtx, innerCancel := context.WithTimeout(ctx, time.Nanosecond)
t.Cleanup(innerCancel)
<-innerCtx.Done()

f := factory()
assert.ErrorIs(t, f(innerCtx), context.DeadlineExceeded)
})

t.Run("Canceled", func(t *testing.T) {
innerCtx, innerCancel := context.WithCancel(ctx)
innerCancel()

f := factory()
assert.ErrorIs(t, f(innerCtx), context.Canceled)
})
}
}

func TestClient(t *testing.T) {
t.Run("ClientHonorsContextErrors", func(t *testing.T) {
testCtxErrs := func(factory func() func(context.Context) error) func(t *testing.T) {
return func(t *testing.T) {
t.Helper()
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

t.Run("DeadlineExceeded", func(t *testing.T) {
innerCtx, innerCancel := context.WithTimeout(ctx, time.Nanosecond)
t.Cleanup(innerCancel)
<-innerCtx.Done()

f := factory()
assert.ErrorIs(t, f(innerCtx), context.DeadlineExceeded)
})

t.Run("Canceled", func(t *testing.T) {
innerCtx, innerCancel := context.WithCancel(ctx)
innerCancel()

f := factory()
assert.ErrorIs(t, f(innerCtx), context.Canceled)
})
}
}

t.Run("Shutdown", testCtxErrs(func() func(context.Context) error {
c, _ := factory(nil)
c, _ := clientFactory(t, nil)
return c.Shutdown
}))

t.Run("UploadLog", testCtxErrs(func() func(context.Context) error {
c, _ := factory(nil)
c, _ := clientFactory(t, nil)
return func(ctx context.Context) error {
return c.UploadLogs(ctx, nil)
}
}))
})

t.Run("UploadLogs", func(t *testing.T) {
ctx := context.Background()
client, coll := factory(nil)

require.NoError(t, client.UploadLogs(ctx, resourceLogs))
require.NoError(t, client.Shutdown(ctx))
got := coll.Collect().Dump()
require.Len(t, got, 1, "upload of one ResourceLogs")
diff := cmp.Diff(got[0], resourceLogs[0], cmp.Comparer(proto.Equal))
if diff != "" {
t.Fatalf("unexpected ResourceLogs:\n%s", diff)
}
})
t.Run("UploadLogs", func(t *testing.T) {
ctx := context.Background()
client, coll := clientFactory(t, nil)

require.NoError(t, client.UploadLogs(ctx, resourceLogs))
require.NoError(t, client.Shutdown(ctx))
got := coll.Collect().Dump()
require.Len(t, got, 1, "upload of one ResourceLogs")
diff := cmp.Diff(got[0], resourceLogs[0], cmp.Comparer(proto.Equal))
if diff != "" {
t.Fatalf("unexpected ResourceLogs:\n%s", diff)
}
})

t.Run("PartialSuccess", func(t *testing.T) {
const n, msg = 2, "bad data"
rCh := make(chan exportResult, 3)
rCh <- exportResult{
Response: &collogpb.ExportLogsServiceResponse{
PartialSuccess: &collogpb.ExportLogsPartialSuccess{
RejectedLogRecords: n,
ErrorMessage: msg,
},
t.Run("PartialSuccess", func(t *testing.T) {
const n, msg = 2, "bad data"
rCh := make(chan exportResult, 3)
rCh <- exportResult{
Response: &collogpb.ExportLogsServiceResponse{
PartialSuccess: &collogpb.ExportLogsPartialSuccess{
RejectedLogRecords: n,
ErrorMessage: msg,
},
}
rCh <- exportResult{
Response: &collogpb.ExportLogsServiceResponse{
PartialSuccess: &collogpb.ExportLogsPartialSuccess{
// Should not be logged.
RejectedLogRecords: 0,
ErrorMessage: "",
},
},
}
rCh <- exportResult{
Response: &collogpb.ExportLogsServiceResponse{
PartialSuccess: &collogpb.ExportLogsPartialSuccess{
// Should not be logged.
RejectedLogRecords: 0,
ErrorMessage: "",
},
}
rCh <- exportResult{
Response: &collogpb.ExportLogsServiceResponse{},
}
},
}
rCh <- exportResult{
Response: &collogpb.ExportLogsServiceResponse{},
}

ctx := context.Background()
client, _ := factory(rCh)
ctx := context.Background()
client, _ := clientFactory(t, rCh)

defer func(orig otel.ErrorHandler) {
otel.SetErrorHandler(orig)
}(otel.GetErrorHandler())
defer func(orig otel.ErrorHandler) {
otel.SetErrorHandler(orig)
}(otel.GetErrorHandler())

var errs []error
eh := otel.ErrorHandlerFunc(func(e error) { errs = append(errs, e) })
otel.SetErrorHandler(eh)
var errs []error
eh := otel.ErrorHandlerFunc(func(e error) { errs = append(errs, e) })
otel.SetErrorHandler(eh)

require.NoError(t, client.UploadLogs(ctx, resourceLogs))
require.NoError(t, client.UploadLogs(ctx, resourceLogs))
require.NoError(t, client.UploadLogs(ctx, resourceLogs))
require.NoError(t, client.UploadLogs(ctx, resourceLogs))
require.NoError(t, client.UploadLogs(ctx, resourceLogs))
require.NoError(t, client.UploadLogs(ctx, resourceLogs))

require.Equal(t, 1, len(errs))
want := fmt.Sprintf("%s (%d log records rejected)", msg, n)
assert.ErrorContains(t, errs[0], want)
})
require.Equal(t, 1, len(errs))
want := fmt.Sprintf("%s (%d log records rejected)", msg, n)
assert.ErrorContains(t, errs[0], want)
})
}
56 changes: 46 additions & 10 deletions exporters/otlp/otlplog/otlploggrpc/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,27 @@ package otlploggrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/o

import (
"context"
"sync"
"sync/atomic"

"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/transform"
"go.opentelemetry.io/otel/sdk/log"
logpb "go.opentelemetry.io/proto/otlp/logs/v1"
)

type logClient interface {
UploadLogs(ctx context.Context, rl []*logpb.ResourceLogs) error
Shutdown(context.Context) error
}

// Exporter is a OpenTelemetry log Exporter. It transports log data encoded as
// OTLP protobufs using gRPC.
type Exporter struct {
// TODO: implement.
// Ensure synchronous access to the client across all functionality.
clientMu sync.Mutex
client logClient

stopped atomic.Bool
}

// Compile-time check Exporter implements [log.Exporter].
Expand All @@ -25,29 +38,52 @@ func New(_ context.Context, options ...Option) (*Exporter, error) {
if err != nil {
return nil, err
}
return newExporter(c, cfg)
return newExporter(c), nil
}

func newExporter(*client, config) (*Exporter, error) {
// TODO: implement
return &Exporter{}, nil
func newExporter(c logClient) *Exporter {
var e Exporter
e.client = c
return &e
}

var transformResourceLogs = transform.ResourceLogs

// Export transforms and transmits log records to an OTLP receiver.
//
// This method returns nil and drops records if called after Shutdown.
// This method returns an error if the method is canceled by the passed context.
func (e *Exporter) Export(ctx context.Context, records []log.Record) error {
// TODO: implement.
return nil
if e.stopped.Load() {
return nil
}

otlp := transformResourceLogs(records)
if otlp == nil {
return nil
}

e.clientMu.Lock()
defer e.clientMu.Unlock()
return e.client.UploadLogs(ctx, otlp)
}

// Shutdown shuts down the Exporter. Calls to Export or ForceFlush will perform
// no operation after this is called.
func (e *Exporter) Shutdown(ctx context.Context) error {
// TODO: implement.
return nil
if e.stopped.Swap(true) {
return nil
}

e.clientMu.Lock()
defer e.clientMu.Unlock()

err := e.client.Shutdown(ctx)
e.client = newNoopClient()
return err
}

// ForceFlush does nothing. The Exporter holds no state.
func (e *Exporter) ForceFlush(ctx context.Context) error {
// TODO: implement.
return nil
}
Loading

0 comments on commit 4816927

Please sign in to comment.