Skip to content

Commit

Permalink
feat:xdsv3 support envoy odcds (polarismesh#1304)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun authored Dec 19, 2023
1 parent 0806c91 commit dfa9741
Show file tree
Hide file tree
Showing 213 changed files with 5,095 additions and 2,884 deletions.
1 change: 0 additions & 1 deletion .github/workflows/benchmark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ jobs:
sleep 120s
ls -alR
cat ./log/stdout 2>&1
cd ..
ls -lstrh
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/codecov.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ jobs:
mysql -e "ALTER USER '${{ env.MYSQL_DB_USER }}'@'localhost' IDENTIFIED WITH mysql_native_password BY 'root';" -u${{ env.MYSQL_DB_USER }} -p${{ env.MYSQL_DB_PWD }}
# Execute vert check
- name: Vert check
run: bash vert.sh -install && bash vert.sh
# - name: Vert check
# run: bash vert.sh -install && bash vert.sh

- name: Standalone Test
env:
Expand Down
2 changes: 0 additions & 2 deletions .github/workflows/integration-testing-mysql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,6 @@ jobs:
sleep 120s
ls -alR
cat ./log/stdout 2>&1
cd ..
ls -lstrh
# 先测试普通的集成测试
Expand Down
1 change: 0 additions & 1 deletion .github/workflows/integration-testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ jobs:
sleep 120s
ls -alR
cat ./log/stdout 2>&1
cd ..
ls -lstrh
Expand Down
2 changes: 2 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ run:
- pkg/model/pb
- .*~
- test
- "apiserver/nacosserver/v2/pb"

# Which files to skip: they will be analyzed, but issues from them won't be reported.
# Default value is empty list,
Expand All @@ -51,6 +52,7 @@ run:
- ".*_test\\.go$"
- ".*\\.yaml$"
- ".*\\.yml$"
- "apiserver/xdsserverv3/cache/linear.go"


# Main linters configurations.
Expand Down
2 changes: 2 additions & 0 deletions .licenserc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ header: # `header` section is configurations for source codes license header.
- "deploy"
- "release"
- "test/data/xds"
- "apiserver/nacosserver/v2/pb"

# single file
- "LICENSE"
Expand All @@ -74,6 +75,7 @@ header: # `header` section is configurations for source codes license header.
- "**/*.md"
- "**/go.mod"
- "**/go.sum"
- "apiserver/xdsserverv3/cache/linear.go"
comment: on-failure # on what condition license-eye will comment on the pull request, `on-failure`, `always`, `never`.

# license-location-threshold specifies the index threshold where the license header can be located,
Expand Down
10 changes: 3 additions & 7 deletions apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ package apiserver
import (
"context"
"fmt"
"net/http"

"github.com/polarismesh/polaris/common/model"
)

const (
Expand Down Expand Up @@ -61,12 +62,7 @@ type Apiserver interface {

type EnrichApiserver interface {
Apiserver
DebugHandlers() []DebugHandler
}

type DebugHandler struct {
Path string
Handler http.HandlerFunc
DebugHandlers() []model.DebugHandler
}

var (
Expand Down
61 changes: 31 additions & 30 deletions apiserver/eurekaserver/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ func createEurekaServerForTest(
if err != nil {
return nil, err
}
// 注册实例信息修改 chain 数据
eurekaSrv.registerInstanceChain()
return eurekaSrv, nil
}

Expand Down Expand Up @@ -88,7 +90,7 @@ func batchBuildInstances(appId string, host string, port int, lease *LeaseInfo,
func batchCreateInstance(t *testing.T, eurekaSvr *EurekaServer, namespace string, instances []*InstanceInfo) {
for _, instance := range instances {
code := eurekaSvr.registerInstances(context.Background(), namespace, instance.AppName, instance, false)
assert.Equal(t, api.ExecuteSuccess, code)
assert.Equal(t, api.ExecuteSuccess, code, fmt.Sprintf("%+v", code))
}
}

Expand Down Expand Up @@ -223,36 +225,34 @@ func Test_EurekaWrite(t *testing.T) {

mockIns := genMockEurekaInstance()

t.Run("RegisterInstance", func(t *testing.T) {
// pretty output must be created and written explicitly
output, err := xml.MarshalIndent(mockIns, " ", " ")
assert.NoError(t, err)

var body bytes.Buffer
_, err = body.Write([]byte(xml.Header))
assert.NoError(t, err)
_, err = body.Write(output)
assert.NoError(t, err)

mockReq := httptest.NewRequest("", fmt.Sprintf("http://127.0.0.1:8761/eureka/v2/apps/%s", mockIns.AppName), &body)
mockReq.Header.Add(restful.HEADER_Accept, restful.MIME_XML)
mockReq.Header.Add(restful.HEADER_ContentType, restful.MIME_XML)
mockRsp := newMockResponseWriter()

restfulReq := restful.NewRequest(mockReq)
injectRestfulReqPathParameters(t, restfulReq, map[string]string{
ParamAppId: mockIns.AppName,
})
// 这里是异步注册
eurekaSrv.RegisterApplication(restfulReq, restful.NewResponse(mockRsp))
assert.Equal(t, http.StatusNoContent, mockRsp.statusCode)
assert.Equal(t, restfulReq.Attribute(statusCodeHeader), uint32(apimodel.Code_ExecuteSuccess))

time.Sleep(5 * time.Second)
saveIns, err := eurekaSrv.originDiscoverSvr.Cache().GetStore().GetInstance(mockIns.InstanceId)
assert.NoError(t, err)
assert.NotNil(t, saveIns)
// pretty output must be created and written explicitly
output, err := xml.MarshalIndent(mockIns, " ", " ")
assert.NoError(t, err)

var body bytes.Buffer
_, err = body.Write([]byte(xml.Header))
assert.NoError(t, err)
_, err = body.Write(output)
assert.NoError(t, err)

mockReq := httptest.NewRequest("", fmt.Sprintf("http://127.0.0.1:8761/eureka/v2/apps/%s", mockIns.AppName), &body)
mockReq.Header.Add(restful.HEADER_Accept, restful.MIME_XML)
mockReq.Header.Add(restful.HEADER_ContentType, restful.MIME_XML)
mockRsp := newMockResponseWriter()

restfulReq := restful.NewRequest(mockReq)
injectRestfulReqPathParameters(t, restfulReq, map[string]string{
ParamAppId: mockIns.AppName,
})
// 这里是异步注册
eurekaSrv.RegisterApplication(restfulReq, restful.NewResponse(mockRsp))
assert.Equal(t, http.StatusNoContent, mockRsp.statusCode)
assert.Equal(t, restfulReq.Attribute(statusCodeHeader), uint32(apimodel.Code_ExecuteSuccess))

time.Sleep(5 * time.Second)
saveIns, err := eurekaSrv.originDiscoverSvr.Cache().GetStore().GetInstance(mockIns.InstanceId)
assert.NoError(t, err)
assert.NotNil(t, saveIns)

t.Run("UpdateStatus", func(t *testing.T) {
t.Run("StatusUnknown", func(t *testing.T) {
Expand All @@ -274,6 +274,7 @@ func Test_EurekaWrite(t *testing.T) {
//
saveIns, err := discoverSuit.Storage.GetInstance(mockIns.InstanceId)
assert.NoError(t, err)
assert.NotNil(t, saveIns)
assert.False(t, saveIns.Isolate())
})

Expand Down
3 changes: 1 addition & 2 deletions apiserver/eurekaserver/applications.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,9 +393,8 @@ func buildDataCenterInfo() *DataCenterInfo {
Clazz: DefaultDciClazz,
Name: customDciName,
}
} else {
return DefaultDataCenterInfo
}
return DefaultDataCenterInfo
}

func buildLocationInfo(instanceInfo *InstanceInfo, instance *apiservice.Instance) {
Expand Down
8 changes: 8 additions & 0 deletions apiserver/eurekaserver/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/polarismesh/polaris/common/eventhub"
"github.com/polarismesh/polaris/common/model"
"github.com/polarismesh/polaris/common/utils"
"github.com/polarismesh/polaris/service"
"github.com/polarismesh/polaris/store"
"github.com/polarismesh/polaris/store/mock"
testsuit "github.com/polarismesh/polaris/test/suit"
Expand Down Expand Up @@ -120,10 +121,14 @@ func TestEurekaServer_renew(t *testing.T) {
},
}, nil)

mockStore.EXPECT().GetMoreClients(gomock.Any(), gomock.Any()).Return(map[string]*model.Client{}, nil).AnyTimes()
mockStore.EXPECT().GetMoreGrayResouces(gomock.Any(), gomock.Any()).Return([]*model.GrayResource{}, nil).AnyTimes()
mockStore.EXPECT().GetInstancesCountTx(gomock.Any()).AnyTimes().Return(uint32(1), nil)
mockStore.EXPECT().GetUnixSecond(gomock.Any()).AnyTimes().Return(time.Now().Unix(), nil)
mockStore.EXPECT().GetServicesCount().Return(uint32(1), nil).AnyTimes()
mockStore.EXPECT().StartLeaderElection(gomock.Any()).AnyTimes()
mockStore.EXPECT().GetMoreServiceContracts(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
mockStore.EXPECT().GetMoreNamespaces(gomock.Any()).Return(nil, nil).AnyTimes()
mockStore.EXPECT().Destroy().Return(nil)
mockStore.EXPECT().Initialize(gomock.Any()).Return(nil).AnyTimes()
mockStore.EXPECT().Name().Return("eureka_store_test").AnyTimes()
Expand All @@ -135,7 +140,10 @@ func TestEurekaServer_renew(t *testing.T) {
return mockStore
})
eurekaSuit.Initialize(func(conf *testsuit.TestConfig) {
conf.DisableAuth = true
conf.Cache = cache.Config{}
conf.DisableConfig = true
conf.ServiceCacheEntries = service.GetRegisterCaches()
store.TestInjectConfig(store.Config{
Name: "eureka_store_test",
})
Expand Down
128 changes: 121 additions & 7 deletions apiserver/grpcserver/config/client_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,33 +20,48 @@ package config
import (
"context"
"fmt"
"io"
"strconv"

apiconfig "github.com/polarismesh/specification/source/go/api/v1/config_manage"
apimodel "github.com/polarismesh/specification/source/go/api/v1/model"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/wrapperspb"

api "github.com/polarismesh/polaris/common/api/v1"
commonlog "github.com/polarismesh/polaris/common/log"
"github.com/polarismesh/polaris/common/metrics"
commontime "github.com/polarismesh/polaris/common/time"
"github.com/polarismesh/polaris/common/utils"
"github.com/polarismesh/polaris/plugin"
)

var (
accesslog = commonlog.GetScopeOrDefaultByName(commonlog.APIServerLoggerName)
)

// GetConfigFile 拉取配置
func (g *ConfigGRPCServer) GetConfigFile(ctx context.Context,
req *apiconfig.ClientConfigFileInfo) (*apiconfig.ConfigClientResponse, error) {
ctx = utils.ConvertGRPCContext(ctx)

startTime := commontime.CurrentMillisecond()
var ret *apiconfig.ConfigClientResponse
defer func() {
plugin.GetStatis().ReportDiscoverCall(metrics.ClientDiscoverMetric{
Action: metrics.ActionGetConfigFile,
ClientIP: utils.ParseClientAddress(ctx),
Namespace: req.GetNamespace().GetValue(),
Resource: fmt.Sprintf("CONFIG_FILE:%s|%s|%d", req.GetGroup().GetValue(),
req.GetFileName().GetValue(), req.GetVersion().GetValue()),
Resource: metrics.ResourceOfConfigFile(req.GetGroup().GetValue(), req.GetFileName().GetValue()),
Timestamp: startTime,
CostTime: commontime.CurrentMillisecond() - startTime,
Revision: strconv.FormatUint(ret.GetConfigFile().GetVersion().GetValue(), 10),
Success: ret.GetCode().GetValue() > uint32(apimodel.Code_DataNoChange),
})
}()
response := g.configServer.GetConfigFileForClient(ctx, req)
return response, nil
ret = g.configServer.GetConfigFileWithCache(ctx, req)
return ret, nil
}

// CreateConfigFile 创建或更新配置
Expand Down Expand Up @@ -90,17 +105,116 @@ func (g *ConfigGRPCServer) GetConfigFileMetadataList(ctx context.Context,
req *apiconfig.ConfigFileGroupRequest) (*apiconfig.ConfigClientListResponse, error) {

startTime := commontime.CurrentMillisecond()
var ret *apiconfig.ConfigClientListResponse
defer func() {
plugin.GetStatis().ReportDiscoverCall(metrics.ClientDiscoverMetric{
Action: metrics.ActionListConfigFiles,
ClientIP: utils.ParseClientAddress(ctx),
Namespace: req.GetConfigFileGroup().GetNamespace().GetValue(),
Resource: fmt.Sprintf("CONFIG_FILE_LIST:%s|%s", req.GetConfigFileGroup().GetName().GetValue(),
req.GetRevision().GetValue()),
Resource: metrics.ResourceOfConfigFileList(req.GetConfigFileGroup().GetName().GetValue()),
Timestamp: startTime,
CostTime: commontime.CurrentMillisecond() - startTime,
Revision: ret.GetRevision().GetValue(),
Success: ret.GetCode().GetValue() > uint32(apimodel.Code_DataNoChange),
})
}()

ctx = utils.ConvertGRPCContext(ctx)
return g.configServer.GetConfigFileNamesWithCache(ctx, req), nil
ret = g.configServer.GetConfigFileNamesWithCache(ctx, req)
return ret, nil
}

func (g *ConfigGRPCServer) Discover(svr apiconfig.PolarisConfigGRPC_DiscoverServer) error {
ctx := utils.ConvertGRPCContext(svr.Context())
clientIP, _ := ctx.Value(utils.StringContext("client-ip")).(string)
clientAddress, _ := ctx.Value(utils.StringContext("client-address")).(string)
requestID, _ := ctx.Value(utils.StringContext("request-id")).(string)
userAgent, _ := ctx.Value(utils.StringContext("user-agent")).(string)
method, _ := grpc.MethodFromServerStream(svr)

for {
in, err := svr.Recv()
if err != nil {
if io.EOF == err {
return nil
}
return err
}

msg := fmt.Sprintf("receive grpc discover request: %s", in.String())
accesslog.Info(msg,
zap.String("type", apiconfig.ConfigDiscoverRequest_ConfigDiscoverRequestType_name[int32(in.Type)]),
zap.String("client-address", clientAddress),
zap.String("user-agent", userAgent),
utils.ZapRequestID(requestID),
)

// 是否允许访问
if ok := g.allowAccess(method); !ok {
resp := api.NewConfigDiscoverResponse(apimodel.Code_ClientAPINotOpen)
if sendErr := svr.Send(resp); sendErr != nil {
return sendErr
}
continue
}

// stream模式,需要对每个包进行检测
if code := g.enterRateLimit(clientIP, method); code != uint32(apimodel.Code_ExecuteSuccess) {
resp := api.NewConfigDiscoverResponse(apimodel.Code(code))
if err = svr.Send(resp); err != nil {
return err
}
continue
}

var out *apiconfig.ConfigDiscoverResponse
var action string
startTime := commontime.CurrentMillisecond()
defer func() {
plugin.GetStatis().ReportDiscoverCall(metrics.ClientDiscoverMetric{
Action: action,
ClientIP: utils.ParseClientAddress(ctx),
Namespace: in.GetConfigFile().GetNamespace().GetValue(),
Resource: metrics.ResourceOfConfigFile(in.GetConfigFile().GetGroup().GetValue(), in.GetConfigFile().GetFileName().GetValue()),
Timestamp: startTime,
CostTime: commontime.CurrentMillisecond() - startTime,
Revision: out.GetRevision(),
Success: out.GetCode() > uint32(apimodel.Code_DataNoChange),
})
}()

switch in.Type {
case apiconfig.ConfigDiscoverRequest_CONFIG_FILE:
action = metrics.ActionGetConfigFile
ret := g.configServer.GetConfigFileWithCache(ctx, &apiconfig.ClientConfigFileInfo{})
out = api.NewConfigDiscoverResponse(apimodel.Code(ret.GetCode().GetValue()))
out.ConfigFile = ret.GetConfigFile()
out.Type = apiconfig.ConfigDiscoverResponse_CONFIG_FILE
out.Revision = strconv.Itoa(int(out.GetConfigFile().GetVersion().GetValue()))
case apiconfig.ConfigDiscoverRequest_CONFIG_FILE_Names:
action = metrics.ActionListConfigFiles
ret := g.configServer.GetConfigFileNamesWithCache(ctx, &apiconfig.ConfigFileGroupRequest{
Revision: wrapperspb.String(in.GetRevision()),
ConfigFileGroup: &apiconfig.ConfigFileGroup{
Namespace: in.GetConfigFile().GetNamespace(),
Name: in.GetConfigFile().GetGroup(),
},
})
out = api.NewConfigDiscoverResponse(apimodel.Code(ret.GetCode().GetValue()))
out.ConfigFileNames = ret.GetConfigFileInfos()
out.Type = apiconfig.ConfigDiscoverResponse_CONFIG_FILE_Names
out.Revision = ret.GetRevision().GetValue()
case apiconfig.ConfigDiscoverRequest_CONFIG_FILE_GROUPS:
action = metrics.ActionListConfigGroups
req := in.GetConfigFile()
req.Md5 = wrapperspb.String(in.GetRevision())
out = g.configServer.GetConfigGroupsWithCache(ctx, req)
default:
out = api.NewConfigDiscoverResponse(apimodel.Code_InvalidDiscoverResource)
}

if err := svr.Send(out); err != nil {
return err
}
}
}
Loading

0 comments on commit dfa9741

Please sign in to comment.