Skip to content

Commit

Permalink
v1.7 del-addr-handler (#238)
Browse files Browse the repository at this point in the history
  • Loading branch information
brzyangg authored Mar 29, 2021
1 parent 0ae929e commit d72cf0f
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 18 deletions.
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ github.com/daixiang0/gci v0.2.4/go.mod h1:+AV8KmHTGxxwp/pY84TLQfFKp2vuKXXJVzF3kD
github.com/danieljoos/wincred v1.0.2/go.mod h1:SnuYRW9lp1oJrZX/dXJqr0cPK5gYXqx3EJbmjhLdK9U=
github.com/datadog/zstd v1.4.6-0.20200617134701-89f69fb7df32/go.mod h1:inRp+etsHuvVqMPNTXaFlpf/Tj7wqviBtdJoPVrPEFQ=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/denis-tingajkin/go-header v0.3.1/go.mod h1:sq/2IxMhaZX+RRcgHfCRx/m0M5na0fBt4/CRe7Lrji0=
github.com/denisenkom/go-mssqldb v0.0.0-20190515213511-eb9f6a1743f3/go.mod h1:zAg7JM8CkOJ43xKXIj7eRO9kmWm/TW578qo+oDO6tuM=
Expand Down Expand Up @@ -420,6 +421,7 @@ github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
Expand Down Expand Up @@ -557,6 +559,7 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchrcom/testify v1.2.2/go.mod h1:zUrQijuLcfRPyrWG6SBFjct9CuJZz2Ybtack4DGF2Jo=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
Expand Down Expand Up @@ -888,6 +891,7 @@ gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
7 changes: 7 additions & 0 deletions util/service/client_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func NewClientGrpcWithRouterType(cb ClientLookup, processor string, capacity int
}
// 目前为写死值,后期改为动态配置获取的方式
pool := NewClientPool(defaultMaxIdle, defaultMaxActive, clientGrpc.newConn, cb.ServKey())
cb.RegisterDeleteAddrHandler(clientGrpc.deleteAddrHandler)
clientGrpc.pool = pool

return clientGrpc
Expand Down Expand Up @@ -313,6 +314,12 @@ func (m *ClientGrpc) newConn(addr string) (rpcClientConn, error) {
}, nil
}

func (m *ClientGrpc) deleteAddrHandler(addrs []string) {
for _, addr := range addrs {
deleteAddrFromConnPool(addr, m.pool)
}
}

func LaneInfoUnaryClientInterceptor() grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string,
req, reply interface{},
Expand Down
26 changes: 26 additions & 0 deletions util/service/client_thrift.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func NewClientThriftWithRouterType(cb ClientLookup, processor string, fn func(th
// 目前写死值,后期改为动态获取的方式
pool := NewClientPool(defaultMaxIdle, defaultMaxActive, ct.newConn, cb.ServKey())
ct.pool = pool
cb.RegisterDeleteAddrHandler(ct.deleteAddrHandler)
return ct
}

Expand Down Expand Up @@ -192,6 +193,25 @@ func (m *ClientThrift) injectServInfo(ctx context.Context, si *ServInfo) context
return ctx
}

func deleteAddrFromConnPool(addr string, pool *ClientPool) {
fun := "deleteAddrFromConnPool -->"
xlog.Infof(context.Background(), "%s get change addr success", fun)
pool.mu.Lock()
defer pool.mu.Unlock()
value, ok := pool.clientPool.Load(addr)
if !ok {
return
}
clientPool, ok := value.(*ConnectionPool)
if !ok {
xlog.Warnf(context.Background(), "%s value to connection pool false, key: %s", fun, addr)
return
}
pool.clientPool.Delete(addr)
clientPool.Close()
xlog.Infof(context.Background(), "%s close client pool success, addr: %s", fun, addr)
}

//func (m *ClientThrift) logTraffic(ctx context.Context, si *ServInfo) {
// kv := make(map[string]interface{})
// for k, v := range trafficKVFromContext(ctx) {
Expand Down Expand Up @@ -256,3 +276,9 @@ func (m *ClientThrift) newConn(addr string) (rpcClientConn, error) {
serviceClient: m.fnFactory(useTransport, protocolFactory),
}, nil
}

