From be49c891ecbd7dd911206d269864682512df69b2 Mon Sep 17 00:00:00 2001 From: Jaume Sala Date: Sat, 22 Jun 2024 23:47:25 +0200 Subject: [PATCH] Add GetSubjectVersionsById method (#104) --- mockSchemaRegistryClient.go | 29 ++++++++++++++++++++ mockSchemaRegistryClient_test.go | 40 +++++++++++++++++++++++++++ schemaRegistryClient.go | 41 ++++++++++++++++++++++------ schemaRegistryClient_test.go | 47 ++++++++++++++++++++++++++++++++ 4 files changed, 149 insertions(+), 8 deletions(-) diff --git a/mockSchemaRegistryClient.go b/mockSchemaRegistryClient.go index 611e035..92bbf58 100644 --- a/mockSchemaRegistryClient.go +++ b/mockSchemaRegistryClient.go @@ -131,6 +131,35 @@ func (mck *MockSchemaRegistryClient) GetSchemaVersions(subject string) ([]int, e return versions, nil } +// GetSubjectVersionsById Returns subject-version pairs identified by the schema ID. +func (mck *MockSchemaRegistryClient) GetSubjectVersionsById(schemaID int) (SubjectVersionResponse, error) { + for subjectName, schemaVersionsMap := range mck.schemaVersions { + for _, schema := range schemaVersionsMap { + if schema.id == schemaID { + subjectVersionResponse := make(SubjectVersionResponse, 0, len(schemaVersionsMap)) + for schemaVersionKey := range schemaVersionsMap { + subjectVersionResponse = append( + subjectVersionResponse, + subjectVersionPair{ + Subject: subjectName, + Version: schemaVersionKey, + }, + ) + } + return subjectVersionResponse, nil + } + } + } + + posErr := url.Error{ + Op: "GET", + URL: fmt.Sprintf("%s/schemas/ids/%d/versions", mck.schemaRegistryURL, schemaID), + Err: errSchemaNotFound, + } + + return nil, &posErr +} + // GetSchemaByVersion Returns the given Schema according to the passed in subject and version number func (mck *MockSchemaRegistryClient) GetSchemaByVersion(subject string, version int) (*Schema, error) { var schema *Schema diff --git a/mockSchemaRegistryClient_test.go b/mockSchemaRegistryClient_test.go index 7a1b58a..4285f1b 100644 --- a/mockSchemaRegistryClient_test.go +++ b/mockSchemaRegistryClient_test.go @@ -323,6 +323,46 @@ func TestMockSchemaRegistryClient_GetSchema_ReturnsErrOnNotFound(t *testing.T) { assert.Nil(t, result) } +func TestMockSchemaRegistryClient_GetSubjectVersionsById_ReturnsSubjectVersions(t *testing.T) { + t.Parallel() + // Arrange + registry := CreateMockSchemaRegistryClient("http://localhost:8081") + registry.schemaVersions["cupcake"] = map[int]*Schema{ + 1: {id: 1, version: 1}, + 2: {id: 2, version: 2}, + 3: {id: 3, version: 3}, + } + registry.schemaVersions["bakery"] = map[int]*Schema{ + 1: {id: 4, version: 1}, + 2: {id: 5, version: 2}, + } + + // Act + result, err := registry.GetSubjectVersionsById(2) + + // Assert + assert.Nil(t, err) + + assert.Len(t, result, 3) + assert.Equal(t, "cupcake", result[0].Subject) + assert.Equal(t, "cupcake", result[1].Subject) + assert.Equal(t, "cupcake", result[2].Subject) +} + +func TestMockSchemaRegistryClient_GetSubjectVersionsById_ReturnsErrOnNotFound(t *testing.T) { + t.Parallel() + // Arrange + registry := CreateMockSchemaRegistryClient("http://localhost:8081") + + // Act + result, err := registry.GetSubjectVersionsById(2) + + // Assert + assert.ErrorIs(t, err, errSchemaNotFound) + + assert.Nil(t, result) +} + func TestMockSchemaRegistryClient_GetLatestSchema_ReturnsErrorOn0SchemaVersions(t *testing.T) { t.Parallel() // Arrange diff --git a/schemaRegistryClient.go b/schemaRegistryClient.go index 745847e..7021fce 100644 --- a/schemaRegistryClient.go +++ b/schemaRegistryClient.go @@ -34,6 +34,7 @@ type ISchemaRegistryClient interface { GetSchema(schemaID int) (*Schema, error) GetLatestSchema(subject string) (*Schema, error) GetSchemaVersions(subject string) ([]int, error) + GetSubjectVersionsById(schemaID int) (SubjectVersionResponse, error) GetSchemaByVersion(subject string, version int) (*Schema, error) CreateSchema(subject string, schema string, schemaType SchemaType, references ...Reference) (*Schema, error) LookupSchema(subject string, schema string, schemaType SchemaType, references ...Reference) (*Schema, error) @@ -166,15 +167,23 @@ type configChangeRequest struct { type configChangeResponse configChangeRequest +type SubjectVersionResponse []subjectVersionPair + +type subjectVersionPair struct { + Subject string `json:"subject"` + Version int `json:"version"` +} + const ( - schemaByID = "/schemas/ids/%d" - subjectBySubject = "/subjects/%s" - subjectVersions = "/subjects/%s/versions" - subjectByVersion = "/subjects/%s/versions/%s" - subjects = "/subjects" - config = "/config" - configBySubject = "/config/%s" - contentType = "application/vnd.schemaregistry.v1+json" + schemaByID = "/schemas/ids/%d" + subjectVersionsByID = "/schemas/ids/%d/versions" + subjectBySubject = "/subjects/%s" + subjectVersions = "/subjects/%s/versions" + subjectByVersion = "/subjects/%s/versions/%s" + subjects = "/subjects" + config = "/config" + configBySubject = "/config/%s" + contentType = "application/vnd.schemaregistry.v1+json" ) // schemaRegistryConfig is used in NewSchemaRegistryClient and is configured through Option @@ -304,6 +313,22 @@ func (client *SchemaRegistryClient) GetLatestSchema(subject string) (*Schema, er return client.getVersion(subject, "latest") } +// GetSubjectVersionsById returns subject-version pairs identified by the schema ID. +func (client *SchemaRegistryClient) GetSubjectVersionsById(schemaID int) (SubjectVersionResponse, error) { + resp, err := client.httpRequest("GET", fmt.Sprintf(subjectVersionsByID, schemaID), nil) + if err != nil { + return nil, err + } + + var response = new(SubjectVersionResponse) + err = json.Unmarshal(resp, &response) + if err != nil { + return nil, err + } + + return *response, nil +} + // GetSchemaVersions returns a list of versions from a given subject. func (client *SchemaRegistryClient) GetSchemaVersions(subject string) ([]int, error) { resp, err := client.httpRequest("GET", fmt.Sprintf(subjectVersions, url.QueryEscape(subject)), nil) diff --git a/schemaRegistryClient_test.go b/schemaRegistryClient_test.go index 1ca66ee..80af04c 100644 --- a/schemaRegistryClient_test.go +++ b/schemaRegistryClient_test.go @@ -560,6 +560,34 @@ func TestSchemaRegistryClient_GetSchemaType(t *testing.T) { } } +func TestSchemaRegistryClient_GetSubjectVersionsById(t *testing.T) { + t.Parallel() + { + server, call := mockServerWithSubjectVersionResponse(t, fmt.Sprintf("/schemas/ids/%d/versions", 1), SubjectVersionResponse{ + subjectVersionPair{ + Subject: "test1", + Version: 1, + }, + subjectVersionPair{ + Subject: "test1", + Version: 2, + }, + }) + + srClient := CreateSchemaRegistryClient(server.URL) + response, err := srClient.GetSubjectVersionsById(1) + + // Test response + assert.NoError(t, err) + assert.Equal(t, 1, *call) + assert.Len(t, response, 2) + assert.Equal(t, response[0].Subject, "test1") + assert.Equal(t, response[0].Version, 1) + assert.Equal(t, response[1].Subject, "test1") + assert.Equal(t, response[1].Version, 2) + } +} + func TestSchemaRegistryClient_JsonSchemaParses(t *testing.T) { t.Parallel() { @@ -766,3 +794,22 @@ func mockServerWithSchemaResponse(t *testing.T, url string, schemaResponse schem } })), &count } + +func mockServerWithSubjectVersionResponse(t *testing.T, url string, subjectVersionResponse SubjectVersionResponse) (*httptest.Server, *int) { + var count int + return httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + count++ + response, _ := json.Marshal(subjectVersionResponse) + + switch req.URL.String() { + case url: + // Send response to be tested + _, err := rw.Write(response) + if err != nil { + t.Errorf("could not write response %s", err) + } + default: + require.Fail(t, "unhandled request") + } + })), &count +}