Commit
Fix the bug that the go client wouldn't update its configuration when one replica coredumps.

Add a feature: the client forwards the request to the primary meta server when the client's meta list doesn't contain the primary.
lengyuexuexuan committed Feb 19, 2024
1 parent 9f41be7 commit 41141c1
Showing 5 changed files with 99 additions and 17 deletions.
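In short, the forwarding feature works as follows: when a queried meta server answers with ERR_FORWARD_TO_OTHERS, the client reads the primary's address out of the response, appends it to its meta list if the list doesn't already contain it, and retries the call against that primary. Below is a minimal, self-contained sketch of the list-update step, using plain strings in place of the client's session types (ensurePrimary is a hypothetical helper for illustration, not part of the client):

package main

import "fmt"

// ensurePrimary returns the index of the forwarded primary within metas,
// appending the address first if the list doesn't contain it yet.
func ensurePrimary(metas []string, forwardAddr string) ([]string, int) {
	for i, m := range metas {
		if m == forwardAddr {
			return metas, i
		}
	}
	metas = append(metas, forwardAddr)
	return metas, len(metas) - 1
}

func main() {
	metas := []string{"0.0.0.0:34602", "0.0.0.0:34603"}
	// Suppose a queried meta answered ERR_FORWARD_TO_OTHERS naming this primary.
	metas, lead := ensurePrimary(metas, "0.0.0.0:34601")
	fmt.Println(metas, lead) // [0.0.0.0:34602 0.0.0.0:34603 0.0.0.0:34601] 2
}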
1 change: 1 addition & 0 deletions go-client/pegasus/table_connector.go
@@ -703,6 +703,7 @@ func (p *pegasusTableConnector) handleReplicaError(err error, replica *session.R

case base.ERR_TIMEOUT:
case context.DeadlineExceeded:
confUpdate = true
case context.Canceled:
// timeout will not trigger a configuration update

8 changes: 7 additions & 1 deletion go-client/pegasus/table_connector_test.go
@@ -269,8 +269,14 @@ func TestPegasusTableConnector_TriggerSelfUpdate(t *testing.T) {
assert.True(t, confUpdate)
assert.False(t, retry)

confUpdate, retry, err = ptb.handleReplicaError(context.DeadlineExceeded, nil)
<-ptb.confUpdateCh
assert.Error(t, err)
assert.True(t, confUpdate)
assert.False(t, retry)

{ // Ensure: The following errors should not trigger configuration update
errorTypes := []error{base.ERR_TIMEOUT, context.DeadlineExceeded, base.ERR_CAPACITY_EXCEEDED, base.ERR_NOT_ENOUGH_MEMBER, base.ERR_BUSY, base.ERR_SPLITTING, base.ERR_DISK_INSUFFICIENT}
errorTypes := []error{base.ERR_TIMEOUT, base.ERR_CAPACITY_EXCEEDED, base.ERR_NOT_ENOUGH_MEMBER, base.ERR_BUSY, base.ERR_SPLITTING, base.ERR_DISK_INSUFFICIENT}

for _, err := range errorTypes {
channelEmpty := false
70 changes: 56 additions & 14 deletions go-client/session/meta_call.go
@@ -21,11 +21,12 @@ package session

import (
"context"
"github.com/apache/incubator-pegasus/go-client/idl/base"
"github.com/apache/incubator-pegasus/go-client/idl/replication"
"github.com/apache/incubator-pegasus/go-client/pegalog"
"sync"
"sync/atomic"
"time"

"github.com/apache/incubator-pegasus/go-client/idl/base"
)

type metaCallFunc func(context.Context, *metaSession) (metaResponse, error)
@@ -42,21 +43,23 @@ type metaCall struct {
backupCh chan interface{}
callFunc metaCallFunc

metas []*metaSession
lead int
metaIPAddrs []string
metas []*metaSession
lead int
// After a Run successfully ends, the current leader will be set in this field.
// If there is no meta failover, `newLead` equals `lead`.
newLead uint32
}

func newMetaCall(lead int, metas []*metaSession, callFunc metaCallFunc) *metaCall {
func newMetaCall(lead int, metas []*metaSession, callFunc metaCallFunc, metaIPAddrs []string) *metaCall {
return &metaCall{
metas: metas,
lead: lead,
newLead: uint32(lead),
respCh: make(chan metaResponse),
callFunc: callFunc,
backupCh: make(chan interface{}),
metas: metas,
metaIPAddrs: metaIPAddrs,
lead: lead,
newLead: uint32(lead),
respCh: make(chan metaResponse),
callFunc: callFunc,
backupCh: make(chan interface{}),
}
}

@@ -106,14 +109,40 @@ func (c *metaCall) Run(ctx context.Context) (metaResponse, error) {
}

// issueSingleMeta returns false if we should try another meta
func (c *metaCall) issueSingleMeta(ctx context.Context, i int) bool {
meta := c.metas[i]
func (c *metaCall) issueSingleMeta(ctx context.Context, curLeader int) bool {
meta := c.metas[curLeader]
resp, err := c.callFunc(ctx, meta)

if err == nil && resp.GetErr().Errno == base.ERR_FORWARD_TO_OTHERS.String() {
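// The queried meta replied ERR_FORWARD_TO_OTHERS, i.e. it is not the primary:
// take the primary's address from the response, add it to the meta list if it
// is not already known, and retry the call against it.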
forwardAddr := c.getMetaServiceForwardAddress(resp)
if forwardAddr == nil {
return false
}
addr := forwardAddr.GetAddress()
found := false
for i := range c.metaIPAddrs {
if addr == c.metaIPAddrs[i] {
found = true
break
}
}
if !found {
c.metaIPAddrs = append(c.metaIPAddrs, addr)
c.metas = append(c.metas, &metaSession{
NodeSession: newNodeSession(addr, NodeTypeMeta),
logger: pegalog.GetLogger(),
})
curLeader = len(c.metas) - 1
c.metas[curLeader].logger.Printf("add forward address %s as meta server", addr)
}
resp, err = c.callFunc(ctx, c.metas[curLeader])
}

if err != nil || resp.GetErr().Errno == base.ERR_FORWARD_TO_OTHERS.String() {
return false
}
// the RPC succeeds, this meta becomes the new leader now.
atomic.StoreUint32(&c.newLead, uint32(i))
atomic.StoreUint32(&c.newLead, uint32(curLeader))
select {
case <-ctx.Done():
case c.respCh <- resp:
@@ -133,3 +162,16 @@ func (c *metaCall) issueBackupMetas(ctx context.Context) {
}(i)
}
}

// getMetaServiceForwardAddress extracts the primary's address from an
// ERR_FORWARD_TO_OTHERS query-config response, or returns nil if it is absent.
func (c *metaCall) getMetaServiceForwardAddress(resp metaResponse) *base.RPCAddress {
	rep, ok := resp.(*replication.QueryCfgResponse)
	if !ok || rep.GetErr().Errno != base.ERR_FORWARD_TO_OTHERS.String() {
		return nil
	}
	if len(rep.GetPartitions()) == 0 {
		return nil
	}
	return rep.Partitions[0].Primary
}
18 changes: 17 additions & 1 deletion go-client/session/meta_session.go
@@ -94,10 +94,12 @@ func NewMetaManager(addrs []string, creator NodeSessionCreator) *MetaManager {

func (m *MetaManager) call(ctx context.Context, callFunc metaCallFunc) (metaResponse, error) {
lead := m.getCurrentLeader()
call := newMetaCall(lead, m.metas, callFunc)
call := newMetaCall(lead, m.metas, callFunc, m.metaIPAddrs)
resp, err := call.Run(ctx)
if err == nil {
m.setCurrentLeader(int(call.newLead))
m.setNewMetas(call.metas)
m.setMetaIPAddrs(call.metaIPAddrs)
}
return resp, err
}
@@ -131,6 +133,20 @@ func (m *MetaManager) setCurrentLeader(lead int) {
m.currentLeader = lead
}

func (m *MetaManager) setNewMetas(metas []*metaSession) {
m.mu.Lock()
defer m.mu.Unlock()

m.metas = metas
}

func (m *MetaManager) setMetaIPAddrs(metaIPAddrs []string) {
m.mu.Lock()
defer m.mu.Unlock()

m.metaIPAddrs = metaIPAddrs
}

// Close the sessions.
func (m *MetaManager) Close() error {
funcs := make([]func() error, len(m.metas))
19 changes: 18 additions & 1 deletion go-client/session/meta_session_test.go
@@ -118,11 +118,28 @@ func TestMetaManager_FirstMetaDead(t *testing.T) {
for i := 0; i < 3; i++ {
call := newMetaCall(mm.currentLeader, mm.metas, func(rpcCtx context.Context, ms *metaSession) (metaResponse, error) {
return ms.queryConfig(rpcCtx, "temp")
})
}, []string{"0.0.0.0:12345", "0.0.0.0:34603", "0.0.0.0:34602", "0.0.0.0:34601"})
// This is a trick for testing: if metaCall issued the request to other metas, not only to the leader, this nil channel would cause a panic.
call.backupCh = nil
metaResp, err := call.Run(context.Background())
assert.Nil(t, err)
assert.Equal(t, metaResp.GetErr().Errno, base.ERR_OK.String())
}
}

// This case mocks the scenario where the server's primary meta is not in the client's meta list,
// and verifies that the client forwards to the primary meta automatically.
func TestNodeSession_ForwardToPrimaryMeta(t *testing.T) {
defer leaktest.Check(t)()

metaList := []string{"0.0.0.0:34601", "0.0.0.0:34602", "0.0.0.0:34603"}

for i := 0; i < 3; i++ {
mm := NewMetaManager(metaList[i:i+1], NewNodeSession)
defer mm.Close()
resp, err := mm.QueryConfig(context.Background(), "temp")
println(resp)
assert.Nil(t, err)
assert.Equal(t, resp.Err.Errno, base.ERR_OK.String())
}
}
