
fix: lower memory footprint #543

Closed · wants to merge 43 commits

Commits:
f976f31
Refactor: Improve schema setup and benchmark case handling
outerlook Aug 30, 2024
7ee6b2a
Increase state machine timeout and add comment in benchmark
outerlook Aug 30, 2024
f455cbe
Add depth check to prevent benchmarks from exceeding limits
outerlook Aug 31, 2024
8f3945e
Increase timeouts and adjust polling intervals
outerlook Aug 31, 2024
c804b27
Update execution timeout parameter in Step Functions
outerlook Aug 31, 2024
2ff01af
Merge branch 'fix/bench-timeout' into chore/idx-change-opt
outerlook Sep 2, 2024
f05db59
Optimize get_index_change to remove nested loop
outerlook Sep 2, 2024
1ce8393
Set default LOG_RESULTS to true and conditionally print results.
outerlook Sep 2, 2024
9c4705e
Add index change tests for contract validation
outerlook Sep 2, 2024
8e6fde2
Refactor error handling in benchmark workflow
outerlook Sep 2, 2024
17ab7ea
Remove unsupported micro instances from benchmark types
outerlook Sep 2, 2024
07fbeb3
Merge branch 'main' into chore/idx-change-opt
outerlook Sep 3, 2024
1ccd939
Optimize record insertion process
outerlook Sep 3, 2024
5160981
Add unit tests for NewTree function
outerlook Sep 3, 2024
8dbb1f0
fix null without types at streams template
outerlook Sep 3, 2024
b21b1f4
Merge branch 'chore/idx-change-opt' into fix/bench-err
outerlook Sep 3, 2024
2601f13
Merge branch 'main' into fix/bench-err
outerlook Sep 3, 2024
4b1b2f7
Refactor composed_stream_template procedures
outerlook Sep 3, 2024
0be4679
Assign default value to avoid null error in buffer handling
outerlook Sep 3, 2024
9774350
Merge branch 'main' into fix/unsuport-micro
outerlook Sep 3, 2024
a179725
Add os package import to benchmark.go
outerlook Sep 3, 2024
bd29a5f
Merge branch 'fix/unsuport-micro' into fix/bench-err
outerlook Sep 3, 2024
80b06de
Fix critical bugs and optimize get_raw_record and get_raw_index proce…
outerlook Sep 4, 2024
9e36aee
Simplify data composition logic
outerlook Sep 4, 2024
dba69fa
Refactor and optimize taxonomy processing
outerlook Sep 5, 2024
a99cc4a
Add CSV export alongside markdown in Export Results Lambda
outerlook Sep 5, 2024
85c0bb7
Wrap errors with more context for better debugging
outerlook Sep 5, 2024
8bb3ed3
Update kwil-db dependencies to latest versions
outerlook Sep 5, 2024
409bf35
Merge branch 'refs/heads/main' into fix/bench-err
outerlook Sep 5, 2024
8488d89
Add complex composed tests for contract validation
outerlook Sep 5, 2024
00f7438
Add ToDisplay method to Tree and a test for visualization
outerlook Sep 5, 2024
348fa20
Refactor load_test.go for improved stream and depth testing
outerlook Sep 5, 2024
f0588f4
Add CloudWatch log group for SSM command execution
outerlook Sep 6, 2024
59a6bc2
Retry benchmark execution up to 3 times upon failure
outerlook Sep 6, 2024
c25128b
Fix tree initialization for single stream scenario
outerlook Sep 6, 2024
7e63aa7
Chunk long-running tests to prevent Postgres timeouts
outerlook Sep 6, 2024
ffb277c
Remove retry logic from benchmark script
outerlook Sep 7, 2024
4bec6ec
Refactor benchmark functions and add results handling.
outerlook Sep 7, 2024
73f9a57
Add README for internal contracts directory
outerlook Sep 7, 2024
73d967a
Refactor timeout handling in benchmark state machine
outerlook Sep 7, 2024
908d12c
Parallelize schema parsing and batch metadata insertion
outerlook Sep 7, 2024
738592d
Disable 800 stream test cases due to memory issues
outerlook Sep 7, 2024
94b743b
Refactor schema parsing to use unbuffered channel
outerlook Sep 7, 2024
2 changes: 2 additions & 0 deletions deployments/infra/stacks/benchmark/benchmark_stack.go
@@ -68,6 +68,8 @@ func BenchmarkStack(scope constructs.Construct, id string, props *awscdk.StackPr

// Add SSM managed policy
ec2InstanceRole.AddManagedPolicy(awsiam.ManagedPolicy_FromAwsManagedPolicyName(jsii.String("AmazonSSMManagedInstanceCore")))
// Add CloudWatch managed policy
ec2InstanceRole.AddManagedPolicy(awsiam.ManagedPolicy_FromAwsManagedPolicyName(jsii.String("CloudWatchAgentServerPolicy")))

// grant read permissions to the binary s3 asset
binaryS3Asset.GrantRead(ec2InstanceRole)
7 changes: 7 additions & 0 deletions deployments/infra/stacks/benchmark/constants.go
@@ -0,0 +1,7 @@
package benchmark

import "time"

const (
TotalTimeout = 6 * time.Hour
)
57 changes: 27 additions & 30 deletions deployments/infra/stacks/benchmark/lambdas/exportresults/main.go
@@ -21,8 +21,8 @@ import (

// -------------------------------------------------------------------------------------------------
// Export Results Lambda
// - takes a list of CSV files from the results bucket, merges them into a single file and saves them as a markdown file
// - the markdown file is then saved back to the results bucket
// - takes a list of CSV files from the results bucket, merges them into a single file and saves them as a markdown file and a CSV file
// - both files are then saved back to the results bucket
// - errors if there's no CSV files to process
// -------------------------------------------------------------------------------------------------

@@ -37,6 +37,7 @@ type Event struct {
}

const markdownFilePath = "/tmp/results.md"
const csvFilePath = "/tmp/results.csv"

func HandleRequest(ctx context.Context, event Event) error {
// delete if file exists. remember that lambdas can share the same filesystem accross multiple invocations1
@@ -123,46 +124,30 @@ func HandleRequest(ctx context.Context, event Event) error {
FilePath: markdownFilePath,
})

err = benchexport.SaveOrAppendToCSV(results, csvFilePath)
if err != nil {
log.Printf("Error processing file %s: %v", csvFile, err)
return err
}
}

log.Printf("Exporting results to s3://%s/%s.md", event.Bucket, event.KeyPrefix)

resultsKey := fmt.Sprintf("reports/%s.md", event.KeyPrefix)

// check if the file already exists
_, errExists := s3Client.HeadObject(&s3.HeadObjectInput{
Bucket: aws.String(event.Bucket),
Key: aws.String(resultsKey),
})

// we know the file doesn't exist if we receive an error. we won't try to get if it's 404 to be simple
if errExists != nil {
log.Printf("File already exists: %s", resultsKey)

// delete if it exists
log.Printf("Deleting file: %s", resultsKey)

_, err = s3Client.DeleteObject(&s3.DeleteObjectInput{
Bucket: aws.String(event.Bucket),
Key: aws.String(resultsKey),
})

if err != nil {
log.Printf("Error deleting file: %v", err)
log.Printf("Error processing file %s: %v", csvFile, err)
return err
}
}

log.Printf("Exporting results to s3://%s/%s.md", event.Bucket, event.KeyPrefix)

resultsKey := fmt.Sprintf("reports/%s.md", event.KeyPrefix)
csvResultsKey := fmt.Sprintf("reports/%s.csv", event.KeyPrefix)

mergedMdFile, err := os.ReadFile(markdownFilePath)
if err != nil {
log.Printf("Error uploading merged file: %v", err)
log.Printf("Error reading merged file: %v", err)
return err
}

mergedFile, err := os.ReadFile(markdownFilePath)
mergedCsvFile, err := os.ReadFile(csvFilePath)
if err != nil {
log.Printf("Error reading merged file: %v", err)
return err
Expand All @@ -172,12 +157,24 @@ func HandleRequest(ctx context.Context, event Event) error {
_, err = s3Client.PutObject(&s3.PutObjectInput{
Bucket: aws.String(event.Bucket),
Key: aws.String(resultsKey),
Body: bytes.NewReader(mergedFile),
Body: bytes.NewReader(mergedMdFile),
ContentType: aws.String("text/markdown"),
})

if err != nil {
log.Printf("Error uploading merged file: %v", err)
log.Printf("Error uploading markdown file: %v", err)
return err
}

_, err = s3Client.PutObject(&s3.PutObjectInput{
Bucket: aws.String(event.Bucket),
Key: aws.String(csvResultsKey),
Body: bytes.NewReader(mergedCsvFile),
ContentType: aws.String("text/csv"),
})

if err != nil {
log.Printf("Error uploading CSV file: %v", err)
return err
}

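The new CSV path relies on `benchexport.SaveOrAppendToCSV`, which is not shown in this diff. Below is a minimal sketch of what an append-or-create helper like that could look like; the `Row` type, its fields, and the header names are placeholders for illustration, not the repository's actual definitions.

```go
package benchexport

import (
	"encoding/csv"
	"fmt"
	"os"
)

// Row is a placeholder for whatever result type the real helper accepts.
type Row struct {
	Procedure  string
	Streams    int
	DurationMs int64
}

// SaveOrAppendToCSV writes a header only when the file does not exist yet,
// then appends one record per row, so repeated calls accumulate into one CSV.
func SaveOrAppendToCSV(rows []Row, path string) error {
	_, statErr := os.Stat(path)
	writeHeader := os.IsNotExist(statErr)

	f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
	if err != nil {
		return err
	}
	defer f.Close()

	w := csv.NewWriter(f)
	if writeHeader {
		if err := w.Write([]string{"procedure", "streams", "duration_ms"}); err != nil {
			return err
		}
	}
	for _, r := range rows {
		if err := w.Write([]string{r.Procedure, fmt.Sprint(r.Streams), fmt.Sprint(r.DurationMs)}); err != nil {
			return err
		}
	}
	w.Flush()
	return w.Error()
}
```

The append-or-create behavior is what lets the lambda call the helper once per input CSV while still ending up with a single merged `/tmp/results.csv` to upload.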
31 changes: 26 additions & 5 deletions deployments/infra/stacks/benchmark/step_functions.go
@@ -2,10 +2,12 @@ package benchmark

import (
"fmt"
"regexp"

"github.com/aws/aws-cdk-go/awscdk/v2"
"github.com/aws/aws-cdk-go/awscdk/v2/awsec2"
"github.com/aws/aws-cdk-go/awscdk/v2/awslambda"
"github.com/aws/aws-cdk-go/awscdk/v2/awslogs"
"github.com/aws/aws-cdk-go/awscdk/v2/awss3"
"github.com/aws/aws-cdk-go/awscdk/v2/awss3assets"
"github.com/aws/aws-cdk-go/awscdk/v2/awsstepfunctions"
@@ -104,7 +106,7 @@ func createStateMachine(scope constructs.Construct, input CreateStateMachineInput

stateMachine := awsstepfunctions.NewStateMachine(scope, jsii.String("BenchmarkStateMachine"), &awsstepfunctions.StateMachineProps{
DefinitionBody: awsstepfunctions.DefinitionBody_FromChainable(benchmarkWorkflowChain),
Timeout: awscdk.Duration_Hours(jsii.Number(6)),
Timeout: awscdk.Duration_Hours(jsii.Number(TotalTimeout.Hours())),
// <stackname>-benchmark
StateMachineName: jsii.String(fmt.Sprintf("%s-benchmark", *awscdk.Aws_STACK_NAME())),
})
@@ -155,6 +157,16 @@ func createBenchmarkWorkflow(scope constructs.Construct, input CreateWorkflowInp
})

// Copy binary to EC2 instance, run benchmark tests and export results to S3
// Create a log group for the command execution
logGroupName := fmt.Sprintf("/aws/ssm/RunBenchmark-%s", *input.LaunchTemplateOutput.InstanceType.ToString())
// only permitted characters: [\.\-_/#A-Za-z0-9]+
logGroupName = regexp.MustCompile(`[^a-zA-Z0-9\.\-_]`).ReplaceAllString(logGroupName, "")
commandLogGroup := awslogs.NewLogGroup(scope, jsii.String("BenchmarkCommandLogGroup"+*input.LaunchTemplateOutput.InstanceType.ToString()), &awslogs.LogGroupProps{
LogGroupName: jsii.String(logGroupName),
Retention: awslogs.RetentionDays_THREE_DAYS,
RemovalPolicy: awscdk.RemovalPolicy_DESTROY,
})

runBenchmarkTask := awsstepfunctionstasks.NewCallAwsService(scope, jsii.String("RunBenchmark"+input.Id), &awsstepfunctionstasks.CallAwsServiceProps{
Service: jsii.String("ssm"),
Action: jsii.String("sendCommand"),
@@ -170,9 +182,12 @@ func createBenchmarkWorkflow(scope constructs.Construct, input CreateWorkflowInp
"InstanceIds": awsstepfunctions.JsonPath_Array(
awsstepfunctions.JsonPath_StringAt(jsii.String("$.ec2Instance.InstanceId")),
),
"DocumentName": jsii.String("AWS-RunShellScript"),
// 6 hours
"TimeoutSeconds": jsii.Number(6 * 60 * 60),
"DocumentName": jsii.String("AWS-RunShellScript"),
"TimeoutSeconds": jsii.Number(TotalTimeout.Seconds()),
"CloudWatchOutputConfig": map[string]interface{}{
"CloudWatchLogGroupName": commandLogGroup.LogGroupName(),
"CloudWatchOutputEnabled": true,
},
"Parameters": map[string]interface{}{
"executionTimeout": awsstepfunctions.JsonPath_Array(jsii.Sprintf("%d", 6*60*60)),
"commands": awsstepfunctions.JsonPath_Array(
@@ -185,8 +200,14 @@ func createBenchmarkWorkflow(scope constructs.Construct, input CreateWorkflowInp
// unzip the binary
jsii.String("unzip -o "+input.LaunchTemplateOutput.BenchmarkBinaryZipPath+" -d /home/ec2-user/benchmark"),
jsii.String("chmod +x /home/ec2-user/benchmark/benchmark"),
// export necessary environment variables
jsii.String("export RESULTS_PATH=/tmp/results.csv"),
jsii.String("export LOG_RESULTS=false"),
jsii.String("cleanupCmd=\\'docker rm -f kwil-testing-postgres || true\\'"),
// run the benchmark
jsii.String("/home/ec2-user/benchmark/benchmark"),
awsstepfunctions.JsonPath_Format(
jsii.String("RESULTS_PATH=/tmp/results.csv LOG_RESULTS=false /home/ec2-user/benchmark/benchmark && aws s3 cp /tmp/results.csv s3://{}/{}_{}.csv"),
jsii.String("aws s3 cp /tmp/results.csv s3://{}/{}_{}.csv"),
input.ResultsBucket.BucketName(),
awsstepfunctions.JsonPath_StringAt(jsii.String("$.timestamp")),
awsstepfunctions.JsonPath_StringAt(jsii.String("$.ec2Instance.InstanceType")),
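A side note on the log-group naming above: CloudWatch log group names only accept characters in `[\.\-_/#A-Za-z0-9]+`, but the sanitizer's allowed set omits `/`, so the path separators in the `/aws/ssm/...` prefix are stripped as well. A standalone sketch of that behavior is below; the instance types are examples, not values taken from this PR.

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Same pattern as in createBenchmarkWorkflow: anything outside the set is removed.
	sanitize := regexp.MustCompile(`[^a-zA-Z0-9\.\-_]`)
	for _, instanceType := range []string{"t3.large", "m5.2xlarge"} {
		name := fmt.Sprintf("/aws/ssm/RunBenchmark-%s", instanceType)
		fmt.Println(sanitize.ReplaceAllString(name, ""))
	}
	// Output:
	// awsssmRunBenchmark-t3.large
	// awsssmRunBenchmark-m5.2xlarge
}
```

The resulting names are still unique per instance type, so the flattening of the `/aws/ssm/` prefix only affects how the groups sort in the CloudWatch console.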
6 changes: 3 additions & 3 deletions go.mod
@@ -8,9 +8,9 @@ require (
github.com/cenkalti/backoff/v4 v4.3.0
github.com/cockroachdb/apd/v3 v3.2.1
github.com/fbiville/markdown-table-formatter v0.3.0
github.com/kwilteam/kwil-db v0.8.4-0.20240827162722-08719d472804
github.com/kwilteam/kwil-db/core v0.2.2-0.20240827162722-08719d472804
github.com/kwilteam/kwil-db/parse v0.2.4-0.20240827162722-08719d472804
github.com/kwilteam/kwil-db v0.8.4-0.20240904172400-17170099ef0f
github.com/kwilteam/kwil-db/core v0.2.2-0.20240904172400-17170099ef0f
github.com/kwilteam/kwil-db/parse v0.2.4-0.20240904172400-17170099ef0f
github.com/mitchellh/mapstructure v1.5.0
github.com/stretchr/testify v1.9.0
github.com/truflation/tsn-sdk v0.1.1-0.20240820124358-fd3a36d9cc50
8 changes: 8 additions & 0 deletions go.sum
@@ -284,10 +284,18 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kwilteam/kwil-db v0.8.4-0.20240827162722-08719d472804 h1:GBoiS7sHQrYLO/9okscY82y9l3zPCZtrFBmpPTUKLU4=
github.com/kwilteam/kwil-db v0.8.4-0.20240827162722-08719d472804/go.mod h1:WqwTWrgJBx3hCh22lY1J4IXOLcWuceZ0C6qdOdQMESw=
github.com/kwilteam/kwil-db v0.8.4-0.20240904172400-17170099ef0f h1:maBmp7YStx39+e7iHDPD/DmvaQ8gdGoT2sGhFXnU/LY=
github.com/kwilteam/kwil-db v0.8.4-0.20240904172400-17170099ef0f/go.mod h1:DM0bcq2oyAhuU8abNcJgcUQSMgQhgwod5kPPXr4zJjA=
github.com/kwilteam/kwil-db/core v0.2.2-0.20240827162722-08719d472804 h1:pMYi3JlFscYGAma1+xN7W1830Ld0dXoKB49Gc9bIYu4=
github.com/kwilteam/kwil-db/core v0.2.2-0.20240827162722-08719d472804/go.mod h1:IZX/X9cPUg1Ppet0MCsBV/Kot6JCikiITcahzSp2i3c=
github.com/kwilteam/kwil-db/core v0.2.2-0.20240904172400-17170099ef0f h1:FJz6PJUHzI7PxEMKvLqFkE6iU3lXeFoedxHv7vSDLEA=
github.com/kwilteam/kwil-db/core v0.2.2-0.20240904172400-17170099ef0f/go.mod h1:IZX/X9cPUg1Ppet0MCsBV/Kot6JCikiITcahzSp2i3c=
github.com/kwilteam/kwil-db/parse v0.2.4-0.20240827162722-08719d472804 h1:AQ7cyq75Py+V43RwIsmCkyeZOLVk0DkmDluhc5NgfFQ=
github.com/kwilteam/kwil-db/parse v0.2.4-0.20240827162722-08719d472804/go.mod h1:lA6Qv9z3XJbCeenUeokA+60OEvT5EKxp014t7/RUlTg=
github.com/kwilteam/kwil-db/parse v0.2.4-0.20240904172033-5314d63642c1 h1:ccL8Jo4JJYGWLp56vsu9pSfsMJdPglsjxCYTSCQxrwE=
github.com/kwilteam/kwil-db/parse v0.2.4-0.20240904172033-5314d63642c1/go.mod h1:lA6Qv9z3XJbCeenUeokA+60OEvT5EKxp014t7/RUlTg=
github.com/kwilteam/kwil-db/parse v0.2.4-0.20240904172400-17170099ef0f h1:ktvsIZ192c6DJAnpEgngL73YVnwuhV2+z8mz5RcYDtI=
github.com/kwilteam/kwil-db/parse v0.2.4-0.20240904172400-17170099ef0f/go.mod h1:lA6Qv9z3XJbCeenUeokA+60OEvT5EKxp014t7/RUlTg=
github.com/kwilteam/kwil-extensions v0.0.0-20230727040522-1cfd930226b7 h1:YiPBu0pOeYOtOVfwKQqdWB07SUef9LvngF4bVFD+x34=
github.com/kwilteam/kwil-extensions v0.0.0-20230727040522-1cfd930226b7/go.mod h1:+BrFrV+3qcdYIfptqjwatE5gT19azuRHJzw77wMPY8c=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
15 changes: 10 additions & 5 deletions internal/benchmark/benchmark.go
@@ -3,10 +3,12 @@ package benchmark
import (
"context"
"fmt"
"log"
"os"
"time"

"github.com/kwilteam/kwil-db/core/utils"
"github.com/pkg/errors"

kwilTesting "github.com/kwilteam/kwil-db/testing"
"github.com/truflation/tsn-db/internal/benchmark/trees"
@@ -21,7 +23,7 @@ func runBenchmark(ctx context.Context, platform *kwilTesting.Platform, c Benchma
Tree: tree,
})
if err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to setup schemas")
}

for _, day := range c.Days {
@@ -34,7 +36,7 @@
Tree: tree,
})
if err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to run single test")
}
results = append(results, result)
}
@@ -90,8 +92,10 @@ type RunBenchmarkInput struct {
Samples int
}

func getBenchmarkAndSaveFn(benchmarkCase BenchmarkCase, resultPath string) func(ctx context.Context, platform *kwilTesting.Platform) error {
// it returns a result channel to be accumulated by the caller
func getBenchmarFn(benchmarkCase BenchmarkCase, resultCh *chan []Result) func(ctx context.Context, platform *kwilTesting.Platform) error {
return func(ctx context.Context, platform *kwilTesting.Platform) error {
log.Println("running benchmark", benchmarkCase)
platform.Deployer = deployer.Bytes()

tree := trees.NewTree(trees.NewTreeInput{
@@ -106,14 +110,15 @@ func getBenchmarkAndSaveFn(benchmarkCase BenchmarkCase, resultPath string) func(

results, err := runBenchmark(ctx, platform, benchmarkCase, tree)
if err != nil {
return err
return errors.Wrap(err, "failed to run benchmark")
}

// if LOG_RESULTS is set, we print the results to the console
if os.Getenv("LOG_RESULTS") == "true" {
printResults(results)
}

return saveResults(results, resultPath)
*resultCh <- results
return nil
}
}
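For context on the new signature: `getBenchmarFn` no longer saves results itself; it pushes each case's results onto the channel passed in and leaves persistence to the caller. The sketch below is a hypothetical caller, not code from this PR — `runWithTestPlatform` is a placeholder for however the kwil testing harness executes the returned function, and the channel is buffered so every case can send without a concurrent reader.

```go
// Hypothetical caller: collects all per-case results and persists them once
// at the end via the package's saveResults helper.
func runAllBenchmarks(ctx context.Context, cases []BenchmarkCase, resultsPath string) error {
	resultCh := make(chan []Result, len(cases))

	for _, c := range cases {
		fn := getBenchmarFn(c, &resultCh)
		if err := runWithTestPlatform(ctx, fn); err != nil { // placeholder harness call
			return errors.Wrap(err, "benchmark case failed")
		}
	}
	close(resultCh)

	var all []Result
	for rs := range resultCh {
		all = append(all, rs...)
	}
	return saveResults(all, resultsPath)
}
```

Passing the channel by pointer is not strictly required in Go — channel values are already reference-like — so a plain `chan []Result` parameter would behave the same here.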