Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RSDK-8598 - Replace cache after sync #4343

Merged
merged 22 commits into from
Sep 5, 2024
Merged
30 changes: 30 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package config

import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
Expand All @@ -15,6 +16,7 @@ import (
"time"

"github.com/pkg/errors"
"go.viam.com/utils/artifact"
"go.viam.com/utils/jwks"
"go.viam.com/utils/pexec"
"go.viam.com/utils/rpc"
Expand Down Expand Up @@ -68,6 +70,10 @@ type Config struct {

// Revision contains the current revision of the config.
Revision string

// unprocessedConfig stores the unprocessed version of the config that will be cached.
// This version is kept because the config is changed as it moves through the system.
unprocessedConfig *Config
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[minor] maybe we can shorten this field name - conceptually an "unprocessed" config is the marshaled config struct, right?

Suggested change
// unprocessedConfig stores the unprocessed version of the config that will be cached.
// This version is kept because the config is changed as it moves through the system.
unprocessedConfig *Config
// raw stores the unprocessed version of the config that will be cached.
// This version is kept because the config is changed as it moves through the system.
raw *Config

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed the name to toCache to be clearer, what do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep makes sense

}

// NOTE: This data must be maintained with what is in Config.
Expand Down Expand Up @@ -238,6 +244,30 @@ func (c Config) FindComponent(name string) *resource.Config {
return nil
}

// SetUnprocessedConfig sets unprocessedConfig with a copy of the config passed in.
func (c *Config) SetUnprocessedConfig(cfg *Config) error {
cpy, err := cfg.CopyOnlyPublicFields()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm assuming json.MarshalIndent only writes public fields and therefore we're confident CopyOnlyPublicFields is the only thing stuff we need to pull out into a copy?

Having SetUnprocessedConfig doing the marshalling would be obviously equivalent to me. But happy to just get an OK and not worry about it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, it's the same.

definitely could marshal earlier as well, went ahead and did that

c.unprocessedConfig = cpy
return err
}

// StoreToCache caches the unprocessedConfig.
func (c *Config) StoreToCache() error {
if c.unprocessedConfig == nil {
return errors.New("no unprocessed config to cache")
}
if err := os.MkdirAll(ViamDotDir, 0o700); err != nil {
return err
}
md, err := json.MarshalIndent(c.unprocessedConfig, "", " ")
if err != nil {
return err
}
reader := bytes.NewReader(md)
path := getCloudCacheFilePath(c.Cloud.ID)
return artifact.AtomicStore(path, reader, c.Cloud.ID)
}

// UnmarshalJSON unmarshals JSON into the config and adjusts some
// names if they are not fully filled in.
func (c *Config) UnmarshalJSON(data []byte) error {
Expand Down
22 changes: 2 additions & 20 deletions config/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/pkg/errors"
apppb "go.viam.com/api/app/v1"
"go.viam.com/utils"
"go.viam.com/utils/artifact"
"go.viam.com/utils/rpc"
"golang.org/x/sys/cpu"

Expand Down Expand Up @@ -114,22 +113,6 @@ func readFromCache(id string) (*Config, error) {
return unprocessedConfig, nil
}

func storeToCache(id string, cfg *Config) error {
if err := os.MkdirAll(ViamDotDir, 0o700); err != nil {
return err
}

md, err := json.MarshalIndent(cfg, "", " ")
if err != nil {
return err
}
reader := bytes.NewReader(md)

path := getCloudCacheFilePath(id)

return artifact.AtomicStore(path, reader, id)
}

func clearCache(id string) {
utils.UncheckedErrorFunc(func() error {
return os.Remove(getCloudCacheFilePath(id))
Expand Down Expand Up @@ -318,10 +301,9 @@ func readFromCloud(
unprocessedConfig.Cloud.TLSCertificate = tls.certificate
unprocessedConfig.Cloud.TLSPrivateKey = tls.privateKey

if err := storeToCache(cloudCfg.ID, unprocessedConfig); err != nil {
logger.Errorw("failed to cache config", "error", err)
if err := cfg.SetUnprocessedConfig(unprocessedConfig); err != nil {
logger.Errorw("failed to set unprocessed config", "error", err)
}

return cfg, nil
}

Expand Down
51 changes: 39 additions & 12 deletions config/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package config

import (
"context"
"errors"
"fmt"
"io/fs"
"os"
Expand All @@ -11,6 +10,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/pkg/errors"
pb "go.viam.com/api/app/v1"
"go.viam.com/test"

Expand Down Expand Up @@ -69,7 +69,6 @@ func TestFromReader(t *testing.T) {
appAddress := fmt.Sprintf("http://%s", fakeServer.Addr().String())
cfgText := fmt.Sprintf(`{"cloud":{"id":%q,"app_address":%q,"secret":%q}}`, robotPartID, appAddress, secret)
gotCfg, err := FromReader(ctx, "", strings.NewReader(cfgText), logger)
defer clearCache(robotPartID)
test.That(t, err, test.ShouldBeNil)

expectedCloud := *cloudResponse
Expand All @@ -79,6 +78,8 @@ func TestFromReader(t *testing.T) {
expectedCloud.RefreshInterval = time.Duration(10000000000)
test.That(t, gotCfg.Cloud, test.ShouldResemble, &expectedCloud)

test.That(t, gotCfg.StoreToCache(), test.ShouldBeNil)
defer clearCache(robotPartID)
cachedCfg, err := readFromCache(robotPartID)
test.That(t, err, test.ShouldBeNil)
expectedCloud.AppAddress = ""
Expand All @@ -102,7 +103,10 @@ func TestFromReader(t *testing.T) {
MachineID: "the-machine",
}
cachedConf := &Config{Cloud: cachedCloud}
err := storeToCache(robotPartID, cachedConf)

cfgToCache := &Config{Cloud: &Cloud{ID: robotPartID}}
cfgToCache.SetUnprocessedConfig(cachedConf)
err := cfgToCache.StoreToCache()
test.That(t, err, test.ShouldBeNil)
defer clearCache(robotPartID)

Expand Down Expand Up @@ -153,14 +157,16 @@ func TestFromReader(t *testing.T) {
appAddress := fmt.Sprintf("http://%s", fakeServer.Addr().String())
cfgText := fmt.Sprintf(`{"cloud":{"id":%q,"app_address":%q,"secret":%q}}`, robotPartID, appAddress, secret)
gotCfg, err := FromReader(ctx, "", strings.NewReader(cfgText), logger)
defer clearCache(robotPartID)
test.That(t, err, test.ShouldBeNil)

expectedCloud := *cloudResponse
expectedCloud.AppAddress = appAddress
expectedCloud.RefreshInterval = time.Duration(10000000000)
test.That(t, gotCfg.Cloud, test.ShouldResemble, &expectedCloud)

err = gotCfg.StoreToCache()
defer clearCache(robotPartID)
test.That(t, err, test.ShouldBeNil)
cachedCfg, err := readFromCache(robotPartID)
test.That(t, err, test.ShouldBeNil)
expectedCloud.AppAddress = ""
Expand Down Expand Up @@ -191,13 +197,20 @@ func TestStoreToCache(t *testing.T) {
}
cfg.Cloud = cloud

// store our config to the cloud
err = storeToCache(cfg.Cloud.ID, cfg)
// errors if no unprocessed config to cache
cfgToCache := &Config{Cloud: &Cloud{ID: "forCachingTest"}}
err = cfgToCache.StoreToCache()
test.That(t, err.Error(), test.ShouldContainSubstring, "no unprocessed config to cache")

// store our config to the cache
cfgToCache.SetUnprocessedConfig(cfg)
err = cfgToCache.StoreToCache()
test.That(t, err, test.ShouldBeNil)

// read config from cloud, confirm consistency
cloudCfg, err := readFromCloud(ctx, cfg, nil, true, false, logger)
test.That(t, err, test.ShouldBeNil)
cloudCfg.unprocessedConfig = nil
test.That(t, cloudCfg, test.ShouldResemble, cfg)

// Modify our config
Expand All @@ -207,17 +220,20 @@ func TestStoreToCache(t *testing.T) {
// read config from cloud again, confirm that the cached config differs from cfg
cloudCfg2, err := readFromCloud(ctx, cfg, nil, true, false, logger)
test.That(t, err, test.ShouldBeNil)
test.That(t, cloudCfg2, test.ShouldNotResemble, cfg)
cloudCfg2.unprocessedConfig = nil
test.That(t, cloudCfg2, test.ShouldNotResemble, cfgToCache)

// store the updated config to the cloud
err = storeToCache(cfg.Cloud.ID, cfg)
cfgToCache.SetUnprocessedConfig(cfg)
err = cfgToCache.StoreToCache()
test.That(t, err, test.ShouldBeNil)

test.That(t, cfg.Ensure(true, logger), test.ShouldBeNil)

// read updated cloud config, confirm that it now matches our updated cfg
cloudCfg3, err := readFromCloud(ctx, cfg, nil, true, false, logger)
test.That(t, err, test.ShouldBeNil)
cloudCfg3.unprocessedConfig = nil
test.That(t, cloudCfg3, test.ShouldResemble, cfg)
}

Expand Down Expand Up @@ -304,7 +320,9 @@ func TestReadTLSFromCache(t *testing.T) {
defer clearCache(robotPartID)
cfg.Cloud = nil

err = storeToCache(robotPartID, cfg)
cfgToCache := &Config{Cloud: &Cloud{ID: robotPartID}}
cfgToCache.SetUnprocessedConfig(cfg)
err = cfgToCache.StoreToCache()
test.That(t, err, test.ShouldBeNil)

tls := tlsConfig{}
Expand All @@ -315,11 +333,14 @@ func TestReadTLSFromCache(t *testing.T) {
t.Run("invalid cached TLS", func(t *testing.T) {
defer clearCache(robotPartID)
cloud := &Cloud{
ID: robotPartID,
TLSPrivateKey: "key",
}
cfg.Cloud = cloud

err = storeToCache(robotPartID, cfg)
cfgToCache := &Config{Cloud: &Cloud{ID: robotPartID}}
cfgToCache.SetUnprocessedConfig(cfg)
err = cfgToCache.StoreToCache()
test.That(t, err, test.ShouldBeNil)

tls := tlsConfig{}
Expand All @@ -333,12 +354,15 @@ func TestReadTLSFromCache(t *testing.T) {
t.Run("invalid cached TLS but insecure signaling", func(t *testing.T) {
defer clearCache(robotPartID)
cloud := &Cloud{
ID: robotPartID,
TLSPrivateKey: "key",
SignalingInsecure: true,
}
cfg.Cloud = cloud

err = storeToCache(robotPartID, cfg)
cfgToCache := &Config{Cloud: &Cloud{ID: robotPartID}}
cfgToCache.SetUnprocessedConfig(cfg)
err = cfgToCache.StoreToCache()
test.That(t, err, test.ShouldBeNil)

tls := tlsConfig{}
Expand All @@ -352,12 +376,15 @@ func TestReadTLSFromCache(t *testing.T) {
t.Run("valid cached TLS", func(t *testing.T) {
defer clearCache(robotPartID)
cloud := &Cloud{
ID: robotPartID,
TLSCertificate: "cert",
TLSPrivateKey: "key",
}
cfg.Cloud = cloud

err = storeToCache(robotPartID, cfg)
cfgToCache := &Config{Cloud: &Cloud{ID: robotPartID}}
cfgToCache.SetUnprocessedConfig(cfg)
err = cfgToCache.StoreToCache()
test.That(t, err, test.ShouldBeNil)

// the config is missing several fields required to start the robot, but this
Expand Down
13 changes: 13 additions & 0 deletions config/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,16 @@ func TestNewWatcherCloud(t *testing.T) {
}},
}

unprocessedFromCfg := func(cfg config.Config) *config.Config {
// the unprocessed config uses the original config read from the cloud,
// and the cloud config is missing a few fields in the proto, meaning a few fields need to be cleared out.
unprocessed, err := cfg.CopyOnlyPublicFields()
test.That(t, err, test.ShouldBeNil)
unprocessed.Cloud.AppAddress = ""
unprocessed.Cloud.RefreshInterval = 10 * time.Second
return unprocessed
}

storeConfigInServer(confToReturn)

watcher, err := config.NewWatcher(context.Background(), &config.Config{Cloud: newCloudConf()}, logger)
Expand All @@ -268,6 +278,7 @@ func TestNewWatcherCloud(t *testing.T) {
confToExpect.Cloud.TLSCertificate = certsToReturn.TLSCertificate
confToExpect.Cloud.TLSPrivateKey = certsToReturn.TLSPrivateKey
test.That(t, confToExpect.Ensure(true, logger), test.ShouldBeNil)
confToExpect.SetUnprocessedConfig(unprocessedFromCfg(confToExpect))

newConf := <-watcher.Config()
test.That(t, newConf, test.ShouldResemble, &confToExpect)
Expand Down Expand Up @@ -305,6 +316,7 @@ func TestNewWatcherCloud(t *testing.T) {
confToExpect.Cloud.TLSCertificate = certsToReturn.TLSCertificate
confToExpect.Cloud.TLSPrivateKey = certsToReturn.TLSPrivateKey
test.That(t, confToExpect.Ensure(true, logger), test.ShouldBeNil)
confToExpect.SetUnprocessedConfig(unprocessedFromCfg(confToExpect))

newConf = <-watcher.Config()
test.That(t, newConf, test.ShouldResemble, &confToExpect)
Expand Down Expand Up @@ -356,6 +368,7 @@ func TestNewWatcherCloud(t *testing.T) {
confToExpect.Cloud.TLSCertificate = certsToReturn.TLSCertificate
confToExpect.Cloud.TLSPrivateKey = certsToReturn.TLSPrivateKey
test.That(t, confToExpect.Ensure(true, logger), test.ShouldBeNil)
confToExpect.SetUnprocessedConfig(unprocessedFromCfg(confToExpect))

newConf = <-watcher.Config()
test.That(t, newConf, test.ShouldResemble, &confToExpect)
Expand Down
31 changes: 22 additions & 9 deletions robot/impl/local_robot.go
Original file line number Diff line number Diff line change
Expand Up @@ -1174,13 +1174,6 @@ func (r *localRobot) applyLocalModuleVersions(cfg *config.Config) {
}

func (r *localRobot) reconfigure(ctx context.Context, newConfig *config.Config, forceSync bool) {
r.configRevisionMu.Lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the consequence here that we now only report a newer config revision after* package downloading succeeds?

Also I don't see the configRevisionMu being locked. Is that important?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the missing lock is a miss, added it back in.

thinking about it more, the config revision should update at the top of the function so users will know the new config did get pulled and is being processed. In theory, it may make sense to rollback the revision if we exit reconfiguration if download fails, but I also think it's ok to keep the new revision as a sign that the new config did get loaded. whether we revert the revision or not, it will be unclear what state robot reconfiguration is in, so maybe we need to add more information in there

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know @maximpertsov and I talked about how to represent "mixed" state when a new revision is being applied/doesn't apply completely.

so maybe we need to add more information in there

I agree the only path to do better is to add more information. And I agree it's best to do that thinking/work in a separate ticket.

r.configRevision = config.Revision{
Revision: newConfig.Revision,
LastUpdated: time.Now(),
}
r.configRevisionMu.Unlock()

var allErrs error

// Sync Packages before reconfiguring rest of robot and resolving references to any packages
Expand All @@ -1190,14 +1183,34 @@ func (r *localRobot) reconfigure(ctx context.Context, newConfig *config.Config,
// if anything has changed.
err := r.packageManager.Sync(ctx, newConfig.Packages, newConfig.Modules)
if err != nil {
allErrs = multierr.Combine(allErrs, err)
r.Logger().CErrorw(ctx, "reconfiguration aborted because cloud modules or packages download failed", "error", err)
return
}
// For local tarball modules, we create synthetic versions for package management. The `localRobot` keeps track of these because
// config reader would overwrite if we just stored it in config. Here, we copy the synthetic version from the `localRobot` into the
// appropriate `config.Module` object inside the `cfg.Modules` slice. Thus, when a local tarball module is reloaded, the viam-server
// can unpack it into a fresh directory rather than reusing the previous one.
r.applyLocalModuleVersions(newConfig)
allErrs = multierr.Combine(allErrs, r.localPackages.Sync(ctx, newConfig.Packages, newConfig.Modules))
err = r.localPackages.Sync(ctx, newConfig.Packages, newConfig.Modules)
if err != nil {
r.Logger().CErrorw(ctx, "reconfiguration aborted because local modules or packages sync failed", "error", err)
return
}

if newConfig.Cloud != nil {
r.Logger().CDebug(ctx, "updating cached config")
if err := newConfig.StoreToCache(); err != nil {
r.logger.CErrorw(ctx, "error storing the config", "error", err)
}
}

// Update configRevision, as the robot is starting to reconfigure itself.
r.configRevisionMu.Lock()
r.configRevision = config.Revision{
Revision: newConfig.Revision,
LastUpdated: time.Now(),
}
r.configRevisionMu.Unlock()

// Add default services and process their dependencies. Dependencies may
// already come from config validation so we check that here.
Expand Down
Loading