Skip to content

Commit

Permalink
Merge pull request #42 from takutakahashi/add_aws_backend
Browse files Browse the repository at this point in the history
async apply|delete
  • Loading branch information
takutakahashi authored May 25, 2020
2 parents dad462a + dff1828 commit 844faf5
Showing 1 changed file with 67 additions and 24 deletions.
91 changes: 67 additions & 24 deletions pkg/terraform/terraform.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"os"
"path/filepath"
"regexp"
"strings"
"text/template"
"time"

"github.com/Masterminds/sprig"
"github.com/takutakahashi/loadbalancer-controller/api/v1beta1"
Expand Down Expand Up @@ -80,9 +80,15 @@ func (t TerraformClient) createConfig() error {
}
return err
}
func (t TerraformClient) createJob(ops string, force bool) error {
func (t TerraformClient) ensureJob(ops string, force bool) error {
job := t.buildJob(ops, force)
_, err := t.clientset.BatchV1().Jobs(t.awsBackend.Namespace).Create(&job)
c := t.clientset.BatchV1().Jobs(t.awsBackend.Namespace)
if _, err := c.Get(job.Name, metav1.GetOptions{}); err == nil {
return nil
} else if !apierrors.IsNotFound(err) {
return err
}
_, err := c.Create(&job)
return err
}

Expand All @@ -91,8 +97,14 @@ func (t TerraformClient) execute(ops string, force bool, watch bool) error {
if err != nil {
return err
}
t.watchCompleteOrError()
err = t.createJob(ops, force)
processing, err := t.isProcessing()
if err != nil {
return err
}
if processing {
return errors.New("before task is processing")
}
err = t.ensureJob(ops, force)
if err != nil {
return err
}
Expand All @@ -109,8 +121,9 @@ func (t TerraformClient) execute(ops string, force bool, watch bool) error {
}

func (t TerraformClient) cleanup() error {
name := fmt.Sprintf("%s-%s", t.awsBackend.Name, t.awsBackend.ResourceVersion)
jc := t.clientset.BatchV1().Jobs(t.awsBackend.Namespace)
job, err := jc.Get(t.awsBackend.Name, metav1.GetOptions{})
job, err := jc.Get(name, metav1.GetOptions{})
if err != nil {
return err
}
Expand Down Expand Up @@ -195,7 +208,7 @@ func (t TerraformClient) updateReport(job *batchv1.Job) error {
}
}
cmclient := t.clientset.CoreV1().ConfigMaps(t.awsBackend.Namespace)
cm, err := cmclient.Get(job.Name, metav1.GetOptions{})
cm, err := cmclient.Get(t.awsBackend.Name, metav1.GetOptions{})
if err != nil {
return err
}
Expand All @@ -209,29 +222,59 @@ func (t TerraformClient) updateReport(job *batchv1.Job) error {
return err
}

func (t TerraformClient) watchCompleteOrError() error {
name := t.awsBackend.Name
namespace := t.awsBackend.Namespace
c := t.clientset.BatchV1().Jobs(namespace)
opt := metav1.GetOptions{}
for i := 0; i < 150; i++ {
job, err := c.Get(name, opt)
if err != nil {
return err
func (t TerraformClient) isProcessing() (bool, error) {
c := t.clientset.BatchV1().Jobs(t.awsBackend.Namespace)
opt := metav1.ListOptions{}
jobs, err := c.List(opt)
if err != nil {
if apierrors.IsNotFound(err) {
return false, nil
}
return false, err
}
for _, job := range jobs.Items {
if !strings.Contains(job.Name, t.awsBackend.Name) {
continue
}
if !strings.Contains(job.Name, t.awsBackend.ResourceVersion) {
return true, nil
}
if job.Status.CompletionTime != nil {
t.updateReport(job)
return c.Delete(name, &metav1.DeleteOptions{})
return false, nil
}

if job.Status.Failed > 0 {
c.Delete(name, &metav1.DeleteOptions{})
return errors.New("Job errored")
return false, nil
}
time.Sleep(5 * time.Second)
}
c.Delete(name, &metav1.DeleteOptions{})
return errors.New("Job completion timeout")
return false, nil
}

func (t TerraformClient) watchCompleteOrError() error {
name := fmt.Sprintf("%s-%s", t.awsBackend.Name, t.awsBackend.ResourceVersion)
namespace := t.awsBackend.Namespace
c := t.clientset.BatchV1().Jobs(namespace)
opt := metav1.GetOptions{}
job, err := c.Get(name, opt)
if err != nil {
return err
}

// job is completed
if job.Status.CompletionTime != nil {
t.updateReport(job)
return c.Delete(name, &metav1.DeleteOptions{})
}

// job is failed
if job.Status.Failed > 0 {
t.updateReport(job)
c.Delete(name, &metav1.DeleteOptions{})
return errors.New("Job errored")
}

// job is still processing
return errors.New("Job is processing")
}

func (t TerraformClient) Apply() error {
Expand Down Expand Up @@ -306,7 +349,7 @@ func (t TerraformClient) workDir() string {
func (t TerraformClient) buildJob(ops string, force bool) batchv1.Job {
// secretName := ""
om := metav1.ObjectMeta{
Name: t.awsBackend.Name,
Name: fmt.Sprintf("%s-%s", t.awsBackend.Name, t.awsBackend.ResourceVersion),
Namespace: t.awsBackend.Namespace,
}
cmd := []string{"/bin/terraform.sh", ops, t.awsBackend.Kind}
Expand Down

0 comments on commit 844faf5

Please sign in to comment.