Skip to content

Commit

Permalink
fix!: shutdown otel resource before exiting
Browse files Browse the repository at this point in the history
relates to [PE-963](https://einride.atlassian.net/browse/PE-963)

BREAKING CHANGE: removed SHUTDOWNDELAY feature
  • Loading branch information
alethenorio committed Aug 9, 2024
1 parent 1ac9e65 commit 2a18e85
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 71 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ cloudrunner CLIENT_RETRY_RETRYABLESTATUSCODES []codes.Code
cloudrunner REQUESTLOGGER_MESSAGESIZELIMIT int 1024
cloudrunner REQUESTLOGGER_CODETOLEVEL map[codes.Code]zapcore.Level
cloudrunner REQUESTLOGGER_STATUSTOLEVEL map[int]zapcore.Level
cloudrunner SHUTDOWNDELAY time.Duration
Build-time configuration of grpc-server:
Expand Down
28 changes: 15 additions & 13 deletions cloudotel/metricexporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package cloudotel

import (
"context"
"errors"
"fmt"
"strings"
"time"

metricexporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric"
"go.einride.tech/cloudrunner/cloudruntime"
"go.einride.tech/cloudrunner/cloudzap"
hostinstrumentation "go.opentelemetry.io/contrib/instrumentation/host"
runtimeinstrumentation "go.opentelemetry.io/contrib/instrumentation/runtime"
"go.opentelemetry.io/otel"
Expand All @@ -18,7 +18,6 @@ import (
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"go.uber.org/zap"
)

// MetricExporterConfig configures the metrics exporter.
Expand All @@ -35,9 +34,9 @@ func StartMetricExporter(
ctx context.Context,
exporterConfig MetricExporterConfig,
resource *resource.Resource,
) (func(), error) {
) (func(context.Context) error, error) {
if !exporterConfig.Enabled {
return func() {}, nil
return func(context.Context) error { return nil }, nil
}
projectID, ok := cloudruntime.ResolveProjectID(ctx)
if !ok {
Expand Down Expand Up @@ -81,23 +80,26 @@ func StartMetricExporter(
),
)
otel.SetMeterProvider(provider)
shutdown := func() {
if err := provider.Shutdown(context.Background()); err != nil {
if logger, ok := cloudzap.GetLogger(ctx); ok {
logger.Warn("error stopping metric provider, final metric export might have failed", zap.Error(err))
}
// shutdown stops the meter provider using the caller-supplied context, so the
// caller controls how long the final metric export may take.
// The provider error is wrapped with %w (not %v) so callers can still inspect
// the cause with errors.Is / errors.As, e.g. to detect context.DeadlineExceeded.
shutdown := func(ctx context.Context) error {
	if err := provider.Shutdown(ctx); err != nil {
		return fmt.Errorf("error stopping metric provider, final metric export might have failed: %w", err)
	}
	return nil
}
if exporterConfig.RuntimeInstrumentation {
if err := runtimeinstrumentation.Start(); err != nil {
shutdown()
return nil, fmt.Errorf("start metric exporter: start runtime instrumentation: %w", err)
return nil, errors.Join(
shutdown(ctx),
fmt.Errorf("start metric exporter: start runtime instrumentation: %w", err),
)
}
}
if exporterConfig.HostInstrumentation {
if err := hostinstrumentation.Start(); err != nil {
shutdown()
return nil, fmt.Errorf("start metric exporter: start host instrumentation: %w", err)
return nil, errors.Join(
shutdown(ctx),
fmt.Errorf("start metric exporter: start host instrumentation: %w", err),
)
}
}
return shutdown, nil
Expand Down
21 changes: 8 additions & 13 deletions cloudotel/traceexporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@ import (
traceexporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
gcppropagator "github.com/GoogleCloudPlatform/opentelemetry-operations-go/propagator"
"go.einride.tech/cloudrunner/cloudruntime"
"go.einride.tech/cloudrunner/cloudzap"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/bridge/opencensus"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.uber.org/zap"
)

// TraceExporterConfig configures the trace exporter.
Expand All @@ -29,18 +27,14 @@ func StartTraceExporter(
ctx context.Context,
exporterConfig TraceExporterConfig,
resource *resource.Resource,
) (func(), error) {
) (func(context.Context) error, error) {
if !exporterConfig.Enabled {
return func() {}, nil
return func(context.Context) error { return nil }, nil
}
projectID, ok := cloudruntime.ResolveProjectID(ctx)
if !ok {
return nil, fmt.Errorf("start trace exporter: unknown project ID")
}
logger, ok := cloudzap.GetLogger(ctx)
if !ok {
return nil, fmt.Errorf("start trace exporter: no logger in context")
}
exporter, err := traceexporter.New(
traceexporter.WithProjectID(projectID),
traceexporter.WithTimeout(exporterConfig.Timeout),
Expand All @@ -62,13 +56,14 @@ func StartTraceExporter(
))
opencensus.InstallTraceBridge()

cleanup := func() {
if err := tracerProvider.ForceFlush(context.Background()); err != nil {
logger.Error("error shutting down trace exporter", zap.Error(err))
// cleanup flushes pending spans and then shuts the tracer provider down, both
// bounded by the caller-supplied context.
// Shutdown is always attempted, even when the flush fails, so the provider is
// not left running after a failed flush. Errors are wrapped with %w so callers
// can inspect the cause with errors.Is / errors.As.
cleanup := func(ctx context.Context) error {
	flushErr := tracerProvider.ForceFlush(ctx)
	shutdownErr := tracerProvider.Shutdown(ctx)
	switch {
	case flushErr != nil && shutdownErr != nil:
		return fmt.Errorf("error shutting down trace exporter: %w; %w", flushErr, shutdownErr)
	case flushErr != nil:
		return fmt.Errorf("error shutting down trace exporter: %w", flushErr)
	case shutdownErr != nil:
		return fmt.Errorf("error shutting down trace exporter: %w", shutdownErr)
	}
	return nil
}
return cleanup, nil
}
2 changes: 1 addition & 1 deletion cloudtrace/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ func StartExporter(
ctx context.Context,
exporterConfig ExporterConfig,
resource *resource.Resource,
) (func(), error) {
) (func(context.Context) error, error) {
return cloudotel.StartTraceExporter(ctx, exporterConfig, resource)
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ require (
gotest.tools/v3 v3.5.1
)

require github.com/rogpeppe/go-internal v1.12.0 // indirect

require (
cloud.google.com/go v0.115.0 // indirect
cloud.google.com/go/auth v0.7.3 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 h1:o4JXh1EVt9k/+g42oCprj/FisM4qX9L3sZB3upGN2ZU=
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/shirou/gopsutil/v4 v4.24.6 h1:9qqCSYF2pgOU+t+NgJtp7Co5+5mHF/HyKBUckySQL64=
github.com/shirou/gopsutil/v4 v4.24.6/go.mod h1:aoebb2vxetJ/yIDZISmduFvVNPHqXQ9SEJwRXxkf0RA=
github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM=
Expand Down
36 changes: 17 additions & 19 deletions run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cloudrunner

import (
"context"
"errors"
"flag"
"fmt"
"log/slog"
Expand Down Expand Up @@ -45,11 +46,6 @@ type runConfig struct {
Client cloudclient.Config
// RequestLogger contains request logging config.
RequestLogger cloudrequestlog.Config
// Artificial shutdown delay, allows for the service to
// process all incoming requests properly, before cancelling
// the root context.
// Note: Values higher than 10s will not be respected by cloudrun itself.
ShutdownDelay time.Duration
}

// Run a service.
Expand Down Expand Up @@ -110,18 +106,6 @@ func Run(fn func(context.Context) error, options ...Option) (err error) {
slog.SetDefault(newSlogger(logger))
run.loggerMiddleware.Logger = logger
ctx = cloudzap.WithLogger(ctx, logger)
// Set up shutdown delay
if run.config.ShutdownDelay.Seconds() != 0 {
sigCtx := ctx
ctx, cancel = context.WithCancel(context.WithoutCancel(ctx))
go func() {
<-sigCtx.Done()
logger.Info("delaying shutdown", zap.Duration("duration", run.config.ShutdownDelay))
time.Sleep(run.config.ShutdownDelay)
cancel()
}()
defer cancel()
}
if err := cloudprofiler.Start(run.config.Profiler); err != nil {
return fmt.Errorf("cloudrunner.Run: %w", err)
}
Expand All @@ -133,14 +117,28 @@ func Run(fn func(context.Context) error, options ...Option) (err error) {
if err != nil {
return fmt.Errorf("cloudrunner.Run: %w", err)
}
defer stopTraceExporter()
stopMetricExporter, err := cloudotel.StartMetricExporter(ctx, run.config.MetricExporter, resource)
if err != nil {
return fmt.Errorf("cloudrunner.Run: %w", err)
}
defer stopMetricExporter()
cloudotel.RegisterErrorHandler(ctx)
buildInfo, _ := debug.ReadBuildInfo()
// Shutdown watcher: once ctx is done, stop the trace and metric exporters under
// a fresh, time-boxed context (ctx itself is already done at that point, so it
// cannot bound the final export).
// NOTE(review): nothing visible in this hunk waits for this goroutine before Run
// returns, and it only fires when ctx is cancelled — confirm that the final
// export cannot be cut short by process exit, and that the exporters are still
// stopped when fn returns without ctx being cancelled.
go func() {
<-ctx.Done()
// Cloud Run sends a SIGTERM and allows for 10 seconds before it completely shuts down
// the instance.
// See https://cloud.google.com/run/docs/container-contract#instance-shutdown for more details.
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
logger.Info("shutting down")
// Both stop functions are called regardless of the other's result;
// errors.Join yields nil when neither returns an error.
err := errors.Join(
stopTraceExporter(shutdownCtx),
stopMetricExporter(shutdownCtx),
)
if err != nil {
logger.Warn("unable to call shutdown routines:\n", zap.Error(err))
}
}()
logger.Info(
"up and running",
zap.Object("config", config),
Expand Down
22 changes: 0 additions & 22 deletions run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,10 @@ package cloudrunner_test
import (
"context"
"log"
"syscall"
"testing"
"time"

"go.einride.tech/cloudrunner"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"gotest.tools/v3/assert"
)

func ExampleRun_helloWorld() {
Expand All @@ -22,24 +18,6 @@ func ExampleRun_helloWorld() {
}
}

// Test_helloWorldShutdownTimeout checks that, with SHUTDOWNDELAY set to 1s, the
// context handed to the service function is cancelled more than one second
// after the process receives SIGTERM.
func Test_helloWorldShutdownTimeout(t *testing.T) {
	t.Setenv("SHUTDOWNDELAY", "1s")
	err := cloudrunner.Run(func(ctx context.Context) error {
		cloudrunner.Logger(ctx).Info("hello world")
		beforeKill := time.Now()
		go func() {
			// Simulate the platform sending SIGTERM to this process.
			// The result is deliberately ignored, as in the original test.
			_ = syscall.Kill(syscall.Getpid(), syscall.SIGTERM)
		}()
		<-ctx.Done()
		assert.Assert(t, time.Since(beforeKill) > time.Second)
		return nil
	})
	if err != nil {
		// Report through testing.T: log.Fatal would exit the whole test binary
		// instead of recording a failure for this test.
		t.Fatal(err)
	}
}

func ExampleRun_gRPCServer() {
if err := cloudrunner.Run(func(ctx context.Context) error {
grpcServer := cloudrunner.NewGRPCServer(ctx)
Expand Down

0 comments on commit 2a18e85

Please sign in to comment.