Skip to content

Commit

Permalink
set primary with force
Browse files Browse the repository at this point in the history
* add 'prepare' phase
* part three, prev. commit: f3bf414

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Nov 4, 2024
1 parent 60cb73a commit 6e12209
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 22 deletions.
37 changes: 33 additions & 4 deletions ais/htrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -1440,7 +1440,7 @@ func (h *htrun) reqHealth(si *meta.Snode, tout time.Duration, q url.Values, smap
b, status, err = res.bytes, res.status, res.err
freeCR(res)
if err != nil {
nlog.Warningln(h.si.String(), "=>", si.StringEx(), "failed slow-ping retry:", err)
nlog.Warningln(h.si.String(), "=>", si.StringEx(), "failed req-health retry:", err)
}
}
}
Expand Down Expand Up @@ -1638,12 +1638,12 @@ func (h *htrun) extractSmap(payload msPayload, caller string, skipValidation boo
}

if msg.Action == apc.ActPrimaryForce {
var origSmap smapX
if err := cos.MorphMarshal(msg.Value, &origSmap); err != nil {
origSmap := &smapX{}
if err := cos.MorphMarshal(msg.Value, origSmap); err != nil {
debug.AssertNoErr(err) // unlikely
}
if !origSmap.isPresent(h.si) {
err = &errSelfNotFound{act: act + " (with force)", si: h.si, tag: "orig", smap: &origSmap}
err = &errSelfNotFound{act: act + " (with force)", si: h.si, tag: "orig", smap: origSmap}
return
}
logmsync(smap.Version, newSmap, msg, caller)
Expand Down Expand Up @@ -2192,6 +2192,35 @@ func (h *htrun) externalWD(w http.ResponseWriter, r *http.Request) (responded bo
return
}

// prepForceJoin handles the "prepare" phase of set-primary-with-force.
//
// It is called by the (current) primary's forceJoin() which broadcasts a POST
// carrying `apc.QparamPrepare=true` and an apc.ActPrimaryForce message whose
// value is the destination cluster map (newSmap). The receiving node then
// health-pings the destination primary listed in that map.
//
// On success nothing is written explicitly (the response carries the default
// status); otherwise, an error response is written when:
//   - the 'prepare' query parameter is missing or not true;
//   - the message value cannot be converted to a cluster map;
//   - the received cluster map does not define a primary;
//   - the destination primary cannot be reached.
func (h *htrun) prepForceJoin(w http.ResponseWriter, r *http.Request) {
	const tag = "prep-force-join"
	q := r.URL.Query()
	if !cos.IsParseBool(q.Get(apc.QparamPrepare)) {
		err := errors.New(tag + ": expecting '" + apc.QparamPrepare + "=true' query")
		h.writeErr(w, r, err)
		return
	}
	msg, err := h.readAisMsg(w, r)
	if err != nil {
		// NOTE(review): readAisMsg takes (w, r) and is presumed to have already
		// written the error response - confirm
		return
	}

	// newSmap is already a pointer: pass it as is (and not a pointer to pointer),
	// same as extractSmap does for the apc.ActPrimaryForce message
	newSmap := &smapX{}
	if err := cos.MorphMarshal(msg.Value, newSmap); err != nil {
		h.writeErrf(w, r, cmn.FmtErrMorphUnmarshal, h.si, msg.Action, msg.Value, err)
		return
	}

	// guard against a payload that does not name the new primary:
	// both reqHealth and StringEx below would otherwise be given a nil node
	npsi := newSmap.Primary
	if npsi == nil {
		h.writeErr(w, r, errors.New(tag+": new primary is not defined in the received cluster map"))
		return
	}

	var (
		smap = h.owner.smap.get()
		tout = cmn.Rom.CplaneOperation()
	)
	if _, code, err := h.reqHealth(npsi, tout, nil, smap, true /*retry pub-addr*/); err != nil {
		err = fmt.Errorf("%s: failed to reach %s [%v(%d)]", tag, npsi.StringEx(), err, code)
		h.writeErr(w, r, err)
	}
}

//
// intra-cluster request validations and helpers
//
Expand Down
19 changes: 14 additions & 5 deletions ais/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -1572,13 +1572,13 @@ func (p *proxy) _bcr(w http.ResponseWriter, r *http.Request, query url.Values, m
}

// remote: check existence and get (cloud) props
rhdr, statusCode, err := p.headRemoteBck(bck.RemoteBck(), nil)
rhdr, code, err := p.headRemoteBck(bck.RemoteBck(), nil)
if err != nil {
if statusCode == http.StatusNotFound && msg.Action == apc.ActCreateBck {
statusCode = http.StatusNotImplemented
if msg.Action == apc.ActCreateBck && (code == http.StatusNotFound || code == http.StatusBadRequest) {
code = http.StatusNotImplemented
err = cmn.NewErrNotImpl("create", bck.Provider+" bucket")
}
p.writeErr(w, r, err, statusCode)
p.writeErr(w, r, err, code)
return
}
remoteHdr = rhdr
Expand Down Expand Up @@ -2848,13 +2848,22 @@ func (p *proxy) httpdaepost(w http.ResponseWriter, r *http.Request) {
if err != nil {
return
}
if len(apiItems) == 0 || apiItems[0] != apc.AdminJoin {
if len(apiItems) == 0 {
p.writeErrURL(w, r)
return
}
if err := p.checkAccess(w, r, nil, apc.AceAdmin); err != nil {
return
}
act := apiItems[0]
if act == apc.ActPrimaryForce {
p.prepForceJoin(w, r)
return
}
if act != apc.AdminJoin {
p.writeErrURL(w, r)
return
}
if !p.keepalive.paused() {
nlog.Warningf("%s: keepalive is already active - proceeding to resume (and reset) anyway", p)
}
Expand Down
50 changes: 40 additions & 10 deletions ais/prxclu.go
Original file line number Diff line number Diff line change
Expand Up @@ -959,10 +959,11 @@ func (p *proxy) _syncFinal(ctx *smapModifier, clone *smapX) {
/////////////////////

// - cluster membership, including maintenance and decommission
// - start/stop xactions
// - rebalance
// - set-primary
// - cluster-wide configuration
// - cluster membership, xactions, rebalance, configuration
// - start/stop xactions
// - logs...
func (p *proxy) httpcluput(w http.ResponseWriter, r *http.Request) {
apiItems, err := p.parseURL(w, r, apc.URLPathClu.L, 0, true)
if err != nil {
Expand Down Expand Up @@ -2115,9 +2116,9 @@ func (p *proxy) forceJoin(w http.ResponseWriter, r *http.Request, npid string, q
return
}

// 2. validate destination cluster
// 2. get destination smap (henceforth, newSmap)
newSmap, err := p.smapFromURL(nurl)
if err != nil && psi.PubNet.URL != psi.ControlNet.URL {
if err != nil && psi != nil && psi.PubNet.URL != psi.ControlNet.URL {
newSmap, err = p.smapFromURL(psi.PubNet.URL)
}
if err != nil {
Expand All @@ -2126,6 +2127,17 @@ func (p *proxy) forceJoin(w http.ResponseWriter, r *http.Request, npid string, q
return
}
npsi := newSmap.Primary
if nurl != npsi.PubNet.URL && nurl != npsi.ControlNet.URL {
// must be reachable via its own (new)Smap
err := fmt.Errorf("%s: %s URLs %q vs (pub %q, ctrl %q)", p, tag, nurl, npsi.PubNet.URL, npsi.ControlNet.URL)
nlog.Warningln(err)
if _, e := p.smapFromURL(npsi.ControlNet.URL); e != nil {
if _, e = p.smapFromURL(npsi.PubNet.URL); e != nil {
p.writeErr(w, r, err)
return
}
}
}
if npid != npsi.ID() {
err := fmt.Errorf("%s: according to the destination %s %s[%s] is _not_ primary", p, newSmap.StringEx(), tag, npid)
p.writeErr(w, r, err)
Expand Down Expand Up @@ -2164,21 +2176,39 @@ func (p *proxy) forceJoin(w http.ResponseWriter, r *http.Request, npid string, q
debug.Assert(ok)
freeCR(res)

// backup (to rollback, if need be)
// 5. backup (to rollback, if need be)
cm, err := p.cluMeta(cmetaFillOpt{skipPrimeTime: true})
if err != nil {
p.writeErrf(w, r, "cannot %s %s to %s: %v", act, p, npname, err) // (unlikely)
return
}

// TODO -- FIXME: add 'prepare' step whereby all members health(npsi)
// 6. prepare phase whereby all members health-ping(npsi)
bargs := allocBcArgs()
{
aimsg := p.newAmsgActVal(apc.ActPrimaryForce, newSmap)
q := make(url.Values, 1)
q.Set(apc.QparamPrepare, "true")
bargs.req = cmn.HreqArgs{Method: http.MethodPost, Path: apc.URLPathDaeForceJoin.S, Query: q, Body: cos.MustMarshal(aimsg)}
bargs.to = core.AllNodes
}
results := p.bcastGroup(bargs)
freeBcArgs(bargs)
for _, res := range results {
if res.err != nil {
p.writeErrf(w, r, "node %s failed to contact new primary %s in the prepare phase (err: %v)", res.si, npname, res.err)
freeBcastRes(results)
return
}
}
freeBcastRes(results)

// 5. metasync destination cluMeta to _this_ cluster members
// 7. metasync destination cluMeta to _this_ cluster members
aimsg := p.newAmsgActVal(apc.ActPrimaryForce, smap)
wg := p.metasyncer.sync(revsPair{ncm.Smap, aimsg}, revsPair{ncm.BMD, aimsg}, revsPair{ncm.Config, aimsg}, revsPair{ncm.RMD, aimsg})
wg.Wait()

// 6. update cluMeta in memory (= destination)
// 8. update cluMeta in memory (= destination)
if err = cmn.GCO.Update(&ncm.Config.ClusterConfig); err != nil {
// rollback #1
wg := p.metasyncer.sync(revsPair{cm.Smap, aimsg}, revsPair{cm.BMD, aimsg}, revsPair{cm.Config, aimsg}, revsPair{cm.RMD, aimsg})
Expand All @@ -2191,7 +2221,7 @@ func (p *proxy) forceJoin(w http.ResponseWriter, r *http.Request, npid string, q
debug.Assert(ok)
bmdOwnerPrx.put(ncm.BMD)

// 7. finally, join self
// 9. finally, join self
joinURL, secondURL := npsi.ControlNet.URL, npsi.PubNet.URL
if nurl == npsi.PubNet.URL {
joinURL, secondURL = npsi.PubNet.URL, npsi.ControlNet.URL
Expand Down Expand Up @@ -2224,7 +2254,7 @@ func (p *proxy) forceJoin(w http.ResponseWriter, r *http.Request, npid string, q
}
p.metasyncer.becomeNonPrimary() // metasync to stop syncing and cancel all pending requests

// TODO -- FIXME: persist ncm locally
// TODO -- FIXME: in turn, npsi must metasync all joined members - including this former primary - using apc.ActPrimaryForce message
}

//////////////////////////////////////////
Expand Down
10 changes: 7 additions & 3 deletions ais/tgtcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,12 +480,16 @@ func (t *target) httpdaepost(w http.ResponseWriter, r *http.Request) {
t.writeErrURL(w, r)
return
}
apiOp := apiItems[0]
if apiOp == apc.Mountpaths {
act := apiItems[0]
if act == apc.Mountpaths {
t.handleMountpathReq(w, r)
return
}
if apiOp != apc.AdminJoin {
if act == apc.ActPrimaryForce {
t.prepForceJoin(w, r)
return
}
if act != apc.AdminJoin {
t.writeErrURL(w, r)
return
}
Expand Down
1 change: 1 addition & 0 deletions api/apc/urlpaths.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ var (
URLPathDaeProxy = urlpath(Version, Daemon, Proxy)
URLPathDaeSetConf = urlpath(Version, Daemon, ActSetConfig)
URLPathDaeAdminJoin = urlpath(Version, Daemon, AdminJoin)
URLPathDaeForceJoin = urlpath(Version, Daemon, ActPrimaryForce)

URLPathDaeBendDisable = urlpath(Version, Daemon, ActDisableBackend)
URLPathDaeBendEnable = urlpath(Version, Daemon, ActEnableBackend)
Expand Down

0 comments on commit 6e12209

Please sign in to comment.