Skip to content

Commit

Permalink
fix: update provider and schema resource definition
Browse files Browse the repository at this point in the history
  • Loading branch information
dstrates committed Jun 28, 2024
1 parent bcd4556 commit 0aa435b
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 19 deletions.
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
GOPRIVATE=github.com/cultureamp/*
SCHEMA_REGISTRY_URL=http://localhost:8081
SCHEMA_REGISTRY_URL=https://schema-registry.kafka.usw2.dev-us.cultureamp.io # dev platform cluster
SCHEMA_REGISTRY_USERNAME=user
SCHEMA_REGISTRY_PASSWORD=password
12 changes: 6 additions & 6 deletions internal/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ type Provider struct {
// ProviderModel maps provider schema data to a Go type.
type ProviderModel struct {
URL types.String `tfsdk:"schema_registry_url"`
Username types.String `tfsdk:"schema_registry_username"`
Password types.String `tfsdk:"schema_registry_password"`
Username types.String `tfsdk:"username"`
Password types.String `tfsdk:"password"`
}

// Metadata returns the provider type name.
func (p *Provider) Metadata(ctx context.Context, req provider.MetadataRequest,
resp *provider.MetadataResponse) {
resp.TypeName = ""
resp.TypeName = "schemaregistry"
resp.Version = p.version
}

Expand Down Expand Up @@ -88,8 +88,8 @@ func (p *Provider) Configure(ctx context.Context, req provider.ConfigureRequest,
password := getEnvOrDefault(config.Password, "SCHEMA_REGISTRY_PASSWORD")

ctx = tflog.SetField(ctx, "schema_registry_url", url)
ctx = tflog.SetField(ctx, "schema_registry_username", username)
ctx = tflog.MaskFieldValuesWithFieldKeys(ctx, "schema_registry_password")
ctx = tflog.SetField(ctx, "username", username)
ctx = tflog.MaskFieldValuesWithFieldKeys(ctx, "password")

tflog.Debug(ctx, "Creating Schema Registry client")

Expand Down Expand Up @@ -126,7 +126,7 @@ func getEnvOrDefault(configValue types.String, envVar string) string {
// Resources defines the resources implemented in the provider.
func (p *Provider) Resources(ctx context.Context) []func() resource.Resource {
return []func() resource.Resource{
//TODO SchemaResource,
NewSchemaResource,
}
}

Expand Down
4 changes: 2 additions & 2 deletions internal/provider/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ var testAccProtoV6ProviderFactories = map[string]func() (tfprotov6.ProviderServe

const (
testAccProviderVersion = "0.0.1"
testAccProviderType = "schema_registry"
testAccProviderType = "schemaregistry"

providerConfig = `
provider "schemaregistry" {
schema_registry_url = "http://test-url"
schema_registry_url = "https://schema-registry.kafka.usw2.dev-us.cultureamp.io"
username = "test-user"
password = "test-pass"
}
Expand Down
102 changes: 95 additions & 7 deletions internal/provider/resource_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

"github.com/hashicorp/terraform-plugin-framework-validators/stringvalidator"
"github.com/hashicorp/terraform-plugin-framework/attr"
"github.com/hashicorp/terraform-plugin-framework/path"
"github.com/hashicorp/terraform-plugin-framework/resource"
"github.com/hashicorp/terraform-plugin-framework/resource/schema"
"github.com/hashicorp/terraform-plugin-framework/resource/schema/planmodifier"
Expand Down Expand Up @@ -35,6 +34,7 @@ type schemaResource struct {

// schemaResourceModel describes the resource data model.
type schemaResourceModel struct {
ID types.String `tfsdk:"id"`
Subject types.String `tfsdk:"subject"`
Schema types.String `tfsdk:"schema"`
SchemaID types.Int64 `tfsdk:"schema_id"`
Expand All @@ -56,6 +56,13 @@ func (r *schemaResource) Schema(_ context.Context, _ resource.SchemaRequest, res
MarkdownDescription: "Schema resource. Manages a schema in the Schema Registry.",
Description: "Manages a schema in the Schema Registry.",
Attributes: map[string]schema.Attribute{
"id": schema.StringAttribute{
Description: "The ID of the schema, which is the subject name.",
Computed: true,
PlanModifiers: []planmodifier.String{
stringplanmodifier.UseStateForUnknown(),
},
},
"subject": schema.StringAttribute{
Description: "The subject related to the schema.",
Required: true,
Expand All @@ -74,6 +81,9 @@ func (r *schemaResource) Schema(_ context.Context, _ resource.SchemaRequest, res
"schema_type": schema.StringAttribute{
Description: "The schema type. Default is avro.",
Optional: true,
PlanModifiers: []planmodifier.String{
stringplanmodifier.RequiresReplace(),
},
Validators: []validator.String{
stringvalidator.OneOf(
"avro",
Expand Down Expand Up @@ -154,8 +164,8 @@ func (r *schemaResource) Create(ctx context.Context, req resource.CreateRequest,

// Generate API request body from plan
schemaString := plan.Schema.ValueString()
references := ToRegistryReferences(plan.Reference)
schemaType := ToSchemaType(plan.SchemaType.ValueString())
references := ToRegistryReferences(plan.Reference)
compatibilityLevel := ToCompatibilityLevelType(plan.CompatibilityLevel.ValueString())

// Create new schema resource
Expand All @@ -177,10 +187,15 @@ func (r *schemaResource) Create(ctx context.Context, req resource.CreateRequest,
return
}

//convert *srclient.SchemaType to string
schemaTypeStr := FromSchemaType(schema.SchemaType())

// Map response body to schema
plan.SchemaID = types.Int64Value(int64(schema.ID()))
plan.Version = types.Int64Value(int64(schema.Version()))
plan.ID = plan.Subject
plan.Schema = types.StringValue(schema.Schema())
plan.SchemaID = types.Int64Value(int64(schema.ID()))
plan.SchemaType = types.StringValue(schemaTypeStr)
plan.Version = types.Int64Value(1) // Set the version to 1 for new schema
plan.Reference = FromRegistryReferences(schema.References())

// Set state to fully populated data
Expand Down Expand Up @@ -210,9 +225,12 @@ func (r *schemaResource) Read(ctx context.Context, req resource.ReadRequest, res
return
}

schemaType := FromSchemaType(schema.SchemaType())

// Update state with refreshed values
state.Schema = types.StringValue(schema.Schema())
state.SchemaID = types.Int64Value(int64(schema.ID()))
state.SchemaType = types.StringValue(schemaType)
state.Version = types.Int64Value(int64(schema.Version()))
state.Reference = FromRegistryReferences(schema.References())

Expand Down Expand Up @@ -260,9 +278,9 @@ func (r *schemaResource) Update(ctx context.Context, req resource.UpdateRequest,
}

// Update state with refreshed values
plan.Schema = types.StringValue(schema.Schema())
plan.SchemaID = types.Int64Value(int64(schema.ID()))
plan.Version = types.Int64Value(int64(schema.Version()))
plan.Schema = types.StringValue(schema.Schema())
plan.Reference = FromRegistryReferences(schema.References())

diags = resp.State.Set(ctx, plan)
Expand Down Expand Up @@ -295,8 +313,50 @@ func (r *schemaResource) Delete(ctx context.Context, req resource.DeleteRequest,

func (r *schemaResource) ImportState(ctx context.Context, req resource.ImportStateRequest,
resp *resource.ImportStateResponse) {
// Retrieve import ID and save to id attribute
resource.ImportStatePassthroughID(ctx, path.Root("id"), req, resp)
subject := req.ID

// Retrieve the latest schema for the subject
schema, err := r.client.GetLatestSchema(subject)
if err != nil {
resp.Diagnostics.AddError(
"Error Importing Schema",
fmt.Sprintf("Could not retrieve schema for subject %s: %s", subject,
err.Error(),
),
)
return
}

// Retrieve the compatibility level for the subject
compatibilityLevel, err := r.client.GetCompatibilityLevel(subject, true)
if err != nil {
resp.Diagnostics.AddError(
"Error Importing Compatibility Level",
fmt.Sprintf("Could not retrieve compatibility level for subject %s: %s", subject, err.Error()),
)
return
}

schemaType := FromSchemaType(schema.SchemaType())

// Create state from retrieved schema
state := schemaResourceModel{
ID: types.StringValue(subject),
Subject: types.StringValue(subject),
Schema: types.StringValue(schema.Schema()),
SchemaID: types.Int64Value(int64(schema.ID())),
SchemaType: types.StringValue(schemaType),
Version: types.Int64Value(int64(schema.Version())),
Reference: FromRegistryReferences(schema.References()),
CompatibilityLevel: types.StringValue(FromCompatibilityLevelType(*compatibilityLevel)),
}

// Set the state
diags := resp.State.Set(ctx, state)
resp.Diagnostics.Append(diags...)
if resp.Diagnostics.HasError() {
return
}
}

func FromRegistryReferences(references []srclient.Reference) types.List {
Expand Down Expand Up @@ -385,6 +445,13 @@ func ToRegistryReferences(references types.List) []srclient.Reference {
return refs
}

func FromSchemaType(schemaType *srclient.SchemaType) string {
if schemaType == nil {
return "avro"
}
return string(*schemaType)
}

func ToSchemaType(schemaType string) srclient.SchemaType {
switch schemaType {
case "json":
Expand All @@ -396,6 +463,27 @@ func ToSchemaType(schemaType string) srclient.SchemaType {
}
}

func FromCompatibilityLevelType(compatibilityLevel srclient.CompatibilityLevel) string {
switch compatibilityLevel {
case srclient.None:
return "NONE"
case srclient.Backward:
return "BACKWARD"
case srclient.BackwardTransitive:
return "BACKWARD_TRANSITIVE"
case srclient.Forward:
return "FORWARD"
case srclient.ForwardTransitive:
return "FORWARD_TRANSITIVE"
case srclient.Full:
return "FULL"
case srclient.FullTransitive:
return "FULL_TRANSITIVE"
default:
return "FORWARD_TRANSITIVE"
}
}

func ToCompatibilityLevelType(compatibilityLevel string) srclient.CompatibilityLevel {
switch compatibilityLevel {
case "NONE":
Expand Down
6 changes: 3 additions & 3 deletions internal/provider/resource_schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ resource "schemaregistry_schema" "test" {
subject = "test-subject"
schema = "{\"type\":\"record\",\"name\":\"Test\",\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}"
schema_type = "avro"
compatibility_level = "FULL"
compatibility_level = "NONE"
}
`,
Check: resource.ComposeAggregateTestCheckFunc(
// Verify schema attributes
resource.TestCheckResourceAttr("schemaregistry_schema.test", "subject", "test-subject"),
resource.TestCheckResourceAttr("schemaregistry_schema.test", "schema", "{\"type\":\"record\",\"name\":\"Test\",\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}"),
resource.TestCheckResourceAttr("schemaregistry_schema.test", "schema_type", "json"),
resource.TestCheckResourceAttr("schemaregistry_schema.test", "compatibility_level", "FULL"),
resource.TestCheckResourceAttr("schemaregistry_schema.test", "schema_type", "avro"),
resource.TestCheckResourceAttr("schemaregistry_schema.test", "compatibility_level", "NONE"),
resource.TestCheckResourceAttrSet("schemaregistry_schema.test", "schema_id"),
resource.TestCheckResourceAttrSet("schemaregistry_schema.test", "version"),
),
Expand Down

0 comments on commit 0aa435b

Please sign in to comment.