From 4709cb5547e02cced25b4c51c9ad3895e563081f Mon Sep 17 00:00:00 2001 From: Joshua Hawxwell Date: Mon, 4 Nov 2024 08:11:53 +0000 Subject: [PATCH] Add xray to event-received lambda --- cmd/event-received/factory.go | 5 +- cmd/event-received/main.go | 120 +++++++++++++----- cmd/schedule-runner/main.go | 63 ++++----- docker/event-received/Dockerfile | 10 ++ .../region/modules/event_received/main.tf | 6 +- 5 files changed, 139 insertions(+), 65 deletions(-) diff --git a/cmd/event-received/factory.go b/cmd/event-received/factory.go index ea0cbb8c2a..e839f17fd9 100644 --- a/cmd/event-received/factory.go +++ b/cmd/event-received/factory.go @@ -69,6 +69,7 @@ type Factory struct { searchIndexName string searchIndexingEnabled bool eventClient EventClient + httpClient *http.Client // previously constructed values appData *appcontext.Data @@ -122,7 +123,7 @@ func (f *Factory) AppData() (appcontext.Data, error) { func (f *Factory) LambdaClient() LambdaClient { if f.lambdaClient == nil { - f.lambdaClient = lambda.New(f.cfg, v4.NewSigner(), &http.Client{Timeout: 10 * time.Second}, time.Now) + f.lambdaClient = lambda.New(f.cfg, v4.NewSigner(), f.httpClient, time.Now) } return f.lambdaClient @@ -158,7 +159,7 @@ func (f *Factory) ShareCodeSender(ctx context.Context) (ShareCodeSender, error) return nil, fmt.Errorf("failed to get notify API secret: %w", err) } - notifyClient, err := notify.New(f.logger, f.notifyIsProduction, f.notifyBaseURL, notifyApiKey, http.DefaultClient, event.NewClient(f.cfg, f.eventBusName), bundle) + notifyClient, err := notify.New(f.logger, f.notifyIsProduction, f.notifyBaseURL, notifyApiKey, f.httpClient, event.NewClient(f.cfg, f.eventBusName), bundle) if err != nil { return nil, err } diff --git a/cmd/event-received/main.go b/cmd/event-received/main.go index d88e8d9a94..755cd1c82e 100644 --- a/cmd/event-received/main.go +++ b/cmd/event-received/main.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "log/slog" + "net/http" "os" "time" @@ -22,12 +23,39 @@ import ( "github.com/ministryofjustice/opg-modernising-lpa/internal/event" "github.com/ministryofjustice/opg-modernising-lpa/internal/random" "github.com/ministryofjustice/opg-modernising-lpa/internal/s3" + "github.com/ministryofjustice/opg-modernising-lpa/internal/telemetry" + "go.opentelemetry.io/contrib/instrumentation/github.com/aws/aws-lambda-go/otellambda" + "go.opentelemetry.io/contrib/instrumentation/github.com/aws/aws-lambda-go/otellambda/xrayconfig" + "go.opentelemetry.io/contrib/instrumentation/github.com/aws/aws-sdk-go-v2/otelaws" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + "go.opentelemetry.io/otel/sdk/trace" ) const ( virusFound = "infected" ) +var ( + tableName = os.Getenv("LPAS_TABLE") + notifyIsProduction = os.Getenv("GOVUK_NOTIFY_IS_PRODUCTION") == "1" + appPublicURL = os.Getenv("APP_PUBLIC_URL") + awsBaseURL = os.Getenv("AWS_BASE_URL") + notifyBaseURL = os.Getenv("GOVUK_NOTIFY_BASE_URL") + evidenceBucketName = os.Getenv("UPLOADS_S3_BUCKET_NAME") + uidBaseURL = os.Getenv("UID_BASE_URL") + lpaStoreBaseURL = os.Getenv("LPA_STORE_BASE_URL") + lpaStoreSecretARN = os.Getenv("LPA_STORE_SECRET_ARN") + eventBusName = cmp.Or(os.Getenv("EVENT_BUS_NAME"), "default") + searchEndpoint = os.Getenv("SEARCH_ENDPOINT") + searchIndexName = cmp.Or(os.Getenv("SEARCH_INDEX_NAME"), "lpas") + searchIndexingEnabled = os.Getenv("SEARCH_INDEXING_DISABLED") != "1" + xrayEnabled = os.Getenv("XRAY_ENABLED") == "1" + + cfg aws.Config + httpClient *http.Client + logger *slog.Logger +) + type factory interface { Now() func() time.Time DynamoClient() dynamodbClient @@ -94,38 +122,8 @@ func (e *Event) UnmarshalJSON(data []byte) error { } func handler(ctx context.Context, event Event) (map[string]any, error) { - var ( - tableName = os.Getenv("LPAS_TABLE") - notifyIsProduction = os.Getenv("GOVUK_NOTIFY_IS_PRODUCTION") == "1" - appPublicURL = os.Getenv("APP_PUBLIC_URL") - awsBaseURL = os.Getenv("AWS_BASE_URL") - notifyBaseURL = os.Getenv("GOVUK_NOTIFY_BASE_URL") - evidenceBucketName = os.Getenv("UPLOADS_S3_BUCKET_NAME") - uidBaseURL = os.Getenv("UID_BASE_URL") - lpaStoreBaseURL = os.Getenv("LPA_STORE_BASE_URL") - lpaStoreSecretARN = os.Getenv("LPA_STORE_SECRET_ARN") - eventBusName = cmp.Or(os.Getenv("EVENT_BUS_NAME"), "default") - searchEndpoint = os.Getenv("SEARCH_ENDPOINT") - searchIndexName = cmp.Or(os.Getenv("SEARCH_INDEX_NAME"), "lpas") - searchIndexingEnabled = os.Getenv("SEARCH_INDEXING_DISABLED") != "1" - ) - - logger := slog.New(slog.NewJSONHandler(os.Stdout, nil). - WithAttrs([]slog.Attr{ - slog.String("service_name", "opg-modernising-lpa/event-received"), - })) - result := map[string]any{} - cfg, err := config.LoadDefaultConfig(ctx) - if err != nil { - return result, fmt.Errorf("failed to load default config: %w", err) - } - - if len(awsBaseURL) > 0 { - cfg.BaseEndpoint = aws.String(awsBaseURL) - } - dynamoClient, err := dynamo.NewClient(cfg, tableName) if err != nil { return result, fmt.Errorf("failed to create dynamodb client: %w", err) @@ -158,6 +156,7 @@ func handler(ctx context.Context, event Event) (map[string]any, error) { searchEndpoint: searchEndpoint, searchIndexName: searchIndexName, searchIndexingEnabled: searchIndexingEnabled, + httpClient: httpClient, } if event.SQSEvent != nil { @@ -200,13 +199,72 @@ func handleCloudWatchEvent(ctx context.Context, factory *Factory, event *events. return fmt.Errorf("unknown event received: %s", string(eJson)) } + logger.InfoContext(ctx, "handling event", slog.String("source", event.Source), slog.String("detailType", event.DetailType)) if err := handler.Handle(ctx, factory, event); err != nil { return fmt.Errorf("%s: %w", event.DetailType, err) } + logger.InfoContext(ctx, "successfully handled event") return nil } func main() { - lambda.Start(handler) + ctx := context.Background() + + httpClient = &http.Client{Timeout: 30 * time.Second} + + logger = slog.New(telemetry.NewSlogHandler(slog. + NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + ReplaceAttr: func(_ []string, a slog.Attr) slog.Attr { + switch a.Value.Kind() { + case slog.KindAny: + switch v := a.Value.Any().(type) { + case *http.Request: + return slog.Group(a.Key, + slog.String("method", v.Method), + slog.String("uri", v.URL.String())) + } + } + + return a + }, + })). + WithAttrs([]slog.Attr{ + slog.String("service_name", "opg-modernising-lpa/event-received"), + })) + + var err error + cfg, err = config.LoadDefaultConfig(ctx) + if err != nil { + logger.ErrorContext(ctx, "failed to load default config", slog.Any("err", err)) + return + } + + if len(awsBaseURL) > 0 { + cfg.BaseEndpoint = aws.String(awsBaseURL) + } + + var tp *trace.TracerProvider + if xrayEnabled { + tp, err = telemetry.SetupLambda(ctx) + if err != nil { + logger.WarnContext(ctx, "error creating tracer provider", slog.Any("err", err)) + } + } + + if tp != nil { + otelaws.AppendMiddlewares(&cfg.APIOptions) + telemetry.AppendMiddlewares(&cfg.APIOptions) + httpClient.Transport = otelhttp.NewTransport(httpClient.Transport) + + defer func(ctx context.Context) { + if err := tp.Shutdown(ctx); err != nil { + logger.WarnContext(ctx, "error shutting down tracer provider", slog.Any("err", err)) + } + }(ctx) + + lambda.Start(otellambda.InstrumentHandler(handler, xrayconfig.WithRecommendedOptions(tp)...)) + } else { + lambda.Start(handler) + } } diff --git a/cmd/schedule-runner/main.go b/cmd/schedule-runner/main.go index 8db81fbebf..7639be700c 100644 --- a/cmd/schedule-runner/main.go +++ b/cmd/schedule-runner/main.go @@ -25,7 +25,7 @@ import ( "go.opentelemetry.io/contrib/instrumentation/github.com/aws/aws-lambda-go/otellambda/xrayconfig" "go.opentelemetry.io/contrib/instrumentation/github.com/aws/aws-sdk-go-v2/otelaws" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" - "google.golang.org/appengine/log" + "go.opentelemetry.io/otel/sdk/trace" ) var ( @@ -41,15 +41,10 @@ var ( httpClient *http.Client cfg aws.Config - logHandler slog.Handler + logger *slog.Logger ) func handleRunSchedule(ctx context.Context) error { - logger := slog.New(logHandler. - WithAttrs([]slog.Attr{ - slog.String("service_name", "opg-modernising-lpa/schedule-runner"), - })) - secretsClient, err := secrets.NewClient(cfg, time.Hour) if err != nil { return err @@ -98,54 +93,60 @@ func handleRunSchedule(ctx context.Context) error { func main() { ctx := context.Background() + httpClient = &http.Client{Timeout: time.Second * 30} + + logger = slog.New(telemetry.NewSlogHandler(slog. + NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + ReplaceAttr: func(_ []string, a slog.Attr) slog.Attr { + switch a.Value.Kind() { + case slog.KindAny: + switch v := a.Value.Any().(type) { + case *http.Request: + return slog.Group(a.Key, + slog.String("method", v.Method), + slog.String("uri", v.URL.String())) + } + } + + return a + }, + })). + WithAttrs([]slog.Attr{ + slog.String("service_name", "opg-modernising-lpa/schedule-runner"), + })) + var err error cfg, err = config.LoadDefaultConfig(ctx) if err != nil { - log.Errorf(ctx, "failed to load default config: %v", err) + logger.ErrorContext(ctx, "failed to load default config", slog.Any("err", err)) return } - httpClient = &http.Client{Timeout: time.Second * 30} - if len(awsBaseURL) > 0 { cfg.BaseEndpoint = aws.String(awsBaseURL) } + var tp *trace.TracerProvider if xrayEnabled { - tp, err := telemetry.SetupLambda(ctx) + tp, err = telemetry.SetupLambda(ctx) if err != nil { - fmt.Printf("error creating tracer provider: %v", err) + logger.WarnContext(ctx, "error creating tracer provider", slog.Any("err", err)) } + } + if tp != nil { otelaws.AppendMiddlewares(&cfg.APIOptions) + telemetry.AppendMiddlewares(&cfg.APIOptions) httpClient.Transport = otelhttp.NewTransport(httpClient.Transport) defer func(ctx context.Context) { if err := tp.Shutdown(ctx); err != nil { - fmt.Printf("error shutting down tracer provider: %v", err) + logger.WarnContext(ctx, "error shutting down tracer provider", slog.Any("err", err)) } }(ctx) - logHandler = telemetry.NewSlogHandler(slog. - NewJSONHandler(os.Stdout, &slog.HandlerOptions{ - ReplaceAttr: func(_ []string, a slog.Attr) slog.Attr { - switch a.Value.Kind() { - case slog.KindAny: - switch v := a.Value.Any().(type) { - case *http.Request: - return slog.Group(a.Key, - slog.String("method", v.Method), - slog.String("uri", v.URL.String())) - } - } - - return a - }, - })) - lambda.Start(otellambda.InstrumentHandler(handleRunSchedule, xrayconfig.WithRecommendedOptions(tp)...)) } else { - logHandler = slog.NewJSONHandler(os.Stdout, nil) lambda.Start(handleRunSchedule) } } diff --git a/docker/event-received/Dockerfile b/docker/event-received/Dockerfile index 19865b7ef6..34d4ed29e1 100644 --- a/docker/event-received/Dockerfile +++ b/docker/event-received/Dockerfile @@ -16,8 +16,12 @@ WORKDIR /app COPY --from=build /app/event-received /var/task/event-received COPY --link lang /var/task/lang +COPY --link ./docker/adot-collector/ /opt COPY --link docker/aws-lambda-rie ./aws-lambda-rie +ENV AWS_LAMBDA_EXEC_WRAPPER=/opt/otel-handler +ENV OPENTELEMETRY_COLLECTOR_CONFIG_FILE="/opt/config/config.yaml" + ENTRYPOINT ["./event-received"] FROM public.ecr.aws/lambda/provided:al2023.2024.10.14.12 AS production @@ -30,5 +34,11 @@ RUN chmod +x /app/install_lambda_insights.sh \ COPY --from=build /app/event-received ./event-received COPY --link lang ./lang +COPY --link ./docker/adot-collector/ /opt + +RUN chmod 755 /opt/config/config.yaml + +ENV AWS_LAMBDA_EXEC_WRAPPER=/opt/otel-handler +ENV OPENTELEMETRY_COLLECTOR_CONFIG_FILE="/opt/config/config.yaml" ENTRYPOINT ["./event-received"] diff --git a/terraform/environment/region/modules/event_received/main.tf b/terraform/environment/region/modules/event_received/main.tf index 9cc3639234..7f4841446e 100644 --- a/terraform/environment/region/modules/event_received/main.tf +++ b/terraform/environment/region/modules/event_received/main.tf @@ -15,6 +15,7 @@ module "event_received" { SEARCH_INDEXING_DISABLED = 1 EVENT_BUS_NAME = var.event_bus_name JWT_KEY_SECRET_ARN = data.aws_secretsmanager_secret.lpa_store_jwt_key.arn + XRAY_ENABLED = 1 } image_uri = "${var.lambda_function_image_ecr_url}:${var.lambda_function_image_tag}" aws_iam_role = var.event_received_lambda_role @@ -345,7 +346,10 @@ data "aws_iam_policy_document" "event_received" { effect = "Allow" actions = [ "xray:PutTraceSegments", - "xray:PutTelemetryRecords" + "xray:PutTelemetryRecords", + "xray:GetSamplingRules", + "xray:GetSamplingTargets", + "xray:GetSamplingStatisticSummaries", ] resources = ["*"] }