Skip to content

Commit

Permalink
RSDK-8598 - Replace cache after sync (#4343)
Browse files Browse the repository at this point in the history
  • Loading branch information
cheukt authored Sep 5, 2024
1 parent 2201645 commit c4e7142
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 34 deletions.
30 changes: 30 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package config

import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
Expand All @@ -15,6 +16,7 @@ import (
"time"

"github.com/pkg/errors"
"go.viam.com/utils/artifact"
"go.viam.com/utils/jwks"
"go.viam.com/utils/pexec"
"go.viam.com/utils/rpc"
Expand Down Expand Up @@ -68,6 +70,11 @@ type Config struct {

// Revision contains the current revision of the config.
Revision string

// toCache stores the JSON marshalled version of the config to be cached. It should be a copy of
// the config pulled from cloud with minor changes.
// This version is kept because the config is changed as it moves through the system.
toCache []byte
}

// NOTE: This data must be maintained with what is in Config.
Expand Down Expand Up @@ -238,6 +245,29 @@ func (c Config) FindComponent(name string) *resource.Config {
return nil
}

// SetToCache sets toCache with a marshalled copy of the config passed in.
func (c *Config) SetToCache(cfg *Config) error {
md, err := json.MarshalIndent(cfg, "", " ")
if err != nil {
return err
}
c.toCache = md
return nil
}

// StoreToCache caches the toCache.
func (c *Config) StoreToCache() error {
if c.toCache == nil {
return errors.New("no unprocessed config to cache")
}
if err := os.MkdirAll(ViamDotDir, 0o700); err != nil {
return err
}
reader := bytes.NewReader(c.toCache)
path := getCloudCacheFilePath(c.Cloud.ID)
return artifact.AtomicStore(path, reader, c.Cloud.ID)
}

// UnmarshalJSON unmarshals JSON into the config and adjusts some
// names if they are not fully filled in.
func (c *Config) UnmarshalJSON(data []byte) error {
Expand Down
22 changes: 2 additions & 20 deletions config/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/pkg/errors"
apppb "go.viam.com/api/app/v1"
"go.viam.com/utils"
"go.viam.com/utils/artifact"
"go.viam.com/utils/rpc"
"golang.org/x/sys/cpu"

Expand Down Expand Up @@ -114,22 +113,6 @@ func readFromCache(id string) (*Config, error) {
return unprocessedConfig, nil
}

func storeToCache(id string, cfg *Config) error {
if err := os.MkdirAll(ViamDotDir, 0o700); err != nil {
return err
}

md, err := json.MarshalIndent(cfg, "", " ")
if err != nil {
return err
}
reader := bytes.NewReader(md)

path := getCloudCacheFilePath(id)

return artifact.AtomicStore(path, reader, id)
}

func clearCache(id string) {
utils.UncheckedErrorFunc(func() error {
return os.Remove(getCloudCacheFilePath(id))
Expand Down Expand Up @@ -318,10 +301,9 @@ func readFromCloud(
unprocessedConfig.Cloud.TLSCertificate = tls.certificate
unprocessedConfig.Cloud.TLSPrivateKey = tls.privateKey

if err := storeToCache(cloudCfg.ID, unprocessedConfig); err != nil {
logger.Errorw("failed to cache config", "error", err)
if err := cfg.SetToCache(unprocessedConfig); err != nil {
logger.Errorw("failed to set toCache on config", "error", err)
}

return cfg, nil
}

Expand Down
51 changes: 39 additions & 12 deletions config/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package config

import (
"context"
"errors"
"fmt"
"io/fs"
"os"
Expand All @@ -11,6 +10,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/pkg/errors"
pb "go.viam.com/api/app/v1"
"go.viam.com/test"

Expand Down Expand Up @@ -69,7 +69,6 @@ func TestFromReader(t *testing.T) {
appAddress := fmt.Sprintf("http://%s", fakeServer.Addr().String())
cfgText := fmt.Sprintf(`{"cloud":{"id":%q,"app_address":%q,"secret":%q}}`, robotPartID, appAddress, secret)
gotCfg, err := FromReader(ctx, "", strings.NewReader(cfgText), logger)
defer clearCache(robotPartID)
test.That(t, err, test.ShouldBeNil)

expectedCloud := *cloudResponse
Expand All @@ -79,6 +78,8 @@ func TestFromReader(t *testing.T) {
expectedCloud.RefreshInterval = time.Duration(10000000000)
test.That(t, gotCfg.Cloud, test.ShouldResemble, &expectedCloud)

test.That(t, gotCfg.StoreToCache(), test.ShouldBeNil)
defer clearCache(robotPartID)
cachedCfg, err := readFromCache(robotPartID)
test.That(t, err, test.ShouldBeNil)
expectedCloud.AppAddress = ""
Expand All @@ -102,7 +103,10 @@ func TestFromReader(t *testing.T) {
MachineID: "the-machine",
}
cachedConf := &Config{Cloud: cachedCloud}
err := storeToCache(robotPartID, cachedConf)

cfgToCache := &Config{Cloud: &Cloud{ID: robotPartID}}
cfgToCache.SetToCache(cachedConf)
err := cfgToCache.StoreToCache()
test.That(t, err, test.ShouldBeNil)
defer clearCache(robotPartID)

Expand Down Expand Up @@ -153,14 +157,16 @@ func TestFromReader(t *testing.T) {
appAddress := fmt.Sprintf("http://%s", fakeServer.Addr().String())
cfgText := fmt.Sprintf(`{"cloud":{"id":%q,"app_address":%q,"secret":%q}}`, robotPartID, appAddress, secret)
gotCfg, err := FromReader(ctx, "", strings.NewReader(cfgText), logger)
defer clearCache(robotPartID)
test.That(t, err, test.ShouldBeNil)

expectedCloud := *cloudResponse
expectedCloud.AppAddress = appAddress
expectedCloud.RefreshInterval = time.Duration(10000000000)
test.That(t, gotCfg.Cloud, test.ShouldResemble, &expectedCloud)

err = gotCfg.StoreToCache()
defer clearCache(robotPartID)
test.That(t, err, test.ShouldBeNil)
cachedCfg, err := readFromCache(robotPartID)
test.That(t, err, test.ShouldBeNil)
expectedCloud.AppAddress = ""
Expand Down Expand Up @@ -191,13 +197,20 @@ func TestStoreToCache(t *testing.T) {
}
cfg.Cloud = cloud

// store our config to the cloud
err = storeToCache(cfg.Cloud.ID, cfg)
// errors if no unprocessed config to cache
cfgToCache := &Config{Cloud: &Cloud{ID: "forCachingTest"}}
err = cfgToCache.StoreToCache()
test.That(t, err.Error(), test.ShouldContainSubstring, "no unprocessed config to cache")

// store our config to the cache
cfgToCache.SetToCache(cfg)
err = cfgToCache.StoreToCache()
test.That(t, err, test.ShouldBeNil)

// read config from cloud, confirm consistency
cloudCfg, err := readFromCloud(ctx, cfg, nil, true, false, logger)
test.That(t, err, test.ShouldBeNil)
cloudCfg.toCache = nil
test.That(t, cloudCfg, test.ShouldResemble, cfg)

// Modify our config
Expand All @@ -207,17 +220,20 @@ func TestStoreToCache(t *testing.T) {
// read config from cloud again, confirm that the cached config differs from cfg
cloudCfg2, err := readFromCloud(ctx, cfg, nil, true, false, logger)
test.That(t, err, test.ShouldBeNil)
test.That(t, cloudCfg2, test.ShouldNotResemble, cfg)
cloudCfg2.toCache = nil
test.That(t, cloudCfg2, test.ShouldNotResemble, cfgToCache)

// store the updated config to the cloud
err = storeToCache(cfg.Cloud.ID, cfg)
cfgToCache.SetToCache(cfg)
err = cfgToCache.StoreToCache()
test.That(t, err, test.ShouldBeNil)

test.That(t, cfg.Ensure(true, logger), test.ShouldBeNil)

// read updated cloud config, confirm that it now matches our updated cfg
cloudCfg3, err := readFromCloud(ctx, cfg, nil, true, false, logger)
test.That(t, err, test.ShouldBeNil)
cloudCfg3.toCache = nil
test.That(t, cloudCfg3, test.ShouldResemble, cfg)
}

Expand Down Expand Up @@ -304,7 +320,9 @@ func TestReadTLSFromCache(t *testing.T) {
defer clearCache(robotPartID)
cfg.Cloud = nil

err = storeToCache(robotPartID, cfg)
cfgToCache := &Config{Cloud: &Cloud{ID: robotPartID}}
cfgToCache.SetToCache(cfg)
err = cfgToCache.StoreToCache()
test.That(t, err, test.ShouldBeNil)

tls := tlsConfig{}
Expand All @@ -315,11 +333,14 @@ func TestReadTLSFromCache(t *testing.T) {
t.Run("invalid cached TLS", func(t *testing.T) {
defer clearCache(robotPartID)
cloud := &Cloud{
ID: robotPartID,
TLSPrivateKey: "key",
}
cfg.Cloud = cloud

err = storeToCache(robotPartID, cfg)
cfgToCache := &Config{Cloud: &Cloud{ID: robotPartID}}
cfgToCache.SetToCache(cfg)
err = cfgToCache.StoreToCache()
test.That(t, err, test.ShouldBeNil)

tls := tlsConfig{}
Expand All @@ -333,12 +354,15 @@ func TestReadTLSFromCache(t *testing.T) {
t.Run("invalid cached TLS but insecure signaling", func(t *testing.T) {
defer clearCache(robotPartID)
cloud := &Cloud{
ID: robotPartID,
TLSPrivateKey: "key",
SignalingInsecure: true,
}
cfg.Cloud = cloud

err = storeToCache(robotPartID, cfg)
cfgToCache := &Config{Cloud: &Cloud{ID: robotPartID}}
cfgToCache.SetToCache(cfg)
err = cfgToCache.StoreToCache()
test.That(t, err, test.ShouldBeNil)

tls := tlsConfig{}
Expand All @@ -352,12 +376,15 @@ func TestReadTLSFromCache(t *testing.T) {
t.Run("valid cached TLS", func(t *testing.T) {
defer clearCache(robotPartID)
cloud := &Cloud{
ID: robotPartID,
TLSCertificate: "cert",
TLSPrivateKey: "key",
}
cfg.Cloud = cloud

err = storeToCache(robotPartID, cfg)
cfgToCache := &Config{Cloud: &Cloud{ID: robotPartID}}
cfgToCache.SetToCache(cfg)
err = cfgToCache.StoreToCache()
test.That(t, err, test.ShouldBeNil)

// the config is missing several fields required to start the robot, but this
Expand Down
13 changes: 13 additions & 0 deletions config/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,16 @@ func TestNewWatcherCloud(t *testing.T) {
}},
}

unprocessedFromCfg := func(cfg config.Config) *config.Config {
// the unprocessed config uses the original config read from the cloud,
// and the cloud config is missing a few fields in the proto, meaning a few fields need to be cleared out.
unprocessed, err := cfg.CopyOnlyPublicFields()
test.That(t, err, test.ShouldBeNil)
unprocessed.Cloud.AppAddress = ""
unprocessed.Cloud.RefreshInterval = 10 * time.Second
return unprocessed
}

storeConfigInServer(confToReturn)

watcher, err := config.NewWatcher(context.Background(), &config.Config{Cloud: newCloudConf()}, logger)
Expand All @@ -268,6 +278,7 @@ func TestNewWatcherCloud(t *testing.T) {
confToExpect.Cloud.TLSCertificate = certsToReturn.TLSCertificate
confToExpect.Cloud.TLSPrivateKey = certsToReturn.TLSPrivateKey
test.That(t, confToExpect.Ensure(true, logger), test.ShouldBeNil)
confToExpect.SetToCache(unprocessedFromCfg(confToExpect))

newConf := <-watcher.Config()
test.That(t, newConf, test.ShouldResemble, &confToExpect)
Expand Down Expand Up @@ -305,6 +316,7 @@ func TestNewWatcherCloud(t *testing.T) {
confToExpect.Cloud.TLSCertificate = certsToReturn.TLSCertificate
confToExpect.Cloud.TLSPrivateKey = certsToReturn.TLSPrivateKey
test.That(t, confToExpect.Ensure(true, logger), test.ShouldBeNil)
confToExpect.SetToCache(unprocessedFromCfg(confToExpect))

newConf = <-watcher.Config()
test.That(t, newConf, test.ShouldResemble, &confToExpect)
Expand Down Expand Up @@ -356,6 +368,7 @@ func TestNewWatcherCloud(t *testing.T) {
confToExpect.Cloud.TLSCertificate = certsToReturn.TLSCertificate
confToExpect.Cloud.TLSPrivateKey = certsToReturn.TLSPrivateKey
test.That(t, confToExpect.Ensure(true, logger), test.ShouldBeNil)
confToExpect.SetToCache(unprocessedFromCfg(confToExpect))

newConf = <-watcher.Config()
test.That(t, newConf, test.ShouldResemble, &confToExpect)
Expand Down
16 changes: 14 additions & 2 deletions robot/impl/local_robot.go
Original file line number Diff line number Diff line change
Expand Up @@ -1190,14 +1190,26 @@ func (r *localRobot) reconfigure(ctx context.Context, newConfig *config.Config,
// if anything has changed.
err := r.packageManager.Sync(ctx, newConfig.Packages, newConfig.Modules)
if err != nil {
allErrs = multierr.Combine(allErrs, err)
r.Logger().CErrorw(ctx, "reconfiguration aborted because cloud modules or packages download failed", "error", err)
return
}
// For local tarball modules, we create synthetic versions for package management. The `localRobot` keeps track of these because
// config reader would overwrite if we just stored it in config. Here, we copy the synthetic version from the `localRobot` into the
// appropriate `config.Module` object inside the `cfg.Modules` slice. Thus, when a local tarball module is reloaded, the viam-server
// can unpack it into a fresh directory rather than reusing the previous one.
r.applyLocalModuleVersions(newConfig)
allErrs = multierr.Combine(allErrs, r.localPackages.Sync(ctx, newConfig.Packages, newConfig.Modules))
err = r.localPackages.Sync(ctx, newConfig.Packages, newConfig.Modules)
if err != nil {
r.Logger().CErrorw(ctx, "reconfiguration aborted because local modules or packages sync failed", "error", err)
return
}

if newConfig.Cloud != nil {
r.Logger().CDebug(ctx, "updating cached config")
if err := newConfig.StoreToCache(); err != nil {
r.logger.CErrorw(ctx, "error storing the config", "error", err)
}
}

// Add default services and process their dependencies. Dependencies may
// already come from config validation so we check that here.
Expand Down

0 comments on commit c4e7142

Please sign in to comment.