Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Skip s3 upload if record's last-modified time matches DB value #921

Merged
merged 8 commits into from
Sep 27, 2024
4 changes: 2 additions & 2 deletions cmd/SplitGrantsGovXMLDB/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func processRecords(ctx context.Context, s3svc *s3.Client, ddbsvc DynamoDBGetIte
// is compared with the last-modified date the record on-hand.
// An upload is initiated when the record on-hand has a last-modified date that is more recent
// than that of the extant item, or when no extant item exists.
func processRecord(ctx context.Context, s3svc S3ReadWriteObjectAPI, ddbsvc DynamoDBGetItemAPI, record grantRecord) error {
func processRecord(ctx context.Context, s3svc S3PutObjectAPI, ddbsvc DynamoDBGetItemAPI, record grantRecord) error {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a simplified interface since we don't need to perform read operations anymore.

logger := record.logWith(logger)

lastModified, err := record.lastModified()
Expand All @@ -254,7 +254,7 @@ func processRecord(ctx context.Context, s3svc S3ReadWriteObjectAPI, ddbsvc Dynam

isNew := false
if remoteLastModified != nil {
if remoteLastModified.After(lastModified) {
if !remoteLastModified.Before(lastModified) {
log.Debug(logger, "Skipping record upload because the extant record is up-to-date")
sendMetric("record.skipped", 1)
return nil
Expand Down
65 changes: 27 additions & 38 deletions cmd/SplitGrantsGovXMLDB/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@ import (
goenv "github.com/Netflix/go-env"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-sdk-go-v2/aws"
awsTransport "github.com/aws/aws-sdk-go-v2/aws/transport/http"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
ddbtypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/aws/aws-sdk-go-v2/service/s3"
smithyhttp "github.com/aws/smithy-go/transport/http"
"github.com/go-kit/log"
"github.com/hashicorp/go-multierror"
"github.com/johannesboyne/gofakes3"
Expand Down Expand Up @@ -53,9 +51,8 @@ func (m mockDDBClientGetItemCollection) NewGetItemClient(t *testing.T) mockDynam
err := attributevalue.UnmarshalMap(params.Key, &getItemKey)
require.NoError(t, err, "Failed to extract grant_id value from DynamoDB GetItem key")
output := dynamodb.GetItemOutput{Item: nil}
targetGrantId, exists := getItemKey["grant_id"]
var rvErr error
if exists {
if targetGrantId, exists := getItemKey["grant_id"]; exists {
Comment on lines -56 to +55
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a change in variable grouping that I noticed as a readability issue; it has nothing to do with the rest of the PR.

for _, rv := range m {
if rv.GrantId == targetGrantId {
output.Item = map[string]ddbtypes.AttributeValue{
Expand Down Expand Up @@ -602,54 +599,46 @@ func TestProcessRecord(t *testing.T) {

t.Run("Error getting item from DynamoDB", func(t *testing.T) {
setupLambdaEnvForTesting(t)
c := mockS3ReadwriteObjectAPI{
mockHeadObjectAPI(
func(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) {
t.Helper()
return &s3.HeadObjectOutput{}, fmt.Errorf("server error")
},
),
mockGetObjectAPI(nil),
mockPutObjectAPI(nil),
}
s3client := mockPutObjectAPI(func(context.Context, *s3.PutObjectInput, ...func(*s3.Options)) (*s3.PutObjectOutput, error) {
t.Helper()
require.Fail(t, "PutObject called unexpectedly")
return nil, nil
})
ddbLookups := make(mockDDBClientGetItemCollection, 0)
ddbLookups = append(ddbLookups, mockDDBClientGetItemReturnValue{
GrantId: string(testOpportunity.OpportunityID),
ItemLastModified: string(testOpportunity.LastUpdatedDate),
GetItemErr: errors.New("Some issue with DynamoDB"),
})
err := processRecord(context.TODO(), c, ddbLookups.NewGetItemClient(t), testOpportunity)
err := processRecord(context.TODO(), s3client, ddbLookups.NewGetItemClient(t), testOpportunity)
Comment on lines -605 to +613
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change corresponds to the simplified PutObjectAPI argument in the processRecord() signature.

assert.ErrorContains(t, err, "Error determining last modified time for remote record")
})

t.Run("Error uploading to S3", func(t *testing.T) {
setupLambdaEnvForTesting(t)
s3Client := mockS3ReadwriteObjectAPI{
mockHeadObjectAPI(
func(context.Context, *s3.HeadObjectInput, ...func(*s3.Options)) (*s3.HeadObjectOutput, error) {
t.Helper()
return nil, &awsTransport.ResponseError{
ResponseError: &smithyhttp.ResponseError{Response: &smithyhttp.Response{
Response: &http.Response{StatusCode: 404},
}},
}
},
),
mockGetObjectAPI(func(context.Context, *s3.GetObjectInput, ...func(*s3.Options)) (*s3.GetObjectOutput, error) {
t.Helper()
require.Fail(t, "GetObject called unexpectedly")
return nil, nil
}),
mockPutObjectAPI(func(context.Context, *s3.PutObjectInput, ...func(*s3.Options)) (*s3.PutObjectOutput, error) {
t.Helper()
return nil, fmt.Errorf("some PutObject error")
}),
}
fmt.Printf("%T", s3Client)
s3Client := mockPutObjectAPI(func(context.Context, *s3.PutObjectInput, ...func(*s3.Options)) (*s3.PutObjectOutput, error) {
t.Helper()
return nil, fmt.Errorf("some PutObject error")
})
Comment on lines -627 to +622
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change corresponds to the simplified PutObjectAPI argument in the processRecord() signature.

ddb := mockDDBClientGetItemCollection([]mockDDBClientGetItemReturnValue{
{GrantId: string(testOpportunity.OpportunityID), ItemLastModified: string(testOpportunity.LastUpdatedDate)},
// Do not provide a matching record, ensuring that processRecord() will attempt to upload
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixes a test that started to fail once the date-time equality check in processRecord() was updated.

})
err := processRecord(context.TODO(), s3Client, ddb.NewGetItemClient(t), testOpportunity)
assert.ErrorContains(t, err, "Error uploading prepared grant record to S3")
})

t.Run("matching LastUpdatedDate skips upload to S3", func(t *testing.T) {
setupLambdaEnvForTesting(t)
s3Client := mockPutObjectAPI(func(context.Context, *s3.PutObjectInput, ...func(*s3.Options)) (*s3.PutObjectOutput, error) {
t.Helper()
require.Fail(t, "PutObject called unexpectedly")
return nil, fmt.Errorf("PutObject called unexpectedly")
})
ddb := mockDDBClientGetItemCollection([]mockDDBClientGetItemReturnValue{{
GrantId: string(testOpportunity.OpportunityID),
ItemLastModified: string(testOpportunity.LastUpdatedDate),
}})
err := processRecord(context.TODO(), s3Client, ddb.NewGetItemClient(t), testOpportunity)
assert.NoError(t, err)
})
Comment on lines +630 to +643
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Covers the modified date-time equality check in processRecord().

}
12 changes: 0 additions & 12 deletions cmd/SplitGrantsGovXMLDB/s3_test.go
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some of the mock interfaces and structs in this file are no longer needed since processRecord() only performs PutObject operations.

Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,6 @@ import (
"github.com/stretchr/testify/assert"
)

type mockGetObjectAPI func(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error)

func (m mockGetObjectAPI) GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) {
return m(ctx, params, optFns...)
}

type mockHeadObjectAPI func(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error)

func (m mockHeadObjectAPI) HeadObject(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) {
Expand All @@ -34,12 +28,6 @@ func (m mockPutObjectAPI) PutObject(ctx context.Context, params *s3.PutObjectInp
return m(ctx, params, optFns...)
}

type mockS3ReadwriteObjectAPI struct {
mockHeadObjectAPI
mockGetObjectAPI
mockPutObjectAPI
}

func createErrorResponseMap() map[int]*awsTransport.ResponseError {
errorResponses := map[int]*awsTransport.ResponseError{}
for _, statusCode := range []int{404, 500} {
Expand Down
4 changes: 2 additions & 2 deletions cmd/SplitGrantsGovXMLDB/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ func (f forecast) toXML() ([]byte, error) {
return xml.Marshal(grantsgov.OpportunityForecastDetail_1_0(f))
}

func (o forecast) dynamoDBItemKey() map[string]ddbtypes.AttributeValue {
func (f forecast) dynamoDBItemKey() map[string]ddbtypes.AttributeValue {
return map[string]ddbtypes.AttributeValue{
"grant_id": &ddbtypes.AttributeValueMemberS{Value: string(o.OpportunityID)},
"grant_id": &ddbtypes.AttributeValueMemberS{Value: string(f.OpportunityID)},
Comment on lines +76 to +78
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just fixing a variable name that I noticed; likely due to a copy/paste flub.

}
}
43 changes: 35 additions & 8 deletions terraform/datadog_dashboard.tf
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that the dashboard still counts metrics named according to the old (pre-#848) opportunities terminology so that metrics emitted before that change are still represented in the dashboard. We simply add the sums of the two corresponding metrics together in the various graphs.

Original file line number Diff line number Diff line change
Expand Up @@ -2071,52 +2071,73 @@ resource "datadog_dashboard" "service_dashboard" {
display_type = "bars"

formula {
formula_expression = "records_skipped"
formula_expression = "opportunities_skipped + records_skipped"
alias = "Skipped"
style {
palette = "cool"
palette_index = 4
}
}
query {
// Legacy metric (before Forecasted grants were in the pipeline)
metric_query {
name = "records_skipped"
name = "opportunities_skipped"
query = "sum:grants_ingest.SplitGrantsGovXMLDB.opportunity.skipped{$env,$service,$version}.as_count()"
}
}
query {
metric_query {
name = "records_skipped"
query = "sum:grants_ingest.SplitGrantsGovXMLDB.record.skipped{$env,$service,$version}.as_count()"
}
}

formula {
formula_expression = "records_updated"
formula_expression = "opportunities_updated + records_updated"
alias = "Updated"
style {
palette = "purple"
palette_index = 4
}
}
query {
// Legacy metric (before Forecasted grants were in the pipeline)
metric_query {
name = "records_updated"
name = "opportunities_updated"
query = "sum:grants_ingest.SplitGrantsGovXMLDB.opportunity.updated{$env,$service,$version}.as_count()"
}
}
query {
metric_query {
name = "records_updated"
query = "sum:grants_ingest.SplitGrantsGovXMLDB.record.updated{$env,$service,$version}.as_count()"
}
}

formula {
formula_expression = "records_created"
formula_expression = "opportunities_created + records_created"
alias = "Created"
style {
palette = "classic"
palette_index = 4
}
}
query {
// Legacy metric (before Forecasted grants were in the pipeline)
metric_query {
name = "records_created"
name = "opportunities_created"
query = "sum:grants_ingest.SplitGrantsGovXMLDB.opportunity.created{$env,$service,$version}.as_count()"
}
}
query {
metric_query {
name = "records_created"
query = "sum:grants_ingest.SplitGrantsGovXMLDB.record.created{$env,$service,$version}.as_count()"
}
}

formula {
formula_expression = "records_failed"
formula_expression = "opportunities_failed + records_failed"
alias = "Failed"
style {
palette = "warm"
Expand All @@ -2125,10 +2146,16 @@ resource "datadog_dashboard" "service_dashboard" {
}
query {
metric_query {
name = "records_failed"
name = "opportunities_failed"
query = "sum:grants_ingest.SplitGrantsGovXMLDB.opportunity.failed{$env,$service,$version}.as_count()"
}
}
query {
metric_query {
name = "records_failed"
query = "sum:grants_ingest.SplitGrantsGovXMLDB.record.failed{$env,$service,$version}.as_count()"
}
}
}
}
widget_layout {
Expand Down
24 changes: 12 additions & 12 deletions terraform/staging.tfvars
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updates the Datadog metric definitions according to the new terminology.

Original file line number Diff line number Diff line change
Expand Up @@ -133,27 +133,27 @@ datadog_metrics_metadata = {
unit = "error"
}

"SplitGrantsGovXMLDB.opportunity.created" = {
short_name = "New grant opportunities"
description = "Count of new grant opportunity records created from Grants.gov data during invocation."
"SplitGrantsGovXMLDB.record.created" = {
short_name = "New grant records"
description = "Count of new grant records created from Grants.gov data during invocation."
unit = "record"
}

"SplitGrantsGovXMLDB.opportunity.updated" = {
short_name = "Updated grant opportunities"
description = "Count of modified grant opportunity records updated from Grants.gov data during invocation."
"SplitGrantsGovXMLDB.record.updated" = {
short_name = "Updated grant records"
description = "Count of modified grant records updated from Grants.gov data during invocation."
unit = "record"
}

"SplitGrantsGovXMLDB.opportunity.skipped" = {
short_name = "Skipped grant opportunities"
description = "Count of unchanged grant opportunity records from Grants.gov data skipped during invocation."
"SplitGrantsGovXMLDB.record.skipped" = {
short_name = "Skipped grant records"
description = "Count of unchanged grant records from Grants.gov data skipped during invocation."
unit = "record"
}

"SplitGrantsGovXMLDB.opportunity.failed" = {
short_name = "Failed grant opportunities"
description = "Count of grant opportunity records from Grants.gov data that failed to process during invocation."
"SplitGrantsGovXMLDB.record.failed" = {
short_name = "Failed grant records"
description = "Count of grant records from Grants.gov data that failed to process during invocation."
unit = "record"
}
}
Loading