Skip to content

Commit

Permalink
Add preallocations, add constructor with fuctional options (#103)
Browse files Browse the repository at this point in the history
* Add new constructor

* Update example

---------

Co-authored-by: Maarten van der Heijden <[email protected]>
  • Loading branch information
survivorbat and Maarten van der Heijden authored Jun 22, 2024
1 parent baa74d8 commit 38c1384
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 30 deletions.
4 changes: 2 additions & 2 deletions EXAMPLES_AVRO.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func main() {
}()

// 2) Fetch the latest version of the schema, or create a new one if it is the first
schemaRegistryClient := srclient.CreateSchemaRegistryClient("http://localhost:8081")
schemaRegistryClient := srclient.NewSchemaRegistryClient("http://localhost:8081")
schema, err := schemaRegistryClient.GetLatestSchema(topic)
if schema == nil {
schemaBytes, _ := ioutil.ReadFile("complexType.avsc")
Expand Down Expand Up @@ -105,7 +105,7 @@ func main() {
c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil)

// 2) Create a instance of the client to retrieve the schemas for each message
schemaRegistryClient := srclient.CreateSchemaRegistryClient("http://localhost:8081")
schemaRegistryClient := srclient.NewSchemaRegistryClient("http://localhost:8081")

for {
msg, err := c.ReadMessage(-1)
Expand Down
13 changes: 10 additions & 3 deletions mockSchemaRegistryClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,12 @@ func (mck *MockSchemaRegistryClient) GetSchemaByVersion(subject string, version

// GetSubjects Returns all registered subjects
func (mck *MockSchemaRegistryClient) GetSubjects() ([]string, error) {
var allSubjects []string
allSubjects := make([]string, len(mck.schemaVersions))

var count int
for subject := range mck.schemaVersions {
allSubjects = append(allSubjects, subject)
allSubjects[count] = subject
count++
}

return allSubjects, nil
Expand Down Expand Up @@ -321,8 +323,13 @@ func (mck *MockSchemaRegistryClient) allVersions(subject string) []int {
result, ok := mck.schemaVersions[subject]

if ok {
versions = make([]int, len(result))

var count int

for version := range result {
versions = append(versions, version)
versions[count] = version
count++
}
}

Expand Down
86 changes: 65 additions & 21 deletions schemaRegistryClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"golang.org/x/sync/semaphore"
)

const defaultSemaphoreWeight int64 = 16
const defaultTimeout = 5 * time.Second

// ISchemaRegistryClient provides the
// definition of the operations that
// this Schema Registry client provides.
Expand Down Expand Up @@ -102,7 +105,7 @@ func (s CompatibilityLevel) String() string {
return string(s)
}

// Schema references use the import statement of Protobuf and
// Reference references use the import statement of Protobuf and
// the $ref field of JSON Schema. They are defined by the name
// of the import or $ref and the associated subject in the registry.
type Reference struct {
Expand Down Expand Up @@ -174,37 +177,77 @@ const (
contentType = "application/vnd.schemaregistry.v1+json"
)

// CreateSchemaRegistryClient creates a client that allows
// schemaRegistryConfig is used in NewSchemaRegistryClient and is configured through Option
type schemaRegistryConfig struct {
client *http.Client
semaphoreWeight int64
}

// Option serves as an input for NewSchemaRegistryClient
type Option func(*schemaRegistryConfig)

// WithClient is used in NewSchemaRegistryClient to override the default client
func WithClient(client *http.Client) Option {
return func(registryConfig *schemaRegistryConfig) {
registryConfig.client = client
}
}

// WithSemaphoreWeight is used in NewSchemaRegistryClient to override the default semaphoreWeight
func WithSemaphoreWeight(semaphoreWeight int64) Option {
return func(registryConfig *schemaRegistryConfig) {
registryConfig.semaphoreWeight = semaphoreWeight
}
}

// NewSchemaRegistryClient creates a client that allows
// interactions with Schema Registry over HTTP. Applications
// using this client can retrieve data about schemas, which
// in turn can be used to serialize and deserialize records.
func CreateSchemaRegistryClient(schemaRegistryURL string) *SchemaRegistryClient {
return CreateSchemaRegistryClientWithOptions(schemaRegistryURL, &http.Client{Timeout: 5 * time.Second}, 16)
}
func NewSchemaRegistryClient(schemaRegistryURL string, options ...Option) *SchemaRegistryClient {
config := &schemaRegistryConfig{
client: &http.Client{Timeout: defaultTimeout},
semaphoreWeight: defaultSemaphoreWeight,
}

for _, option := range options {
option(config)
}

// CreateSchemaRegistryClientWithOptions provides the ability to pass the http.Client to be used, as well as the semaphoreWeight for concurrent requests
func CreateSchemaRegistryClientWithOptions(schemaRegistryURL string, client *http.Client, semaphoreWeight int) *SchemaRegistryClient {
return &SchemaRegistryClient{
schemaRegistryURL: schemaRegistryURL,
httpClient: client,
httpClient: config.client,
cachingEnabled: true,
codecCreationEnabled: false,
idSchemaCache: make(map[int]*Schema),
subjectSchemaCache: make(map[string]*Schema),
sem: semaphore.NewWeighted(int64(semaphoreWeight)),
sem: semaphore.NewWeighted(config.semaphoreWeight),
}
}

// CreateSchemaRegistryClient creates a client that allows
// interactions with Schema Registry over HTTP. Applications
// using this client can retrieve data about schemas, which
// in turn can be used to serialize and deserialize records.
// Deprecated: Prefer NewSchemaRegistryClient(schemaRegistryURL)
func CreateSchemaRegistryClient(schemaRegistryURL string) *SchemaRegistryClient {
return NewSchemaRegistryClient(schemaRegistryURL)
}

// CreateSchemaRegistryClientWithOptions provides the ability to pass the http.Client to be used, as well as the semaphoreWeight for concurrent requests
// Deprecated: Prefer NewSchemaRegistryClient(schemaRegistryURL, WithClient(*http.Client), WithSemaphoreWeight(int64))
func CreateSchemaRegistryClientWithOptions(schemaRegistryURL string, client *http.Client, semaphoreWeight int) *SchemaRegistryClient {
return NewSchemaRegistryClient(schemaRegistryURL, WithClient(client), WithSemaphoreWeight(int64(semaphoreWeight)))
}

// ResetCache resets the schema caches to be able to get updated schemas.
func (client *SchemaRegistryClient) ResetCache() {

client.idSchemaCacheLock.Lock()
client.subjectSchemaCacheLock.Lock()
client.idSchemaCache = make(map[int]*Schema)
client.subjectSchemaCache = make(map[string]*Schema)
client.idSchemaCacheLock.Unlock()
client.subjectSchemaCacheLock.Unlock()

}

// GetSchema gets the schema associated with the given id.
Expand All @@ -225,17 +268,18 @@ func (client *SchemaRegistryClient) GetSchema(schemaID int) (*Schema, error) {
}

var schemaResp = new(schemaResponse)
err = json.Unmarshal(resp, &schemaResp)
if err != nil {
if err := json.Unmarshal(resp, &schemaResp); err != nil {
return nil, err
}

var codec *goavro.Codec
if client.getCodecCreationEnabled() {
codec, err = goavro.NewCodec(schemaResp.Schema)
if err != nil {
return nil, err
}
}

var schema = &Schema{
id: schemaID,
schema: schemaResp.Schema,
Expand Down Expand Up @@ -324,8 +368,7 @@ func (client *SchemaRegistryClient) GetCompatibilityLevel(subject string, defaul
}

var configResponse = new(configResponse)
err = json.Unmarshal(resp, &configResponse)
if err != nil {
if err := json.Unmarshal(resp, &configResponse); err != nil {
return nil, err
}

Expand All @@ -338,11 +381,12 @@ func (client *SchemaRegistryClient) GetSubjects() ([]string, error) {
if err != nil {
return nil, err
}
var allSubjects = []string{}
err = json.Unmarshal(resp, &allSubjects)
if err != nil {

var allSubjects []string
if err = json.Unmarshal(resp, &allSubjects); err != nil {
return nil, err
}

return allSubjects, nil
}

Expand All @@ -352,11 +396,11 @@ func (client *SchemaRegistryClient) GetSubjectsIncludingDeleted() ([]string, err
if err != nil {
return nil, err
}
var allSubjects = []string{}
err = json.Unmarshal(resp, &allSubjects)
if err != nil {
var allSubjects []string
if err = json.Unmarshal(resp, &allSubjects); err != nil {
return nil, err
}

return allSubjects, nil
}

Expand Down
59 changes: 55 additions & 4 deletions schemaRegistryClient_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package srclient

import (
"bytes"
"encoding/json"
"errors"
"fmt"
Expand All @@ -17,9 +16,61 @@ import (
)

func bodyToString(in io.ReadCloser) string {
buf := new(bytes.Buffer)
buf.ReadFrom(in)
return buf.String()
result, _ := io.ReadAll(in)
return string(result)
}

func TestNewSchemaRegistryClient_SetsExpectedOptions(t *testing.T) {
t.Parallel()

tests := map[string]struct {
registryUrl string
options []Option

expectedClient *http.Client
expectedSemaphoreWeight int64
}{
"no options": {
registryUrl: "localhost:8080",

expectedClient: &http.Client{Timeout: defaultTimeout},
expectedSemaphoreWeight: defaultSemaphoreWeight,
},
"custom semaphore weight": {
registryUrl: "local:8080",
options: []Option{WithSemaphoreWeight(32)},

expectedClient: &http.Client{Timeout: defaultTimeout},
expectedSemaphoreWeight: 32,
},
"custom client": {
registryUrl: "172.0.0.1:8080",
options: []Option{WithClient(&http.Client{Timeout: 32})},

expectedClient: &http.Client{Timeout: 32},
expectedSemaphoreWeight: defaultSemaphoreWeight,
},
}

for name, testData := range tests {
testData := testData
t.Run(name, func(t *testing.T) {
t.Parallel()
// Act
result := NewSchemaRegistryClient(testData.registryUrl, testData.options...)

// Assert
assert.Equal(t, testData.registryUrl, result.schemaRegistryURL)
assert.Equal(t, testData.expectedClient, result.httpClient)

// We should be able to acquire the semaphore by the size we specified
assert.True(t, result.sem.TryAcquire(testData.expectedSemaphoreWeight))
result.sem.Release(testData.expectedSemaphoreWeight)

// Anthing bigger than that weight should fail
assert.False(t, result.sem.TryAcquire(testData.expectedSemaphoreWeight+1))
})
}
}

func TestSchemaRegistryClient_CreateSchemaWithoutReferences(t *testing.T) {
Expand Down

0 comments on commit 38c1384

Please sign in to comment.