WIP: Schema Evolution #125

Draft
wants to merge 1 commit into base: main
188 changes: 111 additions & 77 deletions destination/format/csv.go
@@ -17,6 +17,7 @@
import (
"bytes"
"context"
"crypto/sha256"
"encoding/csv"
"encoding/json"
"fmt"
@@ -27,6 +28,7 @@
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/go-errors/errors"
"github.com/hamba/avro/v2"
"golang.org/x/exp/maps"
)

// TODO: just create the table with the types on the left to make this simpler.
@@ -131,6 +133,13 @@
DeletedAtColumn string
}

type SchemaRecords struct {
Schema map[string]string
Records []*sdk.Record
ConnColumns ConnectorColumns
CsvColumnOrder []string
}

type AvroRecordSchema struct {
Name string `json:"name"`
Type string `json:"type"`
@@ -140,102 +149,117 @@
} `json:"fields"`
}

func GetDataSchema(
func GetDataSchemas(
ctx context.Context,
records []sdk.Record,
schema map[string]string,
prefix string,
) ([]string, *ConnectorColumns, error) {
// we need to store the operation in a column, to detect updates & deletes
connectorColumns := ConnectorColumns{
OperationColumn: fmt.Sprintf("%s_operation", prefix),
CreatedAtColumn: fmt.Sprintf("%s_created_at", prefix),
UpdatedAtColumn: fmt.Sprintf("%s_updated_at", prefix),
DeletedAtColumn: fmt.Sprintf("%s_deleted_at", prefix),
) ([]SchemaRecords, error) {
if len(records) == 0 {
return nil, errors.New("unexpected empty slice of records")
}

schema[connectorColumns.OperationColumn] = SnowflakeVarchar
schema[connectorColumns.CreatedAtColumn] = SnowflakeTimestampTZ
schema[connectorColumns.UpdatedAtColumn] = SnowflakeTimestampTZ
schema[connectorColumns.DeletedAtColumn] = SnowflakeTimestampTZ
schemaCache := map[string]SchemaRecords{}

csvColumnOrder := []string{}
for _, r := range records {
// we need to store the operation in a column, to detect updates & deletes
connectorColumns := ConnectorColumns{
OperationColumn: fmt.Sprintf("%s_operation", prefix),
CreatedAtColumn: fmt.Sprintf("%s_created_at", prefix),
UpdatedAtColumn: fmt.Sprintf("%s_updated_at", prefix),
DeletedAtColumn: fmt.Sprintf("%s_deleted_at", prefix),
}

// TODO: see whether we need to support a compound key here
// TODO: what if the key field changes? e.g. from `id` to `name`? we need to think about this
schema := map[string]string{}

// Grab the schema from the first record.
// TODO: support schema evolution.
if len(records) == 0 {
return nil, nil, errors.New("unexpected empty slice of records")
}
schema[connectorColumns.OperationColumn] = SnowflakeVarchar
schema[connectorColumns.CreatedAtColumn] = SnowflakeTimestampTZ
schema[connectorColumns.UpdatedAtColumn] = SnowflakeTimestampTZ
schema[connectorColumns.DeletedAtColumn] = SnowflakeTimestampTZ

r := records[0]
data, err := extractPayload(r.Operation, r.Payload)
if err != nil {
return nil, nil, errors.Errorf("failed to extract payload data: %w", err)
}
csvColumnOrder := []string{}

// TODO: see whether we need to support a compound key here
// TODO: what if the key field changes? e.g. from `id` to `name`? we need to think about this

avroStr, okAvro := r.Metadata["postgres.avro.schema"]
// if we have an avro schema in the metadata, interpret the schema from it
if okAvro {
sdk.Logger(ctx).Debug().Msgf("avro schema string: %s", avroStr)
avroSchema, err := avro.Parse(avroStr)
data, err := extractPayload(r.Operation, r.Payload)
if err != nil {
return nil, nil, errors.Errorf("could not parse avro schema: %w", err)
return nil, errors.Errorf("failed to extract payload data: %w", err)
}
avroRecordSchema, ok := avroSchema.(*avro.RecordSchema)
if !ok {
return nil, nil, errors.New("could not coerce avro schema into recordSchema")
}
for _, field := range avroRecordSchema.Fields() {
csvColumnOrder = append(csvColumnOrder, field.Name())
schema[field.Name()], err = mapAvroToSnowflake(ctx, field)

avroStr, okAvro := r.Metadata["postgres.avro.schema"]
// if we have an avro schema in the metadata, interpret the schema from it
if okAvro {
sdk.Logger(ctx).Debug().Msgf("avro schema string: %s", avroStr)
avroSchema, err := avro.Parse(avroStr)
if err != nil {
return nil, nil, fmt.Errorf("failed to map avro field %s: %w", field.Name(), err)
return nil, errors.Errorf("could not parse avro schema: %w", err)
}
}
} else {
// TODO (BEFORE MERGE): move to function
for key, val := range data {
if schema[key] == "" {
csvColumnOrder = append(csvColumnOrder, key)
switch val.(type) {
case int, int8, int16, int32, int64:
schema[key] = SnowflakeInteger
case float32, float64:
schema[key] = SnowflakeFloat
case bool:
schema[key] = SnowflakeBoolean
case time.Time, *time.Time:
schema[key] = SnowflakeTimestampTZ
case nil:
// WE SHOULD KEEP TRACK OF VARIANTS SEPERATELY IN CASE WE RUN INTO CONCRETE TYPE LATER ON
// IF WE RAN INTO NONE NULL VALUE OF THIS VARIANT COL, WE CAN EXECUTE AN ALTER TO DEST TABLE
schema[key] = SnowflakeVariant
default:
schema[key] = SnowflakeVarchar
avroRecordSchema, ok := avroSchema.(*avro.RecordSchema)
if !ok {
return nil, errors.New("could not coerce avro schema into recordSchema")
}
for _, field := range avroRecordSchema.Fields() {
csvColumnOrder = append(csvColumnOrder, field.Name())
schema[field.Name()], err = mapAvroToSnowflake(ctx, field)
if err != nil {
return nil, fmt.Errorf("failed to map avro field %s: %w", field.Name(), err)
}
}
} else {
// TODO (BEFORE MERGE): move to function
for key, val := range data {
if schema[key] == "" {
csvColumnOrder = append(csvColumnOrder, key)
switch val.(type) {
case int, int8, int16, int32, int64:
schema[key] = SnowflakeInteger
case float32, float64:
schema[key] = SnowflakeFloat
case bool:
schema[key] = SnowflakeBoolean
case time.Time, *time.Time:
schema[key] = SnowflakeTimestampTZ
case nil:
// WE SHOULD KEEP TRACK OF VARIANTS SEPARATELY IN CASE WE RUN INTO A CONCRETE TYPE LATER ON
// IF WE RUN INTO A NON-NULL VALUE FOR THIS VARIANT COL, WE CAN EXECUTE AN ALTER ON THE DEST TABLE
schema[key] = SnowflakeVariant
default:
schema[key] = SnowflakeVarchar
}
}
}
}

// sort data column order alphabetically to make deterministic
// but keep conduit connector columns at the front for ease of use
sort.Strings(csvColumnOrder)
csvColumnOrder = append(
[]string{
connectorColumns.OperationColumn,
connectorColumns.CreatedAtColumn,
connectorColumns.UpdatedAtColumn,
connectorColumns.DeletedAtColumn,
},
csvColumnOrder...,
)

// if we have detected this schema before, add the record to that schema's group;
// write the updated struct back into the map, since map values are copies
hash := schemaHash(schema)
r := r // copy the loop variable so the stored pointer is stable per iteration
if sr, ok := schemaCache[hash]; ok {
sr.Records = append(sr.Records, &r)
schemaCache[hash] = sr
} else {
schemaCache[hash] = SchemaRecords{
Schema: schema,
Records: []*sdk.Record{&r},
CsvColumnOrder: csvColumnOrder,
ConnColumns: connectorColumns,
}
}

sdk.Logger(ctx).Debug().Msgf("schema detected: %+v", schema)
}

// sort data column order alphabetically to make deterministic
// but keep conduit connector columns at the front for ease of use
sort.Strings(csvColumnOrder)
csvColumnOrder = append(
[]string{
connectorColumns.OperationColumn,
connectorColumns.CreatedAtColumn,
connectorColumns.UpdatedAtColumn,
connectorColumns.DeletedAtColumn,
},
csvColumnOrder...,
)

sdk.Logger(ctx).Debug().Msgf("schema detected: %+v", schema)

return csvColumnOrder, &connectorColumns, nil
return maps.Values(schemaCache), nil
}

// TODO: refactor this function, make it more modular and readable.
@@ -558,3 +582,13 @@

return "", fmt.Errorf("could not find snowflake mapping for avro type %s", field.Name())
}

func schemaHash(s map[string]string) string {
hasher := sha256.New()

// hash the keys in sorted order so that identical schemas always
// produce the same hash, regardless of map iteration order
keys := maps.Keys(s)
sort.Strings(keys)

for _, key := range keys {
fmt.Fprintf(hasher, "%s:%s,", key, s[key])
}

return fmt.Sprintf("%x", hasher.Sum(nil))
}
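For reviewers skimming the change: the core idea in GetDataSchemas is to group records by a deterministic hash of their inferred column-to-type map, so each distinct schema gets its own batch. Below is a minimal, self-contained Go sketch of that grouping. The record type, the trimmed-down type switch, and the fake Snowflake type names are simplified stand-ins for illustration, not the connector's sdk.Record, SchemaRecords, or actual mapping code.

```go
package main

import (
	"crypto/sha256"
	"fmt"
	"sort"
)

// record is a simplified stand-in for sdk.Record: just a payload map.
type record map[string]any

// inferSchema mimics the type switch in csv.go, mapping Go values to
// (here, placeholder) Snowflake column types.
func inferSchema(r record) map[string]string {
	schema := map[string]string{}
	for k, v := range r {
		switch v.(type) {
		case int, int64:
			schema[k] = "NUMBER"
		case float64:
			schema[k] = "FLOAT"
		case bool:
			schema[k] = "BOOLEAN"
		case nil:
			schema[k] = "VARIANT"
		default:
			schema[k] = "VARCHAR"
		}
	}
	return schema
}

// schemaHash hashes a schema map deterministically by visiting keys in sorted order.
func schemaHash(s map[string]string) string {
	keys := make([]string, 0, len(s))
	for k := range s {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	h := sha256.New()
	for _, k := range keys {
		fmt.Fprintf(h, "%s:%s,", k, s[k])
	}
	return fmt.Sprintf("%x", h.Sum(nil))
}

func main() {
	records := []record{
		{"id": 1, "name": "a"},
		{"id": 2, "name": "b"},
		{"id": 3, "name": "c", "age": 30}, // schema change: a new column appears
	}

	// group records whose inferred schemas hash identically
	groups := map[string][]record{}
	for _, r := range records {
		h := schemaHash(inferSchema(r))
		groups[h] = append(groups[h], r)
	}

	// two groups: one for {id,name}, one for {id,name,age}
	fmt.Println("distinct schemas:", len(groups))
}
```

Sorting the keys before hashing is what makes the hash stable across runs, since Go map iteration order is randomized; without it, identical schemas could land in different groups.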
33 changes: 12 additions & 21 deletions destination/writer/writer.go
@@ -24,7 +24,7 @@
"time"

"github.com/conduitio-labs/conduit-connector-snowflake/destination/compress"
"github.com/conduitio-labs/conduit-connector-snowflake/destination/format"

"github.com/conduitio-labs/conduit-connector-snowflake/destination/schema"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/go-errors/errors"
@@ -163,10 +163,18 @@
// assign request id to the write cycle
ctx = withRequestID(ctx)

// if s.schema == nil {
// if err := s.initSchema(ctx, records); err != nil {
// return 0, errors.Errorf("failed to initialize schema from records: %w", err)
// }
// extract schemas from the records
schemaRecords, err := format.GetDataSchemas(ctx, records, s.Prefix)
Contributor Author: here is the main change so far

if err != nil {
return 0, errors.Errorf("failed to convert records to CSV: %w", err)
}

// Samir Ketema:
// Now that we have grouped the records by schema, we could make the calls out to evolver.Migrate() below,
// but we need to be careful about _when_ we do this.
// In hindsight, I could also have detected an ordering in these records, to help get the initial batch in before the schema change.
// However, we need to make sure this is all transactional as far as Snowflake is concerned,
// and I'm not sure whether that's possible in Snowflake, or what the implications are.
// (A rough sketch of this migrate-then-write flow follows the writer.go diff below.)

// // N.B. Disable until table is created by the migrator
// //
@@ -175,23 +183,6 @@
// // return 0, errors.Errorf("failed to evolve schema during boot: %w", err)
// // }

// sdk.Logger(ctx).Debug().
// // Bool("success", migrated).
// Msg("schema initialized and migration completed")
// }

// log first record temporarily for debugging
sdk.Logger(ctx).Debug().Msgf("payload=%+v", records[0].Payload)
sdk.Logger(ctx).Debug().Msgf("payload.before=%+v", records[0].Payload.Before)
sdk.Logger(ctx).Debug().Msgf("payload.after=%+v", records[0].Payload.After)
sdk.Logger(ctx).Debug().Msgf("key=%+v", records[0].Key)
// extract schema from payload
schema := make(map[string]string)
csvColumnOrder, meroxaColumns, err := format.GetDataSchema(ctx, records, schema, s.Prefix)
if err != nil {
return 0, errors.Errorf("failed to convert records to CSV: %w", err)
}

// check if table already exists on snowflake, if yes, compare schema
err = s.CheckTable(ctx, records[0].Operation, s.PrimaryKey, schema)
if err != nil {
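To make the author's note above concrete, here is a rough sketch of how the grouped schemas could drive the write path: for each distinct schema, evolve the destination table, then write that group's records. The Migrator and BatchWriter interfaces, writeGroups, and the stub implementations are all invented for illustration; the real evolver.Migrate signature and the Snowflake CSV/stage/merge pipeline are not shown in this diff, and the ordering and transactionality concerns raised in the comment are left open.

```go
package main

import (
	"context"
	"fmt"
)

// SchemaGroup is a simplified stand-in for format.SchemaRecords.
type SchemaGroup struct {
	Schema  map[string]string
	Records []map[string]any
}

// Migrator is a hypothetical interface standing in for the connector's evolver;
// the real Migrate signature may differ.
type Migrator interface {
	Migrate(ctx context.Context, table string, schema map[string]string) error
}

// BatchWriter is a hypothetical interface for the write/merge step in writer.go.
type BatchWriter interface {
	WriteBatch(ctx context.Context, table string, records []map[string]any) error
}

// writeGroups applies each schema group in turn: evolve the table first,
// then write the records that were produced under that schema.
// Ordering and atomicity across groups are deliberately not addressed here.
func writeGroups(ctx context.Context, table string, groups []SchemaGroup, m Migrator, w BatchWriter) (int, error) {
	written := 0
	for _, g := range groups {
		if err := m.Migrate(ctx, table, g.Schema); err != nil {
			return written, fmt.Errorf("failed to migrate table %q: %w", table, err)
		}
		if err := w.WriteBatch(ctx, table, g.Records); err != nil {
			return written, fmt.Errorf("failed to write batch to %q: %w", table, err)
		}
		written += len(g.Records)
	}
	return written, nil
}

// no-op stand-ins so the sketch runs; real usage would pass the connector's evolver and writer.
type noopMigrator struct{}

func (noopMigrator) Migrate(_ context.Context, table string, schema map[string]string) error {
	fmt.Printf("migrate %s to %d columns\n", table, len(schema))
	return nil
}

type printWriter struct{}

func (printWriter) WriteBatch(_ context.Context, table string, records []map[string]any) error {
	fmt.Printf("write %d records to %s\n", len(records), table)
	return nil
}

func main() {
	groups := []SchemaGroup{
		{Schema: map[string]string{"id": "NUMBER"}, Records: []map[string]any{{"id": 1}}},
		{Schema: map[string]string{"id": "NUMBER", "name": "VARCHAR"}, Records: []map[string]any{{"id": 2, "name": "a"}}},
	}
	n, err := writeGroups(context.Background(), "my_table", groups, noopMigrator{}, printWriter{})
	fmt.Println(n, err)
}
```

If I recall correctly, Snowflake runs DDL such as ALTER TABLE in its own implicit transaction, which is part of why the transactionality question in the comment above is tricky: the per-group migrate-plus-write pair cannot simply be wrapped in a single rollback-able transaction.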