From 03a52e4241f1ade0f905d1e2145c80cab12b740c Mon Sep 17 00:00:00 2001 From: liaochuntao Date: Mon, 18 Sep 2023 10:00:04 +0800 Subject: [PATCH] Fix issue 171 & 172 (#173) --- pkg/config/api.go | 35 ++++ pkg/config/config_file.go | 136 +++++++++++- pkg/config/location_provider.go | 8 +- pkg/flow/configuration/config_flow.go | 22 +- pkg/flow/configuration/file_repo.go | 48 ++++- pkg/flow/configuration/local_cache.go | 195 ++++++++++++++++++ pkg/flow/impl.go | 6 +- pkg/model/util.go | 5 + pkg/plugin/configconnector/config_file.go | 51 ++++- .../polaris/config_connector.go | 16 +- plugin/configfilter/crypto/crypto.go | 107 +++------- polaris.yaml | 12 ++ 12 files changed, 527 insertions(+), 114 deletions(-) create mode 100644 pkg/flow/configuration/local_cache.go diff --git a/pkg/config/api.go b/pkg/config/api.go index dce8cf62..983a48c9 100644 --- a/pkg/config/api.go +++ b/pkg/config/api.go @@ -85,6 +85,8 @@ type ConfigFileConfig interface { GetPropertiesValueCacheSize() int32 // GetPropertiesValueExpireTime 缓存的过期时间,默认为 60s GetPropertiesValueExpireTime() int64 + // GetLocalCache . + GetLocalCache() ConfigLocalCacheConfig } // RateLimitConfig 限流相关配置. @@ -488,6 +490,39 @@ type ServiceSpecificConfig interface { GetServiceRouter() ServiceRouterConfig } +type ConfigLocalCacheConfig interface { + BaseConfig + // IsPersistEnable consumer.localCache.persistEnable + // 是否启用本地缓存 + IsPersistEnable() bool + // SetPersistEnable 设置是否启用本地缓存 + SetPersistEnable(enable bool) + // GetPersistDir consumer.localCache.persistDir + // 本地缓存持久化路径 + GetPersistDir() string + // SetPersistDir 设置本地缓存持久化路径 + SetPersistDir(string) + // GetPersistMaxWriteRetry consumer.localCache.persistMaxWriteRetry + // 缓存最大写重试次数 + GetPersistMaxWriteRetry() int + // SetPersistMaxWriteRetry 设置缓存最大写重试次数 + SetPersistMaxWriteRetry(int) + // GetPersistMaxReadRetry consumer.localCache.persistMaxReadRetry + // 缓存最大读重试次数 + GetPersistMaxReadRetry() int + // SetPersistMaxReadRetry 设置缓存最大读重试次数 + SetPersistMaxReadRetry(int) + // GetPersistRetryInterval consumer.localCache.persistRetryInterval + // 缓存持久化重试间隔 + GetPersistRetryInterval() time.Duration + // SetPersistRetryInterval 设置缓存持久化重试间隔 + SetPersistRetryInterval(time.Duration) + // SetFallbackToLocalCache . + SetFallbackToLocalCache(enable bool) + // IsFallbackToLocalCache . + IsFallbackToLocalCache() bool +} + // ConfigConnectorConfig 配置中心连接相关的配置. type ConfigConnectorConfig interface { ServerConnectorConfig diff --git a/pkg/config/config_file.go b/pkg/config/config_file.go index e10ca7b1..13617600 100644 --- a/pkg/config/config_file.go +++ b/pkg/config/config_file.go @@ -21,9 +21,13 @@ package config import ( "errors" "fmt" + "path/filepath" + "time" "github.com/golang/protobuf/proto" "github.com/hashicorp/go-multierror" + + "github.com/polarismesh/polaris-go/pkg/model" ) // DefaultConfigFileEnable 默认打开配置中心能力 @@ -31,8 +35,9 @@ var DefaultConfigFileEnable = true // ConfigFileConfigImpl 对接配置中心相关配置. type ConfigFileConfigImpl struct { - ConfigConnectorConfig *ConfigConnectorConfigImpl `yaml:"configConnector" json:"configConnector"` - ConfigFilterConfig *ConfigFilterConfigImpl `yaml:"configFilter" json:"configFilter"` + LocalCache *ConfigLocalCacheConfigImpl `yaml:"localCache" json:"localCache"` + ConfigConnectorConfig *ConfigConnectorConfigImpl `yaml:"configConnector" json:"configConnector"` + ConfigFilterConfig *ConfigFilterConfigImpl `yaml:"configFilter" json:"configFilter"` // 是否启动配置中心 Enable *bool `yaml:"enable" json:"enable"` PropertiesValueCacheSize *int32 `yaml:"propertiesValueCacheSize" json:"propertiesValueCacheSize"` @@ -79,6 +84,11 @@ func (c *ConfigFileConfigImpl) SetPropertiesValueExpireTime(propertiesValueExpir c.PropertiesValueExpireTime = &propertiesValueExpireTime } +// GetLocalCache . +func (c *ConfigFileConfigImpl) GetLocalCache() ConfigLocalCacheConfig { + return c.LocalCache +} + // Verify 检验ConfigConnector配置. func (c *ConfigFileConfigImpl) Verify() error { if c == nil { @@ -107,6 +117,7 @@ func (c *ConfigFileConfigImpl) Verify() error { func (c *ConfigFileConfigImpl) SetDefault() { c.ConfigConnectorConfig.SetDefault() c.ConfigFilterConfig.SetDefault() + c.LocalCache.SetDefault() if c.Enable == nil { c.Enable = &DefaultConfigFileEnable } @@ -124,4 +135,125 @@ func (c *ConfigFileConfigImpl) Init() { c.ConfigConnectorConfig.Init() c.ConfigFilterConfig = &ConfigFilterConfigImpl{} c.ConfigFilterConfig.Init() + c.LocalCache = &ConfigLocalCacheConfigImpl{} + c.LocalCache.Init() +} + +// ConfigLocalCacheConfigImpl 本地缓存配置. +type ConfigLocalCacheConfigImpl struct { + // config.localCache.persistDir + // 本地缓存持久化路径 + PersistDir string `yaml:"persistDir" json:"persistDir"` + // 是否启用本地缓存 + PersistEnable *bool `yaml:"persistEnable" json:"persistEnable"` + // config.localCache.persistMaxWriteRetry + PersistMaxWriteRetry int `yaml:"persistMaxWriteRetry" json:"persistMaxWriteRetry"` + // config.localCache.persistReadRetry + PersistMaxReadRetry int `yaml:"persistMaxReadRetry" json:"persistMaxReadRetry"` + // config.localCache.persistRetryInterval + PersistRetryInterval *time.Duration `yaml:"persistRetryInterval" json:"persistRetryInterval"` + // config.localCache.fallbackToLocalCache + FallbackToLocalCache *bool `yaml:"fallbackToLocalCache" json:"fallbackToLocalCache"` +} + +// IsPersistEnable consumer.localCache.persistEnable +// 是否启用本地缓存 +func (l *ConfigLocalCacheConfigImpl) IsPersistEnable() bool { + if l.PersistEnable == nil { + return true + } + return *l.PersistEnable +} + +// SetPersistEnable 设置是否启用本地缓存 +func (l *ConfigLocalCacheConfigImpl) SetPersistEnable(enable bool) { + l.PersistEnable = &enable +} + +// GetPersistDir consumer.localCache.persist.path +// 本地缓存持久化路径. +func (l *ConfigLocalCacheConfigImpl) GetPersistDir() string { + return l.PersistDir +} + +// SetPersistDir 设置本地缓存持久化路径. +func (l *ConfigLocalCacheConfigImpl) SetPersistDir(dir string) { + l.PersistDir = dir +} + +// GetPersistMaxWriteRetry consumer.localCache.persist.maxWriteRetry. +func (l *ConfigLocalCacheConfigImpl) GetPersistMaxWriteRetry() int { + return l.PersistMaxWriteRetry +} + +// SetPersistMaxWriteRetry 设置本地缓存持久化写入失败重试次数. +func (l *ConfigLocalCacheConfigImpl) SetPersistMaxWriteRetry(maxWriteRetry int) { + l.PersistMaxWriteRetry = maxWriteRetry +} + +// GetPersistMaxReadRetry consumer.localCache.persist.maxReadRetry. +func (l *ConfigLocalCacheConfigImpl) GetPersistMaxReadRetry() int { + return l.PersistMaxReadRetry +} + +// SetPersistMaxReadRetry 设置本地缓存持久化读取失败重试次数. +func (l *ConfigLocalCacheConfigImpl) SetPersistMaxReadRetry(maxReadRetry int) { + l.PersistMaxReadRetry = maxReadRetry +} + +// GetPersistRetryInterval consumer.localCache.persist.retryInterval. +func (l *ConfigLocalCacheConfigImpl) GetPersistRetryInterval() time.Duration { + return *l.PersistRetryInterval +} + +// SetPersistRetryInterval 设置本地缓存持久化重试间隔. +func (l *ConfigLocalCacheConfigImpl) SetPersistRetryInterval(interval time.Duration) { + l.PersistRetryInterval = &interval +} + +// SetFallbackToLocalCache . +func (l *ConfigLocalCacheConfigImpl) SetFallbackToLocalCache(enable bool) { + l.FallbackToLocalCache = &enable +} + +// IsFallbackToLocalCache . +func (l *ConfigLocalCacheConfigImpl) IsFallbackToLocalCache() bool { + if l.FallbackToLocalCache == nil { + return true + } + return *l.FallbackToLocalCache +} + +// Verify 检验LocalCacheConfig配置. +func (l *ConfigLocalCacheConfigImpl) Verify() error { + if nil == l { + return errors.New("LocalCacheConfig is nil") + } + return nil +} + +// SetDefault 设置LocalCacheConfig配置的默认值. +func (l *ConfigLocalCacheConfigImpl) SetDefault() { + if l.PersistEnable == nil { + l.PersistEnable = model.ToBoolPtr(true) + } + if l.FallbackToLocalCache == nil { + l.FallbackToLocalCache = model.ToBoolPtr(true) + } + if len(l.PersistDir) == 0 { + l.PersistDir = filepath.Join(DefaultCachePersistDir, "config") + } + if nil == l.PersistRetryInterval { + l.PersistRetryInterval = model.ToDurationPtr(DefaultPersistRetryInterval) + } + if l.PersistMaxReadRetry == 0 { + l.PersistMaxReadRetry = DefaultPersistMaxReadRetry + } + if l.PersistMaxWriteRetry == 0 { + l.PersistMaxWriteRetry = DefaultPersistMaxWriteRetry + } +} + +// Init localche配置初始化. +func (l *ConfigLocalCacheConfigImpl) Init() { } diff --git a/pkg/config/location_provider.go b/pkg/config/location_provider.go index 0ce66743..725ad73f 100644 --- a/pkg/config/location_provider.go +++ b/pkg/config/location_provider.go @@ -26,22 +26,22 @@ type LocationProviderConfigImpl struct { Options map[string]interface{} `yaml:"options" json:"options"` } -func (l LocationProviderConfigImpl) GetType() string { +func (l *LocationProviderConfigImpl) GetType() string { return l.Type } -func (l LocationProviderConfigImpl) GetOptions() map[string]interface{} { +func (l *LocationProviderConfigImpl) GetOptions() map[string]interface{} { return l.Options } -func (l LocationProviderConfigImpl) Verify() error { +func (l *LocationProviderConfigImpl) Verify() error { if l.Type == "" { return errors.New("type is empty") } return nil } -func (l LocationProviderConfigImpl) SetDefault() { +func (l *LocationProviderConfigImpl) SetDefault() { if len(l.Options) == 0 { l.Options = map[string]interface{}{} } diff --git a/pkg/flow/configuration/config_flow.go b/pkg/flow/configuration/config_flow.go index 08897aeb..f1a05def 100644 --- a/pkg/flow/configuration/config_flow.go +++ b/pkg/flow/configuration/config_flow.go @@ -46,13 +46,24 @@ type ConfigFileFlow struct { chain configfilter.Chain configuration config.Configuration + persistHandler *CachePersistHandler + startLongPollingTaskOnce sync.Once } // NewConfigFileFlow 创建配置中心服务 -func NewConfigFileFlow(connector configconnector.ConfigConnector, - chain configfilter.Chain, - configuration config.Configuration) *ConfigFileFlow { +func NewConfigFileFlow(connector configconnector.ConfigConnector, chain configfilter.Chain, + configuration config.Configuration) (*ConfigFileFlow, error) { + persistHandler, err := NewCachePersistHandler( + configuration.GetConfigFile().GetLocalCache().GetPersistDir(), + configuration.GetConfigFile().GetLocalCache().GetPersistMaxWriteRetry(), + configuration.GetConfigFile().GetLocalCache().GetPersistMaxReadRetry(), + configuration.GetConfigFile().GetLocalCache().GetPersistRetryInterval(), + ) + if err != nil { + return nil, err + } + configFileService := &ConfigFileFlow{ connector: connector, chain: chain, @@ -61,9 +72,10 @@ func NewConfigFileFlow(connector configconnector.ConfigConnector, configFileCache: map[string]model.ConfigFile{}, configFilePool: map[string]*ConfigFileRepo{}, notifiedVersion: map[string]uint64{}, + persistHandler: persistHandler, } - return configFileService + return configFileService, nil } // Destroy 销毁服务 @@ -99,7 +111,7 @@ func (c *ConfigFileFlow) GetConfigFile(namespace, fileGroup, fileName string) (m return configFile, nil } - fileRepo, err := newConfigFileRepo(configFileMetadata, c.connector, c.chain, c.configuration) + fileRepo, err := newConfigFileRepo(configFileMetadata, c.connector, c.chain, c.configuration, c.persistHandler) if err != nil { return nil, err } diff --git a/pkg/flow/configuration/file_repo.go b/pkg/flow/configuration/file_repo.go index d9bef45a..ed485081 100644 --- a/pkg/flow/configuration/file_repo.go +++ b/pkg/flow/configuration/file_repo.go @@ -19,6 +19,7 @@ package configuration import ( "fmt" + "net/url" "time" apimodel "github.com/polarismesh/specification/source/go/api/v1/model" @@ -47,6 +48,8 @@ type ConfigFileRepo struct { remoteConfigFile *configconnector.ConfigFile // 从服务端获取的原始配置对象 retryPolicy retryPolicy listeners []ConfigFileRepoChangeListener + + persistHandler *CachePersistHandler } // ConfigFileRepoChangeListener 远程配置文件发布监听器 @@ -56,7 +59,8 @@ type ConfigFileRepoChangeListener func(configFileMetadata model.ConfigFileMetada func newConfigFileRepo(metadata model.ConfigFileMetadata, connector configconnector.ConfigConnector, chain configfilter.Chain, - configuration config.Configuration) (*ConfigFileRepo, error) { + configuration config.Configuration, + persistHandler *CachePersistHandler) (*ConfigFileRepo, error) { repo := &ConfigFileRepo{ connector: connector, chain: chain, @@ -73,6 +77,7 @@ func newConfigFileRepo(metadata model.ConfigFileMetadata, FileName: metadata.GetFileName(), Version: initVersion, }, + persistHandler: persistHandler, } // 1. 同步从服务端拉取配置 if err := repo.pull(); err != nil { @@ -145,7 +150,7 @@ func (r *ConfigFileRepo) pull() error { pulledConfigFileVersion = int64(pulledConfigFile.GetVersion()) } log.GetBaseLogger().Infof("[Config] pull config file finished. config file = %+v, code = %d, version = %d, duration = %d ms", - pulledConfigFile, responseCode, pulledConfigFileVersion, time.Since(startTime).Milliseconds()) + pulledConfigFile.String(), responseCode, pulledConfigFileVersion, time.Since(startTime).Milliseconds()) // 拉取成功 if responseCode == uint32(apimodel.Code_ExecuteSuccess) { @@ -153,6 +158,8 @@ func (r *ConfigFileRepo) pull() error { if r.remoteConfigFile == nil || pulledConfigFile.Version >= r.remoteConfigFile.Version { r.remoteConfigFile = deepCloneConfigFile(pulledConfigFile) r.fireChangeEvent(pulledConfigFile.GetContent()) + // save into local_cache + r.saveCacheConfigFile(pulledConfigFile) } return nil } @@ -161,6 +168,11 @@ func (r *ConfigFileRepo) pull() error { if responseCode == uint32(apimodel.Code_NotFoundResource) { log.GetBaseLogger().Warnf("[Config] config file not found, please check whether config file released. %+v", r.configFileMetadata) // 删除配置文件 + r.removeCacheConfigFile(&configconnector.ConfigFile{ + Namespace: pullConfigFileReq.Namespace, + FileGroup: pullConfigFileReq.FileGroup, + FileName: pullConfigFileReq.FileName, + }) if r.remoteConfigFile != nil { r.remoteConfigFile = nil r.fireChangeEvent(NotExistedFileContent) @@ -179,6 +191,18 @@ func (r *ConfigFileRepo) pull() error { return err } +func (r *ConfigFileRepo) saveCacheConfigFile(file *configconnector.ConfigFile) { + fileName := fmt.Sprintf(PatternService, url.QueryEscape(file.Namespace), url.QueryEscape(file.FileGroup), + url.QueryEscape(file.FileName)) + CacheSuffix + r.persistHandler.SaveMessageToFile(fileName, file) +} + +func (r *ConfigFileRepo) removeCacheConfigFile(file *configconnector.ConfigFile) { + fileName := fmt.Sprintf(PatternService, url.QueryEscape(file.Namespace), url.QueryEscape(file.FileGroup), + url.QueryEscape(file.FileName)) + CacheSuffix + r.persistHandler.DeleteCacheFromFile(fileName) +} + func deepCloneConfigFile(sourceConfigFile *configconnector.ConfigFile) *configconnector.ConfigFile { tags := make([]*configconnector.ConfigFileTag, 0, len(sourceConfigFile.Tags)) for _, tag := range sourceConfigFile.Tags { @@ -187,16 +211,18 @@ func deepCloneConfigFile(sourceConfigFile *configconnector.ConfigFile) *configco Value: tag.Value, }) } - return &configconnector.ConfigFile{ - Namespace: sourceConfigFile.GetNamespace(), - FileGroup: sourceConfigFile.GetFileGroup(), - FileName: sourceConfigFile.GetFileName(), - Content: sourceConfigFile.GetContent(), - Version: sourceConfigFile.GetVersion(), - Md5: sourceConfigFile.GetMd5(), - Encrypted: sourceConfigFile.GetEncrypted(), - Tags: tags, + ret := &configconnector.ConfigFile{ + Namespace: sourceConfigFile.GetNamespace(), + FileGroup: sourceConfigFile.GetFileGroup(), + FileName: sourceConfigFile.GetFileName(), + SourceContent: sourceConfigFile.GetSourceContent(), + Version: sourceConfigFile.GetVersion(), + Md5: sourceConfigFile.GetMd5(), + Encrypted: sourceConfigFile.GetEncrypted(), + Tags: tags, } + ret.SetContent(sourceConfigFile.GetContent()) + return ret } func (r *ConfigFileRepo) onLongPollingNotified(newVersion uint64) { diff --git a/pkg/flow/configuration/local_cache.go b/pkg/flow/configuration/local_cache.go new file mode 100644 index 00000000..d187bfe4 --- /dev/null +++ b/pkg/flow/configuration/local_cache.go @@ -0,0 +1,195 @@ +/** + * Tencent is pleased to support the open source community by making polaris-go available. + * + * Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved. + * + * Licensed under the BSD 3-Clause License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://opensource.org/licenses/BSD-3-Clause + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package configuration + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "time" + + "github.com/golang/protobuf/proto" + "github.com/hashicorp/go-multierror" + + "github.com/polarismesh/polaris-go/pkg/log" + "github.com/polarismesh/polaris-go/pkg/model" +) + +const ( + // PatternService is the pattern of service name + PatternService = "config#%s#%s#%s" + // CacheSuffix filesystem suffix + CacheSuffix = ".json" + // PatternGlob is the pattern of glob + PatternGlob = "#?*#?*#?*" +) + +// CachePersistHandler 持久化工具类 +type CachePersistHandler struct { + persistDir string + maxWriteRetry int + maxReadRetry int + retryInterval time.Duration +} + +// CacheFileInfo 文件信息 +type CacheFileInfo struct { + Msg proto.Message + FileInfo os.FileInfo +} + +// NewCachePersistHandler create persistence handler +func NewCachePersistHandler(persistDir string, maxWriteRetry int, + maxReadRetry int, retryInterval time.Duration) (*CachePersistHandler, error) { + handler := &CachePersistHandler{} + handler.persistDir = persistDir + handler.maxReadRetry = maxReadRetry + handler.maxWriteRetry = maxWriteRetry + handler.retryInterval = retryInterval + if err := handler.init(); err != nil { + return nil, model.NewSDKError(model.ErrCodeAPIInvalidConfig, err, "fail to init cachePersistHandler") + } + return handler, nil +} + +// 持久化配置初始化 +func (cph *CachePersistHandler) init() error { + if err := model.EnsureAndVerifyDir(cph.persistDir); err != nil { + return err + } + return nil +} + +// LoadMessageFromFile 从相对文件中加载缓存 +func (cph *CachePersistHandler) LoadMessageFromFile(relativeFile string, message interface{}) error { + absFile := filepath.Join(cph.persistDir, relativeFile) + return cph.loadMessageFromAbsoluteFile(absFile, message, cph.maxReadRetry) +} + +// 从绝对文件中加载缓存 +func (cph *CachePersistHandler) loadMessageFromAbsoluteFile(cacheFile string, message interface{}, + maxRetry int) error { + log.GetBaseLogger().Infof("Start to load cache from %s", cacheFile) + var lastErr error + var retryTimes int + for retryTimes = 0; retryTimes <= maxRetry; retryTimes++ { + cacheJson, err := ioutil.ReadFile(cacheFile) + if err != nil { + lastErr = model.NewSDKError(model.ErrCodeDiskError, err, "fail to read file cache") + // 文件打开失败的话,重试没有意义,直接失败 + break + } + if err := json.Unmarshal(cacheJson, message); err != nil { + lastErr = multierror.Prefix(err, "Fail to unmarshal file cache: ") + time.Sleep(cph.retryInterval) + // 解码失败可能是读到了部分数据,所以这里可以重试 + continue + } + return nil + } + return multierror.Prefix(lastErr, + fmt.Sprintf("load message from %s failed after retry %d times", cacheFile, retryTimes)) +} + +// DeleteCacheFromFile 删除缓存文件 +func (cph *CachePersistHandler) DeleteCacheFromFile(fileName string) { + fileToDelete := filepath.Join(cph.persistDir, fileName) + log.GetBaseLogger().Infof("Start to delete cache for %s", fileToDelete) + for retryTimes := 0; retryTimes <= cph.maxWriteRetry; retryTimes++ { + err := os.Remove(fileToDelete) + if err != nil { + if !os.IsNotExist(err) { + log.GetBaseLogger().Warnf("Fail to delete cache file %s,"+ + " because %s, next retrytimes %d", fileToDelete, err.Error(), retryTimes) + } else { + log.GetBaseLogger().Warnf("Fail to delete cache file %s,"+ + " error %s is not retryable, stop retrying", fileToDelete, err.Error()) + return + } + } else { + log.GetBaseLogger().Infof("Success to delete cache file %s", fileToDelete) + return + } + time.Sleep(cph.retryInterval) + } +} + +// SaveMessageToFile 按服务来进行缓存存储 +func (cph *CachePersistHandler) SaveMessageToFile(fileName string, svcResp interface{}) { + fileToAdd := filepath.Join(cph.persistDir, fileName) + log.GetBaseLogger().Infof("Start to save cache to file %s", fileToAdd) + msg, err := json.Marshal(svcResp) + if err != nil { + log.GetBaseLogger().Warnf("Fail to marshal the service response for %s", fileToAdd) + return + } + for retryTimes := 0; retryTimes <= cph.maxWriteRetry; retryTimes++ { + err = cph.doWriteFile(fileToAdd, []byte(msg)) + if err != nil { + if retryTimes > 0 { + log.GetBaseLogger().Warnf("Fail to write cache file %s, error: %s,"+ + " retry times: %v", fileToAdd, err.Error(), retryTimes) + } + } else { + log.GetBaseLogger().Infof("Success to write cache file %s", fileToAdd) + return + } + time.Sleep(cph.retryInterval) + } +} + +// 实际写文件 +func (cph *CachePersistHandler) doWriteFile(cacheFile string, msg []byte) error { + tempFileName := cacheFile + ".tmp" + tmpFile, err := os.OpenFile(tempFileName, os.O_WRONLY|os.O_CREATE, 0600) + if err != nil { + return model.NewSDKError(model.ErrCodeDiskError, err, "fail to open file %s to write", tempFileName) + } + n, err := tmpFile.Write(msg) + if err == nil && n < len(msg) { + return model.NewSDKError(model.ErrCodeDiskError, nil, "unable to write all bytes to file %s", tempFileName) + } + if err = cph.closeTmpFile(tmpFile, cacheFile); err != nil { + _ = os.Remove(tempFileName) + return err + } + return nil +} + +// 关闭文件 +func (cph *CachePersistHandler) closeTmpFile(tmpFile *os.File, cacheFile string) error { + if err := tmpFile.Sync(); err != nil { + _ = tmpFile.Close() + return model.NewSDKError(model.ErrCodeDiskError, err, "fail to sync file %s", tmpFile.Name()) + } + if err := tmpFile.Close(); err != nil { + return model.NewSDKError(model.ErrCodeDiskError, err, "fail to close file %s", tmpFile.Name()) + } + if model.PathExist(cacheFile) { + if err := os.Chmod(cacheFile, 0600); err != nil { + return model.NewSDKError(model.ErrCodeDiskError, err, "fail to chmod file %s", cacheFile) + } + } + err := os.Rename(tmpFile.Name(), cacheFile) + if err != nil { + return model.NewSDKError(model.ErrCodeDiskError, err, "fail to rename file %s to %s", tmpFile.Name(), cacheFile) + } + return nil +} diff --git a/pkg/flow/impl.go b/pkg/flow/impl.go index f83fa753..0bf2e6f4 100644 --- a/pkg/flow/impl.go +++ b/pkg/flow/impl.go @@ -180,7 +180,11 @@ func InitFlowEngine(flowEngine *Engine, initContext plugin.InitContext) error { // 初始化配置中心服务 if cfg.GetConfigFile().IsEnable() { - flowEngine.configFileFlow = configuration.NewConfigFileFlow(flowEngine.configConnector, flowEngine.configFilterChain, flowEngine.configuration) + configFileFlow, err := configuration.NewConfigFileFlow(flowEngine.configConnector, flowEngine.configFilterChain, flowEngine.configuration) + if err != nil { + return err + } + flowEngine.configFileFlow = configFileFlow } // 初始注册状态管理器 diff --git a/pkg/model/util.go b/pkg/model/util.go index ef04afb3..6a844721 100644 --- a/pkg/model/util.go +++ b/pkg/model/util.go @@ -151,6 +151,11 @@ func ToDurationPtr(v time.Duration) *time.Duration { return &v } +// ToBoolPtr . +func ToBoolPtr(v bool) *bool { + return &v +} + // ToMilliSeconds 时间转换成毫秒 func ToMilliSeconds(v time.Duration) int64 { return ParseMilliSeconds(v.Nanoseconds()) diff --git a/pkg/plugin/configconnector/config_file.go b/pkg/plugin/configconnector/config_file.go index 3993ef17..e2d7ca66 100644 --- a/pkg/plugin/configconnector/config_file.go +++ b/pkg/plugin/configconnector/config_file.go @@ -18,6 +18,12 @@ package configconnector +import ( + "bytes" + "encoding/json" + "strconv" +) + const ( // ConfigFileTagKeyUseEncrypted 配置加密开关标识,value 为 boolean ConfigFileTagKeyUseEncrypted = "internal-encrypted" @@ -29,15 +35,31 @@ const ( // ConfigFile 配置文件 type ConfigFile struct { - Namespace string - FileGroup string - FileName string - Content string - Version uint64 - Md5 string - Encrypted bool - PublicKey string - Tags []*ConfigFileTag + Namespace string + FileGroup string + FileName string + SourceContent string + Version uint64 + Md5 string + Encrypted bool + PublicKey string + Tags []*ConfigFileTag + + // 实际暴露给应用的配置内容数据 + content string +} + +func (c *ConfigFile) String() string { + var bf bytes.Buffer + _, _ = bf.WriteString("namespace=" + c.Namespace) + _, _ = bf.WriteString("group=" + c.FileGroup) + _, _ = bf.WriteString("file_name=" + c.FileName) + _, _ = bf.WriteString("version=" + strconv.FormatUint(c.Version, 10)) + _, _ = bf.WriteString("encrypt=" + strconv.FormatBool(c.Encrypted)) + //nolint: errchkjson + data, _ := json.Marshal(c.Tags) + _, _ = bf.WriteString("tags=" + string(data)) + return bf.String() } type ConfigFileTag struct { @@ -60,9 +82,18 @@ func (c *ConfigFile) GetFileName() string { return c.FileName } +// GetSourceContent 获取配置文件内容 +func (c *ConfigFile) GetSourceContent() string { + return c.SourceContent +} + +func (c *ConfigFile) SetContent(v string) { + c.content = v +} + // GetContent 获取配置文件内容 func (c *ConfigFile) GetContent() string { - return c.Content + return c.content } // GetVersion 获取配置文件版本号 diff --git a/plugin/configconnector/polaris/config_connector.go b/plugin/configconnector/polaris/config_connector.go index 389733e3..34ac4153 100644 --- a/plugin/configconnector/polaris/config_connector.go +++ b/plugin/configconnector/polaris/config_connector.go @@ -252,14 +252,14 @@ func transferFromClientConfigFileInfo(configFileInfo *config_manage.ClientConfig }) } return &configconnector.ConfigFile{ - Namespace: configFileInfo.GetNamespace().GetValue(), - FileGroup: configFileInfo.GetGroup().GetValue(), - FileName: configFileInfo.GetFileName().GetValue(), - Content: configFileInfo.GetContent().GetValue(), - Version: configFileInfo.GetVersion().GetValue(), - Md5: configFileInfo.GetMd5().GetValue(), - Encrypted: configFileInfo.GetEncrypted().GetValue(), - Tags: tags, + Namespace: configFileInfo.GetNamespace().GetValue(), + FileGroup: configFileInfo.GetGroup().GetValue(), + FileName: configFileInfo.GetFileName().GetValue(), + SourceContent: configFileInfo.GetContent().GetValue(), + Version: configFileInfo.GetVersion().GetValue(), + Md5: configFileInfo.GetMd5().GetValue(), + Encrypted: configFileInfo.GetEncrypted().GetValue(), + Tags: tags, } } diff --git a/plugin/configfilter/crypto/crypto.go b/plugin/configfilter/crypto/crypto.go index 9e5467d0..ef01e51f 100644 --- a/plugin/configfilter/crypto/crypto.go +++ b/plugin/configfilter/crypto/crypto.go @@ -20,7 +20,6 @@ package crypto import ( "fmt" - "sync" "github.com/polarismesh/polaris-go/pkg/config" "github.com/polarismesh/polaris-go/pkg/log" @@ -45,9 +44,8 @@ func init() { // CryptoFilter crypto filter plugin type CryptoFilter struct { *plugin.PluginBase - cfg *Config - cryptos map[string]Crypto - dataKeyCache *sync.Map + cfg *Config + cryptos map[string]Crypto } // Type plugin type @@ -64,7 +62,6 @@ func (c *CryptoFilter) Name() string { func (c *CryptoFilter) Init(ctx *plugin.InitContext) error { c.PluginBase = plugin.NewPluginBase(ctx) c.cryptos = make(map[string]Crypto) - c.dataKeyCache = new(sync.Map) cfgValue := ctx.Config.GetConfigFile().GetConfigFilterConfig().GetPluginConfig(c.Name()) if cfgValue != nil { @@ -100,14 +97,12 @@ func (c *CryptoFilter) IsEnable(cfg config.Configuration) bool { // DoFilter do crypto filter func (c *CryptoFilter) DoFilter(configFile *configconnector.ConfigFile, next configfilter.ConfigFileHandleFunc) configfilter.ConfigFileHandleFunc { return func(configFile *configconnector.ConfigFile) (*configconnector.ConfigFileResponse, error) { - // 查询缓存的数据密钥 - cacheKey := genCacheKey(configFile.Namespace, configFile.FileGroup, configFile.FileName) - cacheEncryptInfo := c.getEncryptInfo(cacheKey) - - var privateKey *rsa.RSAKey - var err error + var ( + privateKey *rsa.RSAKey + err error + ) // 如果是加密配置并且缓存密钥为空 - if configFile.GetEncrypted() && cacheEncryptInfo == nil { + if configFile.GetEncrypted() { // 生成公钥和私钥请求数据密钥 privateKey, err = rsa.GenerateRSAKey() if err != nil { @@ -121,47 +116,34 @@ func (c *CryptoFilter) DoFilter(configFile *configconnector.ConfigFile, next con return resp, err } // 如果是加密配置 - if resp.GetConfigFile().GetEncrypted() && resp.GetConfigFile().GetContent() != "" { - cipherContent := resp.GetConfigFile().GetContent() - cipherDataKey := resp.GetConfigFile().GetDataKey() - encryptAlgo := resp.GetConfigFile().GetEncryptAlgo() - - // 返回了数据密钥,解密配置 - if cipherDataKey != "" && privateKey != nil { - crypto, err := c.GetCrypto(encryptAlgo) - if err != nil { - return nil, err - } - dataKey, err := rsa.DecryptFromBase64(cipherDataKey, privateKey.PrivateKey) - if err != nil { - return nil, err - } - plainContent, err := crypto.Decrypt(cipherContent, dataKey) - if err != nil { - return nil, err - } - resp.ConfigFile.Content = string(plainContent) - // 缓存数据密钥 - c.setEncryptInfo(cacheKey, &encryptInfo{ - Key: dataKey, - Algo: encryptAlgo, - }) - } else if cacheEncryptInfo != nil { - // 有缓存的数据密钥和加密算法 - crypto, err := c.GetCrypto(cacheEncryptInfo.Algo) - if err != nil { - return nil, err - } - plainContent, err := crypto.Decrypt(cipherContent, cacheEncryptInfo.Key) - if err != nil { - return nil, err - } - resp.ConfigFile.Content = string(plainContent) - } else { - // 没有返回数据密钥,设置为加密配置重新请求 - configFile.Encrypted = true - return c.DoFilter(configFile, next)(configFile) + if !resp.GetConfigFile().GetEncrypted() { + // 删除掉之前保存的 token cache + return resp, err + } + cipherContent := resp.GetConfigFile().GetSourceContent() + cipherDataKey := resp.GetConfigFile().GetDataKey() + encryptAlgo := resp.GetConfigFile().GetEncryptAlgo() + + // 返回了数据密钥,解密配置 + if cipherDataKey != "" && privateKey != nil { + crypto, err := c.GetCrypto(encryptAlgo) + if err != nil { + return nil, err + } + dataKey, err := rsa.DecryptFromBase64(cipherDataKey, privateKey.PrivateKey) + if err != nil { + return nil, err + } + plainContent, err := crypto.Decrypt(cipherContent, dataKey) + if err != nil { + return nil, err } + resp.ConfigFile.SetContent(string(plainContent)) + // 缓存数据密钥 + } else { + // 没有返回数据密钥,设置为加密配置重新请求 + configFile.Encrypted = true + return c.DoFilter(configFile, next)(configFile) } return resp, err } @@ -177,27 +159,6 @@ func (c *CryptoFilter) GetCrypto(algo string) (Crypto, error) { return crypto, nil } -type encryptInfo struct { - Key []byte - Algo string -} - -func (c *CryptoFilter) getEncryptInfo(key string) *encryptInfo { - obj, ok := c.dataKeyCache.Load(key) - if ok { - return obj.(*encryptInfo) - } - return nil -} - -func (c *CryptoFilter) setEncryptInfo(key string, value *encryptInfo) { - c.dataKeyCache.Store(key, value) -} - -func genCacheKey(namespace, fileGroup, fileName string) string { - return namespace + separator + fileGroup + separator + fileName -} - // Crypto Crypto interface type Crypto interface { GenerateKey() ([]byte, error) diff --git a/polaris.yaml b/polaris.yaml index 8d510980..c8290feb 100644 --- a/polaris.yaml +++ b/polaris.yaml @@ -338,6 +338,18 @@ config: propertiesValueCacheSize: 100 # 类型转化缓存的过期时间,默认为1分钟 propertiesValueExpireTime: 60000 + # 本地缓存配置 + localCache: + #描述: 配置文件持久化到本地开关 + persistEnable: true + #描述: 配置文件持久化目录,SDK在配置文件变更后,把相关的配置持久化到本地磁盘 + persistDir: ./polaris/backup/config + #描述: 配置文件写盘失败的最大重试次数 + persistMaxWriteRetry: 1 + #描述: 配置文件从磁盘读取失败的最大重试次数 + persistMaxReadRetry: 0 + #描述: 缓存读写磁盘的重试间隔 + persistRetryInterval: 500ms # 连接器配置,默认为北极星服务端 configConnector: id: polaris-config