From ffdc629753a65e936b2f372c671d2ca09d3ea1df Mon Sep 17 00:00:00 2001 From: Yusuke Sakurai Date: Tue, 25 Jun 2024 18:18:24 +0900 Subject: [PATCH 1/6] wip --- cage.go | 39 ++++++++++--------- canary_task.go | 74 +++++++++++++++++++++++++++--------- cli/cage/commands/command.go | 6 +-- cli/cage/commands/flags.go | 40 +++++++++++++++++++ cli/cage/commands/rollout.go | 4 ++ cli/cage/commands/run.go | 2 + cli/cage/commands/up.go | 1 + env.go | 28 +++++++++----- rollout.go | 2 +- rollout_test.go | 74 ++++++++++++++++++------------------ run.go | 10 ++--- run_test.go | 20 ++++------ task_definition_test.go | 24 ++++++++---- timeout/timeout.go | 60 +++++++++++++++++++++++++++++ up.go | 2 +- up_test.go | 8 +--- 16 files changed, 272 insertions(+), 122 deletions(-) create mode 100644 timeout/timeout.go diff --git a/cage.go b/cage.go index ee01f01..1234ab8 100644 --- a/cage.go +++ b/cage.go @@ -5,6 +5,7 @@ import ( "time" "github.com/loilo-inc/canarycage/awsiface" + "github.com/loilo-inc/canarycage/timeout" ) type Cage interface { @@ -19,33 +20,35 @@ type Time interface { } type cage struct { - Env *Envars - Ecs awsiface.EcsClient - Alb awsiface.AlbClient - Ec2 awsiface.Ec2Client - Time Time - MaxWait time.Duration + *Input + Timeout timeout.Manager } type Input struct { - Env *Envars - ECS awsiface.EcsClient - ALB awsiface.AlbClient - EC2 awsiface.Ec2Client - Time Time - MaxWait time.Duration + Env *Envars + Ecs awsiface.EcsClient + Alb awsiface.AlbClient + Ec2 awsiface.Ec2Client + Time Time } func NewCage(input *Input) Cage { if input.Time == nil { input.Time = &timeImpl{} } + taskRunningWait := (time.Duration)(input.Env.CanaryTaskRunningWait) * time.Second + taskHealthCheckWait := (time.Duration)(input.Env.CanaryTaskHealthCheckWait) * time.Second + taskStoppedWait := (time.Duration)(input.Env.CanaryTaskStoppedWait) * time.Second + serviceStableWait := (time.Duration)(input.Env.ServiceStableWait) * time.Second return &cage{ - Env: input.Env, - Ecs: input.ECS, - Alb: input.ALB, - Ec2: input.EC2, - Time: input.Time, - MaxWait: 5 * time.Minute, + Input: input, + Timeout: timeout.NewManager( + 10*time.Minute, + &timeout.Input{ + TaskRunningWait: taskRunningWait, + TaskHealthCheckWait: taskHealthCheckWait, + TaskStoppedWait: taskStoppedWait, + ServiceStableWait: serviceStableWait, + }), } } diff --git a/canary_task.go b/canary_task.go index aacbd6b..723bc5c 100644 --- a/canary_task.go +++ b/canary_task.go @@ -3,6 +3,7 @@ package cage import ( "context" "fmt" + "strconv" "time" "github.com/apex/log" @@ -71,7 +72,7 @@ func (c *CanaryTask) Wait(ctx context.Context) error { if err := ecs.NewTasksRunningWaiter(c.Ecs).Wait(ctx, &ecs.DescribeTasksInput{ Cluster: &c.Env.Cluster, Tasks: []string{*c.taskArn}, - }, c.MaxWait); err != nil { + }, c.Timeout.TaskRunning()); err != nil { return err } log.Infof("🐣 canary task '%s' is running!", *c.taskArn) @@ -135,8 +136,11 @@ func (c *CanaryTask) waitUntilHealthCeheckPassed(ctx context.Context) error { containerHasHealthChecks[*definition.Name] = struct{}{} } } - for count := 0; count < 10; count++ { - <-c.Time.NewTimer(time.Duration(15) * time.Second).C + healthCheckWait := c.Timeout.TaskHealthCheck() + healthCheckPeriod := 15 * time.Second + countPerPeriod := int(healthCheckWait.Seconds() / 15) + for count := 0; count < countPerPeriod; count++ { + <-c.Time.NewTimer(healthCheckPeriod).C log.Infof("canary task '%s' waits until %d container(s) become healthy", *c.taskArn, len(containerHasHealthChecks)) if o, err := c.Ecs.DescribeTasks(ctx, &ecs.DescribeTasksInput{ Cluster: &c.Env.Cluster, @@ -307,10 +311,36 @@ func (c *CanaryTask) waitUntilTargetHealthy( } } +func (c *CanaryTask) targetDeregistrationDelay(ctx context.Context) (time.Duration, error) { + deregistrationDelay := 300 * time.Second + if o, err := c.Alb.DescribeTargetGroupAttributes(ctx, &elbv2.DescribeTargetGroupAttributesInput{ + TargetGroupArn: c.target.targetGroupArn, + }); err != nil { + return deregistrationDelay, err + } else { + // find deregistration_delay.timeout_seconds + for _, attr := range o.Attributes { + if *attr.Key == "deregistration_delay.timeout_seconds" { + if value, err := strconv.ParseInt(*attr.Value, 10, 64); err != nil { + return deregistrationDelay, err + } else { + deregistrationDelay = time.Duration(value) * time.Second + } + } + } + } + return deregistrationDelay, nil +} + func (c *CanaryTask) Stop(ctx context.Context) error { if c.target == nil { log.Info("no load balancer is attached to service. Skip deregisteration.") } else { + deregistrationDelay, err := c.targetDeregistrationDelay(ctx) + if err != nil { + log.Errorf("failed to get deregistration delay: %v", err) + log.Errorf("deregistration delay is set to %d seconds", deregistrationDelay) + } log.Infof("deregistering the canary task from target group '%s'...", c.target.targetId) if _, err := c.Alb.DeregisterTargets(ctx, &elbv2.DeregisterTargetsInput{ TargetGroupArn: c.target.targetGroupArn, @@ -320,22 +350,28 @@ func (c *CanaryTask) Stop(ctx context.Context) error { Port: c.target.targetPort, }}, }); err != nil { - return err - } - if err := elbv2.NewTargetDeregisteredWaiter(c.Alb).Wait(ctx, &elbv2.DescribeTargetHealthInput{ - TargetGroupArn: c.target.targetGroupArn, - Targets: []elbv2types.TargetDescription{{ - AvailabilityZone: c.target.availabilityZone, - Id: c.target.targetId, - Port: c.target.targetPort, - }}, - }, c.MaxWait); err != nil { - return err + log.Errorf("failed to deregister the canary task from target group: %v", err) + log.Errorf("continuing to stop the canary task...") + } else { + log.Infof("deregister operation accepted. waiting for the canary task to be deregistered...") + deregisterWait := deregistrationDelay + time.Minute // add 1 minute for safety + if err := elbv2.NewTargetDeregisteredWaiter(c.Alb).Wait(ctx, &elbv2.DescribeTargetHealthInput{ + TargetGroupArn: c.target.targetGroupArn, + Targets: []elbv2types.TargetDescription{{ + AvailabilityZone: c.target.availabilityZone, + Id: c.target.targetId, + Port: c.target.targetPort, + }}, + }, deregisterWait); err != nil { + log.Errorf("failed to wait for the canary task deregistered from target group: %v", err) + log.Errorf("continuing to stop the canary task...") + } else { + log.Infof( + "canary task '%s' has successfully been deregistered from target group '%s'", + *c.taskArn, *c.target.targetId, + ) + } } - log.Infof( - "canary task '%s' has successfully been deregistered from target group '%s'", - *c.taskArn, c.target.targetId, - ) } log.Infof("stopping the canary task '%s'...", *c.taskArn) if _, err := c.Ecs.StopTask(ctx, &ecs.StopTaskInput{ @@ -347,7 +383,7 @@ func (c *CanaryTask) Stop(ctx context.Context) error { if err := ecs.NewTasksStoppedWaiter(c.Ecs).Wait(ctx, &ecs.DescribeTasksInput{ Cluster: &c.Env.Cluster, Tasks: []string{*c.taskArn}, - }, c.MaxWait); err != nil { + }, c.Timeout.TaskStopped()); err != nil { return err } log.Infof("canary task '%s' has successfully been stopped", *c.taskArn) diff --git a/cli/cage/commands/command.go b/cli/cage/commands/command.go index bb8096f..3f3fd87 100644 --- a/cli/cage/commands/command.go +++ b/cli/cage/commands/command.go @@ -40,9 +40,9 @@ func DefalutCageCliProvider(envars *cage.Envars) (cage.Cage, error) { } cagecli := cage.NewCage(&cage.Input{ Env: envars, - ECS: ecs.NewFromConfig(conf), - EC2: ec2.NewFromConfig(conf), - ALB: elasticloadbalancingv2.NewFromConfig(conf), + Ecs: ecs.NewFromConfig(conf), + Ec2: ec2.NewFromConfig(conf), + Alb: elasticloadbalancingv2.NewFromConfig(conf), }) return cagecli, nil } diff --git a/cli/cage/commands/flags.go b/cli/cage/commands/flags.go index 78e87b1..cd651d9 100644 --- a/cli/cage/commands/flags.go +++ b/cli/cage/commands/flags.go @@ -48,3 +48,43 @@ func CanaryTaskIdleDurationFlag(dest *int) *cli.IntFlag { Value: 10, } } + +func TaskRunningWaitFlag(dest *int) *cli.IntFlag { + return &cli.IntFlag{ + Name: "canaryTaskRunningWait", + EnvVars: []string{cage.CanaryTaskRunningWait}, + Usage: "Duration seconds for waiting canary task running", + Destination: dest, + Value: 300, + } +} + +func TaskHealthCheckWaitFlag(dest *int) *cli.IntFlag { + return &cli.IntFlag{ + Name: "canaryTaskHealthCheckWait", + EnvVars: []string{cage.CanaryTaskHealthCheckWait}, + Usage: "Duration seconds for waiting canary task health check", + Destination: dest, + Value: 300, + } +} + +func TaskStoppedWaitFlag(dest *int) *cli.IntFlag { + return &cli.IntFlag{ + Name: "canaryTaskStoppedWait", + EnvVars: []string{cage.CanaryTaskStoppedWait}, + Usage: "Duration seconds for waiting canary task stopped", + Destination: dest, + Value: 300, + } +} + +func ServiceStableWaitFlag(dest *int) *cli.IntFlag { + return &cli.IntFlag{ + Name: "serviceStableWait", + EnvVars: []string{cage.ServiceStableWait}, + Usage: "Duration seconds for waiting service stable", + Destination: dest, + Value: 300, + } +} diff --git a/cli/cage/commands/rollout.go b/cli/cage/commands/rollout.go index c48b12a..94f8b51 100644 --- a/cli/cage/commands/rollout.go +++ b/cli/cage/commands/rollout.go @@ -36,6 +36,10 @@ func (c *CageCommands) RollOut( Usage: "Update service configurations except for task definiton. Default is false.", Destination: &updateServiceConf, }, + TaskRunningWaitFlag(&envars.CanaryTaskRunningWait), + TaskHealthCheckWaitFlag(&envars.CanaryTaskHealthCheckWait), + TaskStoppedWaitFlag(&envars.CanaryTaskStoppedWait), + ServiceStableWaitFlag(&envars.ServiceStableWait), }, Action: func(ctx *cli.Context) error { dir, _, err := c.requireArgs(ctx, 1, 1) diff --git a/cli/cage/commands/run.go b/cli/cage/commands/run.go index 605bc95..26506c0 100644 --- a/cli/cage/commands/run.go +++ b/cli/cage/commands/run.go @@ -21,6 +21,8 @@ func (c *CageCommands) Run( Flags: []cli.Flag{ RegionFlag(&envars.Region), ClusterFlag(&envars.Cluster), + TaskRunningWaitFlag(&envars.CanaryTaskRunningWait), + TaskStoppedWaitFlag(&envars.CanaryTaskStoppedWait), }, Action: func(ctx *cli.Context) error { dir, rest, err := c.requireArgs(ctx, 3, 100) diff --git a/cli/cage/commands/up.go b/cli/cage/commands/up.go index 81475d7..1ea12ce 100644 --- a/cli/cage/commands/up.go +++ b/cli/cage/commands/up.go @@ -22,6 +22,7 @@ func (c *CageCommands) Up( ServiceFlag(&envars.Service), TaskDefinitionArnFlag(&envars.TaskDefinitionArn), CanaryTaskIdleDurationFlag(&envars.CanaryTaskIdleDuration), + ServiceStableWaitFlag(&envars.ServiceStableWait), }, Action: func(ctx *cli.Context) error { dir, _, err := c.requireArgs(ctx, 1, 1) diff --git a/env.go b/env.go index 1e4fbfd..8cfcac4 100644 --- a/env.go +++ b/env.go @@ -11,16 +11,20 @@ import ( ) type Envars struct { - _ struct{} `type:"struct"` - CI bool `json:"ci" type:"bool"` - Region string `json:"region" type:"string"` - Cluster string `json:"cluster" type:"string" required:"true"` - Service string `json:"service" type:"string" required:"true"` - CanaryInstanceArn string - TaskDefinitionArn string `json:"nextTaskDefinitionArn" type:"string"` - TaskDefinitionInput *ecs.RegisterTaskDefinitionInput - ServiceDefinitionInput *ecs.CreateServiceInput - CanaryTaskIdleDuration int + _ struct{} `type:"struct"` + CI bool `json:"ci" type:"bool"` + Region string `json:"region" type:"string"` + Cluster string `json:"cluster" type:"string" required:"true"` + Service string `json:"service" type:"string" required:"true"` + CanaryInstanceArn string + TaskDefinitionArn string `json:"nextTaskDefinitionArn" type:"string"` + TaskDefinitionInput *ecs.RegisterTaskDefinitionInput + ServiceDefinitionInput *ecs.CreateServiceInput + CanaryTaskIdleDuration int // sec + CanaryTaskRunningWait int // sec + CanaryTaskHealthCheckWait int // sec + CanaryTaskStoppedWait int // sec + ServiceStableWait int // sec } // required @@ -35,6 +39,10 @@ const CanaryInstanceArnKey = "CAGE_CANARY_INSTANCE_ARN" const RegionKey = "CAGE_REGION" const CanaryTaskIdleDuration = "CAGE_CANARY_TASK_IDLE_DURATION" const UpdateServiceKey = "CAGE_UPDATE_SERVIEC" +const CanaryTaskRunningWait = "CAGE_CANARY_TASK_RUNNING_WAIT" +const CanaryTaskHealthCheckWait = "CAGE_CANARY_TASK_HEALTH_CHECK_WAIT" +const CanaryTaskStoppedWait = "CAGE_CANARY_TASK_STOPPED_WAIT" +const ServiceStableWait = "CAGE_SERVICE_STABLE_WAIT" func EnsureEnvars( dest *Envars, diff --git a/rollout.go b/rollout.go index 47e9601..b0aaa20 100644 --- a/rollout.go +++ b/rollout.go @@ -110,7 +110,7 @@ func (c *cage) RollOut(ctx context.Context, input *RollOutInput) (*RollOutResult if err := ecs.NewServicesStableWaiter(c.Ecs).Wait(ctx, &ecs.DescribeServicesInput{ Cluster: &c.Env.Cluster, Services: []string{c.Env.Service}, - }, c.MaxWait); err != nil { + }, c.Timeout.ServiceStable()); err != nil { return result, err } log.Infof("🥴 service '%s' has become to be stable!", c.Env.Service) diff --git a/rollout_test.go b/rollout_test.go index 13544c5..841a7b6 100644 --- a/rollout_test.go +++ b/rollout_test.go @@ -38,9 +38,9 @@ func TestCage_RollOut_FARGATE(t *testing.T) { cagecli := cage.NewCage(&cage.Input{ Env: envars, - ECS: ecsMock, - ALB: albMock, - EC2: ec2Mock, + Ecs: ecsMock, + Alb: albMock, + Ec2: ec2Mock, Time: test.NewFakeTime(), }) ctx := context.Background() @@ -61,9 +61,9 @@ func TestCage_RollOut_FARGATE(t *testing.T) { mctx, ecsMock, albMock, ec2Mock := test.Setup(ctrl, envars, 1, "FARGATE") cagecli := cage.NewCage(&cage.Input{ Env: envars, - ECS: ecsMock, - ALB: albMock, - EC2: ec2Mock, + Ecs: ecsMock, + Alb: albMock, + Ec2: ec2Mock, Time: test.NewFakeTime(), }) ctx := context.Background() @@ -99,9 +99,9 @@ func TestCage_RollOut_FARGATE(t *testing.T) { ) cagecli := cage.NewCage(&cage.Input{ Env: envars, - ECS: ecsMock, - ALB: albMock, - EC2: ec2Mock, + Ecs: ecsMock, + Alb: albMock, + Ec2: ec2Mock, Time: test.NewFakeTime(), }) ctx := context.Background() @@ -142,9 +142,9 @@ func TestCage_RollOut_FARGATE(t *testing.T) { ) cagecli := cage.NewCage(&cage.Input{ Env: envars, - ECS: ecsMock, - EC2: ec2Mock, - ALB: albMock, + Ecs: ecsMock, + Ec2: ec2Mock, + Alb: albMock, Time: test.NewFakeTime(), }) ctx := context.Background() @@ -171,9 +171,9 @@ func TestCage_RollOut_FARGATE(t *testing.T) { envars.ServiceDefinitionInput.PlatformVersion = aws.String("LATEST") cagecli := cage.NewCage(&cage.Input{ Env: envars, - ECS: ecsMock, - ALB: albMock, - EC2: ec2Mock, + Ecs: ecsMock, + Alb: albMock, + Ec2: ec2Mock, Time: test.NewFakeTime(), }) ctx := context.Background() @@ -194,9 +194,9 @@ func TestCage_RollOut_FARGATE(t *testing.T) { mctx, ecsMock, albMock, ec2Mock := test.Setup(ctrl, envars, 1, "FARGATE") cagecli := cage.NewCage(&cage.Input{ Env: envars, - ECS: ecsMock, - ALB: albMock, - EC2: ec2Mock, + Ecs: ecsMock, + Alb: albMock, + Ec2: ec2Mock, Time: test.NewFakeTime(), }) ctx := context.Background() @@ -219,9 +219,9 @@ func TestCage_RollOut_FARGATE(t *testing.T) { delete(mocker.Services, envars.Service) cagecli := cage.NewCage(&cage.Input{ Env: envars, - ECS: ecsMock, - EC2: ec2Mock, - ALB: albMock, + Ecs: ecsMock, + Ec2: ec2Mock, + Alb: albMock, }) ctx := context.Background() _, err := cagecli.RollOut(ctx, &cage.RollOutInput{}) @@ -235,9 +235,9 @@ func TestCage_RollOut_FARGATE(t *testing.T) { _, ecsMock, albMock, ec2Mock := test.Setup(ctrl, envars, 2, "FARGATE") cagecli := cage.NewCage(&cage.Input{ Env: envars, - ECS: ecsMock, - EC2: ec2Mock, - ALB: albMock, + Ecs: ecsMock, + Alb: albMock, + Ec2: ec2Mock, Time: test.NewFakeTime(), }) ctx := context.Background() @@ -261,7 +261,7 @@ func TestCage_RollOut_FARGATE(t *testing.T) { ) cagecli := cage.NewCage(&cage.Input{ Env: envars, - ECS: ecsMock, + Ecs: ecsMock, Time: test.NewFakeTime(), }) _, err := cagecli.RollOut(context.Background(), &cage.RollOutInput{}) @@ -304,9 +304,9 @@ func TestCage_RollOut_FARGATE(t *testing.T) { cagecli := cage.NewCage(&cage.Input{ Env: envars, - ECS: ecsMock, - EC2: ec2Mock, - ALB: albMock, + Ecs: ecsMock, + Ec2: ec2Mock, + Alb: albMock, Time: test.NewFakeTime(), }) ctx := context.Background() @@ -350,9 +350,9 @@ func TestCage_RollOut_EC2(t *testing.T) { } cagecli := cage.NewCage(&cage.Input{ Env: envars, - ECS: ecsMock, - EC2: ec2Mock, - ALB: albMock, + Ecs: ecsMock, + Ec2: ec2Mock, + Alb: albMock, Time: test.NewFakeTime(), }) ctx := context.Background() @@ -380,9 +380,9 @@ func TestCage_RollOut_EC2_without_ContainerInstanceArn(t *testing.T) { } cagecli := cage.NewCage(&cage.Input{ Env: envars, - ECS: ecsMock, - EC2: ec2Mock, - ALB: albMock, + Ecs: ecsMock, + Ec2: ec2Mock, + Alb: albMock, Time: test.NewFakeTime(), }) ctx := context.Background() @@ -415,9 +415,9 @@ func TestCage_RollOut_EC2_no_attribute(t *testing.T) { ecsMock.EXPECT().PutAttributes(gomock.Any(), gomock.Any()).Return(&ecs.PutAttributesOutput{}, nil).AnyTimes() cagecli := cage.NewCage(&cage.Input{ Env: envars, - ECS: ecsMock, - EC2: ec2Mock, - ALB: albMock, + Ecs: ecsMock, + Ec2: ec2Mock, + Alb: albMock, Time: test.NewFakeTime(), }) ctx := context.Background() diff --git a/run.go b/run.go index 7d3f9fb..4aff00c 100644 --- a/run.go +++ b/run.go @@ -2,7 +2,6 @@ package cage import ( "context" - "time" "github.com/apex/log" "github.com/aws/aws-sdk-go-v2/aws" @@ -14,8 +13,8 @@ import ( type RunInput struct { Container *string Overrides *types.TaskOverride - MaxWait time.Duration } + type RunResult struct { ExitCode int32 } @@ -30,9 +29,6 @@ func containerExistsInDefinition(td *ecs.RegisterTaskDefinitionInput, container } func (c *cage) Run(ctx context.Context, input *RunInput) (*RunResult, error) { - if input.MaxWait == 0 { - input.MaxWait = 5 * time.Minute - } if !containerExistsInDefinition(c.Env.TaskDefinitionInput, input.Container) { return nil, xerrors.Errorf("🚫 '%s' not found in container definitions", *input.Container) } @@ -57,7 +53,7 @@ func (c *cage) Run(ctx context.Context, input *RunInput) (*RunResult, error) { if err := ecs.NewTasksRunningWaiter(c.Ecs).Wait(ctx, &ecs.DescribeTasksInput{ Cluster: &c.Env.Cluster, Tasks: []string{*taskArn}, - }, input.MaxWait); err != nil { + }, c.Timeout.TaskRunning()); err != nil { return nil, xerrors.Errorf("task failed to start: %w", err) } log.Infof("task '%s' is running", *taskArn) @@ -65,7 +61,7 @@ func (c *cage) Run(ctx context.Context, input *RunInput) (*RunResult, error) { if result, err := ecs.NewTasksStoppedWaiter(c.Ecs).WaitForOutput(ctx, &ecs.DescribeTasksInput{ Cluster: &c.Env.Cluster, Tasks: []string{*taskArn}, - }, input.MaxWait); err != nil { + }, c.Timeout.TaskStopped()); err != nil { return nil, xerrors.Errorf("task failed to stop: %w", err) } else { task := result.Tasks[0] diff --git a/run_test.go b/run_test.go index 039573b..f5fe733 100644 --- a/run_test.go +++ b/run_test.go @@ -40,9 +40,7 @@ func TestCage_Run(t *testing.T) { ) cagecli := cage.NewCage(&cage.Input{ Env: env, - ECS: ecsMock, - ALB: nil, - EC2: nil, + Ecs: ecsMock, Time: test.NewFakeTime(), }) result, err := cagecli.Run(ctx, &cage.RunInput{ @@ -71,13 +69,13 @@ func TestCage_Run(t *testing.T) { ) cagecli := cage.NewCage(&cage.Input{ Env: env, - ECS: ecsMock, + Ecs: ecsMock, Time: test.NewFakeTime(), }) result, err := cagecli.Run(ctx, &cage.RunInput{ Container: &container, Overrides: overrides, - MaxWait: 1, + // MaxWait: 1, }) assert.Nil(t, result) assert.EqualError(t, err, "task failed to start: exceeded max wait time for TasksRunning waiter") @@ -93,13 +91,13 @@ func TestCage_Run(t *testing.T) { ) cagecli := cage.NewCage(&cage.Input{ Env: env, - ECS: ecsMock, + Ecs: ecsMock, Time: test.NewFakeTime(), }) result, err := cagecli.Run(ctx, &cage.RunInput{ Container: &container, Overrides: overrides, - MaxWait: 1, + // MaxWait: 1, }) assert.Nil(t, result) assert.EqualError(t, err, "task failed to stop: exceeded max wait time for TasksStopped waiter") @@ -122,7 +120,7 @@ func TestCage_Run(t *testing.T) { ) cagecli := cage.NewCage(&cage.Input{ Env: env, - ECS: ecsMock, + Ecs: ecsMock, Time: test.NewFakeTime(), }) result, err := cagecli.Run(ctx, &cage.RunInput{ @@ -150,7 +148,7 @@ func TestCage_Run(t *testing.T) { ) cagecli := cage.NewCage(&cage.Input{ Env: env, - ECS: ecsMock, + Ecs: ecsMock, Time: test.NewFakeTime(), }) result, err := cagecli.Run(ctx, &cage.RunInput{ @@ -166,9 +164,7 @@ func TestCage_Run(t *testing.T) { env, _, ecsMock := setupForBasic(t) cagecli := cage.NewCage(&cage.Input{ Env: env, - ECS: ecsMock, - ALB: nil, - EC2: nil, + Ecs: ecsMock, Time: test.NewFakeTime(), }) result, err := cagecli.Run(ctx, &cage.RunInput{ diff --git a/task_definition_test.go b/task_definition_test.go index 76c43a1..4177d66 100644 --- a/task_definition_test.go +++ b/task_definition_test.go @@ -22,8 +22,10 @@ func TestCage_CreateNextTaskDefinition(t *testing.T) { TaskDefinitionArn: "arn://aaa", } c := &cage.CageExport{ - Env: env, - Ecs: ecsMock, + Input: &cage.Input{ + Env: env, + Ecs: ecsMock, + }, } ecsMock.EXPECT().DescribeTaskDefinition(gomock.Any(), gomock.Any()).Return(&ecs.DescribeTaskDefinitionOutput{ TaskDefinition: &ecstypes.TaskDefinition{}, @@ -39,8 +41,10 @@ func TestCage_CreateNextTaskDefinition(t *testing.T) { TaskDefinitionArn: "arn://aaa", } c := &cage.CageExport{ - Env: env, - Ecs: ecsMock, + Input: &cage.Input{ + Env: env, + Ecs: ecsMock, + }, } ecsMock.EXPECT().DescribeTaskDefinition(gomock.Any(), gomock.Any()).Return(nil, xerrors.New("error")) td, err := c.CreateNextTaskDefinition(context.Background()) @@ -52,8 +56,10 @@ func TestCage_CreateNextTaskDefinition(t *testing.T) { ecsMock := mock_awsiface.NewMockEcsClient(ctrl) env := test.DefaultEnvars() c := &cage.CageExport{ - Env: env, - Ecs: ecsMock, + Input: &cage.Input{ + Env: env, + Ecs: ecsMock, + }, } ecsMock.EXPECT().RegisterTaskDefinition(gomock.Any(), gomock.Any()).Return(&ecs.RegisterTaskDefinitionOutput{ TaskDefinition: &ecstypes.TaskDefinition{ @@ -70,8 +76,10 @@ func TestCage_CreateNextTaskDefinition(t *testing.T) { ecsMock := mock_awsiface.NewMockEcsClient(ctrl) env := test.DefaultEnvars() c := &cage.CageExport{ - Env: env, - Ecs: ecsMock, + Input: &cage.Input{ + Env: env, + Ecs: ecsMock, + }, } ecsMock.EXPECT().RegisterTaskDefinition(gomock.Any(), gomock.Any()).Return(nil, xerrors.New("error")) td, err := c.CreateNextTaskDefinition(context.Background()) diff --git a/timeout/timeout.go b/timeout/timeout.go new file mode 100644 index 0000000..e9e7b1a --- /dev/null +++ b/timeout/timeout.go @@ -0,0 +1,60 @@ +package timeout + +import "time" + +type Input struct { + TaskStoppedWait time.Duration + TaskRunningWait time.Duration + TaskHealthCheckWait time.Duration + ServiceStableWait time.Duration +} + +type Manager interface { + TaskRunning() time.Duration + TaskHealthCheck() time.Duration + TaskStopped() time.Duration + ServiceStable() time.Duration +} + +type manager struct { + *Input + DefaultTimeout time.Duration +} + +func NewManager( + defaultTimeout time.Duration, + input *Input, +) Manager { + return &manager{ + Input: input, + DefaultTimeout: defaultTimeout, + } +} + +func (t *manager) TaskRunning() time.Duration { + if t.TaskRunningWait > 0 { + return t.TaskRunningWait + } + return t.DefaultTimeout +} + +func (t *manager) TaskHealthCheck() time.Duration { + if t.TaskHealthCheckWait > 0 { + return t.TaskHealthCheckWait + } + return t.DefaultTimeout +} + +func (t *manager) TaskStopped() time.Duration { + if t.TaskStoppedWait > 0 { + return t.TaskStoppedWait + } + return t.DefaultTimeout +} + +func (t *manager) ServiceStable() time.Duration { + if t.ServiceStableWait > 0 { + return t.ServiceStableWait + } + return t.DefaultTimeout +} diff --git a/up.go b/up.go index b092e0f..62b193c 100644 --- a/up.go +++ b/up.go @@ -49,7 +49,7 @@ func (c *cage) createService(ctx context.Context, serviceDefinitionInput *ecs.Cr if err := ecs.NewServicesStableWaiter(c.Ecs).Wait(ctx, &ecs.DescribeServicesInput{ Cluster: &c.Env.Cluster, Services: []string{*serviceDefinitionInput.ServiceName}, - }, c.MaxWait); err != nil { + }, c.Timeout.ServiceStable()); err != nil { return nil, xerrors.Errorf("failed to wait for service '%s' to be STABLE: %w", *serviceDefinitionInput.ServiceName, err) } return o.Service, nil diff --git a/up_test.go b/up_test.go index 45a1243..06b5b9d 100644 --- a/up_test.go +++ b/up_test.go @@ -18,9 +18,7 @@ func TestCage_Up(t *testing.T) { delete(ctx.Services, env.Service) cagecli := cage.NewCage(&cage.Input{ Env: env, - ECS: ecsMock, - ALB: nil, - EC2: nil, + Ecs: ecsMock, }) result, err := cagecli.Up(context.Background()) assert.Nil(t, err) @@ -33,9 +31,7 @@ func TestCage_Up(t *testing.T) { _, ecsMock, _, _ := test.Setup(ctrl, env, 1, "FARGATE") cagecli := cage.NewCage(&cage.Input{ Env: env, - ECS: ecsMock, - ALB: nil, - EC2: nil, + Ecs: ecsMock, }) result, err := cagecli.Up(context.Background()) assert.Nil(t, result) From 615f6bc7b093ab47b1f27b94eb1c1da2a2afe90e Mon Sep 17 00:00:00 2001 From: Yusuke Sakurai Date: Tue, 25 Jun 2024 20:14:24 +0900 Subject: [PATCH 2/6] Update rollout_test.go --- rollout_test.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/rollout_test.go b/rollout_test.go index 841a7b6..d81cead 100644 --- a/rollout_test.go +++ b/rollout_test.go @@ -79,8 +79,9 @@ func TestCage_RollOut_FARGATE(t *testing.T) { mocker, ecsMock, _, ec2Mock := test.Setup(ctrl, envars, 2, "FARGATE") albMock := mock_awsiface.NewMockAlbClient(ctrl) - albMock.EXPECT().RegisterTargets(gomock.Any(), gomock.Any()).DoAndReturn(mocker.RegisterTarget).AnyTimes() - albMock.EXPECT().DeregisterTargets(gomock.Any(), gomock.Any()).DoAndReturn(mocker.DeregisterTarget).AnyTimes() + albMock.EXPECT().RegisterTargets(gomock.Any(), gomock.Any()).DoAndReturn(mocker.RegisterTarget).Times(1) + albMock.EXPECT().DeregisterTargets(gomock.Any(), gomock.Any()).DoAndReturn(mocker.DeregisterTarget).Times(1) + albMock.EXPECT().DescribeTargetGroupAttributes(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(mocker.DescribeTargetGroupAttibutes).Times(1) gomock.InOrder( albMock.EXPECT().DescribeTargetHealth(gomock.Any(), gomock.Any()).Return(&elbv2.DescribeTargetHealthOutput{ TargetHealthDescriptions: []elbv2types.TargetHealthDescription{ @@ -114,8 +115,9 @@ func TestCage_RollOut_FARGATE(t *testing.T) { ctrl := gomock.NewController(t) mocker, ecsMock, _, ec2Mock := test.Setup(ctrl, envars, 2, "FARGATE") albMock := mock_awsiface.NewMockAlbClient(ctrl) - albMock.EXPECT().RegisterTargets(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(mocker.RegisterTarget).AnyTimes() - albMock.EXPECT().DeregisterTargets(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(mocker.DeregisterTarget).AnyTimes() + albMock.EXPECT().RegisterTargets(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(mocker.RegisterTarget).Times(1) + albMock.EXPECT().DeregisterTargets(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(mocker.DeregisterTarget).Times(1) + albMock.EXPECT().DescribeTargetGroupAttributes(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(mocker.DescribeTargetGroupAttibutes).Times(1) gomock.InOrder( albMock.EXPECT().DescribeTargetHealth(gomock.Any(), gomock.Any(), gomock.Any()).Return(&elbv2.DescribeTargetHealthOutput{ TargetHealthDescriptions: []elbv2types.TargetHealthDescription{{ From ff3850dfc6630db479c11a73232ddc5585c352b4 Mon Sep 17 00:00:00 2001 From: Yusuke Sakurai Date: Tue, 25 Jun 2024 20:42:31 +0900 Subject: [PATCH 3/6] rename opts --- cli/cage/commands/flags.go | 30 +++++++++++++++++------------- env.go | 8 ++++---- run_test.go | 3 ++- timeout/timeout_test.go | 31 +++++++++++++++++++++++++++++++ 4 files changed, 54 insertions(+), 18 deletions(-) create mode 100644 timeout/timeout_test.go diff --git a/cli/cage/commands/flags.go b/cli/cage/commands/flags.go index cd651d9..0ce5d4f 100644 --- a/cli/cage/commands/flags.go +++ b/cli/cage/commands/flags.go @@ -43,7 +43,7 @@ func CanaryTaskIdleDurationFlag(dest *int) *cli.IntFlag { return &cli.IntFlag{ Name: "canaryTaskIdleDuration", EnvVars: []string{cage.CanaryTaskIdleDuration}, - Usage: "Idle duration seconds for ensuring canary task that has no attached load balancer", + Usage: "duration seconds for waiting canary task that isn't attached to target group considered as ready for serving traffic", Destination: dest, Value: 10, } @@ -51,40 +51,44 @@ func CanaryTaskIdleDurationFlag(dest *int) *cli.IntFlag { func TaskRunningWaitFlag(dest *int) *cli.IntFlag { return &cli.IntFlag{ - Name: "canaryTaskRunningWait", - EnvVars: []string{cage.CanaryTaskRunningWait}, - Usage: "Duration seconds for waiting canary task running", + Name: "taskRunningTimeout", + EnvVars: []string{cage.TaskRunningTimeout}, + Usage: "max duration seconds for waiting canary task running", Destination: dest, + Category: "ADVANCED", Value: 300, } } func TaskHealthCheckWaitFlag(dest *int) *cli.IntFlag { return &cli.IntFlag{ - Name: "canaryTaskHealthCheckWait", - EnvVars: []string{cage.CanaryTaskHealthCheckWait}, - Usage: "Duration seconds for waiting canary task health check", + Name: "taskHealthCheckTimeout", + EnvVars: []string{cage.TaskHealthCheckTimeout}, + Usage: "max duration seconds for waiting canary task health check", Destination: dest, + Category: "ADVANCED", Value: 300, } } func TaskStoppedWaitFlag(dest *int) *cli.IntFlag { return &cli.IntFlag{ - Name: "canaryTaskStoppedWait", - EnvVars: []string{cage.CanaryTaskStoppedWait}, - Usage: "Duration seconds for waiting canary task stopped", + Name: "taskStoppedTimeout", + EnvVars: []string{cage.TaskStoppedTimeout}, + Usage: "max duration seconds for waiting canary task stopped", Destination: dest, + Category: "ADVANCED", Value: 300, } } func ServiceStableWaitFlag(dest *int) *cli.IntFlag { return &cli.IntFlag{ - Name: "serviceStableWait", - EnvVars: []string{cage.ServiceStableWait}, - Usage: "Duration seconds for waiting service stable", + Name: "serviceStableTimeout", + EnvVars: []string{cage.ServiceStableTimeout}, + Usage: "max duration seconds for waiting service stable", Destination: dest, + Category: "ADVANCED", Value: 300, } } diff --git a/env.go b/env.go index 8cfcac4..a47c310 100644 --- a/env.go +++ b/env.go @@ -39,10 +39,10 @@ const CanaryInstanceArnKey = "CAGE_CANARY_INSTANCE_ARN" const RegionKey = "CAGE_REGION" const CanaryTaskIdleDuration = "CAGE_CANARY_TASK_IDLE_DURATION" const UpdateServiceKey = "CAGE_UPDATE_SERVIEC" -const CanaryTaskRunningWait = "CAGE_CANARY_TASK_RUNNING_WAIT" -const CanaryTaskHealthCheckWait = "CAGE_CANARY_TASK_HEALTH_CHECK_WAIT" -const CanaryTaskStoppedWait = "CAGE_CANARY_TASK_STOPPED_WAIT" -const ServiceStableWait = "CAGE_SERVICE_STABLE_WAIT" +const TaskRunningTimeout = "CAGE_TASK_RUNNING_TIMEOUT" +const TaskHealthCheckTimeout = "CAGE_TASK_HEALTH_CHECK_TIMEOUT" +const TaskStoppedTimeout = "CAGE_TASK_STOPPED_TIMEOUT" +const ServiceStableTimeout = "CAGE_SERVICE_STABLE_TIMEOUT" func EnsureEnvars( dest *Envars, diff --git a/run_test.go b/run_test.go index f5fe733..146c739 100644 --- a/run_test.go +++ b/run_test.go @@ -55,6 +55,7 @@ func TestCage_Run(t *testing.T) { container := "container" ctx := context.Background() env, mocker, ecsMock := setupForBasic(t) + env.CanaryTaskRunningWait = 1 gomock.InOrder( ecsMock.EXPECT().RunTask(gomock.Any(), gomock.Any()).DoAndReturn(mocker.RunTask), ecsMock.EXPECT().DescribeTasks(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( @@ -85,6 +86,7 @@ func TestCage_Run(t *testing.T) { container := "container" ctx := context.Background() env, mocker, ecsMock := setupForBasic(t) + env.CanaryTaskStoppedWait = 1 gomock.InOrder( ecsMock.EXPECT().RunTask(gomock.Any(), gomock.Any()).DoAndReturn(mocker.RunTask), ecsMock.EXPECT().DescribeTasks(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(mocker.DescribeTasks).Times(2), @@ -97,7 +99,6 @@ func TestCage_Run(t *testing.T) { result, err := cagecli.Run(ctx, &cage.RunInput{ Container: &container, Overrides: overrides, - // MaxWait: 1, }) assert.Nil(t, result) assert.EqualError(t, err, "task failed to stop: exceeded max wait time for TasksStopped waiter") diff --git a/timeout/timeout_test.go b/timeout/timeout_test.go new file mode 100644 index 0000000..f03e2ff --- /dev/null +++ b/timeout/timeout_test.go @@ -0,0 +1,31 @@ +package timeout_test + +import ( + "testing" + "time" + + "github.com/loilo-inc/canarycage/timeout" + "github.com/stretchr/testify/assert" +) + +func TestManager(t *testing.T) { + t.Run("no configu", func(t *testing.T) { + man := timeout.NewManager(10, &timeout.Input{}) + assert.Equal(t, time.Duration(10), man.TaskRunning()) + assert.Equal(t, time.Duration(10), man.TaskStopped()) + assert.Equal(t, time.Duration(10), man.TaskHealthCheck()) + assert.Equal(t, time.Duration(10), man.ServiceStable()) + }) + t.Run("with config", func(t *testing.T) { + man := timeout.NewManager(10, &timeout.Input{ + TaskRunningWait: 1, + TaskStoppedWait: 2, + TaskHealthCheckWait: 3, + ServiceStableWait: 4, + }) + assert.Equal(t, time.Duration(1), man.TaskRunning()) + assert.Equal(t, time.Duration(2), man.TaskStopped()) + assert.Equal(t, time.Duration(3), man.TaskHealthCheck()) + assert.Equal(t, time.Duration(4), man.ServiceStable()) + }) +} From f6727d429cdef0004aff4428606b4b12ac3eff38 Mon Sep 17 00:00:00 2001 From: Yusuke Sakurai Date: Wed, 26 Jun 2024 15:30:03 +0900 Subject: [PATCH 4/6] merge --- .vscode/settings.json | 7 + go.mod | 1 + go.sum | 2 + rollout.go | 452 ++++++------------------------------------ rollout_test.go | 2 +- 5 files changed, 73 insertions(+), 391 deletions(-) create mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..1209016 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,7 @@ +{ + "explorer.fileNesting.enabled": true, + "explorer.fileNesting.patterns": { + "*.go": "${capture}_test.go", + "go.mod": "go.sum" + } +} diff --git a/go.mod b/go.mod index ffc818c..0f41e7c 100644 --- a/go.mod +++ b/go.mod @@ -39,6 +39,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect + golang.org/x/sync v0.7.0 golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index da0e13b..1649eec 100644 --- a/go.sum +++ b/go.sum @@ -125,6 +125,8 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/rollout.go b/rollout.go index 18aa623..b0aaa20 100644 --- a/rollout.go +++ b/rollout.go @@ -2,17 +2,11 @@ package cage import ( "context" - "fmt" - "time" "github.com/apex/log" - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/service/ec2" - ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types" "github.com/aws/aws-sdk-go-v2/service/ecs" ecstypes "github.com/aws/aws-sdk-go-v2/service/ecs/types" - elbv2 "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2" - elbv2types "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2/types" + "golang.org/x/sync/errgroup" "golang.org/x/xerrors" ) @@ -22,50 +16,35 @@ type RollOutInput struct { } type RollOutResult struct { - StartTime time.Time - EndTime time.Time ServiceIntact bool } func (c *cage) RollOut(ctx context.Context, input *RollOutInput) (*RollOutResult, error) { - ret := &RollOutResult{ - StartTime: c.Time.Now(), + result := &RollOutResult{ ServiceIntact: true, } - var aggregatedError error - throw := func(err error) (*RollOutResult, error) { - ret.EndTime = c.Time.Now() - aggregatedError = err - return ret, err - } - defer func(result *RollOutResult) { - ret.EndTime = c.Time.Now() - }(ret) - var service ecstypes.Service if out, err := c.Ecs.DescribeServices(ctx, &ecs.DescribeServicesInput{ Cluster: &c.Env.Cluster, Services: []string{ c.Env.Service, }, }); err != nil { - log.Errorf("failed to describe current service due to: %s", err) - return throw(err) + return result, xerrors.Errorf("failed to describe current service due to: %w", err) } else if len(out.Services) == 0 { - return throw(xerrors.Errorf("service '%s' doesn't exist. Run 'cage up' or create service before rolling out", c.Env.Service)) + return result, xerrors.Errorf("service '%s' doesn't exist. Run 'cage up' or create service before rolling out", c.Env.Service) } else { - service = out.Services[0] - } - if *service.Status != "ACTIVE" { - return throw(xerrors.Errorf("😵 '%s' status is '%s'. Stop rolling out", c.Env.Service, *service.Status)) - } - if service.LaunchType == ecstypes.LaunchTypeEc2 && c.Env.CanaryInstanceArn == "" { - return throw(xerrors.Errorf("🥺 --canaryInstanceArn is required when LaunchType = 'EC2'")) + service := out.Services[0] + if *service.Status != "ACTIVE" { + return result, xerrors.Errorf("😵 '%s' status is '%s'. Stop rolling out", c.Env.Service, *service.Status) + } + if service.LaunchType == ecstypes.LaunchTypeEc2 && c.Env.CanaryInstanceArn == "" { + return result, xerrors.Errorf("🥺 --canaryInstanceArn is required when LaunchType = 'EC2'") + } } log.Infof("ensuring next task definition...") var nextTaskDefinition *ecstypes.TaskDefinition if o, err := c.CreateNextTaskDefinition(ctx); err != nil { - log.Errorf("failed to register next task definition due to: %s", err) - return throw(err) + return result, xerrors.Errorf("failed to register next task definition due to: %w", err) } else { nextTaskDefinition = o } @@ -73,46 +52,39 @@ func (c *cage) RollOut(ctx context.Context, input *RollOutInput) (*RollOutResult log.Info("--updateService flag is set. use provided service configurations for canary test instead of current service") } log.Infof("starting canary task...") - canaryTask, startCanaryTaskErr := c.StartCanaryTask(ctx, nextTaskDefinition, input) + canaryTasks, startCanaryTaskErr := c.StartCanaryTasks(ctx, nextTaskDefinition, input) // ensure canary task stopped after rolling out either success or failure - defer func(canaryTask *CanaryTask, result *RollOutResult) { - if canaryTask.taskArn == nil { - return - } - if err := c.StopCanaryTask(ctx, canaryTask); err != nil { - log.Fatalf("failed to stop canary task '%s': %s", *canaryTask.taskArn, err) + defer func() { + _ = recover() + eg := errgroup.Group{} + for _, canaryTask := range canaryTasks { + if canaryTask.taskArn == nil { + continue + } + eg.Go(func() error { + err := canaryTask.Stop(ctx) + if err != nil { + log.Errorf("failed to stop canary task '%s': %s", *canaryTask.taskArn, err) + } + return err + }) } - if aggregatedError == nil { - log.Infof( - "🐥 service '%s' successfully rolled out to '%s:%d'!", - c.Env.Service, *nextTaskDefinition.Family, nextTaskDefinition.Revision, - ) - } else { - log.Errorf("😥 %s", aggregatedError) + if err := eg.Wait(); err != nil { + log.Errorf("failed to stop canary tasks due to: %s", err) } - }(&canaryTask, ret) + }() if startCanaryTaskErr != nil { - log.Errorf("failed to start canary task due to: %s", startCanaryTaskErr) - return throw(startCanaryTaskErr) + return result, xerrors.Errorf("failed to start canary task due to: %w", startCanaryTaskErr) } - log.Infof("😷 ensuring canary task container(s) to become healthy...") - if err := c.waitUntilContainersBecomeHealthy(ctx, *canaryTask.taskArn, nextTaskDefinition); err != nil { - return throw(err) + eg := errgroup.Group{} + for _, canaryTask := range canaryTasks { + eg.Go(func() error { + return canaryTask.Wait(ctx) + }) } - log.Info("🤩 canary task container(s) is healthy!") - log.Infof("canary task '%s' ensured.", *canaryTask.taskArn) - if canaryTask.target != nil { - log.Infof("😷 ensuring canary task to become healthy...") - if err := c.EnsureTaskHealthy( - ctx, - *canaryTask.taskArn, - canaryTask.target, - ); err != nil { - return throw(err) - } - log.Info("🤩 canary task is healthy!") + if err := eg.Wait(); err != nil { + return result, xerrors.Errorf("failed to wait for canary task due to: %w", err) } - ret.ServiceIntact = false log.Infof( "updating the task definition of '%s' into '%s:%d'...", c.Env.Service, *nextTaskDefinition.Family, nextTaskDefinition.Revision, @@ -131,110 +103,32 @@ func (c *cage) RollOut(ctx context.Context, input *RollOutInput) (*RollOutResult updateInput.VolumeConfigurations = c.Env.ServiceDefinitionInput.VolumeConfigurations } if _, err := c.Ecs.UpdateService(ctx, updateInput); err != nil { - return throw(err) + return result, err } + result.ServiceIntact = false log.Infof("waiting for service '%s' to be stable...", c.Env.Service) if err := ecs.NewServicesStableWaiter(c.Ecs).Wait(ctx, &ecs.DescribeServicesInput{ Cluster: &c.Env.Cluster, Services: []string{c.Env.Service}, }, c.Timeout.ServiceStable()); err != nil { - return ret, err + return result, err } log.Infof("🥴 service '%s' has become to be stable!", c.Env.Service) - ret.EndTime = c.Time.Now() - return ret, nil -} - -func (c *cage) EnsureTaskHealthy( - ctx context.Context, - taskArn string, - p *CanaryTarget, -) error { - log.Infof("checking the health state of canary task...") - var unusedCount = 0 - var initialized = false - var recentState *elbv2types.TargetHealthStateEnum - for { - <-c.Time.NewTimer(time.Duration(15) * time.Second).C - if o, err := c.Alb.DescribeTargetHealth(ctx, &elbv2.DescribeTargetHealthInput{ - TargetGroupArn: &p.targetGroupArn, - Targets: []elbv2types.TargetDescription{{ - Id: &p.targetId, - Port: &p.targetPort, - AvailabilityZone: &p.availabilityZone, - }}, - }); err != nil { - return err - } else { - recentState = GetTargetIsHealthy(o, &p.targetId, &p.targetPort) - if recentState == nil { - return xerrors.Errorf("'%s' is not registered to the target group '%s'", p.targetId, p.targetGroupArn) - } - log.Infof("canary task '%s' (%s:%d) state is: %s", taskArn, p.targetId, p.targetPort, *recentState) - switch *recentState { - case "healthy": - return nil - case "initial": - initialized = true - log.Infof("still checking the state...") - continue - case "unused": - unusedCount++ - if !initialized && unusedCount < 5 { - continue - } - default: - } - } - // unhealthy, draining, unused - log.Errorf("😨 canary task '%s' is unhealthy", taskArn) - return xerrors.Errorf( - "canary task '%s' (%s:%d) hasn't become to be healthy. The most recent state: %s", - taskArn, p.targetId, p.targetPort, *recentState, - ) - } -} - -func GetTargetIsHealthy(o *elbv2.DescribeTargetHealthOutput, targetId *string, targetPort *int32) *elbv2types.TargetHealthStateEnum { - for _, desc := range o.TargetHealthDescriptions { - if *desc.Target.Id == *targetId && *desc.Target.Port == *targetPort { - return &desc.TargetHealth.State - } - } - return nil -} - -func (c *cage) DescribeSubnet(ctx context.Context, subnetId *string) (ec2types.Subnet, error) { - if o, err := c.Ec2.DescribeSubnets(ctx, &ec2.DescribeSubnetsInput{ - SubnetIds: []string{*subnetId}, - }); err != nil { - return ec2types.Subnet{}, err - } else { - return o.Subnets[0], nil - } -} - -type CanaryTarget struct { - targetGroupArn string - targetId string - targetPort int32 - availabilityZone string -} - -type CanaryTask struct { - taskArn *string - target *CanaryTarget + log.Infof( + "🐥 service '%s' successfully rolled out to '%s:%d'!", + c.Env.Service, *nextTaskDefinition.Family, nextTaskDefinition.Revision, + ) + return result, nil } -func (c *cage) StartCanaryTask( +func (c *cage) StartCanaryTasks( ctx context.Context, nextTaskDefinition *ecstypes.TaskDefinition, input *RollOutInput, -) (CanaryTask, error) { +) ([]*CanaryTask, error) { var networkConfiguration *ecstypes.NetworkConfiguration var platformVersion *string var loadBalancers []ecstypes.LoadBalancer - var result CanaryTask if input.UpdateService { networkConfiguration = c.Env.ServiceDefinitionInput.NetworkConfiguration platformVersion = c.Env.ServiceDefinitionInput.PlatformVersion @@ -244,7 +138,7 @@ func (c *cage) StartCanaryTask( Cluster: &c.Env.Cluster, Services: []string{c.Env.Service}, }); err != nil { - return result, err + return nil, err } else { service := o.Services[0] networkConfiguration = service.NetworkConfiguration @@ -252,247 +146,25 @@ func (c *cage) StartCanaryTask( loadBalancers = service.LoadBalancers } } - // Phase 1: Start canary task - var taskArn *string - if c.Env.CanaryInstanceArn != "" { - // ec2 - startTask := &ecs.StartTaskInput{ - Cluster: &c.Env.Cluster, - Group: aws.String(fmt.Sprintf("cage:canary-task:%s", c.Env.Service)), - NetworkConfiguration: networkConfiguration, - TaskDefinition: nextTaskDefinition.TaskDefinitionArn, - ContainerInstances: []string{c.Env.CanaryInstanceArn}, - } - if o, err := c.Ecs.StartTask(ctx, startTask); err != nil { - return result, err - } else { - taskArn = o.Tasks[0].TaskArn - } - } else { - // fargate - if o, err := c.Ecs.RunTask(ctx, &ecs.RunTaskInput{ - Cluster: &c.Env.Cluster, - Group: aws.String(fmt.Sprintf("cage:canary-task:%s", c.Env.Service)), - NetworkConfiguration: networkConfiguration, - TaskDefinition: nextTaskDefinition.TaskDefinitionArn, - LaunchType: ecstypes.LaunchTypeFargate, - PlatformVersion: platformVersion, - }); err != nil { - return result, err - } else { - taskArn = o.Tasks[0].TaskArn - } - } - result.taskArn = taskArn - // Phase 2: Wait until canary task is running - log.Infof("🥚 waiting for canary task '%s' is running...", *taskArn) - if err := ecs.NewTasksRunningWaiter(c.Ecs).Wait(ctx, &ecs.DescribeTasksInput{ - Cluster: &c.Env.Cluster, - Tasks: []string{*taskArn}, - }, c.MaxWait); err != nil { - return result, err - } - log.Infof("🐣 canary task '%s' is running!", *taskArn) + var results []*CanaryTask if len(loadBalancers) == 0 { - log.Infof("no load balancer is attached to service '%s'. skip registration to target group", c.Env.Service) - log.Infof("wait %d seconds for ensuring the task goes stable", c.Env.CanaryTaskIdleDuration) - wait := make(chan bool) - go func() { - duration := c.Env.CanaryTaskIdleDuration - for duration > 0 { - log.Infof("still waiting...; %d seconds left", duration) - wt := 10 - if duration < 10 { - wt = duration - } - <-c.Time.NewTimer(time.Duration(wt) * time.Second).C - duration -= 10 - } - wait <- true - }() - <-wait - o, err := c.Ecs.DescribeTasks(ctx, &ecs.DescribeTasksInput{ - Cluster: &c.Env.Cluster, - Tasks: []string{*taskArn}, - }) - if err != nil { - return result, err + task := &CanaryTask{ + c, nextTaskDefinition, nil, networkConfiguration, platformVersion, nil, nil, } - task := o.Tasks[0] - if *task.LastStatus != "RUNNING" { - return result, xerrors.Errorf("😫 canary task has stopped: %s", *task.StoppedReason) + results = append(results, task) + if err := task.Start(ctx); err != nil { + return results, err } - return result, nil - } - // Phase 3: Get task details after network interface is attached - var task ecstypes.Task - if o, err := c.Ecs.DescribeTasks(ctx, &ecs.DescribeTasksInput{ - Cluster: &c.Env.Cluster, - Tasks: []string{*taskArn}, - }); err != nil { - return result, err } else { - task = o.Tasks[0] - } - var targetId *string - var targetPort *int32 - var subnet ec2types.Subnet - for _, container := range nextTaskDefinition.ContainerDefinitions { - if *container.Name == *loadBalancers[0].ContainerName { - targetPort = container.PortMappings[0].HostPort - } - } - if targetPort == nil { - return result, xerrors.Errorf("couldn't find host port in container definition") - } - if c.Env.CanaryInstanceArn == "" { // Fargate - details := task.Attachments[0].Details - var subnetId *string - var privateIp *string - for _, v := range details { - if *v.Name == "subnetId" { - subnetId = v.Value - } else if *v.Name == "privateIPv4Address" { - privateIp = v.Value + for _, lb := range loadBalancers { + task := &CanaryTask{ + c, nextTaskDefinition, &lb, networkConfiguration, platformVersion, nil, nil, } - } - if subnetId == nil || privateIp == nil { - return result, xerrors.Errorf("couldn't find subnetId or privateIPv4Address in task details") - } - if o, err := c.Ec2.DescribeSubnets(ctx, &ec2.DescribeSubnetsInput{ - SubnetIds: []string{*subnetId}, - }); err != nil { - return result, err - } else { - subnet = o.Subnets[0] - } - targetId = privateIp - log.Infof("canary task was placed: privateIp = '%s', hostPort = '%d', az = '%s'", *targetId, *targetPort, *subnet.AvailabilityZone) - } else { - var containerInstance ecstypes.ContainerInstance - if outputs, err := c.Ecs.DescribeContainerInstances(ctx, &ecs.DescribeContainerInstancesInput{ - Cluster: &c.Env.Cluster, - ContainerInstances: []string{c.Env.CanaryInstanceArn}, - }); err != nil { - return result, err - } else { - containerInstance = outputs.ContainerInstances[0] - } - if o, err := c.Ec2.DescribeInstances(ctx, &ec2.DescribeInstancesInput{ - InstanceIds: []string{*containerInstance.Ec2InstanceId}, - }); err != nil { - return result, err - } else if sn, err := c.DescribeSubnet(ctx, o.Reservations[0].Instances[0].SubnetId); err != nil { - return result, err - } else { - targetId = containerInstance.Ec2InstanceId - subnet = sn - } - log.Infof("canary task was placed: instanceId = '%s', hostPort = '%d', az = '%s'", *targetId, *targetPort, *subnet.AvailabilityZone) - } - if _, err := c.Alb.RegisterTargets(ctx, &elbv2.RegisterTargetsInput{ - TargetGroupArn: loadBalancers[0].TargetGroupArn, - Targets: []elbv2types.TargetDescription{{ - AvailabilityZone: subnet.AvailabilityZone, - Id: targetId, - Port: targetPort, - }}, - }); err != nil { - return result, err - } - result.target = &CanaryTarget{ - targetGroupArn: *loadBalancers[0].TargetGroupArn, - targetId: *targetId, - targetPort: *targetPort, - availabilityZone: *subnet.AvailabilityZone, - } - return result, nil -} - -func (c *cage) waitUntilContainersBecomeHealthy(ctx context.Context, taskArn string, nextTaskDefinition *ecstypes.TaskDefinition) error { - containerHasHealthChecks := map[string]struct{}{} - for _, definition := range nextTaskDefinition.ContainerDefinitions { - if definition.HealthCheck != nil { - containerHasHealthChecks[*definition.Name] = struct{}{} - } - } - - for count := 0; count < 10; count++ { - <-c.Time.NewTimer(time.Duration(15) * time.Second).C - log.Infof("canary task '%s' waits until %d container(s) become healthy", taskArn, len(containerHasHealthChecks)) - if o, err := c.Ecs.DescribeTasks(ctx, &ecs.DescribeTasksInput{ - Cluster: &c.Env.Cluster, - Tasks: []string{taskArn}, - }); err != nil { - return err - } else { - task := o.Tasks[0] - if *task.LastStatus != "RUNNING" { - return xerrors.Errorf("😫 canary task has stopped: %s", *task.StoppedReason) - } - - for _, container := range task.Containers { - if _, ok := containerHasHealthChecks[*container.Name]; !ok { - continue - } - if container.HealthStatus != ecstypes.HealthStatusHealthy { - log.Infof("container '%s' is not healthy: %s", *container.Name, container.HealthStatus) - continue - } - delete(containerHasHealthChecks, *container.Name) - } - if len(containerHasHealthChecks) == 0 { - return nil + results = append(results, task) + if err := task.Start(ctx); err != nil { + return results, err } } } - return xerrors.Errorf("😨 canary task hasn't become to be healthy") -} - -func (c *cage) StopCanaryTask(ctx context.Context, input *CanaryTask) error { - if input.target == nil { - log.Info("no load balancer is attached to service. Skip deregisteration.") - } else { - log.Infof("deregistering the canary task from target group '%s'...", input.target.targetId) - if _, err := c.Alb.DeregisterTargets(ctx, &elbv2.DeregisterTargetsInput{ - TargetGroupArn: &input.target.targetGroupArn, - Targets: []elbv2types.TargetDescription{{ - AvailabilityZone: &input.target.availabilityZone, - Id: &input.target.targetId, - Port: &input.target.targetPort, - }}, - }); err != nil { - return err - } - if err := elbv2.NewTargetDeregisteredWaiter(c.Alb).Wait(ctx, &elbv2.DescribeTargetHealthInput{ - TargetGroupArn: &input.target.targetGroupArn, - Targets: []elbv2types.TargetDescription{{ - AvailabilityZone: &input.target.availabilityZone, - Id: &input.target.targetId, - Port: &input.target.targetPort, - }}, - }, c.MaxWait); err != nil { - return err - } - log.Infof( - "canary task '%s' has successfully been deregistered from target group '%s'", - *input.taskArn, input.target.targetId, - ) - } - - log.Infof("stopping the canary task '%s'...", *input.taskArn) - if _, err := c.Ecs.StopTask(ctx, &ecs.StopTaskInput{ - Cluster: &c.Env.Cluster, - Task: input.taskArn, - }); err != nil { - return err - } - if err := ecs.NewTasksStoppedWaiter(c.Ecs).Wait(ctx, &ecs.DescribeTasksInput{ - Cluster: &c.Env.Cluster, - Tasks: []string{*input.taskArn}, - }, c.MaxWait); err != nil { - return err - } - log.Infof("canary task '%s' has successfully been stopped", *input.taskArn) - return nil + return results, nil } diff --git a/rollout_test.go b/rollout_test.go index ffe95fb..d81cead 100644 --- a/rollout_test.go +++ b/rollout_test.go @@ -210,7 +210,7 @@ func TestCage_RollOut_FARGATE(t *testing.T) { }, } result, err := cagecli.RollOut(ctx, &cage.RollOutInput{UpdateService: true}) - assert.EqualError(t, err, "couldn't find host port in container definition") + assert.EqualError(t, err, "failed to wait for canary task due to: couldn't find host port in container definition") assert.Equal(t, result.ServiceIntact, true) assert.Equal(t, 1, mctx.RunningTaskSize()) }) From 191e774c4541e316f2b332c0c854c25121ab4476 Mon Sep 17 00:00:00 2001 From: Yusuke Sakurai Date: Thu, 27 Jun 2024 14:34:06 +0900 Subject: [PATCH 5/6] PR --- cli/cage/commands/flags.go | 8 ++++---- timeout/timeout_test.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/cli/cage/commands/flags.go b/cli/cage/commands/flags.go index 0ce5d4f..1ea47ce 100644 --- a/cli/cage/commands/flags.go +++ b/cli/cage/commands/flags.go @@ -56,7 +56,7 @@ func TaskRunningWaitFlag(dest *int) *cli.IntFlag { Usage: "max duration seconds for waiting canary task running", Destination: dest, Category: "ADVANCED", - Value: 300, + Value: 900, // 15 minutes } } @@ -67,7 +67,7 @@ func TaskHealthCheckWaitFlag(dest *int) *cli.IntFlag { Usage: "max duration seconds for waiting canary task health check", Destination: dest, Category: "ADVANCED", - Value: 300, + Value: 900, } } @@ -78,7 +78,7 @@ func TaskStoppedWaitFlag(dest *int) *cli.IntFlag { Usage: "max duration seconds for waiting canary task stopped", Destination: dest, Category: "ADVANCED", - Value: 300, + Value: 900, } } @@ -89,6 +89,6 @@ func ServiceStableWaitFlag(dest *int) *cli.IntFlag { Usage: "max duration seconds for waiting service stable", Destination: dest, Category: "ADVANCED", - Value: 300, + Value: 900, } } diff --git a/timeout/timeout_test.go b/timeout/timeout_test.go index f03e2ff..7643af2 100644 --- a/timeout/timeout_test.go +++ b/timeout/timeout_test.go @@ -9,7 +9,7 @@ import ( ) func TestManager(t *testing.T) { - t.Run("no configu", func(t *testing.T) { + t.Run("no config", func(t *testing.T) { man := timeout.NewManager(10, &timeout.Input{}) assert.Equal(t, time.Duration(10), man.TaskRunning()) assert.Equal(t, time.Duration(10), man.TaskStopped()) From 595a59dd7582adfd32e44667d242afab4d3227fa Mon Sep 17 00:00:00 2001 From: Yusuke Sakurai Date: Thu, 27 Jun 2024 15:41:42 +0900 Subject: [PATCH 6/6] Update run_test.go --- run_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/run_test.go b/run_test.go index 146c739..d7371de 100644 --- a/run_test.go +++ b/run_test.go @@ -76,7 +76,6 @@ func TestCage_Run(t *testing.T) { result, err := cagecli.Run(ctx, &cage.RunInput{ Container: &container, Overrides: overrides, - // MaxWait: 1, }) assert.Nil(t, result) assert.EqualError(t, err, "task failed to start: exceeded max wait time for TasksRunning waiter")