Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additional max-split configurations for SplitGrantsGovXMLDB #934

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cli/grants-ingest/ffisImport/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (cmd *Cmd) Run(app *kong.Kong, logger *log.Logger) error {
if !cmd.DryRun {
return err
}
app.Errorf(err.Error())
app.Errorf("%s", err.Error())
}

log.Debug(*logger, "Mapping files in directory to S3 keys...", "directory", cmd.SourceDirectory)
Expand Down
27 changes: 18 additions & 9 deletions cmd/SplitGrantsGovXMLDB/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/go-kit/log/level"
"github.com/hashicorp/go-multierror"
"github.com/usdigitalresponse/grants-ingest/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
Expand Down Expand Up @@ -125,7 +124,8 @@ func readRecords(ctx context.Context, r io.Reader, ch chan<- grantRecord) error
span, ctx := tracer.StartSpanFromContext(ctx, "read.xml")

// Count records sent to ch
countSentRecords := 0
countSentOpportunityRecords := 0
countSentForecastRecords := 0

d := xml.NewDecoder(r)
for {
Expand All @@ -136,8 +136,13 @@ func readRecords(ctx context.Context, r io.Reader, ch chan<- grantRecord) error
return err
}

// End early if we have reached any configured limit on the number of records sent to ch
if env.MaxSplitRecords > -1 && countSentRecords >= env.MaxSplitRecords {
// End early if a configured limit on the number of records sent to ch is reached
// OR if both record types have configured limits and both have been reached
if (env.MaxSplitRecords > -1 &&
countSentOpportunityRecords+countSentForecastRecords >= env.MaxSplitRecords) ||
(env.MaxSplitForecastRecords > -1 && env.MaxSplitOpportunityRecords > -1 &&
countSentForecastRecords >= env.MaxSplitForecastRecords &&
countSentOpportunityRecords >= env.MaxSplitOpportunityRecords) {
break
}

Expand All @@ -147,7 +152,7 @@ func readRecords(ctx context.Context, r io.Reader, ch chan<- grantRecord) error
// EOF means that we're done reading
break
}
level.Error(logger).Log("msg", "Error reading XML token", "error", err)
log.Error(logger, "Error reading XML token", err)
span.Finish(tracer.WithError(err))
return err
}
Expand All @@ -158,14 +163,18 @@ func readRecords(ctx context.Context, r io.Reader, ch chan<- grantRecord) error
if se.Name.Local == GRANT_OPPORTUNITY_XML_NAME {
var o opportunity
if err = d.DecodeElement(&o, &se); err == nil {
countSentRecords++
ch <- &o
if env.MaxSplitOpportunityRecords < 0 || countSentOpportunityRecords < env.MaxSplitOpportunityRecords {
ch <- &o
countSentOpportunityRecords++
}
}
} else if se.Name.Local == GRANT_FORECAST_XML_NAME && env.IsForecastedGrantsEnabled {
var f forecast
if err = d.DecodeElement(&f, &se); err == nil {
countSentRecords++
ch <- &f
if env.MaxSplitForecastRecords < 0 || countSentForecastRecords < env.MaxSplitForecastRecords {
ch <- &f
countSentForecastRecords++
}
}
}

