Skip to content

Commit

Permalink
[Refactor] 配置中心重构+支持配置回滚/撤回发布 (polarismesh#1187)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun authored Aug 4, 2023
1 parent e51daba commit 70d69ce
Show file tree
Hide file tree
Showing 166 changed files with 4,691 additions and 6,970 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/benchmark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ jobs:
steps:
# Setup the environment.
- name: Setup Go
uses: actions/setup-go@v2
uses: actions/setup-go@v4
with:
go-version: 1.19
go-version: "1.20"
# Checkout latest code
- name: Checkout repo
uses: actions/checkout@v2
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/codecov.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ jobs:
steps:
# Setup the environment.
- name: Setup Go
uses: actions/setup-go@v2
uses: actions/setup-go@v4
with:
go-version: 1.19
go-version: "1.20"
# Checkout latest code
- name: Checkout repo
uses: actions/checkout@v2
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ jobs:
id: get_version
run: echo ::set-output name=VERSION::${GITHUB_REF/refs\/tags\//}

- name: Set up Go
uses: actions/setup-go@v2
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version: 1.19
go-version: "1.20"

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
golangci:
strategy:
matrix:
go-version: [ 1.19 ]
go-version: [ "1.20" ]
name: golangci-lint
runs-on: ubuntu-latest
steps:
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/integration-testing-mysql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ jobs:
steps:
# Setup the environment.
- name: Setup Go
uses: actions/setup-go@v2
uses: actions/setup-go@v4
with:
go-version: 1.19
go-version: "1.20"
# Checkout latest code
- name: Checkout repo
uses: actions/checkout@v2
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/integration-testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ jobs:
steps:
# Setup the environment.
- name: Setup Go
uses: actions/setup-go@v2
uses: actions/setup-go@v4
with:
go-version: 1.19
go-version: "1.20"
# Checkout latest code
- name: Checkout repo
uses: actions/checkout@v2
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ jobs:
uses: actions/checkout@v2

- name: Setup Go
uses: actions/setup-go@v2
uses: actions/setup-go@v4
with:
go-version: 1.19
go-version: "1.20"

- name: Get version
id: get_version
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/standalone.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ jobs:
ref: ${{ github.event.inputs.server_version }}

- name: Setup Go
uses: actions/setup-go@v2
uses: actions/setup-go@v4
with:
go-version: 1.19
go-version: "1.20"

- name: Build
id: build
Expand Down
23 changes: 22 additions & 1 deletion admin/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,30 @@

package admin

import "github.com/polarismesh/polaris/admin/job"
import (
"github.com/polarismesh/polaris/admin/job"
)

// Config maintain configuration
type Config struct {
Jobs []job.JobConfig `yaml:"jobs"`
}

func DefaultConfig() *Config {
return &Config{
Jobs: []job.JobConfig{
{
Name: "CleanDeletedClients",
Enable: true,
},
{
Name: "CleanDeletedInstances",
Enable: true,
},
{
Name: "CleanConfigReleaseHistory",
Enable: true,
},
},
}
}
78 changes: 78 additions & 0 deletions admin/job/history_clean.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/**
* Tencent is pleased to support the open source community by making Polaris available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package job

import (
"time"

"github.com/mitchellh/mapstructure"

"github.com/polarismesh/polaris/store"
)

// 默认保存配置发布天数
const defaultHistoryRetentionDays = 7 * 24 * time.Hour

type CleanConfigFileHistoryJobConfig struct {
RetentionDays time.Duration `mapstructure:"retentionDays"`
BatchSize uint64 `mapstructure:"batchSize"`
}

type cleanConfigFileHistoryJob struct {
cfg *CleanConfigFileHistoryJobConfig
storage store.Store
}

func (job *cleanConfigFileHistoryJob) init(raw map[string]interface{}) error {
cfg := &CleanConfigFileHistoryJobConfig{
RetentionDays: defaultHistoryRetentionDays,
BatchSize: 1000,
}
decodeConfig := &mapstructure.DecoderConfig{
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
Result: cfg,
}
decoder, err := mapstructure.NewDecoder(decodeConfig)
if err != nil {
log.Errorf("[Maintain][Job][cleanConfigFileHistoryJob] new config decoder err: %v", err)
return err
}
if err = decoder.Decode(raw); err != nil {
log.Errorf("[Maintain][Job][cleanConfigFileHistoryJob] parse config err: %v", err)
return err
}
if cfg.RetentionDays < time.Minute {
cfg.RetentionDays = time.Minute
}
job.cfg = cfg
return nil
}

func (job *cleanConfigFileHistoryJob) execute() {
endTime := time.Now().Add(-1 * job.cfg.RetentionDays)
if err := job.storage.CleanConfigFileReleaseHistory(endTime, job.cfg.BatchSize); err != nil {
log.Errorf("[Maintain][Job][cleanConfigFileHistoryJob] execute err: %v", err)
}
}

func (job *cleanConfigFileHistoryJob) interval() time.Duration {
return time.Minute
}

func (job *cleanConfigFileHistoryJob) clear() {
}
2 changes: 2 additions & 0 deletions admin/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ func NewMaintainJobs(namingServer service.DiscoverServer, cacheMgn *cache.CacheM
storage: storage},
"CleanDeletedClients": &cleanDeletedClientsJob{
storage: storage},
"CleanConfigReleaseHistory": &cleanConfigFileHistoryJob{
storage: storage},
},
startedJobs: map[string]maintainJob{},
storage: storage,
Expand Down
1 change: 1 addition & 0 deletions apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
DiscoverAccess string = "discover"
RegisterAccess string = "register"
HealthcheckAccess string = "healthcheck"
CreateFileAccess string = "createfile"
)

// Config API服务器配置
Expand Down
6 changes: 4 additions & 2 deletions apiserver/eurekaserver/applications.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,11 @@ func buildHashCode(version string, hashBuilder map[string]int, newApps *Applicat

func parseStatus(instance *apiservice.Instance) string {
if instance.GetIsolate().GetValue() {
status, ok := instance.Metadata[InternalMetadataStatus]
if ok {
return status
}
return StatusOutOfService
} else if !instance.GetHealthy().GetValue() {
return StatusDown
}
return StatusUp
}
Expand Down
17 changes: 17 additions & 0 deletions apiserver/eurekaserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ const (
MetadataReplicate = "internal-eureka-replicate"
MetadataInstanceId = "internal-eureka-instance-id"

InternalMetadataStatus = "internal-eureka-status"
InternalMetadataOverriddenStatus = "internal-eureka-overriddenStatus"

ServerEureka = "eureka"

KeyRegion = "region"
Expand Down Expand Up @@ -125,6 +128,7 @@ var (
type EurekaServer struct {
server *http.Server
namingServer service.DiscoverServer
originDiscoverSvr service.DiscoverServer
healthCheckServer *healthcheck.Server
connLimitConfig *connlimit.Config
tlsInfo *secure.TLSInfo
Expand Down Expand Up @@ -322,6 +326,12 @@ func (h *EurekaServer) Run(errCh chan error) {
errCh <- err
return
}
h.originDiscoverSvr, err = service.GetOriginServer()
if err != nil {
eurekalog.Errorf("%v", err)
errCh <- err
return
}
h.healthCheckServer, err = healthcheck.GetServer()
if err != nil {
eurekalog.Errorf("%v", err)
Expand Down Expand Up @@ -395,9 +405,16 @@ func (h *EurekaServer) createRestfulContainer() (*restful.Container, error) {
wsContainer.Add(h.GetEurekaV2Server())
wsContainer.Add(h.GetEurekaV1Server())
wsContainer.Add(h.GetEurekaServer())
wsContainer.RecoverHandler(h.recoverFunc)
return wsContainer, nil
}

func (h *EurekaServer) recoverFunc(i interface{}, w http.ResponseWriter) {
eurekalog.Errorf("panic %+v", i)
w.WriteHeader(http.StatusInternalServerError)
w.Header().Add(restful.HEADER_ContentType, restful.MIME_JSON)
}

// process 在接收和回复时统一处理请求
func (h *EurekaServer) process(req *restful.Request, rsp *restful.Response, chain *restful.FilterChain) {
func() {
Expand Down
27 changes: 20 additions & 7 deletions apiserver/eurekaserver/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ func convertEurekaInstance(
targetInstance.Metadata[MetadataInsecurePortEnabled] = strconv.FormatBool(insecureEnable)
targetInstance.Metadata[MetadataSecurePort] = strconv.Itoa(securePort)
targetInstance.Metadata[MetadataSecurePortEnabled] = strconv.FormatBool(secureEnable)
// 保存客户端注册时设置的 status 信息,该信息不会随着心跳的变化而调整
targetInstance.Metadata[InternalMetadataStatus] = instance.Status
targetInstance.Metadata[InternalMetadataOverriddenStatus] = instance.OverriddenStatus
return targetInstance
}

Expand Down Expand Up @@ -244,18 +247,29 @@ func (h *EurekaServer) deregisterInstance(

func (h *EurekaServer) updateStatus(
ctx context.Context, namespace string, appId string, instanceId string, status string, replicated bool) uint32 {
var isolated = false
if status != StatusUp {
isolated = true
}
ctx = context.WithValue(
ctx, model.CtxEventKeyMetadata, map[string]string{
MetadataReplicate: strconv.FormatBool(replicated),
MetadataInstanceId: instanceId,
})
instanceId = checkOrBuildNewInstanceIdByNamespace(namespace, h.namespace, appId, instanceId, h.generateUniqueInstId)
resp := h.namingServer.UpdateInstance(ctx, &apiservice.Instance{
Id: &wrappers.StringValue{Value: instanceId}, Isolate: &wrappers.BoolValue{Value: isolated}})

saveIns, err := h.originDiscoverSvr.Cache().GetStore().GetInstance(instanceId)
if err != nil {
return uint32(apimodel.Code_StoreLayerException)
}

metadata := saveIns.Metadata()
metadata[InternalMetadataStatus] = status
isolated := status != StatusUp

updateIns := &apiservice.Instance{
Id: &wrappers.StringValue{Value: instanceId},
Isolate: &wrappers.BoolValue{Value: isolated},
Metadata: metadata,
}

resp := h.namingServer.UpdateInstance(ctx, updateIns)
return resp.GetCode().GetValue()
}

Expand All @@ -274,7 +288,6 @@ func (h *EurekaServer) renew(ctx context.Context, namespace string, appId string
if code == api.HeartbeatOnDisabledIns {
return api.ExecuteSuccess
}

return code
}

Expand Down
17 changes: 17 additions & 0 deletions apiserver/grpcserver/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,13 @@ func (b *BaseGrpcServer) unaryInterceptor(ctx context.Context, req interface{},
rsp = api.NewResponse(apimodel.Code(code))
return
}
defer func() {
if panicInfo := recover(); panicInfo != nil {
// just log
b.log.Errorf("panic %+v", panicInfo)
}
}()

rsp, err = handler(ctx, req)
}()

Expand All @@ -238,6 +245,10 @@ func (b *BaseGrpcServer) unaryInterceptor(ctx context.Context, req interface{},
return
}

func (b *BaseGrpcServer) recoverFunc(i interface{}, w http.ResponseWriter) {

}

func (b *BaseGrpcServer) streamInterceptor(srv interface{}, ss grpc.ServerStream,
info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
stream := newVirtualStream(ss.Context(),
Expand All @@ -248,6 +259,12 @@ func (b *BaseGrpcServer) streamInterceptor(srv interface{}, ss grpc.ServerStream
WithVirtualStreamPostProcessFunc(b.postprocess),
)

defer func() {
if panicInfo := recover(); err != nil {
b.log.Errorf("panic %+v", panicInfo)
}
}()

err = handler(srv, stream)
if err != nil {
fromError, ok := status.FromError(err)
Expand Down
7 changes: 7 additions & 0 deletions apiserver/grpcserver/config/client_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,10 @@ func (g *ConfigGRPCServer) WatchConfigFiles(ctx context.Context,
}
return callback(), nil
}

func (g *ConfigGRPCServer) GetConfigFileMetadataList(ctx context.Context,
req *apiconfig.ConfigFileGroupRequest) (*apiconfig.ConfigClientListResponse, error) {

ctx = grpcserver.ConvertContext(ctx)
return g.configServer.GetConfigFileNamesWithCache(ctx, req), nil
}
Loading

0 comments on commit 70d69ce

Please sign in to comment.