Skip to content

Commit

Permalink
Ingest forecasted opportunities (#848)
Browse files Browse the repository at this point in the history
* Refactor SplitGrantsGovXMLDB command to ingest forecasted grants

* Update downstream lambda to only load opportunities into DynamoDB for now

* Update tests fore forecast grants

* Restore previous PersistGrantsGovXMLDB lambda function

Having the two lambdas deployed alongside each other for now assures
we won't drop any events for S3 objects in the old location. Eventually
we can remove the old v2.xml version of the lambda.

* Add flag for enabling forecasted grant processing

* Add tests for forecast enabled flag

* Fix failing test setup

* Properly test that skipped forecasted grants are indeed skipped

* Check that all test case opportunity IDs are distinct
  • Loading branch information
jeffsmohan authored Jul 10, 2024
1 parent 51dfc5b commit ef138fc
Show file tree
Hide file tree
Showing 8 changed files with 339 additions and 104 deletions.
124 changes: 62 additions & 62 deletions cmd/SplitGrantsGovXMLDB/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"bytes"
"context"
"encoding/xml"
"fmt"
"io"

"github.com/aws/aws-lambda-go/events"
Expand All @@ -14,54 +13,47 @@ import (
"github.com/go-kit/log/level"
"github.com/hashicorp/go-multierror"
"github.com/usdigitalresponse/grants-ingest/internal/log"
grantsgov "github.com/usdigitalresponse/grants-ingest/pkg/grantsSchemas/grants.gov"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

const (
MB = int64(1024 * 1024)
GRANT_OPPORTUNITY_XML_NAME = "OpportunitySynopsisDetail_1_0"
GRANT_FORECAST_XML_NAME = "OpportunityForecastDetail_1_0"
)

type opportunity grantsgov.OpportunitySynopsisDetail_1_0

// S3ObjectKey returns a string to use as the object key when saving the opportunity to an S3 bucket.
func (o *opportunity) S3ObjectKey() string {
return fmt.Sprintf("%s/%s/grants.gov/v2.xml", o.OpportunityID[0:3], o.OpportunityID)
}

// handleS3Event handles events representing S3 bucket notifications of type "ObjectCreated:*"
// for XML DB extracts saved from Grants.gov. The XML data from the source S3 object provided
// by each event record is read from S3. Grant opportunity records are extracted from the XML
// by each event record is read from S3. Grant opportunity/forecast records are extracted from the XML
// and uploaded to a "prepared data" destination bucket as individual S3 objects.
// Uploads are handled by a pool of workers; the size of the pool is determined by the
// MAX_CONCURRENT_UPLOADS environment variable.
// Returns and error that represents any and all errors accumulated during the invocation,
// either while handling a source object or while processing its contents; an error may indicate
// a partial or complete invocation failure.
// Returns nil when all grant opportunities are successfully processed from all source records,
// Returns nil when all grant records are successfully processed from all source records,
// indicating complete success.
func handleS3EventWithConfig(cfg aws.Config, ctx context.Context, s3Event events.S3Event) error {
// Configure service clients
s3svc := s3.NewFromConfig(cfg, func(o *s3.Options) {
o.UsePathStyle = env.UsePathStyleS3Opt
})

// Create an opportunities channel to direct grantOpportunity values parsed from the source
// Create a records channel to direct opportunity/forecast values parsed from the source
// record to individual S3 object uploads
opportunities := make(chan opportunity)
records := make(chan grantRecord)

// Create a pool of workers to consume and upload values received from the opportunities channel
// Create a pool of workers to consume and upload values received from the records channel
processingSpan, processingCtx := tracer.StartSpanFromContext(ctx, "processing")
wg := multierror.Group{}
for i := 0; i < env.MaxConcurrentUploads; i++ {
wg.Go(func() error {
return processOpportunities(processingCtx, s3svc, opportunities)
return processRecords(processingCtx, s3svc, records)
})
}

// Iterate over all received source records to split into per-grant values and submit them to
// the opportunities channel for processing by the workers pool. Instead of failing on the
// the records channel for processing by the workers pool. Instead of failing on the
// first encountered error, we instead accumulate them into a single "multi-error".
// Only one source record is consumed at a time; in normal cases, the invocation event
// will only provide a single source record.
Expand All @@ -86,8 +78,8 @@ func handleS3EventWithConfig(cfg aws.Config, ctx context.Context, s3Event events
}

buffer := bufio.NewReaderSize(resp.Body, int(env.DownloadChunkLimit*MB))
if err := readOpportunities(recordCtx, buffer, opportunities); err != nil {
log.Error(logger, "Error reading source opportunities from S3", err)
if err := readRecords(recordCtx, buffer, records); err != nil {
log.Error(logger, "Error reading source records from S3", err)
return err
}

Expand All @@ -102,7 +94,7 @@ func handleS3EventWithConfig(cfg aws.Config, ctx context.Context, s3Event events

// All source records have been consumed; close the channel so that workers shut down
// after the channel is emptied.
close(opportunities)
close(records)
sourcingSpan.Finish()

// Wait for workers to finish processing and collect any errors they encountered
Expand Down Expand Up @@ -130,11 +122,11 @@ func handleS3EventWithConfig(cfg aws.Config, ctx context.Context, s3Event events
return nil
}

// readOpportunities reads XML from r, sending all parsed grantOpportunity records to ch.
// readRecords reads XML from r, sending all parsed grantRecords to ch.
// Returns nil when the end of the file is reached.
// readOpportunities stops and returns an error when the context is canceled
// readRecords stops and returns an error when the context is canceled
// or an error is encountered while reading.
func readOpportunities(ctx context.Context, r io.Reader, ch chan<- opportunity) error {
func readRecords(ctx context.Context, r io.Reader, ch chan<- grantRecord) error {
span, ctx := tracer.StartSpanFromContext(ctx, "read.xml")

d := xml.NewDecoder(r)
Expand All @@ -157,34 +149,44 @@ func readOpportunities(ctx context.Context, r io.Reader, ch chan<- opportunity)
return err
}

// When reading the start of a new element, check if it is a grant opportunity
se, ok := token.(xml.StartElement)
if ok && se.Name.Local == GRANT_OPPORTUNITY_XML_NAME {
var opportunity opportunity
if err := d.DecodeElement(&opportunity, &se); err != nil {
level.Error(logger).Log("msg", "Error decoding XML token", "error", err)
// When reading the start of a new element, check if it is a grant opportunity or forecast
if se, ok := token.(xml.StartElement); ok {
var err error
if se.Name.Local == GRANT_OPPORTUNITY_XML_NAME {
var o opportunity
if err = d.DecodeElement(&o, &se); err == nil {
ch <- &o
}
} else if se.Name.Local == GRANT_FORECAST_XML_NAME && env.IsForecastedGrantsEnabled {
var f forecast
if err = d.DecodeElement(&f, &se); err == nil {
ch <- &f
}
}

if err != nil {
log.Error(logger, "Error decoding XML", err, "element_name", se.Name.Local)
span.Finish(tracer.WithError(err))
return err
}
ch <- opportunity
}
}
log.Info(logger, "Finished reading opportunities from source")
log.Info(logger, "Finished reading source XML")
span.Finish()
return nil
}

// processOpportunities is a work loop that receives and processes grantOpportunity value until
// processRecords is a work loop that receives and processes grantRecord values until
// the receive channel is closed and returns or the context is canceled.
// It returns a multi-error containing any errors encountered while processing a received
// grantOpportunity as well as the reason for the context cancelation, if any.
// Returns nil if all opportunities were processed successfully until the channel was closed.
func processOpportunities(ctx context.Context, svc *s3.Client, ch <-chan opportunity) (errs error) {
// grantRecord as well as the reason for the context cancelation, if any.
// Returns nil if all records were processed successfully until the channel was closed.
func processRecords(ctx context.Context, svc *s3.Client, ch <-chan grantRecord) (errs error) {
span, ctx := tracer.StartSpanFromContext(ctx, "processing.worker")

whenCanceled := func() error {
err := ctx.Err()
log.Debug(logger, "Done processing opportunities because context canceled", "reason", err)
log.Debug(logger, "Done processing records because context canceled", "reason", err)
span.Finish(tracer.WithError(err))
errs = multierror.Append(errs, err)
return errs
Expand All @@ -199,17 +201,17 @@ func processOpportunities(ctx context.Context, svc *s3.Client, ch <-chan opportu

default:
select {
case opportunity, ok := <-ch:
case record, ok := <-ch:
if !ok {
log.Debug(logger, "Done processing opportunities because channel is closed")
log.Debug(logger, "Done processing records because channel is closed")
span.Finish()
return
}

workSpan, ctx := tracer.StartSpanFromContext(ctx, "processing.worker.work")
err := processOpportunity(ctx, svc, opportunity)
err := processRecord(ctx, svc, record)
if err != nil {
sendMetric("opportunity.failed", 1)
sendMetric("record.failed", 1)
errs = multierror.Append(errs, err)
}
workSpan.Finish(tracer.WithError(err))
Expand All @@ -221,58 +223,56 @@ func processOpportunities(ctx context.Context, svc *s3.Client, ch <-chan opportu
}
}

// processOpportunity takes a single opportunity and conditionally uploads an XML
// representation of the opportunity to its configured S3 destination. Before uploading,
// processRecord takes a single record and conditionally uploads an XML
// representation of the grant forecast/opportunity to its configured S3 destination. Before uploading,
// any extant S3 object with a matching key in the bucket named by env.DestinationBucket
// is compared with the opportunity. An upload is initiated when the opportunity was updated
// is compared with the record. An upload is initiated when the record was updated
// more recently than the extant object was last modified, or when no extant object exists.
func processOpportunity(ctx context.Context, svc S3ReadWriteObjectAPI, opp opportunity) error {
logger := log.With(logger,
"opportunity_id", opp.OpportunityID, "opportunity_number", opp.OpportunityNumber)
func processRecord(ctx context.Context, svc S3ReadWriteObjectAPI, record grantRecord) error {
logger := record.logWith(logger)

lastModified, err := opp.LastUpdatedDate.Time()
lastModified, err := record.lastModified()
if err != nil {
return log.Errorf(logger, "Error getting last modified time for opportunity", err)
return log.Errorf(logger, "Error getting last modified time for record", err)
}
log.Debug(logger, "Parsed last modified time from opportunity last update date",
"raw_value", opp.LastUpdatedDate, "parsed_value", lastModified)
logger = log.With(logger, "opportunity_last_modified", lastModified)
logger = log.With(logger, "record_last_modified", lastModified)
log.Debug(logger, "Parsed last modified time from record last update date")

key := opp.S3ObjectKey()
key := record.s3ObjectKey()
logger = log.With(logger, "bucket", env.DestinationBucket, "key", key)
remoteLastModified, err := GetS3LastModified(ctx, svc, env.DestinationBucket, key)
if err != nil {
return log.Errorf(logger, "Error determining last modified time for remote opportunity", err)
return log.Errorf(logger, "Error determining last modified time for remote record", err)
}
logger = log.With(logger, "remote_last_modified", remoteLastModified)

isNew := false
if remoteLastModified != nil {
if remoteLastModified.After(lastModified) {
log.Debug(logger, "Skipping opportunity upload because the extant record is up-to-date")
sendMetric("opportunity.skipped", 1)
log.Debug(logger, "Skipping record upload because the extant record is up-to-date")
sendMetric("record.skipped", 1)
return nil
}
log.Debug(logger, "Uploading updated opportunity to replace outdated remote record")
log.Debug(logger, "Uploading updated record to replace outdated remote record")
} else {
isNew = true
log.Debug(logger, "Uploading new opportunity")
log.Debug(logger, "Uploading new record")
}

b, err := xml.Marshal(grantsgov.OpportunitySynopsisDetail_1_0(opp))
b, err := record.toXML()
if err != nil {
return log.Errorf(logger, "Error marshaling XML for opportunity", err)
return log.Errorf(logger, "Error marshaling XML for record", err)
}

if err := UploadS3Object(ctx, svc, env.DestinationBucket, key, bytes.NewReader(b)); err != nil {
return log.Errorf(logger, "Error uploading prepared grant opportunity to S3", err)
return log.Errorf(logger, "Error uploading prepared grant record to S3", err)
}

log.Info(logger, "Successfully uploaded opportunity")
log.Info(logger, "Successfully uploaded record")
if isNew {
sendMetric("opportunity.created", 1)
sendMetric("record.created", 1)
} else {
sendMetric("opportunity.updated", 1)
sendMetric("record.updated", 1)
}
return nil
}
Loading

0 comments on commit ef138fc

Please sign in to comment.