Reapply "feat: support canary test for multiple load balancers (#80)"… #84

Merged
merged 3 commits on Jun 26, 2024
7 changes: 7 additions & 0 deletions .vscode/settings.json
@@ -0,0 +1,7 @@
{
"explorer.fileNesting.enabled": true,
"explorer.fileNesting.patterns": {
"*.go": "${capture}_test.go",
"go.mod": "go.sum"
}
}
354 changes: 354 additions & 0 deletions canary_task.go
@@ -0,0 +1,354 @@
package cage

import (
"context"
"fmt"
"time"

"github.com/apex/log"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/ec2"
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
"github.com/aws/aws-sdk-go-v2/service/ecs"
ecstypes "github.com/aws/aws-sdk-go-v2/service/ecs/types"
elbv2 "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2"
elbv2types "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2/types"
"golang.org/x/xerrors"
)

type CanaryTarget struct {
targetGroupArn *string
targetId *string
targetPort *int32
availabilityZone *string
}

type CanaryTask struct {
*cage
td *ecstypes.TaskDefinition
lb *ecstypes.LoadBalancer
networkConfiguration *ecstypes.NetworkConfiguration
platformVersion *string
taskArn *string
target *CanaryTarget
}

func (c *CanaryTask) Start(ctx context.Context) error {
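// Start launches the canary task: on the container instance given by CanaryInstanceArn when set (EC2), otherwise on Fargate via RunTask.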
if c.Env.CanaryInstanceArn != "" {
// ec2
startTask := &ecs.StartTaskInput{
Cluster: &c.Env.Cluster,
Group: aws.String(fmt.Sprintf("cage:canary-task:%s", c.Env.Service)),
NetworkConfiguration: c.networkConfiguration,
TaskDefinition: c.td.TaskDefinitionArn,
ContainerInstances: []string{c.Env.CanaryInstanceArn},
}
if o, err := c.Ecs.StartTask(ctx, startTask); err != nil {
return err
} else {
c.taskArn = o.Tasks[0].TaskArn
}
} else {
// fargate
if o, err := c.Ecs.RunTask(ctx, &ecs.RunTaskInput{
Cluster: &c.Env.Cluster,
Group: aws.String(fmt.Sprintf("cage:canary-task:%s", c.Env.Service)),
NetworkConfiguration: c.networkConfiguration,
TaskDefinition: c.td.TaskDefinitionArn,
LaunchType: ecstypes.LaunchTypeFargate,
PlatformVersion: c.platformVersion,
}); err != nil {
return err
} else {
c.taskArn = o.Tasks[0].TaskArn
}
}
return nil
}

func (c *CanaryTask) Wait(ctx context.Context) error {
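// Wait blocks until the canary task is RUNNING and its containers pass their health checks; when a load balancer is attached, it also registers the task as a target and waits for the target to become healthy.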
log.Infof("🥚 waiting for canary task '%s' is running...", *c.taskArn)
if err := ecs.NewTasksRunningWaiter(c.Ecs).Wait(ctx, &ecs.DescribeTasksInput{
Cluster: &c.Env.Cluster,
Tasks: []string{*c.taskArn},
}, c.MaxWait); err != nil {
return err
}
log.Infof("🐣 canary task '%s' is running!", *c.taskArn)
if err := c.waitUntilHealthCheckPassed(ctx); err != nil {
return err
}
log.Info("🤩 canary task container(s) is healthy!")
log.Infof("canary task '%s' ensured.", *c.taskArn)
if c.lb == nil {
log.Infof("no load balancer is attached to service '%s'. skip registration to target group", c.Env.Service)
return c.waitForIdleDuration(ctx)
} else {
if err := c.registerToTargetGroup(ctx); err != nil {
return err
}
log.Infof("😷 ensuring canary task to become healthy...")
if err := c.waitUntilTargetHealthy(ctx); err != nil {
return err
}
log.Info("🤩 canary task is healthy!")
return nil
}
}

func (c *CanaryTask) waitForIdleDuration(ctx context.Context) error {
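// With no load balancer attached, simply keep the task running for CanaryTaskIdleDuration seconds, then confirm it is still RUNNING.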
log.Infof("wait %d seconds for canary task to be stable...", c.Env.CanaryTaskIdleDuration)
duration := c.Env.CanaryTaskIdleDuration
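// count down in slices of at most 10 seconds so that context cancellation is noticed promptly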
for duration > 0 {
wt := 10
if duration < 10 {
wt = duration
}
select {
case <-ctx.Done():
return ctx.Err()
case <-c.Time.NewTimer(time.Duration(wt) * time.Second).C:
duration -= wt
}
log.Infof("still waiting...; %d seconds left", duration)
}
o, err := c.Ecs.DescribeTasks(ctx, &ecs.DescribeTasksInput{
Cluster: &c.Env.Cluster,
Tasks: []string{*c.taskArn},
})
if err != nil {
return err
}
task := o.Tasks[0]
if *task.LastStatus != "RUNNING" {
return xerrors.Errorf("😫 canary task has stopped: %s", *task.StoppedReason)

Check warning on line 124 in canary_task.go

View check run for this annotation

Codecov / codecov/patch

canary_task.go#L124

Added line #L124 was not covered by tests
}
return nil
}

func (c *CanaryTask) waitUntilHealthCheckPassed(ctx context.Context) error {
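// Collect the containers that define a health check, then poll the task until all of them report HEALTHY.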
log.Infof("😷 ensuring canary task container(s) to become healthy...")
containerHasHealthChecks := map[string]struct{}{}
for _, definition := range c.td.ContainerDefinitions {
if definition.HealthCheck != nil {
containerHasHealthChecks[*definition.Name] = struct{}{}
}
}
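// poll the task every 15 seconds, up to 10 attempts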
for count := 0; count < 10; count++ {
<-c.Time.NewTimer(time.Duration(15) * time.Second).C
log.Infof("canary task '%s' waits until %d container(s) become healthy", *c.taskArn, len(containerHasHealthChecks))
if o, err := c.Ecs.DescribeTasks(ctx, &ecs.DescribeTasksInput{
Cluster: &c.Env.Cluster,
Tasks: []string{*c.taskArn},
}); err != nil {
return err
} else {
task := o.Tasks[0]
if *task.LastStatus != "RUNNING" {
return xerrors.Errorf("😫 canary task has stopped: %s", *task.StoppedReason)

Check warning on line 148 in canary_task.go

View check run for this annotation

Codecov / codecov/patch

canary_task.go#L148

Added line #L148 was not covered by tests
}

for _, container := range task.Containers {
if _, ok := containerHasHealthChecks[*container.Name]; !ok {
continue
}
if container.HealthStatus != ecstypes.HealthStatusHealthy {
log.Infof("container '%s' is not healthy: %s", *container.Name, container.HealthStatus)
continue
}
delete(containerHasHealthChecks, *container.Name)
}
if len(containerHasHealthChecks) == 0 {
return nil
}
}
}
return xerrors.Errorf("😨 canary task hasn't become to be healthy")
}

func (c *CanaryTask) registerToTargetGroup(ctx context.Context) error {
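// Resolve the target ID (private IP on Fargate, EC2 instance ID otherwise), host port, and availability zone, then register the task to the service's target group.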
// Phase 3: Get task details after network interface is attached
var task ecstypes.Task
if o, err := c.Ecs.DescribeTasks(ctx, &ecs.DescribeTasksInput{
Cluster: &c.Env.Cluster,
Tasks: []string{*c.taskArn},
}); err != nil {
return err
} else {
task = o.Tasks[0]
}
var targetId *string
var targetPort *int32
var subnet ec2types.Subnet
for _, container := range c.td.ContainerDefinitions {
if *container.Name == *c.lb.ContainerName {
targetPort = container.PortMappings[0].HostPort
}
}
if targetPort == nil {
return xerrors.Errorf("couldn't find host port in container definition")
}
if c.Env.CanaryInstanceArn == "" { // Fargate
details := task.Attachments[0].Details
var subnetId *string
var privateIp *string
for _, v := range details {
if *v.Name == "subnetId" {
subnetId = v.Value
} else if *v.Name == "privateIPv4Address" {
privateIp = v.Value
}
}
if subnetId == nil || privateIp == nil {
return xerrors.Errorf("couldn't find subnetId or privateIPv4Address in task details")

Check warning on line 203 in canary_task.go

View check run for this annotation

Codecov / codecov/patch

canary_task.go#L203

Added line #L203 was not covered by tests
}
if o, err := c.Ec2.DescribeSubnets(ctx, &ec2.DescribeSubnetsInput{
SubnetIds: []string{*subnetId},
}); err != nil {
return err
} else {
subnet = o.Subnets[0]
}
targetId = privateIp
log.Infof("canary task was placed: privateIp = '%s', hostPort = '%d', az = '%s'", *targetId, *targetPort, *subnet.AvailabilityZone)
} else {
var containerInstance ecstypes.ContainerInstance
if outputs, err := c.Ecs.DescribeContainerInstances(ctx, &ecs.DescribeContainerInstancesInput{
Cluster: &c.Env.Cluster,
ContainerInstances: []string{c.Env.CanaryInstanceArn},
}); err != nil {
return err
} else {
containerInstance = outputs.ContainerInstances[0]
}
if o, err := c.Ec2.DescribeInstances(ctx, &ec2.DescribeInstancesInput{
InstanceIds: []string{*containerInstance.Ec2InstanceId},
}); err != nil {
return err
} else if sn, err := c.Ec2.DescribeSubnets(ctx, &ec2.DescribeSubnetsInput{
SubnetIds: []string{*o.Reservations[0].Instances[0].SubnetId},
}); err != nil {
return err
} else {
targetId = containerInstance.Ec2InstanceId
subnet = sn.Subnets[0]
}
log.Infof("canary task was placed: instanceId = '%s', hostPort = '%d', az = '%s'", *targetId, *targetPort, *subnet.AvailabilityZone)
}
if _, err := c.Alb.RegisterTargets(ctx, &elbv2.RegisterTargetsInput{
TargetGroupArn: c.lb.TargetGroupArn,
Targets: []elbv2types.TargetDescription{{
AvailabilityZone: subnet.AvailabilityZone,
Id: targetId,
Port: targetPort,
}},
}); err != nil {
return err
}
c.target = &CanaryTarget{
targetGroupArn: c.lb.TargetGroupArn,
targetId: targetId,
targetPort: targetPort,
availabilityZone: subnet.AvailabilityZone,
}
return nil
}

func (c *CanaryTask) waitUntilTargetHealthy(
ctx context.Context,
) error {
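// Poll the target group every 15 seconds until the canary target becomes healthy or reaches a terminal state.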
log.Infof("checking the health state of canary task...")
var unusedCount = 0
var initialized = false
var recentState *elbv2types.TargetHealthStateEnum
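// keep waiting while the target is still initializing; tolerate a few "unused" responses before the first "initial" state is seen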
for {
<-c.Time.NewTimer(time.Duration(15) * time.Second).C
if o, err := c.Alb.DescribeTargetHealth(ctx, &elbv2.DescribeTargetHealthInput{
TargetGroupArn: c.target.targetGroupArn,
Targets: []elbv2types.TargetDescription{{
Id: c.target.targetId,
Port: c.target.targetPort,
AvailabilityZone: c.target.availabilityZone,
}},
}); err != nil {
return err
} else {
for _, desc := range o.TargetHealthDescriptions {
if *desc.Target.Id == *c.target.targetId && *desc.Target.Port == *c.target.targetPort {
recentState = &desc.TargetHealth.State
}
}
if recentState == nil {
return xerrors.Errorf("'%s' is not registered to the target group '%s'", c.target.targetId, c.target.targetGroupArn)
}
log.Infof("canary task '%s' (%s:%d) state is: %s", *c.taskArn, c.target.targetId, c.target.targetPort, *recentState)
switch *recentState {
case "healthy":
return nil
case "initial":
initialized = true
log.Infof("still checking the state...")
continue
case "unused":
unusedCount++
if !initialized && unusedCount < 5 {
continue
}
default:
}
}
// unhealthy, draining, unused
log.Errorf("😨 canary task '%s' is unhealthy", *c.taskArn)
return xerrors.Errorf(
"canary task '%s' (%s:%d) hasn't become to be healthy. The most recent state: %s",
*c.taskArn, c.target.targetId, c.target.targetPort, *recentState,
)
}
}

func (c *CanaryTask) Stop(ctx context.Context) error {
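// Stop deregisters the canary task from its target group (if it was registered), waits for deregistration, then stops the task and waits until it has stopped.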
if c.target == nil {
log.Info("no load balancer is attached to service. Skip deregisteration.")
} else {
log.Infof("deregistering the canary task from target group '%s'...", c.target.targetId)
if _, err := c.Alb.DeregisterTargets(ctx, &elbv2.DeregisterTargetsInput{
TargetGroupArn: c.target.targetGroupArn,
Targets: []elbv2types.TargetDescription{{
AvailabilityZone: c.target.availabilityZone,
Id: c.target.targetId,
Port: c.target.targetPort,
}},
}); err != nil {
return err
}
if err := elbv2.NewTargetDeregisteredWaiter(c.Alb).Wait(ctx, &elbv2.DescribeTargetHealthInput{
TargetGroupArn: c.target.targetGroupArn,
Targets: []elbv2types.TargetDescription{{
AvailabilityZone: c.target.availabilityZone,
Id: c.target.targetId,
Port: c.target.targetPort,
}},
}, c.MaxWait); err != nil {
return err
}
log.Infof(
"canary task '%s' has successfully been deregistered from target group '%s'",
*c.taskArn, *c.target.targetGroupArn,
)
}
log.Infof("stopping the canary task '%s'...", *c.taskArn)
if _, err := c.Ecs.StopTask(ctx, &ecs.StopTaskInput{
Cluster: &c.Env.Cluster,
Task: c.taskArn,
}); err != nil {
return err
}
if err := ecs.NewTasksStoppedWaiter(c.Ecs).Wait(ctx, &ecs.DescribeTasksInput{
Cluster: &c.Env.Cluster,
Tasks: []string{*c.taskArn},
}, c.MaxWait); err != nil {
return err
}
log.Infof("canary task '%s' has successfully been stopped", *c.taskArn)
return nil
}
1 change: 1 addition & 0 deletions go.mod
@@ -39,6 +39,7 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
golang.org/x/sync v0.7.0
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028
gopkg.in/yaml.v3 v3.0.1 // indirect
)
2 changes: 2 additions & 0 deletions go.sum
@@ -125,6 +125,8 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=