Skip to content

Commit

Permalink
gcp/saver: Only return errors.KindAlreadyExists if all three exist (g…
Browse files Browse the repository at this point in the history
…omods#1957)

* gcp/saver: Only return errors.KindAlreadyExists if all three exist

In gomods#1124, a GCP lock type was added as a singleflight backend. As part of this work, the GCP backend's Save() was made serial, likely because moduploader.Upload requires a call to Exists() before it, rendering the GCP lock less useful, by doubling the calls to GCS.

However, by doing this, the existence check was now only checking the existence of the mod file, and not the info or zip. This meant that if during a Save, the zip or info uploads failed, on subsequent rquests, that when using the GCP singleflight backend, Athens would assume everything had been stashed and saved properly, and then fail to serve up the info or zip that had failed upload, meaning the cache was in an unhealable broklen state, requiring a manual intervention.

To fix this, without breaking the singleflight behavior, introduce a metadata key that is set on the mod file during its initial upload, indicating that a Stash is still in progress on subsequent files, which gets removed once all three files are uploaded successfully, which can be checked if it it is determined that the mod file already exists. That way we can return a errors.KindAlreadyExists if a Stash is in progress, but also properly return it when a Stash is *not* currently in progress if and only if all three files exist on GCS, which prevents the cache from becoming permanently poisoned.

One note is that it is possible the GCS call to remove the metadata key fails, which would mean it is left on the mod object forever. To avoid this, consider it stale after 2 minutes.

---------

Signed-off-by: Derek Buitenhuis <[email protected]>
Co-authored-by: Matt <[email protected]>
  • Loading branch information
dwbuiten and matt0x6F authored Jun 2, 2024
1 parent c1891f1 commit 0ef761c
Show file tree
Hide file tree
Showing 10 changed files with 255 additions and 17 deletions.
6 changes: 3 additions & 3 deletions cmd/proxy/actions/app_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func addProxyRoutes(

lister := module.NewVCSLister(c.GoBinary, c.GoBinaryEnvVars, fs)
checker := storage.WithChecker(s)
withSingleFlight, err := getSingleFlight(l, c, checker)
withSingleFlight, err := getSingleFlight(l, c, s, checker)
if err != nil {
return err
}
Expand Down Expand Up @@ -137,7 +137,7 @@ func (l *athensLoggerForRedis) Printf(ctx context.Context, format string, v ...a
l.logger.WithContext(ctx).Printf(format, v...)
}

func getSingleFlight(l *log.Logger, c *config.Config, checker storage.Checker) (stash.Wrapper, error) {
func getSingleFlight(l *log.Logger, c *config.Config, s storage.Backend, checker storage.Checker) (stash.Wrapper, error) {
switch c.SingleFlightType {
case "", "memory":
return stash.WithSingleflight, nil
Expand Down Expand Up @@ -173,7 +173,7 @@ func getSingleFlight(l *log.Logger, c *config.Config, checker storage.Checker) (
if c.StorageType != "gcp" {
return nil, fmt.Errorf("gcp SingleFlight only works with a gcp storage type and not: %v", c.StorageType)
}
return stash.WithGCSLock, nil
return stash.WithGCSLock(c.SingleFlight.GCP.StaleThreshold, s)
case "azureblob":
if c.StorageType != "azureblob" {
return nil, fmt.Errorf("azureblob SingleFlight only works with a azureblob storage type and not: %v", c.StorageType)
Expand Down
4 changes: 4 additions & 0 deletions config.dev.toml
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,10 @@ ShutdownTimeout = 60
# Max retries while acquiring the lock. Defaults to 10.
# Env override: ATHENS_REDIS_LOCK_MAX_RETRIES
MaxRetries = 10
[SingleFlight.GCP]
# Threshold for how long to wait in seconds for an in-progress GCP upload to
# be considered to have failed to unlock.
StaleThreshold = 120
[Storage]
# Only storage backends that are specified in Proxy.StorageType are required here
[Storage.CDN]
Expand Down
11 changes: 11 additions & 0 deletions docs/content/configuration/storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -492,3 +492,14 @@ Optionally, like `redis`, you can also specify a password to connect to the `red
SentinelPassword = "sekret"

Distributed lock options can be customised for redis sentinal as well, in a similar manner as described above for redis.


### Using GCP as a singleflight mechanism

The GCP singleflight mechanism does not required configuration, and works out of the box. It has a
single option with which it can be customized:

[SingleFlight.GCP]
# Threshold for how long to wait in seconds for an in-progress GCP upload to
# be considered to have failed to unlock.
StaleThreshold = 120
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ func defaultConfig() *Config {
SentinelPassword: "sekret",
LockConfig: DefaultRedisLockConfig(),
},
GCP: DefaultGCPConfig(),
},
Index: &Index{
MySQL: &MySQL{
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ func TestParseExampleConfig(t *testing.T) {
LockConfig: DefaultRedisLockConfig(),
},
Etcd: &Etcd{Endpoints: "localhost:2379,localhost:22379,localhost:32379"},
GCP: DefaultGCPConfig(),
}

expConf := &Config{
Expand Down Expand Up @@ -391,6 +392,8 @@ func getEnvMap(config *Config) map[string]string {
} else if singleFlight.Etcd != nil {
envVars["ATHENS_SINGLE_FLIGHT_TYPE"] = "etcd"
envVars["ATHENS_ETCD_ENDPOINTS"] = singleFlight.Etcd.Endpoints
} else if singleFlight.GCP != nil {
envVars["ATHENS_GCP_STALE_THRESHOLD"] = strconv.Itoa(singleFlight.GCP.StaleThreshold)
}
}
return envVars
Expand Down
13 changes: 13 additions & 0 deletions pkg/config/singleflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ type SingleFlight struct {
Etcd *Etcd
Redis *Redis
RedisSentinel *RedisSentinel
GCP *GCP
}

// Etcd holds client side configuration
Expand Down Expand Up @@ -48,3 +49,15 @@ func DefaultRedisLockConfig() *RedisLockConfig {
MaxRetries: 10,
}
}

// GCP is the configuration for GCP locking.
type GCP struct {
StaleThreshold int `envconfig:"ATHENS_GCP_STALE_THRESHOLD"`
}

// DefaultGCPConfig returns the default GCP locking configuration.
func DefaultGCPConfig() *GCP {
return &GCP{
StaleThreshold: 120,
}
}
21 changes: 19 additions & 2 deletions pkg/stash/with_gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,32 @@ package stash

import (
"context"
"fmt"
"time"

"github.com/gomods/athens/pkg/errors"
"github.com/gomods/athens/pkg/observ"
"github.com/gomods/athens/pkg/storage"
"github.com/gomods/athens/pkg/storage/gcp"
)

// WithGCSLock returns a distributed singleflight
// using a GCS backend. See the config.toml documentation for details.
func WithGCSLock(s Stasher) Stasher {
return &gcsLock{s}
func WithGCSLock(staleThreshold int, s storage.Backend) (Wrapper, error) {
if staleThreshold <= 0 {
return nil, errors.E("stash.WithGCSLock", fmt.Errorf("invalid stale threshold"))
}
// Since we *must* be using a GCP stoagfe backend, we can abuse this
// fact to mutate it, so that we can get our threshold into Save().
// Your instincts are correct, this is kind of gross.
gs, ok := s.(*gcp.Storage)
if !ok {
return nil, errors.E("stash.WithGCSLock", fmt.Errorf("GCP singleflight can only be used with GCP storage"))
}
gs.SetStaleThreshold(time.Duration(staleThreshold) * time.Second)
return func(s Stasher) Stasher {
return &gcsLock{s}
}, nil
}

type gcsLock struct {
Expand Down
79 changes: 78 additions & 1 deletion pkg/stash/with_gcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package stash
import (
"bytes"
"context"
"fmt"
"io"
"os"
"strings"
Expand All @@ -17,6 +18,12 @@ import (
"golang.org/x/sync/errgroup"
)

type failReader int

func (f *failReader) Read([]byte) (int, error) {
return 0, fmt.Errorf("failure")
}

// TestWithGCS requires a real GCP backend implementation
// and it will ensure that saving to modules at the same time
// is done synchronously so that only the first module gets saved.
Expand All @@ -41,7 +48,11 @@ func TestWithGCS(t *testing.T) {
for i := 0; i < 5; i++ {
content := uuid.New().String()
ms := &mockGCPStasher{strg, content}
s := WithGCSLock(ms)
gs, err := WithGCSLock(120, strg)
if err != nil {
t.Fatal(err)
}
s := gs(ms)
eg.Go(func() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
Expand Down Expand Up @@ -79,6 +90,72 @@ func TestWithGCS(t *testing.T) {
}
}

// TestWithGCSPartialFailure equires a real GCP backend implementation
// and ensures that if one of the non-singleflight-lock files fails to
// upload, that the cache does not remain poisoned.
func TestWithGCSPartialFailure(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
const (
mod = "stashmod"
ver = "v1.0.0"
)
strg := getStorage(t)
strg.Delete(ctx, mod, ver)
defer strg.Delete(ctx, mod, ver)

// sanity check
_, err := strg.GoMod(ctx, mod, ver)
if !errors.Is(err, errors.KindNotFound) {
t.Fatalf("expected the stash bucket to return a NotFound error but got: %v", err)
}

content := uuid.New().String()
ms := &mockGCPStasher{strg, content}
fr := new(failReader)
gs, err := WithGCSLock(120, strg)
if err != nil {
t.Fatal(err)
}
s := gs(ms)
// We simulate a failure by manually passing an io.Reader that will fail.
err = ms.strg.Save(ctx, "stashmod", "v1.0.0", []byte(ms.content), fr, []byte(ms.content))
if err == nil {
// We *want* to fail.
t.Fatal(err)
}

// Now try a Stash. This should upload the missing files.
_, err = s.Stash(ctx, "stashmod", "v1.0.0")
if err != nil {
t.Fatal(err)
}

info, err := strg.Info(ctx, mod, ver)
if err != nil {
t.Fatal(err)
}
modContent, err := strg.GoMod(ctx, mod, ver)
if err != nil {
t.Fatal(err)
}
zip, err := strg.Zip(ctx, mod, ver)
if err != nil {
t.Fatal(err)
}
defer zip.Close()
zipContent, err := io.ReadAll(zip)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(info, modContent) {
t.Fatalf("expected info and go.mod to be equal but info was {%v} and content was {%v}", string(info), string(modContent))
}
if !bytes.Equal(info, zipContent) {
t.Fatalf("expected info and zip to be equal but info was {%v} and content was {%v}", string(info), string(zipContent))
}
}

// mockGCPStasher is like mockStasher
// but leverages in memory storage
// so that redis can determine
Expand Down
5 changes: 3 additions & 2 deletions pkg/storage/gcp/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ import (

// Storage implements the (./pkg/storage).Backend interface.
type Storage struct {
bucket *storage.BucketHandle
timeout time.Duration
bucket *storage.BucketHandle
timeout time.Duration
staleThreshold time.Duration
}

// New returns a new Storage instance backed by a Google Cloud Storage bucket.
Expand Down
Loading

0 comments on commit 0ef761c

Please sign in to comment.