Skip to content

Commit

Permalink
Merge pull request #1591 from ministryofjustice/MLPAB-2619-event-rece…
Browse files Browse the repository at this point in the history
…ived-xray

MLPAB-2619 Add xray to event-received lambda
  • Loading branch information
hawx authored Nov 5, 2024
2 parents e6bebe1 + 4709cb5 commit c536d42
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 65 deletions.
5 changes: 3 additions & 2 deletions cmd/event-received/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type Factory struct {
searchIndexName string
searchIndexingEnabled bool
eventClient EventClient
httpClient *http.Client

// previously constructed values
appData *appcontext.Data
Expand Down Expand Up @@ -122,7 +123,7 @@ func (f *Factory) AppData() (appcontext.Data, error) {

func (f *Factory) LambdaClient() LambdaClient {
if f.lambdaClient == nil {
f.lambdaClient = lambda.New(f.cfg, v4.NewSigner(), &http.Client{Timeout: 10 * time.Second}, time.Now)
f.lambdaClient = lambda.New(f.cfg, v4.NewSigner(), f.httpClient, time.Now)
}

return f.lambdaClient
Expand Down Expand Up @@ -158,7 +159,7 @@ func (f *Factory) ShareCodeSender(ctx context.Context) (ShareCodeSender, error)
return nil, fmt.Errorf("failed to get notify API secret: %w", err)
}

notifyClient, err := notify.New(f.logger, f.notifyIsProduction, f.notifyBaseURL, notifyApiKey, http.DefaultClient, event.NewClient(f.cfg, f.eventBusName), bundle)
notifyClient, err := notify.New(f.logger, f.notifyIsProduction, f.notifyBaseURL, notifyApiKey, f.httpClient, event.NewClient(f.cfg, f.eventBusName), bundle)
if err != nil {
return nil, err
}
Expand Down
120 changes: 89 additions & 31 deletions cmd/event-received/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"log/slog"
"net/http"
"os"
"time"

Expand All @@ -22,12 +23,39 @@ import (
"github.com/ministryofjustice/opg-modernising-lpa/internal/event"
"github.com/ministryofjustice/opg-modernising-lpa/internal/random"
"github.com/ministryofjustice/opg-modernising-lpa/internal/s3"
"github.com/ministryofjustice/opg-modernising-lpa/internal/telemetry"
"go.opentelemetry.io/contrib/instrumentation/github.com/aws/aws-lambda-go/otellambda"
"go.opentelemetry.io/contrib/instrumentation/github.com/aws/aws-lambda-go/otellambda/xrayconfig"
"go.opentelemetry.io/contrib/instrumentation/github.com/aws/aws-sdk-go-v2/otelaws"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/sdk/trace"
)

const (
virusFound = "infected"
)

var (
tableName = os.Getenv("LPAS_TABLE")
notifyIsProduction = os.Getenv("GOVUK_NOTIFY_IS_PRODUCTION") == "1"
appPublicURL = os.Getenv("APP_PUBLIC_URL")
awsBaseURL = os.Getenv("AWS_BASE_URL")
notifyBaseURL = os.Getenv("GOVUK_NOTIFY_BASE_URL")
evidenceBucketName = os.Getenv("UPLOADS_S3_BUCKET_NAME")
uidBaseURL = os.Getenv("UID_BASE_URL")
lpaStoreBaseURL = os.Getenv("LPA_STORE_BASE_URL")
lpaStoreSecretARN = os.Getenv("LPA_STORE_SECRET_ARN")
eventBusName = cmp.Or(os.Getenv("EVENT_BUS_NAME"), "default")
searchEndpoint = os.Getenv("SEARCH_ENDPOINT")
searchIndexName = cmp.Or(os.Getenv("SEARCH_INDEX_NAME"), "lpas")
searchIndexingEnabled = os.Getenv("SEARCH_INDEXING_DISABLED") != "1"
xrayEnabled = os.Getenv("XRAY_ENABLED") == "1"

cfg aws.Config
httpClient *http.Client
logger *slog.Logger
)

type factory interface {
Now() func() time.Time
DynamoClient() dynamodbClient
Expand Down Expand Up @@ -94,38 +122,8 @@ func (e *Event) UnmarshalJSON(data []byte) error {
}

func handler(ctx context.Context, event Event) (map[string]any, error) {
var (
tableName = os.Getenv("LPAS_TABLE")
notifyIsProduction = os.Getenv("GOVUK_NOTIFY_IS_PRODUCTION") == "1"
appPublicURL = os.Getenv("APP_PUBLIC_URL")
awsBaseURL = os.Getenv("AWS_BASE_URL")
notifyBaseURL = os.Getenv("GOVUK_NOTIFY_BASE_URL")
evidenceBucketName = os.Getenv("UPLOADS_S3_BUCKET_NAME")
uidBaseURL = os.Getenv("UID_BASE_URL")
lpaStoreBaseURL = os.Getenv("LPA_STORE_BASE_URL")
lpaStoreSecretARN = os.Getenv("LPA_STORE_SECRET_ARN")
eventBusName = cmp.Or(os.Getenv("EVENT_BUS_NAME"), "default")
searchEndpoint = os.Getenv("SEARCH_ENDPOINT")
searchIndexName = cmp.Or(os.Getenv("SEARCH_INDEX_NAME"), "lpas")
searchIndexingEnabled = os.Getenv("SEARCH_INDEXING_DISABLED") != "1"
)

logger := slog.New(slog.NewJSONHandler(os.Stdout, nil).
WithAttrs([]slog.Attr{
slog.String("service_name", "opg-modernising-lpa/event-received"),
}))

result := map[string]any{}

cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
return result, fmt.Errorf("failed to load default config: %w", err)
}

if len(awsBaseURL) > 0 {
cfg.BaseEndpoint = aws.String(awsBaseURL)
}

dynamoClient, err := dynamo.NewClient(cfg, tableName)
if err != nil {
return result, fmt.Errorf("failed to create dynamodb client: %w", err)
Expand Down Expand Up @@ -158,6 +156,7 @@ func handler(ctx context.Context, event Event) (map[string]any, error) {
searchEndpoint: searchEndpoint,
searchIndexName: searchIndexName,
searchIndexingEnabled: searchIndexingEnabled,
httpClient: httpClient,
}

if event.SQSEvent != nil {
Expand Down Expand Up @@ -200,13 +199,72 @@ func handleCloudWatchEvent(ctx context.Context, factory *Factory, event *events.
return fmt.Errorf("unknown event received: %s", string(eJson))
}

logger.InfoContext(ctx, "handling event", slog.String("source", event.Source), slog.String("detailType", event.DetailType))
if err := handler.Handle(ctx, factory, event); err != nil {
return fmt.Errorf("%s: %w", event.DetailType, err)
}
logger.InfoContext(ctx, "successfully handled event")

return nil
}

func main() {
lambda.Start(handler)
ctx := context.Background()

httpClient = &http.Client{Timeout: 30 * time.Second}

logger = slog.New(telemetry.NewSlogHandler(slog.
NewJSONHandler(os.Stdout, &slog.HandlerOptions{
ReplaceAttr: func(_ []string, a slog.Attr) slog.Attr {
switch a.Value.Kind() {
case slog.KindAny:
switch v := a.Value.Any().(type) {
case *http.Request:
return slog.Group(a.Key,
slog.String("method", v.Method),
slog.String("uri", v.URL.String()))
}
}

return a
},
})).
WithAttrs([]slog.Attr{
slog.String("service_name", "opg-modernising-lpa/event-received"),
}))

var err error
cfg, err = config.LoadDefaultConfig(ctx)
if err != nil {
logger.ErrorContext(ctx, "failed to load default config", slog.Any("err", err))
return
}

if len(awsBaseURL) > 0 {
cfg.BaseEndpoint = aws.String(awsBaseURL)
}

var tp *trace.TracerProvider
if xrayEnabled {
tp, err = telemetry.SetupLambda(ctx)
if err != nil {
logger.WarnContext(ctx, "error creating tracer provider", slog.Any("err", err))
}
}

if tp != nil {
otelaws.AppendMiddlewares(&cfg.APIOptions)
telemetry.AppendMiddlewares(&cfg.APIOptions)
httpClient.Transport = otelhttp.NewTransport(httpClient.Transport)

defer func(ctx context.Context) {
if err := tp.Shutdown(ctx); err != nil {
logger.WarnContext(ctx, "error shutting down tracer provider", slog.Any("err", err))
}
}(ctx)

lambda.Start(otellambda.InstrumentHandler(handler, xrayconfig.WithRecommendedOptions(tp)...))
} else {
lambda.Start(handler)
}
}
63 changes: 32 additions & 31 deletions cmd/schedule-runner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"go.opentelemetry.io/contrib/instrumentation/github.com/aws/aws-lambda-go/otellambda/xrayconfig"
"go.opentelemetry.io/contrib/instrumentation/github.com/aws/aws-sdk-go-v2/otelaws"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"google.golang.org/appengine/log"
"go.opentelemetry.io/otel/sdk/trace"
)

var (
Expand All @@ -41,15 +41,10 @@ var (

httpClient *http.Client
cfg aws.Config
logHandler slog.Handler
logger *slog.Logger
)

func handleRunSchedule(ctx context.Context) error {
logger := slog.New(logHandler.
WithAttrs([]slog.Attr{
slog.String("service_name", "opg-modernising-lpa/schedule-runner"),
}))

secretsClient, err := secrets.NewClient(cfg, time.Hour)
if err != nil {
return err
Expand Down Expand Up @@ -98,54 +93,60 @@ func handleRunSchedule(ctx context.Context) error {
func main() {
ctx := context.Background()

httpClient = &http.Client{Timeout: time.Second * 30}

logger = slog.New(telemetry.NewSlogHandler(slog.
NewJSONHandler(os.Stdout, &slog.HandlerOptions{
ReplaceAttr: func(_ []string, a slog.Attr) slog.Attr {
switch a.Value.Kind() {
case slog.KindAny:
switch v := a.Value.Any().(type) {
case *http.Request:
return slog.Group(a.Key,
slog.String("method", v.Method),
slog.String("uri", v.URL.String()))
}
}

return a
},
})).
WithAttrs([]slog.Attr{
slog.String("service_name", "opg-modernising-lpa/schedule-runner"),
}))

var err error
cfg, err = config.LoadDefaultConfig(ctx)
if err != nil {
log.Errorf(ctx, "failed to load default config: %v", err)
logger.ErrorContext(ctx, "failed to load default config", slog.Any("err", err))
return
}

httpClient = &http.Client{Timeout: time.Second * 30}

if len(awsBaseURL) > 0 {
cfg.BaseEndpoint = aws.String(awsBaseURL)
}

var tp *trace.TracerProvider
if xrayEnabled {
tp, err := telemetry.SetupLambda(ctx)
tp, err = telemetry.SetupLambda(ctx)
if err != nil {
fmt.Printf("error creating tracer provider: %v", err)
logger.WarnContext(ctx, "error creating tracer provider", slog.Any("err", err))
}
}

if tp != nil {
otelaws.AppendMiddlewares(&cfg.APIOptions)
telemetry.AppendMiddlewares(&cfg.APIOptions)
httpClient.Transport = otelhttp.NewTransport(httpClient.Transport)

defer func(ctx context.Context) {
if err := tp.Shutdown(ctx); err != nil {
fmt.Printf("error shutting down tracer provider: %v", err)
logger.WarnContext(ctx, "error shutting down tracer provider", slog.Any("err", err))
}
}(ctx)

logHandler = telemetry.NewSlogHandler(slog.
NewJSONHandler(os.Stdout, &slog.HandlerOptions{
ReplaceAttr: func(_ []string, a slog.Attr) slog.Attr {
switch a.Value.Kind() {
case slog.KindAny:
switch v := a.Value.Any().(type) {
case *http.Request:
return slog.Group(a.Key,
slog.String("method", v.Method),
slog.String("uri", v.URL.String()))
}
}

return a
},
}))

lambda.Start(otellambda.InstrumentHandler(handleRunSchedule, xrayconfig.WithRecommendedOptions(tp)...))
} else {
logHandler = slog.NewJSONHandler(os.Stdout, nil)
lambda.Start(handleRunSchedule)
}
}
10 changes: 10 additions & 0 deletions docker/event-received/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@ WORKDIR /app

COPY --from=build /app/event-received /var/task/event-received
COPY --link lang /var/task/lang
COPY --link ./docker/adot-collector/ /opt
COPY --link docker/aws-lambda-rie ./aws-lambda-rie

ENV AWS_LAMBDA_EXEC_WRAPPER=/opt/otel-handler
ENV OPENTELEMETRY_COLLECTOR_CONFIG_FILE="/opt/config/config.yaml"

ENTRYPOINT ["./event-received"]

FROM public.ecr.aws/lambda/provided:al2023.2024.10.14.12 AS production
Expand All @@ -30,5 +34,11 @@ RUN chmod +x /app/install_lambda_insights.sh \

COPY --from=build /app/event-received ./event-received
COPY --link lang ./lang
COPY --link ./docker/adot-collector/ /opt

RUN chmod 755 /opt/config/config.yaml

ENV AWS_LAMBDA_EXEC_WRAPPER=/opt/otel-handler
ENV OPENTELEMETRY_COLLECTOR_CONFIG_FILE="/opt/config/config.yaml"

ENTRYPOINT ["./event-received"]
6 changes: 5 additions & 1 deletion terraform/environment/region/modules/event_received/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ module "event_received" {
SEARCH_INDEXING_DISABLED = 1
EVENT_BUS_NAME = var.event_bus_name
JWT_KEY_SECRET_ARN = data.aws_secretsmanager_secret.lpa_store_jwt_key.arn
XRAY_ENABLED = 1
}
image_uri = "${var.lambda_function_image_ecr_url}:${var.lambda_function_image_tag}"
aws_iam_role = var.event_received_lambda_role
Expand Down Expand Up @@ -345,7 +346,10 @@ data "aws_iam_policy_document" "event_received" {
effect = "Allow"
actions = [
"xray:PutTraceSegments",
"xray:PutTelemetryRecords"
"xray:PutTelemetryRecords",
"xray:GetSamplingRules",
"xray:GetSamplingTargets",
"xray:GetSamplingStatisticSummaries",
]
resources = ["*"]
}
Expand Down

0 comments on commit c536d42

Please sign in to comment.