Skip to content

Commit

Permalink
initial work
Browse files Browse the repository at this point in the history
  • Loading branch information
alekLukanen committed Jul 21, 2024
1 parent 7fedee2 commit 5c86b99
Show file tree
Hide file tree
Showing 28 changed files with 2,534 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*.out

# Dependency directories (remove the comment below to include it)
# vendor/
vendor/

# Go workspace file
go.work
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# chapterhouseDB
A self-managed streaming data warehouse build on Parquet and DuckDB
A self-managed streaming data warehouse built on Parquet and DuckDB
13 changes: 13 additions & 0 deletions cmd/app/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package main

import (
"log/slog"
"os"
)

func main() {

logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
logger.Info("Running ChapterhouseDB")

}
153 changes: 153 additions & 0 deletions cmd/scratch/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package main

import (
"context"
"log/slog"
"os"
"time"

"github.com/apache/arrow/go/v16/arrow"
"github.com/apache/arrow/go/v16/arrow/array"
"github.com/apache/arrow/go/v16/arrow/memory"

"github.com/alekLukanen/chapterhouseDB/elements"
"github.com/alekLukanen/chapterhouseDB/operations"
"github.com/alekLukanen/chapterhouseDB/partitionFuncs"
"github.com/alekLukanen/chapterhouseDB/storage"
)

func main() {
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
logger.Info("Running ChapterhouseDB Scratch")

ctx := context.Background()

keyStorage, err := storage.NewKeyStorage(ctx, logger, storage.KeyStorageOptions{
Address: "localhost:6379",
Password: "",
KeyPrefix: "chapterhouseDB",
})
if err != nil {
logger.Error("unable to start storage", err)
return
}

tableRegistery := operations.NewTableRegistry(ctx, logger)

table1 := elements.NewTable("table1").
AddColumns(
elements.NewColumn("column1", &arrow.Int32Type{}),
elements.NewColumn("column2", &arrow.BooleanType{}),
elements.NewColumn("column3", &arrow.Float64Type{}),
).
AddColumnPartitions(
elements.NewColumnPartition(
"column1",
partitionFuncs.NewIntegerRangePartitionOptions(10, 10),
),
).
AddSubscriptionGroups(
elements.NewSubscriptionGroup(
"group1",
).
AddSubscriptions(
elements.NewExternalSubscription(
"externalTable1",
nil,
elements.NewColumn("column1", &arrow.Int32Type{}),
elements.NewColumn("column2", &arrow.BooleanType{}),
),
),
)

tableRegistery.AddTables(table1)

pool := memory.NewGoAllocator()
inserter := operations.NewInserter(
logger,
tableRegistery,
keyStorage,
pool,
operations.InserterOptions{PartitionLockDuration: 15 * time.Second},
)

schema := arrow.NewSchema(
[]arrow.Field{
{Name: "column1", Type: &arrow.Int32Type{}},
{Name: "column2", Type: &arrow.BooleanType{}},
}, nil,
)
recBuilder := array.NewRecordBuilder(pool, schema)
defer recBuilder.Release()

recBuilder.Field(0).(*array.Int32Builder).AppendValues([]int32{1, 2, 3, 4, 10, 20, 29, 35, 36, 37}, nil)
recBuilder.Field(1).(*array.BooleanBuilder).AppendValues([]bool{true, true, true, false, true, false, true, false, true, false}, nil)

rec := recBuilder.NewRecord()
defer rec.Release()

logger.Info("record schema", slog.Any("schema", rec.Schema().String()), slog.Int("numFields", rec.Schema().NumFields()))

err = inserter.InsertTuples(ctx, table1.TableName(), "external.externalTable1", rec)
if err != nil {
logger.Error("unable to insert tuples", slog.Any("err", err))
return
}

// Read partitions
////////////////////////////////////////////////////
/*
partitions, err := keyStorage.GetTablePartitions(ctx, "table1", 0, 100)
if err != nil {
logger.Error("unable to get partitions", slog.Any("error", err))
return
}
for _, part := range partitions {
logger.Info("partition", slog.String("key", part.Key))
items, err := keyStorage.GetTablePartitionItems(ctx, elements.Partition{TableName: "table1", Key: part.Key}, 100)
if err != nil {
logger.Error("unable to get items", err)
return
}
for idx, item := range items {
logger.Info("item in key storage", slog.Int("idx", idx), slog.String("item", item))
}
}
*/

// Read record from partition and claim the lock
//////////////////////////////////////////////////////

logger.Info("attempt to read at most 10 partitions")
for i := 0; i < 10; i++ {
logger.Info("attempt to read an entire record", slog.Int("attempt", i))

part, lock, record, err := inserter.GetPartition(ctx, "table1", 100, 5*time.Second)
if err != nil {
logger.Error("unable to read additional items", slog.Any("error", err))
break
}

_, err = keyStorage.DeleteTablePartitionTimestamp(ctx, part)
if err != nil {
logger.Error("unable to delete partition timestamp", slog.Any("error", err))
break
}

_, err = lock.UnlockContext(ctx)
if err != nil {
logger.Error("unable to unlock partition", slog.Any("error", err))
}

logger.Info("partition", slog.String("key", part.Key), slog.String("subscription", part.SubscriptionSourceName))
logger.Info("record", slog.Any("record", record))
logger.Info("record length", slog.Int64("length", record.NumRows()))
logger.Info("recrod schema", slog.Any("schema", record.Schema().String()), slog.Int("numFields", record.Schema().NumFields()))
}

logger.Info("ChapterhouseDB Scratch Complete")
}
16 changes: 16 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
version: '3.8'

services:
keydb:
image: eqalpha/keydb:latest
container_name: keydb
restart: always
ports:
- "6379:6379"
volumes:
- keydb-data:/data
command: ["keydb-server", "/etc/keydb/keydb.conf"]

volumes:
keydb-data:
driver: local
12 changes: 12 additions & 0 deletions elements/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package elements

import "errors"

var (
ErrTableAlreadyAddedToRegistry = errors.New("table already added to registry")
ErrTableNotFound = errors.New("table not found")
ErrSubscriptionNotFound = errors.New("subscription not found")
ErrColumnNotFound = errors.New("column not found")
ErrTupleColumnsDifferentThanSubscription = errors.New("tuple columns different than subscription")
ErrUnsupportedArrowToAvroTypeConversion = errors.New("unsupported arrow to avro type conversion")
)
12 changes: 12 additions & 0 deletions elements/funcs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package elements

import (
"context"

"github.com/apache/arrow/go/v16/arrow"
"github.com/apache/arrow/go/v16/arrow/memory"
)

type PartitionFunc func(*memory.GoAllocator, arrow.Record, string, IPartitionOptions) (arrow.Array, error)

type Transformer func(ctx context.Context, allocator memory.GoAllocator, tuples *arrow.Record) (*arrow.Record, error)
Loading

0 comments on commit 5c86b99

Please sign in to comment.