func (m *ClientThrift) deleteAddrHandler(addrs []string) {
for _, addr := range addrs {
deleteAddrFromConnPool(addr, m.pool)
}
}
3 changes: 3 additions & 0 deletions util/service/conn_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ func (cp *ConnectionPool) stat() {
for !cp.closed.Get() {
select {
case <-tickC:
if cp.closed.Get() {
return
}
confActive, confIdle, active, idle := cp.connections.Stat()
xlog.Infof(context.Background(), "caller: %s, callee: %s, callee_addr: %s, conf_active: %d, conf_idle: %d, active: %d, idle: %d", GetServName(), cp.calleeServiceKey, cp.addr, confActive, confIdle, active, idle)
group, service := GetGroupAndService()
Expand Down
2 changes: 1 addition & 1 deletion util/service/power.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (dr *driverBuilder) powerProcessorDriver(ctx context.Context, n string, p P
return nil, errNilDriver
}

xlog.Infof(ctx, "%s processor: %s type: %s addr: %s", fun, reflect.TypeOf(driver), addr)
xlog.Infof(ctx, "%s processor: %s type: %s addr: %s", fun, n, reflect.TypeOf(driver), addr)

switch d := driver.(type) {
case *httprouter.Router:
Expand Down
56 changes: 56 additions & 0 deletions util/service/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type servCopyData struct {
}

type servCopyCollect map[int]*servCopyData
type deleteAddrHandler func([]string)

type ClientEtcdV2 struct {
confEtcd configEtcd
Expand All @@ -53,6 +54,9 @@ type ClientEtcdV2 struct {
muServlist sync.Mutex
servCopy servCopyCollect
servHash map[string]*consistent.Consistent

muChangeEvent sync.Mutex
changeEventHandler []deleteAddrHandler
}

func checkDistVersion(client etcd.KeysAPI, prefloc, servlocation string) string {
Expand Down Expand Up @@ -325,6 +329,7 @@ func (m *ClientEtcdV2) parseResponseV2(r *etcd.Response) {

}

m.compareAndApplyCopyData(servCopy)
m.upServlist(servCopy)
}

Expand Down Expand Up @@ -387,6 +392,37 @@ func (m *ClientEtcdV2) parseResponseV1(r *etcd.Response) {
m.upServlist(servCopy)
}

func (m *ClientEtcdV2) compareAndApplyCopyData(servCopy servCopyCollect) {
deleteAddr := make([]string, 0)
m.muServlist.Lock()
defer m.muServlist.Unlock()
for sid, data := range m.servCopy {
if data == nil || data.reg == nil || data.reg.Servs == nil {
continue
}
for procName, info := range data.reg.Servs {
// sid在新的servCopy中不存在,认为被删掉,加入info的addr
if servCopy[sid] == nil {
deleteAddr = append(deleteAddr, info.Addr)
continue
}
if servCopy[sid].reg == nil {
deleteAddr = append(deleteAddr, info.Addr)
continue
}
if servCopy[sid].reg.Servs == nil {
deleteAddr = append(deleteAddr, info.Addr)
continue
}
// 地址被修改
if servCopy[sid].reg.Servs[procName].Addr != info.Addr {
deleteAddr = append(deleteAddr, info.Addr)
}
}
}
go m.applyDeleteAddr(deleteAddr)
}

func (m *ClientEtcdV2) upServlist(scopy map[int]*servCopyData) {
fun := "ClientEtcdV2.upServlist -->"
ctx := context.Background()
Expand Down Expand Up @@ -592,6 +628,26 @@ func (m *ClientEtcdV2) String() string {
return fmt.Sprintf("service_key: %s, service_path: %s", m.servKey, m.servPath)
}

func (m *ClientEtcdV2) RegisterDeleteAddrHandler(handle deleteAddrHandler) {
m.muChangeEvent.Lock()
defer m.muChangeEvent.Unlock()

m.changeEventHandler = append(m.changeEventHandler, handle)
}

func (m *ClientEtcdV2) applyDeleteAddr(deleteAddr []string) {
for _, handle := range m.getHandlers() {
handle(deleteAddr)
}
}

func (m *ClientEtcdV2) getHandlers() []deleteAddrHandler {
m.muChangeEvent.Lock()
defer m.muChangeEvent.Unlock()

return m.changeEventHandler
}

// 兼容新旧版本的lane metadata
func (s *servCopyData) containsLane(lane string) bool {
if s.reg != nil {
Expand Down
1 change: 1 addition & 0 deletions util/service/registry_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type ClientLookup interface {
GetAllServAddrWithGroup(group, processor string) []*ServInfo
ServKey() string
ServPath() string
RegisterDeleteAddrHandler(deleteAddrHandler)
}

func NewClientLookup(etcdaddrs []string, baseLoc string, servlocation string) (*ClientEtcdV2, error) {
Expand Down
17 changes: 16 additions & 1 deletion util/service/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ package rocserv
import (
"context"
"fmt"
"gitlab.pri.ibanyu.com/middleware/seaweed/xlog"
"testing"
"time"

"github.com/stretchr/testify/assert"
"gitlab.pri.ibanyu.com/middleware/seaweed/xlog"
"gitlab.pri.ibanyu.com/middleware/util/servbase"
)

func TestClient(t *testing.T) {
Expand Down Expand Up @@ -62,3 +65,15 @@ func TestClient(t *testing.T) {
xlog.Info(ctx, "get test_thrift", s)

}

func TestClientEtcdV2_WatchDeleteAddr(t *testing.T) {
cli, err := NewClientLookup(servbase.ETCDS_CLUSTER_0, "roc", "base/changeboard")
assert.NoError(t, err)
stop := make(chan struct{})
cli.RegisterDeleteAddrHandler(func(strings []string) {
fmt.Println(strings)
stop <- struct{}{}
})
<-stop
<-stop
}
17 changes: 1 addition & 16 deletions util/service/service_interface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func TestIt(t *testing.T) {
//skey = "beauty"
var sb ServBase
var err error
sb, err = NewServBaseV2(configEtcd{etcds, "/roc"}, "niubi/fuck", skey, "", 0)
sb, err = NewServBaseV2(configEtcd{etcds, "/roc"}, "niubi/fuck", skey, "", 0, nil)

if err != nil {
t.Errorf("create err:%s", err)
Expand All @@ -25,21 +25,6 @@ func TestIt(t *testing.T) {

log.Println(sb)

sfid, err := sb.GenSnowFlakeId()
if err != nil {
t.Errorf("snow id err:%s", err)
return
}

log.Println(sfid)

log.Println(sb.GenUuid())
log.Println(sb.GenUuidMd5())
log.Println(sb.GenUuidSha1())

dr := sb.Dbrouter()
log.Println(dr)

type TConf2 struct {
Uname string
Passwd string
Expand Down

0 comments on commit d72cf0f

Please sign in to comment.