Skip to content

Commit

Permalink
Merge branch 'main' into owl-bot-copy
Browse files Browse the repository at this point in the history
  • Loading branch information
codyoss authored Dec 11, 2024
2 parents 72c242e + 169e309 commit 6324314
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 21 deletions.
40 changes: 28 additions & 12 deletions bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
btopt "cloud.google.com/go/bigtable/internal/option"
"cloud.google.com/go/internal/trace"
gax "github.com/googleapis/gax-go/v2"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
Expand Down Expand Up @@ -95,6 +96,18 @@ func NewClient(ctx context.Context, project, instance string, opts ...option.Cli

// NewClientWithConfig creates a new client with the given config.
func NewClientWithConfig(ctx context.Context, project, instance string, config ClientConfig, opts ...option.ClientOption) (*Client, error) {
metricsProvider := config.MetricsProvider
if emulatorAddr := os.Getenv("BIGTABLE_EMULATOR_HOST"); emulatorAddr != "" {
// Do not emit metrics when emulator is being used
metricsProvider = NoopMetricsProvider{}
}

// Create a OpenTelemetry metrics configuration
metricsTracerFactory, err := newBuiltinMetricsTracerFactory(ctx, project, instance, config.AppProfile, metricsProvider, opts...)
if err != nil {
return nil, err
}

o, err := btopt.DefaultClientOptions(prodAddr, mtlsProdAddr, Scope, clientUserAgent)
if err != nil {
return nil, err
Expand All @@ -112,21 +125,24 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
// Allow non-default service account in DirectPath.
o = append(o, internaloption.AllowNonDefaultServiceAccount(true))
o = append(o, opts...)
connPool, err := gtransport.DialPool(ctx, o...)
if err != nil {
return nil, fmt.Errorf("dialing: %w", err)
}

metricsProvider := config.MetricsProvider
if emulatorAddr := os.Getenv("BIGTABLE_EMULATOR_HOST"); emulatorAddr != "" {
// Do not emit metrics when emulator is being used
metricsProvider = NoopMetricsProvider{}
}
asyncRefreshMetricAttrs := metricsTracerFactory.clientAttributes
asyncRefreshMetricAttrs = append(asyncRefreshMetricAttrs,
attribute.String(metricLabelKeyTag, "async_refresh_dry_run"),

// Create a OpenTelemetry metrics configuration
metricsTracerFactory, err := newBuiltinMetricsTracerFactory(ctx, project, instance, config.AppProfile, metricsProvider, opts...)
// Table, cluster and zone are unknown at this point
// Use default values
attribute.String(monitoredResLabelKeyTable, defaultTable),
attribute.String(monitoredResLabelKeyCluster, defaultCluster),
attribute.String(monitoredResLabelKeyZone, defaultZone),
)
o = append(o, internaloption.EnableAsyncRefreshDryRun(func() {
metricsTracerFactory.debugTags.Add(context.Background(), 1,
metric.WithAttributes(asyncRefreshMetricAttrs...))
}))
connPool, err := gtransport.DialPool(ctx, o...)
if err != nil {
return nil, err
return nil, fmt.Errorf("dialing: %w", err)
}

return &Client{
Expand Down
8 changes: 4 additions & 4 deletions bigtable/conformance_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ trap cleanup EXIT

# Run the conformance tests
cd $conformanceTestsHome
# Tests in https://github.com/googleapis/cloud-bigtable-clients-test/tree/main/tests can only be run on go1.22.5
go install golang.org/dl/go1.22.5@latest
go1.22.5 download
go1.22.5 test -v -proxy_addr=:$testProxyPort | tee -a $sponge_log
# Tests in https://github.com/googleapis/cloud-bigtable-clients-test/tree/main/tests can only be run on go1.22.7
go install golang.org/dl/go1.22.7@latest
go1.22.7 download
go1.22.7 test -v -proxy_addr=:$testProxyPort | tee -a $sponge_log
RETURN_CODE=$?

echo "exiting with ${RETURN_CODE}"
Expand Down
17 changes: 16 additions & 1 deletion bigtable/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const (
metricLabelKeyAppProfile = "app_profile"
metricLabelKeyMethod = "method"
metricLabelKeyStatus = "status"
metricLabelKeyTag = "tag"
metricLabelKeyStreamingOperation = "streaming"
metricLabelKeyClientName = "client_name"
metricLabelKeyClientUID = "client_uid"
Expand All @@ -59,6 +60,7 @@ const (
metricNameAttemptLatencies = "attempt_latencies"
metricNameServerLatencies = "server_latencies"
metricNameRetryCount = "retry_count"
metricNameDebugTags = "debug_tags"

// Metric units
metricUnitMS = "ms"
Expand All @@ -79,7 +81,7 @@ var (
800.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0, 100000.0, 200000.0,
400000.0, 800000.0, 1600000.0, 3200000.0}

// All the built-in metrics have same attributes except 'status' and 'streaming'
// All the built-in metrics have same attributes except 'tag', 'status' and 'streaming'
// These attributes need to be added to only few of the metrics
metricsDetails = map[string]metricInfo{
metricNameOperationLatencies: {
Expand Down Expand Up @@ -148,6 +150,7 @@ type builtinMetricsTracerFactory struct {
serverLatencies metric.Float64Histogram
attemptLatencies metric.Float64Histogram
retryCount metric.Int64Counter
debugTags metric.Int64Counter
}

func newBuiltinMetricsTracerFactory(ctx context.Context, project, instance, appProfile string, metricsProvider MetricsProvider, opts ...option.ClientOption) (*builtinMetricsTracerFactory, error) {
Expand Down Expand Up @@ -253,6 +256,16 @@ func (tf *builtinMetricsTracerFactory) createInstruments(meter metric.Meter) err
metric.WithDescription("The number of additional RPCs sent after the initial attempt."),
metric.WithUnit(metricUnitCount),
)
if err != nil {
return err
}

// Create debug_tags
tf.debugTags, err = meter.Int64Counter(
metricNameDebugTags,
metric.WithDescription("A counter of internal client events used for debugging."),
metric.WithUnit(metricUnitCount),
)
return err
}

Expand All @@ -271,6 +284,7 @@ type builtinMetricsTracer struct {
instrumentServerLatencies metric.Float64Histogram
instrumentAttemptLatencies metric.Float64Histogram
instrumentRetryCount metric.Int64Counter
instrumentDebugTags metric.Int64Counter

tableName string
method string
Expand Down Expand Up @@ -363,6 +377,7 @@ func (tf *builtinMetricsTracerFactory) createBuiltinMetricsTracer(ctx context.Co
instrumentServerLatencies: tf.serverLatencies,
instrumentAttemptLatencies: tf.attemptLatencies,
instrumentRetryCount: tf.retryCount,
instrumentDebugTags: tf.debugTags,

tableName: tableName,
isStreaming: isStreaming,
Expand Down
3 changes: 2 additions & 1 deletion bigtable/metrics_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ import (
)

const (
defaultCluster = "unspecified"
defaultCluster = "<unspecified>"
defaultZone = "global"
defaultTable = "<unspecified>"
)

// get GFE latency in ms from response metadata
Expand Down
41 changes: 38 additions & 3 deletions storage/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/googleapis/gax-go/v2/apierror"
"go.opentelemetry.io/contrib/detectors/gcp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"golang.org/x/oauth2/google"
"google.golang.org/api/googleapi"
"google.golang.org/api/iterator"
Expand Down Expand Up @@ -5859,10 +5862,10 @@ func TestIntegration_PostPolicyV4_SignedURL_WithSignBytes(t *testing.T) {
})
}

func TestIntegration_OCTracing(t *testing.T) {
func TestIntegration_OTelTracing(t *testing.T) {
multiTransportTest(context.Background(), t, func(t *testing.T, ctx context.Context, bucket string, _ string, client *Client) {
te := testutil.NewTestExporter()
defer te.Unregister()
te := newOpenTelemetryTestExporter()
defer te.Unregister(ctx)

bkt := client.Bucket(bucket)
bkt.Attrs(ctx)
Expand All @@ -5873,6 +5876,38 @@ func TestIntegration_OCTracing(t *testing.T) {
})
}

// openTelemetryTestExporter is a test utility exporter. It should be created
// with NewopenTelemetryTestExporter.
type openTelemetryTestExporter struct {
exporter *tracetest.InMemoryExporter
tp *sdktrace.TracerProvider
}

// newOpenTelemetryTestExporter creates a openTelemetryTestExporter with
// underlying InMemoryExporter and TracerProvider from OpenTelemetry.
func newOpenTelemetryTestExporter() *openTelemetryTestExporter {
exporter := tracetest.NewInMemoryExporter()
tp := sdktrace.NewTracerProvider(
sdktrace.WithSyncer(exporter),
sdktrace.WithSampler(sdktrace.AlwaysSample()),
)
otel.SetTracerProvider(tp)
return &openTelemetryTestExporter{
exporter: exporter,
tp: tp,
}
}

// Spans returns the current in-memory stored spans.
func (te *openTelemetryTestExporter) Spans() tracetest.SpanStubs {
return te.exporter.GetSpans()
}

// Unregister shuts down the underlying OpenTelemetry TracerProvider.
func (te *openTelemetryTestExporter) Unregister(ctx context.Context) {
te.tp.Shutdown(ctx)
}

// verifySignedURL gets the bytes at the provided url and verifies them against the
// expectedFileBody. Make sure the SignedURLOptions set the method as "GET".
func verifySignedURL(url string, headers map[string][]string, expectedFileBody []byte) error {
Expand Down

0 comments on commit 6324314

Please sign in to comment.