Skip to content

Commit

Permalink
Refactor and add testing for Athena and S3 row parsing (opencost#2916)
Browse files Browse the repository at this point in the history
* Refactor and add testing for Athena and S3 row parsing

Signed-off-by: Sean Holcomb <[email protected]>

* Add extra test cases

Signed-off-by: Sean Holcomb <[email protected]>

* Replace const with string literals to make "quality" gate pass

Signed-off-by: Sean Holcomb <[email protected]>

---------

Signed-off-by: Sean Holcomb <[email protected]>
  • Loading branch information
Sean-Holcomb authored Dec 9, 2024
1 parent 3880307 commit 768bcc1
Show file tree
Hide file tree
Showing 4 changed files with 895 additions and 173 deletions.
21 changes: 11 additions & 10 deletions pkg/cloud/aws/athenaintegration.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,12 @@ func (ai *AthenaIntegration) GetCloudCost(start, end time.Time) (*opencost.Cloud

// Generate row handling function.
rowHandler := func(row types.Row) {
err2 := ai.RowToCloudCost(row, aqi, ccsr)
cc, err2 := athenaRowToCloudCost(row, aqi)
if err2 != nil {
log.Errorf("AthenaIntegration: GetCloudCost: error while parsing row: %s", err2.Error())
return
}
ccsr.LoadCloudCost(cc)
}
log.Debugf("AthenaIntegration[%s]: GetCloudCost: querying: %s", ai.Key(), aqi.Query)
// Query CUR data and fill out CCSR
Expand Down Expand Up @@ -334,9 +336,9 @@ func (ai *AthenaIntegration) GetPartitionWhere(start, end time.Time) string {
return str
}

func (ai *AthenaIntegration) RowToCloudCost(row types.Row, aqi AthenaQueryIndexes, ccsr *opencost.CloudCostSetRange) error {
func athenaRowToCloudCost(row types.Row, aqi AthenaQueryIndexes) (*opencost.CloudCost, error) {
if len(row.Data) < len(aqi.ColumnIndexes) {
return fmt.Errorf("rowToCloudCost: row with fewer than %d columns (has only %d)", len(aqi.ColumnIndexes), len(row.Data))
return nil, fmt.Errorf("rowToCloudCost: row with fewer than %d columns (has only %d)", len(aqi.ColumnIndexes), len(row.Data))
}

// Iterate through the slice of tag columns, assigning
Expand Down Expand Up @@ -379,22 +381,22 @@ func (ai *AthenaIntegration) RowToCloudCost(row types.Row, aqi AthenaQueryIndexe

listCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.ListCostColumn)
if err != nil {
return err
return nil, err
}

netCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.NetCostColumn)
if err != nil {
return err
return nil, err
}

amortizedNetCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.AmortizedNetCostColumn)
if err != nil {
return err
return nil, err
}

amortizedCost, err := GetAthenaRowValueFloat(row, aqi.ColumnIndexes, aqi.AmortizedCostColumn)
if err != nil {
return err
return nil, err
}

// Identify resource category in the CUR
Expand Down Expand Up @@ -429,7 +431,7 @@ func (ai *AthenaIntegration) RowToCloudCost(row types.Row, aqi AthenaQueryIndexe

start, err := time.Parse(AthenaDateLayout, startStr)
if err != nil {
return fmt.Errorf("unable to parse %s: '%s'", AthenaDateTruncColumn, err.Error())
return nil, fmt.Errorf("unable to parse %s: '%s'", AthenaDateTruncColumn, err.Error())
}
end := start.AddDate(0, 0, 1)

Expand Down Expand Up @@ -458,8 +460,7 @@ func (ai *AthenaIntegration) RowToCloudCost(row types.Row, aqi AthenaQueryIndexe
},
}

ccsr.LoadCloudCost(cc)
return nil
return cc, nil
}

func (ai *AthenaIntegration) GetConnectionStatusFromResult(result cloud.EmptyChecker, currentStatus cloud.ConnectionStatus) cloud.ConnectionStatus {
Expand Down
Loading

0 comments on commit 768bcc1

Please sign in to comment.