Skip to content

Commit

Permalink
Wait for Cluster & API Key to be ready for use
Browse files Browse the repository at this point in the history
  • Loading branch information
Mongey committed Dec 29, 2020
1 parent f9c9fac commit f839891
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 19 deletions.
20 changes: 20 additions & 0 deletions ccloud/resource_api_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package ccloud
import (
"fmt"
"log"
"time"

ccloud "github.com/cgroschupp/go-client-confluent-cloud/confluentcloud"
"github.com/hashicorp/terraform/helper/resource"
"github.com/hashicorp/terraform/helper/schema"
)

Expand Down Expand Up @@ -93,6 +95,7 @@ func apiKeyCreate(d *schema.ResourceData, meta interface{}) error {
Description: description,
}

log.Printf("[DEBUG] Creating API key")
key, err := c.CreateAPIKey(&req)
if err == nil {
d.SetId(fmt.Sprintf("%d", key.ID))
Expand All @@ -106,10 +109,27 @@ func apiKeyCreate(d *schema.ResourceData, meta interface{}) error {
if err != nil {
return err
}

log.Printf("[INFO] Created API Key, waiting for it become usable")
stateConf := &resource.StateChangeConf{
Pending: []string{"Pending"},
Target: []string{"Ready"},
Refresh: clusterReady(c, clusterID, accountID, key.Key, key.Secret),
Timeout: 300 * time.Second,
Delay: 10 * time.Second,
PollInterval: 5 * time.Second,
MinTimeout: 20 * time.Second,
}

_, err = stateConf.WaitForState()
if err != nil {
return fmt.Errorf("Error waiting for API Key (%s) to be ready: %s", d.Id(), err)
}
} else {
log.Printf("[ERROR] Could not create API key: %s", err)
}

log.Printf("[INFO] API Key Created successfully: %s", err)
return err
}

Expand Down
97 changes: 94 additions & 3 deletions ccloud/resource_kafka_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ package ccloud
import (
"fmt"
"log"
"strings"
"time"

"github.com/Shopify/sarama"
ccloud "github.com/cgroschupp/go-client-confluent-cloud/confluentcloud"
"github.com/hashicorp/terraform/helper/resource"
"github.com/hashicorp/terraform/helper/schema"
)

Expand Down Expand Up @@ -96,7 +100,7 @@ func kafkaClusterResource() *schema.Resource {
Optional: true,
ForceNew: true,
Description: "cku",
},
},
},
}
}
Expand Down Expand Up @@ -145,16 +149,103 @@ func clusterCreate(d *schema.ResourceData, meta interface{}) error {
log.Printf("[ERROR] createCluster failed %v, %s", req, err)
return err
}

d.SetId(cluster.ID)
log.Printf("[DEBUG] Created kafka_cluster %s, Endpoint: %s", cluster.ID, cluster.Endpoint)

err = d.Set("bootstrap_servers", cluster.Endpoint)
if err != nil {
return err
}

logicalClusters := []ccloud.LogicalCluster{
ccloud.LogicalCluster{ID: cluster.ID},
}

apiKeyReq := ccloud.ApiKeyCreateRequest{
AccountID: accountID,
LogicalClusters: logicalClusters,
Description: "terraform-provider-kafka cluster connection bootstrap",
}

log.Printf("[DEBUG] Creating bootstrap keypair")
key, err := c.CreateAPIKey(&apiKeyReq)
if err != nil {
return err
}

stateConf := &resource.StateChangeConf{
Pending: []string{"Pending"},
Target: []string{"Ready"},
Refresh: clusterReady(c, d.Id(), accountID, key.Key, key.Secret),
Timeout: 300 * time.Second,
Delay: 3 * time.Second,
PollInterval: 5 * time.Second,
MinTimeout: 20 * time.Second,
}

log.Printf("[DEBUG] Waiting for cluster to become healthy")
_, err = stateConf.WaitForState()
if err != nil {
return fmt.Errorf("Error waiting for cluster (%s) to be ready: %s", d.Id(), err)
}

