Skip to content

Commit

Permalink
Add GetSubjectVersionsById method (#104)
Browse files Browse the repository at this point in the history
  • Loading branch information
jaumesala authored Jun 22, 2024
1 parent 38c1384 commit be49c89
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 8 deletions.
29 changes: 29 additions & 0 deletions mockSchemaRegistryClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,35 @@ func (mck *MockSchemaRegistryClient) GetSchemaVersions(subject string) ([]int, e
return versions, nil
}

// GetSubjectVersionsById Returns subject-version pairs identified by the schema ID.
func (mck *MockSchemaRegistryClient) GetSubjectVersionsById(schemaID int) (SubjectVersionResponse, error) {
for subjectName, schemaVersionsMap := range mck.schemaVersions {
for _, schema := range schemaVersionsMap {
if schema.id == schemaID {
subjectVersionResponse := make(SubjectVersionResponse, 0, len(schemaVersionsMap))
for schemaVersionKey := range schemaVersionsMap {
subjectVersionResponse = append(
subjectVersionResponse,
subjectVersionPair{
Subject: subjectName,
Version: schemaVersionKey,
},
)
}
return subjectVersionResponse, nil
}
}
}

posErr := url.Error{
Op: "GET",
URL: fmt.Sprintf("%s/schemas/ids/%d/versions", mck.schemaRegistryURL, schemaID),
Err: errSchemaNotFound,
}

return nil, &posErr
}

// GetSchemaByVersion Returns the given Schema according to the passed in subject and version number
func (mck *MockSchemaRegistryClient) GetSchemaByVersion(subject string, version int) (*Schema, error) {
var schema *Schema
Expand Down
40 changes: 40 additions & 0 deletions mockSchemaRegistryClient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,46 @@ func TestMockSchemaRegistryClient_GetSchema_ReturnsErrOnNotFound(t *testing.T) {
assert.Nil(t, result)
}

func TestMockSchemaRegistryClient_GetSubjectVersionsById_ReturnsSubjectVersions(t *testing.T) {
t.Parallel()
// Arrange
registry := CreateMockSchemaRegistryClient("http://localhost:8081")
registry.schemaVersions["cupcake"] = map[int]*Schema{
1: {id: 1, version: 1},
2: {id: 2, version: 2},
3: {id: 3, version: 3},
}
registry.schemaVersions["bakery"] = map[int]*Schema{
1: {id: 4, version: 1},
2: {id: 5, version: 2},
}

// Act
result, err := registry.GetSubjectVersionsById(2)

// Assert
assert.Nil(t, err)

assert.Len(t, result, 3)
assert.Equal(t, "cupcake", result[0].Subject)
assert.Equal(t, "cupcake", result[1].Subject)
assert.Equal(t, "cupcake", result[2].Subject)
}

func TestMockSchemaRegistryClient_GetSubjectVersionsById_ReturnsErrOnNotFound(t *testing.T) {
t.Parallel()
// Arrange
registry := CreateMockSchemaRegistryClient("http://localhost:8081")

// Act
result, err := registry.GetSubjectVersionsById(2)

// Assert
assert.ErrorIs(t, err, errSchemaNotFound)

assert.Nil(t, result)
}

func TestMockSchemaRegistryClient_GetLatestSchema_ReturnsErrorOn0SchemaVersions(t *testing.T) {
t.Parallel()
// Arrange
Expand Down
41 changes: 33 additions & 8 deletions schemaRegistryClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type ISchemaRegistryClient interface {
GetSchema(schemaID int) (*Schema, error)
GetLatestSchema(subject string) (*Schema, error)
GetSchemaVersions(subject string) ([]int, error)
GetSubjectVersionsById(schemaID int) (SubjectVersionResponse, error)
GetSchemaByVersion(subject string, version int) (*Schema, error)
CreateSchema(subject string, schema string, schemaType SchemaType, references ...Reference) (*Schema, error)
LookupSchema(subject string, schema string, schemaType SchemaType, references ...Reference) (*Schema, error)
Expand Down Expand Up @@ -166,15 +167,23 @@ type configChangeRequest struct {

type configChangeResponse configChangeRequest

type SubjectVersionResponse []subjectVersionPair

type subjectVersionPair struct {
Subject string `json:"subject"`
Version int `json:"version"`
}

const (
schemaByID = "/schemas/ids/%d"
subjectBySubject = "/subjects/%s"
subjectVersions = "/subjects/%s/versions"
subjectByVersion = "/subjects/%s/versions/%s"
subjects = "/subjects"
config = "/config"
configBySubject = "/config/%s"
contentType = "application/vnd.schemaregistry.v1+json"
schemaByID = "/schemas/ids/%d"
subjectVersionsByID = "/schemas/ids/%d/versions"
subjectBySubject = "/subjects/%s"
subjectVersions = "/subjects/%s/versions"
subjectByVersion = "/subjects/%s/versions/%s"
subjects = "/subjects"
config = "/config"
configBySubject = "/config/%s"
contentType = "application/vnd.schemaregistry.v1+json"
)

// schemaRegistryConfig is used in NewSchemaRegistryClient and is configured through Option
Expand Down Expand Up @@ -304,6 +313,22 @@ func (client *SchemaRegistryClient) GetLatestSchema(subject string) (*Schema, er
return client.getVersion(subject, "latest")
}

// GetSubjectVersionsById returns subject-version pairs identified by the schema ID.
func (client *SchemaRegistryClient) GetSubjectVersionsById(schemaID int) (SubjectVersionResponse, error) {
resp, err := client.httpRequest("GET", fmt.Sprintf(subjectVersionsByID, schemaID), nil)
if err != nil {
return nil, err
}

var response = new(SubjectVersionResponse)
err = json.Unmarshal(resp, &response)
if err != nil {
return nil, err
}

return *response, nil
}

// GetSchemaVersions returns a list of versions from a given subject.
func (client *SchemaRegistryClient) GetSchemaVersions(subject string) ([]int, error) {
resp, err := client.httpRequest("GET", fmt.Sprintf(subjectVersions, url.QueryEscape(subject)), nil)
Expand Down
47 changes: 47 additions & 0 deletions schemaRegistryClient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,34 @@ func TestSchemaRegistryClient_GetSchemaType(t *testing.T) {
}
}

func TestSchemaRegistryClient_GetSubjectVersionsById(t *testing.T) {
t.Parallel()
{
server, call := mockServerWithSubjectVersionResponse(t, fmt.Sprintf("/schemas/ids/%d/versions", 1), SubjectVersionResponse{
subjectVersionPair{
Subject: "test1",
Version: 1,
},
subjectVersionPair{
Subject: "test1",
Version: 2,
},
})

srClient := CreateSchemaRegistryClient(server.URL)
response, err := srClient.GetSubjectVersionsById(1)

// Test response
assert.NoError(t, err)
assert.Equal(t, 1, *call)
assert.Len(t, response, 2)
assert.Equal(t, response[0].Subject, "test1")
assert.Equal(t, response[0].Version, 1)
assert.Equal(t, response[1].Subject, "test1")
assert.Equal(t, response[1].Version, 2)
}
}

func TestSchemaRegistryClient_JsonSchemaParses(t *testing.T) {
t.Parallel()
{
Expand Down Expand Up @@ -766,3 +794,22 @@ func mockServerWithSchemaResponse(t *testing.T, url string, schemaResponse schem
}
})), &count
}

func mockServerWithSubjectVersionResponse(t *testing.T, url string, subjectVersionResponse SubjectVersionResponse) (*httptest.Server, *int) {
var count int
return httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
count++
response, _ := json.Marshal(subjectVersionResponse)

switch req.URL.String() {
case url:
// Send response to be tested
_, err := rw.Write(response)
if err != nil {
t.Errorf("could not write response %s", err)
}
default:
require.Fail(t, "unhandled request")
}
})), &count
}

0 comments on commit be49c89

Please sign in to comment.