From bb07c90d41bb9009d129943ed43df51059a0fa5f Mon Sep 17 00:00:00 2001 From: widmogrod Date: Tue, 2 Jul 2024 20:00:45 +0200 Subject: [PATCH 01/16] working draft --- dev/compose.yml | 97 +++++++++++++ dev/run.sh | 13 ++ generator/cratedb.go | 294 ++++++++++++++++++++++++++++++++++++++ generator/cratedb_test.go | 64 +++++++++ go.mod | 16 ++- go.sum | 32 +++-- main.go | 24 ++++ 7 files changed, 526 insertions(+), 14 deletions(-) create mode 100644 dev/compose.yml create mode 100755 dev/run.sh create mode 100644 generator/cratedb.go create mode 100644 generator/cratedb_test.go diff --git a/dev/compose.yml b/dev/compose.yml new file mode 100644 index 0000000..def779f --- /dev/null +++ b/dev/compose.yml @@ -0,0 +1,97 @@ +services: + opensearch: + container_name: "${LOCALSTACK_DOCKER_NAME-opensearch}" + image: opensearchproject/opensearch:2.10.0 + environment: + - node.name=opensearch + - cluster.name=opensearch-docker-cluster + - discovery.type=single-node + - bootstrap.memory_lock=true + - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" + - "DISABLE_SECURITY_PLUGIN=true" + ports: + - 9200:9200 # REST API + - 9600:9600 # Performance Analyzer + ulimits: + memlock: + soft: -1 + hard: -1 + volumes: + - data01:/usr/share/opensearch/data + + cratedb: + image: crate:latest + ports: + - "4200:4200" + - "5432:5432" + volumes: + - /tmp/crate/00:/data + command: ["crate", +# "-Ccluster.name=crate-docker-cluster", +# "-Cnode.name=cratedb01", +# "-Cnode.data=true", + "-Cnetwork.host=_site_", +# "-Cdiscovery.seed_hosts=cratedb02,cratedb03", +# "-Ccluster.initial_master_nodes=cratedb01,cratedb02,cratedb03", +# "-Cgateway.expected_data_nodes=3", +# "-Cgateway.recover_after_data_nodes=2" + ] + deploy: + replicas: 1 + restart_policy: + condition: on-failure + ulimits: + memlock: + soft: -1 + hard: -1 + environment: + - CRATE_HEAP_SIZE=2g +# +# cratedb02: +# image: crate:latest +# ports: +# - "4202:4200" +# volumes: +# - /tmp/crate/02:/data +# command: ["crate", +# 
"-Ccluster.name=crate-docker-cluster", +# "-Cnode.name=cratedb02", +# "-Cnode.data=true", +# "-Cnetwork.host=_site_", +# "-Cdiscovery.seed_hosts=cratedb01,cratedb03", +# "-Ccluster.initial_master_nodes=cratedb01,cratedb02,cratedb03", +# "-Cgateway.expected_data_nodes=3", +# "-Cgateway.recover_after_data_nodes=2"] +# deploy: +# replicas: 1 +# restart_policy: +# condition: on-failure +# environment: +# - CRATE_HEAP_SIZE=2g +# +# cratedb03: +# image: crate:latest +# ports: +# - "4203:4200" +# volumes: +# - /tmp/crate/03:/data +# command: ["crate", +# "-Ccluster.name=crate-docker-cluster", +# "-Cnode.name=cratedb03", +# "-Cnode.data=true", +# "-Cnetwork.host=_site_", +# "-Cdiscovery.seed_hosts=cratedb01,cratedb02", +# "-Ccluster.initial_master_nodes=cratedb01,cratedb02,cratedb03", +# "-Cgateway.expected_data_nodes=3", +# "-Cgateway.recover_after_data_nodes=2"] +# deploy: +# replicas: 1 +# restart_policy: +# condition: on-failure +# environment: +# - CRATE_HEAP_SIZE=2g + +volumes: + data01: + name: tmp + driver: local diff --git a/dev/run.sh b/dev/run.sh new file mode 100755 index 0000000..104289c --- /dev/null +++ b/dev/run.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +cwd=$(dirname "$0") +project_root=$(dirname "$cwd") + +#docker compose -f $cwd/compose.yml up -d +#trap "docker compose -f $cwd/compose.yml down" EXIT + +#echo "Benchmarking Elastic" +#ELASTIC_AUTH="" ELASTIC_URL=http://localhost:9200 ELASTIC_INDEX=index_name WPS=1 BATCH_SIZE=50 DESTINATION=Elastic TRACK_LATENCY=true ./rockbench + +Echo "Benchmarking CrateDB" +CRATEDB_URI="postgres://crate:@localhost:5432/test" WPS=1 BATCH_SIZE=50 DESTINATION=CrateDB TRACK_LATENCY=true ./rockbench \ No newline at end of file diff --git a/generator/cratedb.go b/generator/cratedb.go new file mode 100644 index 0000000..7e16cc4 --- /dev/null +++ b/generator/cratedb.go @@ -0,0 +1,294 @@ +package generator + +import ( + "context" + "fmt" + "github.com/jackc/pgx/v5" + "time" + + "github.com/jackc/pgx/v5/pgxpool" +) + +func NewCrateDB(ctx 
context.Context, url string) (*CrateDB, error) { + conn, err := pgxpool.New(ctx, url) + if err != nil { + return nil, fmt.Errorf("cratedb:NewCrateDB: %v", err) + } + + return &CrateDB{ + conn: conn, + }, nil +} + +var _ Destination = (*CrateDB)(nil) + +type ma = map[string]any + +// CrateDB contains all configurations needed to send documents to CrateDB +type CrateDB struct { + conn *pgxpool.Pool +} + +func (c CrateDB) Init(ctx context.Context) error { + table := `CREATE TABLE IF NOT EXISTS test ( + About STRING, + + Address OBJECT(STRICT) AS ( + City STRING, + Street STRING, + ZipCode INTEGER, + Coordinates OBJECT(STRICT) AS ( + Latitude REAL, + Longitude REAL + ) + ), + + Age INTEGER, + Balance REAL, + Company STRING, + Email STRING, + Friends OBJECT(STRICT) AS ( + Friend1 OBJECT(STRICT) AS ( + Name OBJECT(STRICT) AS ( + "First" STRING, + "Last" STRING + ), + Age SMALLINT + ), + Friend2 OBJECT(STRICT) AS ( + Name OBJECT(STRICT) AS ( + "First" STRING, + "Last" STRING + ), + Age SMALLINT + ), + Friend3 OBJECT(STRICT) AS ( + Name OBJECT(STRICT) AS ( + "First" STRING, + "Last" STRING + ), + Age SMALLINT + ), + Friend4 OBJECT(STRICT) AS ( + Name OBJECT(STRICT) AS ( + "First" STRING, + "Last" STRING + ), + Age SMALLINT + ), + Friend5 OBJECT(STRICT) AS ( + Name OBJECT(STRICT) AS ( + "First" STRING, + "Last" STRING + ), + Age SMALLINT + ) + ), + Greeting STRING, + Guid STRING, + IsActive BOOLEAN, + Name OBJECT(STRICT) AS ( + "First" STRING, + "Last" STRING + ), + Phone STRING, + Picture STRING, + Registered STRING, + Tags ARRAY(STRING), + event_time TIMESTAMP WITHOUT TIME ZONE, + id STRING, + ts TIMESTAMP WITHOUT TIME ZONE, + generator_identifier STRING + ) + ` + + _, err := c.conn.Exec(ctx, table) + if err != nil { + return fmt.Errorf("cratedb:Init: %v", err) + } + + return nil +} + +func (c CrateDB) Reset(ctx context.Context) error { + _, err := c.conn.Exec(ctx, "DROP TABLE IF EXISTS test") + if err != nil { + return fmt.Errorf("cratedb:Reset: %v", err) + } + + return 
nil +} + +func (c CrateDB) SendDocument(docs []any) error { + b := &pgx.Batch{} + for _, doc := range docs { + insert := `INSERT INTO test ( + About, + Address, + Age, + Balance, + Company, + Email, + Friends, + Greeting, + Guid, + IsActive, + Name, + Phone, + Picture, + Registered, + Tags, + event_time, + id, + ts, + generator_identifier + ) VALUES ( + $1, + { + City = $2, + Coordinates = { + Latitude = $3::REAL, + Longitude = $4::REAL + }, + Street = $5, + ZipCode = $6::INTEGER + }, + $7::INTEGER, + $8::REAL, + $9, + $10, + { + Friend1 = { + Name = { + "First" = $11, + "Last" = $12 + }, + Age = $13::SMALLINT + }, + Friend2 = { + Name = { + "First" = $14, + "Last" = $15 + }, + Age = $16::SMALLINT + }, + Friend3 = { + Name = { + "First" = $17, + "Last" = $18 + }, + Age = $19::SMALLINT + }, + Friend4 = { + Name = { + "First" = $20, + "Last" = $21 + }, + Age = $22::SMALLINT + }, + Friend5 = { + Name = { + "First" = $23, + "Last" = $24 + }, + Age = $25::SMALLINT + } + }, + $26, + $27, + $28::BOOLEAN, + { + "First" = $29, + "Last" = $30 + }, + $31, + $32, + $33, + $34, + $35, + $36, + $37, + $38 + )` + + b.Queue(insert, + doc.(ma)["About"], + doc.(ma)["Address"].(ma)["City"], + doc.(ma)["Address"].(ma)["Coordinates"].(ma)["Latitude"], + doc.(ma)["Address"].(ma)["Coordinates"].(ma)["Longitude"], + doc.(ma)["Address"].(ma)["Street"], + doc.(ma)["Address"].(ma)["ZipCode"], + doc.(ma)["Age"], + doc.(ma)["Balance"], + doc.(ma)["Company"], + doc.(ma)["Email"], + doc.(ma)["Friends"].(ma)["Friend1"].(ma)["Name"].(ma)["First"], + doc.(ma)["Friends"].(ma)["Friend1"].(ma)["Name"].(ma)["Last"], + doc.(ma)["Friends"].(ma)["Friend1"].(ma)["Age"], + doc.(ma)["Friends"].(ma)["Friend2"].(ma)["Name"].(ma)["First"], + doc.(ma)["Friends"].(ma)["Friend2"].(ma)["Name"].(ma)["Last"], + doc.(ma)["Friends"].(ma)["Friend2"].(ma)["Age"], + doc.(ma)["Friends"].(ma)["Friend3"].(ma)["Name"].(ma)["First"], + doc.(ma)["Friends"].(ma)["Friend3"].(ma)["Name"].(ma)["Last"], + 
doc.(ma)["Friends"].(ma)["Friend3"].(ma)["Age"], + doc.(ma)["Friends"].(ma)["Friend4"].(ma)["Name"].(ma)["First"], + doc.(ma)["Friends"].(ma)["Friend4"].(ma)["Name"].(ma)["Last"], + doc.(ma)["Friends"].(ma)["Friend4"].(ma)["Age"], + doc.(ma)["Friends"].(ma)["Friend5"].(ma)["Name"].(ma)["First"], + doc.(ma)["Friends"].(ma)["Friend5"].(ma)["Name"].(ma)["Last"], + doc.(ma)["Friends"].(ma)["Friend5"].(ma)["Age"], + doc.(ma)["Greeting"], + doc.(ma)["Guid"], + doc.(ma)["IsActive"], + doc.(ma)["Name"].(ma)["First"], + doc.(ma)["Name"].(ma)["Last"], + doc.(ma)["Phone"], + doc.(ma)["Picture"], + doc.(ma)["Registered"], + doc.(ma)["Tags"], + time.Unix(0, doc.(ma)["_event_time"].(int64)).Format(time.RFC3339Nano), + doc.(ma)["_id"], + time.Unix(0, doc.(ma)["_ts"].(int64)).Format(time.RFC3339Nano), + doc.(ma)["generator_identifier"], + ) + } + + results := c.conn.SendBatch(context.Background(), b) + _, err := results.Exec() + if err != nil { + return fmt.Errorf("cratedb:SendDocument: %v", err) + } + + err = results.Close() + if err != nil { + return fmt.Errorf("cratedb:SendDocument: %v", err) + } + + return nil +} + +func (c CrateDB) SendPatch(docs []any) error { + //TODO implement me + panic("implement me") +} + +func (c CrateDB) GetLatestTimestamp() (time.Time, error) { + query := `SELECT MAX(ts) FROM test` + var ts time.Time + err := c.conn.QueryRow(context.Background(), query).Scan(&ts) + if err != nil { + return time.Time{}, fmt.Errorf("cratedb:GetLatestTimestamp: %v", err) + } + + return ts, nil +} + +func (c CrateDB) ConfigureDestination() error { + //TODO implement me + panic("implement me") +} + +func (c CrateDB) Close(ctx context.Context) error { + c.conn.Close() + return nil +} diff --git a/generator/cratedb_test.go b/generator/cratedb_test.go new file mode 100644 index 0000000..64f3fa6 --- /dev/null +++ b/generator/cratedb_test.go @@ -0,0 +1,64 @@ +package generator + +import ( + "context" + "testing" +) + +func crateDbDocs() ([]any, error) { + spec := DocumentSpec{ 
+ Destination: "elastic", + GeneratorIdentifier: "1", + BatchSize: 10, + Mode: "add", + IdMode: "sequential", + UpdatePercentage: -1, + NumClusters: -1, + HotClusterPercentage: -1, + } + + docs, err := GenerateDocs(spec) + return docs, err +} + +func TestNewCrateDB(t *testing.T) { + uri := "postgres://crate:@localhost:5432/test" + c, err := NewCrateDB(context.Background(), uri) + if err != nil { + t.Fatalf("NewCrateDB() error = %v", err) + } + + if c == nil { + t.Fatalf("NewCrateDB() = nil") + } + + err = c.Reset(context.Background()) + if err != nil { + t.Fatalf("CrateDB.Reset() error = %v", err) + } + + err = c.Init(context.Background()) + if err != nil { + t.Fatalf("CrateDB.Init() error = %v", err) + } + + docs, err := crateDbDocs() + if err != nil { + t.Fatalf("crateDbDocs() error = %v", err) + } + + err = c.SendDocument(docs) + if err != nil { + t.Fatalf("CrateDB.SendDocument() error = %v", err) + } + + err = c.Reset(context.Background()) + if err != nil { + t.Fatalf("CrateDB.Reset() error = %v", err) + } + + err = c.Close(context.Background()) + if err != nil { + t.Fatalf("CrateDB.Close() error = %v", err) + } +} diff --git a/go.mod b/go.mod index 2d34e1b..7d55c04 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/s3 v1.27.11 github.com/go-faker/faker/v4 v4.0.0-beta.4 github.com/google/uuid v1.3.0 + github.com/jackc/pgx/v5 v5.6.0 github.com/prometheus/client_golang v1.14.0 github.com/snowflakedb/gosnowflake v1.6.16 github.com/stretchr/testify v1.8.1 @@ -46,6 +47,9 @@ require ( github.com/golang/protobuf v1.5.2 // indirect github.com/google/flatbuffers v2.0.8+incompatible // indirect github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect + github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/klauspost/compress 
v1.15.11 // indirect github.com/mattn/go-ieproxy v0.0.1 // indirect @@ -57,12 +61,14 @@ require ( github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect + github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/sirupsen/logrus v1.9.0 // indirect - golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be // indirect - golang.org/x/net v0.0.0-20221002022538-bcab6841153b // indirect - golang.org/x/sys v0.4.0 // indirect - golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect - golang.org/x/text v0.3.7 // indirect + golang.org/x/crypto v0.17.0 // indirect + golang.org/x/net v0.10.0 // indirect + golang.org/x/sync v0.1.0 // indirect + golang.org/x/sys v0.15.0 // indirect + golang.org/x/term v0.15.0 // indirect + golang.org/x/text v0.14.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/protobuf v1.28.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 099dcf2..9accb77 100644 --- a/go.sum +++ b/go.sum @@ -232,6 +232,14 @@ github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NM github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.6.0 
h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY= +github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw= +github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= +github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= @@ -255,8 +263,8 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxv github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -315,6 +323,8 @@ github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5 github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod 
h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/ruudk/golang-pdf417 v0.0.0-20181029194003-1af4ab5afa58/go.mod h1:6lfFZQK844Gfx8o5WFuvpxWRwnSoipWe/p622j1v06w= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= @@ -355,8 +365,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be h1:fmw3UbQh+nxngCAHrDCCztao/kbYFnWjoqop8dHx05A= -golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= +golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -437,8 +447,8 @@ golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.0.0-20221002022538-bcab6841153b h1:6e93nYa3hNqAvLr0pD4PN1fFS+gKzp2zAXqrnTCstqU= 
-golang.org/x/net v0.0.0-20221002022538-bcab6841153b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= +golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -456,6 +466,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -503,11 +515,12 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= 
-golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4= +golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -515,8 +528,9 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git 
a/main.go b/main.go index fe071aa..99aebfe 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "fmt" "log" "math/rand" @@ -161,6 +162,29 @@ func main() { if configErr != nil { log.Fatal("Unable to configure snowflake for sending documents: ", configErr) } + case "cratedb": + ctx := context.Background() + uri := mustGetEnvString("CRATEDB_URI") + + crate, err := generator.NewCrateDB(ctx, uri) + if err != nil { + log.Fatal("Unable to configure CrateDB: ", err) + } + + err = crate.Init(ctx) + if err != nil { + log.Fatal("Unable to initialize CrateDB: ", err) + } + + defer func() { + err := crate.Reset(ctx) + if err != nil { + log.Fatal("Unable to reset CrateDB: ", err) + } + }() + + d = crate + case "null": d = &generator.Null{} default: From 768717135c1d43c9c85396aabb47dcf6e8a0b245 Mon Sep 17 00:00:00 2001 From: widmogrod Date: Wed, 3 Jul 2024 01:17:36 +0200 Subject: [PATCH 02/16] fix timestamp issue, but struggle with non existing tables? --- generator/cratedb.go | 64 +++++++++++++++++++++++++-------------- generator/cratedb_test.go | 13 ++++++-- 2 files changed, 51 insertions(+), 26 deletions(-) diff --git a/generator/cratedb.go b/generator/cratedb.go index 7e16cc4..3293266 100644 --- a/generator/cratedb.go +++ b/generator/cratedb.go @@ -12,11 +12,17 @@ import ( func NewCrateDB(ctx context.Context, url string) (*CrateDB, error) { conn, err := pgxpool.New(ctx, url) if err != nil { - return nil, fmt.Errorf("cratedb:NewCrateDB: %v", err) + return nil, fmt.Errorf("cratedb:NewCrateDB: pool; %v", err) + } + + conn2, err := pgx.Connect(ctx, url) + if err != nil { + return nil, fmt.Errorf("cratedb:NewCrateDB: single; %v", err) } return &CrateDB{ - conn: conn, + conn: conn, + conn2: conn2, }, nil } @@ -26,10 +32,11 @@ type ma = map[string]any // CrateDB contains all configurations needed to send documents to CrateDB type CrateDB struct { - conn *pgxpool.Pool + conn *pgxpool.Pool + conn2 *pgx.Conn } -func (c CrateDB) Init(ctx context.Context) 
error { +func (c *CrateDB) Init(ctx context.Context) error { table := `CREATE TABLE IF NOT EXISTS test ( About STRING, @@ -99,7 +106,7 @@ func (c CrateDB) Init(ctx context.Context) error { id STRING, ts TIMESTAMP WITHOUT TIME ZONE, generator_identifier STRING - ) + ); ` _, err := c.conn.Exec(ctx, table) @@ -110,7 +117,7 @@ func (c CrateDB) Init(ctx context.Context) error { return nil } -func (c CrateDB) Reset(ctx context.Context) error { +func (c *CrateDB) Reset(ctx context.Context) error { _, err := c.conn.Exec(ctx, "DROP TABLE IF EXISTS test") if err != nil { return fmt.Errorf("cratedb:Reset: %v", err) @@ -119,7 +126,7 @@ func (c CrateDB) Reset(ctx context.Context) error { return nil } -func (c CrateDB) SendDocument(docs []any) error { +func (c *CrateDB) SendDocument(docs []any) error { b := &pgx.Batch{} for _, doc := range docs { insert := `INSERT INTO test ( @@ -209,7 +216,7 @@ func (c CrateDB) SendDocument(docs []any) error { $36, $37, $38 - )` + );` b.Queue(insert, doc.(ma)["About"], @@ -246,49 +253,60 @@ func (c CrateDB) SendDocument(docs []any) error { doc.(ma)["Picture"], doc.(ma)["Registered"], doc.(ma)["Tags"], - time.Unix(0, doc.(ma)["_event_time"].(int64)).Format(time.RFC3339Nano), + time.Unix(doc.(ma)["_event_time"].(int64)/1_000_000, (doc.(ma)["_ts"].(int64)%1_000_000)*1_000), doc.(ma)["_id"], - time.Unix(0, doc.(ma)["_ts"].(int64)).Format(time.RFC3339Nano), + time.Unix(doc.(ma)["_ts"].(int64)/1_000_000, (doc.(ma)["_ts"].(int64)%1_000_000)*1_000), doc.(ma)["generator_identifier"], ) } - results := c.conn.SendBatch(context.Background(), b) - _, err := results.Exec() + br := c.conn.SendBatch(context.Background(), b) + _, err := br.Exec() if err != nil { - return fmt.Errorf("cratedb:SendDocument: %v", err) + return fmt.Errorf("cratedb:SendDocument: exec; %v", err) } - err = results.Close() + err = br.Close() if err != nil { - return fmt.Errorf("cratedb:SendDocument: %v", err) + return fmt.Errorf("cratedb:SendDocument: close; %v", err) } return nil } 
-func (c CrateDB) SendPatch(docs []any) error { +func (c *CrateDB) SendPatch(docs []any) error { //TODO implement me panic("implement me") } -func (c CrateDB) GetLatestTimestamp() (time.Time, error) { - query := `SELECT MAX(ts) FROM test` +func (c *CrateDB) GetLatestTimestamp() (time.Time, error) { + query := `SELECT MAX(event_time) FROM test.test;` + var ts time.Time - err := c.conn.QueryRow(context.Background(), query).Scan(&ts) + rows, err := c.conn.Query(context.Background(), query) if err != nil { - return time.Time{}, fmt.Errorf("cratedb:GetLatestTimestamp: %v", err) + return time.Time{}, fmt.Errorf("cratedb:GetLatestTimestamp: query; %v", err) } + if !rows.Next() { + return time.Time{}, fmt.Errorf("cratedb:GetLatestTimestamp: no rows") + } + err = rows.Scan(&ts) + if err != nil { + return time.Time{}, fmt.Errorf("cratedb:GetLatestTimestamp: scan; %v", err) + } + rows.Close() + + tsv := ts.UnixMicro() - return ts, nil + return time.Unix(tsv/1_000_000, (tsv%1_000_000)*1_000), nil } -func (c CrateDB) ConfigureDestination() error { +func (c *CrateDB) ConfigureDestination() error { //TODO implement me panic("implement me") } -func (c CrateDB) Close(ctx context.Context) error { +func (c *CrateDB) Close(ctx context.Context) error { c.conn.Close() return nil } diff --git a/generator/cratedb_test.go b/generator/cratedb_test.go index 64f3fa6..1f890c8 100644 --- a/generator/cratedb_test.go +++ b/generator/cratedb_test.go @@ -22,7 +22,7 @@ func crateDbDocs() ([]any, error) { } func TestNewCrateDB(t *testing.T) { - uri := "postgres://crate:@localhost:5432/test" + uri := "postgres://crate:@localhost:5432/test?pool_max_conns=100&pool_min_conns=10" c, err := NewCrateDB(context.Background(), uri) if err != nil { t.Fatalf("NewCrateDB() error = %v", err) @@ -52,11 +52,18 @@ func TestNewCrateDB(t *testing.T) { t.Fatalf("CrateDB.SendDocument() error = %v", err) } - err = c.Reset(context.Background()) + timestamp, err := c.GetLatestTimestamp() if err != nil { - 
t.Fatalf("CrateDB.Reset() error = %v", err) + t.Fatalf("CrateDB.GetLatestTimestamp() error = %v", err) } + t.Logf("CrateDB.GetLatestTimestamp() = %s", timestamp.String()) + + //err = c.Reset(context.Background()) + //if err != nil { + // t.Fatalf("CrateDB.Reset() error = %v", err) + //} + err = c.Close(context.Background()) if err != nil { t.Fatalf("CrateDB.Close() error = %v", err) From a7e63fd17a938f36f701af28068aae1afebf2871 Mon Sep 17 00:00:00 2001 From: widmogrod Date: Wed, 3 Jul 2024 13:28:55 +0200 Subject: [PATCH 03/16] improve time support --- dev/compose.yml | 54 +-------------------------------------- dev/run.sh | 10 ++++---- generator/cratedb.go | 10 +++++--- generator/cratedb_test.go | 48 +++++++++++++++++++++++++++++++--- 4 files changed, 56 insertions(+), 66 deletions(-) diff --git a/dev/compose.yml b/dev/compose.yml index def779f..6609560 100644 --- a/dev/compose.yml +++ b/dev/compose.yml @@ -27,15 +27,7 @@ services: volumes: - /tmp/crate/00:/data command: ["crate", -# "-Ccluster.name=crate-docker-cluster", -# "-Cnode.name=cratedb01", -# "-Cnode.data=true", - "-Cnetwork.host=_site_", -# "-Cdiscovery.seed_hosts=cratedb02,cratedb03", -# "-Ccluster.initial_master_nodes=cratedb01,cratedb02,cratedb03", -# "-Cgateway.expected_data_nodes=3", -# "-Cgateway.recover_after_data_nodes=2" - ] + "-Cnetwork.host=_site_"] deploy: replicas: 1 restart_policy: @@ -46,50 +38,6 @@ services: hard: -1 environment: - CRATE_HEAP_SIZE=2g -# -# cratedb02: -# image: crate:latest -# ports: -# - "4202:4200" -# volumes: -# - /tmp/crate/02:/data -# command: ["crate", -# "-Ccluster.name=crate-docker-cluster", -# "-Cnode.name=cratedb02", -# "-Cnode.data=true", -# "-Cnetwork.host=_site_", -# "-Cdiscovery.seed_hosts=cratedb01,cratedb03", -# "-Ccluster.initial_master_nodes=cratedb01,cratedb02,cratedb03", -# "-Cgateway.expected_data_nodes=3", -# "-Cgateway.recover_after_data_nodes=2"] -# deploy: -# replicas: 1 -# restart_policy: -# condition: on-failure -# environment: -# - 
CRATE_HEAP_SIZE=2g -# -# cratedb03: -# image: crate:latest -# ports: -# - "4203:4200" -# volumes: -# - /tmp/crate/03:/data -# command: ["crate", -# "-Ccluster.name=crate-docker-cluster", -# "-Cnode.name=cratedb03", -# "-Cnode.data=true", -# "-Cnetwork.host=_site_", -# "-Cdiscovery.seed_hosts=cratedb01,cratedb02", -# "-Ccluster.initial_master_nodes=cratedb01,cratedb02,cratedb03", -# "-Cgateway.expected_data_nodes=3", -# "-Cgateway.recover_after_data_nodes=2"] -# deploy: -# replicas: 1 -# restart_policy: -# condition: on-failure -# environment: -# - CRATE_HEAP_SIZE=2g volumes: data01: diff --git a/dev/run.sh b/dev/run.sh index 104289c..e576ae5 100755 --- a/dev/run.sh +++ b/dev/run.sh @@ -3,11 +3,11 @@ cwd=$(dirname "$0") project_root=$(dirname "$cwd") -#docker compose -f $cwd/compose.yml up -d -#trap "docker compose -f $cwd/compose.yml down" EXIT +docker compose -f $cwd/compose.yml up -d +trap "docker compose -f $cwd/compose.yml down" EXIT -#echo "Benchmarking Elastic" -#ELASTIC_AUTH="" ELASTIC_URL=http://localhost:9200 ELASTIC_INDEX=index_name WPS=1 BATCH_SIZE=50 DESTINATION=Elastic TRACK_LATENCY=true ./rockbench +echo "Benchmarking Elastic" +ELASTIC_AUTH="" ELASTIC_URL=http://localhost:9200 ELASTIC_INDEX=index_name WPS=1 BATCH_SIZE=50 DESTINATION=Elastic TRACK_LATENCY=true ./rockbench Echo "Benchmarking CrateDB" -CRATEDB_URI="postgres://crate:@localhost:5432/test" WPS=1 BATCH_SIZE=50 DESTINATION=CrateDB TRACK_LATENCY=true ./rockbench \ No newline at end of file +CRATEDB_URI="postgres://crate:@localhost:5432/test" WPS=1 BATCH_SIZE=50 DESTINATION=CrateDB TRACK_LATENCY=true ./rockbench diff --git a/generator/cratedb.go b/generator/cratedb.go index 3293266..07cbbd5 100644 --- a/generator/cratedb.go +++ b/generator/cratedb.go @@ -102,9 +102,9 @@ func (c *CrateDB) Init(ctx context.Context) error { Picture STRING, Registered STRING, Tags ARRAY(STRING), - event_time TIMESTAMP WITHOUT TIME ZONE, + event_time TIMESTAMP WITH TIME ZONE, id STRING, - ts TIMESTAMP WITHOUT TIME 
ZONE, + ts TIMESTAMP WITH TIME ZONE, generator_identifier STRING ); ` @@ -253,9 +253,11 @@ func (c *CrateDB) SendDocument(docs []any) error { doc.(ma)["Picture"], doc.(ma)["Registered"], doc.(ma)["Tags"], - time.Unix(doc.(ma)["_event_time"].(int64)/1_000_000, (doc.(ma)["_ts"].(int64)%1_000_000)*1_000), + time.UnixMicro(doc.(ma)["_event_time"].(int64)), + //time.Unix(doc.(ma)["_event_time"].(int64)/1_000_000, (doc.(ma)["_ts"].(int64)%1_000_000)*1_000), doc.(ma)["_id"], - time.Unix(doc.(ma)["_ts"].(int64)/1_000_000, (doc.(ma)["_ts"].(int64)%1_000_000)*1_000), + time.UnixMicro(doc.(ma)["_ts"].(int64)), + //time.Unix(doc.(ma)["_ts"].(int64)/1_000_000, (doc.(ma)["_ts"].(int64)%1_000_000)*1_000), doc.(ma)["generator_identifier"], ) } diff --git a/generator/cratedb_test.go b/generator/cratedb_test.go index 1f890c8..cf1f58e 100644 --- a/generator/cratedb_test.go +++ b/generator/cratedb_test.go @@ -3,6 +3,7 @@ package generator import ( "context" "testing" + "time" ) func crateDbDocs() ([]any, error) { @@ -21,6 +22,32 @@ func crateDbDocs() ([]any, error) { return docs, err } +func findMaxTimestamp(rows []any) int64 { + var maxTimestamp int64 + for _, row := range rows { + timestamp := row.(ma)["_event_time"].(int64) + if timestamp > maxTimestamp { + maxTimestamp = timestamp + } + } + + return maxTimestamp +} + +func TestTimestamps(t *testing.T) { + var given int64 = 1720004969682695 + + t1 := time.Unix(given/1_000_000, (given%1_000_000)*1_000) + back := t1.UnixMicro() + + t2 := time.Unix(back/1_000_000, (back%1_000_000)*1_000) + output := t2.UnixMicro() + + if given != output { + t.Fatalf("timestamps do not match: given=%d, output=%d", given, output) + } +} + func TestNewCrateDB(t *testing.T) { uri := "postgres://crate:@localhost:5432/test?pool_max_conns=100&pool_min_conns=10" c, err := NewCrateDB(context.Background(), uri) @@ -52,17 +79,30 @@ func TestNewCrateDB(t *testing.T) { t.Fatalf("CrateDB.SendDocument() error = %v", err) } + maxTimestamp := findMaxTimestamp(docs) + 
t.Logf("findMaxTimestamp() = %d", maxTimestamp) + t.Logf("UnixMicro() = %s", time.UnixMicro(maxTimestamp)) + t.Logf("findMaxTimestamp() = %s", time.Unix(maxTimestamp/1_000_000, (maxTimestamp%1_000_000)*1_000)) + + // need to sleep, otherwise error + // I don't know why + time.Sleep(1 * time.Second) timestamp, err := c.GetLatestTimestamp() if err != nil { t.Fatalf("CrateDB.GetLatestTimestamp() error = %v", err) } + delta := timestamp.UnixMicro() - maxTimestamp + if delta > 0 || delta < 0 { + t.Errorf("CrateDB.GetLatestTimestamp() delta is %d, want 0", delta) + } + t.Logf("CrateDB.GetLatestTimestamp() = %s", timestamp.String()) - //err = c.Reset(context.Background()) - //if err != nil { - // t.Fatalf("CrateDB.Reset() error = %v", err) - //} + err = c.Reset(context.Background()) + if err != nil { + t.Fatalf("CrateDB.Reset() error = %v", err) + } err = c.Close(context.Background()) if err != nil { From 8fc257ad77616dca44bed2ee7d3136325db23551 Mon Sep 17 00:00:00 2001 From: widmogrod Date: Wed, 3 Jul 2024 16:12:51 +0200 Subject: [PATCH 04/16] cleanup comments --- generator/cratedb.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/generator/cratedb.go b/generator/cratedb.go index 07cbbd5..b374e6c 100644 --- a/generator/cratedb.go +++ b/generator/cratedb.go @@ -254,10 +254,8 @@ func (c *CrateDB) SendDocument(docs []any) error { doc.(ma)["Registered"], doc.(ma)["Tags"], time.UnixMicro(doc.(ma)["_event_time"].(int64)), - //time.Unix(doc.(ma)["_event_time"].(int64)/1_000_000, (doc.(ma)["_ts"].(int64)%1_000_000)*1_000), doc.(ma)["_id"], time.UnixMicro(doc.(ma)["_ts"].(int64)), - //time.Unix(doc.(ma)["_ts"].(int64)/1_000_000, (doc.(ma)["_ts"].(int64)%1_000_000)*1_000), doc.(ma)["generator_identifier"], ) } From 242a1adda1f9984245b7833d514f476b27f9750a Mon Sep 17 00:00:00 2001 From: widmogrod Date: Wed, 3 Jul 2024 16:13:06 +0200 Subject: [PATCH 05/16] rename tests and clean up logs --- generator/cratedb_test.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff 
--git a/generator/cratedb_test.go b/generator/cratedb_test.go index cf1f58e..59192eb 100644 --- a/generator/cratedb_test.go +++ b/generator/cratedb_test.go @@ -34,7 +34,7 @@ func findMaxTimestamp(rows []any) int64 { return maxTimestamp } -func TestTimestamps(t *testing.T) { +func TestTimestampsConversion(t *testing.T) { var given int64 = 1720004969682695 t1 := time.Unix(given/1_000_000, (given%1_000_000)*1_000) @@ -80,9 +80,6 @@ func TestNewCrateDB(t *testing.T) { } maxTimestamp := findMaxTimestamp(docs) - t.Logf("findMaxTimestamp() = %d", maxTimestamp) - t.Logf("UnixMicro() = %s", time.UnixMicro(maxTimestamp)) - t.Logf("findMaxTimestamp() = %s", time.Unix(maxTimestamp/1_000_000, (maxTimestamp%1_000_000)*1_000)) // need to sleep, otherwise error // I don't know why From 5283eb3287c36cae1f67ab9ba3ebed16716ba4d9 Mon Sep 17 00:00:00 2001 From: widmogrod Date: Wed, 3 Jul 2024 16:21:23 +0200 Subject: [PATCH 06/16] cratedb test cleanup --- generator/cratedb_test.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/generator/cratedb_test.go b/generator/cratedb_test.go index 59192eb..f2a14d0 100644 --- a/generator/cratedb_test.go +++ b/generator/cratedb_test.go @@ -81,20 +81,25 @@ func TestNewCrateDB(t *testing.T) { maxTimestamp := findMaxTimestamp(docs) - // need to sleep, otherwise error - // I don't know why + // I don't know why, but without slip I get error + // > can't scan into dest[0]: cannot scan NULL into *time.Time + // It looks like table is not visible to be queried by driver imminently after insert, + // but 1s later everything works. This could be go driver or cratedb thingy? 
time.Sleep(1 * time.Second) timestamp, err := c.GetLatestTimestamp() if err != nil { t.Fatalf("CrateDB.GetLatestTimestamp() error = %v", err) } - delta := timestamp.UnixMicro() - maxTimestamp - if delta > 0 || delta < 0 { - t.Errorf("CrateDB.GetLatestTimestamp() delta is %d, want 0", delta) + delta := maxTimestamp - timestamp.UnixMicro() + epsilon := int64(1000) + if delta > epsilon { + t.Errorf("CrateDB.GetLatestTimestamp() delta is %d, cannot be bigger than %d", delta, epsilon) } t.Logf("CrateDB.GetLatestTimestamp() = %s", timestamp.String()) + t.Logf(" timestamp = %d", timestamp.UnixMicro()) + t.Logf("maxTimestamp = %d", maxTimestamp) err = c.Reset(context.Background()) if err != nil { From 0e73e52b15323366bf432d51588848d01ac45e52 Mon Sep 17 00:00:00 2001 From: widmogrod Date: Wed, 3 Jul 2024 16:21:39 +0200 Subject: [PATCH 07/16] update readme --- README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.md b/README.md index e49c948..5def058 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,9 @@ ELASTIC_AUTH="ApiKey xxx" ELASTIC_URL=https://... 
ELASTIC_INDEX=index_name WPS=1 # Send data to Snowflake and report data latency SNOWFLAKE_ACCOUNT=xxxx SNOWFLAKE_USER=xxxx SNOWFLAKE_PASSWORD=xxxx SNOWFLAKE_WAREHOUSE=xxxx SNOWFLAKE_DATABASE=xxxx SNOWFLAKE_STAGES3BUCKETNAME=xxxx AWS_REGION=xxxx WPS=1 BATCH_SIZE=50 TRACK_LATENCY=true DESTINATION=Snowflake ./rockbench + +# Send data to CrateDB and report data latency +CRATEDB_URI="postgres://crate:@localhost:5432/test?pool_max_conns=10&pool_min_conns=3" WPS=1 BATCH_SIZE=50 DESTINATION=CrateDB TRACK_LATENCY=true ./rockbench ``` - To run with Docker container From f72c1e2c6f98aaf0f4332b3f270b94da681b81b2 Mon Sep 17 00:00:00 2001 From: widmogrod Date: Wed, 3 Jul 2024 16:22:10 +0200 Subject: [PATCH 08/16] dev/run.sh allows simple run of benchmarks --- dev/run.sh | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dev/run.sh b/dev/run.sh index e576ae5..0621822 100755 --- a/dev/run.sh +++ b/dev/run.sh @@ -6,8 +6,11 @@ project_root=$(dirname "$cwd") docker compose -f $cwd/compose.yml up -d trap "docker compose -f $cwd/compose.yml down" EXIT +echo "Build rockbench" +go build . 
+ echo "Benchmarking Elastic" ELASTIC_AUTH="" ELASTIC_URL=http://localhost:9200 ELASTIC_INDEX=index_name WPS=1 BATCH_SIZE=50 DESTINATION=Elastic TRACK_LATENCY=true ./rockbench Echo "Benchmarking CrateDB" -CRATEDB_URI="postgres://crate:@localhost:5432/test" WPS=1 BATCH_SIZE=50 DESTINATION=CrateDB TRACK_LATENCY=true ./rockbench +CRATEDB_URI="postgres://crate:@localhost:5432/test?pool_max_conns=10&pool_min_conns=3" WPS=1 BATCH_SIZE=50 DESTINATION=CrateDB TRACK_LATENCY=true ./rockbench From 6dafe71b27923fc98adcb7dccb41c00c01874561 Mon Sep 17 00:00:00 2001 From: widmogrod Date: Wed, 3 Jul 2024 16:22:38 +0200 Subject: [PATCH 09/16] main: add CrateDB to options message --- main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.go b/main.go index 99aebfe..6cb60d9 100644 --- a/main.go +++ b/main.go @@ -188,7 +188,7 @@ func main() { case "null": d = &generator.Null{} default: - log.Fatal("Unsupported destination. Supported options are Rockset, Elastic & Null") + log.Fatal("Unsupported destination. Supported options are Rockset, Elastic, CrateDB & Null") } if exportMetrics { From ce751416cfd565d91cc690a8dff1133d4531a0b1 Mon Sep 17 00:00:00 2001 From: widmogrod Date: Wed, 3 Jul 2024 16:23:11 +0200 Subject: [PATCH 10/16] run go fmt ./... 
to clean up the codebase --- generator/document.go | 360 +++++++++++++++++++------------------- generator/elastic_test.go | 2 +- generator/rockset_test.go | 2 +- 3 files changed, 182 insertions(+), 182 deletions(-) diff --git a/generator/document.go b/generator/document.go index de5ae1b..8971c8f 100644 --- a/generator/document.go +++ b/generator/document.go @@ -93,7 +93,7 @@ func GenerateDoc(spec DocumentSpec) (interface{}, error) { doc["_id"] = formatDocId(rand.Intn(getMaxDoc())) } else { doc["_id"] = formatDocId(getMaxDoc()) - SetMaxDoc(getMaxDoc()+1) + SetMaxDoc(getMaxDoc() + 1) } doc_id = doc_id + 1 // All other modes @@ -119,7 +119,7 @@ func GenerateDoc(spec DocumentSpec) (interface{}, error) { } func getClusterKey(numClusters int, hotClusterPercentage int) string { - if hotClusterPercentage > 0 && rand.Intn(100) < hotClusterPercentage { + if hotClusterPercentage > 0 && rand.Intn(100) < hotClusterPercentage { return "0@gmail.com" } else { return fmt.Sprintf("%d@gmail.com", rand.Intn(numClusters)) @@ -171,11 +171,11 @@ func GeneratePatches(num_patch int, destination string, c chan map[string]interf ids_to_patch := genUniqueInRange(getMaxDoc(), num_patch) for _, id := range ids_to_patch { - if (destination == "elastic") { + if destination == "elastic" { patch := generateElasticPatch(id, <-c) patches = append(patches, patch) - } else if (destination == "rockset") { + } else if destination == "rockset" { patch := generateRocksetPatch(id, <-c) patches = append(patches, patch) } @@ -186,26 +186,26 @@ func GeneratePatches(num_patch int, destination string, c chan map[string]interf func RandomFieldAdd(destination string, c chan map[string]interface{}) { // Adding fields or array members for { - if (destination == "rockset") { + if destination == "rockset" { options := []map[string]interface{}{{ "op": "add", "path": "/" + faker.UUIDDigit(), "value": faker.Email(), - }, + }, { - "op": "add", - "path": "/Tags/-", - "value": faker.UUIDHyphenated(), // Append to tags array 
+ "op": "add", + "path": "/Tags/-", + "value": faker.UUIDHyphenated(), // Append to tags array }, } shuffleAndFillChannel(options, c) - } else if (destination == "elastic") { + } else if destination == "elastic" { options := []map[string]interface{}{{ - "doc": map[string]interface{}{ - faker.UUIDDigit(): faker.Email(), - "_ts": CurrentTimeMicros(), - }, + "doc": map[string]interface{}{ + faker.UUIDDigit(): faker.Email(), + "_ts": CurrentTimeMicros(), }, + }, { "script": map[string]interface{}{ "source": "ctx._source.Tags.add(params.tag)", @@ -225,203 +225,203 @@ func RandomFieldReplace(destination string, c chan map[string]interface{}) { // Purely replacement of fields random := rand.New(rand.NewSource(time.Now().UnixNano())) for { - if (destination == "rockset") { + if destination == "rockset" { options := []map[string]interface{}{{ "op": "replace", "path": "/Email", "value": faker.Email(), }, - { - "op": "replace", - "path": "/About", - "value": faker.Sentence(), - }, - { - "op": "replace", - "path": "/Company", - "value": faker.Word() + "-" + faker.Word(), - }, - { - "op": "replace", - "path": "/Name/First", - "value": faker.FirstName(), - }, - { - "op": "replace", - "path": "/Name/Last", - "value": faker.LastName(), - }, - { - "op": "replace", - "path": "/Age", - "value": random.Intn(100), - }, - { - "op": "replace", - "path": "/Balance", - "value": random.Float64(), - }, - { - "op": "replace", - "path": "/Registered", - "value": faker.Timestamp(), - }, - { - "op": "replace", - "path": "/Phone", - "value": faker.Phonenumber(), - }, - { - "op": "replace", - "path": "/Picture", - "value": faker.UUIDDigit(), - }, - { - "op": "replace", - "path": "/Guid", - "value": faker.UUIDHyphenated(), - }, - { - "op": "replace", - "path": "/Greeting", - "value": faker.Paragraph(), - }, - { - "op": "replace", - "path": "/Address/ZipCode", - "value": random.Intn(100000), - }, - { - "op": "replace", - "path": "/Address/Coordinates/Longitude", - "value": faker.Longitude(), - }, - 
{ - "op": "replace", - "path": "/Address/Coordinates/Latitude", - "value": faker.Latitude(), - }, - { - "op": "replace", - "path": "/Address/City", - "value": faker.Word(), - }} + { + "op": "replace", + "path": "/About", + "value": faker.Sentence(), + }, + { + "op": "replace", + "path": "/Company", + "value": faker.Word() + "-" + faker.Word(), + }, + { + "op": "replace", + "path": "/Name/First", + "value": faker.FirstName(), + }, + { + "op": "replace", + "path": "/Name/Last", + "value": faker.LastName(), + }, + { + "op": "replace", + "path": "/Age", + "value": random.Intn(100), + }, + { + "op": "replace", + "path": "/Balance", + "value": random.Float64(), + }, + { + "op": "replace", + "path": "/Registered", + "value": faker.Timestamp(), + }, + { + "op": "replace", + "path": "/Phone", + "value": faker.Phonenumber(), + }, + { + "op": "replace", + "path": "/Picture", + "value": faker.UUIDDigit(), + }, + { + "op": "replace", + "path": "/Guid", + "value": faker.UUIDHyphenated(), + }, + { + "op": "replace", + "path": "/Greeting", + "value": faker.Paragraph(), + }, + { + "op": "replace", + "path": "/Address/ZipCode", + "value": random.Intn(100000), + }, + { + "op": "replace", + "path": "/Address/Coordinates/Longitude", + "value": faker.Longitude(), + }, + { + "op": "replace", + "path": "/Address/Coordinates/Latitude", + "value": faker.Latitude(), + }, + { + "op": "replace", + "path": "/Address/City", + "value": faker.Word(), + }} shuffleAndFillChannel(options, c) - } else if (destination == "elastic") { + } else if destination == "elastic" { options := []map[string]interface{}{{ "doc": map[string]interface{}{ "Email": faker.Email(), - "_ts": CurrentTimeMicros(), - }, - }, - { - "doc": map[string]interface{}{ - "About": faker.Sentence(), - "_ts": CurrentTimeMicros(), + "_ts": CurrentTimeMicros(), }, }, - { - "doc": map[string]interface{}{ - "Company": faker.Word() + "-" + faker.Word(), - "_ts": CurrentTimeMicros(), + { + "doc": map[string]interface{}{ + "About": 
faker.Sentence(), + "_ts": CurrentTimeMicros(), + }, }, - }, - { - "script": map[string]interface{}{ - "source": "ctx._source.Name.First = params.updated_nested_first_name; ctx._source._ts = params.ts", - "params" : map[string]interface{}{ - "updated_nested_first_name" : faker.FirstName(), - "ts": CurrentTimeMicros(), + { + "doc": map[string]interface{}{ + "Company": faker.Word() + "-" + faker.Word(), + "_ts": CurrentTimeMicros(), }, }, - }, - { - "script": map[string]interface{}{ - "source": "ctx._source.Name.Last = params.updated_nested_last_name; ctx._source._ts = params.ts", - "params" : map[string]interface{}{ - "updated_nested_last_name": faker.LastName(), - "ts": CurrentTimeMicros(), + { + "script": map[string]interface{}{ + "source": "ctx._source.Name.First = params.updated_nested_first_name; ctx._source._ts = params.ts", + "params": map[string]interface{}{ + "updated_nested_first_name": faker.FirstName(), + "ts": CurrentTimeMicros(), + }, }, }, - }, - { - "doc": map[string]interface{}{ - "Age": random.Intn(100), - "_ts": CurrentTimeMicros(), + { + "script": map[string]interface{}{ + "source": "ctx._source.Name.Last = params.updated_nested_last_name; ctx._source._ts = params.ts", + "params": map[string]interface{}{ + "updated_nested_last_name": faker.LastName(), + "ts": CurrentTimeMicros(), + }, + }, }, - }, - { - "doc": map[string]interface{}{ - "Balance": random.Float64(), - "_ts": CurrentTimeMicros(), + { + "doc": map[string]interface{}{ + "Age": random.Intn(100), + "_ts": CurrentTimeMicros(), + }, }, - }, - { - "doc": map[string]interface{}{ - "Registered": faker.Timestamp(), - "_ts": CurrentTimeMicros(), + { + "doc": map[string]interface{}{ + "Balance": random.Float64(), + "_ts": CurrentTimeMicros(), + }, }, - }, - { - "doc": map[string]interface{}{ - "Phone": faker.Phonenumber(), - "_ts": CurrentTimeMicros(), + { + "doc": map[string]interface{}{ + "Registered": faker.Timestamp(), + "_ts": CurrentTimeMicros(), + }, }, - }, - { - "doc": 
map[string]interface{}{ - "Picture": faker.UUIDDigit(), - "_ts": CurrentTimeMicros(), + { + "doc": map[string]interface{}{ + "Phone": faker.Phonenumber(), + "_ts": CurrentTimeMicros(), + }, }, - }, - { - "doc": map[string]interface{}{ - "Guid": faker.UUIDHyphenated(), - "_ts": CurrentTimeMicros(), + { + "doc": map[string]interface{}{ + "Picture": faker.UUIDDigit(), + "_ts": CurrentTimeMicros(), + }, }, - }, - { - "doc": map[string]interface{}{ - "Greeting": faker.Paragraph(), - "_ts": CurrentTimeMicros(), + { + "doc": map[string]interface{}{ + "Guid": faker.UUIDHyphenated(), + "_ts": CurrentTimeMicros(), + }, }, - }, - { - "script": map[string]interface{}{ - "source": "ctx._source.Address.ZipCode = params.updated_nested_zcode; ctx._source._ts = params.ts", - "params" : map[string]interface{}{ - "updated_nested_zcode": random.Intn(100000), - "ts": CurrentTimeMicros(), + { + "doc": map[string]interface{}{ + "Greeting": faker.Paragraph(), + "_ts": CurrentTimeMicros(), }, }, - }, - { - "script": map[string]interface{}{ - "source": "ctx._source.Address.Coordinates.Longitude = params.updated_nested_coord_long; ctx._source._ts = params.ts", - "params" : map[string]interface{}{ - "updated_nested_coord_long": faker.Longitude(), - "ts": CurrentTimeMicros(), + { + "script": map[string]interface{}{ + "source": "ctx._source.Address.ZipCode = params.updated_nested_zcode; ctx._source._ts = params.ts", + "params": map[string]interface{}{ + "updated_nested_zcode": random.Intn(100000), + "ts": CurrentTimeMicros(), + }, }, }, - }, - { - "script": map[string]interface{}{ - "source": "ctx._source.Address.Coordinates.Latitude = params.updated_nested_coord_lat; ctx._source._ts = params.ts", - "params" : map[string]interface{}{ - "updated_nested_coord_lat": faker.Latitude(), - "ts": CurrentTimeMicros(), + { + "script": map[string]interface{}{ + "source": "ctx._source.Address.Coordinates.Longitude = params.updated_nested_coord_long; ctx._source._ts = params.ts", + "params": 
map[string]interface{}{ + "updated_nested_coord_long": faker.Longitude(), + "ts": CurrentTimeMicros(), + }, }, }, - }, - { - "script": map[string]interface{}{ - "source": "ctx._source.Address.City = params.updated_nested_city; ctx._source._ts = params.ts", - "params" : map[string]interface{}{ - "updated_nested_city": faker.Word(), - "ts": CurrentTimeMicros(), + { + "script": map[string]interface{}{ + "source": "ctx._source.Address.Coordinates.Latitude = params.updated_nested_coord_lat; ctx._source._ts = params.ts", + "params": map[string]interface{}{ + "updated_nested_coord_lat": faker.Latitude(), + "ts": CurrentTimeMicros(), + }, }, }, - }} + { + "script": map[string]interface{}{ + "source": "ctx._source.Address.City = params.updated_nested_city; ctx._source._ts = params.ts", + "params": map[string]interface{}{ + "updated_nested_city": faker.Word(), + "ts": CurrentTimeMicros(), + }, + }, + }} shuffleAndFillChannel(options, c) } } diff --git a/generator/elastic_test.go b/generator/elastic_test.go index a2d4066..28b6d59 100644 --- a/generator/elastic_test.go +++ b/generator/elastic_test.go @@ -51,7 +51,7 @@ func TestElastic_SendDocument(t *testing.T) { UpdatePercentage: -1, NumClusters: -1, HotClusterPercentage: -1, - }; + } docs, err := GenerateDocs(spec) assert.Nil(t, err) diff --git a/generator/rockset_test.go b/generator/rockset_test.go index 3b63fef..395e1c3 100644 --- a/generator/rockset_test.go +++ b/generator/rockset_test.go @@ -66,7 +66,7 @@ func TestRockset_SendDocument(t *testing.T) { UpdatePercentage: -1, NumClusters: -1, HotClusterPercentage: -1, - }; + } docs, err := GenerateDocs(spec) assert.Nil(t, err) From 9ee398f5d2c8e17278e3890cd551a5c6f87d6158 Mon Sep 17 00:00:00 2001 From: widmogrod Date: Wed, 3 Jul 2024 16:38:02 +0200 Subject: [PATCH 11/16] crate test are skipped when no CRATEDB_URI set --- generator/cratedb_test.go | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/generator/cratedb_test.go 
b/generator/cratedb_test.go index f2a14d0..4acda1f 100644 --- a/generator/cratedb_test.go +++ b/generator/cratedb_test.go @@ -2,6 +2,7 @@ package generator import ( "context" + "os" "testing" "time" ) @@ -49,7 +50,22 @@ func TestTimestampsConversion(t *testing.T) { } func TestNewCrateDB(t *testing.T) { - uri := "postgres://crate:@localhost:5432/test?pool_max_conns=100&pool_min_conns=10" + uri := os.Getenv("CRATEDB_URI") + if uri == "" { + t.Skipf(` +CRATEDB_URI not set. + +To run this test, make sure you run: + docker compose -f dev/compose.yml up -d + +And then run: + CRATEDB_URI="postgres://crate:@localhost:5432/test?pool_max_conns=10&pool_min_conns=3" go test ./... + +`) + } + + t.Logf("CRATEDB_URI: %s", uri) + c, err := NewCrateDB(context.Background(), uri) if err != nil { t.Fatalf("NewCrateDB() error = %v", err) From 212da947a0b824738d6c94c1fddecacea20d9d82 Mon Sep 17 00:00:00 2001 From: widmogrod Date: Wed, 3 Jul 2024 16:38:28 +0200 Subject: [PATCH 12/16] Add to documentation development notes --- README.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/README.md b/README.md index 5def058..40871cd 100644 --- a/README.md +++ b/README.md @@ -99,3 +99,13 @@ two methods: Once the new source is implemented, handle it in [main.go](https://github.com/rockset/rockbench/blob/master/generator/main.go). + +## Development + +``` +# Run docker compose for local development +docker compose -f dev/compose.yml up -d + +# To test the CrateDB part, run: +CRATEDB_URI="postgres://crate:@localhost:5432/test?pool_max_conns=10&pool_min_conns=3" go test ./... 
+``` From c03faeed0b691f925febc5c2f43838498e69894d Mon Sep 17 00:00:00 2001 From: widmogrod Date: Wed, 3 Jul 2024 16:57:21 +0200 Subject: [PATCH 13/16] Update run.sh --- dev/run.sh | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/dev/run.sh b/dev/run.sh index 0621822..8de759f 100755 --- a/dev/run.sh +++ b/dev/run.sh @@ -9,8 +9,12 @@ trap "docker compose -f $cwd/compose.yml down" EXIT echo "Build rockbench" go build . + +echo "Wait for 15s for compose containers to start..." +sleep 15 + echo "Benchmarking Elastic" -ELASTIC_AUTH="" ELASTIC_URL=http://localhost:9200 ELASTIC_INDEX=index_name WPS=1 BATCH_SIZE=50 DESTINATION=Elastic TRACK_LATENCY=true ./rockbench +ELASTIC_AUTH="" ELASTIC_URL=http://localhost:9200 ELASTIC_INDEX=index_name WPS=2 BATCH_SIZE=50 DESTINATION=Elastic TRACK_LATENCY=true ./rockbench Echo "Benchmarking CrateDB" -CRATEDB_URI="postgres://crate:@localhost:5432/test?pool_max_conns=10&pool_min_conns=3" WPS=1 BATCH_SIZE=50 DESTINATION=CrateDB TRACK_LATENCY=true ./rockbench +CRATEDB_URI="postgres://crate:@localhost:5432/test?pool_max_conns=10&pool_min_conns=3" WPS=2 BATCH_SIZE=50 DESTINATION=CrateDB TRACK_LATENCY=true ./rockbench From 6293fa0009d8ee3062a1abc58e478046a6f70453 Mon Sep 17 00:00:00 2001 From: widmogrod Date: Wed, 3 Jul 2024 17:02:02 +0200 Subject: [PATCH 14/16] update WPS=1 --- dev/run.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dev/run.sh b/dev/run.sh index 8de759f..b5908aa 100755 --- a/dev/run.sh +++ b/dev/run.sh @@ -14,7 +14,7 @@ echo "Wait for 15s for compose containers to start..." 
sleep 15 echo "Benchmarking Elastic" -ELASTIC_AUTH="" ELASTIC_URL=http://localhost:9200 ELASTIC_INDEX=index_name WPS=2 BATCH_SIZE=50 DESTINATION=Elastic TRACK_LATENCY=true ./rockbench +ELASTIC_AUTH="" ELASTIC_URL=http://localhost:9200 ELASTIC_INDEX=index_name WPS=1 BATCH_SIZE=50 DESTINATION=Elastic TRACK_LATENCY=true ./rockbench Echo "Benchmarking CrateDB" -CRATEDB_URI="postgres://crate:@localhost:5432/test?pool_max_conns=10&pool_min_conns=3" WPS=2 BATCH_SIZE=50 DESTINATION=CrateDB TRACK_LATENCY=true ./rockbench +CRATEDB_URI="postgres://crate:@localhost:5432/test?pool_max_conns=10&pool_min_conns=3" WPS=1 BATCH_SIZE=50 DESTINATION=CrateDB TRACK_LATENCY=true ./rockbench From 512b03984a0df6c80c11b9c0e75711c07bc60139 Mon Sep 17 00:00:00 2001 From: widmogrod Date: Wed, 3 Jul 2024 17:31:29 +0200 Subject: [PATCH 15/16] simplify cratedb implementation --- generator/cratedb.go | 221 ++++--------------------------------------- 1 file changed, 20 insertions(+), 201 deletions(-) diff --git a/generator/cratedb.go b/generator/cratedb.go index b374e6c..3d4e8cf 100644 --- a/generator/cratedb.go +++ b/generator/cratedb.go @@ -2,6 +2,7 @@ package generator import ( "context" + "encoding/json" "fmt" "github.com/jackc/pgx/v5" "time" @@ -37,77 +38,7 @@ type CrateDB struct { } func (c *CrateDB) Init(ctx context.Context) error { - table := `CREATE TABLE IF NOT EXISTS test ( - About STRING, - - Address OBJECT(STRICT) AS ( - City STRING, - Street STRING, - ZipCode INTEGER, - Coordinates OBJECT(STRICT) AS ( - Latitude REAL, - Longitude REAL - ) - ), - - Age INTEGER, - Balance REAL, - Company STRING, - Email STRING, - Friends OBJECT(STRICT) AS ( - Friend1 OBJECT(STRICT) AS ( - Name OBJECT(STRICT) AS ( - "First" STRING, - "Last" STRING - ), - Age SMALLINT - ), - Friend2 OBJECT(STRICT) AS ( - Name OBJECT(STRICT) AS ( - "First" STRING, - "Last" STRING - ), - Age SMALLINT - ), - Friend3 OBJECT(STRICT) AS ( - Name OBJECT(STRICT) AS ( - "First" STRING, - "Last" STRING - ), - Age SMALLINT - ), - 
Friend4 OBJECT(STRICT) AS ( - Name OBJECT(STRICT) AS ( - "First" STRING, - "Last" STRING - ), - Age SMALLINT - ), - Friend5 OBJECT(STRICT) AS ( - Name OBJECT(STRICT) AS ( - "First" STRING, - "Last" STRING - ), - Age SMALLINT - ) - ), - Greeting STRING, - Guid STRING, - IsActive BOOLEAN, - Name OBJECT(STRICT) AS ( - "First" STRING, - "Last" STRING - ), - Phone STRING, - Picture STRING, - Registered STRING, - Tags ARRAY(STRING), - event_time TIMESTAMP WITH TIME ZONE, - id STRING, - ts TIMESTAMP WITH TIME ZONE, - generator_identifier STRING - ); - ` + table := `CREATE TABLE IF NOT EXISTS test (doc OBJECT)` _, err := c.conn.Exec(ctx, table) if err != nil { @@ -129,135 +60,17 @@ func (c *CrateDB) Reset(ctx context.Context) error { func (c *CrateDB) SendDocument(docs []any) error { b := &pgx.Batch{} for _, doc := range docs { - insert := `INSERT INTO test ( - About, - Address, - Age, - Balance, - Company, - Email, - Friends, - Greeting, - Guid, - IsActive, - Name, - Phone, - Picture, - Registered, - Tags, - event_time, - id, - ts, - generator_identifier - ) VALUES ( - $1, - { - City = $2, - Coordinates = { - Latitude = $3::REAL, - Longitude = $4::REAL - }, - Street = $5, - ZipCode = $6::INTEGER - }, - $7::INTEGER, - $8::REAL, - $9, - $10, - { - Friend1 = { - Name = { - "First" = $11, - "Last" = $12 - }, - Age = $13::SMALLINT - }, - Friend2 = { - Name = { - "First" = $14, - "Last" = $15 - }, - Age = $16::SMALLINT - }, - Friend3 = { - Name = { - "First" = $17, - "Last" = $18 - }, - Age = $19::SMALLINT - }, - Friend4 = { - Name = { - "First" = $20, - "Last" = $21 - }, - Age = $22::SMALLINT - }, - Friend5 = { - Name = { - "First" = $23, - "Last" = $24 - }, - Age = $25::SMALLINT - } - }, - $26, - $27, - $28::BOOLEAN, - { - "First" = $29, - "Last" = $30 - }, - $31, - $32, - $33, - $34, - $35, - $36, - $37, - $38 - );` + insert := `INSERT INTO test (doc) VALUES ($1);` + + //doc.(ma)["event_time"] = doc.(ma)["_event_time"] + //doc.(ma)["ts"] = doc.(ma)["_ts"] + //doc.(ma)["id"] 
= doc.(ma)["_id"] - b.Queue(insert, - doc.(ma)["About"], - doc.(ma)["Address"].(ma)["City"], - doc.(ma)["Address"].(ma)["Coordinates"].(ma)["Latitude"], - doc.(ma)["Address"].(ma)["Coordinates"].(ma)["Longitude"], - doc.(ma)["Address"].(ma)["Street"], - doc.(ma)["Address"].(ma)["ZipCode"], - doc.(ma)["Age"], - doc.(ma)["Balance"], - doc.(ma)["Company"], - doc.(ma)["Email"], - doc.(ma)["Friends"].(ma)["Friend1"].(ma)["Name"].(ma)["First"], - doc.(ma)["Friends"].(ma)["Friend1"].(ma)["Name"].(ma)["Last"], - doc.(ma)["Friends"].(ma)["Friend1"].(ma)["Age"], - doc.(ma)["Friends"].(ma)["Friend2"].(ma)["Name"].(ma)["First"], - doc.(ma)["Friends"].(ma)["Friend2"].(ma)["Name"].(ma)["Last"], - doc.(ma)["Friends"].(ma)["Friend2"].(ma)["Age"], - doc.(ma)["Friends"].(ma)["Friend3"].(ma)["Name"].(ma)["First"], - doc.(ma)["Friends"].(ma)["Friend3"].(ma)["Name"].(ma)["Last"], - doc.(ma)["Friends"].(ma)["Friend3"].(ma)["Age"], - doc.(ma)["Friends"].(ma)["Friend4"].(ma)["Name"].(ma)["First"], - doc.(ma)["Friends"].(ma)["Friend4"].(ma)["Name"].(ma)["Last"], - doc.(ma)["Friends"].(ma)["Friend4"].(ma)["Age"], - doc.(ma)["Friends"].(ma)["Friend5"].(ma)["Name"].(ma)["First"], - doc.(ma)["Friends"].(ma)["Friend5"].(ma)["Name"].(ma)["Last"], - doc.(ma)["Friends"].(ma)["Friend5"].(ma)["Age"], - doc.(ma)["Greeting"], - doc.(ma)["Guid"], - doc.(ma)["IsActive"], - doc.(ma)["Name"].(ma)["First"], - doc.(ma)["Name"].(ma)["Last"], - doc.(ma)["Phone"], - doc.(ma)["Picture"], - doc.(ma)["Registered"], - doc.(ma)["Tags"], - time.UnixMicro(doc.(ma)["_event_time"].(int64)), - doc.(ma)["_id"], - time.UnixMicro(doc.(ma)["_ts"].(int64)), - doc.(ma)["generator_identifier"], - ) + data, err := json.Marshal(doc) + if err != nil { + return fmt.Errorf("cratedb:SendDocument: %v", err) + } + b.Queue(insert, data) } br := c.conn.SendBatch(context.Background(), b) @@ -280,10 +93,16 @@ func (c *CrateDB) SendPatch(docs []any) error { } func (c *CrateDB) GetLatestTimestamp() (time.Time, error) { - query := `SELECT 
MAX(event_time) FROM test.test;` + ctx := context.Background() + _, err := c.conn.Exec(ctx, `REFRESH TABLE test;`) + if err != nil { + return time.Time{}, fmt.Errorf("cratedb:GetLatestTimestamp: %v", err) + } + + query := `SELECT MAX(doc['_event_time'])::TIMESTAMP FROM test;` var ts time.Time - rows, err := c.conn.Query(context.Background(), query) + rows, err := c.conn.Query(ctx, query) if err != nil { return time.Time{}, fmt.Errorf("cratedb:GetLatestTimestamp: query; %v", err) } From a3f733243517590280c405273ee9279e4b6d467e Mon Sep 17 00:00:00 2001 From: widmogrod Date: Wed, 3 Jul 2024 17:36:06 +0200 Subject: [PATCH 16/16] simplify cratedb implementation even further --- generator/cratedb.go | 2 +- generator/cratedb_test.go | 7 +------ 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/generator/cratedb.go b/generator/cratedb.go index 3d4e8cf..ca69e26 100644 --- a/generator/cratedb.go +++ b/generator/cratedb.go @@ -117,7 +117,7 @@ func (c *CrateDB) GetLatestTimestamp() (time.Time, error) { tsv := ts.UnixMicro() - return time.Unix(tsv/1_000_000, (tsv%1_000_000)*1_000), nil + return time.Unix(0, tsv), nil } func (c *CrateDB) ConfigureDestination() error { diff --git a/generator/cratedb_test.go b/generator/cratedb_test.go index 4acda1f..8bb093c 100644 --- a/generator/cratedb_test.go +++ b/generator/cratedb_test.go @@ -97,18 +97,13 @@ And then run: maxTimestamp := findMaxTimestamp(docs) - // I don't know why, but without slip I get error - // > can't scan into dest[0]: cannot scan NULL into *time.Time - // It looks like table is not visible to be queried by driver imminently after insert, - // but 1s later everything works. This could be go driver or cratedb thingy? 
- time.Sleep(1 * time.Second) timestamp, err := c.GetLatestTimestamp() if err != nil { t.Fatalf("CrateDB.GetLatestTimestamp() error = %v", err) } delta := maxTimestamp - timestamp.UnixMicro() - epsilon := int64(1000) + epsilon := int64(100) if delta > epsilon { t.Errorf("CrateDB.GetLatestTimestamp() delta is %d, cannot be bigger than %d", delta, epsilon) }