Reapply "feat: support canary test for multiple load balancers (#80)"… #84

Merged: 3 commits, Jun 26, 2024
Changes from 1 commit
7 changes: 7 additions & 0 deletions .vscode/settings.json
@@ -0,0 +1,7 @@
{
takkyuuplayer marked this conversation as resolved.
"explorer.fileNesting.enabled": true,
"explorer.fileNesting.patterns": {
"*.go": "${capture}_test.go",
"go.mod": "go.sum"
}
}
355 changes: 355 additions & 0 deletions canary_task.go
@@ -0,0 +1,355 @@
package cage

import (
"context"
"fmt"
"time"

"github.com/apex/log"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/ec2"
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
"github.com/aws/aws-sdk-go-v2/service/ecs"
ecstypes "github.com/aws/aws-sdk-go-v2/service/ecs/types"
elbv2 "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2"
elbv2types "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2/types"
"golang.org/x/xerrors"
)

type CanaryTarget struct {
targetGroupArn *string
targetId *string
targetPort *int32
availabilityZone *string
}

type CanaryTask struct {
*cage
td *ecstypes.TaskDefinition
lb *ecstypes.LoadBalancer
networkConfiguration *ecstypes.NetworkConfiguration
platformVersion *string
taskArn *string
target *CanaryTarget
}

func (c *CanaryTask) Start(ctx context.Context) error {
if c.Env.CanaryInstanceArn != "" {
// ec2
startTask := &ecs.StartTaskInput{
Cluster: &c.Env.Cluster,
Group: aws.String(fmt.Sprintf("cage:canary-task:%s", c.Env.Service)),
NetworkConfiguration: c.networkConfiguration,
TaskDefinition: c.td.TaskDefinitionArn,
ContainerInstances: []string{c.Env.CanaryInstanceArn},
}
if o, err := c.Ecs.StartTask(ctx, startTask); err != nil {
return err
} else {
c.taskArn = o.Tasks[0].TaskArn
}
} else {
// fargate
if o, err := c.Ecs.RunTask(ctx, &ecs.RunTaskInput{
Cluster: &c.Env.Cluster,
Group: aws.String(fmt.Sprintf("cage:canary-task:%s", c.Env.Service)),
NetworkConfiguration: c.networkConfiguration,
TaskDefinition: c.td.TaskDefinitionArn,
LaunchType: ecstypes.LaunchTypeFargate,
PlatformVersion: c.platformVersion,
}); err != nil {
return err
} else {
c.taskArn = o.Tasks[0].TaskArn
}
}
return nil
}

func (c *CanaryTask) Wait(ctx context.Context) error {
log.Infof("🥚 waiting for canary task '%s' is running...", *c.taskArn)
if err := ecs.NewTasksRunningWaiter(c.Ecs).Wait(ctx, &ecs.DescribeTasksInput{
Cluster: &c.Env.Cluster,
Tasks: []string{*c.taskArn},
}, c.MaxWait); err != nil {
return err
}
log.Infof("🐣 canary task '%s' is running!", *c.taskArn)
if err := c.waitUntilHealthCeheckPassed(ctx); err != nil {
return err
}
log.Info("🤩 canary task container(s) is healthy!")
log.Infof("canary task '%s' ensured.", *c.taskArn)
if c.lb == nil {
log.Infof("no load balancer is attached to service '%s'. skip registration to target group", c.Env.Service)
return c.waitForIdleDuration(ctx)
} else {
if err := c.registerToTargetGroup(ctx); err != nil {
return err
}
log.Infof("😷 ensuring canary task to become healthy...")
if err := c.waitUntilTargetHealthy(ctx); err != nil {
return err
}
log.Info("🤩 canary task is healthy!")
return nil
}
}

func (c *CanaryTask) waitForIdleDuration(ctx context.Context) error {
log.Infof("wait %d seconds for canary task to be stable...", c.Env.CanaryTaskIdleDuration)
wait := make(chan bool)
go func() {
duration := c.Env.CanaryTaskIdleDuration
for duration > 0 {
log.Infof("still waiting...; %d seconds left", duration)
wt := 10
if duration < 10 {
wt = duration
}
<-c.Time.NewTimer(time.Duration(wt) * time.Second).C
duration -= 10
}
wait <- true
}()
<-wait
Contributor commented:

Unnecessary goroutine. The following seems sufficient:

		duration := c.Env.CanaryTaskIdleDuration
		for duration > 0 {
			log.Infof("still waiting...; %d seconds left", duration)
			wt := 10
			if duration < 10 {
				wt = duration
			}
			<-c.Time.NewTimer(time.Duration(wt) * time.Second).C
			duration -= 10
		}

Member Author replied:

I agree, and ctx.Done() should also be evaluated in a select.
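
A rough sketch of that combined idea, as a possible body for waitForIdleDuration, using only helpers already present in this diff (c.Time.NewTimer, log.Infof, ctx):

	duration := c.Env.CanaryTaskIdleDuration
	for duration > 0 {
		log.Infof("still waiting...; %d seconds left", duration)
		wt := 10
		if duration < 10 {
			wt = duration
		}
		select {
		case <-ctx.Done():
			// stop waiting early if the caller cancels the context
			return ctx.Err()
		case <-c.Time.NewTimer(time.Duration(wt) * time.Second).C:
			// timer fired; keep counting down
		}
		duration -= wt
	}
	return nil

This behaves like the existing countdown loop but also returns promptly when the context is cancelled mid-wait.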

o, err := c.Ecs.DescribeTasks(ctx, &ecs.DescribeTasksInput{
Cluster: &c.Env.Cluster,
Tasks: []string{*c.taskArn},
})
if err != nil {
return err
}
task := o.Tasks[0]
if *task.LastStatus != "RUNNING" {
return xerrors.Errorf("😫 canary task has stopped: %s", *task.StoppedReason)
}
return nil
}

func (c *CanaryTask) waitUntilHealthCeheckPassed(ctx context.Context) error {
Contributor commented:

typo

Suggested change
func (c *CanaryTask) waitUntilHealthCeheckPassed(ctx context.Context) error {
func (c *CanaryTask) waitUntilHealthCheckPassed(ctx context.Context) error {

Contributor commented:

typo still exists

Ceheck --> Check

Member Author replied:

I've cehecked!

log.Infof("😷 ensuring canary task container(s) to become healthy...")
containerHasHealthChecks := map[string]struct{}{}
for _, definition := range c.td.ContainerDefinitions {
if definition.HealthCheck != nil {
containerHasHealthChecks[*definition.Name] = struct{}{}
}
}
for count := 0; count < 10; count++ {
<-c.Time.NewTimer(time.Duration(15) * time.Second).C
log.Infof("canary task '%s' waits until %d container(s) become healthy", *c.taskArn, len(containerHasHealthChecks))
if o, err := c.Ecs.DescribeTasks(ctx, &ecs.DescribeTasksInput{
Cluster: &c.Env.Cluster,
Tasks: []string{*c.taskArn},
}); err != nil {
return err
} else {
task := o.Tasks[0]
if *task.LastStatus != "RUNNING" {
return xerrors.Errorf("😫 canary task has stopped: %s", *task.StoppedReason)
}

for _, container := range task.Containers {
if _, ok := containerHasHealthChecks[*container.Name]; !ok {
continue
}
if container.HealthStatus != ecstypes.HealthStatusHealthy {
log.Infof("container '%s' is not healthy: %s", *container.Name, container.HealthStatus)
continue
}
delete(containerHasHealthChecks, *container.Name)
}
if len(containerHasHealthChecks) == 0 {
return nil
}
}
}
return xerrors.Errorf("😨 canary task hasn't become to be healthy")
}

func (c *CanaryTask) registerToTargetGroup(ctx context.Context) error {
// Phase 3: Get task details after network interface is attached
var task ecstypes.Task
if o, err := c.Ecs.DescribeTasks(ctx, &ecs.DescribeTasksInput{
Cluster: &c.Env.Cluster,
Tasks: []string{*c.taskArn},
}); err != nil {
return err
} else {
task = o.Tasks[0]
}
var targetId *string
var targetPort *int32
var subnet ec2types.Subnet
for _, container := range c.td.ContainerDefinitions {
if *container.Name == *c.lb.ContainerName {
targetPort = container.PortMappings[0].HostPort
}
}
if targetPort == nil {
return xerrors.Errorf("couldn't find host port in container definition")
}
if c.Env.CanaryInstanceArn == "" { // Fargate
details := task.Attachments[0].Details
var subnetId *string
var privateIp *string
for _, v := range details {
if *v.Name == "subnetId" {
subnetId = v.Value
} else if *v.Name == "privateIPv4Address" {
privateIp = v.Value
}
}
if subnetId == nil || privateIp == nil {
return xerrors.Errorf("couldn't find subnetId or privateIPv4Address in task details")
}
if o, err := c.Ec2.DescribeSubnets(ctx, &ec2.DescribeSubnetsInput{
SubnetIds: []string{*subnetId},
}); err != nil {
return err
} else {
subnet = o.Subnets[0]
}
targetId = privateIp
log.Infof("canary task was placed: privateIp = '%s', hostPort = '%d', az = '%s'", *targetId, *targetPort, *subnet.AvailabilityZone)
} else {
var containerInstance ecstypes.ContainerInstance
if outputs, err := c.Ecs.DescribeContainerInstances(ctx, &ecs.DescribeContainerInstancesInput{
Cluster: &c.Env.Cluster,
ContainerInstances: []string{c.Env.CanaryInstanceArn},
}); err != nil {
return err
} else {
containerInstance = outputs.ContainerInstances[0]
}
if o, err := c.Ec2.DescribeInstances(ctx, &ec2.DescribeInstancesInput{
InstanceIds: []string{*containerInstance.Ec2InstanceId},
}); err != nil {
return err
} else if sn, err := c.Ec2.DescribeSubnets(ctx, &ec2.DescribeSubnetsInput{
SubnetIds: []string{*o.Reservations[0].Instances[0].SubnetId},
}); err != nil {
return err
} else {
targetId = containerInstance.Ec2InstanceId
subnet = sn.Subnets[0]
}
log.Infof("canary task was placed: instanceId = '%s', hostPort = '%d', az = '%s'", *targetId, *targetPort, *subnet.AvailabilityZone)
}
if _, err := c.Alb.RegisterTargets(ctx, &elbv2.RegisterTargetsInput{
TargetGroupArn: c.lb.TargetGroupArn,
Targets: []elbv2types.TargetDescription{{
AvailabilityZone: subnet.AvailabilityZone,
Id: targetId,
Port: targetPort,
}},
}); err != nil {
return err
}
c.target = &CanaryTarget{
targetGroupArn: c.lb.TargetGroupArn,
targetId: targetId,
targetPort: targetPort,
availabilityZone: subnet.AvailabilityZone,
}
return nil
}

func (c *CanaryTask) waitUntilTargetHealthy(
ctx context.Context,
) error {
log.Infof("checking the health state of canary task...")
var unusedCount = 0
var initialized = false
var recentState *elbv2types.TargetHealthStateEnum
for {
<-c.Time.NewTimer(time.Duration(15) * time.Second).C
if o, err := c.Alb.DescribeTargetHealth(ctx, &elbv2.DescribeTargetHealthInput{
TargetGroupArn: c.target.targetGroupArn,
Targets: []elbv2types.TargetDescription{{
Id: c.target.targetId,
Port: c.target.targetPort,
AvailabilityZone: c.target.availabilityZone,
}},
}); err != nil {
return err
} else {
for _, desc := range o.TargetHealthDescriptions {
if *desc.Target.Id == *c.target.targetId && *desc.Target.Port == *c.target.targetPort {
recentState = &desc.TargetHealth.State
}
}
if recentState == nil {
return xerrors.Errorf("'%s' is not registered to the target group '%s'", c.target.targetId, c.target.targetGroupArn)
}
log.Infof("canary task '%s' (%s:%d) state is: %s", *c.taskArn, c.target.targetId, c.target.targetPort, *recentState)
switch *recentState {
case "healthy":
return nil
case "initial":
initialized = true
log.Infof("still checking the state...")
continue
case "unused":
unusedCount++
if !initialized && unusedCount < 5 {
continue
}
default:
}
}
// unhealthy, draining, unused
log.Errorf("😨 canary task '%s' is unhealthy", *c.taskArn)
return xerrors.Errorf(
"canary task '%s' (%s:%d) hasn't become to be healthy. The most recent state: %s",
*c.taskArn, c.target.targetId, c.target.targetPort, *recentState,
)
}
}

func (c *CanaryTask) Stop(ctx context.Context) error {
if c.target == nil {
log.Info("no load balancer is attached to service. Skip deregisteration.")
} else {
log.Infof("deregistering the canary task from target group '%s'...", c.target.targetId)
if _, err := c.Alb.DeregisterTargets(ctx, &elbv2.DeregisterTargetsInput{
TargetGroupArn: c.target.targetGroupArn,
Targets: []elbv2types.TargetDescription{{
AvailabilityZone: c.target.availabilityZone,
Id: c.target.targetId,
Port: c.target.targetPort,
}},
}); err != nil {
return err
}
if err := elbv2.NewTargetDeregisteredWaiter(c.Alb).Wait(ctx, &elbv2.DescribeTargetHealthInput{
TargetGroupArn: c.target.targetGroupArn,
Targets: []elbv2types.TargetDescription{{
AvailabilityZone: c.target.availabilityZone,
Id: c.target.targetId,
Port: c.target.targetPort,
}},
}, c.MaxWait); err != nil {
return err
}
log.Infof(
"canary task '%s' has successfully been deregistered from target group '%s'",
*c.taskArn, c.target.targetId,
)
}
log.Infof("stopping the canary task '%s'...", *c.taskArn)
if _, err := c.Ecs.StopTask(ctx, &ecs.StopTaskInput{
Cluster: &c.Env.Cluster,
Task: c.taskArn,
}); err != nil {
return err
}
if err := ecs.NewTasksStoppedWaiter(c.Ecs).Wait(ctx, &ecs.DescribeTasksInput{
Cluster: &c.Env.Cluster,
Tasks: []string{*c.taskArn},
}, c.MaxWait); err != nil {
return err
}
log.Infof("canary task '%s' has successfully been stopped", *c.taskArn)
return nil
}
1 change: 1 addition & 0 deletions go.mod
@@ -39,6 +39,7 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
golang.org/x/sync v0.7.0
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028
gopkg.in/yaml.v3 v3.0.1 // indirect
)
2 changes: 2 additions & 0 deletions go.sum
@@ -125,6 +125,8 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=