Skip to content

Commit

Permalink
set primary with force
Browse files Browse the repository at this point in the history
* when joining, do not marshall anything other than Smap and BMD
* add extended comment
* add step-wise logs (10 steps)
* new action message `ActBumpMetasync` (internal)
* part five, prev. commit: 5cec026
* separately:
  - remove `apc.ActSendOwnershipTbl` and related handler - not used

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Nov 4, 2024
1 parent 5cec026 commit aeed9cb
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 97 deletions.
10 changes: 5 additions & 5 deletions ais/htrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -1954,11 +1954,11 @@ func (h *htrun) regTo(url string, psi *meta.Snode, tout time.Duration, htext hte
skipPrxKalive = h.si.IsProxy() || keepalive
opts = cmetaFillOpt{
htext: htext,
skipSmap: skipPrxKalive,
skipBMD: skipPrxKalive,
skipRMD: keepalive,
skipConfig: keepalive,
skipEtlMD: keepalive,
skipSmap: skipPrxKalive, // when targets self- or admin-join
skipBMD: skipPrxKalive, // ditto
skipRMD: true, // NOTE: not used yet
skipConfig: true, // ditto
skipEtlMD: true, // ditto
fillRebMarker: !keepalive,
skipPrimeTime: true,
}
Expand Down
143 changes: 67 additions & 76 deletions ais/prxclu.go
Original file line number Diff line number Diff line change
Expand Up @@ -1003,19 +1003,17 @@ func (p *proxy) cluputMsg(w http.ResponseWriter, r *http.Request) {
if err != nil {
return
}
if msg.Action != apc.ActSendOwnershipTbl {
// must be primary to execute all the rest actions
if p.forwardCP(w, r, msg, "") {
return
}
// must be primary to execute all the rest actions
if p.forwardCP(w, r, msg, "") {
return
}

// not just 'cluster-started' - must be ready to rebalance as well
// with two distinct exceptions
withRR := (msg.Action != apc.ActShutdownCluster && msg.Action != apc.ActXactStop)
if err := p.pready(nil, withRR); err != nil {
p.writeErr(w, r, err, http.StatusServiceUnavailable)
return
}
// not just 'cluster-started' - must be ready to rebalance as well
// with two distinct exceptions
withRR := (msg.Action != apc.ActShutdownCluster && msg.Action != apc.ActXactStop)
if err := p.pready(nil, withRR); err != nil {
p.writeErr(w, r, err, http.StatusServiceUnavailable)
return
}

switch msg.Action {
Expand Down Expand Up @@ -1085,13 +1083,21 @@ func (p *proxy) cluputMsg(w http.ResponseWriter, r *http.Request) {
p.xstart(w, r, msg)
case apc.ActXactStop:
p.xstop(w, r, msg)
case apc.ActSendOwnershipTbl:
p.sendOwnTbl(w, r, msg)

// internal
case apc.ActBumpMetasync:
p.bumpMsyncAll(w, r)

// fail
default:
p.writeErrAct(w, r, msg.Action)
}
}

// TODO -- FIXME: niy
func (*proxy) bumpMsyncAll(http.ResponseWriter, *http.Request) {
}

func (p *proxy) setCluCfgPersistent(w http.ResponseWriter, r *http.Request, toUpdate *cmn.ConfigToSet, msg *apc.ActMsg) {
ctx := &configModifier{
pre: _setConfPre,
Expand Down Expand Up @@ -1388,57 +1394,6 @@ func (p *proxy) rebalanceCluster(w http.ResponseWriter, r *http.Request, msg *ap
writeXid(w, rmdCtx.rebID)
}

func (p *proxy) sendOwnTbl(w http.ResponseWriter, r *http.Request, msg *apc.ActMsg) {
var (
smap = p.owner.smap.get()
dstID string
)
if err := cos.MorphMarshal(msg.Value, &dstID); err != nil {
p.writeErrf(w, r, cmn.FmtErrMorphUnmarshal, p.si, msg.Action, msg.Value, err)
return
}
dst := smap.GetProxy(dstID)
if dst == nil {
p.writeErrf(w, r, "%s: unknown proxy node p[%s]", p.si, dstID)
return
}
if !smap.IsIC(dst) {
p.writeErrf(w, r, "%s: not an IC member", dst)
return
}
if smap.IsIC(p.si) && !p.si.Eq(dst) {
// node has older version than dst node handle locally
if err := p.ic.sendOwnershipTbl(dst, smap); err != nil {
p.writeErr(w, r, err)
}
return
}
// forward
var (
err error
cargs = allocCargs()
)
{
cargs.req = cmn.HreqArgs{Method: http.MethodPut, Path: apc.URLPathClu.S, Body: cos.MustMarshal(msg)}
cargs.timeout = apc.DefaultTimeout
}
for pid, psi := range smap.Pmap {
if !smap.IsIC(psi) || pid == dstID {
continue
}
cargs.si = psi
res := p.call(cargs, smap)
if res.err != nil {
err = res.toErr()
}
freeCR(res)
}
if err != nil {
p.writeErr(w, r, err)
}
freeCargs(cargs)
}

// gracefully remove node via apc.ActStartMaintenance, apc.ActDecommission, apc.ActShutdownNode
func (p *proxy) rmNode(w http.ResponseWriter, r *http.Request, msg *apc.ActMsg) {
var (
Expand Down Expand Up @@ -1648,7 +1603,7 @@ func (p *proxy) stopMaintenance(w http.ResponseWriter, r *http.Request, msg *apc
}
tout := cmn.Rom.CplaneOperation()
if _, status, err := p.reqHealth(si, tout, nil, smap, false /*retry pub-addr*/); err != nil {
// TODO -- FIXME: use cmn.KeepaliveRetryDuration()
// TODO: use cmn.KeepaliveRetryDuration()
sleep, retries := tout/2, 4

time.Sleep(sleep)
Expand Down Expand Up @@ -2086,13 +2041,14 @@ func (p *proxy) _setPrimary(w http.ResponseWriter, r *http.Request, npsi *meta.S
freeBcastRes(results)
}

// forced primary change - is used when the original primary's network is down
// for a while, long enough for the remaining nodes to elect new primary. When the
// original primary gets back online it does not join automatically to the
// new (elected) primary, which may further lead to split-brain situation.

// TODO -- FIXME: two-phase commit via apc.QparamPrepare = (true | false)

// Force primary change (*****)
// 10-steps sequence that now supports merging two different clusters
// Background:
// - when for whatever reason some of the nodes that include at least one proxy stop seeing the _current_ primary
// they may, after keep-aliving for a while and talking to each other, go ahead and elect a new primary -
// from themselves and for themselves;
// - when the network is back up again we then discover split-brain in progress, and we may not like it.
// Beware!.. well, just beware.
func (p *proxy) forceJoin(w http.ResponseWriter, r *http.Request, npid string, q url.Values) {
const (
tag = "designated primary"
Expand Down Expand Up @@ -2191,6 +2147,8 @@ func (p *proxy) forceJoin(w http.ResponseWriter, r *http.Request, npid string, q
return
}

nlog.Infoln(act, "prepare (6)")

// 6. prepare phase whereby all members health-ping => npsi
bargs := allocBcArgs()
{
Expand All @@ -2211,14 +2169,19 @@ func (p *proxy) forceJoin(w http.ResponseWriter, r *http.Request, npid string, q
}
freeBcastRes(results)

nlog.Infoln(act, "metasync (7)")

// 7. metasync destination cluMeta to _this_ cluster members
aimsg := p.newAmsgActVal(apc.ActPrimaryForce, smap)
wg := p.metasyncer.sync(revsPair{ncm.Smap, aimsg}, revsPair{ncm.BMD, aimsg}, revsPair{ncm.Config, aimsg}, revsPair{ncm.RMD, aimsg})
wg.Wait()

nlog.Infoln(act, "update clu-meta (8)")

// 8. update cluMeta in memory (= destination)
if err = cmn.GCO.Update(&ncm.Config.ClusterConfig); err != nil {
// rollback #1
nlog.Errorln(act, "rollback #1", err)
wg := p.metasyncer.sync(revsPair{cm.Smap, aimsg}, revsPair{cm.BMD, aimsg}, revsPair{cm.Config, aimsg}, revsPair{cm.RMD, aimsg})
wg.Wait()

Expand All @@ -2229,7 +2192,9 @@ func (p *proxy) forceJoin(w http.ResponseWriter, r *http.Request, npid string, q
debug.Assert(ok)
bmdOwnerPrx.put(ncm.BMD)

// 9. finally, join self
nlog.Infoln(act, "join self (9)")

// 9. join self (up to 3 attempts)
joinURL, secondURL := npsi.ControlNet.URL, npsi.PubNet.URL
if nurl == npsi.PubNet.URL {
joinURL, secondURL = npsi.PubNet.URL, npsi.ControlNet.URL
Expand All @@ -2243,12 +2208,20 @@ func (p *proxy) forceJoin(w http.ResponseWriter, r *http.Request, npid string, q
if joinURL != secondURL {
nlog.Warningln(res.toErr(), "- 2nd attempt via", secondURL)
res = p.regTo(secondURL, npsi, apc.DefaultTimeout, nil, false)
eh = res.toErr()
e, eh = res.err, res.toErr()
freeCR(res)
if e != nil {
time.Sleep(time.Second)
nlog.Warningln(res.toErr(), "- 3d (final) attempt via", joinURL)
res = p.regTo(joinURL, npsi, apc.DefaultTimeout, nil, false)
eh = res.toErr()
freeCR(res)
}
}
}
if eh != nil {
// rollback #2
nlog.Errorln(act, "rollback #2", eh)
if nested := cmn.GCO.Update(&cm.Config.ClusterConfig); nested != nil {
nlog.Errorf("FATAL: nested config-update error when rolling back [%s %s to %s]: %v",
act, p, npname, nested) // (unlikely)
Expand All @@ -2263,7 +2236,25 @@ func (p *proxy) forceJoin(w http.ResponseWriter, r *http.Request, npid string, q
}
p.metasyncer.becomeNonPrimary() // metasync to stop syncing and cancel all pending requests

// TODO -- FIXME: in turn, npsi must metasync all joined members - including this former primary - using apc.ActPrimaryForce message
time.Sleep(time.Second)

nlog.Infoln(act, "ask npsi to bump metasync (10)")

// 10. finally, ask npsi to bump versions and metasync all
cargs = allocCargs()
{
cargs.si = npsi
cargs.timeout = cmn.Rom.MaxKeepalive()
aimsg := p.newAmsgActVal(apc.ActBumpMetasync, nil)
cargs.req = cmn.HreqArgs{Method: http.MethodPut, Path: apc.URLPathClu.S, Body: cos.MustMarshal(aimsg)}
}
res = p.call(cargs, newSmap)
err = res.unwrap()
freeCargs(cargs)
if err != nil {
p.writeErr(w, r, err)
}
freeCR(res)
}

//////////////////////////////////////////
Expand Down
24 changes: 12 additions & 12 deletions ais/tgtcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (t *target) daeputItems(w http.ResponseWriter, r *http.Request, apiItems []
}
nlog.Infof("%s: %s %s done", t, apc.SyncSmap, newsmap)
case apc.Mountpaths:
t.handleMountpathReq(w, r)
t.handleMpathReq(w, r)
case apc.ActSetConfig: // set-config #1 - via query parameters and "?n1=v1&n2=v2..."
t.setDaemonConfigQuery(w, r)
case apc.ActEnableBackend:
Expand Down Expand Up @@ -481,19 +481,19 @@ func (t *target) httpdaepost(w http.ResponseWriter, r *http.Request) {
return
}
act := apiItems[0]
if act == apc.Mountpaths {
t.handleMountpathReq(w, r)
return
}
if act == apc.ActPrimaryForce {
switch act {
case apc.Mountpaths:
t.handleMpathReq(w, r)
case apc.ActPrimaryForce:
t.prepForceJoin(w, r)
return
}
if act != apc.AdminJoin {
case apc.AdminJoin:
t.adminJoin(w, r)
default:
t.writeErrURL(w, r)
return
}
}

func (t *target) adminJoin(w http.ResponseWriter, r *http.Request) {
// user request to join cluster (compare with `apc.SelfJoin`)
if !t.regstate.disabled.Load() {
if t.keepalive.paused() {
Expand Down Expand Up @@ -533,7 +533,7 @@ func (t *target) httpdaedelete(w http.ResponseWriter, r *http.Request) {
}
switch apiItems[0] {
case apc.Mountpaths:
t.handleMountpathReq(w, r)
t.handleMpathReq(w, r)
default:
t.writeErrURL(w, r)
}
Expand Down Expand Up @@ -562,7 +562,7 @@ func (t *target) cleanupMark(ctx *cleanmark) {
}
}

func (t *target) handleMountpathReq(w http.ResponseWriter, r *http.Request) {
func (t *target) handleMpathReq(w http.ResponseWriter, r *http.Request) {
msg, err := t.readActionMsg(w, r)
if err != nil {
return
Expand Down
2 changes: 1 addition & 1 deletion api/apc/actmsg.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ const (
ActKeepaliveUpdate = "keepalive-update"

// IC
ActSendOwnershipTbl = "ic-send-own-tbl"
ActListenToNotif = "watch-xaction"
ActMergeOwnershipTbl = "ic-merge-own-tbl"
ActRegGlobalXaction = "reg-global-xaction"
Expand All @@ -110,6 +109,7 @@ const (
ActCleanupMarkers = "cleanup-markers" // part of the target joining sequence
ActSelfRemove = "self-initiated-removal" // e.g., when losing last mountpath
ActPrimaryForce = "primary-force" // BEWARE! advanced usage only
ActBumpMetasync = "bump-metasync" // when executing ActPrimaryForce - the final step
)

const (
Expand Down
1 change: 1 addition & 0 deletions cmd/cli/cli/config_hdlr.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ var (
}
)

// TODO: prune config.ClusterConfig - hide deprecated "non_electable"
func setCluConfigHandler(c *cli.Context) error {
var (
nvs cos.StrKVs
Expand Down
1 change: 1 addition & 0 deletions cmd/cli/cli/show_hdlr.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,7 @@ func showAnyConfigHandler(c *cli.Context) error {
}
}

// TODO: prune config.ClusterConfig - hide deprecated "non_electable"
func showClusterConfig(c *cli.Context, section string) error {
var (
usejs = flagIsSet(c, jsonFlag)
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/NVIDIA/aistore/cmd/cli
go 1.23.2

require (
github.com/NVIDIA/aistore v1.3.26-0.20241104174155-c8fb2e54f015
github.com/NVIDIA/aistore v1.3.26-0.20241104174947-5cec02626d17
github.com/fatih/color v1.17.0
github.com/json-iterator/go v1.1.12
github.com/onsi/ginkgo/v2 v2.20.2
Expand Down
4 changes: 2 additions & 2 deletions cmd/cli/go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc=
github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
github.com/NVIDIA/aistore v1.3.26-0.20241104174155-c8fb2e54f015 h1:xh9EglFymBCbVtfcu99BENTKynxQe5nTMeWHblJ9TQM=
github.com/NVIDIA/aistore v1.3.26-0.20241104174155-c8fb2e54f015/go.mod h1:IGdDyXEbwtj194tZukn6ptpn8ldL2pApqfOSIyTQyw4=
github.com/NVIDIA/aistore v1.3.26-0.20241104174947-5cec02626d17 h1:bZpyEV3NEQfWfYtfqv1OTXrO7LF6UenaNPUxFfcyXu0=
github.com/NVIDIA/aistore v1.3.26-0.20241104174947-5cec02626d17/go.mod h1:IGdDyXEbwtj194tZukn6ptpn8ldL2pApqfOSIyTQyw4=
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
Expand Down

0 comments on commit aeed9cb

Please sign in to comment.