Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce CrateDB to the benchmark [draft] #29

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ ELASTIC_AUTH="ApiKey xxx" ELASTIC_URL=https://... ELASTIC_INDEX=index_name WPS=1

# Send data to Snowflake and report data latency
SNOWFLAKE_ACCOUNT=xxxx SNOWFLAKE_USER=xxxx SNOWFLAKE_PASSWORD=xxxx SNOWFLAKE_WAREHOUSE=xxxx SNOWFLAKE_DATABASE=xxxx SNOWFLAKE_STAGES3BUCKETNAME=xxxx AWS_REGION=xxxx WPS=1 BATCH_SIZE=50 TRACK_LATENCY=true DESTINATION=Snowflake ./rockbench

# Send data to CrateDB and report data latency
CRATEDB_URI="postgres://crate:@localhost:5432/test?pool_max_conns=10&pool_min_conns=3" WPS=1 BATCH_SIZE=50 DESTINATION=CrateDB TRACK_LATENCY=true ./rockbench
```

- To run with Docker container
Expand Down Expand Up @@ -96,3 +99,13 @@ two methods:

Once the new source is implemented, handle it
in [main.go](https://github.com/rockset/rockbench/blob/master/generator/main.go).

## Development

```
# Run docker compose for local development
docker compose -f dev/compose.yml up -d

