Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[stability] Survive vips sigabrt #75

Merged
merged 4 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
run: pre-commit run --all-files

- name: Build
run: cd cmd && go build -v main.go
run: cd cmd/filesystem && go build -v main.go

- name: Test
env:
Expand Down
90 changes: 90 additions & 0 deletions cmd/db/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package main
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a binary to stress test the lib in the vectorDB path


import (
datago "datago/pkg"
"flag"
"fmt"
"os"
"runtime/pprof"
"runtime/trace"
"time"
)

func main() {

cropAndResize := flag.Bool("crop_and_resize", false, "Whether to crop and resize the images and masks")
itemFetchBuffer := flag.Int("item_fetch_buffer", 256, "The number of items to pre-load")
itemReadyBuffer := flag.Int("item_ready_buffer", 128, "The number of items ready to be served")
limit := flag.Int("limit", 2000, "The number of items to fetch")
profile := flag.Bool("profile", false, "Whether to profile the code")
source := flag.String("source", os.Getenv("DATAGO_TEST_DB"), "The data source to select on")

// Parse the flags before setting the configuration values
flag.Parse()

// Initialize the configuration
config := datago.GetDatagoConfig()

sourceConfig := datago.GetDefaultSourceDBConfig()
sourceConfig.Sources = *source

config.ImageConfig = datago.GetDefaultImageTransformConfig()
config.ImageConfig.CropAndResize = *cropAndResize

config.SourceConfig = sourceConfig
config.PrefetchBufferSize = int32(*itemFetchBuffer)
config.SamplesBufferSize = int32(*itemReadyBuffer)
config.Limit = *limit

dataroom_client := datago.GetClient(config)

// Go-routine which will feed the sample data to the workers
// and fetch the next page
startTime := time.Now() // Record the start time

if *profile {
fmt.Println("Profiling the code")
{
f, _ := os.Create("trace.out")
// read with go tool trace trace.out

err := trace.Start(f)
if err != nil {
panic(err)
}
defer trace.Stop()
}
{
f, _ := os.Create("cpu.prof")
// read with go tool pprof cpu.prof
err := pprof.StartCPUProfile(f)
if err != nil {
panic(err)
}
defer pprof.StopCPUProfile()
}
}

dataroom_client.Start()

// Fetch all of the binary payloads as they become available
// NOTE: This is useless, just making sure that we empty the payloads channel
n_samples := 0
for {
sample := dataroom_client.GetSample()
if sample.ID == "" {
fmt.Println("No more samples")
break
}
n_samples++
}

// Cancel the context to kill the goroutines
dataroom_client.Stop()

// Calculate the elapsed time
elapsedTime := time.Since(startTime)
fps := float64(config.Limit) / elapsedTime.Seconds()
fmt.Printf("Total execution time: %.2f seconds. Samples %d \n", elapsedTime.Seconds(), n_samples)
fmt.Printf("Average throughput: %.2f samples per second \n", fps)
}
8 changes: 3 additions & 5 deletions cmd/main.go → cmd/filesystem/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,9 @@ func main() {
sourceConfig.Rank = 0
sourceConfig.WorldSize = 1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a binary to stress test the lib in the filesystem path

config.ImageConfig = datago.ImageTransformConfig{
DefaultImageSize: 1024,
DownsamplingRatio: 32,
CropAndResize: *cropAndResize,
}
config.ImageConfig = datago.GetDefaultImageTransformConfig()
config.ImageConfig.CropAndResize = *cropAndResize

config.SourceConfig = sourceConfig
config.PrefetchBufferSize = int32(*itemFetchBuffer)
config.SamplesBufferSize = int32(*itemReadyBuffer)
Expand Down
14 changes: 8 additions & 6 deletions pkg/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ func (c *ImageTransformConfig) setDefaults() {
c.PreEncodeImages = false
}

func GetDefaultImageTransformConfig() ImageTransformConfig {
config := ImageTransformConfig{}
config.setDefaults()
return config
}

// DatagoConfig is the main configuration structure for the datago client
type DatagoConfig struct {
SourceType DatagoSourceType `json:"source_type"`
Expand Down Expand Up @@ -161,12 +167,8 @@ type DatagoClient struct {

// GetClient is a constructor for the DatagoClient, given a JSON configuration string
func GetClient(config DatagoConfig) *DatagoClient {
// Make sure that the GC is run more often than usual
// VIPS will allocate a lot of memory and we want to make sure that it's released as soon as possible
os.Setenv("GOGC", "10") // Default is 100, we're running it when heap is 10% larger than the last GC
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was ugly, not required when we properly release the buffers (and it doesn't crash on us)


// Initialize the vips library
err := os.Setenv("VIPS_DISC_THRESHOLD", "5g")
err := os.Setenv("VIPS_DISC_THRESHOLD", "10g")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in other terms: never go do disk. Could be changed depending on where this runs, 2TB servers for Photoroom so ram is fine

if err != nil {
log.Panicf("Error setting VIPS_DISC_THRESHOLD: %v", err)
}
Expand Down Expand Up @@ -255,7 +257,7 @@ func (c *DatagoClient) Start() {
if c.imageConfig.CropAndResize {
fmt.Println("Cropping and resizing images")
fmt.Println("Base image size | downsampling ratio | min | max:", c.imageConfig.DefaultImageSize, c.imageConfig.DownsamplingRatio, c.imageConfig.MinAspectRatio, c.imageConfig.MaxAspectRatio)
arAwareTransform = newARAwareTransform(c.imageConfig)
arAwareTransform = GetArAwareTransform(c.imageConfig)
}

if c.imageConfig.PreEncodeImages {
Expand Down
12 changes: 6 additions & 6 deletions pkg/generator_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ func (c *SourceDBConfig) setDefaults() {
c.DuplicateState = -1
}

func GetDefaultSourceDBConfig() SourceDBConfig {
config := SourceDBConfig{}
config.setDefaults()
return config
}

func (c *SourceDBConfig) getDbRequest() dbRequest {

fields := "attributes,image_direct_url,source"
Expand Down Expand Up @@ -210,12 +216,6 @@ func (c *SourceDBConfig) getDbRequest() dbRequest {
}
}

func GetSourceDBConfig() SourceDBConfig {
config := SourceDBConfig{}
config.setDefaults()
return config
}

type datagoGeneratorDB struct {
baseRequest http.Request
config SourceDBConfig
Expand Down
2 changes: 1 addition & 1 deletion pkg/generator_filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (c *SourceFileSystemConfig) setDefaults() {
c.RootPath = os.Getenv("DATAGO_TEST_FILESYSTEM")
}

func GetSourceFileSystemConfig() SourceFileSystemConfig {
func GetDefaultSourceFileSystemConfig() SourceFileSystemConfig {
config := SourceFileSystemConfig{}
config.setDefaults()
return config
Expand Down
Loading