log.Printf("[DEBUG] Deleting bootstrap keypair")
err = c.DeleteAPIKey(fmt.Sprintf("%d", key.ID), accountID, logicalClusters)
if err != nil {
log.Printf("[ERROR] Unable to delete bootstrap api key %s", err)
}

return nil
}

func clusterReady(client *ccloud.Client, clusterID, accountID, username, password string) resource.StateRefreshFunc {
return func() (result interface{}, s string, err error) {
cluster, err := client.GetCluster(clusterID, accountID)
log.Printf("[DEBUG] Waiting for Cluster to be UP: current status %s %s:%s", cluster.Status, username, password)
log.Printf("[DEBUG] cluster %v", cluster)

if err != nil {
return cluster, "UNKNOWN", err
}

log.Printf("[DEBUG] Attempting to connect to %s, created %s", cluster.Endpoint, cluster.Deployment.Created)
if cluster.Status == "UP" {
if canConnect(cluster.Endpoint, username, password) {
return cluster, "Ready", nil
}
}

return cluster, "Pending", nil
}
}

func canConnect(connection, username, password string) bool {
bootstrapServers := strings.Replace(connection, "SASL_SSL://", "", 1)
log.Printf("[INFO] Trying to connect to %s", bootstrapServers)

cfg := sarama.NewConfig()
cfg.Net.SASL.Enable = true
cfg.Net.SASL.User = username
cfg.Net.SASL.Password = password
cfg.Net.SASL.Handshake = true
cfg.Net.TLS.Enable = true

client, err := sarama.NewClient([]string{bootstrapServers}, cfg)
if err != nil {
log.Printf("[ERROR] Could not build client %s", err)
return false
}

err = client.RefreshMetadata()
if err != nil {
log.Printf("[ERROR] Could not refresh metadata %s", err)
return false
}

log.Printf("[INFO] Success! Connected to %s", bootstrapServers)
return true
}

func clusterDelete(d *schema.ResourceData, meta interface{}) error {
c := meta.(*ccloud.Client)
accountID := d.Get("environment_id").(string)
Expand Down Expand Up @@ -184,7 +275,7 @@ func clusterRead(d *schema.ResourceData, meta interface{}) error {
err = d.Set("availability", cluster.Durability)
}
if err == nil {
err = d.Set("deployment", map[string]interface{}{ "sku": cluster.Deployment.Sku })
err = d.Set("deployment", map[string]interface{}{"sku": cluster.Deployment.Sku})
}
if err == nil {
err = d.Set("storage", cluster.Storage)
Expand Down
22 changes: 15 additions & 7 deletions examples/main.tf
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
terraform {
required_providers {
kafka = {
source = "Mongey/kafka"
version = "0.2.11"
}
confluentcloud = {
source = "Mongey/confluentcloud"
}
}
}

provider "confluentcloud" {}

resource "confluentcloud_environment" "environment" {
name = "default"
}

resource "confluentcloud_environment" "staging" {
name = "staging-env"
name = "production"
}

resource "confluentcloud_kafka_cluster" "test" {
Expand All @@ -27,7 +35,7 @@ resource "confluentcloud_schema_registry" "test" {
service_provider = "aws"
region = "EU"

depends_on = [confluentcloud_kafka_cluster.test]
depends_on = [confluentcloud_kafka_cluster.test]

}

Expand All @@ -47,7 +55,7 @@ provider "kafka" {
sasl_username = confluentcloud_api_key.provider_test.key
sasl_password = confluentcloud_api_key.provider_test.secret
sasl_mechanism = "plain"
timeout = 2
timeout = 10
}

resource "kafka_topic" "syslog" {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ module github.com/Mongey/terraform-provider-confluentcloud
go 1.12

require (
github.com/Shopify/sarama v1.27.1
github.com/cgroschupp/go-client-confluent-cloud v0.0.0-20201105075001-2e15b5846d7e
github.com/go-resty/resty/v2 v2.2.0 // indirect
github.com/hashicorp/terraform v0.12.1
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7 // indirect
)
Loading

0 comments on commit f839891

Please sign in to comment.