Skip to content

Commit

Permalink
Implement otlploggrpc gRPC client (#5572)
Browse files Browse the repository at this point in the history
part of #5056

For full usage of this client, check
#5522

---------

Co-authored-by: Damien Mathieu <[email protected]>
  • Loading branch information
XSAM and dmathieu authored Jul 4, 2024
1 parent bcb7d42 commit 5cd7807
Show file tree
Hide file tree
Showing 3 changed files with 434 additions and 1 deletion.
97 changes: 97 additions & 0 deletions exporters/otlp/otlplog/otlploggrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package otlploggrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"

import (
"context"
"fmt"
"time"

"google.golang.org/genproto/googleapis/rpc/errdetails"
Expand All @@ -16,8 +18,10 @@ import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/retry"
collogpb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
logpb "go.opentelemetry.io/proto/otlp/logs/v1"
)

// The methods of this type are not expected to be called concurrently.
Expand Down Expand Up @@ -107,6 +111,99 @@ func newGRPCDialOptions(cfg config) []grpc.DialOption {
return dialOpts
}

// UploadLogs sends proto logs to connected endpoint.
//
// Retryable errors from the server will be handled according to any
// RetryConfig the client was created with.
//
// The otlplog.Exporter synchronizes access to client methods, and
// ensures this is not called after the Exporter is shutdown. Only thing
// to do here is send data.
func (c *client) UploadLogs(ctx context.Context, rl []*logpb.ResourceLogs) error {
select {
case <-ctx.Done():
// Do not upload if the context is already expired.
return ctx.Err()
default:
}

ctx, cancel := c.exportContext(ctx)
defer cancel()

return c.requestFunc(ctx, func(ctx context.Context) error {
resp, err := c.lsc.Export(ctx, &collogpb.ExportLogsServiceRequest{
ResourceLogs: rl,
})
if resp != nil && resp.PartialSuccess != nil {
msg := resp.PartialSuccess.GetErrorMessage()
n := resp.PartialSuccess.GetRejectedLogRecords()
if n != 0 || msg != "" {
err := fmt.Errorf("OTLP partial success: %s (%d log records rejected)", msg, n)
otel.Handle(err)
}
}
// nil is converted to OK.
if status.Code(err) == codes.OK {
// Success.
return nil
}
return err
})
}

// Shutdown shuts down the client, freeing all resources.
//
// Any active connections to a remote endpoint are closed if they were created
// by the client. Any gRPC connection passed during creation using
// WithGRPCConn will not be closed. It is the caller's responsibility to
// handle cleanup of that resource.
//
// The otlplog.Exporter synchronizes access to client methods and
// ensures this is called only once. The only thing that needs to be done
// here is to release any computational resources the client holds.
func (c *client) Shutdown(ctx context.Context) error {
c.metadata = nil
c.requestFunc = nil
c.lsc = nil

// Release the connection if we created it.
err := ctx.Err()
if c.ourConn {
closeErr := c.conn.Close()
// A context timeout error takes precedence over this error.
if err == nil && closeErr != nil {
err = closeErr
}
}
c.conn = nil
return err
}

// exportContext returns a copy of parent with an appropriate deadline and
// cancellation function based on the clients configured export timeout.
//
// It is the callers responsibility to cancel the returned context once its
// use is complete, via the parent or directly with the returned CancelFunc, to
// ensure all resources are correctly released.
func (c *client) exportContext(parent context.Context) (context.Context, context.CancelFunc) {
var (
ctx context.Context
cancel context.CancelFunc
)

if c.exportTimeout > 0 {
ctx, cancel = context.WithTimeout(parent, c.exportTimeout)
} else {
ctx, cancel = context.WithCancel(parent)
}

if c.metadata.Len() > 0 {
ctx = metadata.NewOutgoingContext(ctx, c.metadata)
}

return ctx, cancel
}

// retryable returns if err identifies a request that can be retried and a
// duration to wait for if an explicit throttle time is included in err.
func retryable(err error) (bool, time.Duration) {
Expand Down
Loading

0 comments on commit 5cd7807

Please sign in to comment.