From 8c5c314956d8bc0a29cc78b06eff22a33e18c438 Mon Sep 17 00:00:00 2001 From: Aaron Levy Date: Fri, 22 Dec 2023 11:02:40 -0500 Subject: [PATCH] Add integration for read replicas --- docs/resources/service.md | 1 + examples/resources/resource.tf | 5 + internal/client/client.go | 2 + .../client/queries/create_service.graphql | 3 +- .../client/queries/get_all_services.graphql | 44 ++++++ internal/client/queries/get_service.graphql | 5 + internal/client/service.go | 44 ++++++ internal/provider/provider_test.go | 25 +++- internal/provider/service_resource.go | 137 +++++++++++++---- internal/provider/service_resource_test.go | 138 ++++++++++++++++++ 10 files changed, 364 insertions(+), 40 deletions(-) create mode 100644 internal/client/queries/get_all_services.graphql diff --git a/docs/resources/service.md b/docs/resources/service.md index c8c5476..45d00bc 100644 --- a/docs/resources/service.md +++ b/docs/resources/service.md @@ -26,6 +26,7 @@ The change has been taken into account but must still be propagated. You can run - `memory_gb` (Number) Memory GB - `milli_cpu` (Number) Milli CPU - `name` (String) Service Name is the configurable name assigned to this resource. If none is provided, a default will be generated by the provider. +- `read_replica_source` (String) If set, this database will be a read replica of the provided source database. The region must be the same as the source, or if ommitted will be handled by the provider - `region_code` (String) The region for this service. - `storage_gb` (Number, Deprecated) Deprecated: Storage GB - `timeouts` (Attributes) (see [below for nested schema](#nestedatt--timeouts)) diff --git a/examples/resources/resource.tf b/examples/resources/resource.tf index 96ecd45..2a5f8e4 100644 --- a/examples/resources/resource.tf +++ b/examples/resources/resource.tf @@ -4,3 +4,8 @@ resource "timescale_service" "test" { # memory_gb = 4 # region_code = "" } + +# Read replica +resource "timescale_service" "read_replica" { + read_replica_source = timescale_service.test.id +} diff --git a/internal/client/client.go b/internal/client/client.go index 21f21ab..ee9a1c0 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -24,6 +24,8 @@ var ( ResizeInstanceMutation string //go:embed queries/delete_service.graphql DeleteServiceMutation string + //go:embed queries/get_all_services.graphql + GetAllServicesQuery string //go:embed queries/get_service.graphql GetServiceQuery string //go:embed queries/vpcs.graphql diff --git a/internal/client/queries/create_service.graphql b/internal/client/queries/create_service.graphql index 7ea89b6..4f2af23 100644 --- a/internal/client/queries/create_service.graphql +++ b/internal/client/queries/create_service.graphql @@ -1,11 +1,12 @@ mutation CreateService($projectId: ID!, $name: String!, $type: Type!, $resourceConfig: - ResourceConfig, $regionCode: String!, $vpcId: ID) { + ResourceConfig, $regionCode: String!, $vpcId: ID, $forkConfig: ForkConfig) { createService(data:{ projectId:$projectId, name:$name, type:$type, resourceConfig:$resourceConfig, regionCode:$regionCode, + forkConfig:$forkConfig, vpcId: $vpcId }){ initialPassword diff --git a/internal/client/queries/get_all_services.graphql b/internal/client/queries/get_all_services.graphql new file mode 100644 index 0000000..3fdf26d --- /dev/null +++ b/internal/client/queries/get_all_services.graphql @@ -0,0 +1,44 @@ +query GetAllServices($projectId: ID!) { + getAllServices(projectId: $projectId) { + id + projectId + name + type + created + status + replicaStatus + autoscaleSettings { + enabled + } + regionCode + spec { + ... on TimescaleDBServiceSpec { + hostname + username + port + defaultDBName + } + } + resources { + id + spec { + ... on ResourceNode { + milliCPU + memoryGB + storageGB + } + } + } + created + vpcEndpoint { + host + port + vpcId + } + forkedFromId { + projectId + serviceId + isStandby + } + } +} diff --git a/internal/client/queries/get_service.graphql b/internal/client/queries/get_service.graphql index a99a82d..7c487ed 100644 --- a/internal/client/queries/get_service.graphql +++ b/internal/client/queries/get_service.graphql @@ -38,5 +38,10 @@ query GetService($projectId: ID!, $serviceId: ID!) { port vpcId } + forkedFromId { + projectId + serviceId + isStandby + } } } diff --git a/internal/client/service.go b/internal/client/service.go index 7cc5fa6..63fd184 100644 --- a/internal/client/service.go +++ b/internal/client/service.go @@ -12,6 +12,7 @@ import ( type Service struct { ID string `json:"id"` + ProjectID string `json:"projectId"` Name string `json:"name"` AutoscaleSettings struct { Enabled bool `json:"enabled"` @@ -23,6 +24,7 @@ type Service struct { Created string `json:"created"` ReplicaStatus string `json:"replicaStatus"` VpcEndpoint *VpcEndpoint `json:"vpcEndpoint"` + ForkSpec *ForkSpec `json:"forkedFromId"` } type ServiceSpec struct { @@ -52,6 +54,19 @@ type CreateServiceRequest struct { RegionCode string ReplicaCount string VpcID int64 + ForkConfig *ForkConfig +} + +type ForkConfig struct { + ProjectID string `json:"projectID"` + ServiceID string `json:"serviceID"` + IsStandby bool `json:"isStandby"` +} + +type ForkSpec struct { + ProjectID string `json:"projectId"` + ServiceID string `json:"serviceId"` + IsStandby bool `json:"isStandby"` } type CreateServiceResponseData struct { @@ -67,6 +82,10 @@ type GetServiceResponse struct { Service Service `json:"getService"` } +type GetAllServicesResponse struct { + Services []*Service `json:"getAllServices"` +} + type DeleteServiceResponse struct { Service Service `json:"deleteService"` } @@ -93,6 +112,9 @@ func (c *Client) CreateService(ctx context.Context, request CreateServiceRequest if request.VpcID > 0 { variables["vpcId"] = request.VpcID } + if request.ForkConfig != nil { + variables["forkConfig"] = request.ForkConfig + } req := map[string]interface{}{ "operationName": "CreateService", @@ -220,6 +242,28 @@ func (c *Client) GetService(ctx context.Context, id string) (*Service, error) { return &resp.Data.Service, nil } +func (c *Client) GetAllServices(ctx context.Context) ([]*Service, error) { + tflog.Trace(ctx, "Client.GetAllServices") + req := map[string]interface{}{ + "operationName": "GetAllServices", + "query": GetAllServicesQuery, + "variables": map[string]string{ + "projectId": c.projectID, + }, + } + var resp Response[GetAllServicesResponse] + if err := c.do(ctx, req, &resp); err != nil { + return nil, err + } + if len(resp.Errors) > 0 { + return nil, resp.Errors[0] + } + if resp.Data == nil { + return nil, errors.New("no response found") + } + return resp.Data.Services, nil +} + func (c *Client) DeleteService(ctx context.Context, id string) (*Service, error) { tflog.Trace(ctx, "Client.DeleteService") req := map[string]interface{}{ diff --git a/internal/provider/provider_test.go b/internal/provider/provider_test.go index 8c3608c..6a4394c 100644 --- a/internal/provider/provider_test.go +++ b/internal/provider/provider_test.go @@ -44,14 +44,15 @@ var testAccProtoV6ProviderFactories = map[string]func() (tfprotov6.ProviderServe } type Config struct { - ResourceName string - Name string - Timeouts Timeouts - MilliCPU int64 - MemoryGB int64 - RegionCode string - EnableHAReplica bool - VpcID int64 + ResourceName string + Name string + Timeouts Timeouts + MilliCPU int64 + MemoryGB int64 + RegionCode string + EnableHAReplica bool + VpcID int64 + ReadReplicaSource string } func (c *Config) WithName(name string) *Config { @@ -75,6 +76,11 @@ func (c *Config) WithHAReplica(enableHAReplica bool) *Config { return c } +func (c *Config) WithReadReplica(source string) *Config { + c.ReadReplicaSource = source + return c +} + func (c *Config) String(t *testing.T) string { c.setDefaults() b := &strings.Builder{} @@ -87,6 +93,9 @@ func (c *Config) String(t *testing.T) string { if c.Name != "" { write("name = %q \n", c.Name) } + if c.ReadReplicaSource != "" { + write("read_replica_source = %s \n", c.ReadReplicaSource) + } if c.EnableHAReplica { write("enable_ha_replica = %t \n", c.EnableHAReplica) } diff --git a/internal/provider/service_resource.go b/internal/provider/service_resource.go index 0249135..73d1660 100644 --- a/internal/provider/service_resource.go +++ b/internal/provider/service_resource.go @@ -2,8 +2,10 @@ package provider import ( "context" + "errors" "fmt" "strconv" + "sync" "time" "github.com/hashicorp/terraform-plugin-framework-timeouts/resource/timeouts" @@ -31,12 +33,15 @@ var _ resource.Resource = &ServiceResource{} var _ resource.ResourceWithImportState = &ServiceResource{} const ( - ErrCreateTimeout = "Error waiting for service creation" - ErrUpdateService = "Error updating service" - ErrInvalidAttribute = "Invalid Attribute Value" - - DefaultMilliCPU = 500 - DefaultMemoryGB = 2 + ErrCreateTimeout = "Error waiting for service creation" + ErrUpdateService = "Error updating service" + ErrInvalidAttribute = "Invalid Attribute Value" + errMultipleReadReplicas = "cannot create multiple read replicas for a service" + errReplicaFromFork = "cannot create a read replica from a read replica or fork" + errReplicaWithHA = "cannot create a read replica with HA enabled" + errUpdateReplicaSource = "cannot update read replica source" + DefaultMilliCPU = 500 + DefaultMemoryGB = 2 DefaultEnableHAReplica = false ) @@ -50,6 +55,9 @@ func NewServiceResource() resource.Resource { return &ServiceResource{} } +// readReplicaMu synchronizes operations on read replicas for a project. +var readReplicaMu sync.Mutex + // ServiceResource defines the resource implementation. type ServiceResource struct { client *tsClient.Client @@ -57,19 +65,20 @@ type ServiceResource struct { // serviceResourceModel maps the resource schema data. type serviceResourceModel struct { - ID types.String `tfsdk:"id"` - Name types.String `tfsdk:"name"` - Timeouts timeouts.Value `tfsdk:"timeouts"` - MilliCPU types.Int64 `tfsdk:"milli_cpu"` - StorageGB types.Int64 `tfsdk:"storage_gb"` - MemoryGB types.Int64 `tfsdk:"memory_gb"` - Password types.String `tfsdk:"password"` - Hostname types.String `tfsdk:"hostname"` - Port types.Int64 `tfsdk:"port"` - Username types.String `tfsdk:"username"` - RegionCode types.String `tfsdk:"region_code"` - EnableHAReplica types.Bool `tfsdk:"enable_ha_replica"` - VpcId types.Int64 `tfsdk:"vpc_id"` + ID types.String `tfsdk:"id"` + Name types.String `tfsdk:"name"` + Timeouts timeouts.Value `tfsdk:"timeouts"` + MilliCPU types.Int64 `tfsdk:"milli_cpu"` + StorageGB types.Int64 `tfsdk:"storage_gb"` + MemoryGB types.Int64 `tfsdk:"memory_gb"` + Password types.String `tfsdk:"password"` + Hostname types.String `tfsdk:"hostname"` + Port types.Int64 `tfsdk:"port"` + Username types.String `tfsdk:"username"` + RegionCode types.String `tfsdk:"region_code"` + EnableHAReplica types.Bool `tfsdk:"enable_ha_replica"` + ReadReplicaSource types.String `tfsdk:"read_replica_source"` + VpcId types.Int64 `tfsdk:"vpc_id"` } func (r *ServiceResource) Metadata(ctx context.Context, req resource.MetadataRequest, resp *resource.MetadataResponse) { @@ -125,6 +134,11 @@ The change has been taken into account but must still be propagated. You can run Computed: true, Default: booldefault.StaticBool(DefaultEnableHAReplica), }, + "read_replica_source": schema.StringAttribute{ + MarkdownDescription: "If set, this database will be a read replica of the provided source database. The region must be the same as the source, or if ommitted will be handled by the provider", + Description: "If set, this database will be a read replica of the provided source database. The region must be the same as the source, or if ommitted will be handled by the provider", + Optional: true, + }, "storage_gb": schema.Int64Attribute{ MarkdownDescription: "Deprecated: Storage GB", Description: "Deprecated: Storage GB", @@ -237,6 +251,36 @@ func (r *ServiceResource) Create(ctx context.Context, req resource.CreateRequest if !plan.VpcId.IsNull() { request.VpcID = plan.VpcId.ValueInt64() } + + readReplicaSource := plan.ReadReplicaSource.ValueString() + if readReplicaSource != "" { + // Locking is done to prevent multiple read replicas being created for a service at once + readReplicaMu.Lock() + defer readReplicaMu.Unlock() + + primary, err := r.client.GetService(ctx, readReplicaSource) + if err != nil { + resp.Diagnostics.AddError("Client Error", fmt.Sprintf("unable to get primary service %s, got error: %s", readReplicaSource, err)) + return + } + err = r.validateCreateReadReplicaRequest(ctx, primary, plan) + if err != nil { + resp.Diagnostics.AddError("read replica validation error", err.Error()) + return + } + if request.Name == "" { + request.Name = "replica-" + primary.Name + } + if request.RegionCode == "" { + request.RegionCode = primary.RegionCode + } + request.ForkConfig = &tsClient.ForkConfig{ + ProjectID: primary.ProjectID, + ServiceID: primary.ID, + IsStandby: true, + } + } + response, err := r.client.CreateService(ctx, request) if err != nil { @@ -263,6 +307,27 @@ func (r *ServiceResource) Create(ctx context.Context, req resource.CreateRequest } } +func (r *ServiceResource) validateCreateReadReplicaRequest(ctx context.Context, primary *tsClient.Service, plan serviceResourceModel) error { + tflog.Trace(ctx, "validateCreateReadReplicaRequest") + + if primary.ForkSpec != nil { + return errors.New(errReplicaFromFork) + } + if plan.EnableHAReplica.ValueBool() { + return errors.New(errReplicaWithHA) + } + services, err := r.client.GetAllServices(ctx) + if err != nil { + return err + } + for _, service := range services { + if service.ForkSpec != nil && service.ForkSpec.ServiceID == primary.ID { + return errors.New(errMultipleReadReplicas) + } + } + return nil +} + func (r *ServiceResource) waitForServiceReadiness(ctx context.Context, ID string, timeouts timeouts.Value) (*tsClient.Service, error) { tflog.Trace(ctx, "ServiceResource.waitForServiceReadiness") @@ -334,9 +399,18 @@ func (r *ServiceResource) Update(ctx context.Context, req resource.UpdateRequest if resp.Diagnostics.HasError() { return } - serviceID := state.ID.ValueString() + readReplicaSource := plan.ReadReplicaSource.ValueString() + if readReplicaSource != state.ReadReplicaSource.ValueString() { + resp.Diagnostics.AddError(ErrUpdateService, errUpdateReplicaSource) + return + } + if readReplicaSource != "" && plan.EnableHAReplica.ValueBool() { + resp.Diagnostics.AddError(ErrUpdateService, errReplicaWithHA) + return + } + if !plan.Hostname.IsUnknown() { resp.Diagnostics.AddError(ErrUpdateService, "Do not support hostname change") return @@ -462,17 +536,18 @@ func (r *ServiceResource) ImportState(ctx context.Context, req resource.ImportSt func serviceToResource(diag diag.Diagnostics, s *tsClient.Service, state serviceResourceModel) serviceResourceModel { model := serviceResourceModel{ - ID: types.StringValue(s.ID), - Password: state.Password, - Name: types.StringValue(s.Name), - MilliCPU: types.Int64Value(s.Resources[0].Spec.MilliCPU), - MemoryGB: types.Int64Value(s.Resources[0].Spec.MemoryGB), - Hostname: types.StringValue(s.ServiceSpec.Hostname), - Username: types.StringValue(s.ServiceSpec.Username), - Port: types.Int64Value(s.ServiceSpec.Port), - RegionCode: types.StringValue(s.RegionCode), - Timeouts: state.Timeouts, - EnableHAReplica: types.BoolValue(s.ReplicaStatus != ""), + ID: types.StringValue(s.ID), + Password: state.Password, + Name: types.StringValue(s.Name), + MilliCPU: types.Int64Value(s.Resources[0].Spec.MilliCPU), + MemoryGB: types.Int64Value(s.Resources[0].Spec.MemoryGB), + Hostname: types.StringValue(s.ServiceSpec.Hostname), + Username: types.StringValue(s.ServiceSpec.Username), + Port: types.Int64Value(s.ServiceSpec.Port), + RegionCode: types.StringValue(s.RegionCode), + Timeouts: state.Timeouts, + EnableHAReplica: types.BoolValue(s.ReplicaStatus != ""), + ReadReplicaSource: state.ReadReplicaSource, } if s.VpcEndpoint != nil { if vpcId, err := strconv.ParseInt(s.VpcEndpoint.VpcId, 10, 64); err != nil { diff --git a/internal/provider/service_resource_test.go b/internal/provider/service_resource_test.go index 988cf9c..159935a 100644 --- a/internal/provider/service_resource_test.go +++ b/internal/provider/service_resource_test.go @@ -74,6 +74,144 @@ func TestServiceResource_Default_Success(t *testing.T) { }) } +func TestServiceResource_Read_Replica(t *testing.T) { + const ( + primaryName = "primary" + extraName = "extra" + replicaName = "read_replica" + primaryFQID = "timescale_service." + primaryName + extraFQID = "timescale_service." + extraName + replicaFQID = "timescale_service." + replicaName + ) + var ( + primaryConfig = &Config{ + ResourceName: primaryName, + Name: "service resource test init", + } + extraConfig = &Config{ + ResourceName: extraName, + } + replicaConfig = &Config{ + ResourceName: replicaName, + ReadReplicaSource: primaryFQID + ".id", + MilliCPU: 500, + MemoryGB: 2, + } + extraReplicaConfig = &Config{ + ResourceName: replicaName + "_2", + ReadReplicaSource: primaryFQID + ".id", + } + ) + // Test creating a service with a read replica + resource.ParallelTest(t, resource.TestCase{ + ProtoV6ProviderFactories: testAccProtoV6ProviderFactories, + PreCheck: func() { testAccPreCheck(t) }, + Steps: []resource.TestStep{ + { + Config: getConfig(t, primaryConfig, replicaConfig), + Check: resource.ComposeAggregateTestCheckFunc( + // Verify service attributes + resource.TestCheckResourceAttr(primaryFQID, "name", "service resource test init"), + resource.TestCheckResourceAttrSet(primaryFQID, "id"), + resource.TestCheckResourceAttrSet(primaryFQID, "password"), + resource.TestCheckResourceAttrSet(primaryFQID, "hostname"), + resource.TestCheckResourceAttrSet(primaryFQID, "username"), + resource.TestCheckResourceAttrSet(primaryFQID, "port"), + resource.TestCheckResourceAttr(primaryFQID, "milli_cpu", "500"), + resource.TestCheckResourceAttr(primaryFQID, "memory_gb", "2"), + resource.TestCheckResourceAttr(primaryFQID, "region_code", "us-east-1"), + resource.TestCheckResourceAttr(primaryFQID, "enable_ha_replica", "false"), + resource.TestCheckNoResourceAttr(primaryFQID, "vpc_id"), + + // Verify read replica attributes + resource.TestCheckResourceAttr(replicaFQID, "name", "replica-service resource test init"), + resource.TestCheckResourceAttrSet(replicaFQID, "id"), + resource.TestCheckResourceAttrSet(replicaFQID, "password"), + resource.TestCheckResourceAttrSet(replicaFQID, "hostname"), + resource.TestCheckResourceAttrSet(replicaFQID, "username"), + resource.TestCheckResourceAttrSet(replicaFQID, "port"), + resource.TestCheckResourceAttr(replicaFQID, "milli_cpu", "500"), + resource.TestCheckResourceAttr(replicaFQID, "memory_gb", "2"), + resource.TestCheckResourceAttr(replicaFQID, "region_code", "us-east-1"), + resource.TestCheckResourceAttr(replicaFQID, "enable_ha_replica", "false"), + resource.TestCheckResourceAttrSet(replicaFQID, "read_replica_source"), + resource.TestCheckNoResourceAttr(replicaFQID, "vpc_id"), + ), + }, + // Update replica name + { + Config: getConfig(t, primaryConfig, replicaConfig.WithName("replica")), + Check: resource.ComposeAggregateTestCheckFunc( + resource.TestCheckResourceAttr(replicaFQID, "name", "replica"), + ), + }, + // Do a compute resize + { + Config: getConfig(t, primaryConfig, replicaConfig.WithSpec(1000, 4)), + Check: resource.ComposeAggregateTestCheckFunc( + resource.TestCheckResourceAttr(replicaFQID, "milli_cpu", "1000"), + resource.TestCheckResourceAttr(replicaFQID, "memory_gb", "4"), + ), + }, + // Add VPC to the read replica + { + Config: getConfig(t, primaryConfig, replicaConfig.WithVPC(DEFAULT_VPC_ID)), + Check: resource.ComposeAggregateTestCheckFunc( + resource.TestCheckResourceAttr(replicaFQID, "vpc_id", "2074"), + ), + }, + // Remove VPC + { + Config: getConfig(t, primaryConfig, replicaConfig.WithVPC(0)), + Check: resource.ComposeAggregateTestCheckFunc( + resource.TestCheckNoResourceAttr(primaryFQID, "vpc_id"), + ), + }, + // Check adding HA returns an error + { + Config: getConfig(t, primaryConfig, replicaConfig.WithHAReplica(true)), + ExpectError: regexp.MustCompile(errReplicaWithHA), + }, + // Check removing read_replica_source returns an error + { + Config: getConfig(t, primaryConfig, replicaConfig.WithHAReplica(false).WithReadReplica("")), + ExpectError: regexp.MustCompile(errUpdateReplicaSource), + }, + // Check changing read_replica_source returns an error + { + Config: getConfig(t, primaryConfig, extraConfig, replicaConfig.WithReadReplica(extraFQID+".id")), + ExpectError: regexp.MustCompile(errUpdateReplicaSource), + }, + // Check enabling read_replica_source returns an error + { + Config: getConfig(t, primaryConfig.WithReadReplica(extraFQID+".id"), extraConfig, replicaConfig.WithReadReplica(primaryFQID+".id")), + ExpectError: regexp.MustCompile(errUpdateReplicaSource), + }, + // Check creating multiple read replicas returns an error + { + Config: getConfig(t, primaryConfig.WithReadReplica(""), replicaConfig, extraReplicaConfig), + ExpectError: regexp.MustCompile(errMultipleReadReplicas), + }, + // Test creating a read replica from a read replica returns an error + { + Config: getConfig(t, primaryConfig, replicaConfig, extraReplicaConfig.WithReadReplica(replicaFQID+".id")), + ExpectError: regexp.MustCompile(errReplicaFromFork), + }, + // Remove Replica + { + Config: getConfig(t, primaryConfig), + Check: func(state *terraform.State) error { + resources := state.RootModule().Resources + if _, ok := resources[replicaFQID]; ok { + return errors.New("expected replica to be deleted") + } + return nil + }, + }, + }, + }) +} + func TestServiceResource_Timeout(t *testing.T) { resource.Test(t, resource.TestCase{ ProtoV6ProviderFactories: testAccProtoV6ProviderFactories,