Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Schemas] Connector SDK: Create and get a schema from Conduit #96

Merged
merged 40 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
c042fc0
[Schemas] Connector SDK: Create and get a schema from Conduit
hariso May 31, 2024
2fadefb
Merge branch 'schema-support' into haris/schema-support-connector-sdk
hariso May 31, 2024
221949a
remove replace
hariso Jun 10, 2024
c0b5b4d
change port
hariso Jun 11, 2024
36454fc
add mocks
hariso Jun 11, 2024
5ede4ee
Get by name and version
hariso Jun 12, 2024
acf859a
update commons
hariso Jun 13, 2024
6c1f722
add tests
hariso Jun 13, 2024
19bbc11
a couple of tests, move around
hariso Jun 13, 2024
ab9dc45
update commons
hariso Jun 13, 2024
0a29f5c
move around
hariso Jun 13, 2024
0a9b1b5
update commons
hariso Jun 19, 2024
35b5297
fix compilation
hariso Jun 19, 2024
c0c7d14
update commons
hariso Jun 19, 2024
2476e66
fix compilation
hariso Jun 19, 2024
b6ace30
update commons
hariso Jun 21, 2024
9df2b7f
update commons
hariso Jun 21, 2024
3e374e4
fix
hariso Jun 21, 2024
636d910
add in memory schema service
hariso Jun 24, 2024
175b589
simplify
hariso Jun 24, 2024
1e9d356
comments
hariso Jun 24, 2024
fbb27f5
rename name to subject
hariso Jun 24, 2024
469f0be
rename package to pschema
hariso Jun 25, 2024
da1f64d
typed mock
hariso Jun 25, 2024
d6381d2
update commons
hariso Jun 25, 2024
9c87126
Merge branch 'schema-support' into haris/schema-support-connector-sdk
hariso Jun 25, 2024
d24ebd2
go mod tidy
hariso Jun 25, 2024
f28a6a9
update commons use schema
raulb Jun 28, 2024
56a0ed7
update to latest commons
raulb Jul 1, 2024
68e3b92
simplify proto
raulb Jul 1, 2024
23c5660
unwrap error and create internal/errors.go
raulb Jul 2, 2024
74adabd
provide target to client
raulb Jul 2, 2024
4513705
update proto schema
raulb Jul 2, 2024
fab5aa6
update schema type
raulb Jul 2, 2024
5538273
remove dependency
raulb Jul 2, 2024
9e93075
update grpc_client
raulb Jul 2, 2024
3530789
update service schema methods
raulb Jul 2, 2024
6e40b79
update naming to follow convention
raulb Jul 2, 2024
3d6f82b
refactor leftover
raulb Jul 2, 2024
2cd8c41
Schema support proposed changes (#105)
lovromazgon Jul 2, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ proto-generate:

.PHONY: proto-update
proto-update:
cd proto && buf mod update
cd proto && buf dep update

.PHONY: proto-lint
proto-lint:
Expand Down
21 changes: 21 additions & 0 deletions conduit/pschema/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pschema

import "errors"

var (
ErrSchemaNotFound = errors.New("schema not found")
)
68 changes: 68 additions & 0 deletions conduit/pschema/in_memory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pschema

import (
"context"
"fmt"
"sync"

"github.com/conduitio/conduit-commons/schema"

Check failure on line 22 in conduit/pschema/in_memory.go

View workflow job for this annotation

GitHub Actions / test

github.com/conduitio/[email protected]: replacement directory ../conduit-commons does not exist
)

type inMemoryService struct {
// schemas is a map of schema subjects to all the versions of that schema
// versioning starts at 1, newer versions are appended to the end of the versions slice.
schemas map[string][]schema.Instance
// m guards access to schemas
m sync.Mutex
}

func NewInMemoryService() Service {
return &inMemoryService{
schemas: make(map[string][]schema.Instance),
}
}

func (s *inMemoryService) Create(_ context.Context, request CreateRequest) (CreateResponse, error) {
s.m.Lock()
defer s.m.Unlock()

inst := schema.Instance{
Subject: request.Subject,
Version: len(s.schemas[request.Subject]) + 1,
Type: schema.Type(request.Type),
Bytes: request.Bytes,
}
s.schemas[request.Subject] = append(s.schemas[request.Subject], inst)

return CreateResponse{Instance: inst}, nil
}

func (s *inMemoryService) Get(_ context.Context, request GetRequest) (GetResponse, error) {
s.m.Lock()
defer s.m.Unlock()

versions, ok := s.schemas[request.Subject]
if !ok {
return GetResponse{}, fmt.Errorf("name %v: %w", request.Subject, ErrSchemaNotFound)
raulb marked this conversation as resolved.
Show resolved Hide resolved
}

if len(versions) < request.Version {
return GetResponse{}, fmt.Errorf("version %v: %w", request.Version, ErrSchemaNotFound)
}

return GetResponse{Instance: versions[request.Version-1]}, nil
}
72 changes: 72 additions & 0 deletions conduit/pschema/in_memory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pschema

import (
"context"
"testing"

"github.com/conduitio/conduit-commons/schema"
"github.com/google/go-cmp/cmp"
"github.com/matryer/is"
)

func TestInMemoryService(t *testing.T) {
is := is.New(t)
ctx := context.Background()

underTest := NewInMemoryService()

want1 := schema.Instance{
Subject: "test-subject",
Version: 1,
Type: schema.TypeAvro,
Bytes: []byte("irrelevant 1"),
}

// Create first version
got1, err := underTest.Create(ctx, CreateRequest{
Subject: want1.Subject,
Type: Type(want1.Type),
Bytes: want1.Bytes,
})
is.NoErr(err)
is.Equal("", cmp.Diff(want1, got1.Instance))

// Create second version
want2 := schema.Instance{
Subject: want1.Subject,
Version: 2,
Type: want1.Type,
Bytes: []byte("irrelevant 2"),
}
got2, err := underTest.Create(ctx, CreateRequest{
Subject: want2.Subject,
Type: Type(want2.Type),
Bytes: want2.Bytes,
})
is.NoErr(err)
is.Equal("", cmp.Diff(want2, got2.Instance))

// Get first version
getResp1, err := underTest.Get(ctx, GetRequest{Subject: want1.Subject, Version: 1})
is.NoErr(err)
is.Equal("", cmp.Diff(want1, getResp1.Instance))

// Get second version
getResp2, err := underTest.Get(ctx, GetRequest{Subject: want2.Subject, Version: 2})
is.NoErr(err)
is.Equal("", cmp.Diff(want2, getResp2.Instance))
}
119 changes: 119 additions & 0 deletions conduit/pschema/mock/pschema.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 8 additions & 5 deletions conduit/schema/schema.go → conduit/pschema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package schema
//go:generate mockgen -typed -destination=mock/pschema.go -package=mock -mock_names=Service=Service . Service

package pschema

import (
"context"
Expand All @@ -32,16 +34,17 @@ const (
)

type CreateRequest struct {
Name string
Type Type
Bytes []byte
Subject string
Type Type
raulb marked this conversation as resolved.
Show resolved Hide resolved
Bytes []byte
}
type CreateResponse struct {
schema.Instance
}

type GetRequest struct {
ID string
Subject string
Version int
}
type GetResponse struct {
schema.Instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ package fromproto

import (
cschema "github.com/conduitio/conduit-commons/schema"
"github.com/conduitio/conduit-connector-protocol/conduit/schema"
"github.com/conduitio/conduit-connector-protocol/conduit/pschema"
conduitv1 "github.com/conduitio/conduit-connector-protocol/proto/conduit/v1"
)

Expand All @@ -27,23 +27,33 @@ func _() {
_ = cTypes[int(cschema.TypeAvro)-int(conduitv1.Schema_TYPE_AVRO)]

// Compatibility between the protocol's schema.Type and the Protobuf schema type
_ = cTypes[int(schema.TypeAvro)-int(conduitv1.Schema_TYPE_AVRO)]
_ = cTypes[int(pschema.TypeAvro)-int(conduitv1.Schema_TYPE_AVRO)]
}

func CreateRequest(req *conduitv1.CreateSchemaRequest) schema.CreateRequest {
return schema.CreateRequest{
Name: req.Name,
Type: schema.Type(req.Type),
Bytes: req.Bytes,
func CreateRequest(req *conduitv1.CreateSchemaRequest) pschema.CreateRequest {
return pschema.CreateRequest{
Subject: req.Subject,
Type: pschema.Type(req.Type),
Bytes: req.Bytes,
}
}

func CreateResponse(resp *conduitv1.CreateSchemaResponse) schema.CreateResponse {
return schema.CreateResponse{
func CreateResponse(resp *conduitv1.CreateSchemaResponse) pschema.CreateResponse {
return pschema.CreateResponse{
Instance: cschema.Instance{
ID: resp.Schema.Id,
Name: resp.Schema.Name,
Version: resp.Schema.Version,
Subject: resp.Schema.Subject,
Version: int(resp.Schema.Version),
Type: cschema.Type(resp.Schema.Type),
Bytes: resp.Schema.Bytes,
},
}
}

func GetResponse(resp *conduitv1.GetSchemaResponse) pschema.GetResponse {
return pschema.GetResponse{
Instance: cschema.Instance{
Subject: resp.Schema.Subject,
Version: int(resp.Schema.Version),
Type: cschema.Type(resp.Schema.Type),
Bytes: resp.Schema.Bytes,
},
raulb marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
Loading
Loading