Expand Down
76 changes: 75 additions & 1 deletion cmd/SplitGrantsGovXMLDB/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"io"
"net/http"
"net/http/httptest"
"reflect"
"strings"
"testing"
"text/template"
"time"
Expand Down Expand Up @@ -516,7 +518,6 @@ func TestLambdaInvocationScenarios(t *testing.T) {
Body: bytes.NewReader(sourceData.Bytes()),
})
require.NoError(t, err)
// err = handleS3Event(context.TODO(), s3client, newMockDDBClient(t, mockDDBGetItemRVLookup{}), events.S3Event{
err = handleS3Event(context.TODO(), s3client, make(mockDDBClientGetItemCollection, 0).NewGetItemClient(t), events.S3Event{
Records: []events.S3EventRecord{
{S3: events.S3Entity{
Expand Down Expand Up @@ -580,6 +581,7 @@ func (r *MockReader) Read(p []byte) (int, error) {
}

func TestReadRecords(t *testing.T) {
setupLambdaEnvForTesting(t)
t.Run("Context cancelled between reads", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
err := readRecords(ctx, &MockReader{func(p []byte) (int, error) {
Expand All @@ -588,6 +590,78 @@ func TestReadRecords(t *testing.T) {
}}, make(chan<- grantRecord, 10))
assert.ErrorIs(t, err, context.Canceled)
})

t.Run("max record limits", func(t *testing.T) {
for _, tt := range []struct {
name string
maxSplitRecords, maxSplitOpportunityRecords, maxSplitForecastRecords int
expOpportunityRecords, expForecastRecords int
}{
{
"no limits processes all records",
-1, -1, -1,
10, 10,
},
{
"opportunity limit does not limit forecasts",
-1, 2, -1,
2, 10,
},
{
"forecast limit does not limit opportunities",
-1, -1, 2,
10, 2,
},
{
"hard limit takes precedent over no type limits",
2, -1, -1,
2, 0,
},
{
"hard limit takes precedent over type limits",
2, 3, 5,
2, 0,
},
{
"mix of limits",
5, 3, -1,
3, 2,
},
} {
t.Run(tt.name, func(t *testing.T) {
env.MaxSplitRecords = tt.maxSplitRecords
env.MaxSplitOpportunityRecords = tt.maxSplitOpportunityRecords
env.MaxSplitForecastRecords = tt.maxSplitForecastRecords
env.IsForecastedGrantsEnabled = true

xmlData := "<Grants>\n" +
// Content of records doesn't matter since we're just looking at the tag
strings.Repeat("<OpportunitySynopsisDetail_1_0></OpportunitySynopsisDetail_1_0>\n", 10) +
strings.Repeat("<OpportunityForecastDetail_1_0></OpportunityForecastDetail_1_0>\n", 10) +
"</Grants>"
ch := make(chan grantRecord, 20)
require.NoError(t, readRecords(context.TODO(), strings.NewReader(xmlData), ch))
close(ch)
var countSentOpportunityRecords, countSentForecastRecords int
for rec := range ch {
switch reflect.Indirect(reflect.ValueOf(rec)).Type().Name() {
case "opportunity":
countSentOpportunityRecords++
case "forecast":
countSentForecastRecords++
default:
require.Fail(t,
"Unknown grantRecord type sent to channel during test setup",
"type %T unrecognized", rec)
}
}
assert.Equalf(t, tt.expOpportunityRecords, countSentOpportunityRecords,
"Unexpected number of opportunity records sent to channel")
assert.Equalf(t, tt.expForecastRecords, countSentForecastRecords,
"Unexpected number of forecast records sent to channel")
})
}
})
}

func TestProcessRecord(t *testing.T) {
Expand Down
20 changes: 11 additions & 9 deletions cmd/SplitGrantsGovXMLDB/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,17 @@ import (
)

type Environment struct {
LogLevel string `env:"LOG_LEVEL,default=INFO"`
DownloadChunkLimit int64 `env:"DOWNLOAD_CHUNK_LIMIT,default=10"`
DestinationBucket string `env:"GRANTS_PREPARED_DATA_BUCKET_NAME,required=true"`
DynamoDBTableName string `env:"GRANTS_PREPARED_DATA_TABLE_NAME,required=true"`
MaxConcurrentUploads int `env:"MAX_CONCURRENT_UPLOADS,default=1"`
MaxSplitRecords int `env:"MAX_SPLIT_RECORDS,default=-1"`
UsePathStyleS3Opt bool `env:"S3_USE_PATH_STYLE,default=false"`
IsForecastedGrantsEnabled bool `env:"IS_FORECASTED_GRANTS_ENABLED,default=false"`
Extras goenv.EnvSet
LogLevel string `env:"LOG_LEVEL,default=INFO"`
DownloadChunkLimit int64 `env:"DOWNLOAD_CHUNK_LIMIT,default=10"`
DestinationBucket string `env:"GRANTS_PREPARED_DATA_BUCKET_NAME,required=true"`
DynamoDBTableName string `env:"GRANTS_PREPARED_DATA_TABLE_NAME,required=true"`
MaxConcurrentUploads int `env:"MAX_CONCURRENT_UPLOADS,default=1"`
UsePathStyleS3Opt bool `env:"S3_USE_PATH_STYLE,default=false"`
IsForecastedGrantsEnabled bool `env:"IS_FORECASTED_GRANTS_ENABLED,default=false"`
MaxSplitRecords int `env:"MAX_SPLIT_RECORDS,default=-1"` // Hard limit of records to process, regardless of type. -1 for no limit.
MaxSplitOpportunityRecords int `env:"MAX_SPLIT_OPPORTUNITY_RECORDS,default=-1"` // Limit opportunity-type records to process. -1 for no limit.
MaxSplitForecastRecords int `env:"MAX_SPLIT_FORECAST_RECORDS,default=-1"` // Limit forecast-type records to process. -1 for no limit.
Extras goenv.EnvSet
}

var (
Expand Down
34 changes: 18 additions & 16 deletions terraform/local.tfvars
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
namespace = "grants-ingest"
environment = "sandbox"
version_identifier = "dev"
permissions_boundary_policy_name = ""
datadog_enabled = false
datadog_dashboards_enabled = false
datadog_lambda_extension_version = "38"
lambda_binaries_autobuild = true
lambda_default_log_retention_in_days = 7
lambda_default_log_level = "DEBUG"
eventbridge_scheduler_enabled = false
ssm_deployment_parameters_path_prefix = "/grants-ingest/local"
dynamodb_contributor_insights_enabled = false
ffis_ingest_email_address = "[email protected]"
max_split_grantsgov_records = 10
ses_active_receipt_rule_set_enabled = false
namespace = "grants-ingest"
environment = "sandbox"
version_identifier = "dev"
permissions_boundary_policy_name = ""
datadog_enabled = false
datadog_dashboards_enabled = false
datadog_lambda_extension_version = "38"
lambda_binaries_autobuild = true
lambda_default_log_retention_in_days = 7
lambda_default_log_level = "DEBUG"
eventbridge_scheduler_enabled = false
ssm_deployment_parameters_path_prefix = "/grants-ingest/local"
dynamodb_contributor_insights_enabled = false
ffis_ingest_email_address = "[email protected]"
is_forecasted_grants_enabled = true
max_split_grantsgov_opportunity_records = 10
max_split_grantsgov_forecast_records = 10
ses_active_receipt_rule_set_enabled = false

additional_lambda_environment_variables = {
S3_USE_PATH_STYLE = "true"
Expand Down
3 changes: 3 additions & 0 deletions terraform/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,10 @@ module "SplitGrantsGovXMLDB" {
grants_prepared_data_bucket_name = module.grants_prepared_data_bucket.bucket_id
grants_prepared_dynamodb_table_name = module.grants_prepared_dynamodb_table.table_name
grants_prepared_dynamodb_table_arn = module.grants_prepared_dynamodb_table.table_arn
is_forecasted_grants_enabled = var.is_forecasted_grants_enabled
max_split_records = var.max_split_grantsgov_records
max_split_opportunity_records = var.max_split_grantsgov_opportunity_records
max_split_forecast_records = var.max_split_grantsgov_forecast_records
}

module "ReceiveFFISEmail" {
Expand Down
2 changes: 2 additions & 0 deletions terraform/modules/SplitGrantsGovXMLDB/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ module "lambda_function" {
LOG_LEVEL = var.log_level
MAX_CONCURRENT_UPLOADS = "10"
MAX_SPLIT_RECORDS = tostring(var.max_split_records)
MAX_SPLIT_OPPORTUNITY_RECORDS = tostring(var.max_split_opportunity_records)
MAX_SPLIT_FORECAST_RECORDS = tostring(var.max_split_forecast_records)
IS_FORECASTED_GRANTS_ENABLED = var.is_forecasted_grants_enabled
})

Expand Down
14 changes: 13 additions & 1 deletion terraform/modules/SplitGrantsGovXMLDB/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,19 @@ variable "is_forecasted_grants_enabled" {
}

variable "max_split_records" {
description = "Optional limit (i.e. for testing) on the number of records that the handler will process during a single invocation."
description = "Optional limit (i.e. for testing) on the number of records that the handler will process during a single invocation. This setting is a hard cap on top of opportunity- and forecast-specific limits."
type = number
default = -1
}

variable "max_split_opportunity_records" {
description = "Optional limit (i.e. for testing) on the number of opportunity records that the handler will process during a single invocation."
type = number
default = -1
}

variable "max_split_forecast_records" {
description = "Optional limit (i.e. for testing) on the number of opportunity records that the handler will process during a single invocation."
type = number
default = -1
}
20 changes: 19 additions & 1 deletion terraform/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,26 @@ variable "ses_active_receipt_rule_set_enabled" {
default = true
}

variable "is_forecasted_grants_enabled" {
description = "When true, enables processing of forecasted grant records from Grants.gov."
type = bool
default = false
}

variable "max_split_grantsgov_records" {
description = "Optional limit (i.e. for testing) on the number of records that SplitGrantsGovXMLDB handler will process during a single invocation."
description = "Optional hard limit (i.e. for testing) on the number of records (of any type) that SplitGrantsGovXMLDB handler will process during a single invocation."
type = number
default = -1
}

variable "max_split_grantsgov_opportunity_records" {
description = "Optional limit (i.e. for testing) on the number of opportunity records that SplitGrantsGovXMLDB handler will process during a single invocation."
type = number
default = -1
}

variable "max_split_grantsgov_forecast_records" {
description = "Optional limit (i.e. for testing) on the number of forecast records that SplitGrantsGovXMLDB handler will process during a single invocation."
type = number
default = -1
}
Loading