From 370c4d96c90cd8de1a4d84fc3a5fa1e8149102fb Mon Sep 17 00:00:00 2001
From: liaochuntao
Date: Mon, 3 Jul 2023 11:04:06 +0800
Subject: [PATCH] feat: support pushing envoy rls filter xds (#1174)

---
 .github/workflows/codecov.yaml | 71 +------------
 apiserver/eurekaserver/access.go | 92 ++++++++---------
 apiserver/eurekaserver/applications.go | 16 +--
 apiserver/eurekaserver/delta_worker.go | 4 +-
 apiserver/eurekaserver/eureka_suit_test.go | 2 +-
 apiserver/eurekaserver/log.go | 5 +-
 apiserver/eurekaserver/replicate.go | 19 ++--
 apiserver/eurekaserver/replicate_test.go | 4 +-
 apiserver/eurekaserver/replicate_worker.go | 18 ++--
 apiserver/eurekaserver/server.go | 38 +++----
 apiserver/eurekaserver/write_test.go | 2 +-
 apiserver/grpcserver/config/server.go | 2 +-
 apiserver/grpcserver/discover/server.go | 2 +-
 apiserver/xdsserverv3/resource/help.go | 99 ++++++++++++++-----
 apiserver/xdsserverv3/resource/model.go | 26 ++---
 apiserver/xdsserverv3/resource/node.go | 36 ++++++-
 apiserver/xdsserverv3/v1/cluster.go | 11 ++-
 apiserver/xdsserverv3/v1/listener.go | 40 +++++---
 apiserver/xdsserverv3/v1/mtls.go | 6 ++
 apiserver/xdsserverv3/v1/sidecar.go | 18 +++-
 apiserver/xdsserverv3/v2/cds.go | 4 +-
 apiserver/xdsserverv3/v2/lds.go | 13 ++-
 apiserver/xdsserverv3/v2/rds.go | 4 +-
 plugin/healthchecker.go | 4 -
 plugin/healthchecker/leader/checker_leader.go | 12 ---
 plugin/healthchecker/memory/checker_memory.go | 10 --
 plugin/healthchecker/redis/checker_redis.go | 18 ----
 release/cluster/helm/Chart.yaml | 4 +-
 .../templates/config-polaris-console.yaml | 7 --
 .../helm/templates/config-polaris-server.yaml | 4 +
 release/cluster/helm/values.yaml | 3 +
 .../kubernetes/02-polaris-server-config.yaml | 2 +-
 release/conf/polaris-server.yaml | 84 +++++++++++-----
 service/client_v1.go | 4 +-
 service/healthcheck/check.go | 32 ++----
 service/healthcheck/dispatch.go | 1 -
 service/healthcheck/server.go | 70 +++++++------
 service/healthcheck/test_export.go | 75 +++++--------
 service/healthcheck/time_adjust.go | 1 -
 test/codecov.sh | 78 +++++++++++++++
 test/outlier/backend/Dockerfile | 15 +++
 test/outlier/backend/main.go | 18 +++-
 test/outlier/build.sh | 15 ++-
 test/outlier/frontend/Dockerfile | 15 +++
 test/outlier/frontend/main.go | 18 +++-
 test/outlier/outlier.yaml | 15 +++
 test/suit/test_suit.go | 2 +
 47 files changed, 612 insertions(+), 427 deletions(-)
 create mode 100644 test/codecov.sh

diff --git a/.github/workflows/codecov.yaml b/.github/workflows/codecov.yaml
index b466e210a..ac1e134d3 100644
--- a/.github/workflows/codecov.yaml
+++ b/.github/workflows/codecov.yaml
@@ -76,81 +76,14 @@ jobs:
         run: bash vert.sh -install && bash vert.sh

       - name: Go Test
-        run: |
-          # bash coverage.sh
-          go mod vendor && go test -timeout 120m ./... 
-v -covermode=count -coverprofile=coverage_1.cover -coverpkg=github.com/polarismesh/polaris/apiserver,github.com/polarismesh/polaris/apiserver/eurekaserver,github.com/polarismesh/polaris/auth/defaultauth,github.com/polarismesh/polaris/service,github.com/polarismesh/polaris/service/batch,github.com/polarismesh/polaris/service/healthcheck,github.com/polarismesh/polaris/cache,github.com/polarismesh/polaris/store/boltdb,github.com/polarismesh/polaris/store/mysql,github.com/polarismesh/polaris/plugin,github.com/polarismesh/polaris/config,github.com/polarismesh/polaris/plugin/healthchecker/leader,github.com/polarismesh/polaris/plugin/healthchecker/memory,github.com/polarismesh/polaris/plugin/healthchecker/redis,github.com/polarismesh/polaris/common/batchjob,github.com/polarismesh/polaris/common/eventhub,github.com/polarismesh/polaris/common/redispool,github.com/polarismesh/polaris/common/timewheel - - - # Run unit tests - - name: Go Test With DB env: MYSQL_DB_USER: root MYSQL_DB_PWD: root run: | - export STORE_MODE=sqldb - export MYSQL_DB_USER=${{ env.MYSQL_DB_USER }} - export MYSQL_DB_PWD=${{ env.MYSQL_DB_PWD }} - - echo "cur STORE MODE=${STORE_MODE}, MYSQL_DB_USER=${MYSQL_DB_USER}, MYSQL_DB_PWD=${MYSQL_DB_PWD}" - # 设置严格模式 - mysql -h127.0.0.1 -P3306 -u${{ env.MYSQL_DB_USER }} -p"${{ env.MYSQL_DB_PWD }}" -e "set sql_mode='STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION'"; - # 清空数据 - mysql -h127.0.0.1 -P3306 -u${{ env.MYSQL_DB_USER }} -p"${{ env.MYSQL_DB_PWD }}" -e "DROP DATABASE IF EXISTS polaris_server"; - # 初始化 polaris 数据库 - mysql -h127.0.0.1 -P3306 -u${{ env.MYSQL_DB_USER }} -p"${{ env.MYSQL_DB_PWD }}" < store/mysql/scripts/polaris_server.sql - # 临时放开 DB 的最大连接数 - mysql -h127.0.0.1 -P3306 -u${{ env.MYSQL_DB_USER }} -p"${{ env.MYSQL_DB_PWD }}" -e "set GLOBAL max_connections = 3000;" - - pushd ./config - go mod vendor && go test -v -timeout 120m -v -covermode=count -coverprofile=coverage_sqldb_2.cover -coverpkg=github.com/polarismesh/polaris/apiserver,github.com/polarismesh/polaris/apiserver/eurekaserver,github.com/polarismesh/polaris/auth/defaultauth,github.com/polarismesh/polaris/service,github.com/polarismesh/polaris/service/batch,github.com/polarismesh/polaris/service/healthcheck,github.com/polarismesh/polaris/cache,github.com/polarismesh/polaris/store/boltdb,github.com/polarismesh/polaris/store/mysql,github.com/polarismesh/polaris/plugin,github.com/polarismesh/polaris/config,github.com/polarismesh/polaris/plugin/healthchecker/leader,github.com/polarismesh/polaris/plugin/healthchecker/memory,github.com/polarismesh/polaris/plugin/healthchecker/redis,github.com/polarismesh/polaris/common/batchjob,github.com/polarismesh/polaris/common/eventhub,github.com/polarismesh/polaris/common/redispool,github.com/polarismesh/polaris/common/timewheel - mv coverage_sqldb_2.cover ../ - popd - - sleep 10s - - # 测试服务、治理 - export STORE_MODE=sqldb - export MYSQL_DB_USER=${{ env.MYSQL_DB_USER }} - export MYSQL_DB_PWD=${{ env.MYSQL_DB_PWD }} - - echo "cur STORE MODE=${STORE_MODE}, MYSQL_DB_USER=${MYSQL_DB_USER}, MYSQL_DB_PWD=${MYSQL_DB_PWD}" - - # 设置严格模式 - mysql -h127.0.0.1 -P3306 -u${{ env.MYSQL_DB_USER }} -p"${{ env.MYSQL_DB_PWD }}" -e "set sql_mode='STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION'"; - # 清空数据 - mysql -h127.0.0.1 -P3306 -u${{ env.MYSQL_DB_USER }} -p"${{ env.MYSQL_DB_PWD }}" -e "DROP DATABASE IF EXISTS polaris_server"; - # 初始化 polaris 数据库 - mysql -h127.0.0.1 -P3306 -u${{ env.MYSQL_DB_USER }} -p"${{ env.MYSQL_DB_PWD }}" < store/mysql/scripts/polaris_server.sql - # 临时放开 DB 的最大连接数 - mysql -h127.0.0.1 -P3306 
-u${{ env.MYSQL_DB_USER }} -p"${{ env.MYSQL_DB_PWD }}" -e "set GLOBAL max_connections = 3000;" - - pushd ./service - go mod vendor && go test -v -timeout 120m -v -covermode=count -coverprofile=coverage_sqldb_3.cover -coverpkg=github.com/polarismesh/polaris/apiserver,github.com/polarismesh/polaris/apiserver/eurekaserver,github.com/polarismesh/polaris/auth/defaultauth,github.com/polarismesh/polaris/service,github.com/polarismesh/polaris/service/batch,github.com/polarismesh/polaris/service/healthcheck,github.com/polarismesh/polaris/cache,github.com/polarismesh/polaris/store/boltdb,github.com/polarismesh/polaris/store/mysql,github.com/polarismesh/polaris/plugin,github.com/polarismesh/polaris/config,github.com/polarismesh/polaris/plugin/healthchecker/leader,github.com/polarismesh/polaris/plugin/healthchecker/memory,github.com/polarismesh/polaris/plugin/healthchecker/redis,github.com/polarismesh/polaris/common/batchjob,github.com/polarismesh/polaris/common/eventhub,github.com/polarismesh/polaris/common/redispool,github.com/polarismesh/polaris/common/timewheel - mv coverage_sqldb_3.cover ../ - popd - - sleep 10s - - # 测试鉴权 - export STORE_MODE=sqldb + # bash coverage.sh export MYSQL_DB_USER=${{ env.MYSQL_DB_USER }} export MYSQL_DB_PWD=${{ env.MYSQL_DB_PWD }} - - echo "cur STORE MODE=${STORE_MODE}, MYSQL_DB_USER=${MYSQL_DB_USER}, MYSQL_DB_PWD=${MYSQL_DB_PWD}" - - # 设置严格模式 - mysql -h127.0.0.1 -P3306 -u${{ env.MYSQL_DB_USER }} -p"${{ env.MYSQL_DB_PWD }}" -e "set sql_mode='STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION'"; - # 清空数据 - mysql -h127.0.0.1 -P3306 -u${{ env.MYSQL_DB_USER }} -p"${{ env.MYSQL_DB_PWD }}" -e "DROP DATABASE IF EXISTS polaris_server"; - # 初始化 polaris 数据库 - mysql -h127.0.0.1 -P3306 -u${{ env.MYSQL_DB_USER }} -p"${{ env.MYSQL_DB_PWD }}" < store/mysql/scripts/polaris_server.sql - # 临时放开 DB 的最大连接数 - mysql -h127.0.0.1 -P3306 -u${{ env.MYSQL_DB_USER }} -p"${{ env.MYSQL_DB_PWD }}" -e "set GLOBAL max_connections = 3000;" - - pushd ./auth/defaultauth - go mod vendor && go test -v -timeout 120m -v -covermode=count -coverprofile=coverage_sqldb_1.cover -coverpkg=github.com/polarismesh/polaris/apiserver,github.com/polarismesh/polaris/apiserver/eurekaserver,github.com/polarismesh/polaris/auth/defaultauth,github.com/polarismesh/polaris/service,github.com/polarismesh/polaris/service/batch,github.com/polarismesh/polaris/service/healthcheck,github.com/polarismesh/polaris/cache,github.com/polarismesh/polaris/store/boltdb,github.com/polarismesh/polaris/store/mysql,github.com/polarismesh/polaris/plugin,github.com/polarismesh/polaris/config,github.com/polarismesh/polaris/plugin/healthchecker/leader,github.com/polarismesh/polaris/plugin/healthchecker/memory,github.com/polarismesh/polaris/plugin/healthchecker/redis,github.com/polarismesh/polaris/common/batchjob,github.com/polarismesh/polaris/common/eventhub,github.com/polarismesh/polaris/common/redispool,github.com/polarismesh/polaris/common/timewheel - mv coverage_sqldb_1.cover ../../ - popd + bash test/codecov.sh - name: Upload Codecov uses: codecov/codecov-action@v3 diff --git a/apiserver/eurekaserver/access.go b/apiserver/eurekaserver/access.go index 7c5af9de0..15c6c9796 100644 --- a/apiserver/eurekaserver/access.go +++ b/apiserver/eurekaserver/access.go @@ -137,7 +137,7 @@ func (h *EurekaServer) GetAllApplications(req *restful.Request, rsp *restful.Res remoteAddr := req.Request.RemoteAddr acceptValue := getParamFromEurekaRequestHeader(req, restful.HEADER_Accept) if err := writeResponse(parseAcceptValue(acceptValue), appsRespCache, req, rsp); nil != err 
{ - log.Errorf("[EurekaServer]fail to write applications, client: %s, err: %v", remoteAddr, err) + eurekalog.Errorf("[EurekaServer]fail to write applications, client: %s, err: %v", remoteAddr, err) } } @@ -155,7 +155,7 @@ func (h *EurekaServer) GetApplication(req *restful.Request, rsp *restful.Respons apps := appsRespCache.AppsResp.Applications app := apps.GetApplication(appId) if app == nil { - log.Errorf("[EurekaServer]service %s not found, client: %s", appId, remoteAddr) + eurekalog.Errorf("[EurekaServer]service %s not found, client: %s", appId, remoteAddr) writePolarisStatusCode(req, api.NotFoundService) writeHeader(http.StatusNotFound, rsp) return @@ -170,7 +170,7 @@ func (h *EurekaServer) GetApplication(req *restful.Request, rsp *restful.Respons output = appResp } if err := writeEurekaResponse(acceptValue, output, req, rsp); nil != err { - log.Errorf("[EurekaServer]fail to write application, client: %s, err: %v", remoteAddr, err) + eurekalog.Errorf("[EurekaServer]fail to write application, client: %s, err: %v", remoteAddr, err) } } @@ -179,7 +179,7 @@ func (h *EurekaServer) GetAppInstance(req *restful.Request, rsp *restful.Respons remoteAddr := req.Request.RemoteAddr appId := readAppIdFromRequest(req) if len(appId) == 0 { - log.Errorf("[EurekaServer] fail to parse request uri, uri: %s, client: %s, err: %s", + eurekalog.Errorf("[EurekaServer] fail to parse request uri, uri: %s, client: %s, err: %s", req.Request.RequestURI, remoteAddr, "service name is empty") writePolarisStatusCode(req, api.InvalidServiceName) writeHeader(http.StatusBadRequest, rsp) @@ -187,7 +187,7 @@ func (h *EurekaServer) GetAppInstance(req *restful.Request, rsp *restful.Respons } instId := req.PathParameter(ParamInstId) if len(instId) == 0 { - log.Errorf("[EUREKA-SERVER] fail to parse request uri, uri: %s, client: %s, err: %s", + eurekalog.Errorf("[EUREKA-SERVER] fail to parse request uri, uri: %s, client: %s, err: %s", req.Request.RequestURI, remoteAddr, "instance id is required") writePolarisStatusCode(req, api.InvalidInstanceID) writeHeader(http.StatusBadRequest, rsp) @@ -198,14 +198,14 @@ func (h *EurekaServer) GetAppInstance(req *restful.Request, rsp *restful.Respons apps := appsRespCache.AppsResp.Applications app := apps.GetApplication(appId) if app == nil { - log.Errorf("[EurekaServer]service %s not found, client: %s", appId, remoteAddr) + eurekalog.Errorf("[EurekaServer]service %s not found, client: %s", appId, remoteAddr) writePolarisStatusCode(req, api.NotFoundService) writeHeader(http.StatusNotFound, rsp) return } ins := app.GetInstance(instId) if ins == nil { - log.Errorf("[EurekaServer]instance %s not found, service: %s, client: %s", instId, appId, remoteAddr) + eurekalog.Errorf("[EurekaServer]instance %s not found, service: %s, client: %s", instId, appId, remoteAddr) writePolarisStatusCode(req, api.NotFoundInstance) writeHeader(http.StatusNotFound, rsp) return @@ -219,7 +219,7 @@ func (h *EurekaServer) GetAppInstance(req *restful.Request, rsp *restful.Respons output = insResp } if err := writeEurekaResponse(acceptValue, output, req, rsp); nil != err { - log.Errorf("[EurekaServer]fail to write instance, client: %s, err: %v", remoteAddr, err) + eurekalog.Errorf("[EurekaServer]fail to write instance, client: %s, err: %v", remoteAddr, err) } } @@ -288,7 +288,7 @@ func (h *EurekaServer) GetDeltaApplications(req *restful.Request, rsp *restful.R remoteAddr := req.Request.RemoteAddr acceptValue := getParamFromEurekaRequestHeader(req, restful.HEADER_Accept) if err := 
writeResponse(parseAcceptValue(acceptValue), appsRespCache, req, rsp); nil != err { - log.Errorf("[EurekaServer]fail to write delta applications, client: %s, err: %v", remoteAddr, err) + eurekalog.Errorf("[EurekaServer]fail to write delta applications, client: %s, err: %v", remoteAddr, err) } } @@ -317,21 +317,21 @@ func checkRegisterRequest(registrationRequest *RegistrationRequest, req *restful var err error remoteAddr := req.Request.RemoteAddr if nil == registrationRequest.Instance { - log.Errorf("[EUREKA-SERVER] fail to parse register request, uri: %s, client: %s, err: %s", + eurekalog.Errorf("[EUREKA-SERVER] fail to parse register request, uri: %s, client: %s, err: %s", req.Request.RequestURI, remoteAddr, "instance content required") writePolarisStatusCode(req, api.EmptyRequest) writeHeader(http.StatusBadRequest, rsp) return false } if len(registrationRequest.Instance.InstanceId) == 0 && len(registrationRequest.Instance.HostName) == 0 { - log.Errorf("[EUREKA-SERVER] fail to parse register request, uri: %s, client: %s, err: %s", + eurekalog.Errorf("[EUREKA-SERVER] fail to parse register request, uri: %s, client: %s, err: %s", req.Request.RequestURI, remoteAddr, "instance id required") writePolarisStatusCode(req, api.InvalidInstanceID) writeHeader(http.StatusBadRequest, rsp) } err = convertInstancePorts(registrationRequest.Instance) if nil != err { - log.Errorf("[EUREKA-SERVER] fail to parse instance register request, "+ + eurekalog.Errorf("[EUREKA-SERVER] fail to parse instance register request, "+ "invalid port value, client: %s, err: %v", remoteAddr, err) writePolarisStatusCode(req, api.InvalidInstancePort) writeHeader(http.StatusBadRequest, rsp) @@ -345,7 +345,7 @@ func (h *EurekaServer) RegisterApplication(req *restful.Request, rsp *restful.Re appId := readAppIdFromRequest(req) if len(appId) == 0 { - log.Errorf("[EurekaServer] fail to parse request uri, uri: %s, client: %s, err: %s", + eurekalog.Errorf("[EurekaServer] fail to parse request uri, uri: %s, client: %s, err: %s", req.Request.RequestURI, remoteAddr, "service name is empty") writePolarisStatusCode(req, api.InvalidServiceName) writeHeader(http.StatusBadRequest, rsp) @@ -362,7 +362,7 @@ func (h *EurekaServer) RegisterApplication(req *restful.Request, rsp *restful.Re err = req.ReadEntity(registrationRequest) } if nil != err { - log.Errorf("[EUREKA-SERVER] fail to parse instance register request, uri: %s, client: %s, err: %v", + eurekalog.Errorf("[EUREKA-SERVER] fail to parse instance register request, uri: %s, client: %s, err: %v", req.Request.RequestURI, remoteAddr, err) writePolarisStatusCode(req, api.ParseException) writeHeader(http.StatusBadRequest, rsp) @@ -375,7 +375,7 @@ func (h *EurekaServer) RegisterApplication(req *restful.Request, rsp *restful.Re token, err := getAuthFromEurekaRequestHeader(req) if err != nil { - log.Infof("[EUREKA-SERVER]instance (instId=%s, appId=%s) get basic auth info fail, code is %d", + eurekalog.Infof("[EUREKA-SERVER]instance (instId=%s, appId=%s) get basic auth info fail, code is %d", registrationRequest.Instance.InstanceId, appId, api.ExecuteException) writePolarisStatusCode(req, api.ExecuteException) writeHeader(http.StatusUnauthorized, rsp) @@ -385,13 +385,13 @@ func (h *EurekaServer) RegisterApplication(req *restful.Request, rsp *restful.Re ctx := context.WithValue(context.Background(), utils.ContextAuthTokenKey, token) namespace := readNamespaceFromRequest(req, h.namespace) - log.Infof( + eurekalog.Infof( "[EUREKA-SERVER]received instance register request, "+ "client: %s, namespace: 
%s, instId: %s, appId: %s, ipAddr: %s", remoteAddr, namespace, registrationRequest.Instance.InstanceId, appId, registrationRequest.Instance.IpAddr) code := h.registerInstances(ctx, namespace, appId, registrationRequest.Instance, false) if code == api.ExecuteSuccess || code == api.ExistedResource || code == api.SameInstanceRequest { - log.Infof( + eurekalog.Infof( "[EUREKA-SERVER]instance (namespace=%s, instId=%s, appId=%s) has been registered successfully,"+ " code is %d", namespace, registrationRequest.Instance.InstanceId, appId, code) @@ -399,7 +399,7 @@ func (h *EurekaServer) RegisterApplication(req *restful.Request, rsp *restful.Re writeHeader(http.StatusNoContent, rsp) return } - log.Errorf("[EUREKA-SERVER]instance (namespace=%s, instId=%s, appId=%s) has been registered failed, "+ + eurekalog.Errorf("[EUREKA-SERVER]instance (namespace=%s, instId=%s, appId=%s) has been registered failed, "+ "code is %d", namespace, registrationRequest.Instance.InstanceId, appId, code) writePolarisStatusCode(req, code) @@ -411,7 +411,7 @@ func (h *EurekaServer) UpdateStatus(req *restful.Request, rsp *restful.Response) remoteAddr := req.Request.RemoteAddr appId := readAppIdFromRequest(req) if len(appId) == 0 { - log.Errorf("[EurekaServer] fail to parse request uri, uri: %s, client: %s, err: %s", + eurekalog.Errorf("[EurekaServer] fail to parse request uri, uri: %s, client: %s, err: %s", req.Request.RequestURI, remoteAddr, "service name is empty") writePolarisStatusCode(req, api.InvalidServiceName) writeHeader(http.StatusBadRequest, rsp) @@ -419,7 +419,7 @@ func (h *EurekaServer) UpdateStatus(req *restful.Request, rsp *restful.Response) } instId := req.PathParameter(ParamInstId) if len(instId) == 0 { - log.Errorf("[EUREKA-SERVER] fail to parse request uri, uri: %s, client: %s, err: %s", + eurekalog.Errorf("[EUREKA-SERVER] fail to parse request uri, uri: %s, client: %s, err: %s", req.Request.RequestURI, remoteAddr, "instance id is required") writePolarisStatusCode(req, api.InvalidInstanceID) writeHeader(http.StatusBadRequest, rsp) @@ -427,7 +427,7 @@ func (h *EurekaServer) UpdateStatus(req *restful.Request, rsp *restful.Response) } status := req.QueryParameter(ParamValue) namespace := readNamespaceFromRequest(req, h.namespace) - log.Infof("[EUREKA-SERVER]received instance updateStatus request, "+ + eurekalog.Infof("[EUREKA-SERVER]received instance updateStatus request, "+ "client: %s, namespace: %s, instId: %s, appId: %s, status: %s", remoteAddr, namespace, instId, appId, status) // check status @@ -439,12 +439,12 @@ func (h *EurekaServer) UpdateStatus(req *restful.Request, rsp *restful.Response) code := h.updateStatus(context.Background(), namespace, appId, instId, status, false) writePolarisStatusCode(req, code) if code == api.ExecuteSuccess || code == api.NoNeedUpdate { - log.Infof("[EUREKA-SERVER]instance (namespace=%s, instId=%s, appId=%s) has been updated successfully", + eurekalog.Infof("[EUREKA-SERVER]instance (namespace=%s, instId=%s, appId=%s) has been updated successfully", namespace, instId, appId) writeHeader(http.StatusOK, rsp) return } - log.Errorf("[EUREKA-SERVER]instance ((namespace=%s, instId=%s, appId=%s) has been updated failed, "+ + eurekalog.Errorf("[EUREKA-SERVER]instance ((namespace=%s, instId=%s, appId=%s) has been updated failed, "+ "code is %d", namespace, instId, appId, code) if code == api.NotFoundResource { @@ -459,7 +459,7 @@ func (h *EurekaServer) DeleteStatus(req *restful.Request, rsp *restful.Response) remoteAddr := req.Request.RemoteAddr appId := readAppIdFromRequest(req) 
if len(appId) == 0 { - log.Errorf("[EurekaServer] fail to parse request uri, uri: %s, client: %s, err: %s", + eurekalog.Errorf("[EurekaServer] fail to parse request uri, uri: %s, client: %s, err: %s", req.Request.RequestURI, remoteAddr, "service name is empty") writePolarisStatusCode(req, api.InvalidServiceName) writeHeader(http.StatusBadRequest, rsp) @@ -467,7 +467,7 @@ func (h *EurekaServer) DeleteStatus(req *restful.Request, rsp *restful.Response) } instId := req.PathParameter(ParamInstId) if len(instId) == 0 { - log.Errorf("[EUREKA-SERVER] fail to parse request uri, uri: %s, client: %s, err: %s", + eurekalog.Errorf("[EUREKA-SERVER] fail to parse request uri, uri: %s, client: %s, err: %s", req.Request.RequestURI, remoteAddr, "instance id is required") writePolarisStatusCode(req, api.InvalidInstanceID) writeHeader(http.StatusBadRequest, rsp) @@ -476,20 +476,20 @@ func (h *EurekaServer) DeleteStatus(req *restful.Request, rsp *restful.Response) namespace := readNamespaceFromRequest(req, h.namespace) - log.Infof("[EUREKA-SERVER]received instance status delete request, "+ + eurekalog.Infof("[EUREKA-SERVER]received instance status delete request, "+ "client: %s,namespace=%s, instId=%s, appId=%s", remoteAddr, namespace, instId, appId) code := h.updateStatus(context.Background(), namespace, appId, instId, StatusUp, false) writePolarisStatusCode(req, code) if code == api.ExecuteSuccess { - log.Infof("[EUREKA-SERVER]instance status (namespace=%s, instId=%s, appId=%s) "+ + eurekalog.Infof("[EUREKA-SERVER]instance status (namespace=%s, instId=%s, appId=%s) "+ "has been deleted successfully", namespace, instId, appId) writeHeader(http.StatusOK, rsp) return } - log.Errorf("[EUREKA-SERVER]instance status (namespace=%s, instId=%s, appId=%s) "+ + eurekalog.Errorf("[EUREKA-SERVER]instance status (namespace=%s, instId=%s, appId=%s) "+ "has been deleted failed, code is %d", namespace, instId, appId, code) if code == api.NotFoundResource { @@ -504,7 +504,7 @@ func (h *EurekaServer) RenewInstance(req *restful.Request, rsp *restful.Response remoteAddr := req.Request.RemoteAddr appId := readAppIdFromRequest(req) if len(appId) == 0 { - log.Errorf("[EurekaServer] fail to parse request uri, uri: %s, client: %s, err: %s", + eurekalog.Errorf("[EurekaServer] fail to parse request uri, uri: %s, client: %s, err: %s", req.Request.RequestURI, remoteAddr, "service name is empty") writePolarisStatusCode(req, api.InvalidServiceName) writeHeader(http.StatusBadRequest, rsp) @@ -512,7 +512,7 @@ func (h *EurekaServer) RenewInstance(req *restful.Request, rsp *restful.Response } instId := req.PathParameter(ParamInstId) if len(instId) == 0 { - log.Errorf("[EUREKA-SERVER] fail to parse request uri, uri: %s, client: %s, err: %s", + eurekalog.Errorf("[EUREKA-SERVER] fail to parse request uri, uri: %s, client: %s, err: %s", req.Request.RequestURI, remoteAddr, "instance id is required") writePolarisStatusCode(req, api.InvalidInstanceID) writeHeader(http.StatusBadRequest, rsp) @@ -525,7 +525,7 @@ func (h *EurekaServer) RenewInstance(req *restful.Request, rsp *restful.Response writeHeader(http.StatusOK, rsp) return } - log.Errorf("[EUREKA-SERVER]instance (namespace=%s, instId=%s, appId=%s) heartbeat failed, code is %d", + eurekalog.Errorf("[EUREKA-SERVER]instance (namespace=%s, instId=%s, appId=%s) heartbeat failed, code is %d", namespace, instId, appId, code) if code == api.NotFoundResource { writeHeader(http.StatusNotFound, rsp) @@ -539,7 +539,7 @@ func (h *EurekaServer) CancelInstance(req *restful.Request, rsp *restful.Respons appId 
:= readAppIdFromRequest(req) remoteAddr := req.Request.RemoteAddr if len(appId) == 0 { - log.Errorf("[EurekaServer] fail to parse request uri, uri: %s, client: %s, err: %s", + eurekalog.Errorf("[EurekaServer] fail to parse request uri, uri: %s, client: %s, err: %s", req.Request.RequestURI, remoteAddr, "service name is empty") writePolarisStatusCode(req, api.InvalidServiceName) writeHeader(http.StatusBadRequest, rsp) @@ -547,26 +547,26 @@ func (h *EurekaServer) CancelInstance(req *restful.Request, rsp *restful.Respons } instId := req.PathParameter(ParamInstId) if len(instId) == 0 { - log.Errorf("[EUREKA-SERVER] fail to parse request uri, uri: %s, client: %s, err: %s", + eurekalog.Errorf("[EUREKA-SERVER] fail to parse request uri, uri: %s, client: %s, err: %s", req.Request.RequestURI, remoteAddr, "instance id is required") writePolarisStatusCode(req, api.InvalidInstanceID) writeHeader(http.StatusBadRequest, rsp) return } namespace := readNamespaceFromRequest(req, h.namespace) - log.Infof("[EUREKA-SERVER]received instance deregistered request, "+ + eurekalog.Infof("[EUREKA-SERVER]received instance deregistered request, "+ "client: %s, namespace: %s, instId: %s, appId: %s", remoteAddr, namespace, instId, appId) code := h.deregisterInstance(context.Background(), namespace, appId, instId, false) writePolarisStatusCode(req, code) if code == api.ExecuteSuccess || code == api.NotFoundResource || code == api.SameInstanceRequest { writeHeader(http.StatusOK, rsp) - log.Infof("[EUREKA-SERVER]instance (namespace=%s, instId=%s, appId=%s) "+ + eurekalog.Infof("[EUREKA-SERVER]instance (namespace=%s, instId=%s, appId=%s) "+ "has been deregistered successfully, code is %d", namespace, instId, appId, code) return } - log.Errorf("[EUREKA-SERVER]instance (namespace=%s, instId=%s, appId=%s) has been deregistered failed,"+ + eurekalog.Errorf("[EUREKA-SERVER]instance (namespace=%s, instId=%s, appId=%s) has been deregistered failed,"+ " code is %d", namespace, instId, appId, code) writeHeader(int(code/1000), rsp) @@ -577,7 +577,7 @@ func (h *EurekaServer) GetInstance(req *restful.Request, rsp *restful.Response) remoteAddr := req.Request.RemoteAddr instId := req.PathParameter(ParamInstId) if len(instId) == 0 { - log.Errorf("[EUREKA-SERVER] fail to parse request uri, uri: %s, client: %s, err: %s", + eurekalog.Errorf("[EUREKA-SERVER] fail to parse request uri, uri: %s, client: %s, err: %s", req.Request.RequestURI, remoteAddr, "instance id is required") writePolarisStatusCode(req, api.InvalidInstanceID) writeHeader(http.StatusBadRequest, rsp) @@ -600,7 +600,7 @@ func (h *EurekaServer) GetInstance(req *restful.Request, rsp *restful.Response) output = insResp } if err := writeEurekaResponse(acceptValue, output, req, rsp); nil != err { - log.Errorf("[EurekaServer]fail to write instance, client: %s, err: %v", remoteAddr, err) + eurekalog.Errorf("[EurekaServer]fail to write instance, client: %s, err: %v", remoteAddr, err) } } @@ -609,7 +609,7 @@ func (h *EurekaServer) UpdateMetadata(req *restful.Request, rsp *restful.Respons remoteAddr := req.Request.RemoteAddr appId := readAppIdFromRequest(req) if len(appId) == 0 { - log.Errorf("[EurekaServer] fail to parse request uri, uri: %s, client: %s, err: %s", + eurekalog.Errorf("[EurekaServer] fail to parse request uri, uri: %s, client: %s, err: %s", req.Request.RequestURI, remoteAddr, "service name is empty") writePolarisStatusCode(req, api.InvalidServiceName) writeHeader(http.StatusBadRequest, rsp) @@ -617,7 +617,7 @@ func (h *EurekaServer) UpdateMetadata(req *restful.Request, 
rsp *restful.Respons } instId := req.PathParameter(ParamInstId) if len(instId) == 0 { - log.Errorf("[EUREKA-SERVER] fail to parse request uri, uri: %s, client: %s, err: %s", + eurekalog.Errorf("[EUREKA-SERVER] fail to parse request uri, uri: %s, client: %s, err: %s", req.Request.RequestURI, remoteAddr, "instance id is required") writePolarisStatusCode(req, api.InvalidInstanceID) writeHeader(http.StatusBadRequest, rsp) @@ -636,12 +636,12 @@ func (h *EurekaServer) UpdateMetadata(req *restful.Request, rsp *restful.Respons code := h.updateMetadata(context.Background(), namespace, appId, instId, metadataMap) writePolarisStatusCode(req, code) if code == api.ExecuteSuccess { - log.Infof("[EUREKA-SERVER]instance metadata (namespace=%s, instId=%s, appId=%s) has been updated successfully", + eurekalog.Infof("[EUREKA-SERVER]instance metadata (namespace=%s, instId=%s, appId=%s) has been updated successfully", namespace, instId, appId) writeHeader(http.StatusOK, rsp) return } - log.Errorf("[EUREKA-SERVER]instance metadata (namespace=%s, instId=%s, appId=%s) has been updated failed, "+ + eurekalog.Errorf("[EUREKA-SERVER]instance metadata (namespace=%s, instId=%s, appId=%s) has been updated failed, "+ "code is %d", namespace, instId, appId, code) if code == api.NotFoundResource { @@ -656,7 +656,7 @@ func (h *EurekaServer) QueryByVipAddress(req *restful.Request, rsp *restful.Resp remoteAddr := req.Request.RemoteAddr vipAddress := req.PathParameter(ParamVip) if len(vipAddress) == 0 { - log.Errorf("[EurekaServer] fail to parse request uri, uri: %s, client: %s, err: %s", + eurekalog.Errorf("[EurekaServer] fail to parse request uri, uri: %s, client: %s, err: %s", req.Request.RequestURI, remoteAddr, "vip address is empty") writePolarisStatusCode(req, api.InvalidParameter) writeHeader(http.StatusBadRequest, rsp) @@ -670,7 +670,7 @@ func (h *EurekaServer) QueryByVipAddress(req *restful.Request, rsp *restful.Resp }) acceptValue := getParamFromEurekaRequestHeader(req, restful.HEADER_Accept) if err := writeResponse(parseAcceptValue(acceptValue), appsRespCache, req, rsp); nil != err { - log.Errorf("[EurekaServer]fail to write vip applications, client: %s, err: %v", remoteAddr, err) + eurekalog.Errorf("[EurekaServer]fail to write vip applications, client: %s, err: %v", remoteAddr, err) } } @@ -679,7 +679,7 @@ func (h *EurekaServer) QueryBySVipAddress(req *restful.Request, rsp *restful.Res remoteAddr := req.Request.RemoteAddr vipAddress := req.PathParameter(ParamSVip) if len(vipAddress) == 0 { - log.Errorf("[EurekaServer] fail to parse request uri, uri: %s, client: %s, err: %s", + eurekalog.Errorf("[EurekaServer] fail to parse request uri, uri: %s, client: %s, err: %s", req.Request.RequestURI, remoteAddr, "svip address is empty") writePolarisStatusCode(req, api.InvalidParameter) writeHeader(http.StatusBadRequest, rsp) @@ -692,6 +692,6 @@ func (h *EurekaServer) QueryBySVipAddress(req *restful.Request, rsp *restful.Res }) acceptValue := getParamFromEurekaRequestHeader(req, restful.HEADER_Accept) if err := writeResponse(parseAcceptValue(acceptValue), appsRespCache, req, rsp); nil != err { - log.Errorf("[EurekaServer]fail to write svip applications, client: %s, err: %v", remoteAddr, err) + eurekalog.Errorf("[EurekaServer]fail to write svip applications, client: %s, err: %v", remoteAddr, err) } } diff --git a/apiserver/eurekaserver/applications.go b/apiserver/eurekaserver/applications.go index 27b5ae722..ca2facd0a 100644 --- a/apiserver/eurekaserver/applications.go +++ b/apiserver/eurekaserver/applications.go @@ -89,7 
+89,7 @@ func (a *ApplicationsBuilder) BuildApplications(oldAppsCache *ApplicationsRespCa for _, newService := range newServices { instances, revision, err := getCacheInstancesFunc(a.namingServer, newService.ID) if err != nil { - log.Errorf("[EurekaServer]fail to get revision for service %s, err is %v", newService.Name, err) + eurekalog.Errorf("[EurekaServer]fail to get revision for service %s, err is %v", newService.Name, err) continue } // eureka does not return services without instances @@ -214,12 +214,12 @@ func parsePortWrapper(info *InstanceInfo, instance *apiservice.Instance) { sePort, err := strconv.Atoi(securePort) if err != nil { sePort = 0 - log.Errorf("[EUREKA_SERVER]parse secure port error: %+v", err) + eurekalog.Errorf("[EUREKA_SERVER]parse secure port error: %+v", err) } sePortEnabled, err := strconv.ParseBool(securePortEnabled) if err != nil { sePortEnabled = false - log.Errorf("[EUREKA_SERVER]parse secure port enabled error: %+v", err) + eurekalog.Errorf("[EUREKA_SERVER]parse secure port enabled error: %+v", err) } info.SecurePort.Port = sePort @@ -228,12 +228,12 @@ func parsePortWrapper(info *InstanceInfo, instance *apiservice.Instance) { insePort, err := strconv.Atoi(insecurePort) if err != nil { insePort = 0 - log.Errorf("[EUREKA_SERVER]parse insecure port error: %+v", err) + eurekalog.Errorf("[EUREKA_SERVER]parse insecure port error: %+v", err) } insePortEnabled, err := strconv.ParseBool(insecurePortEnabled) if err != nil { insePortEnabled = false - log.Errorf("[EUREKA_SERVER]parse insecure port enabled error: %+v", err) + eurekalog.Errorf("[EUREKA_SERVER]parse insecure port enabled error: %+v", err) } info.Port.Port = insePort @@ -430,20 +430,20 @@ func constructResponseCache(newApps *Applications, instCount int, delta bool) *A // 预先做一次序列化,以免高并发时候序列化会使得内存峰值过高 jsonBytes, err := json.MarshalIndent(newAppsCache.AppsResp, "", " ") if err != nil { - log.Errorf("[EUREKA_SERVER]fail to marshal apps %s to json, err is %v", appsHashCode, err) + eurekalog.Errorf("[EUREKA_SERVER]fail to marshal apps %s to json, err is %v", appsHashCode, err) } else { newAppsCache.JsonBytes = jsonBytes } xmlBytes, err := xml.MarshalIndent(newAppsCache.AppsResp.Applications, " ", " ") if err != nil { - log.Errorf("[EUREKA_SERVER]fail to marshal apps %s to xml, err is %v", appsHashCode, err) + eurekalog.Errorf("[EUREKA_SERVER]fail to marshal apps %s to xml, err is %v", appsHashCode, err) } else { newAppsCache.XmlBytes = xmlBytes } if !delta && len(jsonBytes) > 0 { newAppsCache.Revision = sha1s(jsonBytes) } - log.Infof("[EUREKA_SERVER]success to build apps cache, delta is %v, "+ + eurekalog.Infof("[EUREKA_SERVER]success to build apps cache, delta is %v, "+ "length xmlBytes is %d, length jsonBytes is %d, instCount is %d", delta, len(xmlBytes), len(jsonBytes), instCount) return newAppsCache } diff --git a/apiserver/eurekaserver/delta_worker.go b/apiserver/eurekaserver/delta_worker.go index 71c8a7800..acf624c8d 100644 --- a/apiserver/eurekaserver/delta_worker.go +++ b/apiserver/eurekaserver/delta_worker.go @@ -181,7 +181,7 @@ func (a *ApplicationsWorker) cleanupExpiredLeases() { startIndex = i break } - log.Infof("[Eureka]lease %s(%s) has expired, lastUpdateTime %d, curTimeSec %d", + eurekalog.Infof("[Eureka]lease %s(%s) has expired, lastUpdateTime %d, curTimeSec %d", lease.instance.InstanceId, lease.instance.ActionType, lease.lastUpdateTimeSec, curTimeSec) } if startIndex == -1 && len(a.leases) > 0 { @@ -305,7 +305,7 @@ func diffApplicationInstances(curTimeSec int64, oldApplication *Application, new 
} func addLease(out []*Lease, lease *Lease) []*Lease { - log.Infof("[EUREKA] add delta instance %s(%s)", lease.instance.InstanceId, lease.instance.ActionType) + eurekalog.Infof("[EUREKA] add delta instance %s(%s)", lease.instance.InstanceId, lease.instance.ActionType) out = append(out, lease) return out } diff --git a/apiserver/eurekaserver/eureka_suit_test.go b/apiserver/eurekaserver/eureka_suit_test.go index 776e560a3..d1f6f37aa 100644 --- a/apiserver/eurekaserver/eureka_suit_test.go +++ b/apiserver/eurekaserver/eureka_suit_test.go @@ -138,7 +138,7 @@ func (d *EurekaTestSuit) initialize(t *testing.T, callback func(t *testing.T, s bc, err := batch.NewBatchCtrlWithConfig(d.storage, cacheMgn, batchConfig) if err != nil { - log.Errorf("new batch ctrl with config err: %s", err.Error()) + eurekalog.Errorf("new batch ctrl with config err: %s", err.Error()) return err } bc.Start(ctx) diff --git a/apiserver/eurekaserver/log.go b/apiserver/eurekaserver/log.go index 575e87cd5..ff05ea28a 100644 --- a/apiserver/eurekaserver/log.go +++ b/apiserver/eurekaserver/log.go @@ -21,4 +21,7 @@ import ( commonlog "github.com/polarismesh/polaris/common/log" ) -var log = commonlog.GetScopeOrDefaultByName(commonlog.APIServerLoggerName) +var ( + accesslog = commonlog.GetScopeOrDefaultByName(commonlog.APIServerLoggerName) + eurekalog = commonlog.GetScopeOrDefaultByName("eureka") +) diff --git a/apiserver/eurekaserver/replicate.go b/apiserver/eurekaserver/replicate.go index 78a8f5d41..8b4f8ea65 100644 --- a/apiserver/eurekaserver/replicate.go +++ b/apiserver/eurekaserver/replicate.go @@ -52,14 +52,14 @@ const ( // BatchReplication do the server request replication func (h *EurekaServer) BatchReplication(req *restful.Request, rsp *restful.Response) { - log.Infof("[EUREKA-SERVER] received replicate request %+v", req) + eurekalog.Infof("[EUREKA-SERVER] received replicate request %+v", req) sourceSvrName := req.HeaderParameter(headerIdentityName) remoteAddr := req.Request.RemoteAddr if sourceSvrName == valueIdentityName { // we should not process the replication from polaris batchResponse := &ReplicationListResponse{ResponseList: []*ReplicationInstanceResponse{}} if err := writeEurekaResponse(restful.MIME_JSON, batchResponse, req, rsp); nil != err { - log.Errorf("[EurekaServer]fail to write replicate response, client: %s, err: %v", remoteAddr, err) + eurekalog.Errorf("[EurekaServer]fail to write replicate response, client: %s, err: %v", remoteAddr, err) } return } @@ -67,7 +67,7 @@ func (h *EurekaServer) BatchReplication(req *restful.Request, rsp *restful.Respo var err error err = req.ReadEntity(replicateRequest) if nil != err { - log.Errorf("[EUREKA-SERVER] fail to parse peer replicate request, uri: %s, client: %s, err: %v", + eurekalog.Errorf("[EUREKA-SERVER] fail to parse peer replicate request, uri: %s, client: %s, err: %v", req.Request.RequestURI, remoteAddr, err) writePolarisStatusCode(req, api.ParseException) writeHeader(http.StatusBadRequest, rsp) @@ -75,7 +75,7 @@ func (h *EurekaServer) BatchReplication(req *restful.Request, rsp *restful.Respo } token, err := getAuthFromEurekaRequestHeader(req) if err != nil { - log.Infof("[EUREKA-SERVER]replicate request get basic auth info fail, code is %d", api.ExecuteException) + eurekalog.Infof("[EUREKA-SERVER]replicate request get basic auth info fail, code is %d", api.ExecuteException) writePolarisStatusCode(req, api.ExecuteException) writeHeader(http.StatusForbidden, rsp) return @@ -83,7 +83,7 @@ func (h *EurekaServer) BatchReplication(req *restful.Request, rsp 
*restful.Respo namespace := readNamespaceFromRequest(req, h.namespace) batchResponse, resultCode := h.doBatchReplicate(replicateRequest, token, namespace) if err := writeEurekaResponseWithCode(restful.MIME_JSON, batchResponse, req, rsp, resultCode); nil != err { - log.Errorf("[EurekaServer]fail to write replicate response, client: %s, err: %v", remoteAddr, err) + eurekalog.Errorf("[EurekaServer]fail to write replicate response, client: %s, err: %v", remoteAddr, err) } } @@ -105,7 +105,8 @@ func (h *EurekaServer) doBatchReplicate( resp, code := h.dispatch(instanceInfo, token, namespace) if code != api.ExecuteSuccess { atomic.CompareAndSwapUint32(&resultCode, api.ExecuteSuccess, code) - log.Warnf("[EUREKA-SERVER] fail to process replicate instance request, code is %d, action %s, instance %s, app %s", + eurekalog.Warnf("[EUREKA-SERVER] fail to process replicate instance request, code is %d, "+ + "action %s, instance %s, app %s", code, instanceInfo.Action, instanceInfo.Id, instanceInfo.AppName) } mutex.Lock() @@ -122,10 +123,10 @@ func (h *EurekaServer) dispatch( appName := formatReadName(replicationInstance.AppName) ctx := context.WithValue(context.Background(), utils.ContextAuthTokenKey, token) var retCode = api.ExecuteSuccess - log.Debugf("[EurekaServer]dispatch replicate request %+v", replicationInstance) + eurekalog.Debugf("[EurekaServer]dispatch replicate request %+v", replicationInstance) if nil != replicationInstance.InstanceInfo { _ = convertInstancePorts(replicationInstance.InstanceInfo) - log.Debugf("[EurekaServer]dispatch replicate instance %+v, port %+v, sport %+v", + eurekalog.Debugf("[EurekaServer]dispatch replicate instance %+v, port %+v, sport %+v", replicationInstance.InstanceInfo, replicationInstance.InstanceInfo.Port, replicationInstance.InstanceInfo.SecurePort) } switch replicationInstance.Action { @@ -202,7 +203,7 @@ func (h *EurekaServer) shouldReplicate(e model.InstanceEvent) bool { } if len(e.Service) == 0 { - log.Warnf("[EUREKA]fail to replicate, service name is empty for event %s", e) + eurekalog.Warnf("[EUREKA]fail to replicate, service name is empty for event %s", e) return false } metadata := e.MetaData diff --git a/apiserver/eurekaserver/replicate_test.go b/apiserver/eurekaserver/replicate_test.go index 797e5cc6b..1fb782553 100644 --- a/apiserver/eurekaserver/replicate_test.go +++ b/apiserver/eurekaserver/replicate_test.go @@ -53,7 +53,7 @@ func TestDispatchHeartbeat(t *testing.T) { var replicateInstances = &ReplicationList{} for i, instance := range instances { - log.Infof("replicate test: register %d", i) + eurekalog.Infof("replicate test: register %d", i) replicateInstances.ReplicationList = append(replicateInstances.ReplicationList, &ReplicationInstance{ AppName: appId, Id: instance.InstanceId, @@ -66,7 +66,7 @@ func TestDispatchHeartbeat(t *testing.T) { time.Sleep(10 * time.Second) for i := 0; i < 5; i++ { - log.Infof("replicate test: heartbeat %d", i) + eurekalog.Infof("replicate test: heartbeat %d", i) replicateInstances = &ReplicationList{} for _, instance := range instances { replicateInstances.ReplicationList = append(replicateInstances.ReplicationList, &ReplicationInstance{ diff --git a/apiserver/eurekaserver/replicate_worker.go b/apiserver/eurekaserver/replicate_worker.go index ced802e91..e99f83825 100644 --- a/apiserver/eurekaserver/replicate_worker.go +++ b/apiserver/eurekaserver/replicate_worker.go @@ -64,7 +64,7 @@ func (r *ReplicateWorker) batchReplicate() { for { select { case <-r.ctx.Done(): - log.Infof("[EUREKA-SERVER] replicate worker done") 
+ eurekalog.Infof("[EUREKA-SERVER] replicate worker done") batchTicker.Stop() case task := <-r.taskChannel: batchTasks = append(batchTasks, task) @@ -101,14 +101,14 @@ func (r *ReplicateWorker) doBatchReplicate(tasks []*ReplicationInstance) { } jsonData, err := json.Marshal(request) if nil != err { - log.Errorf("[EUREKA-SERVER] fail to marshal replicate tasks: %v", err) + eurekalog.Errorf("[EUREKA-SERVER] fail to marshal replicate tasks: %v", err) return } replicateInfo := make([]string, 0, len(tasks)) for _, task := range tasks { replicateInfo = append(replicateInfo, fmt.Sprintf("%s:%s", task.Action, task.Id)) } - log.Infof("start to send replicate text %s, peers %v", string(jsonData), r.peers) + eurekalog.Infof("start to send replicate text %s, peers %v", string(jsonData), r.peers) for _, peer := range r.peers { go r.doReplicateToPeer(peer, tasks, jsonData, replicateInfo) } @@ -118,7 +118,7 @@ func (r *ReplicateWorker) doReplicateToPeer( peer string, tasks []*ReplicationInstance, jsonData []byte, replicateInfo []string) { response, err := sendHttpRequest(r.namespace, peer, jsonData, replicateInfo) if nil != err { - log.Errorf("[EUREKA-SERVER] fail to batch replicate to %s, err: %v", peer, err) + eurekalog.Errorf("[EUREKA-SERVER] fail to batch replicate to %s, err: %v", peer, err) return } if len(response.ResponseList) == 0 { @@ -128,7 +128,7 @@ func (r *ReplicateWorker) doReplicateToPeer( if respInstance.StatusCode == http.StatusNotFound { task := tasks[i] if task.Action == actionHeartbeat { - log.Infof("[EUREKA-SERVER] instance %s of service %s not exists in %s, do register instance info %+v", + eurekalog.Infof("[EUREKA-SERVER] instance %s of service %s not exists in %s, do register instance info %+v", task.Id, task.AppName, peer, task.InstanceInfo) // do the re-register registerTask := &ReplicationInstance{ @@ -170,7 +170,7 @@ func sendHttpRequest(namespace string, peer string, req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://%s/eureka/peerreplication/batch/", peer), bytes.NewBuffer(jsonData)) if nil != err { - log.Errorf("[EUREKA-SERVER] fail to create replicate request: %v", err) + eurekalog.Errorf("[EUREKA-SERVER] fail to create replicate request: %v", err) return nil, err } req.Header.Set(headerIdentityName, valueIdentityName) @@ -182,7 +182,7 @@ func sendHttpRequest(namespace string, peer string, } response, err := client.Do(req) if err != nil { - log.Errorf("[EUREKA-SERVER] fail to send replicate request: %v", err) + eurekalog.Errorf("[EUREKA-SERVER] fail to send replicate request: %v", err) return nil, err } defer func() { @@ -194,9 +194,9 @@ func sendHttpRequest(namespace string, peer string, respObj := &ReplicationListResponse{} err = json.Unmarshal(respStr, respObj) if nil != err { - log.Errorf("[EUREKA-SERVER] fail unmarshal text %s to ReplicationListResponse: %v", string(respStr), err) + eurekalog.Errorf("[EUREKA-SERVER] fail unmarshal text %s to ReplicationListResponse: %v", string(respStr), err) return nil, err } - log.Infof("[EUREKA-SERVER] success to replicate to %s, instances %v", peer, replicateInfo) + eurekalog.Infof("[EUREKA-SERVER] success to replicate to %s, instances %v", peer, replicateInfo) return respObj, nil } diff --git a/apiserver/eurekaserver/server.go b/apiserver/eurekaserver/server.go index 892bffe87..986c88951 100644 --- a/apiserver/eurekaserver/server.go +++ b/apiserver/eurekaserver/server.go @@ -251,7 +251,7 @@ func (h *EurekaServer) Initialize(ctx context.Context, option map[string]interfa } } - log.Infof("[EUREKA] custom eureka 
parameters: %v", CustomEurekaParameters) + eurekalog.Infof("[EUREKA] custom eureka parameters: %v", CustomEurekaParameters) return nil } @@ -307,7 +307,7 @@ func parsePeersToReplicate(defaultNamespace string, replicatePeerObjs []interfac // Run 启动HTTP API服务器 func (h *EurekaServer) Run(errCh chan error) { - log.Infof("start EurekaServer") + eurekalog.Infof("start EurekaServer") h.exitCh = make(chan struct{}) h.start = true defer func() { @@ -318,13 +318,13 @@ func (h *EurekaServer) Run(errCh chan error) { // 引入功能模块和插件 h.namingServer, err = service.GetServer() if err != nil { - log.Errorf("%v", err) + eurekalog.Errorf("%v", err) errCh <- err return } h.healthCheckServer, err = healthcheck.GetServer() if err != nil { - log.Errorf("%v", err) + eurekalog.Errorf("%v", err) errCh <- err return } @@ -353,18 +353,18 @@ func (h *EurekaServer) Run(errCh chan error) { ln, err := net.Listen("tcp", address) if err != nil { - log.Errorf("net listen(%s) err: %s", address, err.Error()) + eurekalog.Errorf("net listen(%s) err: %s", address, err.Error()) errCh <- err return } ln = keepalive.NewTcpKeepAliveListener(3*time.Minute, ln.(*net.TCPListener)) // 开启最大连接数限制 if h.connLimitConfig != nil && h.connLimitConfig.OpenConnLimit { - log.Infof("http server use max connection limit per ip: %d, http max limit: %d", + eurekalog.Infof("http server use max connection limit per ip: %d, http max limit: %d", h.connLimitConfig.MaxConnPerHost, h.connLimitConfig.MaxConnLimit) ln, err = connlimit.NewListener(ln, h.GetProtocol(), h.connLimitConfig) if err != nil { - log.Errorf("conn limit init err: %s", err.Error()) + eurekalog.Errorf("conn limit init err: %s", err.Error()) errCh <- err return } @@ -378,14 +378,14 @@ func (h *EurekaServer) Run(errCh chan error) { err = server.ServeTLS(ln, h.tlsInfo.CertFile, h.tlsInfo.KeyFile) } if err != nil && err != http.ErrServerClosed { - log.Errorf("%+v", err) + eurekalog.Errorf("%+v", err) if !h.restart { - log.Infof("not in restart progress, broadcast error") + eurekalog.Infof("not in restart progress, broadcast error") errCh <- err } return } - log.Infof("EurekaServer stop") + eurekalog.Infof("EurekaServer stop") } // 创建handler @@ -431,7 +431,7 @@ func (h *EurekaServer) preprocess(req *restful.Request, rsp *restful.Response) e if isImportantRequest(req) { // 打印请求 - log.Info("receive request", + accesslog.Info("receive request", zap.String("client-address", req.Request.RemoteAddr), zap.String("user-agent", req.HeaderParameter("User-Agent")), zap.String("method", req.Request.Method), @@ -460,7 +460,7 @@ func (h *EurekaServer) enterRateLimit(req *restful.Request, rsp *restful.Respons return nil } if ok := h.rateLimit.Allow(plugin.IPRatelimit, segments[0]); !ok { - log.Error("ip ratelimit is not allow", zap.String("client", address)) + accesslog.Error("ip ratelimit is not allow", zap.String("client", address)) RateLimitResponse(rsp) return errors.New("ip ratelimit is not allow") } @@ -469,7 +469,7 @@ func (h *EurekaServer) enterRateLimit(req *restful.Request, rsp *restful.Respons apiName := fmt.Sprintf("%s:%s", req.Request.Method, strings.TrimSuffix(req.Request.URL.Path, "/")) if ok := h.rateLimit.Allow(plugin.APIRatelimit, apiName); !ok { - log.Error("api ratelimit is not allow", zap.String("client", address), zap.String("api", apiName)) + accesslog.Error("api ratelimit is not allow", zap.String("client", address), zap.String("api", apiName)) RateLimitResponse(rsp) return errors.New("api ratelimit is not allow") } @@ -505,7 +505,7 @@ func (h *EurekaServer) postproccess(req 
*restful.Request, rsp *restful.Response) diff := now.Sub(startTime) // 打印耗时超过1s的请求 if diff > time.Second { - log.Info("handling time > 1s", + accesslog.Info("handling time > 1s", zap.String("client-address", req.Request.RemoteAddr), zap.String("user-agent", req.HeaderParameter("User-Agent")), zap.String("method", req.Request.Method), @@ -589,7 +589,7 @@ func (h *EurekaServer) Stop() { // Restart 重启eurekaServer func (h *EurekaServer) Restart( option map[string]interface{}, api map[string]apiserver.APIConfig, errCh chan error) error { - log.Infof("restart httpserver new config: %+v", option) + eurekalog.Infof("restart httpserver new config: %+v", option) // 备份一下option backupOption := h.option // 备份一下api @@ -604,21 +604,21 @@ func (h *EurekaServer) Restart( <-h.exitCh } - log.Infof("old httpserver has stopped, begin restart httpserver") + eurekalog.Infof("old httpserver has stopped, begin restart httpserver") if err := h.Initialize(context.Background(), option, api); err != nil { h.restart = false if initErr := h.Initialize(context.Background(), backupOption, backupAPI); initErr != nil { - log.Errorf("start httpserver with backup cfg err: %s", initErr.Error()) + eurekalog.Errorf("start httpserver with backup cfg err: %s", initErr.Error()) return initErr } go h.Run(errCh) - log.Errorf("restart httpserver initialize err: %s", err.Error()) + eurekalog.Errorf("restart httpserver initialize err: %s", err.Error()) return err } - log.Infof("init httpserver successfully, restart it") + eurekalog.Infof("init httpserver successfully, restart it") h.restart = false go h.Run(errCh) return nil diff --git a/apiserver/eurekaserver/write_test.go b/apiserver/eurekaserver/write_test.go index ab1acfe14..d6d41a2d2 100644 --- a/apiserver/eurekaserver/write_test.go +++ b/apiserver/eurekaserver/write_test.go @@ -115,7 +115,7 @@ func TestEurekaServer_renew(t *testing.T) { s.EXPECT().GetInstancesCount().AnyTimes().Return(uint32(1), nil) s.EXPECT().GetUnixSecond(gomock.Any()).AnyTimes().Return(time.Now().Unix(), nil) s.EXPECT().GetServicesCount().Return(uint32(1), nil).AnyTimes() - s.EXPECT().StartLeaderElection(gomock.Any()).Times(1) + s.EXPECT().StartLeaderElection(gomock.Any()).AnyTimes() s.EXPECT().Destroy().Return(nil) return nil }); err != nil { diff --git a/apiserver/grpcserver/config/server.go b/apiserver/grpcserver/config/server.go index 629d8d0bd..dbd08bf51 100644 --- a/apiserver/grpcserver/config/server.go +++ b/apiserver/grpcserver/config/server.go @@ -60,7 +60,7 @@ func (g *ConfigGRPCServer) Initialize(ctx context.Context, option map[string]int return g.BaseGrpcServer.Initialize(ctx, option, grpcserver.WithModule(model.ConfigModule), grpcserver.WithProtocol(g.GetProtocol()), - grpcserver.WithLogger(configLog), + grpcserver.WithLogger(commonlog.FindScope(commonlog.APIServerLoggerName)), ) } diff --git a/apiserver/grpcserver/discover/server.go b/apiserver/grpcserver/discover/server.go index 9cbb9a0fc..c337c08c0 100644 --- a/apiserver/grpcserver/discover/server.go +++ b/apiserver/grpcserver/discover/server.go @@ -153,7 +153,7 @@ func (g *GRPCServer) buildInitOptions(option map[string]interface{}) []grpcserve initOptions := []grpcserver.InitOption{ grpcserver.WithModule(model.DiscoverModule), grpcserver.WithProtocol(g.GetProtocol()), - grpcserver.WithLogger(namingLog), + grpcserver.WithLogger(commonlog.FindScope(commonlog.APIServerLoggerName)), grpcserver.WithMessageToCacheObject(discoverCacheConvert), } diff --git a/apiserver/xdsserverv3/resource/help.go b/apiserver/xdsserverv3/resource/help.go index 
ec33aa42d..ba1be2c1a 100644 --- a/apiserver/xdsserverv3/resource/help.go +++ b/apiserver/xdsserverv3/resource/help.go @@ -22,6 +22,7 @@ import ( "fmt" "math" "strconv" + "strings" "time" accesslog "github.com/envoyproxy/go-control-plane/envoy/config/accesslog/v3" @@ -29,11 +30,14 @@ import ( core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" listenerv3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + ratelimitconfv3 "github.com/envoyproxy/go-control-plane/envoy/config/ratelimit/v3" route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" filev3 "github.com/envoyproxy/go-control-plane/envoy/extensions/access_loggers/file/v3" envoy_extensions_common_ratelimit_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/common/ratelimit/v3" ratelimitv32 "github.com/envoyproxy/go-control-plane/envoy/extensions/common/ratelimit/v3" lrl "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/local_ratelimit/v3" + ratelimitfilter "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ratelimit/v3" + routerv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3" hcm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" tcp "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/tcp_proxy/v3" v32 "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3" @@ -66,6 +70,13 @@ const ( InBoundRouteConfigName = "polaris-inbound-cluster" ) +const ( + // LocalRateLimitStage envoy local ratelimit stage + LocalRateLimitStage = 0 + // DistributedRateLimitStage envoy remote ratelimit stage + DistributedRateLimitStage = 1 +) + var ( TrafficBoundRoute = map[corev3.TrafficDirection]string{ corev3.TrafficDirection_INBOUND: InBoundRouteConfigName, @@ -286,7 +297,7 @@ func BuildRateLimitConf(prefix string) *lrl.LocalRateLimit { return rateLimitConf } -func BuildLocalRateLimitDescriptors(rule *traffic_manage.Rule) ([]*route.RateLimit_Action, +func BuildRateLimitDescriptors(rule *traffic_manage.Rule) ([]*route.RateLimit_Action, []*ratelimitv32.LocalRateLimitDescriptor) { actions := make([]*route.RateLimit_Action, 0, 8) descriptors := make([]*ratelimitv32.LocalRateLimitDescriptor, 0, 8) @@ -309,34 +320,35 @@ func BuildLocalRateLimitDescriptors(rule *traffic_manage.Rule) ([]*route.RateLim }, }) entries = append(entries, &ratelimitv32.RateLimitDescriptor_Entry{ - Key: "header_match", + Key: ":path", Value: methodName, }) arguments := rule.GetArguments() for i := range arguments { arg := arguments[i] + descriptorKey := strings.ToLower(arg.GetType().String()) + "." 
+ arg.Key switch arg.Type { case apitraffic.MatchArgument_HEADER: - headerValueMatch := BuildRateLimitActionHeaderValueMatch(arg.Key, arg.Value) + headerValueMatch := BuildRateLimitActionHeaderValueMatch(descriptorKey, arg.Value) actions = append(actions, &route.RateLimit_Action{ ActionSpecifier: &route.RateLimit_Action_HeaderValueMatch_{ HeaderValueMatch: headerValueMatch, }, }) entries = append(entries, &ratelimitv32.RateLimitDescriptor_Entry{ - Key: "header_match", + Key: descriptorKey, Value: arg.GetValue().GetValue().GetValue(), }) case apitraffic.MatchArgument_QUERY: - queryParameterValueMatch := BuildRateLimitActionQueryParameterValueMatch(arg.Key, arg.Value) + queryParameterValueMatch := BuildRateLimitActionQueryParameterValueMatch(descriptorKey, arg.Value) actions = append(actions, &route.RateLimit_Action{ ActionSpecifier: &route.RateLimit_Action_QueryParameterValueMatch_{ QueryParameterValueMatch: queryParameterValueMatch, }, }) entries = append(entries, &ratelimitv32.RateLimitDescriptor_Entry{ - Key: "query_match", + Key: descriptorKey, Value: arg.GetValue().GetValue().GetValue(), }) case apitraffic.MatchArgument_METHOD: @@ -344,12 +356,12 @@ func BuildLocalRateLimitDescriptors(rule *traffic_manage.Rule) ([]*route.RateLim ActionSpecifier: &route.RateLimit_Action_RequestHeaders_{ RequestHeaders: &route.RateLimit_Action_RequestHeaders{ HeaderName: ":method", - DescriptorKey: arg.Key, + DescriptorKey: descriptorKey, }, }, }) entries = append(entries, &envoy_extensions_common_ratelimit_v3.RateLimitDescriptor_Entry{ - Key: arg.Key, + Key: descriptorKey, Value: arg.GetValue().GetValue().GetValue(), }) case apitraffic.MatchArgument_CALLER_IP: @@ -421,6 +433,7 @@ func BuildRateLimitActionQueryParameterValueMatch(key string, func BuildRateLimitActionHeaderValueMatch(key string, value *apimodel.MatchString) *route.RateLimit_Action_HeaderValueMatch { headerValueMatch := &route.RateLimit_Action_HeaderValueMatch{ + DescriptorKey: key, DescriptorValue: value.GetValue().GetValue(), Headers: []*route.HeaderMatcher{}, } @@ -605,10 +618,15 @@ func MakeDefaultFilterChain() *listenerv3.FilterChain { } } -func MakeBoundHCM(trafficDirection corev3.TrafficDirection) *hcm.HttpConnectionManager { +func MakeSidecarBoundHCM(svcKey model.ServiceKey, + trafficDirection corev3.TrafficDirection) *hcm.HttpConnectionManager { + hcmFilters := []*hcm.HttpFilter{ { Name: wellknown.Router, + ConfigType: &hcm.HttpFilter_TypedConfig{ + TypedConfig: MustNewAny(&routerv3.Router{}), + }, }, } if trafficDirection == corev3.TrafficDirection_INBOUND { @@ -618,6 +636,29 @@ func MakeBoundHCM(trafficDirection corev3.TrafficDirection) *hcm.HttpConnectionM ConfigType: &hcm.HttpFilter_TypedConfig{ TypedConfig: MustNewAny(&lrl.LocalRateLimit{ StatPrefix: "http_local_rate_limiter", + Stage: LocalRateLimitStage, + }), + }, + }, + { + Name: "envoy.filters.http.ratelimit", + ConfigType: &hcm.HttpFilter_TypedConfig{ + TypedConfig: MustNewAny(&ratelimitfilter.RateLimit{ + Domain: fmt.Sprintf("%s.%s", svcKey.Name, svcKey.Namespace), + Stage: DistributedRateLimitStage, + RequestType: "external", + Timeout: durationpb.New(2 * time.Second), + RateLimitService: &ratelimitconfv3.RateLimitServiceConfig{ + GrpcService: &corev3.GrpcService{ + TargetSpecifier: &corev3.GrpcService_EnvoyGrpc_{ + EnvoyGrpc: &corev3.GrpcService_EnvoyGrpc{ + ClusterName: "polaris_ratelimit", + }, + }, + Timeout: durationpb.New(time.Second), + }, + TransportApiVersion: core.ApiVersion_V3, + }, }), }, }, @@ -648,6 +689,9 @@ func MakeGatewayBoundHCM() 
*hcm.HttpConnectionManager { }, { Name: wellknown.Router, + ConfigType: &hcm.HttpFilter_TypedConfig{ + TypedConfig: MustNewAny(&routerv3.Router{}), + }, }, } trafficDirectionName := corev3.TrafficDirection_name[int32(corev3.TrafficDirection_INBOUND)] @@ -709,21 +753,22 @@ func MakeGatewayLocalRateLimit(rateLimitCache cache.RateLimitCache, pathSpecifie if rule == nil { continue } - // 跳过全局限流配置 - // TODO 暂时不放开全局限流规则下发,后续等待 envoy polaris filter 插件开发或者在 polaris-sidecar 中实现 RLS 协议后 - // 在放开该设置 - if rule.GetType() == apitraffic.Rule_GLOBAL || rule.GetDisable().GetValue() { + if rule.GetDisable().GetValue() { continue } if rule.GetMethod().GetValue().GetValue() != pathSpecifier { continue } - actions, descriptors := BuildLocalRateLimitDescriptors(rule) + actions, descriptors := BuildRateLimitDescriptors(rule) rateLimitConf.Descriptors = descriptors - ratelimits = append(ratelimits, &route.RateLimit{ - Actions: actions, - }) - break + ratelimitRule := &route.RateLimit{Actions: actions} + switch rule.GetType() { + case apitraffic.Rule_LOCAL: + ratelimitRule.Stage = wrapperspb.UInt32(LocalRateLimitStage) + case apitraffic.Rule_GLOBAL: + ratelimitRule.Stage = wrapperspb.UInt32(DistributedRateLimitStage) + } + ratelimits = append(ratelimits, ratelimitRule) } if len(ratelimits) == 0 { return nil, nil, nil @@ -747,17 +792,19 @@ func MakeSidecarLocalRateLimit(rateLimitCache cache.RateLimitCache, if rule == nil { continue } - // 跳过全局限流配置 - // TODO 暂时不放开全局限流规则下发,后续等待 envoy polaris filter 插件开发或者在 polaris-sidecar 中实现 RLS 协议后 - // 在放开该设置 - if rule.GetType() == apitraffic.Rule_GLOBAL || rule.GetDisable().GetValue() { + if rule.GetDisable().GetValue() { continue } - actions, descriptors := BuildLocalRateLimitDescriptors(rule) + actions, descriptors := BuildRateLimitDescriptors(rule) rateLimitConf.Descriptors = descriptors - ratelimits = append(ratelimits, &route.RateLimit{ - Actions: actions, - }) + ratelimitRule := &route.RateLimit{Actions: actions} + switch rule.GetType() { + case apitraffic.Rule_LOCAL: + ratelimitRule.Stage = wrapperspb.UInt32(LocalRateLimitStage) + case apitraffic.Rule_GLOBAL: + ratelimitRule.Stage = wrapperspb.UInt32(DistributedRateLimitStage) + } + ratelimits = append(ratelimits, ratelimitRule) } filters["envoy.filters.http.local_ratelimit"] = MustNewAny(rateLimitConf) return ratelimits, filters, nil diff --git a/apiserver/xdsserverv3/resource/model.go b/apiserver/xdsserverv3/resource/model.go index a4f637ed0..5ff2f2b5e 100644 --- a/apiserver/xdsserverv3/resource/model.go +++ b/apiserver/xdsserverv3/resource/model.go @@ -53,25 +53,25 @@ const ( TLSModePermissive TLSMode = "permissive" ) -var ( +const ( // 这个是特殊指定的 prefix MatchString_Prefix = apimodel.MatchString_MatchStringType(-1) ) // ServiceInfo 北极星服务结构体 type ServiceInfo struct { - ID string - Name string - Namespace string - ServiceKey model.ServiceKey - AliasFor *model.Service - Instances []*apiservice.Instance - SvcInsRevision string - Routing *traffic_manage.Routing - SvcRoutingRevision string - Ports []*model.ServicePort - RateLimit *traffic_manage.RateLimit - SvcRateLimitRevision string + ID string + Name string + Namespace string + ServiceKey model.ServiceKey + AliasFor *model.Service + Instances []*apiservice.Instance + SvcInsRevision string + Routing *traffic_manage.Routing + SvcRoutingRevision string + Ports []*model.ServicePort + RateLimit *traffic_manage.RateLimit + SvcRateLimitRevision string CircuitBreaker *fault_tolerance.CircuitBreaker CircuitBreakerRevision string FaultDetect *fault_tolerance.FaultDetector diff --git 
a/apiserver/xdsserverv3/resource/node.go b/apiserver/xdsserverv3/resource/node.go
index d3952ac2c..f721c134c 100644
--- a/apiserver/xdsserverv3/resource/node.go
+++ b/apiserver/xdsserverv3/resource/node.go
@@ -159,8 +159,9 @@ func (x *XDSNodeManager) ListSidecarNodes() []*XDSClient {
 // ID id 的格式是 ${sidecar|gateway}~namespace/uuid~hostIp
 // case 1: envoy 为 sidecar 模式时,则 NodeID 的格式为以下两种
 //
-// eg 1. namespace/uuid~hostIp
-// eg 2. sidecar~namespace/uuid-hostIp
+//	eg 1. namespace/uuid~hostIp
+//	eg 2. sidecar~namespace/uuid-hostIp
+//	eg 3. envoy_node_id="${NAMESPACE}/${INSTANCE_IP}~${POD_NAME}"
 //
 // case 2: envoy 为 gateway 模式时,则 NodeID 的格式为: gateway~namespace/uuid~hostIp
 func (PolarisNodeHash) ID(node *core.Node) string {
@@ -169,11 +170,24 @@ func (PolarisNodeHash) ID(node *core.Node) string {
 	}
 
 	runType, ns, _, _ := ParseNodeID(node.Id)
+	if node.Metadata == nil || len(node.Metadata.Fields) == 0 {
+		return ns
+	}
 	// Gateway 类型直接按照 gateway_service 以及 gateway_namespace 纬度
 	if runType != string(RunTypeSidecar) {
 		gatewayNamespace := node.Metadata.Fields[GatewayNamespaceName].GetStringValue()
 		gatewayService := node.Metadata.Fields[GatewayServiceName].GetStringValue()
+		// stay compatible with the legacy envoy gateway metadata keys
+		if gatewayNamespace == "" {
+			gatewayNamespace = node.Metadata.Fields[OldGatewayNamespaceName].GetStringValue()
+		}
+		if gatewayService == "" {
+			gatewayService = node.Metadata.Fields[OldGatewayServiceName].GetStringValue()
+		}
+		if gatewayNamespace == "" {
+			gatewayNamespace = ns
+		}
 		return runType + "/" + gatewayNamespace + "/" + gatewayService
 	}
 	// 兼容老版本注入的 envoy, 默认获取 snapshot resource 粒度为 namespace 级别, 只能下发 OUTBOUND 规则
@@ -185,7 +199,7 @@ func (PolarisNodeHash) ID(node *core.Node) string {
 	sidecarService := node.Metadata.Fields[SidecarServiceName].GetStringValue()
 	// 如果存在, 则表示是由新版本 controller 注入的 envoy, 可以下发 INBOUND 规则
 	if sidecarNamespace != "" && sidecarService != "" {
-		ret = sidecarNamespace + "/" + sidecarService
+		ret = runType + "/" + sidecarNamespace + "/" + sidecarService
 	}
 
 	// 在判断是否设置了 TLS 相关参数
@@ -232,6 +246,7 @@ type XDSClient struct {
 	User      string
 	Namespace string
 	IPAddr    string
+	PodIP     string
 	Metadata  map[string]string
 	Version   string
 	Node      *core.Node
@@ -253,6 +268,7 @@ func (n *XDSClient) IsGateway() bool {
 	return n.RunType == RunTypeGateway && (hasNew || hasOld)
 }
 
+// GetSelfService returns the name of the service this envoy belongs to
 func (n *XDSClient) GetSelfService() string {
 	if n.IsGateway() {
 		val, ok := n.Metadata[GatewayServiceName]
@@ -264,21 +280,31 @@ func (n *XDSClient) GetSelfService() string {
 	return n.Metadata[SidecarServiceName]
 }
 
+// GetSelfNamespace returns the namespace this envoy belongs to
 func (n *XDSClient) GetSelfNamespace() string {
 	if n.IsGateway() {
 		val, ok := n.Metadata[GatewayNamespaceName]
 		if ok {
 			return val
 		}
-		return n.Metadata[OldGatewayNamespaceName]
+		val, ok = n.Metadata[OldGatewayNamespaceName]
+		if ok {
+			return val
+		}
+		return n.Namespace
+	}
+	val, ok := n.Metadata[SidecarNamespaceName]
+	if ok {
+		return val
 	}
-	return n.Metadata[SidecarNamespaceName]
+	return n.Namespace
 }
 
 func parseNodeProxy(node *core.Node) *XDSClient {
	runType, polarisNamespace, _, hostIP := ParseNodeID(node.Id)
 	proxy := &XDSClient{
 		IPAddr:    hostIP,
+		PodIP:     hostIP,
 		RunType:   RunType(runType),
 		Namespace: polarisNamespace,
 		Node:      node,
diff --git a/apiserver/xdsserverv3/v1/cluster.go b/apiserver/xdsserverv3/v1/cluster.go
index 245906bf3..ab5c4aa96 100644
--- a/apiserver/xdsserverv3/v1/cluster.go
+++ b/apiserver/xdsserverv3/v1/cluster.go
@@ -47,9 +47,9 @@ func (x *XDSServer) makeCluster(service 
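Both TLS modes below derive the upstream SNI the same way; these hunks just hoist the fmt.Sprintf into a local so the permissive and strict builders stay in sync. As a standalone illustration of the naming scheme (a hypothetical helper, not in the patch):

```go
package main

import "fmt"

// clusterSNI mirrors the SNI string used in the mTLS transport-socket
// matches: outbound_.default_.<service>.<namespace>.svc.cluster.local
func clusterSNI(name, namespace string) string {
	return fmt.Sprintf("outbound_.default_.%s.%s.svc.cluster.local", name, namespace)
}

func main() {
	// prints outbound_.default_.orders.prod.svc.cluster.local
	fmt.Println(clusterSNI("orders", "prod"))
}
```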
*resource.ServiceInfo) *cluster.Cluster }, }, - LbSubsetConfig: resource.MakeLbSubsetConfig(service), + LbSubsetConfig: resource.MakeLbSubsetConfig(service), OutlierDetection: resource.MakeOutlierDetection(service), - HealthChecks: resource.MakeHealthCheck(service), + HealthChecks: resource.MakeHealthCheck(service), } } @@ -78,13 +78,14 @@ func (x *XDSServer) makePermissiveClusters(services map[model.ServiceKey]*resour c := x.makeCluster(service) // In permissive mode, we should use `TLSTransportSocket` to connect to mtls enabled endpoints. // Or we use rawbuffer transport for those endpoints which not enabled mtls. + sni := fmt.Sprintf("outbound_.default_.%s.%s.svc.cluster.local", service.Name, service.Namespace) c.TransportSocketMatches = []*cluster.Cluster_TransportSocketMatch{ { Name: "tls-mode", Match: mtlsTransportSocketMatch, TransportSocket: makeTLSTransportSocket(&tlstrans.UpstreamTlsContext{ CommonTlsContext: outboundCommonTLSContext, - Sni: fmt.Sprintf("outbound_.default_.%s.%s.svc.cluster.local", service.Name, service.Namespace), + Sni: sni, }), }, rawBufferTransportSocketMatch, @@ -107,18 +108,18 @@ func (x *XDSServer) makeStrictClusters(services map[model.ServiceKey]*resource.S c := x.makeCluster(service) // In strict mode, we should only use `TLSTransportSocket` to connect to mtls enabled endpoints. + sni := fmt.Sprintf("outbound_.default_.%s.%s.svc.cluster.local", service.Name, service.Namespace) c.TransportSocketMatches = []*cluster.Cluster_TransportSocketMatch{ { Name: "tls-mode", TransportSocket: makeTLSTransportSocket(&tlstrans.UpstreamTlsContext{ CommonTlsContext: outboundCommonTLSContext, - Sni: fmt.Sprintf("outbound_.default_.%s.%s.svc.cluster.local", service.Name, service.Namespace), + Sni: sni, }), }, } clusters = append(clusters, c) } - return clusters } diff --git a/apiserver/xdsserverv3/v1/listener.go b/apiserver/xdsserverv3/v1/listener.go index 897c8434c..668dd125c 100644 --- a/apiserver/xdsserverv3/v1/listener.go +++ b/apiserver/xdsserverv3/v1/listener.go @@ -20,22 +20,26 @@ package v1 import ( core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + routerv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3" + original_dstv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/listener/original_dst/v3" hcm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" tcp "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/tcp_proxy/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/types" - "github.com/envoyproxy/go-control-plane/pkg/resource/v3" + resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "github.com/envoyproxy/go-control-plane/pkg/wellknown" "github.com/golang/protobuf/ptypes" + + "github.com/polarismesh/polaris/apiserver/xdsserverv3/resource" ) -func makeListeners() []types.Resource { +func makeListeners() ([]types.Resource, error) { manager := &hcm.HttpConnectionManager{ CodecType: hcm.HttpConnectionManager_AUTO, StatPrefix: "http", RouteSpecifier: &hcm.HttpConnectionManager_Rds{ Rds: &hcm.Rds{ ConfigSource: &core.ConfigSource{ - ResourceApiVersion: resource.DefaultAPIVersion, + ResourceApiVersion: resourcev3.DefaultAPIVersion, ConfigSourceSpecifier: &core.ConfigSource_Ads{ Ads: &core.AggregatedConfigSource{}, }, @@ -47,11 +51,14 @@ func makeListeners() []types.Resource { } manager.HttpFilters = append(manager.HttpFilters, &hcm.HttpFilter{ 
Name: wellknown.Router, + ConfigType: &hcm.HttpFilter_TypedConfig{ + TypedConfig: resource.MustNewAny(&routerv3.Router{}), + }, }) pbst, err := ptypes.MarshalAny(manager) if err != nil { - panic(err) + return nil, err } tcpConfig := &tcp.TcpProxy{ @@ -63,7 +70,7 @@ func makeListeners() []types.Resource { tcpC, err := ptypes.MarshalAny(tcpConfig) if err != nil { - panic(err) + return nil, err } return []types.Resource{ @@ -107,20 +114,29 @@ func makeListeners() []types.Resource { { // type.googleapis.com/envoy.extensions.filters.listener.original_dst.v3.OriginalDst Name: wellknown.OriginalDestination, + ConfigType: &listener.ListenerFilter_TypedConfig{ + TypedConfig: resource.MustNewAny(&original_dstv3.OriginalDst{}), + }, }, }, }, - } + }, nil } -func makePermissiveListeners() []types.Resource { - resources := makeListeners() +func makePermissiveListeners() ([]types.Resource, error) { + resources, err := makeListeners() + if err != nil { + return nil, err + } resources = append(resources, inboundListener()) - return resources + return resources, nil } -func makeStrictListeners() []types.Resource { - resources := makeListeners() +func makeStrictListeners() ([]types.Resource, error) { + resources, err := makeListeners() + if err != nil { + return nil, err + } resources = append(resources, inboundStrictListener()) - return resources + return resources, nil } diff --git a/apiserver/xdsserverv3/v1/mtls.go b/apiserver/xdsserverv3/v1/mtls.go index 639ed7952..9492176a3 100644 --- a/apiserver/xdsserverv3/v1/mtls.go +++ b/apiserver/xdsserverv3/v1/mtls.go @@ -27,6 +27,7 @@ import ( route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" filev3 "github.com/envoyproxy/go-control-plane/envoy/extensions/access_loggers/file/v3" lrl "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/local_ratelimit/v3" + routerv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3" httpinspector "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/listener/http_inspector/v3" tlsinspector "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/listener/tls_inspector/v3" hcm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" @@ -42,6 +43,8 @@ import ( "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/structpb" + + "github.com/polarismesh/polaris/apiserver/xdsserverv3/resource" ) func mustNewAny(src proto.Message) *anypb.Any { @@ -224,6 +227,9 @@ func inboundHCM() *hcm.HttpConnectionManager { } filters = append(filters, &hcm.HttpFilter{ Name: wellknown.Router, + ConfigType: &hcm.HttpFilter_TypedConfig{ + TypedConfig: resource.MustNewAny(&routerv3.Router{}), + }, }) return &hcm.HttpConnectionManager{ StatPrefix: "Inbound", diff --git a/apiserver/xdsserverv3/v1/sidecar.go b/apiserver/xdsserverv3/v1/sidecar.go index 76ca9f0ff..2c2495598 100644 --- a/apiserver/xdsserverv3/v1/sidecar.go +++ b/apiserver/xdsserverv3/v1/sidecar.go @@ -37,7 +37,7 @@ import ( ) func (x *XDSServer) makeSnapshot(ns, version string, tlsMode resource.TLSMode, - services map[model.ServiceKey]*resource.ServiceInfo) (err error) { + services map[model.ServiceKey]*resource.ServiceInfo) error { resources := make(map[resourcev3.Type][]types.Resource) resources[resourcev3.EndpointType] = makeEndpoints(services) @@ -45,19 +45,25 @@ func (x *XDSServer) makeSnapshot(ns, version string, tlsMode resource.TLSMode, cacheKey := ns + var err error switch 
tlsMode { case resource.TLSModeNone: resources[resourcev3.ClusterType] = x.makeClusters(services) - resources[resourcev3.ListenerType] = makeListeners() + resources[resourcev3.ListenerType], err = makeListeners() case resource.TLSModePermissive: resources[resourcev3.ClusterType] = x.makePermissiveClusters(services) - resources[resourcev3.ListenerType] = makePermissiveListeners() + resources[resourcev3.ListenerType], err = makePermissiveListeners() cacheKey = ns + "/permissive" case resource.TLSModeStrict: resources[resourcev3.ClusterType] = x.makeStrictClusters(services) - resources[resourcev3.ListenerType] = makeStrictListeners() + resources[resourcev3.ListenerType], err = makeStrictListeners() cacheKey = ns + "/strict" } + if err != nil { + log.Error("[XDS][Sidecar][V1] fail to create resource", zap.String("namespace", ns), + zap.String("tls", string(tlsMode)), zap.Error(err)) + return err + } snapshot, err := cachev3.NewSnapshot(version, resources) if err != nil { @@ -66,6 +72,8 @@ func (x *XDSServer) makeSnapshot(ns, version string, tlsMode resource.TLSMode, return err } if err = snapshot.Consistent(); err != nil { + log.Error("[XDS][Sidecar][V1] check snapshot consistent", zap.String("namespace", ns), + zap.String("tls", string(tlsMode)), zap.Error(err)) return err } log.Info("[XDS][Sidecar][V1] upsert snapshot success", zap.String("namespace", ns), @@ -76,7 +84,7 @@ func (x *XDSServer) makeSnapshot(ns, version string, tlsMode resource.TLSMode, zap.String("tls", string(tlsMode)), zap.Error(err)) return err } - return + return err } func (x *XDSServer) makeSidecarVirtualHosts(services map[model.ServiceKey]*resource.ServiceInfo) []types.Resource { diff --git a/apiserver/xdsserverv3/v2/cds.go b/apiserver/xdsserverv3/v2/cds.go index 24cc79878..e07190be5 100644 --- a/apiserver/xdsserverv3/v2/cds.go +++ b/apiserver/xdsserverv3/v2/cds.go @@ -164,8 +164,8 @@ func (cds *CDSBuilder) makeCluster(svcInfo *resource.ServiceInfo, }, }, }, - LbSubsetConfig: resource.MakeLbSubsetConfig(svcInfo), + LbSubsetConfig: resource.MakeLbSubsetConfig(svcInfo), OutlierDetection: resource.MakeOutlierDetection(svcInfo), - HealthChecks: resource.MakeHealthCheck(svcInfo), + HealthChecks: resource.MakeHealthCheck(svcInfo), } } diff --git a/apiserver/xdsserverv3/v2/lds.go b/apiserver/xdsserverv3/v2/lds.go index 96237adfc..0f6beef54 100644 --- a/apiserver/xdsserverv3/v2/lds.go +++ b/apiserver/xdsserverv3/v2/lds.go @@ -24,6 +24,7 @@ import ( corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" listenerv3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" httpinspector "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/listener/http_inspector/v3" + original_dstv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/listener/original_dst/v3" tlsinspector "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/listener/tls_inspector/v3" hcm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" tlstrans "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3" @@ -32,6 +33,7 @@ import ( "github.com/golang/protobuf/ptypes/wrappers" "github.com/polarismesh/polaris/apiserver/xdsserverv3/resource" + "github.com/polarismesh/polaris/common/model" "github.com/polarismesh/polaris/service" ) @@ -55,6 +57,9 @@ var ( { // type.googleapis.com/envoy.extensions.filters.listener.original_dst.v3.OriginalDst Name: wellknown.OriginalDestination, + ConfigType: &listenerv3.ListenerFilter_TypedConfig{ + 
TypedConfig: resource.MustNewAny(&original_dstv3.OriginalDst{}), + }, }, } @@ -105,9 +110,15 @@ func (lds *LDSBuilder) Generate(option *resource.BuildOption) (interface{}, erro func (lds *LDSBuilder) makeListener(option *resource.BuildOption, direction corev3.TrafficDirection) ([]types.Resource, error) { - boundHCM := resource.MakeBoundHCM(direction) + var boundHCM *hcm.HttpConnectionManager if lds.client.IsGateway() { boundHCM = resource.MakeGatewayBoundHCM() + } else { + selfService := model.ServiceKey{ + Namespace: lds.client.GetSelfNamespace(), + Name: lds.client.GetSelfService(), + } + boundHCM = resource.MakeSidecarBoundHCM(selfService, direction) } listener := makeDefaultListener(direction, boundHCM) diff --git a/apiserver/xdsserverv3/v2/rds.go b/apiserver/xdsserverv3/v2/rds.go index c05eca22c..a2823d72f 100644 --- a/apiserver/xdsserverv3/v2/rds.go +++ b/apiserver/xdsserverv3/v2/rds.go @@ -248,8 +248,8 @@ func (rds *RDSBuilder) makeGatewayRoutes(option *resource.BuildOption, xdsNode *resource.XDSClient) ([]*route.Route, error) { routes := make([]*route.Route, 0, 16) - callerService := xdsNode.Metadata[resource.GatewayServiceName] - callerNamespace := xdsNode.Metadata[resource.GatewayNamespaceName] + callerService := xdsNode.GetSelfService() + callerNamespace := xdsNode.GetSelfNamespace() selfService := model.ServiceKey{ Namespace: callerNamespace, Name: callerService, diff --git a/plugin/healthchecker.go b/plugin/healthchecker.go index 600bae375..696c03a69 100644 --- a/plugin/healthchecker.go +++ b/plugin/healthchecker.go @@ -90,10 +90,6 @@ type HealthChecker interface { Check(request *CheckRequest) (*CheckResponse, error) // Query queries the heartbeat time Query(ctx context.Context, request *QueryRequest) (*QueryResponse, error) - // AddToCheck add the instances to check procedure - AddToCheck(request *AddCheckRequest) error - // RemoveFromCheck removes the instances from check procedure - RemoveFromCheck(request *AddCheckRequest) error // Suspend health checker for entire expired duration manually Suspend() // SuspendTimeSec get the suspend time in seconds diff --git a/plugin/healthchecker/leader/checker_leader.go b/plugin/healthchecker/leader/checker_leader.go index 7b83628be..aaf86102b 100644 --- a/plugin/healthchecker/leader/checker_leader.go +++ b/plugin/healthchecker/leader/checker_leader.go @@ -364,18 +364,6 @@ func (c *LeaderHealthChecker) Query(ctx context.Context, request *plugin.QueryRe }, nil } -// AddToCheck add the instances to check procedure -// NOTE: not support in LeaderHealthChecker -func (c *LeaderHealthChecker) AddToCheck(request *plugin.AddCheckRequest) error { - return nil -} - -// RemoveFromCheck removes the instances from check procedure -// NOTE: not support in LeaderHealthChecker -func (c *LeaderHealthChecker) RemoveFromCheck(request *plugin.AddCheckRequest) error { - return nil -} - // Delete delete record by key func (c *LeaderHealthChecker) Delete(ctx context.Context, key string) error { if isSendFromPeer(ctx) { diff --git a/plugin/healthchecker/memory/checker_memory.go b/plugin/healthchecker/memory/checker_memory.go index 9808756fe..fa148cc0b 100644 --- a/plugin/healthchecker/memory/checker_memory.go +++ b/plugin/healthchecker/memory/checker_memory.go @@ -153,16 +153,6 @@ func (r *MemoryHealthChecker) Check(request *plugin.CheckRequest) (*plugin.Check return checkResp, nil } -// AddToCheck add the instances to check procedure -func (r *MemoryHealthChecker) AddToCheck(request *plugin.AddCheckRequest) error { - return nil -} - -// RemoveFromCheck 
removes the instances from check procedure
-func (r *MemoryHealthChecker) RemoveFromCheck(request *plugin.AddCheckRequest) error {
-	return nil
-}
-
 // Delete delete the id
 func (r *MemoryHealthChecker) Delete(ctx context.Context, id string) error {
 	r.hbRecords.Delete(id)
diff --git a/plugin/healthchecker/redis/checker_redis.go b/plugin/healthchecker/redis/checker_redis.go
index 7eebd9f17..f63ccd894 100644
--- a/plugin/healthchecker/redis/checker_redis.go
+++ b/plugin/healthchecker/redis/checker_redis.go
@@ -292,24 +292,6 @@ func (r *RedisHealthChecker) Check(request *plugin.CheckRequest) (*plugin.CheckR
 	return checkResp, nil
 }
 
-// AddToCheck add the instances to check procedure
-func (r *RedisHealthChecker) AddToCheck(request *plugin.AddCheckRequest) error {
-	if len(request.Instances) == 0 {
-		return nil
-	}
-	resp := r.checkPool.Sdd(request.LocalHost, request.Instances)
-	return resp.Err
-}
-
-// RemoveFromCheck AddToCheck add the instances to check procedure
-func (r *RedisHealthChecker) RemoveFromCheck(request *plugin.AddCheckRequest) error {
-	if len(request.Instances) == 0 {
-		return nil
-	}
-	resp := r.checkPool.Srem(request.LocalHost, request.Instances)
-	return resp.Err
-}
-
 // Delete delete the target id
 func (r *RedisHealthChecker) Delete(ctx context.Context, id string) error {
 	resp := r.checkPool.Del(id)
diff --git a/release/cluster/helm/Chart.yaml b/release/cluster/helm/Chart.yaml
index e6a6bbe40..73cb9f15f 100644
--- a/release/cluster/helm/Chart.yaml
+++ b/release/cluster/helm/Chart.yaml
@@ -2,6 +2,6 @@ apiVersion: v1
 appVersion: "1.0"
 description: A Helm chart for polaris
 name: polaris
-version: 0.0.2
+version: v1.17.3
 maintainers:
-  - name: trezhang
\ No newline at end of file
+  - name: trezhang
diff --git a/release/cluster/helm/templates/config-polaris-console.yaml b/release/cluster/helm/templates/config-polaris-console.yaml
index ecaa8e269..170c43c2b 100644
--- a/release/cluster/helm/templates/config-polaris-console.yaml
+++ b/release/cluster/helm/templates/config-polaris-console.yaml
@@ -29,10 +29,3 @@ data:
       polarisToken: "nu/0WRA4EqSR1FagrjRj0fZwPXuGlMpX+zCuWu4uMqy8xr1vRjisSbA25aAC3mtU8MeeRsKhQiDAynUR09I="
     monitorServer:
       address: "polaris-prometheus:9090"
-    # eventServer:
-    #   requestUrl:
-    #   timeout: 5s
-    # operationServer:
-    #   requestUrl:
-    #   timeout: 5s
-
diff --git a/release/cluster/helm/templates/config-polaris-server.yaml b/release/cluster/helm/templates/config-polaris-server.yaml
index f7b6abe3c..df41270d9 100644
--- a/release/cluster/helm/templates/config-polaris-server.yaml
+++ b/release/cluster/helm/templates/config-polaris-server.yaml
@@ -316,6 +316,7 @@ data:
         concurrency: 64
       checkers:
       {{- if eq .Values.global.mode "cluster" }}
+      {{- if eq .Values.polaris.healthChecker "heartbeatRedis" }}
       - name: heartbeatRedis
        option:
          kvAddr: {{ .Values.polaris.storage.redis.address }}
@@ -332,6 +333,9 @@ data:
          msgTimeout: 200ms
          concurrency: 200
          withTLS: {{ .Values.polaris.storage.redis.withTLS | default false }}
+      {{- else }}
+      - name: heartbeatLeader
+      {{- end }}
      {{- else }}
      - name: heartbeatMemory
      {{- end }}
diff --git a/release/cluster/helm/values.yaml b/release/cluster/helm/values.yaml
index be5545573..5eba698f3 100644
--- a/release/cluster/helm/values.yaml
+++ b/release/cluster/helm/values.yaml
@@ -30,6 +30,9 @@ polaris:
     limit:
       cpu: "500m"
       memory: "1000Mi"
+  # health checker used in cluster mode: heartbeatRedis or heartbeatLeader;
+  # the server config template compares this value as a plain string
+  healthChecker: heartbeatRedis
   replicaCount: 1
   limiterReplicaCount: 1
   auth:
diff --git a/release/cluster/kubernetes/02-polaris-server-config.yaml 
b/release/cluster/kubernetes/02-polaris-server-config.yaml
index 4ae111002..bf6fa8586 100644
--- a/release/cluster/kubernetes/02-polaris-server-config.yaml
+++ b/release/cluster/kubernetes/02-polaris-server-config.yaml
@@ -435,7 +435,7 @@ data:
       option:
         remote-conf: false # 是否使用远程配置
         ip-limit: # ip级限流,全局
-          open: true # 系统是否开启ip级限流
+          open: false # whether to enable IP-level rate limiting
           global:
             open: false
             bucket: 300 # 最高峰值
diff --git a/release/conf/polaris-server.yaml b/release/conf/polaris-server.yaml
index d092013d2..821bd6b61 100644
--- a/release/conf/polaris-server.yaml
+++ b/release/conf/polaris-server.yaml
@@ -18,6 +18,7 @@ bootstrap:
   # Global log
   logger:
     # Log scope name
+    # Configuration center related logs
     config:
       # Log file location
       rotateOutputPath: log/runtime/polaris-config.log
@@ -25,99 +26,121 @@ bootstrap:
       errorRotateOutputPath: log/runtime/polaris-config-error.log
       # The maximum size of a single log file, 100 default, the unit is MB
       rotationMaxSize: 100
-      # How many log files are saved, default 10
-      rotationMaxBackups: 10
+      # How many log files are saved, default 30
+      rotationMaxBackups: 30
       # The maximum preservation days of a single log file, default 7
       rotationMaxAge: 7
      # Log output level,debug/info/warn/error
       outputLevel: info
       # Open the log file compression
-      # compress: false
+      compress: true
+      # onlyContent prints only the log content, without the log timestamp
+      # onlyContent: false
+    # Resource Auth, User Management Log
     auth:
       rotateOutputPath: log/runtime/polaris-auth.log
       errorRotateOutputPath: log/runtime/polaris-auth-error.log
       rotationMaxSize: 100
-      rotationMaxBackups: 10
+      rotationMaxBackups: 30
       rotationMaxAge: 7
       outputLevel: info
+      compress: true
+    # Storage layer log
     store:
       rotateOutputPath: log/runtime/polaris-store.log
       errorRotateOutputPath: log/runtime/polaris-store-error.log
       rotationMaxSize: 100
-      rotationMaxBackups: 10
+      rotationMaxBackups: 30
       rotationMaxAge: 7
       outputLevel: info
+      compress: true
+    # Server cache log
     cache:
       rotateOutputPath: log/runtime/polaris-cache.log
       errorRotateOutputPath: log/runtime/polaris-cache-error.log
       rotationMaxSize: 100
-      rotationMaxBackups: 10
+      rotationMaxBackups: 30
       rotationMaxAge: 7
       outputLevel: info
+      compress: true
+    # Service discovery and governance rules related logs
     naming:
       rotateOutputPath: log/runtime/polaris-naming.log
       errorRotateOutputPath: log/runtime/polaris-naming-error.log
       rotationMaxSize: 100
-      rotationMaxBackups: 10
+      rotationMaxBackups: 30
       rotationMaxAge: 7
       outputLevel: info
+      compress: true
+    # Service discovery health check log
     healthcheck:
       rotateOutputPath: log/runtime/polaris-healthcheck.log
       errorRotateOutputPath: log/runtime/polaris-healthcheck-error.log
       rotationMaxSize: 100
-      rotationMaxBackups: 10
+      rotationMaxBackups: 30
       rotationMaxAge: 7
       outputLevel: info
+      compress: true
+    # XDS protocol layer plug-in log
     xdsv3:
       rotateOutputPath: log/runtime/polaris-xdsv3.log
       errorRotateOutputPath: log/runtime/polaris-xdsv3-error.log
       rotationMaxSize: 100
-      rotationMaxBackups: 10
+      rotationMaxBackups: 30
       rotationMaxAge: 7
       outputLevel: info
-    apiserver:
-      rotateOutputPath: log/runtime/polaris-apiserver.log
-      errorRotateOutputPath: log/runtime/polaris-apiserver-error.log
+      compress: true
+    # Eureka protocol layer plug-in log
+    eureka:
+      rotateOutputPath: log/runtime/polaris-eureka.log
+      errorRotateOutputPath: log/runtime/polaris-eureka-error.log
       rotationMaxSize: 100
-      rotationMaxBackups: 10
+      rotationMaxBackups: 30
       rotationMaxAge: 7
       outputLevel: info
-    token-bucket:
-      rotateOutputPath: log/runtime/polaris-ratelimit.log
-      errorRotateOutputPath: log/runtime/polaris-ratelimit-error.log
+      compress: true
+    # APISERVER common log, record inbound request and outbound response
+    apiserver:
+      rotateOutputPath: log/runtime/polaris-apiserver.log
+      errorRotateOutputPath: log/runtime/polaris-apiserver-error.log
       rotationMaxSize: 100
-      rotationMaxBackups: 10
+      rotationMaxBackups: 30
       rotationMaxAge: 7
       outputLevel: info
+      compress: true
     default:
       rotateOutputPath: log/runtime/polaris-default.log
       errorRotateOutputPath: log/runtime/polaris-default-error.log
       rotationMaxSize: 100
-      rotationMaxBackups: 10
+      rotationMaxBackups: 30
       rotationMaxAge: 7
       outputLevel: info
-    discoverEventLocal:
-      rotateOutputPath: log/event/polaris-discoverevent.log
-      errorRotateOutputPath: log/event/polaris-discoverevent-error.log
+      compress: true
+    # Server plugin logs
+    token-bucket:
+      rotateOutputPath: log/runtime/polaris-ratelimit.log
+      errorRotateOutputPath: log/runtime/polaris-ratelimit-error.log
       rotationMaxSize: 100
-      rotationMaxBackups: 10
+      rotationMaxBackups: 30
       rotationMaxAge: 7
       outputLevel: info
-      onlyContent: true
+      compress: true
     discoverLocal:
       rotateOutputPath: log/statis/polaris-discoverstat.log
       errorRotateOutputPath: log/statis/polaris-discoverstat-error.log
       rotationMaxSize: 100
-      rotationMaxBackups: 10
+      rotationMaxBackups: 30
       rotationMaxAge: 7
       outputLevel: info
+      compress: true
     local:
       rotateOutputPath: log/statis/polaris-statis.log
       errorRotateOutputPath: log/statis/polaris-statis-error.log
       rotationMaxSize: 100
-      rotationMaxBackups: 10
+      rotationMaxBackups: 30
       rotationMaxAge: 7
       outputLevel: info
+      compress: true
     HistoryLogger:
       rotateOutputPath: log/operation/polaris-history.log
       errorRotateOutputPath: log/operation/polaris-history-error.log
@@ -127,13 +150,22 @@
       rotationMaxDurationForHour: 24
       outputLevel: info
       onlyContent: true
+    discoverEventLocal:
+      rotateOutputPath: log/event/polaris-discoverevent.log
+      errorRotateOutputPath: log/event/polaris-discoverevent-error.log
+      rotationMaxSize: 100
+      rotationMaxBackups: 30
+      rotationMaxAge: 7
+      outputLevel: info
+      onlyContent: true
     cmdb:
       rotateOutputPath: log/runtime/polaris-cmdb.log
       errorRotateOutputPath: log/runtime/polaris-cmdb-error.log
       rotationMaxSize: 100
-      rotationMaxBackups: 10
+      rotationMaxBackups: 30
       rotationMaxAge: 7
       outputLevel: info
+      compress: true
   # Start the server in order
   startInOrder:
     # Start the Polaris-Server in order, mainly to avoid data synchronization logic when the server starts the DB to pull the DB out of high load
diff --git a/service/client_v1.go b/service/client_v1.go
index 3a39a6c88..aa8ca7e9f 100644
--- a/service/client_v1.go
+++ b/service/client_v1.go
@@ -162,7 +162,7 @@ func (s *Server) GetServiceWithCache(ctx context.Context, req *apiservice.Servic
 		return resp
 	}
 
-	log.Info("[Service][Discover] list servies", zap.Int("size", len(svcs)), zap.String("revision", revision))
+	log.Debug("[Service][Discover] list services", zap.Int("size", len(svcs)), zap.String("revision", revision))
 	if revision == req.GetRevision().GetValue() {
 		return api.NewDiscoverServiceResponse(apimodel.Code_DataNoChange, req)
 	}
@@ -204,7 +204,7 @@ func (s *Server) ServiceInstancesCache(ctx context.Context, req *apiservice.Serv
 	// 数据源都来自Cache,这里拿到的service,已经是源服务
 	aliasFor := s.getServiceCache(serviceName, namespaceName)
 	if aliasFor == nil {
-		log.Infof("[Server][Service][Instance] not found name(%s) namespace(%s) service",
+		log.Debugf("[Server][Service][Instance] not found name(%s) namespace(%s) service",
 			serviceName, namespaceName)
 		return 
api.NewDiscoverInstanceResponse(apimodel.Code_NotFoundResource, req) } diff --git a/service/healthcheck/check.go b/service/healthcheck/check.go index 8b1aae4f1..10cd60782 100644 --- a/service/healthcheck/check.go +++ b/service/healthcheck/check.go @@ -121,12 +121,15 @@ func newCheckScheduler(ctx context.Context, slotNum int, minCheckInterval time.D adoptInstancesChan: make(chan AdoptEvent, 1024), ctx: ctx, } - go scheduler.doCheckInstances(ctx) - go scheduler.doCheckClient(ctx) - go scheduler.doAdopt(ctx) return scheduler } +func (c *CheckScheduler) run(ctx context.Context) { + go c.doCheckInstances(ctx) + go c.doCheckClient(ctx) + go c.doAdopt(ctx) +} + func (c *CheckScheduler) doCheckInstances(ctx context.Context) { c.timeWheel.Start() log.Infof("[Health Check][Check]timeWheel has been started") @@ -187,26 +190,9 @@ func (c *CheckScheduler) processAdoptEvents( for id := range instances { instanceIds = append(instanceIds, id) } - var err error - if add { - log.Infof("[Health Check][Check]add adopting instances, ids are %v", instanceIds) - err = checker.AddToCheck(&plugin.AddCheckRequest{ - Instances: instanceIds, - LocalHost: server.localHost, - }) - } else { - log.Infof("[Health Check][Check]remove adopting instances, ids are %v", instanceIds) - err = checker.RemoveFromCheck(&plugin.AddCheckRequest{ - Instances: instanceIds, - LocalHost: server.localHost, - }) - } - if err != nil { - log.Errorf("[Health Check][Check]fail to do adopt event, instances %v, localhost %s, add %v", - instanceIds, server.localHost, add) - return instances - } - return make(map[string]bool) + log.Info("[Health Check][Check] adopt event", zap.Any("instances", instanceIds), + zap.String("server", server.localHost), zap.Bool("add", add)) + return instances } func (c *CheckScheduler) addAdopting(instanceId string, checker plugin.HealthChecker) { diff --git a/service/healthcheck/dispatch.go b/service/healthcheck/dispatch.go index b83a2b1e8..99fa57816 100644 --- a/service/healthcheck/dispatch.go +++ b/service/healthcheck/dispatch.go @@ -58,7 +58,6 @@ func newDispatcher(ctx context.Context, svr *Server) *Dispatcher { svr: svr, mutex: &sync.Mutex{}, } - dispatcher.startDispatchingJob(ctx) return dispatcher } diff --git a/service/healthcheck/server.go b/service/healthcheck/server.go index 2074978a1..c96d10a90 100644 --- a/service/healthcheck/server.go +++ b/service/healthcheck/server.go @@ -81,32 +81,31 @@ func Initialize(ctx context.Context, hcOpt *Config, cacheOpen bool, bc *batch.Co func initialize(ctx context.Context, hcOpt *Config, cacheOpen bool, bc *batch.Controller) error { server.hcOpt = hcOpt - if !hcOpt.Open { - return nil - } if !cacheOpen { return fmt.Errorf("[healthcheck]cache not open") } hcOpt.SetDefault() - if len(hcOpt.Checkers) > 0 { - server.checkers = make(map[int32]plugin.HealthChecker, len(hcOpt.Checkers)) - for _, entry := range hcOpt.Checkers { - checker := plugin.GetHealthChecker(entry.Name, &entry) - if checker == nil { - return fmt.Errorf("[healthcheck]unknown healthchecker %s", entry.Name) - } - // The same health type check plugin can only exist in one - _, exist := server.checkers[int32(checker.Type())] - if exist { - return fmt.Errorf("[healthcheck]duplicate healthchecker %s, checkType %d", entry.Name, checker.Type()) - } - server.checkers[int32(checker.Type())] = checker - if nil == server.defaultChecker { - server.defaultChecker = checker + if hcOpt.Open { + if len(hcOpt.Checkers) > 0 { + server.checkers = make(map[int32]plugin.HealthChecker, len(hcOpt.Checkers)) + for _, entry := range 
hcOpt.Checkers { + checker := plugin.GetHealthChecker(entry.Name, &entry) + if checker == nil { + return fmt.Errorf("[healthcheck]unknown healthchecker %s", entry.Name) + } + // The same health type check plugin can only exist in one + _, exist := server.checkers[int32(checker.Type())] + if exist { + return fmt.Errorf("[healthcheck]duplicate healthchecker %s, checkType %d", entry.Name, checker.Type()) + } + server.checkers[int32(checker.Type())] = checker + if nil == server.defaultChecker { + server.defaultChecker = checker + } } + } else { + return fmt.Errorf("[healthcheck]no checker config") } - } else { - return fmt.Errorf("[healthcheck]no checker config") } var err error if server.storage, err = store.GetStore(); err != nil { @@ -124,25 +123,35 @@ func initialize(ctx context.Context, hcOpt *Config, cacheOpen bool, bc *batch.Co server.checkScheduler = newCheckScheduler(ctx, hcOpt.SlotNum, hcOpt.MinCheckInterval, hcOpt.MaxCheckInterval, hcOpt.ClientCheckInterval, hcOpt.ClientCheckTtl) server.dispatcher = newDispatcher(ctx, server) + return server.run(ctx) +} - server.instanceEventChannel = make(chan *model.InstanceEvent, 1000) - go server.handleInstanceEventWorker(ctx) +func (s *Server) run(ctx context.Context) error { + if !s.isOpen() { + return nil + } + + s.checkScheduler.run(ctx) + go s.timeAdjuster.doTimeAdjust(ctx) + s.dispatcher.startDispatchingJob(ctx) + + s.instanceEventChannel = make(chan *model.InstanceEvent, 1000) + go s.handleInstanceEventWorker(ctx) - leaderChangeEventHandler := newLeaderChangeEventHandler(server.cacheProvider, hcOpt.MinCheckInterval) - if err = eventhub.Subscribe(eventhub.LeaderChangeEventTopic, "selfServiceChecker", + leaderChangeEventHandler := newLeaderChangeEventHandler(s.cacheProvider, s.hcOpt.MinCheckInterval) + if err := eventhub.Subscribe(eventhub.LeaderChangeEventTopic, "selfServiceChecker", leaderChangeEventHandler); err != nil { return err } - instanceEventHandler := newInstanceEventHealthCheckHandler(ctx, server.instanceEventChannel) - if err = eventhub.Subscribe(eventhub.InstanceEventTopic, "instanceHealthChecker", + instanceEventHandler := newInstanceEventHealthCheckHandler(ctx, s.instanceEventChannel) + if err := eventhub.Subscribe(eventhub.InstanceEventTopic, "instanceHealthChecker", instanceEventHandler); err != nil { return err } - if err = server.storage.StartLeaderElection(store.ElectionKeySelfServiceChecker); err != nil { + if err := s.storage.StartLeaderElection(store.ElectionKeySelfServiceChecker); err != nil { return err } - return nil } @@ -199,7 +208,6 @@ func (s *Server) ListCheckerServer() []*model.Instance { s.cacheProvider.selfServiceInstances.Range(func(instanceId string, value ItemWithChecker) { ret = append(ret, value.GetInstance()) }) - return ret } @@ -302,6 +310,10 @@ func (s *Server) Checkers() map[int32]plugin.HealthChecker { return s.checkers } +func (s *Server) isOpen() bool { + return s.hcOpt.Open +} + func currentTimeSec() int64 { return time.Now().Unix() - server.timeAdjuster.GetDiff() } diff --git a/service/healthcheck/test_export.go b/service/healthcheck/test_export.go index 0afffe357..e92aa1735 100644 --- a/service/healthcheck/test_export.go +++ b/service/healthcheck/test_export.go @@ -19,11 +19,9 @@ package healthcheck import ( "context" - "errors" "fmt" "github.com/polarismesh/polaris/common/eventhub" - "github.com/polarismesh/polaris/common/model" "github.com/polarismesh/polaris/plugin" "github.com/polarismesh/polaris/service/batch" "github.com/polarismesh/polaris/store" @@ -35,70 +33,51 @@ func 
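The refactor in this file, and in check.go, dispatch.go, and time_adjust.go, follows one pattern: constructors only assemble state, and every background goroutine now starts in an explicit run(ctx) that is a no-op when health checking is disabled. A condensed, self-contained illustration of that idea (all names hypothetical, not taken from the patch):

```go
package main

import (
	"context"
	"fmt"
)

// worker imitates the construct-then-run split: newWorker only wires fields,
// run starts goroutines, and a disabled worker never starts any.
type worker struct{ open bool }

func newWorker(open bool) *worker { return &worker{open: open} }

func (w *worker) run(ctx context.Context) error {
	if !w.open {
		return nil // constructed but inert, like healthcheck with open: false
	}
	go func() {
		<-ctx.Done() // background loop elided; it stops with the server context
	}()
	return nil
}

func main() {
	w := newWorker(false)
	fmt.Println(w.run(context.Background())) // <nil>: nothing was started
}
```

This is also what lets TestInitialize below build a server without spawning goroutines as a side effect of construction.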
TestInitialize(ctx context.Context, hcOpt *Config, cacheOpen bool, bc *batc testServer := new(Server) testServer.hcOpt = hcOpt - if !hcOpt.Open { - return nil, errors.New("healthcheck not open") - } if !cacheOpen { return nil, fmt.Errorf("[healthcheck]cache not open") } hcOpt.SetDefault() - if len(hcOpt.Checkers) > 0 { - testServer.checkers = make(map[int32]plugin.HealthChecker, len(hcOpt.Checkers)) - for _, entry := range hcOpt.Checkers { - checker := plugin.GetHealthChecker(entry.Name, &entry) - if checker == nil { - return nil, fmt.Errorf("[healthcheck]unknown healthchecker %s", entry.Name) - } - // The same health type check plugin can only exist in one - _, exist := testServer.checkers[int32(checker.Type())] - if exist { - return nil, fmt.Errorf("[healthcheck]duplicate healthchecker %s, checkType %d", - entry.Name, checker.Type()) - } - - testServer.checkers[int32(checker.Type())] = checker - if nil == testServer.defaultChecker { - testServer.defaultChecker = checker + if hcOpt.Open { + if len(hcOpt.Checkers) > 0 { + testServer.checkers = make(map[int32]plugin.HealthChecker, len(hcOpt.Checkers)) + for _, entry := range hcOpt.Checkers { + checker := plugin.GetHealthChecker(entry.Name, &entry) + if checker == nil { + return nil, fmt.Errorf("[healthcheck]unknown healthchecker %s", entry.Name) + } + // The same health type check plugin can only exist in one + _, exist := testServer.checkers[int32(checker.Type())] + if exist { + return nil, fmt.Errorf("[healthcheck]duplicate healthchecker %s, checkType %d", + entry.Name, checker.Type()) + } + testServer.checkers[int32(checker.Type())] = checker + if nil == testServer.defaultChecker { + testServer.defaultChecker = checker + } } + } else { + return nil, fmt.Errorf("[healthcheck]no checker config") } - } else { - return nil, fmt.Errorf("[healthcheck]no checker config") + } + var err error + if testServer.storage, err = store.GetStore(); err != nil { + return nil, err } - testServer.storage = storage testServer.bc = bc testServer.localHost = hcOpt.LocalHost testServer.history = plugin.GetHistory() + testServer.discoverEvent = plugin.GetDiscoverEvent() testServer.cacheProvider = newCacheProvider(hcOpt.Service, testServer) - testServer.timeAdjuster = newTimeAdjuster(ctx, storage) + testServer.timeAdjuster = newTimeAdjuster(ctx, testServer.storage) testServer.checkScheduler = newCheckScheduler(ctx, hcOpt.SlotNum, hcOpt.MinCheckInterval, hcOpt.MaxCheckInterval, hcOpt.ClientCheckInterval, hcOpt.ClientCheckTtl) testServer.dispatcher = newDispatcher(ctx, testServer) - - testServer.instanceEventChannel = make(chan *model.InstanceEvent, 1000) - go testServer.handleInstanceEventWorker(ctx) - - leaderChangeEventHandler := newLeaderChangeEventHandler(testServer.cacheProvider, hcOpt.MinCheckInterval) - if err := eventhub.Subscribe(eventhub.LeaderChangeEventTopic, "selfServiceChecker", - leaderChangeEventHandler); err != nil { - return nil, err - } - - instanceEventHandler := newInstanceEventHealthCheckHandler(ctx, server.instanceEventChannel) - if err := eventhub.Subscribe(eventhub.InstanceEventTopic, "instanceHealthChecker", - instanceEventHandler); err != nil { - return nil, err - } - - if err := testServer.storage.StartLeaderElection(store.ElectionKeySelfServiceChecker); err != nil { - return nil, err - } - finishInit = true - - return testServer, nil + return testServer, testServer.run(ctx) } func TestDestroy() { diff --git a/service/healthcheck/time_adjust.go b/service/healthcheck/time_adjust.go index 09197fc76..f8ce08fc6 100644 --- 
a/service/healthcheck/time_adjust.go
+++ b/service/healthcheck/time_adjust.go
@@ -35,7 +35,6 @@ type TimeAdjuster struct {
 
 func newTimeAdjuster(ctx context.Context, storage store.Store) *TimeAdjuster {
 	adjuster := &TimeAdjuster{storage: storage}
-	go adjuster.doTimeAdjust(ctx)
 	return adjuster
 }
 
diff --git a/test/codecov.sh b/test/codecov.sh
new file mode 100644
index 000000000..292f0f7e4
--- /dev/null
+++ b/test/codecov.sh
@@ -0,0 +1,78 @@
+#!/bin/bash
+# Tencent is pleased to support the open source community by making Polaris available.
+#
+# Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
+#
+# Licensed under the BSD 3-Clause License (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://opensource.org/licenses/BSD-3-Clause
+#
+# Unless required by applicable law or agreed to in writing, software distributed
+# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+# CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+set -ex # Exit on error; debugging enabled.
+
+function test_standalone() {
+  export STORE_MODE=""
+  go mod vendor
+  go test -timeout 120m ./... -v -covermode=count -coverprofile=coverage_1.cover -coverpkg=github.com/polarismesh/polaris/apiserver,github.com/polarismesh/polaris/apiserver/eurekaserver,github.com/polarismesh/polaris/auth/defaultauth,github.com/polarismesh/polaris/service,github.com/polarismesh/polaris/service/batch,github.com/polarismesh/polaris/service/healthcheck,github.com/polarismesh/polaris/cache,github.com/polarismesh/polaris/store/boltdb,github.com/polarismesh/polaris/store/mysql,github.com/polarismesh/polaris/plugin,github.com/polarismesh/polaris/config,github.com/polarismesh/polaris/plugin/healthchecker/leader,github.com/polarismesh/polaris/plugin/healthchecker/memory,github.com/polarismesh/polaris/plugin/healthchecker/redis,github.com/polarismesh/polaris/common/batchjob,github.com/polarismesh/polaris/common/eventhub,github.com/polarismesh/polaris/common/redispool,github.com/polarismesh/polaris/common/timewheel
+}
+
+function prepare_cluster_env() {
+  # test configuration
+  echo "cur STORE MODE=${STORE_MODE}, MYSQL_DB_USER=${MYSQL_DB_USER}, MYSQL_DB_PWD=${MYSQL_DB_PWD}"
+  # enable strict mode
+  mysql -h127.0.0.1 -P3306 -u${MYSQL_DB_USER} -p"${MYSQL_DB_PWD}" -e "set sql_mode='STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION'"
+  # drop any existing data
+  mysql -h127.0.0.1 -P3306 -u${MYSQL_DB_USER} -p"${MYSQL_DB_PWD}" -e "DROP DATABASE IF EXISTS polaris_server"
+  # initialize the polaris database
+  mysql -h127.0.0.1 -P3306 -u${MYSQL_DB_USER} -p"${MYSQL_DB_PWD}"
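The script above (truncated here) drives the storage backend purely through environment variables. A hedged Go sketch of how a test-suite helper could consume the STORE_MODE it exports; the helper name and the boltdb default are assumptions, not taken from this patch:

```go
package main

import (
	"fmt"
	"os"
)

// storeMode mirrors the STORE_MODE convention used by test/codecov.sh:
// "sqldb" selects the MySQL-backed cluster run, while an empty value
// falls back to the standalone run (assumed to use the boltdb store).
func storeMode() string {
	if v := os.Getenv("STORE_MODE"); v != "" {
		return v
	}
	return "boltdb" // assumption: standalone default store
}

func main() {
	fmt.Println(storeMode())
}
```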