-
Notifications
You must be signed in to change notification settings - Fork 16
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add AWS SDK clients for CFN Custom Resource support (#1805)
This change adds the necessary AWS SDK clients (S3 and Lambda) for supporting CFN Custom Resources in aws-native. Relates to pulumi/pulumi-cdk#109
- Loading branch information
1 parent
b590804
commit eafde1c
Showing
8 changed files
with
869 additions
and
28 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
// Copyright 2016-2024, Pulumi Corporation. | ||
|
||
package client | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"time" | ||
|
||
"github.com/aws/aws-sdk-go-v2/aws" | ||
"github.com/aws/aws-sdk-go-v2/service/lambda" | ||
lambdaTypes "github.com/aws/aws-sdk-go-v2/service/lambda/types" | ||
"github.com/pkg/errors" | ||
) | ||
|
||
const DefaultFunctionActivationTimeout = 5 * time.Minute | ||
|
||
//go:generate mockgen -package client -source lambda.go -destination mock_lambda.go LambdaClient,LambdaApi | ||
type LambdaClient interface { | ||
// InvokeAsync invokes the given Lambda function with the 'Event' invocation type. This means that the function | ||
// will be invoked asynchronously and no response will be returned. If the function is not ready to be invoked yet, | ||
// this method will wait for the function to become active before invoking it. | ||
InvokeAsync(ctx context.Context, functionName string, payload []byte) error | ||
} | ||
|
||
type LambdaApi interface { | ||
Invoke(ctx context.Context, params *lambda.InvokeInput, optFns ...func(*lambda.Options)) (*lambda.InvokeOutput, error) | ||
GetFunction(context.Context, *lambda.GetFunctionInput, ...func(*lambda.Options)) (*lambda.GetFunctionOutput, error) | ||
} | ||
|
||
type lambdaClientImpl struct { | ||
api LambdaApi | ||
functionActivationTimeout time.Duration | ||
} | ||
|
||
func NewLambdaClient(api LambdaApi) LambdaClient { | ||
return &lambdaClientImpl{ | ||
api: api, | ||
functionActivationTimeout: DefaultFunctionActivationTimeout, | ||
} | ||
} | ||
|
||
func (c *lambdaClientImpl) InvokeAsync(ctx context.Context, functionName string, payload []byte) error { | ||
input := lambda.InvokeInput{ | ||
FunctionName: aws.String(functionName), | ||
Payload: payload, | ||
// async invocation | ||
InvocationType: lambdaTypes.InvocationTypeEvent, | ||
} | ||
|
||
// fire off an initial invoke. If the function is not ready, we need to wait for it to become ready. | ||
// this initial invoke will trigger the AWS Lambda service to start the function transition process. | ||
invokeResp, err := c.api.Invoke(ctx, &input) | ||
|
||
if err != nil { | ||
// Lambda functions can be in a state where they are not ready to be invoked yet. | ||
// If we get this error, we need to wait for the function to become active | ||
var notReadyErr *lambdaTypes.ResourceNotReadyException | ||
if errors.As(err, ¬ReadyErr) { | ||
err := c.waitForFunctionActive(ctx, functionName) | ||
if err != nil { | ||
return fmt.Errorf("failed to wait for function to become active: %w", err) | ||
} | ||
invokeResp, err = c.api.Invoke(ctx, &input) | ||
if err != nil { | ||
return err | ||
} | ||
} else { | ||
return err | ||
} | ||
} | ||
|
||
if invokeResp.StatusCode != 202 { | ||
return fmt.Errorf("lambda invocation failed with status code %d", invokeResp.StatusCode) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// waitForFunctionActive waits for the function to be in the active state. If the function is not ready | ||
// after 5 minutes, it will return an error. | ||
func (c *lambdaClientImpl) waitForFunctionActive(ctx context.Context, functionName string) error { | ||
err := lambda.NewFunctionActiveV2Waiter(c.api, func(o *lambda.FunctionActiveV2WaiterOptions) { | ||
// We already aggressively retry SDK errors with plenty retry attempts and | ||
// throttling exceptions will be taken care of by the SDK | ||
o.MinDelay = time.Second | ||
o.MaxDelay = 5 * time.Second | ||
}).Wait(ctx, &lambda.GetFunctionInput{ | ||
FunctionName: aws.String(functionName), | ||
}, c.functionActivationTimeout) | ||
|
||
return err | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
package client | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
"time" | ||
|
||
"github.com/aws/aws-sdk-go-v2/aws" | ||
"github.com/aws/aws-sdk-go-v2/service/lambda" | ||
lambdaTypes "github.com/aws/aws-sdk-go-v2/service/lambda/types" | ||
"github.com/stretchr/testify/assert" | ||
gomock "go.uber.org/mock/gomock" | ||
) | ||
|
||
func lambdaSetup(t *testing.T) (*gomock.Controller, *lambdaClientImpl, *MockLambdaApi) { | ||
ctrl := gomock.NewController(t) | ||
mockLambdaApi := NewMockLambdaApi(ctrl) | ||
client := &lambdaClientImpl{ | ||
api: mockLambdaApi, | ||
functionActivationTimeout: 2 * time.Second, | ||
} | ||
return ctrl, client, mockLambdaApi | ||
} | ||
|
||
func TestInvokeAsync_SuccessfulInvocation(t *testing.T) { | ||
t.Parallel() | ||
ctrl, client, mockLambdaApi := lambdaSetup(t) | ||
defer ctrl.Finish() | ||
|
||
ctx := context.Background() | ||
functionName := "test-function" | ||
payload := []byte(`{"key": "value"}`) | ||
|
||
mockLambdaApi.EXPECT().Invoke(ctx, gomock.Any()).Return(&lambda.InvokeOutput{ | ||
StatusCode: 202, | ||
}, nil) | ||
|
||
err := client.InvokeAsync(ctx, functionName, payload) | ||
assert.NoError(t, err) | ||
} | ||
|
||
func TestInvokeAsync_FunctionNotReadyInitiallyButBecomesReady(t *testing.T) { | ||
t.Parallel() | ||
ctrl, client, mockLambdaApi := lambdaSetup(t) | ||
defer ctrl.Finish() | ||
|
||
ctx := context.Background() | ||
functionName := "test-function" | ||
payload := []byte(`{"key": "value"}`) | ||
|
||
mockLambdaApi.EXPECT().Invoke(ctx, gomock.Any()).Return(nil, &lambdaTypes.ResourceNotReadyException{}) | ||
|
||
mockLambdaApi.EXPECT().GetFunction(gomock.Any(), &lambda.GetFunctionInput{ | ||
FunctionName: aws.String(functionName), | ||
}, gomock.Any()).Return(&lambda.GetFunctionOutput{ | ||
Configuration: &lambdaTypes.FunctionConfiguration{ | ||
State: lambdaTypes.StateInactive, | ||
}, | ||
}, nil) | ||
mockLambdaApi.EXPECT().GetFunction(gomock.Any(), &lambda.GetFunctionInput{ | ||
FunctionName: aws.String(functionName), | ||
}, gomock.Any()).Return(&lambda.GetFunctionOutput{ | ||
Configuration: &lambdaTypes.FunctionConfiguration{ | ||
State: lambdaTypes.StateActive, | ||
}, | ||
}, nil) | ||
|
||
mockLambdaApi.EXPECT().Invoke(ctx, gomock.Any()).Return(&lambda.InvokeOutput{ | ||
StatusCode: 202, | ||
}, nil) | ||
|
||
err := client.InvokeAsync(ctx, functionName, payload) | ||
assert.NoError(t, err) | ||
} | ||
|
||
func TestInvokeAsync_FunctionNotReadyAndFailsToBecomeReady(t *testing.T) { | ||
t.Parallel() | ||
ctrl, client, mockLambdaApi := lambdaSetup(t) | ||
defer ctrl.Finish() | ||
|
||
ctx := context.Background() | ||
functionName := "test-function" | ||
payload := []byte(`{"key": "value"}`) | ||
|
||
mockLambdaApi.EXPECT().Invoke(ctx, gomock.Any()).Return(nil, &lambdaTypes.ResourceNotReadyException{}) | ||
|
||
mockLambdaApi.EXPECT().GetFunction(gomock.Any(), &lambda.GetFunctionInput{ | ||
FunctionName: aws.String(functionName), | ||
}, gomock.Any()).Return(&lambda.GetFunctionOutput{ | ||
Configuration: &lambdaTypes.FunctionConfiguration{ | ||
State: lambdaTypes.StateInactive, | ||
}, | ||
}, nil).AnyTimes() | ||
|
||
err := client.InvokeAsync(ctx, functionName, payload) | ||
assert.Error(t, err) | ||
assert.Contains(t, err.Error(), "failed to wait for function to become active") | ||
} | ||
|
||
func TestInvokeAsync_InvocationFailsWithNon202StatusCode(t *testing.T) { | ||
t.Parallel() | ||
ctrl, client, mockLambdaApi := lambdaSetup(t) | ||
defer ctrl.Finish() | ||
|
||
ctx := context.Background() | ||
functionName := "test-function" | ||
payload := []byte(`{"key": "value"}`) | ||
|
||
mockLambdaApi.EXPECT().Invoke(ctx, gomock.Any()).Return(&lambda.InvokeOutput{ | ||
StatusCode: 400, | ||
}, nil) | ||
|
||
err := client.InvokeAsync(ctx, functionName, payload) | ||
assert.Error(t, err) | ||
assert.Contains(t, err.Error(), "lambda invocation failed with status code 400") | ||
} | ||
|
||
func TestInvokeAsync_FunctionNotReadyInitiallyButBecomesReadyAndThenFails(t *testing.T) { | ||
t.Parallel() | ||
ctrl, client, mockLambdaApi := lambdaSetup(t) | ||
defer ctrl.Finish() | ||
|
||
ctx := context.Background() | ||
functionName := "test-function" | ||
payload := []byte(`{"key": "value"}`) | ||
|
||
mockLambdaApi.EXPECT().Invoke(ctx, gomock.Any()).Return(nil, &lambdaTypes.ResourceNotReadyException{}) | ||
mockLambdaApi.EXPECT().Invoke(ctx, gomock.Any()).Return(&lambda.InvokeOutput{ | ||
StatusCode: 400, | ||
}, nil) | ||
|
||
mockLambdaApi.EXPECT().GetFunction(gomock.Any(), &lambda.GetFunctionInput{ | ||
FunctionName: aws.String(functionName), | ||
}, gomock.Any()).Return(&lambda.GetFunctionOutput{ | ||
Configuration: &lambdaTypes.FunctionConfiguration{ | ||
State: lambdaTypes.StateInactive, | ||
}, | ||
}, nil) | ||
mockLambdaApi.EXPECT().GetFunction(gomock.Any(), &lambda.GetFunctionInput{ | ||
FunctionName: aws.String(functionName), | ||
}, gomock.Any()).Return(&lambda.GetFunctionOutput{ | ||
Configuration: &lambdaTypes.FunctionConfiguration{ | ||
State: lambdaTypes.StateActive, | ||
}, | ||
}, nil) | ||
|
||
err := client.InvokeAsync(ctx, functionName, payload) | ||
assert.Error(t, err) | ||
assert.Contains(t, err.Error(), "lambda invocation failed with status code 400") | ||
} |
Oops, something went wrong.