# To test the CrateDB part, run:
CRATEDB_URI="postgres://crate:@localhost:5432/test?pool_max_conns=10&pool_min_conns=3" go test ./...
```
45 changes: 45 additions & 0 deletions dev/compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Docker Compose stack for local development: one OpenSearch node and one
# CrateDB node.
services:
# Single-node OpenSearch with the security plugin disabled.
opensearch:
# NOTE(review): LOCALSTACK_DOCKER_NAME looks like a leftover from a LocalStack
# compose file; the container is named "opensearch" when it is unset — confirm.
container_name: "${LOCALSTACK_DOCKER_NAME-opensearch}"
image: opensearchproject/opensearch:2.10.0
environment:
- node.name=opensearch
- cluster.name=opensearch-docker-cluster
- discovery.type=single-node
- bootstrap.memory_lock=true
- "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
- "DISABLE_SECURITY_PLUGIN=true"
ports:
- 9200:9200 # REST API
- 9600:9600 # Performance Analyzer
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- data01:/usr/share/opensearch/data

# CrateDB node; port 5432 is the one the postgres:// CRATEDB_URI connects to.
cratedb:
# NOTE(review): the image tag is not pinned, so the CrateDB version can change
# between runs.
image: crate:latest
ports:
- "4200:4200"
- "5432:5432"
volumes:
# Data is kept on the host under /tmp/crate/00.
- /tmp/crate/00:/data
command: ["crate",
"-Cnetwork.host=_site_"]
deploy:
replicas: 1
restart_policy:
condition: on-failure
ulimits:
memlock:
soft: -1
hard: -1
environment:
- CRATE_HEAP_SIZE=2g

# Named volume backing the OpenSearch data directory.
volumes:
data01:
name: tmp
driver: local
20 changes: 20 additions & 0 deletions dev/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/bin/bash
#
# Starts the local compose stack, builds rockbench and runs it against the
# Elastic-compatible endpoint and then against CrateDB. The compose stack is
# torn down again when the script exits.

# Resolve the script directory to an absolute path so the script behaves the
# same no matter which directory it is invoked from.
cwd=$(cd "$(dirname "$0")" && pwd)
project_root=$(dirname "$cwd")

# Build and run from the project root.
# NOTE(review): assumes `go build .` there produces ./rockbench — confirm.
cd "$project_root" || exit 1

docker compose -f "$cwd/compose.yml" up -d
trap 'docker compose -f "$cwd/compose.yml" down' EXIT

echo "Build rockbench"
# Stop here on a failed build instead of benchmarking a stale or missing binary.
go build . || exit 1


echo "Wait for 15s for compose containers to start..."
sleep 15

echo "Benchmarking Elastic"
ELASTIC_AUTH="" ELASTIC_URL=http://localhost:9200 ELASTIC_INDEX=index_name WPS=1 BATCH_SIZE=50 DESTINATION=Elastic TRACK_LATENCY=true ./rockbench

echo "Benchmarking CrateDB"
CRATEDB_URI="postgres://crate:@localhost:5432/test?pool_max_conns=10&pool_min_conns=3" WPS=1 BATCH_SIZE=50 DESTINATION=CrateDB TRACK_LATENCY=true ./rockbench
131 changes: 131 additions & 0 deletions generator/cratedb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package generator

import (
"context"
"encoding/json"
"fmt"
"github.com/jackc/pgx/v5"
"time"

"github.com/jackc/pgx/v5/pgxpool"
)

// NewCrateDB opens the connections used to talk to CrateDB.
//
// url is passed unchanged to both pgxpool.New and pgx.Connect. On success the
// returned CrateDB holds a connection pool and one standalone connection. On
// failure the error is wrapped and nothing opened by this function is left
// open.
func NewCrateDB(ctx context.Context, url string) (*CrateDB, error) {
	pool, err := pgxpool.New(ctx, url)
	if err != nil {
		return nil, fmt.Errorf("cratedb:NewCrateDB: pool; %w", err)
	}

	single, err := pgx.Connect(ctx, url)
	if err != nil {
		// Release the pool created above so a failed constructor does not
		// leak it.
		pool.Close()
		return nil, fmt.Errorf("cratedb:NewCrateDB: single; %w", err)
	}

	return &CrateDB{
		conn:  pool,
		conn2: single,
	}, nil
}

// Compile-time assertion that *CrateDB satisfies the Destination interface.
var _ Destination = (*CrateDB)(nil)

// ma is a shorthand alias for a map with string keys and values of any type.
type ma = map[string]any

// CrateDB contains all configurations needed to send documents to CrateDB.
// Both fields are populated by NewCrateDB.
type CrateDB struct {
// conn is the connection pool that the methods in this file run their
// statements on.
conn *pgxpool.Pool
// conn2 is a standalone connection opened next to the pool.
// NOTE(review): no statement in the code visible here is issued on it —
// confirm whether it is still needed.
conn2 *pgx.Conn
}

// Init creates the "test" table, with a single OBJECT column named doc, if it
// does not exist yet.
//
// ctx is forwarded to the pool. A failing statement is returned as a wrapped
// error.
func (c *CrateDB) Init(ctx context.Context) error {
	const createTable = `CREATE TABLE IF NOT EXISTS test (doc OBJECT)`

	if _, err := c.conn.Exec(ctx, createTable); err != nil {
		// %w keeps the driver error reachable through errors.Is / errors.As.
		return fmt.Errorf("cratedb:Init: %w", err)
	}

	return nil
}

// Reset drops the "test" table if it exists.
//
// ctx is forwarded to the pool. A failing statement is returned as a wrapped
// error.
func (c *CrateDB) Reset(ctx context.Context) error {
	if _, err := c.conn.Exec(ctx, "DROP TABLE IF EXISTS test"); err != nil {
		// %w keeps the driver error reachable through errors.Is / errors.As.
		return fmt.Errorf("cratedb:Reset: %w", err)
	}

	return nil
}

// SendDocument inserts every element of docs into the "test" table using a
// single batch.
//
// Each document is serialized with encoding/json and bound as the only
// parameter of an INSERT statement. An empty docs slice is a no-op. The first
// marshalling or insert failure is returned as a wrapped error.
func (c *CrateDB) SendDocument(docs []any) error {
	if len(docs) == 0 {
		// Nothing to insert; do not send an empty batch.
		return nil
	}

	const insert = `INSERT INTO test (doc) VALUES ($1);`

	b := &pgx.Batch{}
	for _, doc := range docs {
		data, err := json.Marshal(doc)
		if err != nil {
			return fmt.Errorf("cratedb:SendDocument: %w", err)
		}
		b.Queue(insert, data)
	}

	br := c.conn.SendBatch(context.Background(), b)

	// Read one result per queued statement so a failure is reported for the
	// insert that caused it.
	for i := 0; i < b.Len(); i++ {
		if _, err := br.Exec(); err != nil {
			// The batch still has to be closed, otherwise the connection it
			// holds is not handed back. The Exec error is the one reported,
			// so the Close error is intentionally dropped.
			_ = br.Close()
			return fmt.Errorf("cratedb:SendDocument: exec; %w", err)
		}
	}

	if err := br.Close(); err != nil {
		return fmt.Errorf("cratedb:SendDocument: close; %w", err)
	}

	return nil
}

// SendPatch is not implemented for CrateDB yet.
//
// It always returns an error instead of panicking, so a caller that selects
// patch mode gets a regular failure through the method's error result.
func (c *CrateDB) SendPatch(docs []any) error {
	// TODO: implement patching of existing documents.
	return fmt.Errorf("cratedb:SendPatch: not implemented")
}

// GetLatestTimestamp returns the largest "_event_time" stored in the "test"
// table.
//
// The table is refreshed first, then MAX(doc['_event_time']) is selected and
// cast to TIMESTAMP. Any failing step is returned as a wrapped error together
// with the zero time.Time.
func (c *CrateDB) GetLatestTimestamp() (time.Time, error) {
	ctx := context.Background()

	// NOTE(review): the refresh is presumably what makes freshly inserted rows
	// visible to the SELECT below — confirm against the CrateDB documentation.
	if _, err := c.conn.Exec(ctx, `REFRESH TABLE test;`); err != nil {
		return time.Time{}, fmt.Errorf("cratedb:GetLatestTimestamp: %w", err)
	}

	const query = `SELECT MAX(doc['_event_time'])::TIMESTAMP FROM test;`

	rows, err := c.conn.Query(ctx, query)
	if err != nil {
		return time.Time{}, fmt.Errorf("cratedb:GetLatestTimestamp: query; %w", err)
	}
	// Deferred so the result set is closed on every return path, including a
	// failed Scan.
	defer rows.Close()

	if !rows.Next() {
		// Next also returns false when the query failed while streaming, so
		// report that error rather than a misleading "no rows".
		if err := rows.Err(); err != nil {
			return time.Time{}, fmt.Errorf("cratedb:GetLatestTimestamp: query; %w", err)
		}
		return time.Time{}, fmt.Errorf("cratedb:GetLatestTimestamp: no rows")
	}

	var ts time.Time
	if err := rows.Scan(&ts); err != nil {
		return time.Time{}, fmt.Errorf("cratedb:GetLatestTimestamp: scan; %w", err)
	}

	// NOTE(review): ts.UnixMicro() is deliberately passed as the nanosecond
	// argument. Presumably the ::TIMESTAMP cast reads the stored microsecond
	// value as milliseconds, making the scanned time 1000x too large, and this
	// step cancels that out — confirm against CrateDB's cast rules before
	// changing it.
	return time.Unix(0, ts.UnixMicro()), nil
}

// ConfigureDestination is not implemented for CrateDB yet.
//
// It always returns an error instead of panicking, so callers receive a
// regular failure through the method's error result.
func (c *CrateDB) ConfigureDestination() error {
	// TODO: implement destination-specific configuration.
	return fmt.Errorf("cratedb:ConfigureDestination: not implemented")
}

// Close releases the connection pool and the standalone connection held by c.
//
// ctx is used for closing the standalone connection. A nil conn2 is skipped.
// An error from closing the standalone connection is returned wrapped.
func (c *CrateDB) Close(ctx context.Context) error {
	c.conn.Close()

	// conn2 is opened by NewCrateDB next to the pool and has to be closed as
	// well, otherwise that connection stays open.
	if c.conn2 != nil {
		if err := c.conn2.Close(ctx); err != nil {
			return fmt.Errorf("cratedb:Close: %w", err)
		}
	}

	return nil
}
124 changes: 124 additions & 0 deletions generator/cratedb_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package generator

import (
"context"
"os"
"testing"
"time"
)

func crateDbDocs() ([]any, error) {
spec := DocumentSpec{
Destination: "elastic",
GeneratorIdentifier: "1",
BatchSize: 10,
Mode: "add",
IdMode: "sequential",
UpdatePercentage: -1,
NumClusters: -1,
HotClusterPercentage: -1,
}

docs, err := GenerateDocs(spec)
return docs, err
}

// findMaxTimestamp returns the largest "_event_time" value found in rows, or
// 0 when rows is empty or no value is positive.
//
// Every element must be an ma whose "_event_time" entry is an int64; anything
// else makes the type assertions panic.
func findMaxTimestamp(rows []any) int64 {
	latest := int64(0)
	for i := range rows {
		doc := rows[i].(ma)
		if eventTime := doc["_event_time"].(int64); eventTime > latest {
			latest = eventTime
		}
	}

	return latest
}

// TestTimestampsConversion checks that a microsecond Unix timestamp survives
// two round trips through time.Time unchanged.
func TestTimestampsConversion(t *testing.T) {
	const (
		microsPerSecond = 1_000_000
		nanosPerMicro   = 1_000
	)

	// fromMicros splits a microsecond timestamp into the seconds and
	// nanoseconds arguments that time.Unix expects.
	fromMicros := func(micros int64) time.Time {
		return time.Unix(micros/microsPerSecond, (micros%microsPerSecond)*nanosPerMicro)
	}

	var given int64 = 1720004969682695

	firstTrip := fromMicros(given).UnixMicro()
	output := fromMicros(firstTrip).UnixMicro()

	if output != given {
		t.Fatalf("timestamps do not match: given=%d, output=%d", given, output)
	}
}

// TestNewCrateDB is an integration test against a running CrateDB instance.
//
// It is skipped when CRATEDB_URI is not set. Otherwise it recreates the test
// table, inserts one generated batch and verifies that GetLatestTimestamp
// reports the largest "_event_time" of that batch to within 100 microseconds.
func TestNewCrateDB(t *testing.T) {
	uri := os.Getenv("CRATEDB_URI")
	if uri == "" {
		t.Skipf(`
