Skip to content

Commit

Permalink
Schema support (#1702)
Browse files Browse the repository at this point in the history
  • Loading branch information
lovromazgon authored Jul 15, 2024
1 parent c0f7e22 commit 930afae
Show file tree
Hide file tree
Showing 56 changed files with 1,010 additions and 3,279 deletions.
14 changes: 8 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@ require (
github.com/Masterminds/sprig/v3 v3.2.3
github.com/NYTimes/gziphandler v1.1.1
github.com/bufbuild/buf v1.34.0
github.com/conduitio/conduit-commons v0.2.0
github.com/conduitio/conduit-commons v0.2.1-0.20240708122218-5d1883981cfc
github.com/conduitio/conduit-connector-file v0.6.1-0.20240621111422-221c138201d3
github.com/conduitio/conduit-connector-generator v0.6.1-0.20240621111436-e9fa3464f7b2
github.com/conduitio/conduit-connector-kafka v0.8.1-0.20240621111431-87c01cf39a06
github.com/conduitio/conduit-connector-log v0.3.1-0.20240621111440-e2f0f04a35a4
github.com/conduitio/conduit-connector-postgres v0.7.6-0.20240630172132-84b5a6e6104f
github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240619121958-1df466646d01
github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240705154009-b938cfa7f251
github.com/conduitio/conduit-connector-s3 v0.5.2-0.20240630172807-e278fde1fb46
github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240628152134-4cafa91a4ded
github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240705162050-971c5f7facc2
github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240516124003-442e4a3f0edd
github.com/conduitio/conduit-schema-registry v0.0.0-20240705193355-7e2064b44e0d
github.com/conduitio/yaml/v3 v3.3.0
github.com/dgraph-io/badger/v4 v4.2.0
github.com/dop251/goja v0.0.0-20231027120936-b396bb4c349d
Expand All @@ -29,14 +30,12 @@ require (
github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.3
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0
github.com/hamba/avro/v2 v2.22.1
github.com/hashicorp/go-hclog v1.6.3
github.com/hashicorp/go-plugin v1.6.1
github.com/jackc/pgx/v5 v5.6.0
github.com/jpillora/backoff v1.0.0
github.com/lovromazgon/franz-go/pkg/sr v0.0.0-20230630140346-bb9ce3f90f4a
github.com/matryer/is v1.4.1
github.com/modern-go/reflect2 v1.0.2
github.com/neilotoole/slogt v1.1.0
github.com/peterbourgon/ff/v3 v3.4.0
github.com/piotrkowalczuk/promgrpc/v4 v4.1.3
github.com/prometheus/client_golang v1.19.1
Expand All @@ -45,6 +44,7 @@ require (
github.com/rs/zerolog v1.33.0
github.com/stealthrocket/wazergo v0.19.1
github.com/tetratelabs/wazero v1.7.3
github.com/twmb/franz-go/pkg/sr v1.0.0
github.com/twmb/go-cache v1.2.1
go.uber.org/mock v0.4.0
golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8
Expand Down Expand Up @@ -187,6 +187,7 @@ require (
github.com/gostaticanalysis/comment v1.4.2 // indirect
github.com/gostaticanalysis/forcetypeassert v0.1.0 // indirect
github.com/gostaticanalysis/nilerr v0.1.1 // indirect
github.com/hamba/avro/v2 v2.22.1 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
Expand Down Expand Up @@ -240,6 +241,7 @@ require (
github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/moricho/tparallel v0.3.1 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
Expand Down
20 changes: 12 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,8 @@ github.com/ckaznocha/intrange v0.1.2/go.mod h1:RWffCw/vKBwHeOEwWdCikAtY0q4gGt8Vh
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/colinmarc/hdfs/v2 v2.1.1/go.mod h1:M3x+k8UKKmxtFu++uAZ0OtDU8jR3jnaZIAc6yK4Ue0c=
github.com/conduitio/conduit-commons v0.2.0 h1:TMpVGXi0Wski537qLAyQWdGjuGHEhaZxOS5L90pZJSQ=
github.com/conduitio/conduit-commons v0.2.0/go.mod h1:i7Q2jm7FBSi2zj1/4MCsFD1hIKAbvamlNtSQfkhUTiY=
github.com/conduitio/conduit-commons v0.2.1-0.20240708122218-5d1883981cfc h1:q2o3R/cXbQBUrJFBHV1YietwB275rBjkmPu1njzV4bk=
github.com/conduitio/conduit-commons v0.2.1-0.20240708122218-5d1883981cfc/go.mod h1:oF0KZc+TiVGBdKoMLLQ9Otb83Shq4CRZ2hClhNlFRio=
github.com/conduitio/conduit-connector-file v0.6.1-0.20240621111422-221c138201d3 h1:/mdy7vQzdfqDFLM13M39CYwI6Pk7xClMVZpGQW3+5DQ=
github.com/conduitio/conduit-connector-file v0.6.1-0.20240621111422-221c138201d3/go.mod h1:bCnmA+29l871cNhroZfiCS2O8+GhBNVECfL5DOof2ew=
github.com/conduitio/conduit-connector-generator v0.6.1-0.20240621111436-e9fa3464f7b2 h1:WMKvmvaE/E+03/0nz/2JpyelCd2nPtOTuBy3eyWcI58=
Expand All @@ -223,14 +223,16 @@ github.com/conduitio/conduit-connector-log v0.3.1-0.20240621111440-e2f0f04a35a4
github.com/conduitio/conduit-connector-log v0.3.1-0.20240621111440-e2f0f04a35a4/go.mod h1:6IkveRPUPJDCtdH6vXOW1T+B8Vj99OA+szybqYSnlyY=
github.com/conduitio/conduit-connector-postgres v0.7.6-0.20240630172132-84b5a6e6104f h1:p8CH8UlYkOSlqOREJtUW9eHm6fyn3M+5b0lUQByMVvg=
github.com/conduitio/conduit-connector-postgres v0.7.6-0.20240630172132-84b5a6e6104f/go.mod h1:2v+hTwyTZFjM9evlMv6Id9M/rVuCZgzUnA3szRnWOiI=
github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240619121958-1df466646d01 h1:sZA0aZpZlleULAu+KQYL+WAapXdJNzV3XnSJmwAF0Mg=
github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240619121958-1df466646d01/go.mod h1:3R3eUxN/Z3O3jR1TcfFb9zeGWpiDLvpSOlSWUVa8KsI=
github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240705154009-b938cfa7f251 h1:X/dY6GJ8PxIDPgqpWO0bZqBoHrBUVA+8x//tO50PQMk=
github.com/conduitio/conduit-connector-protocol v0.6.1-0.20240705154009-b938cfa7f251/go.mod h1:LDGRw1uphxd0MNaF9NbLUbFwoJWS+GehsX4eQYau6f4=
github.com/conduitio/conduit-connector-s3 v0.5.2-0.20240630172807-e278fde1fb46 h1:tur/pSyX1RLzkxiBwhsV1qa6wP60pb20hJMptH5RRJY=
github.com/conduitio/conduit-connector-s3 v0.5.2-0.20240630172807-e278fde1fb46/go.mod h1:m+pf2cMF+qCwhMj9gUBV1BPGLPYauhtYkj2zFddfvdE=
github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240628152134-4cafa91a4ded h1:qsc8PuCkWY5G4IeQW1fZQOVWUzB8DoVzFat41BPYf68=
github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240628152134-4cafa91a4ded/go.mod h1:hCmuIMKtYqFnLZWNK343dtQEZJIp+wv/0Qck9N+q+oY=
github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240705162050-971c5f7facc2 h1:L7RrVxEn7qlAzDKzSO+bu0N371MwVlwx3SOdnCiYSfU=
github.com/conduitio/conduit-connector-sdk v0.9.2-0.20240705162050-971c5f7facc2/go.mod h1:8gTzxxOZ8tRf7XvWxIDviLLcOPa3oXlVueleYYTZzNs=
github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240516124003-442e4a3f0edd h1:R+tpcZKWOnr6LRsXr85C167SK9MhaLhYUEjBSUupU9Y=
github.com/conduitio/conduit-processor-sdk v0.1.2-0.20240516124003-442e4a3f0edd/go.mod h1:E9zqj0atY1+yBHWi4eZ3TagCZSBnFxBQBUcZktL6RFE=
github.com/conduitio/conduit-schema-registry v0.0.0-20240705193355-7e2064b44e0d h1:C6wRzdyqdQQCL/lCruAsH0j1JoN2GEZBQdDHoxA2B0o=
github.com/conduitio/conduit-schema-registry v0.0.0-20240705193355-7e2064b44e0d/go.mod h1:bw7SeE1nFhPIxDo5PcmjZySvh7f4iQkNWS/K5CBkGrg=
github.com/conduitio/yaml/v3 v3.3.0 h1:kbbaOSHcuH39gP4+rgbJGl6DSbLZcJgEaBvkEXJlCsI=
github.com/conduitio/yaml/v3 v3.3.0/go.mod h1:JNgFMOX1t8W4YJuRZOh6GggVtSMsgP9XgTw+7dIenpc=
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
Expand Down Expand Up @@ -592,8 +594,6 @@ github.com/ldez/tagliatelle v0.5.0/go.mod h1:rj1HmWiL1MiKQuOONhd09iySTEkUuE/8+5j
github.com/ledongthuc/pdf v0.0.0-20220302134840-0c2507a12d80/go.mod h1:imJHygn/1yfhB7XSJJKlFZKl/J+dCPAknuiaGOshXAs=
github.com/leonklingele/grouper v1.1.2 h1:o1ARBDLOmmasUaNDesWqWCIFH3u7hoFlM84YrjT3mIY=
github.com/leonklingele/grouper v1.1.2/go.mod h1:6D0M/HVkhs2yRKRFZUoGjeDy7EZTfFBE9gl4kjmIGkA=
github.com/lovromazgon/franz-go/pkg/sr v0.0.0-20230630140346-bb9ce3f90f4a h1:TrxQUmJBE1pZsnTW3rqG5Fsx3Xz0wGm5xgqLDV/mMGk=
github.com/lovromazgon/franz-go/pkg/sr v0.0.0-20230630140346-bb9ce3f90f4a/go.mod h1:iz9EnaFViALD6sVqxYHs8BPC0ZEQtfhTpN7SG5b0Nqo=
github.com/lufeee/execinquery v1.2.1 h1:hf0Ems4SHcUGBxpGN7Jz78z1ppVkP/837ZlETPCEtOM=
github.com/lufeee/execinquery v1.2.1/go.mod h1:EC7DrEKView09ocscGHC+apXMIaorh4xqSxS/dy8SbM=
github.com/macabu/inamedparam v0.1.3 h1:2tk/phHkMlEL/1GNe/Yf6kkR/hkcUdAEY3L0hjYV1Mk=
Expand Down Expand Up @@ -658,6 +658,8 @@ github.com/nakabonne/nestif v0.3.1/go.mod h1:9EtoZochLn5iUprVDmDjqGKPofoUEBL8U4N
github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4=
github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
github.com/ncw/swift v1.0.52/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM=
github.com/neilotoole/slogt v1.1.0 h1:c7qE92sq+V0yvCuaxph+RQ2jOKL61c4hqS1Bv9W7FZE=
github.com/neilotoole/slogt v1.1.0/go.mod h1:RCrGXkPc/hYybNulqQrMHRtvlQ7F6NktNVLuLwk6V+w=
github.com/nishanths/exhaustive v0.12.0 h1:vIY9sALmw6T/yxiASewa4TQcFsVYZQQRUQJhKRf3Swg=
github.com/nishanths/exhaustive v0.12.0/go.mod h1:mEZ95wPIZW+x8kC4TgC+9YCUgiST7ecevsVDTgc2obs=
github.com/nishanths/predeclared v0.2.2 h1:V2EPdZPliZymNAn79T8RkNApBjMmVKh5XRpLm/w98Vk=
Expand Down Expand Up @@ -847,6 +849,8 @@ github.com/twmb/franz-go/pkg/kadm v1.12.0 h1:I8P/gpXFzhl73QcAYmJu+1fOXvrynyH/MAo
github.com/twmb/franz-go/pkg/kadm v1.12.0/go.mod h1:VMvpfjz/szpH9WB+vGM+rteTzVv0djyHFimci9qm2C0=
github.com/twmb/franz-go/pkg/kmsg v1.8.0 h1:lAQB9Z3aMrIP9qF9288XcFf/ccaSxEitNA1CDTEIeTA=
github.com/twmb/franz-go/pkg/kmsg v1.8.0/go.mod h1:HzYEb8G3uu5XevZbtU0dVbkphaKTHk0X68N5ka4q6mU=
github.com/twmb/franz-go/pkg/sr v1.0.0 h1:4FUatTSTEuG2xievT0iDrgnpErgRg7kFLNioJYqfrqs=
github.com/twmb/franz-go/pkg/sr v1.0.0/go.mod h1:aUFRRLI5WYKpKzmWDztzZFecx5eOkCNuuamd91jUV5c=
github.com/twmb/go-cache v1.2.1 h1:yUkLutow4S2x5NMbqFW24o14OsucoFI5Fzmlb6uBinM=
github.com/twmb/go-cache v1.2.1/go.mod h1:lArg9KhCl+GTFMikitLGhIBh/i11OK0lhSveqlMbbrY=
github.com/ultraware/funlen v0.1.0 h1:BuqclbkY6pO+cvxoq7OsktIXZpgBSkYTQtmwhAK81vI=
Expand Down
98 changes: 64 additions & 34 deletions pkg/conduit/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package conduit
import (
"os"

sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/database"
"github.com/conduitio/conduit/pkg/foundation/log"
Expand All @@ -29,6 +30,9 @@ const (
DBTypePostgres = "postgres"
DBTypeInMemory = "inmemory"
DBTypeSQLite = "sqlite"

SchemaRegistryTypeConfluent = "confluent"
SchemaRegistryTypeBuiltin = "builtin"
)

// Config holds all configurable values for Conduit.
Expand Down Expand Up @@ -81,18 +85,26 @@ type Config struct {
ExitOnError bool
}

PluginDispenserFactories map[string]builtin.DispenserFactory
ConnectorPlugins map[string]sdk.Connector

dev struct {
cpuprofile string
memprofile string
blockprofile string
}

SchemaRegistry struct {
Type string

Confluent struct {
ConnectionString string
}
}
}

func DefaultConfig() Config {
var cfg Config
cfg.DB.Type = "badger"
cfg.DB.Type = DBTypeBadger
cfg.DB.Badger.Path = "conduit.db"
cfg.DB.Postgres.Table = "conduit_kv_store"
cfg.DB.SQLite.Path = "conduit.db"
Expand All @@ -105,18 +117,67 @@ func DefaultConfig() Config {
cfg.Connectors.Path = "./connectors"
cfg.Processors.Path = "./processors"
cfg.Pipelines.Path = "./pipelines"
cfg.SchemaRegistry.Type = SchemaRegistryTypeBuiltin

cfg.PluginDispenserFactories = builtin.DefaultDispenserFactories
cfg.ConnectorPlugins = builtin.DefaultBuiltinConnectors
return cfg
}

func (c Config) validateDBConfig() error {
if c.DB.Driver == nil {
switch c.DB.Type {
case DBTypeBadger:
if c.DB.Badger.Path == "" {
return requiredConfigFieldErr("db.badger.path")
}
case DBTypePostgres:
if c.DB.Postgres.ConnectionString == "" {
return requiredConfigFieldErr("db.postgres.connection-string")
}
if c.DB.Postgres.Table == "" {
return requiredConfigFieldErr("db.postgres.table")
}
case DBTypeInMemory:
// all good
case DBTypeSQLite:
if c.DB.SQLite.Path == "" {
return requiredConfigFieldErr("db.sqlite.path")
}
if c.DB.SQLite.Table == "" {
return requiredConfigFieldErr("db.sqlite.table")
}
default:
return invalidConfigFieldErr("db.type")
}
}
return nil
}

func (c Config) validateSchemaRegistryConfig() error {
switch c.SchemaRegistry.Type {
case SchemaRegistryTypeConfluent:
if c.SchemaRegistry.Confluent.ConnectionString == "" {
return requiredConfigFieldErr("schema-registry.confluent.connection-string")
}
case SchemaRegistryTypeBuiltin:
// all good
default:
return invalidConfigFieldErr("schema-registry.type")
}
return nil
}

func (c Config) Validate() error {
// TODO simplify validation with struct tags

if err := c.validateDBConfig(); err != nil {
return err
}

if err := c.validateSchemaRegistryConfig(); err != nil {
return err
}

if c.API.Enabled {
if c.API.GRPC.Address == "" {
return requiredConfigFieldErr("grpc.address")
Expand Down Expand Up @@ -154,37 +215,6 @@ func (c Config) Validate() error {
return nil
}

func (c Config) validateDBConfig() error {
if c.DB.Driver == nil {
switch c.DB.Type {
case DBTypeBadger:
if c.DB.Badger.Path == "" {
return requiredConfigFieldErr("db.badger.path")
}
case DBTypePostgres:
if c.DB.Postgres.ConnectionString == "" {
return requiredConfigFieldErr("db.postgres.connection-string")
}
if c.DB.Postgres.Table == "" {
return requiredConfigFieldErr("db.postgres.table")
}
case DBTypeInMemory:
// all good
case DBTypeSQLite:
if c.DB.SQLite.Path == "" {
return requiredConfigFieldErr("db.sqlite.path")
}
if c.DB.SQLite.Table == "" {
return requiredConfigFieldErr("db.sqlite.table")
}
default:
return invalidConfigFieldErr("db.type")
}
}

return nil
}

func invalidConfigFieldErr(name string) error {
return cerrors.Errorf("%q config value is invalid", name)
}
Expand Down
12 changes: 1 addition & 11 deletions pkg/conduit/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,18 +149,8 @@ func TestConfig_Validate(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
var validConfig Config
validConfig.DB.Type = DBTypeInMemory
validConfig.DB.Badger.Path = "conduit.app"
validConfig.DB.Postgres.Table = "conduit_kv_store"
validConfig := DefaultConfig()
validConfig.DB.Postgres.ConnectionString = "postgres://user:pass@localhost:5432/mydb?sslmode=disable"
validConfig.API.Enabled = true
validConfig.API.HTTP.Address = ":8080"
validConfig.API.GRPC.Address = ":8084"
validConfig.Log.Level = "info"
validConfig.Log.Format = "cli"
validConfig.Pipelines.Path = "./pipelines"

underTest := tc.setupConfig(validConfig)
got := underTest.Validate()
if got == nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/conduit/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ func (*Entrypoint) Flags(cfg *Config) *flag.FlagSet {
flags.StringVar(&cfg.Pipelines.Path, "pipelines.path", cfg.Pipelines.Path, "path to the directory that has the yaml pipeline configuration files, or a single pipeline configuration file")
flags.BoolVar(&cfg.Pipelines.ExitOnError, "pipelines.exit-on-error", cfg.Pipelines.ExitOnError, "exit Conduit if a pipeline experiences an error while running")

flags.StringVar(&cfg.SchemaRegistry.Type, "schema-registry.type", cfg.SchemaRegistry.Type, "schema registry type; accepts builtin,confluent")
flags.StringVar(&cfg.SchemaRegistry.Confluent.ConnectionString, "schema-registry.confluent.connection-string", cfg.SchemaRegistry.Confluent.ConnectionString, "confluent schema registry connection string")

// NB: flags with prefix dev.* are hidden from help output by default, they only show up using '-dev -help'
showDevHelp := flags.Bool("dev", false, "used together with the dev flag it shows dev flags")
flags.StringVar(&cfg.dev.cpuprofile, "dev.cpuprofile", "", "write cpu profile to file")
Expand Down
Loading

0 comments on commit 930afae

Please sign in to comment.