Skip to content

Commit

Permalink
fix: fix race conditions (#4074)
Browse files Browse the repository at this point in the history
* test: test deployMachinesApp

This test is a by-product of my investigation. The investigation resulted
in a server-side fix, but the test itself and some doc comments may be useful.

* fix: fix race conditions

There are multiple race conditions we've missed. Our unit tests
run with -race, but we don't have many unit tests around the deploy
package.
  • Loading branch information
kzys authored Nov 25, 2024
1 parent cad0510 commit b983a0d
Show file tree
Hide file tree
Showing 13 changed files with 197 additions and 52 deletions.
2 changes: 1 addition & 1 deletion agent/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
)

// Establish starts the daemon, if necessary, and returns a client to it.
func Establish(ctx context.Context, apiClient flyutil.Client) (*Client, error) {
func Establish(ctx context.Context, apiClient wireguard.WebClient) (*Client, error) {
if err := wireguard.PruneInvalidPeers(ctx, apiClient); err != nil {
return nil, err
}
Expand Down
17 changes: 10 additions & 7 deletions internal/command/deploy/machines.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,16 @@ func argsFromManifest(manifest *DeployManifest, app *fly.AppCompact) MachineDepl
}

type machineDeployment struct {
apiClient flyutil.Client
flapsClient flapsutil.FlapsClient
io *iostreams.IOStreams
colorize *iostreams.ColorScheme
app *fly.AppCompact
appConfig *appconfig.Config
img string
// apiClient is the client for the web API.
apiClient webClient
// flapsClient is the client for flaps.
flapsClient flapsutil.FlapsClient
io *iostreams.IOStreams
colorize *iostreams.ColorScheme
app *fly.AppCompact
appConfig *appconfig.Config
img string
// machineSet is this application's machines.
machineSet machine.MachineSet
releaseCommandMachine machine.MachineSet
volumes map[string][]fly.Volume
Expand Down
50 changes: 24 additions & 26 deletions internal/command/deploy/machines_deploymachinesapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,32 +809,29 @@ func (md *machineDeployment) updateUsingRollingStrategy(parentCtx context.Contex
groupsPool.Go(func(ctx context.Context) error {
eg, ctx := errgroup.WithContext(ctx)

eg.Go(func() (err error) {
poolSize := len(coldMachines)
if poolSize >= STOPPED_MACHINES_POOL_SIZE {
poolSize = STOPPED_MACHINES_POOL_SIZE
}

if len(coldMachines) > 0 {
// for cold machines, we can update all of them at once.
// there's no need for protection against downtime since the machines are already stopped
startIdx += len(coldMachines)
return md.updateEntriesGroup(ctx, group, coldMachines, sl, startIdx-len(coldMachines), poolSize)
}

return nil
})

eg.Go(func() (err error) {
// for warm machines, we update them in chunks of size, md.maxUnavailable.
// this is to prevent downtime/low-latency during deployments
startIdx += len(warmMachines)
poolSize := md.getPoolSize(len(warmMachines))
if len(warmMachines) > 0 {
return md.updateEntriesGroup(ctx, group, warmMachines, sl, startIdx-len(warmMachines), poolSize)
}
return nil
})
coldIdx := startIdx
if len(coldMachines) > 0 {
eg.Go(func() error {
// Capping the size just in case, it may be okay to stop all of them at once.
chunk := len(coldMachines)
if chunk >= STOPPED_MACHINES_POOL_SIZE {
chunk = STOPPED_MACHINES_POOL_SIZE
}
return md.updateEntriesGroup(ctx, group, coldMachines, sl, coldIdx, chunk)
})
}
startIdx += len(coldMachines)

warmIdx := startIdx
if len(warmMachines) > 0 {
eg.Go(func() error {
// Since these machines are still receiving traffic, the chunk size here is more conservative (lower)
// than the one above.
chunk := md.getPoolSize(len(warmMachines))
return md.updateEntriesGroup(ctx, group, warmMachines, sl, warmIdx, chunk)
})
}
startIdx += len(warmMachines)

return eg.Wait()
})
Expand Down Expand Up @@ -1106,6 +1103,7 @@ func (md *machineDeployment) spawnMachineInGroup(ctx context.Context, groupName
return lm, nil
}

// resolveProcessGroupChanges returns a diff between machines
func (md *machineDeployment) resolveProcessGroupChanges() ProcessGroupsDiff {
output := ProcessGroupsDiff{
groupsToRemove: map[string]int{},
Expand Down
40 changes: 39 additions & 1 deletion internal/command/deploy/machines_deploymachinesapp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@ package deploy
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/superfly/fly-go"
"github.com/superfly/flyctl/internal/appconfig"
"github.com/superfly/flyctl/internal/flapsutil"
"github.com/superfly/flyctl/internal/machine"
"github.com/superfly/flyctl/internal/mock"
"github.com/superfly/flyctl/iostreams"
)

func TestUpdateExistingMachinesWRecovery(t *testing.T) {
ios, _, _, _ := iostreams.Test()
client := &mockFlapsClient{}
client.machines = []*fly.Machine{{ID: "test-machine-id"}}
client.machines = []*fly.Machine{{ID: "test-machine-id", LeaseNonce: "foobar"}}
md := &machineDeployment{
app: &fly.AppCompact{},
io: ios,
Expand All @@ -34,3 +38,37 @@ func TestUpdateExistingMachinesWRecovery(t *testing.T) {
})
assert.Error(t, err, "failed to find machine test-machine-id")
}

// TestDeployMachinesApp runs deployMachinesApp with the "canary" strategy
// against mock flaps and web clients and expects it to return no error.
func TestDeployMachinesApp(t *testing.T) {
	ios, _, _, _ := iostreams.Test()

	// Each machine carries a lease nonce and is tagged with the "app"
	// process group through its config metadata.
	var machines []*fly.Machine
	for _, id := range []string{"m1", "m2", "m3", "m4"} {
		machines = append(machines, &fly.Machine{
			ID:         id,
			LeaseNonce: id + "-lease",
			Config: &fly.MachineConfig{
				Metadata: map[string]string{fly.MachineConfigMetadataKeyFlyProcessGroup: "app"},
			},
		})
	}

	flaps := &mockFlapsClient{}
	flaps.machines = machines

	// The web mock only stubs log fetching, and always returns no entries.
	web := &mock.Client{
		GetAppLogsFunc: func(ctx context.Context, appName, token, region, instanceID string) ([]fly.LogEntry, string, error) {
			return nil, "", nil
		},
	}

	md := &machineDeployment{
		app:             &fly.AppCompact{},
		appConfig:       &appconfig.Config{},
		io:              ios,
		colorize:        ios.ColorScheme(),
		apiClient:       web,
		flapsClient:     flaps,
		machineSet:      machine.NewMachineSet(flaps, ios, flaps.machines, false),
		strategy:        "canary",
		skipSmokeChecks: true,
		waitTimeout:     time.Second,
	}

	ctx := iostreams.NewContext(context.Background(), ios)
	ctx = flapsutil.NewContextWithClient(ctx, flaps)

	assert.NoError(t, md.deployMachinesApp(ctx))
}
54 changes: 48 additions & 6 deletions internal/command/deploy/mock_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net/http"
"sync"
"time"

fly "github.com/superfly/fly-go"
Expand All @@ -22,12 +23,19 @@ type mockFlapsClient struct {
breakUncordon bool
breakSetMetadata bool
breakList bool
breakDestroy bool
breakLease bool

machines []*fly.Machine
// mu to protect the members below.
mu sync.Mutex
machines []*fly.Machine
leases map[string]struct{}
nextMachineID int
}

func (m *mockFlapsClient) AcquireLease(ctx context.Context, machineID string, ttl *int) (*fly.MachineLease, error) {
return nil, fmt.Errorf("failed to acquire lease for %s", machineID)
nonce := fmt.Sprintf("%x-lease", machineID)
return m.RefreshLease(ctx, machineID, ttl, nonce)
}

func (m *mockFlapsClient) Cordon(ctx context.Context, machineID string, nonce string) (err error) {
Expand Down Expand Up @@ -63,7 +71,10 @@ func (m *mockFlapsClient) DeleteVolume(ctx context.Context, volumeId string) (*f
}

func (m *mockFlapsClient) Destroy(ctx context.Context, input fly.RemoveMachineInput, nonce string) (err error) {
return fmt.Errorf("failed to destroy %s", input.ID)
if m.breakDestroy {
return fmt.Errorf("failed to destroy %s", input.ID)
}
return nil
}

func (m *mockFlapsClient) Exec(ctx context.Context, machineID string, in *fly.MachineExecRequest) (*fly.MachineExecResponse, error) {
Expand Down Expand Up @@ -119,10 +130,17 @@ func (m *mockFlapsClient) Kill(ctx context.Context, machineID string) (err error
}

func (m *mockFlapsClient) Launch(ctx context.Context, builder fly.LaunchMachineInput) (*fly.Machine, error) {
m.mu.Lock()
defer m.mu.Unlock()

if m.breakLaunch {
return nil, fmt.Errorf("failed to launch %s", builder.ID)
}
return &fly.Machine{}, nil
m.nextMachineID += 1
return &fly.Machine{
ID: fmt.Sprintf("%x", m.nextMachineID),
LeaseNonce: fmt.Sprintf("%x-launch-lease", m.nextMachineID),
}, nil
}

func (m *mockFlapsClient) List(ctx context.Context, state string) ([]*fly.Machine, error) {
Expand All @@ -149,11 +167,35 @@ func (m *mockFlapsClient) NewRequest(ctx context.Context, method, path string, i
}

func (m *mockFlapsClient) RefreshLease(ctx context.Context, machineID string, ttl *int, nonce string) (*fly.MachineLease, error) {
return nil, fmt.Errorf("failed to refresh lease for %s", machineID)
m.mu.Lock()
defer m.mu.Unlock()

if m.breakLease {
return nil, fmt.Errorf("failed to acquire lease for %s", machineID)
}

if m.leases == nil {
m.leases = make(map[string]struct{})
}
m.leases[machineID] = struct{}{}

return &fly.MachineLease{
Status: "success",
Data: &fly.MachineLeaseData{Nonce: nonce},
}, nil
}

func (m *mockFlapsClient) ReleaseLease(ctx context.Context, machineID, nonce string) error {
return fmt.Errorf("failed to release lease for %s", machineID)
m.mu.Lock()
defer m.mu.Unlock()

_, exists := m.leases[machineID]
if !exists {
return fmt.Errorf("failed to release lease for %s", machineID)
}
delete(m.leases, machineID)

return nil
}

func (m *mockFlapsClient) Restart(ctx context.Context, in fly.RestartMachineInput, nonce string) (err error) {
Expand Down
3 changes: 3 additions & 0 deletions internal/command/deploy/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,9 @@ func (md *machineDeployment) acquireLeases(ctx context.Context, machineTuples []
ctx, span := tracing.GetTracer().Start(ctx, "acquire_leases")

leaseGroup := errgroup.Group{}
if poolSize <= 0 {
panic("pool size must be > 0")
}
leaseGroup.SetLimit(poolSize)

for _, machineTuple := range machineTuples {
Expand Down
28 changes: 28 additions & 0 deletions internal/command/deploy/web_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package deploy

import (
"context"
"net"

fly "github.com/superfly/fly-go"
"github.com/superfly/flyctl/logs"
)

// webClient is a subset of web API that is needed for the deploy package.
type webClient interface {
AddCertificate(ctx context.Context, appName, hostname string) (*fly.AppCertificate, *fly.HostnameCheck, error)
AllocateIPAddress(ctx context.Context, appName string, addrType string, region string, org *fly.Organization, network string) (*fly.IPAddress, error)
GetIPAddresses(ctx context.Context, appName string) ([]fly.IPAddress, error)
AllocateSharedIPAddress(ctx context.Context, appName string) (net.IP, error)

LatestImage(ctx context.Context, appName string) (string, error)

CreateRelease(ctx context.Context, input fly.CreateReleaseInput) (*fly.CreateReleaseResponse, error)
UpdateRelease(ctx context.Context, input fly.UpdateReleaseInput) (*fly.UpdateReleaseResponse, error)

GetApp(ctx context.Context, appName string) (*fly.App, error)
GetOrganizationBySlug(ctx context.Context, slug string) (*fly.Organization, error)

logs.WebClient
blueGreenWebClient
}
13 changes: 12 additions & 1 deletion internal/machine/leasable_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net/http"
"sync"
"time"

"github.com/jpillora/backoff"
Expand Down Expand Up @@ -49,10 +50,14 @@ type leasableMachine struct {
io *iostreams.IOStreams
colorize *iostreams.ColorScheme
machine *fly.Machine
leaseNonce string
leaseRefreshCancelFunc context.CancelFunc
destroyed bool
showLogs bool

// mu protects leaseNonce. A leasableMachine shouldn't be shared between
// goroutines, but StartBackgroundLeaseRefresh breaks the rule.
mu sync.Mutex
leaseNonce string
}

// TODO: make sure the other functions handle showLogs correctly
Expand Down Expand Up @@ -466,6 +471,9 @@ func (lm *leasableMachine) AcquireLease(ctx context.Context, duration time.Durat
}

func (lm *leasableMachine) RefreshLease(ctx context.Context, duration time.Duration) error {
lm.mu.Lock()
defer lm.mu.Unlock()

seconds := int(duration.Seconds())
refreshedLease, err := lm.flapsClient.RefreshLease(ctx, lm.machine.ID, &seconds, lm.leaseNonce)
if err != nil {
Expand Down Expand Up @@ -509,6 +517,9 @@ func (lm *leasableMachine) refreshLeaseUntilCanceled(ctx context.Context, durati
}

func (lm *leasableMachine) ReleaseLease(ctx context.Context) error {
lm.mu.Lock()
defer lm.mu.Unlock()

nonce := lm.leaseNonce
lm.resetLease()
if nonce == "" {
Expand Down
17 changes: 14 additions & 3 deletions internal/statuslogger/noninteractive.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@ package statuslogger
import (
"fmt"
"strings"
"sync"

"github.com/superfly/flyctl/iostreams"
)

type noninteractiveLogger struct {
io *iostreams.IOStreams
lines []*noninteractiveLine
// mu protects io.
mu sync.Mutex
io *iostreams.IOStreams

logNumbers bool
showStatus bool
lines []*noninteractiveLine
}

func (nl *noninteractiveLogger) Line(i int) StatusLine {
Expand All @@ -36,7 +40,14 @@ func (line *noninteractiveLine) Log(s string) {
buf += formatIndex(line.lineNum, len(line.logger.lines)) + " "
}
buf += s
fmt.Fprintln(line.logger.io.Out, buf)

line.println(buf)
}

// println writes s followed by a newline to the logger's output stream.
// It holds the logger's mutex for the duration of the write, so writes
// from different lines of the same logger are serialized.
func (line *noninteractiveLine) println(s string) {
	logger := line.logger

	logger.mu.Lock()
	defer logger.mu.Unlock()

	fmt.Fprintln(logger.io.Out, s)
}

func (line *noninteractiveLine) Logf(format string, args ...interface{}) {
Expand Down
6 changes: 5 additions & 1 deletion internal/wireguard/wg.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ import (

var cleanDNSPattern = regexp.MustCompile(`[^a-zA-Z0-9\\-]`)

// WebClient is the web API client accepted by PruneInvalidPeers. It is
// limited to a single method so that callers and tests do not need to
// provide a full client implementation.
type WebClient interface {
	// ValidateWireGuardPeers takes a list of peer IPs and returns the ones
	// considered invalid (presumably a subset of peerIPs — confirm against
	// the implementation).
	ValidateWireGuardPeers(ctx context.Context, peerIPs []string) (invalid []string, err error)
}

func generatePeerName(ctx context.Context, apiClient flyutil.Client) (string, error) {
user, err := apiClient.GetCurrentUser(ctx)
if err != nil {
Expand Down Expand Up @@ -181,7 +185,7 @@ func setWireGuardStateForOrg(ctx context.Context, orgSlug, network string, s *wg
return setWireGuardState(ctx, states)
}

func PruneInvalidPeers(ctx context.Context, apiClient flyutil.Client) error {
func PruneInvalidPeers(ctx context.Context, apiClient WebClient) error {
state, err := GetWireGuardState()
if err != nil {
return nil
Expand Down
Loading

0 comments on commit b983a0d

Please sign in to comment.