RSDK-8598 - Replace cache after sync #4343

Merged Sep 5, 2024 (22 commits)
30 changes: 30 additions & 0 deletions config/config.go
@@ -2,6 +2,7 @@
package config

import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
@@ -15,6 +16,7 @@ import (
"time"

"github.com/pkg/errors"
"go.viam.com/utils/artifact"
"go.viam.com/utils/jwks"
"go.viam.com/utils/pexec"
"go.viam.com/utils/rpc"
@@ -68,6 +70,11 @@ type Config struct {

// Revision contains the current revision of the config.
Revision string

// toCache stores the JSON marshalled version of the config to be cached. It should be a copy of
// the config pulled from cloud with minor changes.
// This version is kept because the config is changed as it moves through the system.
toCache []byte
}

// NOTE: This data must be maintained with what is in Config.
@@ -238,6 +245,29 @@ func (c Config) FindComponent(name string) *resource.Config {
return nil
}

// SetToCache sets toCache with a marshalled copy of the config passed in.
func (c *Config) SetToCache(cfg *Config) error {
Member

[minor] this method is public to accommodate its usage in config/watcher_test.go, right? Or are we planning to use it in other packages as well? If the former, I suggest we make this method private and define a public version of it in config/export_test.go. This seems to be a common pattern that the golang standard library uses to export test-only functions that are used across different packages and _test modules.

(This is a general pattern we can use around our code-base too if we like it)

Member
@dgottlieb, Sep 4, 2024

I think being public here is fine -- even if for the sake of testing. Data tried playing games with keeping things private but selectively publishing things for testing, and it became a huge mess.

I think if we want to be aggressive about making things private and paying a code complexity/readability cost elsewhere, we should start making tangible measurements for when a public method creates a problem.

Member Author

will leave as is for now

Member

> I think being public here is fine -- even if for the sake of testing. Data tried playing games with keeping things private but selectively publishing things for testing, and it became a huge mess.

What made it a mess? Was it unclear what methods were actually public vs public for testing only? If so, I think we can clarify that with some simple sign-posts like requiring any test-only method to take a testing.TB interface or something like that. But in any case I don't feel too strongly about this. Happy to leave as is.
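
As a rough illustration of the signpost idea in this comment, a test-only setter could require a `testing.TB` argument. Everything in the sketch below (`conf`, `SetToCacheForTest`, `stubTB`) is hypothetical and not part of this PR; `stubTB` exists only so the example can run outside `go test`.

```go
package main

import (
	"encoding/json"
	"fmt"
	"testing"
)

// conf is a hypothetical stand-in for config.Config.
type conf struct {
	Name    string `json:"name"`
	toCache []byte // unexported, so encoding/json skips it
}

// SetToCacheForTest is a hypothetical test-only setter. The testing.TB
// parameter is the signpost: ordinary production code has no TB to pass.
func (c *conf) SetToCacheForTest(tb testing.TB, src *conf) {
	tb.Helper()
	md, err := json.MarshalIndent(src, "", "  ")
	if err != nil {
		tb.Fatalf("marshal config: %v", err)
	}
	c.toCache = md
}

// stubTB lets the sketch run outside `go test`. Embedding the testing.TB
// interface satisfies it; only Helper is overridden because only Helper is
// reached on the success path.
type stubTB struct{ testing.TB }

func (stubTB) Helper() {}

func main() {
	c := &conf{Name: "processed"}
	c.SetToCacheForTest(stubTB{}, &conf{Name: "from-cloud"})
	fmt.Println(string(c.toCache))
}
```

The stub also shows the limit of the approach: the `testing.TB` parameter is a convention that makes misuse visible in review, not a compiler-enforced barrier.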

Member

> What made it a mess? Was it unclear what methods were actually public vs public for testing only?

No, worse. The indirection resulted in a bunch of interfaces. And finding out which method was getting called (in testing *and* in production) was impossible without knowing how the magic worked.

If I wanted to pick a single file as the starting point, I'd say it's this file.

What is a DMService? What is a ManagerConstructor? Following the ManagerConstructor definition, what is a datasync.Manager? Is the *datamanger.builtin.builtin type a datamanager.internal.DMService? Is it a DMService if testing is not enabled? Does that even matter?

Member

woah, that file adds a test-only global? that's a bit confusing...

md, err := json.MarshalIndent(cfg, "", " ")
if err != nil {
return err
}
c.toCache = md
return nil
}

// StoreToCache caches the toCache.
func (c *Config) StoreToCache() error {
if c.toCache == nil {
return errors.New("no unprocessed config to cache")
}
if err := os.MkdirAll(ViamDotDir, 0o700); err != nil {
return err
}
reader := bytes.NewReader(c.toCache)
path := getCloudCacheFilePath(c.Cloud.ID)
return artifact.AtomicStore(path, reader, c.Cloud.ID)
}

// UnmarshalJSON unmarshals JSON into the config and adjusts some
// names if they are not fully filled in.
func (c *Config) UnmarshalJSON(data []byte) error {
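
SetToCache and StoreToCache above split caching into a staging step and a persisting step. A minimal, self-contained sketch of that split follows. The names `conf`, `stage`, `store`, `demo`, and the `<id>.json` file name are hypothetical stand-ins, and the temp-file-plus-rename write is an assumption standing in for `artifact.AtomicStore`, whose internals are not shown in this diff.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"path/filepath"
)

// conf is a hypothetical stand-in for config.Config.
type conf struct {
	ID      string `json:"id"`
	Name    string `json:"name"`
	toCache []byte // unexported, so encoding/json skips it
}

// stage mirrors SetToCache: snapshot src as JSON before later processing
// has a chance to mutate the live config.
func (c *conf) stage(src *conf) error {
	md, err := json.MarshalIndent(src, "", "  ")
	if err != nil {
		return err
	}
	c.toCache = md
	return nil
}

// store mirrors StoreToCache: refuse to write when nothing is staged,
// otherwise write to a temp file and rename it into place.
func (c *conf) store(dir string) (string, error) {
	if c.toCache == nil {
		return "", errors.New("no unprocessed config to cache")
	}
	if err := os.MkdirAll(dir, 0o700); err != nil {
		return "", err
	}
	tmp, err := os.CreateTemp(dir, "cache-*.tmp")
	if err != nil {
		return "", err
	}
	defer os.Remove(tmp.Name()) // fails harmlessly once the rename succeeded
	if _, err := tmp.Write(c.toCache); err != nil {
		tmp.Close()
		return "", err
	}
	if err := tmp.Close(); err != nil {
		return "", err
	}
	path := filepath.Join(dir, c.ID+".json")
	if err := os.Rename(tmp.Name(), path); err != nil {
		return "", err
	}
	return path, nil
}

// demo stages a raw config, mutates the live one, stores, and returns the
// Name found in the cache file.
func demo() (string, error) {
	dir, err := os.MkdirTemp("", "cache-demo")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)

	raw := &conf{ID: "part-1", Name: "from-cloud"}
	live := &conf{ID: raw.ID, Name: raw.Name}
	if err := live.stage(raw); err != nil {
		return "", err
	}
	live.Name = "changed-during-processing"

	path, err := live.store(dir)
	if err != nil {
		return "", err
	}
	data, err := os.ReadFile(path)
	if err != nil {
		return "", err
	}
	var cached conf
	if err := json.Unmarshal(data, &cached); err != nil {
		return "", err
	}
	return cached.Name, nil
}

func main() {
	name, err := demo()
	fmt.Println(name, err)
}
```

Staging bytes instead of a pointer means later mutation of the live config cannot leak into the cache, which matches the intent stated in the `toCache` field comment.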
22 changes: 2 additions & 20 deletions config/reader.go
@@ -17,7 +17,6 @@ import (
"github.com/pkg/errors"
apppb "go.viam.com/api/app/v1"
"go.viam.com/utils"
"go.viam.com/utils/artifact"
"go.viam.com/utils/rpc"
"golang.org/x/sys/cpu"

@@ -114,22 +113,6 @@ func readFromCache(id string) (*Config, error) {
return unprocessedConfig, nil
}

func storeToCache(id string, cfg *Config) error {
if err := os.MkdirAll(ViamDotDir, 0o700); err != nil {
return err
}

md, err := json.MarshalIndent(cfg, "", " ")
if err != nil {
return err
}
reader := bytes.NewReader(md)

path := getCloudCacheFilePath(id)

return artifact.AtomicStore(path, reader, id)
}

func clearCache(id string) {
utils.UncheckedErrorFunc(func() error {
return os.Remove(getCloudCacheFilePath(id))
@@ -318,10 +301,9 @@ func readFromCloud(
unprocessedConfig.Cloud.TLSCertificate = tls.certificate
unprocessedConfig.Cloud.TLSPrivateKey = tls.privateKey

if err := storeToCache(cloudCfg.ID, unprocessedConfig); err != nil {
logger.Errorw("failed to cache config", "error", err)
if err := cfg.SetToCache(unprocessedConfig); err != nil {
logger.Errorw("failed to set toCache on config", "error", err)
}

return cfg, nil
}

51 changes: 39 additions & 12 deletions config/reader_test.go
@@ -2,7 +2,6 @@ package config

import (
"context"
"errors"
"fmt"
"io/fs"
"os"
@@ -11,6 +10,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/pkg/errors"
pb "go.viam.com/api/app/v1"
"go.viam.com/test"

@@ -69,7 +69,6 @@ func TestFromReader(t *testing.T) {
appAddress := fmt.Sprintf("http://%s", fakeServer.Addr().String())
cfgText := fmt.Sprintf(`{"cloud":{"id":%q,"app_address":%q,"secret":%q}}`, robotPartID, appAddress, secret)
gotCfg, err := FromReader(ctx, "", strings.NewReader(cfgText), logger)
defer clearCache(robotPartID)
test.That(t, err, test.ShouldBeNil)

expectedCloud := *cloudResponse
@@ -79,6 +78,8 @@ func TestFromReader(t *testing.T) {
expectedCloud.RefreshInterval = time.Duration(10000000000)
test.That(t, gotCfg.Cloud, test.ShouldResemble, &expectedCloud)

test.That(t, gotCfg.StoreToCache(), test.ShouldBeNil)
defer clearCache(robotPartID)
cachedCfg, err := readFromCache(robotPartID)
test.That(t, err, test.ShouldBeNil)
expectedCloud.AppAddress = ""
@@ -102,7 +103,10 @@ func TestFromReader(t *testing.T) {
MachineID: "the-machine",
}
cachedConf := &Config{Cloud: cachedCloud}
err := storeToCache(robotPartID, cachedConf)

cfgToCache := &Config{Cloud: &Cloud{ID: robotPartID}}
cfgToCache.SetToCache(cachedConf)
err := cfgToCache.StoreToCache()
test.That(t, err, test.ShouldBeNil)
defer clearCache(robotPartID)

@@ -153,14 +157,16 @@ func TestFromReader(t *testing.T) {
appAddress := fmt.Sprintf("http://%s", fakeServer.Addr().String())
cfgText := fmt.Sprintf(`{"cloud":{"id":%q,"app_address":%q,"secret":%q}}`, robotPartID, appAddress, secret)
gotCfg, err := FromReader(ctx, "", strings.NewReader(cfgText), logger)
defer clearCache(robotPartID)
test.That(t, err, test.ShouldBeNil)

expectedCloud := *cloudResponse
expectedCloud.AppAddress = appAddress
expectedCloud.RefreshInterval = time.Duration(10000000000)
test.That(t, gotCfg.Cloud, test.ShouldResemble, &expectedCloud)

err = gotCfg.StoreToCache()
defer clearCache(robotPartID)
test.That(t, err, test.ShouldBeNil)
cachedCfg, err := readFromCache(robotPartID)
test.That(t, err, test.ShouldBeNil)
expectedCloud.AppAddress = ""
@@ -191,13 +197,20 @@ func TestStoreToCache(t *testing.T) {
}
cfg.Cloud = cloud

// store our config to the cloud
err = storeToCache(cfg.Cloud.ID, cfg)
// errors if no unprocessed config to cache
cfgToCache := &Config{Cloud: &Cloud{ID: "forCachingTest"}}
err = cfgToCache.StoreToCache()
test.That(t, err.Error(), test.ShouldContainSubstring, "no unprocessed config to cache")

// store our config to the cache
cfgToCache.SetToCache(cfg)
err = cfgToCache.StoreToCache()
test.That(t, err, test.ShouldBeNil)

// read config from cloud, confirm consistency
cloudCfg, err := readFromCloud(ctx, cfg, nil, true, false, logger)
test.That(t, err, test.ShouldBeNil)
cloudCfg.unprocessedConfig = nil
test.That(t, cloudCfg, test.ShouldResemble, cfg)

// Modify our config
@@ -207,17 +220,20 @@ func TestStoreToCache(t *testing.T) {
// read config from cloud again, confirm that the cached config differs from cfg
cloudCfg2, err := readFromCloud(ctx, cfg, nil, true, false, logger)
test.That(t, err, test.ShouldBeNil)
test.That(t, cloudCfg2, test.ShouldNotResemble, cfg)
cloudCfg2.unprocessedConfig = nil
test.That(t, cloudCfg2, test.ShouldNotResemble, cfgToCache)

// store the updated config to the cache
err = storeToCache(cfg.Cloud.ID, cfg)
cfgToCache.SetToCache(cfg)
err = cfgToCache.StoreToCache()
test.That(t, err, test.ShouldBeNil)

test.That(t, cfg.Ensure(true, logger), test.ShouldBeNil)

// read updated cloud config, confirm that it now matches our updated cfg
cloudCfg3, err := readFromCloud(ctx, cfg, nil, true, false, logger)
test.That(t, err, test.ShouldBeNil)
cloudCfg3.unprocessedConfig = nil
test.That(t, cloudCfg3, test.ShouldResemble, cfg)
}

@@ -304,7 +320,9 @@ func TestReadTLSFromCache(t *testing.T) {
defer clearCache(robotPartID)
cfg.Cloud = nil

err = storeToCache(robotPartID, cfg)
cfgToCache := &Config{Cloud: &Cloud{ID: robotPartID}}
cfgToCache.SetToCache(cfg)
err = cfgToCache.StoreToCache()
test.That(t, err, test.ShouldBeNil)

tls := tlsConfig{}
@@ -315,11 +333,14 @@ func TestReadTLSFromCache(t *testing.T) {
t.Run("invalid cached TLS", func(t *testing.T) {
defer clearCache(robotPartID)
cloud := &Cloud{
ID: robotPartID,
TLSPrivateKey: "key",
}
cfg.Cloud = cloud

err = storeToCache(robotPartID, cfg)
cfgToCache := &Config{Cloud: &Cloud{ID: robotPartID}}
cfgToCache.SetToCache(cfg)
err = cfgToCache.StoreToCache()
test.That(t, err, test.ShouldBeNil)

tls := tlsConfig{}
@@ -333,12 +354,15 @@ func TestReadTLSFromCache(t *testing.T) {
t.Run("invalid cached TLS but insecure signaling", func(t *testing.T) {
defer clearCache(robotPartID)
cloud := &Cloud{
ID: robotPartID,
TLSPrivateKey: "key",
SignalingInsecure: true,
}
cfg.Cloud = cloud

err = storeToCache(robotPartID, cfg)
cfgToCache := &Config{Cloud: &Cloud{ID: robotPartID}}
cfgToCache.SetToCache(cfg)
err = cfgToCache.StoreToCache()
test.That(t, err, test.ShouldBeNil)

tls := tlsConfig{}
@@ -352,12 +376,15 @@ func TestReadTLSFromCache(t *testing.T) {
t.Run("valid cached TLS", func(t *testing.T) {
defer clearCache(robotPartID)
cloud := &Cloud{
ID: robotPartID,
TLSCertificate: "cert",
TLSPrivateKey: "key",
}
cfg.Cloud = cloud

err = storeToCache(robotPartID, cfg)
cfgToCache := &Config{Cloud: &Cloud{ID: robotPartID}}
cfgToCache.SetToCache(cfg)
err = cfgToCache.StoreToCache()
test.That(t, err, test.ShouldBeNil)

// the config is missing several fields required to start the robot, but this
13 changes: 13 additions & 0 deletions config/watcher_test.go
@@ -259,6 +259,16 @@ func TestNewWatcherCloud(t *testing.T) {
}},
}

unprocessedFromCfg := func(cfg config.Config) *config.Config {
// the unprocessed config uses the original config read from the cloud,
// and the cloud config is missing a few fields in the proto, meaning a few fields need to be cleared out.
unprocessed, err := cfg.CopyOnlyPublicFields()
test.That(t, err, test.ShouldBeNil)
unprocessed.Cloud.AppAddress = ""
unprocessed.Cloud.RefreshInterval = 10 * time.Second
return unprocessed
}

storeConfigInServer(confToReturn)

watcher, err := config.NewWatcher(context.Background(), &config.Config{Cloud: newCloudConf()}, logger)
@@ -268,6 +278,7 @@ func TestNewWatcherCloud(t *testing.T) {
confToExpect.Cloud.TLSCertificate = certsToReturn.TLSCertificate
confToExpect.Cloud.TLSPrivateKey = certsToReturn.TLSPrivateKey
test.That(t, confToExpect.Ensure(true, logger), test.ShouldBeNil)
confToExpect.SetToCache(unprocessedFromCfg(confToExpect))

newConf := <-watcher.Config()
test.That(t, newConf, test.ShouldResemble, &confToExpect)
@@ -305,6 +316,7 @@ func TestNewWatcherCloud(t *testing.T) {
confToExpect.Cloud.TLSCertificate = certsToReturn.TLSCertificate
confToExpect.Cloud.TLSPrivateKey = certsToReturn.TLSPrivateKey
test.That(t, confToExpect.Ensure(true, logger), test.ShouldBeNil)
confToExpect.SetToCache(unprocessedFromCfg(confToExpect))

newConf = <-watcher.Config()
test.That(t, newConf, test.ShouldResemble, &confToExpect)
@@ -356,6 +368,7 @@ func TestNewWatcherCloud(t *testing.T) {
confToExpect.Cloud.TLSCertificate = certsToReturn.TLSCertificate
confToExpect.Cloud.TLSPrivateKey = certsToReturn.TLSPrivateKey
test.That(t, confToExpect.Ensure(true, logger), test.ShouldBeNil)
confToExpect.SetToCache(unprocessedFromCfg(confToExpect))

newConf = <-watcher.Config()
test.That(t, newConf, test.ShouldResemble, &confToExpect)
16 changes: 14 additions & 2 deletions robot/impl/local_robot.go
@@ -1190,14 +1190,26 @@ func (r *localRobot) reconfigure(ctx context.Context, newConfig *config.Config,
// if anything has changed.
err := r.packageManager.Sync(ctx, newConfig.Packages, newConfig.Modules)
if err != nil {
allErrs = multierr.Combine(allErrs, err)
r.Logger().CErrorw(ctx, "reconfiguration aborted because cloud modules or packages download failed", "error", err)
return
}
// For local tarball modules, we create synthetic versions for package management. The `localRobot` keeps track of these because
// config reader would overwrite if we just stored it in config. Here, we copy the synthetic version from the `localRobot` into the
// appropriate `config.Module` object inside the `cfg.Modules` slice. Thus, when a local tarball module is reloaded, the viam-server
// can unpack it into a fresh directory rather than reusing the previous one.
r.applyLocalModuleVersions(newConfig)
allErrs = multierr.Combine(allErrs, r.localPackages.Sync(ctx, newConfig.Packages, newConfig.Modules))
err = r.localPackages.Sync(ctx, newConfig.Packages, newConfig.Modules)
if err != nil {
r.Logger().CErrorw(ctx, "reconfiguration aborted because local modules or packages sync failed", "error", err)
return
}

if newConfig.Cloud != nil {
r.Logger().CDebug(ctx, "updating cached config")
if err := newConfig.StoreToCache(); err != nil {
r.logger.CErrorw(ctx, "error storing the config", "error", err)
}
}

// Add default services and process their dependencies. Dependencies may
// already come from config validation so we check that here.
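
The ordering in this hunk (abort on either sync failure, write the cache only after both syncs succeed and only for cloud-managed configs) can be sketched in isolation. The `step` type, `succeed`, `fail`, and this simplified `reconfigure` signature are hypothetical; the real method takes a context and a `*config.Config`, and carries on with the rest of reconfiguration after the cache step.

```go
package main

import (
	"errors"
	"fmt"
)

// step is a hypothetical stand-in for a sync or cache operation.
type step func() error

func succeed() error { return nil }
func fail() error    { return errors.New("simulated failure") }

// reconfigure reports whether the cache was written. A sync failure aborts
// before the cache is touched; a cache write failure is only logged.
func reconfigure(cloudManaged bool, syncCloud, syncLocal, storeCache step) (bool, error) {
	if err := syncCloud(); err != nil {
		return false, fmt.Errorf("cloud modules or packages download failed: %w", err)
	}
	if err := syncLocal(); err != nil {
		return false, fmt.Errorf("local modules or packages sync failed: %w", err)
	}
	if !cloudManaged {
		// Nothing came from the cloud, so there is nothing to cache.
		return false, nil
	}
	if err := storeCache(); err != nil {
		fmt.Println("error storing the config:", err)
		return false, nil
	}
	return true, nil
}

func main() {
	cached, err := reconfigure(true, succeed, succeed, succeed)
	fmt.Println("all syncs ok:", cached, err)

	cached, err = reconfigure(true, succeed, fail, succeed)
	fmt.Println("local sync fails:", cached, err)
}
```

Gating the cache write on both syncs means a config whose packages could not be downloaded never replaces the last cached one.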