CRATEDB_URI not set.

To run this test, make sure you run:
docker compose -f dev/compose.yml up -d

And then run:
CRATEDB_URI="postgres://crate:@localhost:5432/test?pool_max_conns=10&pool_min_conns=3" go test ./...

`)
	}

	t.Logf("CRATEDB_URI: %s", uri)

	ctx := context.Background()

	c, err := NewCrateDB(ctx, uri)
	if err != nil {
		t.Fatalf("NewCrateDB() error = %v", err)
	}

	if c == nil {
		t.Fatalf("NewCrateDB() = nil")
	}

	// Registered right away so the connections are released even when a
	// later step aborts the test with Fatalf.
	t.Cleanup(func() {
		if err := c.Close(ctx); err != nil {
			t.Errorf("CrateDB.Close() error = %v", err)
		}
	})

	// Start from an empty table so the maximum only reflects this batch.
	if err := c.Reset(ctx); err != nil {
		t.Fatalf("CrateDB.Reset() error = %v", err)
	}

	if err := c.Init(ctx); err != nil {
		t.Fatalf("CrateDB.Init() error = %v", err)
	}

	docs, err := crateDbDocs()
	if err != nil {
		t.Fatalf("crateDbDocs() error = %v", err)
	}

	if err := c.SendDocument(docs); err != nil {
		t.Fatalf("CrateDB.SendDocument() error = %v", err)
	}

	maxTimestamp := findMaxTimestamp(docs)

	timestamp, err := c.GetLatestTimestamp()
	if err != nil {
		t.Fatalf("CrateDB.GetLatestTimestamp() error = %v", err)
	}

	// Compare the absolute difference: a reported timestamp that is too large
	// is just as wrong as one that is too small.
	delta := maxTimestamp - timestamp.UnixMicro()
	if delta < 0 {
		delta = -delta
	}
	epsilon := int64(100)
	if delta > epsilon {
		t.Errorf("CrateDB.GetLatestTimestamp() delta is %d, cannot be bigger than %d", delta, epsilon)
	}

	t.Logf("CrateDB.GetLatestTimestamp() = %s", timestamp.String())
	t.Logf("           timestamp = %d", timestamp.UnixMicro())
	t.Logf("maxTimestamp = %d", maxTimestamp)

	if err := c.Reset(ctx); err != nil {
		t.Fatalf("CrateDB.Reset() error = %v", err)
	}
}
Loading