diff --git a/cmd/SplitGrantsGovXMLDB/handler.go b/cmd/SplitGrantsGovXMLDB/handler.go index 1360d5fc..24b152ab 100644 --- a/cmd/SplitGrantsGovXMLDB/handler.go +++ b/cmd/SplitGrantsGovXMLDB/handler.go @@ -5,7 +5,6 @@ import ( "bytes" "context" "encoding/xml" - "fmt" "io" "github.com/aws/aws-lambda-go/events" @@ -14,32 +13,25 @@ import ( "github.com/go-kit/log/level" "github.com/hashicorp/go-multierror" "github.com/usdigitalresponse/grants-ingest/internal/log" - grantsgov "github.com/usdigitalresponse/grants-ingest/pkg/grantsSchemas/grants.gov" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" ) const ( MB = int64(1024 * 1024) GRANT_OPPORTUNITY_XML_NAME = "OpportunitySynopsisDetail_1_0" + GRANT_FORECAST_XML_NAME = "OpportunityForecastDetail_1_0" ) -type opportunity grantsgov.OpportunitySynopsisDetail_1_0 - -// S3ObjectKey returns a string to use as the object key when saving the opportunity to an S3 bucket. -func (o *opportunity) S3ObjectKey() string { - return fmt.Sprintf("%s/%s/grants.gov/v2.xml", o.OpportunityID[0:3], o.OpportunityID) -} - // handleS3Event handles events representing S3 bucket notifications of type "ObjectCreated:*" // for XML DB extracts saved from Grants.gov. The XML data from the source S3 object provided -// by each event record is read from S3. Grant opportunity records are extracted from the XML +// by each event record is read from S3. Grant opportunity/forecast records are extracted from the XML // and uploaded to a "prepared data" destination bucket as individual S3 objects. // Uploads are handled by a pool of workers; the size of the pool is determined by the // MAX_CONCURRENT_UPLOADS environment variable. // Returns and error that represents any and all errors accumulated during the invocation, // either while handling a source object or while processing its contents; an error may indicate // a partial or complete invocation failure. -// Returns nil when all grant opportunities are successfully processed from all source records, +// Returns nil when all grant records are successfully processed from all source records, // indicating complete success. func handleS3EventWithConfig(cfg aws.Config, ctx context.Context, s3Event events.S3Event) error { // Configure service clients @@ -47,21 +39,21 @@ func handleS3EventWithConfig(cfg aws.Config, ctx context.Context, s3Event events o.UsePathStyle = env.UsePathStyleS3Opt }) - // Create an opportunities channel to direct grantOpportunity values parsed from the source + // Create a records channel to direct opportunity/forecast values parsed from the source // record to individual S3 object uploads - opportunities := make(chan opportunity) + records := make(chan grantRecord) - // Create a pool of workers to consume and upload values received from the opportunities channel + // Create a pool of workers to consume and upload values received from the records channel processingSpan, processingCtx := tracer.StartSpanFromContext(ctx, "processing") wg := multierror.Group{} for i := 0; i < env.MaxConcurrentUploads; i++ { wg.Go(func() error { - return processOpportunities(processingCtx, s3svc, opportunities) + return processRecords(processingCtx, s3svc, records) }) } // Iterate over all received source records to split into per-grant values and submit them to - // the opportunities channel for processing by the workers pool. Instead of failing on the + // the records channel for processing by the workers pool. Instead of failing on the // first encountered error, we instead accumulate them into a single "multi-error". // Only one source record is consumed at a time; in normal cases, the invocation event // will only provide a single source record. @@ -86,8 +78,8 @@ func handleS3EventWithConfig(cfg aws.Config, ctx context.Context, s3Event events } buffer := bufio.NewReaderSize(resp.Body, int(env.DownloadChunkLimit*MB)) - if err := readOpportunities(recordCtx, buffer, opportunities); err != nil { - log.Error(logger, "Error reading source opportunities from S3", err) + if err := readRecords(recordCtx, buffer, records); err != nil { + log.Error(logger, "Error reading source records from S3", err) return err } @@ -102,7 +94,7 @@ func handleS3EventWithConfig(cfg aws.Config, ctx context.Context, s3Event events // All source records have been consumed; close the channel so that workers shut down // after the channel is emptied. - close(opportunities) + close(records) sourcingSpan.Finish() // Wait for workers to finish processing and collect any errors they encountered @@ -130,11 +122,11 @@ func handleS3EventWithConfig(cfg aws.Config, ctx context.Context, s3Event events return nil } -// readOpportunities reads XML from r, sending all parsed grantOpportunity records to ch. +// readRecords reads XML from r, sending all parsed grantRecords to ch. // Returns nil when the end of the file is reached. -// readOpportunities stops and returns an error when the context is canceled +// readRecords stops and returns an error when the context is canceled // or an error is encountered while reading. -func readOpportunities(ctx context.Context, r io.Reader, ch chan<- opportunity) error { +func readRecords(ctx context.Context, r io.Reader, ch chan<- grantRecord) error { span, ctx := tracer.StartSpanFromContext(ctx, "read.xml") d := xml.NewDecoder(r) @@ -157,34 +149,44 @@ func readOpportunities(ctx context.Context, r io.Reader, ch chan<- opportunity) return err } - // When reading the start of a new element, check if it is a grant opportunity - se, ok := token.(xml.StartElement) - if ok && se.Name.Local == GRANT_OPPORTUNITY_XML_NAME { - var opportunity opportunity - if err := d.DecodeElement(&opportunity, &se); err != nil { - level.Error(logger).Log("msg", "Error decoding XML token", "error", err) + // When reading the start of a new element, check if it is a grant opportunity or forecast + if se, ok := token.(xml.StartElement); ok { + var err error + if se.Name.Local == GRANT_OPPORTUNITY_XML_NAME { + var o opportunity + if err = d.DecodeElement(&o, &se); err == nil { + ch <- &o + } + } else if se.Name.Local == GRANT_FORECAST_XML_NAME && env.IsForecastedGrantsEnabled { + var f forecast + if err = d.DecodeElement(&f, &se); err == nil { + ch <- &f + } + } + + if err != nil { + log.Error(logger, "Error decoding XML", err, "element_name", se.Name.Local) span.Finish(tracer.WithError(err)) return err } - ch <- opportunity } } - log.Info(logger, "Finished reading opportunities from source") + log.Info(logger, "Finished reading source XML") span.Finish() return nil } -// processOpportunities is a work loop that receives and processes grantOpportunity value until +// processRecords is a work loop that receives and processes grantRecord values until // the receive channel is closed and returns or the context is canceled. // It returns a multi-error containing any errors encountered while processing a received -// grantOpportunity as well as the reason for the context cancelation, if any. -// Returns nil if all opportunities were processed successfully until the channel was closed. -func processOpportunities(ctx context.Context, svc *s3.Client, ch <-chan opportunity) (errs error) { +// grantRecord as well as the reason for the context cancelation, if any. +// Returns nil if all records were processed successfully until the channel was closed. +func processRecords(ctx context.Context, svc *s3.Client, ch <-chan grantRecord) (errs error) { span, ctx := tracer.StartSpanFromContext(ctx, "processing.worker") whenCanceled := func() error { err := ctx.Err() - log.Debug(logger, "Done processing opportunities because context canceled", "reason", err) + log.Debug(logger, "Done processing records because context canceled", "reason", err) span.Finish(tracer.WithError(err)) errs = multierror.Append(errs, err) return errs @@ -199,17 +201,17 @@ func processOpportunities(ctx context.Context, svc *s3.Client, ch <-chan opportu default: select { - case opportunity, ok := <-ch: + case record, ok := <-ch: if !ok { - log.Debug(logger, "Done processing opportunities because channel is closed") + log.Debug(logger, "Done processing records because channel is closed") span.Finish() return } workSpan, ctx := tracer.StartSpanFromContext(ctx, "processing.worker.work") - err := processOpportunity(ctx, svc, opportunity) + err := processRecord(ctx, svc, record) if err != nil { - sendMetric("opportunity.failed", 1) + sendMetric("record.failed", 1) errs = multierror.Append(errs, err) } workSpan.Finish(tracer.WithError(err)) @@ -221,58 +223,56 @@ func processOpportunities(ctx context.Context, svc *s3.Client, ch <-chan opportu } } -// processOpportunity takes a single opportunity and conditionally uploads an XML -// representation of the opportunity to its configured S3 destination. Before uploading, +// processRecord takes a single record and conditionally uploads an XML +// representation of the grant forecast/opportunity to its configured S3 destination. Before uploading, // any extant S3 object with a matching key in the bucket named by env.DestinationBucket -// is compared with the opportunity. An upload is initiated when the opportunity was updated +// is compared with the record. An upload is initiated when the record was updated // more recently than the extant object was last modified, or when no extant object exists. -func processOpportunity(ctx context.Context, svc S3ReadWriteObjectAPI, opp opportunity) error { - logger := log.With(logger, - "opportunity_id", opp.OpportunityID, "opportunity_number", opp.OpportunityNumber) +func processRecord(ctx context.Context, svc S3ReadWriteObjectAPI, record grantRecord) error { + logger := record.logWith(logger) - lastModified, err := opp.LastUpdatedDate.Time() + lastModified, err := record.lastModified() if err != nil { - return log.Errorf(logger, "Error getting last modified time for opportunity", err) + return log.Errorf(logger, "Error getting last modified time for record", err) } - log.Debug(logger, "Parsed last modified time from opportunity last update date", - "raw_value", opp.LastUpdatedDate, "parsed_value", lastModified) - logger = log.With(logger, "opportunity_last_modified", lastModified) + logger = log.With(logger, "record_last_modified", lastModified) + log.Debug(logger, "Parsed last modified time from record last update date") - key := opp.S3ObjectKey() + key := record.s3ObjectKey() logger = log.With(logger, "bucket", env.DestinationBucket, "key", key) remoteLastModified, err := GetS3LastModified(ctx, svc, env.DestinationBucket, key) if err != nil { - return log.Errorf(logger, "Error determining last modified time for remote opportunity", err) + return log.Errorf(logger, "Error determining last modified time for remote record", err) } logger = log.With(logger, "remote_last_modified", remoteLastModified) isNew := false if remoteLastModified != nil { if remoteLastModified.After(lastModified) { - log.Debug(logger, "Skipping opportunity upload because the extant record is up-to-date") - sendMetric("opportunity.skipped", 1) + log.Debug(logger, "Skipping record upload because the extant record is up-to-date") + sendMetric("record.skipped", 1) return nil } - log.Debug(logger, "Uploading updated opportunity to replace outdated remote record") + log.Debug(logger, "Uploading updated record to replace outdated remote record") } else { isNew = true - log.Debug(logger, "Uploading new opportunity") + log.Debug(logger, "Uploading new record") } - b, err := xml.Marshal(grantsgov.OpportunitySynopsisDetail_1_0(opp)) + b, err := record.toXML() if err != nil { - return log.Errorf(logger, "Error marshaling XML for opportunity", err) + return log.Errorf(logger, "Error marshaling XML for record", err) } if err := UploadS3Object(ctx, svc, env.DestinationBucket, key, bytes.NewReader(b)); err != nil { - return log.Errorf(logger, "Error uploading prepared grant opportunity to S3", err) + return log.Errorf(logger, "Error uploading prepared grant record to S3", err) } - log.Info(logger, "Successfully uploaded opportunity") + log.Info(logger, "Successfully uploaded record") if isNew { - sendMetric("opportunity.created", 1) + sendMetric("record.created", 1) } else { - sendMetric("opportunity.updated", 1) + sendMetric("record.updated", 1) } return nil } diff --git a/cmd/SplitGrantsGovXMLDB/handler_test.go b/cmd/SplitGrantsGovXMLDB/handler_test.go index 18005b8f..6e079842 100644 --- a/cmd/SplitGrantsGovXMLDB/handler_test.go +++ b/cmd/SplitGrantsGovXMLDB/handler_test.go @@ -30,11 +30,6 @@ import ( grantsgov "github.com/usdigitalresponse/grants-ingest/pkg/grantsSchemas/grants.gov" ) -func TestOpportunityS3ObjectKey(t *testing.T) { - opp := &opportunity{OpportunityID: "123456789"} - assert.Equal(t, opp.S3ObjectKey(), "123/123456789/grants.gov/v2.xml") -} - func setupLambdaEnvForTesting(t *testing.T) { t.Helper() @@ -119,6 +114,60 @@ const SOURCE_OPPORTUNITY_TEMPLATE = ` ` +const SOURCE_FORECAST_TEMPLATE = ` + + {{.OpportunityID}} + Fun Grant + ABCD-1234 + Some Category + Clarinet + My Funding Category + Meow meow meow + 1234.567 + 25 + This is some additional information on eligibility. + TEST-AC + Bureau of Testing + 09082022 + 02102016 + 2016 + 04112016 + 09082016 + 09302016 + {{.LastUpdatedDate}} + 600000 + 400000 + 600000 + 10 + Here is a description of the opportunity. + Synopsis 2 + No + 02012023 + test@example.gov + Inquiries + Tester Person + 800-123-4567 + +` + +type grantValues struct { + template string + OpportunityID string + LastUpdatedDate string + isExtant bool + isValid bool + isSkipped bool + isForecast bool +} + +func (values grantValues) getFilename() string { + if values.isForecast { + return "v2.OpportunityForecastDetail_1_0.xml" + } else { + return "v2.OpportunitySynopsisDetail_1_0.xml" + } +} + func TestLambdaInvocationScenarios(t *testing.T) { setupLambdaEnvForTesting(t) sourceBucketName := "test-source-bucket" @@ -126,96 +175,177 @@ func TestLambdaInvocationScenarios(t *testing.T) { s3client, cfg, err := setupS3ForTesting(t, sourceBucketName) assert.NoError(t, err, "Error configuring test environment") - type grantValues struct { - template string - OpportunityID string - LastUpdatedDate string - isExtant bool - isValid bool - } + seenOpportunityIDs := make(map[string]struct{}) for _, tt := range []struct { - name string - grantValues []grantValues + name string + isForecastedGrantsEnabled bool + grantValues []grantValues }{ { "Well-formed source XML for single new grant", + true, []grantValues{ { SOURCE_OPPORTUNITY_TEMPLATE, - "1234", + "1001", + now.AddDate(-1, 0, 0).Format("01022006"), + false, + true, + false, + false, + }, + }, + }, + { + "Well-formed source XML for single new forecast", + true, + []grantValues{ + { + SOURCE_FORECAST_TEMPLATE, + "1002", now.AddDate(-1, 0, 0).Format("01022006"), false, true, + false, + true, + }, + }, + }, + { + "When flag is disabled, ignores well-formed source XML for single new forecast", + false, + []grantValues{ + { + SOURCE_FORECAST_TEMPLATE, + "1003", + now.AddDate(-1, 0, 0).Format("01022006"), + false, + true, + true, + true, + }, + }, + }, + { + "Mixed well-formed grant and forecast", + true, + []grantValues{ + { + SOURCE_OPPORTUNITY_TEMPLATE, + "1004", + now.AddDate(-1, 0, 0).Format("01022006"), + false, + true, + false, + false, + }, + { + SOURCE_FORECAST_TEMPLATE, + "1005", + now.AddDate(-1, 0, 0).Format("01022006"), + false, + true, + false, + true, }, }, }, { "One grant to update and one to ignore", + true, []grantValues{ { SOURCE_OPPORTUNITY_TEMPLATE, - "2345", + "1006", now.AddDate(-1, 0, 0).Format("01022006"), true, true, + false, + false, }, { SOURCE_OPPORTUNITY_TEMPLATE, - "3456", + "1007", now.AddDate(1, 0, 0).Format("01022006"), true, true, + false, + false, }, }, }, { "One grant to update and one with malformed source data", + true, []grantValues{ { SOURCE_OPPORTUNITY_TEMPLATE, - "4567", + "1008", now.AddDate(-1, 0, 0).Format("01022006"), true, true, + false, + false, }, { ` {{.OpportunityID}} {{.LastUpdatedDate}} Fun Grant`, - "5678", + "1009", now.AddDate(-1, 0, 0).Format("01022006"), false, false, + false, + false, }, }, }, { "One grant with invalid date format", + true, []grantValues{ { SOURCE_OPPORTUNITY_TEMPLATE, - "6789", + "1010", now.AddDate(-1, 0, 0).Format("01/02/06"), false, false, + false, + false, }, }, }, { "Source contains invalid token", + true, []grantValues{ { "") @@ -230,8 +360,8 @@ func TestLambdaInvocationScenarios(t *testing.T) { "LastUpdatedDate": values.LastUpdatedDate, })) if values.isExtant { - extantKey := fmt.Sprintf("%s/%s/grants.gov/v2.xml", - values.OpportunityID[0:3], values.OpportunityID) + extantKey := fmt.Sprintf("%s/%s/grants.gov/%s", + values.OpportunityID[0:3], values.OpportunityID, values.getFilename()) _, err := s3client.PutObject(context.TODO(), &s3.PutObjectInput{ Bucket: aws.String(env.DestinationBucket), Key: aws.String(extantKey), @@ -246,7 +376,9 @@ func TestLambdaInvocationScenarios(t *testing.T) { _, err = sourceGrantsData.WriteString("") require.NoError(t, err) + // Execute the test case t.Run(tt.name, func(t *testing.T) { + // Place the XML file constructed above into the correct S3 location objectKey := fmt.Sprintf("sources/%s/grants.gov/extract.xml", now.Format("2006/01/02")) _, err := s3client.PutObject(context.TODO(), &s3.PutObjectInput{ Bucket: aws.String(sourceBucketName), @@ -255,6 +387,7 @@ func TestLambdaInvocationScenarios(t *testing.T) { }) require.NoErrorf(t, err, "Error creating test source object %s", objectKey) + // Invoke the handler under test with a constructed S3 event invocationErr := handleS3EventWithConfig(cfg, context.TODO(), events.S3Event{ Records: []events.S3EventRecord{{ S3: events.S3Entity{ @@ -263,6 +396,8 @@ func TestLambdaInvocationScenarios(t *testing.T) { }, }}, }) + + // Determine the list of expected grant objects to have been saved by the handler sourceContainsInvalidOpportunities := false for _, v := range tt.grantValues { if !v.isValid { @@ -274,24 +409,27 @@ func TestLambdaInvocationScenarios(t *testing.T) { } else { require.NoError(t, invocationErr) } - var expectedGrants grantsgov.Grants err = xml.Unmarshal(sourceGrantsData.Bytes(), &expectedGrants) if !sourceContainsInvalidOpportunities { require.NoError(t, err) } + // For each grant value in the test case, we'll verify it was handled correctly for _, v := range tt.grantValues { - key := fmt.Sprintf("%s/%s/grants.gov/v2.xml", - v.OpportunityID[0:3], v.OpportunityID) + key := fmt.Sprintf("%s/%s/grants.gov/%s", + v.OpportunityID[0:3], v.OpportunityID, v.getFilename()) resp, err := s3client.GetObject(context.TODO(), &s3.GetObjectInput{ Bucket: aws.String(env.DestinationBucket), Key: aws.String(key), }) - if !v.isValid && !v.isExtant { + if v.isSkipped || (!v.isValid && !v.isExtant) { + // If there was no extant file and the new grant is invalid, or if we were meant to skip + // this grant, there should be no S3 file assert.Error(t, err) } else { + // Otherwise, we verify the S3 file matches the source from the test case require.NoError(t, err) b, err := io.ReadAll(resp.Body) require.NoError(t, err) @@ -352,7 +490,7 @@ func TestLambdaInvocationScenarios(t *testing.T) { _, err = s3client.GetObject(context.Background(), &s3.GetObjectInput{ Bucket: aws.String(env.DestinationBucket), - Key: aws.String("123/12345/grants.gov/v2.xml"), + Key: aws.String("123/12345/grants.gov/v2.OpportunitySynopsisDetail_1_0.xml"), }) assert.NoError(t, err, "Expected destination object was not created") }) @@ -392,18 +530,18 @@ func (r *MockReader) Read(p []byte) (int, error) { return r.read(p) } -func TestReadOpportunities(t *testing.T) { +func TestReadRecords(t *testing.T) { t.Run("Context cancelled between reads", func(t *testing.T) { ctx, cancel := context.WithCancel(context.TODO()) - err := readOpportunities(ctx, &MockReader{func(p []byte) (int, error) { + err := readRecords(ctx, &MockReader{func(p []byte) (int, error) { cancel() return int(copy(p, []byte(""))), nil - }}, make(chan<- opportunity, 10)) + }}, make(chan<- grantRecord, 10)) assert.ErrorIs(t, err, context.Canceled) }) } -func TestProcessOpportunity(t *testing.T) { +func TestProcessRecord(t *testing.T) { now := time.Now() testOpportunity := opportunity{ OpportunityID: "1234", @@ -422,8 +560,8 @@ func TestProcessOpportunity(t *testing.T) { mockGetObjectAPI(nil), mockPutObjectAPI(nil), } - err := processOpportunity(context.TODO(), c, testOpportunity) - assert.ErrorContains(t, err, "Error determining last modified time for remote opportunity") + err := processRecord(context.TODO(), c, testOpportunity) + assert.ErrorContains(t, err, "Error determining last modified time for remote record") }) t.Run("Error uploading to S3", func(t *testing.T) { @@ -450,7 +588,7 @@ func TestProcessOpportunity(t *testing.T) { }), } fmt.Printf("%T", s3Client) - err := processOpportunity(context.TODO(), s3Client, testOpportunity) - assert.ErrorContains(t, err, "Error uploading prepared grant opportunity to S3") + err := processRecord(context.TODO(), s3Client, testOpportunity) + assert.ErrorContains(t, err, "Error uploading prepared grant record to S3") }) } diff --git a/cmd/SplitGrantsGovXMLDB/main.go b/cmd/SplitGrantsGovXMLDB/main.go index 923e4da1..68806b3a 100644 --- a/cmd/SplitGrantsGovXMLDB/main.go +++ b/cmd/SplitGrantsGovXMLDB/main.go @@ -27,12 +27,13 @@ import ( ) type Environment struct { - LogLevel string `env:"LOG_LEVEL,default=INFO"` - DownloadChunkLimit int64 `env:"DOWNLOAD_CHUNK_LIMIT,default=10"` - DestinationBucket string `env:"GRANTS_PREPARED_DATA_BUCKET_NAME,required=true"` - MaxConcurrentUploads int `env:"MAX_CONCURRENT_UPLOADS,default=1"` - UsePathStyleS3Opt bool `env:"S3_USE_PATH_STYLE,default=false"` - Extras goenv.EnvSet + LogLevel string `env:"LOG_LEVEL,default=INFO"` + DownloadChunkLimit int64 `env:"DOWNLOAD_CHUNK_LIMIT,default=10"` + DestinationBucket string `env:"GRANTS_PREPARED_DATA_BUCKET_NAME,required=true"` + MaxConcurrentUploads int `env:"MAX_CONCURRENT_UPLOADS,default=1"` + UsePathStyleS3Opt bool `env:"S3_USE_PATH_STYLE,default=false"` + IsForecastedGrantsEnabled bool `env:"IS_FORECASTED_GRANTS_ENABLED,default=false"` + Extras goenv.EnvSet } var ( diff --git a/cmd/SplitGrantsGovXMLDB/types.go b/cmd/SplitGrantsGovXMLDB/types.go new file mode 100644 index 00000000..fd99e16a --- /dev/null +++ b/cmd/SplitGrantsGovXMLDB/types.go @@ -0,0 +1,66 @@ +package main + +import ( + "encoding/xml" + "fmt" + "time" + + "github.com/usdigitalresponse/grants-ingest/internal/log" + grantsgov "github.com/usdigitalresponse/grants-ingest/pkg/grantsSchemas/grants.gov" +) + +type grantRecord interface { + logWith(log.Logger) log.Logger + // s3ObjectKey returns a string to use as the object key when saving the opportunity to an S3 bucket + s3ObjectKey() string + lastModified() (time.Time, error) + toXML() ([]byte, error) +} + +type opportunity grantsgov.OpportunitySynopsisDetail_1_0 + +func (o opportunity) logWith(logger log.Logger) log.Logger { + return log.With(logger, + "opportunity_id", o.OpportunityID, + "opportunity_number", o.OpportunityNumber, + "is_forecast", false, + ) +} + +func (o opportunity) s3ObjectKey() string { + return fmt.Sprintf("%s/%s/grants.gov/v2.OpportunitySynopsisDetail_1_0.xml", + o.OpportunityID[0:3], o.OpportunityID, + ) +} + +func (o opportunity) lastModified() (time.Time, error) { + return o.LastUpdatedDate.Time() +} + +func (o opportunity) toXML() ([]byte, error) { + return xml.Marshal(grantsgov.OpportunitySynopsisDetail_1_0(o)) +} + +type forecast grantsgov.OpportunityForecastDetail_1_0 + +func (f forecast) logWith(logger log.Logger) log.Logger { + return log.With(logger, + "opportunity_id", f.OpportunityID, + "opportunity_number", f.OpportunityNumber, + "is_forecast", true, + ) +} + +func (f forecast) s3ObjectKey() string { + return fmt.Sprintf("%s/%s/grants.gov/v2.OpportunityForecastDetail_1_0.xml", + f.OpportunityID[0:3], f.OpportunityID, + ) +} + +func (f forecast) lastModified() (time.Time, error) { + return f.LastUpdatedDate.Time() +} + +func (f forecast) toXML() ([]byte, error) { + return xml.Marshal(grantsgov.OpportunityForecastDetail_1_0(f)) +} diff --git a/cmd/SplitGrantsGovXMLDB/types_test.go b/cmd/SplitGrantsGovXMLDB/types_test.go new file mode 100644 index 00000000..e823dd4e --- /dev/null +++ b/cmd/SplitGrantsGovXMLDB/types_test.go @@ -0,0 +1,17 @@ +package main + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestOpportunityS3ObjectKey(t *testing.T) { + opp := &opportunity{OpportunityID: "123456789"} + assert.Equal(t, opp.s3ObjectKey(), "123/123456789/grants.gov/v2.OpportunitySynopsisDetail_1_0.xml") +} + +func TestForecastS3ObjectKey(t *testing.T) { + f := &forecast{OpportunityID: "123456789"} + assert.Equal(t, f.s3ObjectKey(), "123/123456789/grants.gov/v2.OpportunityForecastDetail_1_0.xml") +} diff --git a/terraform/main.tf b/terraform/main.tf index 7888030c..f1f2f001 100644 --- a/terraform/main.tf +++ b/terraform/main.tf @@ -444,6 +444,12 @@ resource "aws_s3_bucket_notification" "grant_prepared_data" { filter_suffix = "/grants.gov/v2.xml" } + lambda_function { + lambda_function_arn = module.PersistGrantsGovXMLDB.lambda_function_arn + events = ["s3:ObjectCreated:*"] + filter_suffix = "/grants.gov/v2.OpportunitySynopsisDetail_1_0.xml" + } + lambda_function { lambda_function_arn = module.PersistFFISData.lambda_function_arn events = ["s3:ObjectCreated:*"] diff --git a/terraform/modules/SplitGrantsGovXMLDB/main.tf b/terraform/modules/SplitGrantsGovXMLDB/main.tf index b64e1f44..371acc3a 100644 --- a/terraform/modules/SplitGrantsGovXMLDB/main.tf +++ b/terraform/modules/SplitGrantsGovXMLDB/main.tf @@ -106,6 +106,7 @@ module "lambda_function" { GRANTS_PREPARED_DATA_BUCKET_NAME = data.aws_s3_bucket.prepared_data.id LOG_LEVEL = var.log_level MAX_CONCURRENT_UPLOADS = "10" + IS_FORECASTED_GRANTS_ENABLED = var.is_forecasted_grants_enabled }) allowed_triggers = { diff --git a/terraform/modules/SplitGrantsGovXMLDB/variables.tf b/terraform/modules/SplitGrantsGovXMLDB/variables.tf index 16660def..eba28d47 100644 --- a/terraform/modules/SplitGrantsGovXMLDB/variables.tf +++ b/terraform/modules/SplitGrantsGovXMLDB/variables.tf @@ -86,3 +86,9 @@ variable "grants_prepared_data_bucket_name" { description = "Name of the S3 bucket used to store grants prepared data." type = string } + +variable "is_forecasted_grants_enabled" { + description = "Flag to control whether forecasted grants should be processed and stored in S3." + type = bool + default = false +}