fix(contracts): improve contracts and add tests (#534)
* Refactor: Improve schema setup and benchmark case handling

Updated the benchmark setup to use a tree structure for schemas and enhanced benchmark case handling. Adjusted setup functions, created a `SetupSchemasInput` type, modified results handling, and added more descriptive comments.

* Increase state machine timeout and add comment in benchmark

Extended the state machine timeout from 30 to 120 minutes to accommodate longer-running benchmarks. Added a comment in the runSingleTest function to clarify that the query of the index-0 stream targets the root stream.

* Add depth check to prevent benchmarks from exceeding limits

Introduce a check in the benchmark setup to ensure that the tree's maximum depth does not exceed the PostgreSQL limitations, preventing potential errors. Added a new constant, maxDepth, set at 179, based on empirical findings.
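The guard described above can be sketched as a small validation helper. This is a hypothetical illustration, not the actual commit's code; only the `maxDepth = 179` constant comes from the commit message, and `validateDepth` is an assumed name.

```go
package main

import "fmt"

// maxDepth is the deepest tree the benchmarks handled without PostgreSQL
// errors, per the commit message (an empirical limit, value 179).
const maxDepth = 179

// validateDepth rejects benchmark cases whose tree would exceed the limit.
func validateDepth(depth int) error {
	if depth > maxDepth {
		return fmt.Errorf("tree depth %d exceeds PostgreSQL limit %d", depth, maxDepth)
	}
	return nil
}

func main() {
	fmt.Println(validateDepth(10) == nil)  // shallow tree passes
	fmt.Println(validateDepth(180) != nil) // too-deep tree is rejected
}
```

Failing fast here turns an obscure PostgreSQL stack error into a clear setup-time message.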

* Increase timeouts and adjust polling intervals

Extended the state machine timeout from 2 hours to 6 hours to accommodate longer processing times. Adjusted task-specific timeouts and added a new polling interval to optimize the frequency of status checks during prolonged operations.

* Update execution timeout parameter in Step Functions

Changed the "timeoutSeconds" parameter to "executionTimeout" for better clarity. Also corrected the naming of the "TimeoutSeconds" constant to align with updated AWS guidelines.

* Optimize get_index_change to remove nested loop

* Set default LOG_RESULTS to true and conditionally print results.

Added default setting for LOG_RESULTS to true in TestBench to ensure results are logged unless specified otherwise. Modified benchmark.go to conditionally print results based on the LOG_RESULTS environment variable. Updated step_functions.go to explicitly set LOG_RESULTS to false when executing benchmark from the deployed environment.

* Add index change tests for contract validation

Implement test cases to validate index change and YoY index calculations. Includes initialization, data insertion, and result conversion to ensure accuracy and coverage of edge cases.

* Refactor error handling in benchmark workflow

Updated the benchmark workflow to introduce a `formatErrorState` pass state for better error formatting and handling. Replaced `Fail` state with a chain of `Pass` and `Fail` to ensure structured error information is passed upstream. Adjusted error catching and chaining to integrate with the new error handling structure.

* Remove unsupported micro instances from benchmark types

Micro instances were causing errors and hangs during tests, so they have been commented out of the list of tested EC2 instance types. Medium and large instance types have been added to keep the benchmarking thorough.

* Optimize record insertion process

Modified insertRecordsForPrimitive function to use bulk insert for faster database operations. The records are now batched into a single SQL insert statement, significantly improving performance by reducing the number of individual insert operations.
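The batching idea above can be sketched by building one multi-row parameterized `INSERT` instead of N single-row statements. This is an illustrative helper under assumed names (`buildBulkInsert`, the `records` table, and its columns are not from the commit):

```go
package main

import (
	"fmt"
	"strings"
)

// buildBulkInsert batches all rows into a single parameterized INSERT,
// replacing one round trip per row with one round trip total.
func buildBulkInsert(table string, cols []string, rows [][]any) (string, []any) {
	placeholders := make([]string, 0, len(rows))
	args := make([]any, 0, len(rows)*len(cols))
	i := 1
	for _, row := range rows {
		ph := make([]string, len(cols))
		for j, v := range row {
			ph[j] = fmt.Sprintf("$%d", i) // PostgreSQL-style positional params
			args = append(args, v)
			i++
		}
		placeholders = append(placeholders, "("+strings.Join(ph, ", ")+")")
	}
	query := fmt.Sprintf("INSERT INTO %s (%s) VALUES %s",
		table, strings.Join(cols, ", "), strings.Join(placeholders, ", "))
	return query, args
}

func main() {
	q, args := buildBulkInsert("records", []string{"date", "value"},
		[][]any{{"2024-01-01", 1}, {"2024-01-02", 2}})
	fmt.Println(q)         // INSERT INTO records (date, value) VALUES ($1, $2), ($3, $4)
	fmt.Println(len(args)) // 4
}
```

The win comes from amortizing statement parsing and network latency across the whole batch.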

* Add unit tests for NewTree function

Implemented comprehensive unit tests for the NewTree function covering various scenarios such as different quantities of streams and branching factors. The tests also include checks for tree structure, node properties, and special cases for larger trees.

* Fix nulls without explicit types in the streams template

* Refactor composed_stream_template procedures

Added parameters `child_data_providers` and `child_stream_ids` to `get_raw_record` and `get_raw_index`. Updated the logic to buffer and emit ordered results, ensuring proper handling of data arrays' lengths and emitting results by date and taxonomy index sequentially.

* Assign default value to avoid null error in buffer handling

Assigned default value 0 to the buffer length to prevent null errors during buffer length evaluation. This ensures the buffer dates are processed correctly and avoids unexpected termination due to null length assignment.

* Add os package import to benchmark.go

* Fix critical bugs and optimize get_raw_record and get_raw_index procedures

- Add checks for empty child taxonomies to prevent null upper bound errors
- Improve buffer handling and array initialization to avoid potential issues
- Refactor loop structure for better efficiency and correctness
- Update comments and improve code readability

* Simplify data composition logic

Refactored the logic for handling child data providers and stream IDs by removing unnecessary buffering and looping. This results in cleaner code that directly returns data values and indices in a simplified manner, ensuring proper ordering by date and taxonomy index.

* Refactor and optimize taxonomy processing

Simplify the loop logic for processing taxonomies and emitting values by removing unnecessary steps and optimizing array handling. Introduce a new approach to handling array element removal and managing date-based value emission efficiently. This reduces code complexity and enhances maintainability.

* Add CSV export alongside markdown in Export Results Lambda

The function now merges CSV files and saves both a markdown and a CSV file back to the results bucket. New code handles reading and uploading the merged CSV file to S3, ensuring both formats are available.

* Wrap errors with more context for better debugging

Added `github.com/pkg/errors` to wrap errors throughout the codebase, providing more context and improving the debugging process. This includes error wrapping in file operations, schema creation, metadata insertion, and benchmark runs.

* Update kwil-db dependencies to latest versions

Upgraded the kwil-db, kwil-db/core, and kwil-db/parse modules to their latest revisions in the go.mod and go.sum files. This ensures we are using the most current features and fixes provided by these libraries.

* Add complex composed tests for contract validation

Introduce comprehensive test cases within `complex_composed_test.go` to validate various scenarios including record retrieval, index checks, latest value checks, out-of-range data handling, and error scenarios. Deploy necessary contracts and initialize datasets for testing.

* Add ToDisplay method to Tree and a test for visualization

The new ToDisplay method provides a string representation of the tree, showing parent-child relationships. A corresponding test function, TestDisplayTree, has been added to verify the output for various branching factors.

* Refactor load_test.go for improved stream and depth testing

Simplified the shapePairs for better clarity and added new test cases to evaluate the cost of adding streams and depth. Reduced the number of samples from 10 to 3, and adjusted the days array to exclude 3 days. Commented out tests that caused errors or had call stack size issues.

* Add CloudWatch log group for SSM command execution

Created a CloudWatch log group to capture logs for EC2 benchmark tasks. Updated IAM role to include CloudWatch managed policy for logging.

* Retry benchmark execution up to 3 times upon failure

Added a loop to attempt running the benchmark up to three times before giving up, with a 10-second interval between retries. This change ensures more robust handling of transient failures during benchmark execution. Also, removed redundant command concatenations for better readability.

* Fix tree initialization for single stream scenario

Ensure that the root node is correctly marked as a leaf when there is only one stream. This change returns the initialized tree immediately if the condition is met, optimizing the tree setup process.

* Chunk long-running tests to prevent Postgres timeouts

Split function tests into groups of 10 to avoid exhausting Postgres during execution. Introduced a helper function `chunk` to divide tests, ensuring better test reliability and stability.
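A `chunk` helper of the kind described above might look like the following. The generic signature is an assumption; the commit's actual helper may differ.

```go
package main

import "fmt"

// chunk splits items into groups of at most size elements, so each group
// of tests can run against a fresh Postgres session.
func chunk[T any](items []T, size int) [][]T {
	var out [][]T
	for size > 0 && len(items) > 0 {
		n := size
		if len(items) < n {
			n = len(items)
		}
		out = append(out, items[:n])
		items = items[n:]
	}
	return out
}

func main() {
	groups := chunk([]int{1, 2, 3, 4, 5}, 2)
	fmt.Println(len(groups))              // 3
	fmt.Println(groups[2])                // [5] — last group holds the remainder
}
```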

* Remove retry logic from benchmark script

Simplified the benchmark step by removing the retry logic in the script. The benchmark will now run just once without reattempting on failure.

* Refactor benchmark functions and add results handling.

Introduced a results channel for collecting benchmark results and improved test robustness with retry logic. Added logging to track benchmark execution and integrated a cleanup function to handle interruptions gracefully.

* Add README for internal contracts directory

Introduce a README file for the `internal/contracts` directory, detailing the purpose and contents of the Kuneiform contracts used in the Truflation Stream Network (TSN). This includes descriptions of each contract file, synchronization practices, and links to additional resources.

* Refactor timeout handling in benchmark state machine

Updated timeout handling to use a centralized constant in the benchmark state machine. This improves maintainability by defining `TotalTimeout` in a new constants file and referencing it across the code. Consequently, it ensures consistency and eases future modifications.

* Parallelize schema parsing and batch metadata insertion

This update parallelizes the schema parsing process using goroutines to improve efficiency and adds a bulk insertion for metadata. These changes enhance the performance and overall speed of the setup operation.

* Disable 800 stream test cases due to memory issues

Commented out the test cases involving 800 streams as they cause memory starvation in t3.small instances. These tests significantly impact memory usage because they store the entire tree in memory.
outerlook authored Sep 9, 2024
1 parent ac15d18 commit 53dd668
Showing 16 changed files with 1,380 additions and 417 deletions.
2 changes: 2 additions & 0 deletions deployments/infra/stacks/benchmark/benchmark_stack.go
@@ -68,6 +68,8 @@ func BenchmarkStack(scope constructs.Construct, id string, props *awscdk.StackPr

// Add SSM managed policy
ec2InstanceRole.AddManagedPolicy(awsiam.ManagedPolicy_FromAwsManagedPolicyName(jsii.String("AmazonSSMManagedInstanceCore")))
// Add CloudWatch managed policy
ec2InstanceRole.AddManagedPolicy(awsiam.ManagedPolicy_FromAwsManagedPolicyName(jsii.String("CloudWatchAgentServerPolicy")))

// grant read permissions to the binary s3 asset
binaryS3Asset.GrantRead(ec2InstanceRole)
7 changes: 7 additions & 0 deletions deployments/infra/stacks/benchmark/constants.go
@@ -0,0 +1,7 @@
package benchmark

import "time"

const (
TotalTimeout = 6 * time.Hour
)
57 changes: 27 additions & 30 deletions deployments/infra/stacks/benchmark/lambdas/exportresults/main.go
@@ -21,8 +21,8 @@ import (

// -------------------------------------------------------------------------------------------------
// Export Results Lambda
// - takes a list of CSV files from the results bucket, merges them into a single file and saves them as a markdown file
// - the markdown file is then saved back to the results bucket
// - takes a list of CSV files from the results bucket, merges them into a single file and saves them as a markdown file and a CSV file
// - both files are then saved back to the results bucket
// - errors if there are no CSV files to process
// -------------------------------------------------------------------------------------------------

@@ -37,6 +37,7 @@ type Event struct {
}

const markdownFilePath = "/tmp/results.md"
const csvFilePath = "/tmp/results.csv"

func HandleRequest(ctx context.Context, event Event) error {
// delete if file exists. remember that lambdas can share the same filesystem across multiple invocations
@@ -123,46 +124,30 @@ func HandleRequest(ctx context.Context, event Event) error {
FilePath: markdownFilePath,
})

err = benchexport.SaveOrAppendToCSV(results, csvFilePath)
if err != nil {
log.Printf("Error processing file %s: %v", csvFile, err)
return err
}
}

log.Printf("Exporting results to s3://%s/%s.md", event.Bucket, event.KeyPrefix)

resultsKey := fmt.Sprintf("reports/%s.md", event.KeyPrefix)

// check if the file already exists
_, errExists := s3Client.HeadObject(&s3.HeadObjectInput{
Bucket: aws.String(event.Bucket),
Key: aws.String(resultsKey),
})

// we know the file doesn't exist if we receive an error. we won't try to get if it's 404 to be simple
if errExists != nil {
log.Printf("File already exists: %s", resultsKey)

// delete if it exists
log.Printf("Deleting file: %s", resultsKey)

_, err = s3Client.DeleteObject(&s3.DeleteObjectInput{
Bucket: aws.String(event.Bucket),
Key: aws.String(resultsKey),
})

if err != nil {
log.Printf("Error deleting file: %v", err)
log.Printf("Error processing file %s: %v", csvFile, err)
return err
}
}

log.Printf("Exporting results to s3://%s/%s.md", event.Bucket, event.KeyPrefix)

resultsKey := fmt.Sprintf("reports/%s.md", event.KeyPrefix)
csvResultsKey := fmt.Sprintf("reports/%s.csv", event.KeyPrefix)

mergedMdFile, err := os.ReadFile(markdownFilePath)
if err != nil {
log.Printf("Error uploading merged file: %v", err)
log.Printf("Error reading merged file: %v", err)
return err
}

mergedFile, err := os.ReadFile(markdownFilePath)
mergedCsvFile, err := os.ReadFile(csvFilePath)
if err != nil {
log.Printf("Error reading merged file: %v", err)
return err
@@ -172,12 +157,24 @@ func HandleRequest(ctx context.Context, event Event) error {
_, err = s3Client.PutObject(&s3.PutObjectInput{
Bucket: aws.String(event.Bucket),
Key: aws.String(resultsKey),
Body: bytes.NewReader(mergedFile),
Body: bytes.NewReader(mergedMdFile),
ContentType: aws.String("text/markdown"),
})

if err != nil {
log.Printf("Error uploading merged file: %v", err)
log.Printf("Error uploading markdown file: %v", err)
return err
}

_, err = s3Client.PutObject(&s3.PutObjectInput{
Bucket: aws.String(event.Bucket),
Key: aws.String(csvResultsKey),
Body: bytes.NewReader(mergedCsvFile),
ContentType: aws.String("text/csv"),
})

if err != nil {
log.Printf("Error uploading CSV file: %v", err)
return err
}

31 changes: 26 additions & 5 deletions deployments/infra/stacks/benchmark/step_functions.go
@@ -2,10 +2,12 @@ package benchmark

import (
"fmt"
"regexp"

"github.com/aws/aws-cdk-go/awscdk/v2"
"github.com/aws/aws-cdk-go/awscdk/v2/awsec2"
"github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
"github.com/aws/aws-cdk-go/awscdk/v2/awslogs"
"github.com/aws/aws-cdk-go/awscdk/v2/awss3"
"github.com/aws/aws-cdk-go/awscdk/v2/awss3assets"
"github.com/aws/aws-cdk-go/awscdk/v2/awsstepfunctions"
@@ -104,7 +106,7 @@ func createStateMachine(scope constructs.Construct, input CreateStateMachineInpu

stateMachine := awsstepfunctions.NewStateMachine(scope, jsii.String("BenchmarkStateMachine"), &awsstepfunctions.StateMachineProps{
DefinitionBody: awsstepfunctions.DefinitionBody_FromChainable(benchmarkWorkflowChain),
Timeout: awscdk.Duration_Hours(jsii.Number(6)),
Timeout: awscdk.Duration_Hours(jsii.Number(TotalTimeout.Hours())),
// <stackname>-benchmark
StateMachineName: jsii.String(fmt.Sprintf("%s-benchmark", *awscdk.Aws_STACK_NAME())),
})
@@ -155,6 +157,16 @@ func createBenchmarkWorkflow(scope constructs.Construct, input CreateWorkflowInp
})

// Copy binary to EC2 instance, run benchmark tests and export results to S3
// Create a log group for the command execution
logGroupName := fmt.Sprintf("/aws/ssm/RunBenchmark-%s", *input.LaunchTemplateOutput.InstanceType.ToString())
// only permitted characters: [\.\-_/#A-Za-z0-9]+
logGroupName = regexp.MustCompile(`[^a-zA-Z0-9\.\-_]`).ReplaceAllString(logGroupName, "")
commandLogGroup := awslogs.NewLogGroup(scope, jsii.String("BenchmarkCommandLogGroup"+*input.LaunchTemplateOutput.InstanceType.ToString()), &awslogs.LogGroupProps{
LogGroupName: jsii.String(logGroupName),
Retention: awslogs.RetentionDays_THREE_DAYS,
RemovalPolicy: awscdk.RemovalPolicy_DESTROY,
})

runBenchmarkTask := awsstepfunctionstasks.NewCallAwsService(scope, jsii.String("RunBenchmark"+input.Id), &awsstepfunctionstasks.CallAwsServiceProps{
Service: jsii.String("ssm"),
Action: jsii.String("sendCommand"),
@@ -170,9 +182,12 @@ func createBenchmarkWorkflow(scope constructs.Construct, input CreateWorkflowInp
"InstanceIds": awsstepfunctions.JsonPath_Array(
awsstepfunctions.JsonPath_StringAt(jsii.String("$.ec2Instance.InstanceId")),
),
"DocumentName": jsii.String("AWS-RunShellScript"),
// 6 hours
"TimeoutSeconds": jsii.Number(6 * 60 * 60),
"DocumentName": jsii.String("AWS-RunShellScript"),
"TimeoutSeconds": jsii.Number(TotalTimeout.Seconds()),
"CloudWatchOutputConfig": map[string]interface{}{
"CloudWatchLogGroupName": commandLogGroup.LogGroupName(),
"CloudWatchOutputEnabled": true,
},
"Parameters": map[string]interface{}{
"executionTimeout": awsstepfunctions.JsonPath_Array(jsii.Sprintf("%d", 6*60*60)),
"commands": awsstepfunctions.JsonPath_Array(
@@ -185,8 +200,14 @@ func createBenchmarkWorkflow(scope constructs.Construct, input CreateWorkflowInp
// unzip the binary
jsii.String("unzip -o "+input.LaunchTemplateOutput.BenchmarkBinaryZipPath+" -d /home/ec2-user/benchmark"),
jsii.String("chmod +x /home/ec2-user/benchmark/benchmark"),
// export necessary environment variables
jsii.String("export RESULTS_PATH=/tmp/results.csv"),
jsii.String("export LOG_RESULTS=false"),
jsii.String("cleanupCmd=\\'docker rm -f kwil-testing-postgres || true\\'"),
// run the benchmark
jsii.String("/home/ec2-user/benchmark/benchmark"),
awsstepfunctions.JsonPath_Format(
jsii.String("RESULTS_PATH=/tmp/results.csv LOG_RESULTS=false /home/ec2-user/benchmark/benchmark && aws s3 cp /tmp/results.csv s3://{}/{}_{}.csv"),
jsii.String("aws s3 cp /tmp/results.csv s3://{}/{}_{}.csv"),
input.ResultsBucket.BucketName(),
awsstepfunctions.JsonPath_StringAt(jsii.String("$.timestamp")),
awsstepfunctions.JsonPath_StringAt(jsii.String("$.ec2Instance.InstanceType")),
6 changes: 3 additions & 3 deletions go.mod
@@ -8,9 +8,9 @@ require (
github.com/cenkalti/backoff/v4 v4.3.0
github.com/cockroachdb/apd/v3 v3.2.1
github.com/fbiville/markdown-table-formatter v0.3.0
github.com/kwilteam/kwil-db v0.8.4-0.20240827162722-08719d472804
github.com/kwilteam/kwil-db/core v0.2.2-0.20240827162722-08719d472804
github.com/kwilteam/kwil-db/parse v0.2.4-0.20240827162722-08719d472804
github.com/kwilteam/kwil-db v0.8.4-0.20240904172400-17170099ef0f
github.com/kwilteam/kwil-db/core v0.2.2-0.20240904172400-17170099ef0f
github.com/kwilteam/kwil-db/parse v0.2.4-0.20240904172400-17170099ef0f
github.com/mitchellh/mapstructure v1.5.0
github.com/stretchr/testify v1.9.0
github.com/truflation/tsn-sdk v0.1.1-0.20240820124358-fd3a36d9cc50
8 changes: 8 additions & 0 deletions go.sum
@@ -284,10 +284,18 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kwilteam/kwil-db v0.8.4-0.20240827162722-08719d472804 h1:GBoiS7sHQrYLO/9okscY82y9l3zPCZtrFBmpPTUKLU4=
github.com/kwilteam/kwil-db v0.8.4-0.20240827162722-08719d472804/go.mod h1:WqwTWrgJBx3hCh22lY1J4IXOLcWuceZ0C6qdOdQMESw=
github.com/kwilteam/kwil-db v0.8.4-0.20240904172400-17170099ef0f h1:maBmp7YStx39+e7iHDPD/DmvaQ8gdGoT2sGhFXnU/LY=
github.com/kwilteam/kwil-db v0.8.4-0.20240904172400-17170099ef0f/go.mod h1:DM0bcq2oyAhuU8abNcJgcUQSMgQhgwod5kPPXr4zJjA=
github.com/kwilteam/kwil-db/core v0.2.2-0.20240827162722-08719d472804 h1:pMYi3JlFscYGAma1+xN7W1830Ld0dXoKB49Gc9bIYu4=
github.com/kwilteam/kwil-db/core v0.2.2-0.20240827162722-08719d472804/go.mod h1:IZX/X9cPUg1Ppet0MCsBV/Kot6JCikiITcahzSp2i3c=
github.com/kwilteam/kwil-db/core v0.2.2-0.20240904172400-17170099ef0f h1:FJz6PJUHzI7PxEMKvLqFkE6iU3lXeFoedxHv7vSDLEA=
github.com/kwilteam/kwil-db/core v0.2.2-0.20240904172400-17170099ef0f/go.mod h1:IZX/X9cPUg1Ppet0MCsBV/Kot6JCikiITcahzSp2i3c=
github.com/kwilteam/kwil-db/parse v0.2.4-0.20240827162722-08719d472804 h1:AQ7cyq75Py+V43RwIsmCkyeZOLVk0DkmDluhc5NgfFQ=
github.com/kwilteam/kwil-db/parse v0.2.4-0.20240827162722-08719d472804/go.mod h1:lA6Qv9z3XJbCeenUeokA+60OEvT5EKxp014t7/RUlTg=
github.com/kwilteam/kwil-db/parse v0.2.4-0.20240904172033-5314d63642c1 h1:ccL8Jo4JJYGWLp56vsu9pSfsMJdPglsjxCYTSCQxrwE=
github.com/kwilteam/kwil-db/parse v0.2.4-0.20240904172033-5314d63642c1/go.mod h1:lA6Qv9z3XJbCeenUeokA+60OEvT5EKxp014t7/RUlTg=
github.com/kwilteam/kwil-db/parse v0.2.4-0.20240904172400-17170099ef0f h1:ktvsIZ192c6DJAnpEgngL73YVnwuhV2+z8mz5RcYDtI=
github.com/kwilteam/kwil-db/parse v0.2.4-0.20240904172400-17170099ef0f/go.mod h1:lA6Qv9z3XJbCeenUeokA+60OEvT5EKxp014t7/RUlTg=
github.com/kwilteam/kwil-extensions v0.0.0-20230727040522-1cfd930226b7 h1:YiPBu0pOeYOtOVfwKQqdWB07SUef9LvngF4bVFD+x34=
github.com/kwilteam/kwil-extensions v0.0.0-20230727040522-1cfd930226b7/go.mod h1:+BrFrV+3qcdYIfptqjwatE5gT19azuRHJzw77wMPY8c=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
15 changes: 10 additions & 5 deletions internal/benchmark/benchmark.go
@@ -3,10 +3,12 @@ package benchmark
import (
"context"
"fmt"
"log"
"os"
"time"

"github.com/kwilteam/kwil-db/core/utils"
"github.com/pkg/errors"

kwilTesting "github.com/kwilteam/kwil-db/testing"
"github.com/truflation/tsn-db/internal/benchmark/trees"
@@ -21,7 +23,7 @@ func runBenchmark(ctx context.Context, platform *kwilTesting.Platform, c Benchma
Tree: tree,
})
if err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to setup schemas")
}

for _, day := range c.Days {
@@ -34,7 +36,7 @@ func runBenchmark(ctx context.Context, platform *kwilTesting.Platform, c Benchma
Tree: tree,
})
if err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to run single test")
}
results = append(results, result)
}
@@ -90,8 +92,10 @@ type RunBenchmarkInput struct {
Samples int
}

func getBenchmarkAndSaveFn(benchmarkCase BenchmarkCase, resultPath string) func(ctx context.Context, platform *kwilTesting.Platform) error {
// it returns a result channel to be accumulated by the caller
func getBenchmarFn(benchmarkCase BenchmarkCase, resultCh *chan []Result) func(ctx context.Context, platform *kwilTesting.Platform) error {
return func(ctx context.Context, platform *kwilTesting.Platform) error {
log.Println("running benchmark", benchmarkCase)
platform.Deployer = deployer.Bytes()

tree := trees.NewTree(trees.NewTreeInput{
@@ -106,14 +110,15 @@ func getBenchmarkAndSaveFn(benchmarkCase BenchmarkCase, resultPath string) func(

results, err := runBenchmark(ctx, platform, benchmarkCase, tree)
if err != nil {
return err
return errors.Wrap(err, "failed to run benchmark")
}

// if LOG_RESULTS is set, we print the results to the console
if os.Getenv("LOG_RESULTS") == "true" {
printResults(results)
}

return saveResults(results, resultPath)
*resultCh <- results
return nil
}
}