diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go index b08785658714..2101e1595606 100644 --- a/bigtable/bigtable.go +++ b/bigtable/bigtable.go @@ -32,6 +32,7 @@ import ( btopt "cloud.google.com/go/bigtable/internal/option" "cloud.google.com/go/internal/trace" gax "github.com/googleapis/gax-go/v2" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "google.golang.org/api/option" "google.golang.org/api/option/internaloption" @@ -95,6 +96,18 @@ func NewClient(ctx context.Context, project, instance string, opts ...option.Cli // NewClientWithConfig creates a new client with the given config. func NewClientWithConfig(ctx context.Context, project, instance string, config ClientConfig, opts ...option.ClientOption) (*Client, error) { + metricsProvider := config.MetricsProvider + if emulatorAddr := os.Getenv("BIGTABLE_EMULATOR_HOST"); emulatorAddr != "" { + // Do not emit metrics when emulator is being used + metricsProvider = NoopMetricsProvider{} + } + + // Create a OpenTelemetry metrics configuration + metricsTracerFactory, err := newBuiltinMetricsTracerFactory(ctx, project, instance, config.AppProfile, metricsProvider, opts...) + if err != nil { + return nil, err + } + o, err := btopt.DefaultClientOptions(prodAddr, mtlsProdAddr, Scope, clientUserAgent) if err != nil { return nil, err @@ -112,21 +125,24 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C // Allow non-default service account in DirectPath. o = append(o, internaloption.AllowNonDefaultServiceAccount(true)) o = append(o, opts...) - connPool, err := gtransport.DialPool(ctx, o...) - if err != nil { - return nil, fmt.Errorf("dialing: %w", err) - } - metricsProvider := config.MetricsProvider - if emulatorAddr := os.Getenv("BIGTABLE_EMULATOR_HOST"); emulatorAddr != "" { - // Do not emit metrics when emulator is being used - metricsProvider = NoopMetricsProvider{} - } + asyncRefreshMetricAttrs := metricsTracerFactory.clientAttributes + asyncRefreshMetricAttrs = append(asyncRefreshMetricAttrs, + attribute.String(metricLabelKeyTag, "async_refresh_dry_run"), - // Create a OpenTelemetry metrics configuration - metricsTracerFactory, err := newBuiltinMetricsTracerFactory(ctx, project, instance, config.AppProfile, metricsProvider, opts...) + // Table, cluster and zone are unknown at this point + // Use default values + attribute.String(monitoredResLabelKeyTable, defaultTable), + attribute.String(monitoredResLabelKeyCluster, defaultCluster), + attribute.String(monitoredResLabelKeyZone, defaultZone), + ) + o = append(o, internaloption.EnableAsyncRefreshDryRun(func() { + metricsTracerFactory.debugTags.Add(context.Background(), 1, + metric.WithAttributes(asyncRefreshMetricAttrs...)) + })) + connPool, err := gtransport.DialPool(ctx, o...) if err != nil { - return nil, err + return nil, fmt.Errorf("dialing: %w", err) } return &Client{ diff --git a/bigtable/conformance_test.sh b/bigtable/conformance_test.sh index bf6f520a6b0c..d380758315fc 100755 --- a/bigtable/conformance_test.sh +++ b/bigtable/conformance_test.sh @@ -50,10 +50,10 @@ trap cleanup EXIT # Run the conformance tests cd $conformanceTestsHome -# Tests in https://github.com/googleapis/cloud-bigtable-clients-test/tree/main/tests can only be run on go1.22.5 -go install golang.org/dl/go1.22.5@latest -go1.22.5 download -go1.22.5 test -v -proxy_addr=:$testProxyPort | tee -a $sponge_log +# Tests in https://github.com/googleapis/cloud-bigtable-clients-test/tree/main/tests can only be run on go1.22.7 +go install golang.org/dl/go1.22.7@latest +go1.22.7 download +go1.22.7 test -v -proxy_addr=:$testProxyPort | tee -a $sponge_log RETURN_CODE=$? echo "exiting with ${RETURN_CODE}" diff --git a/bigtable/metrics.go b/bigtable/metrics.go index b8c96a768030..5a3b817abf7c 100644 --- a/bigtable/metrics.go +++ b/bigtable/metrics.go @@ -50,6 +50,7 @@ const ( metricLabelKeyAppProfile = "app_profile" metricLabelKeyMethod = "method" metricLabelKeyStatus = "status" + metricLabelKeyTag = "tag" metricLabelKeyStreamingOperation = "streaming" metricLabelKeyClientName = "client_name" metricLabelKeyClientUID = "client_uid" @@ -59,6 +60,7 @@ const ( metricNameAttemptLatencies = "attempt_latencies" metricNameServerLatencies = "server_latencies" metricNameRetryCount = "retry_count" + metricNameDebugTags = "debug_tags" // Metric units metricUnitMS = "ms" @@ -79,7 +81,7 @@ var ( 800.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0, 100000.0, 200000.0, 400000.0, 800000.0, 1600000.0, 3200000.0} - // All the built-in metrics have same attributes except 'status' and 'streaming' + // All the built-in metrics have same attributes except 'tag', 'status' and 'streaming' // These attributes need to be added to only few of the metrics metricsDetails = map[string]metricInfo{ metricNameOperationLatencies: { @@ -148,6 +150,7 @@ type builtinMetricsTracerFactory struct { serverLatencies metric.Float64Histogram attemptLatencies metric.Float64Histogram retryCount metric.Int64Counter + debugTags metric.Int64Counter } func newBuiltinMetricsTracerFactory(ctx context.Context, project, instance, appProfile string, metricsProvider MetricsProvider, opts ...option.ClientOption) (*builtinMetricsTracerFactory, error) { @@ -253,6 +256,16 @@ func (tf *builtinMetricsTracerFactory) createInstruments(meter metric.Meter) err metric.WithDescription("The number of additional RPCs sent after the initial attempt."), metric.WithUnit(metricUnitCount), ) + if err != nil { + return err + } + + // Create debug_tags + tf.debugTags, err = meter.Int64Counter( + metricNameDebugTags, + metric.WithDescription("A counter of internal client events used for debugging."), + metric.WithUnit(metricUnitCount), + ) return err } @@ -271,6 +284,7 @@ type builtinMetricsTracer struct { instrumentServerLatencies metric.Float64Histogram instrumentAttemptLatencies metric.Float64Histogram instrumentRetryCount metric.Int64Counter + instrumentDebugTags metric.Int64Counter tableName string method string @@ -363,6 +377,7 @@ func (tf *builtinMetricsTracerFactory) createBuiltinMetricsTracer(ctx context.Co instrumentServerLatencies: tf.serverLatencies, instrumentAttemptLatencies: tf.attemptLatencies, instrumentRetryCount: tf.retryCount, + instrumentDebugTags: tf.debugTags, tableName: tableName, isStreaming: isStreaming, diff --git a/bigtable/metrics_util.go b/bigtable/metrics_util.go index 8783f6ff4b21..64a1fb50e4d2 100644 --- a/bigtable/metrics_util.go +++ b/bigtable/metrics_util.go @@ -28,8 +28,9 @@ import ( ) const ( - defaultCluster = "unspecified" + defaultCluster = "" defaultZone = "global" + defaultTable = "" ) // get GFE latency in ms from response metadata diff --git a/storage/integration_test.go b/storage/integration_test.go index a9dc265c0ba5..e28afae60850 100644 --- a/storage/integration_test.go +++ b/storage/integration_test.go @@ -54,7 +54,10 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "github.com/googleapis/gax-go/v2/apierror" "go.opentelemetry.io/contrib/detectors/gcp" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" "golang.org/x/oauth2/google" "google.golang.org/api/googleapi" "google.golang.org/api/iterator" @@ -5859,10 +5862,10 @@ func TestIntegration_PostPolicyV4_SignedURL_WithSignBytes(t *testing.T) { }) } -func TestIntegration_OCTracing(t *testing.T) { +func TestIntegration_OTelTracing(t *testing.T) { multiTransportTest(context.Background(), t, func(t *testing.T, ctx context.Context, bucket string, _ string, client *Client) { - te := testutil.NewTestExporter() - defer te.Unregister() + te := newOpenTelemetryTestExporter() + defer te.Unregister(ctx) bkt := client.Bucket(bucket) bkt.Attrs(ctx) @@ -5873,6 +5876,38 @@ func TestIntegration_OCTracing(t *testing.T) { }) } +// openTelemetryTestExporter is a test utility exporter. It should be created +// with NewopenTelemetryTestExporter. +type openTelemetryTestExporter struct { + exporter *tracetest.InMemoryExporter + tp *sdktrace.TracerProvider +} + +// newOpenTelemetryTestExporter creates a openTelemetryTestExporter with +// underlying InMemoryExporter and TracerProvider from OpenTelemetry. +func newOpenTelemetryTestExporter() *openTelemetryTestExporter { + exporter := tracetest.NewInMemoryExporter() + tp := sdktrace.NewTracerProvider( + sdktrace.WithSyncer(exporter), + sdktrace.WithSampler(sdktrace.AlwaysSample()), + ) + otel.SetTracerProvider(tp) + return &openTelemetryTestExporter{ + exporter: exporter, + tp: tp, + } +} + +// Spans returns the current in-memory stored spans. +func (te *openTelemetryTestExporter) Spans() tracetest.SpanStubs { + return te.exporter.GetSpans() +} + +// Unregister shuts down the underlying OpenTelemetry TracerProvider. +func (te *openTelemetryTestExporter) Unregister(ctx context.Context) { + te.tp.Shutdown(ctx) +} + // verifySignedURL gets the bytes at the provided url and verifies them against the // expectedFileBody. Make sure the SignedURLOptions set the method as "GET". func verifySignedURL(url string, headers map[string][]string, expectedFileBody []byte) error {