From 6e1220955e445de7acebe499b878ba759faa7550 Mon Sep 17 00:00:00 2001 From: Alex Aizman Date: Sun, 3 Nov 2024 15:22:00 -0500 Subject: [PATCH] set primary with force * add 'prepare' phase * part three, prev. commit: f3bf414bb006 Signed-off-by: Alex Aizman --- ais/htrun.go | 37 +++++++++++++++++++++++++++++---- ais/proxy.go | 19 ++++++++++++----- ais/prxclu.go | 50 ++++++++++++++++++++++++++++++++++++--------- ais/tgtcp.go | 10 ++++++--- api/apc/urlpaths.go | 1 + 5 files changed, 95 insertions(+), 22 deletions(-) diff --git a/ais/htrun.go b/ais/htrun.go index 4733b82c94..8768fe2b26 100644 --- a/ais/htrun.go +++ b/ais/htrun.go @@ -1440,7 +1440,7 @@ func (h *htrun) reqHealth(si *meta.Snode, tout time.Duration, q url.Values, smap b, status, err = res.bytes, res.status, res.err freeCR(res) if err != nil { - nlog.Warningln(h.si.String(), "=>", si.StringEx(), "failed slow-ping retry:", err) + nlog.Warningln(h.si.String(), "=>", si.StringEx(), "failed req-health retry:", err) } } } @@ -1638,12 +1638,12 @@ func (h *htrun) extractSmap(payload msPayload, caller string, skipValidation boo } if msg.Action == apc.ActPrimaryForce { - var origSmap smapX - if err := cos.MorphMarshal(msg.Value, &origSmap); err != nil { + origSmap := &smapX{} + if err := cos.MorphMarshal(msg.Value, origSmap); err != nil { debug.AssertNoErr(err) // unlikely } if !origSmap.isPresent(h.si) { - err = &errSelfNotFound{act: act + " (with force)", si: h.si, tag: "orig", smap: &origSmap} + err = &errSelfNotFound{act: act + " (with force)", si: h.si, tag: "orig", smap: origSmap} return } logmsync(smap.Version, newSmap, msg, caller) @@ -2192,6 +2192,35 @@ func (h *htrun) externalWD(w http.ResponseWriter, r *http.Request) (responded bo return } +// (primary forceJoin() calling) +func (h *htrun) prepForceJoin(w http.ResponseWriter, r *http.Request) { + const tag = "prep-force-join" + q := r.URL.Query() + if !cos.IsParseBool(q.Get(apc.QparamPrepare)) { + err := errors.New(tag + ": expecting '" + apc.QparamPrepare + "=true' query") + h.writeErr(w, r, err) + return + } + msg, err := h.readAisMsg(w, r) + if err != nil { + return + } + newSmap := &smapX{} + if err := cos.MorphMarshal(msg.Value, &newSmap); err != nil { + h.writeErrf(w, r, cmn.FmtErrMorphUnmarshal, h.si, msg.Action, msg.Value, err) + return + } + var ( + npsi = newSmap.Primary + smap = h.owner.smap.get() + tout = cmn.Rom.CplaneOperation() + ) + if _, code, err := h.reqHealth(npsi, tout, nil, smap, true /*retry pub-addr*/); err != nil { + err = fmt.Errorf("%s: failed to reach %s [%v(%d)]", tag, npsi.StringEx(), err, code) + h.writeErr(w, r, err) + } +} + // // intra-cluster request validations and helpers // diff --git a/ais/proxy.go b/ais/proxy.go index 5a563994f4..d971469fc6 100644 --- a/ais/proxy.go +++ b/ais/proxy.go @@ -1572,13 +1572,13 @@ func (p *proxy) _bcr(w http.ResponseWriter, r *http.Request, query url.Values, m } // remote: check existence and get (cloud) props - rhdr, statusCode, err := p.headRemoteBck(bck.RemoteBck(), nil) + rhdr, code, err := p.headRemoteBck(bck.RemoteBck(), nil) if err != nil { - if statusCode == http.StatusNotFound && msg.Action == apc.ActCreateBck { - statusCode = http.StatusNotImplemented + if msg.Action == apc.ActCreateBck && (code == http.StatusNotFound || code == http.StatusBadRequest) { + code = http.StatusNotImplemented err = cmn.NewErrNotImpl("create", bck.Provider+" bucket") } - p.writeErr(w, r, err, statusCode) + p.writeErr(w, r, err, code) return } remoteHdr = rhdr @@ -2848,13 +2848,22 @@ func (p *proxy) httpdaepost(w http.ResponseWriter, r *http.Request) { if err != nil { return } - if len(apiItems) == 0 || apiItems[0] != apc.AdminJoin { + if len(apiItems) == 0 { p.writeErrURL(w, r) return } if err := p.checkAccess(w, r, nil, apc.AceAdmin); err != nil { return } + act := apiItems[0] + if act == apc.ActPrimaryForce { + p.prepForceJoin(w, r) + return + } + if act != apc.AdminJoin { + p.writeErrURL(w, r) + return + } if !p.keepalive.paused() { nlog.Warningf("%s: keepalive is already active - proceeding to resume (and reset) anyway", p) } diff --git a/ais/prxclu.go b/ais/prxclu.go index dde5fd7999..a5e8b5d5f8 100644 --- a/ais/prxclu.go +++ b/ais/prxclu.go @@ -959,10 +959,11 @@ func (p *proxy) _syncFinal(ctx *smapModifier, clone *smapX) { ///////////////////// // - cluster membership, including maintenance and decommission -// - start/stop xactions // - rebalance +// - set-primary // - cluster-wide configuration -// - cluster membership, xactions, rebalance, configuration +// - start/stop xactions +// - logs... func (p *proxy) httpcluput(w http.ResponseWriter, r *http.Request) { apiItems, err := p.parseURL(w, r, apc.URLPathClu.L, 0, true) if err != nil { @@ -2115,9 +2116,9 @@ func (p *proxy) forceJoin(w http.ResponseWriter, r *http.Request, npid string, q return } - // 2. validate destination cluster + // 2. get destination smap (henceforth, newSmap) newSmap, err := p.smapFromURL(nurl) - if err != nil && psi.PubNet.URL != psi.ControlNet.URL { + if err != nil && psi != nil && psi.PubNet.URL != psi.ControlNet.URL { newSmap, err = p.smapFromURL(psi.PubNet.URL) } if err != nil { @@ -2126,6 +2127,17 @@ func (p *proxy) forceJoin(w http.ResponseWriter, r *http.Request, npid string, q return } npsi := newSmap.Primary + if nurl != npsi.PubNet.URL && nurl != npsi.ControlNet.URL { + // must be reachable via its own (new)Smap + err := fmt.Errorf("%s: %s URLs %q vs (pub %q, ctrl %q)", p, tag, nurl, npsi.PubNet.URL, npsi.ControlNet.URL) + nlog.Warningln(err) + if _, e := p.smapFromURL(npsi.ControlNet.URL); e != nil { + if _, e = p.smapFromURL(npsi.PubNet.URL); e != nil { + p.writeErr(w, r, err) + return + } + } + } if npid != npsi.ID() { err := fmt.Errorf("%s: according to the destination %s %s[%s] is _not_ primary", p, newSmap.StringEx(), tag, npid) p.writeErr(w, r, err) @@ -2164,21 +2176,39 @@ func (p *proxy) forceJoin(w http.ResponseWriter, r *http.Request, npid string, q debug.Assert(ok) freeCR(res) - // backup (to rollback, if need be) + // 5. backup (to rollback, if need be) cm, err := p.cluMeta(cmetaFillOpt{skipPrimeTime: true}) if err != nil { p.writeErrf(w, r, "cannot %s %s to %s: %v", act, p, npname, err) // (unlikely) return } - // TODO -- FIXME: add 'prepare' step whereby all members health(npsi) + // 6. prepare phase whereby all members health-ping(npsi) + bargs := allocBcArgs() + { + aimsg := p.newAmsgActVal(apc.ActPrimaryForce, newSmap) + q := make(url.Values, 1) + q.Set(apc.QparamPrepare, "true") + bargs.req = cmn.HreqArgs{Method: http.MethodPost, Path: apc.URLPathDaeForceJoin.S, Query: q, Body: cos.MustMarshal(aimsg)} + bargs.to = core.AllNodes + } + results := p.bcastGroup(bargs) + freeBcArgs(bargs) + for _, res := range results { + if res.err != nil { + p.writeErrf(w, r, "node %s failed to contact new primary %s in the prepare phase (err: %v)", res.si, npname, res.err) + freeBcastRes(results) + return + } + } + freeBcastRes(results) - // 5. metasync destination cluMeta to _this_ cluster members + // 7. metasync destination cluMeta to _this_ cluster members aimsg := p.newAmsgActVal(apc.ActPrimaryForce, smap) wg := p.metasyncer.sync(revsPair{ncm.Smap, aimsg}, revsPair{ncm.BMD, aimsg}, revsPair{ncm.Config, aimsg}, revsPair{ncm.RMD, aimsg}) wg.Wait() - // 6. update cluMeta in memory (= destination) + // 8. update cluMeta in memory (= destination) if err = cmn.GCO.Update(&ncm.Config.ClusterConfig); err != nil { // rollback #1 wg := p.metasyncer.sync(revsPair{cm.Smap, aimsg}, revsPair{cm.BMD, aimsg}, revsPair{cm.Config, aimsg}, revsPair{cm.RMD, aimsg}) @@ -2191,7 +2221,7 @@ func (p *proxy) forceJoin(w http.ResponseWriter, r *http.Request, npid string, q debug.Assert(ok) bmdOwnerPrx.put(ncm.BMD) - // 7. finally, join self + // 9. finally, join self joinURL, secondURL := npsi.ControlNet.URL, npsi.PubNet.URL if nurl == npsi.PubNet.URL { joinURL, secondURL = npsi.PubNet.URL, npsi.ControlNet.URL @@ -2224,7 +2254,7 @@ func (p *proxy) forceJoin(w http.ResponseWriter, r *http.Request, npid string, q } p.metasyncer.becomeNonPrimary() // metasync to stop syncing and cancel all pending requests - // TODO -- FIXME: persist ncm locally + // TODO -- FIXME: in turn, npsi must metasync all joined members - including this former primary - using apc.ActPrimaryForce message } ////////////////////////////////////////// diff --git a/ais/tgtcp.go b/ais/tgtcp.go index 38120757b8..03d14b94e2 100644 --- a/ais/tgtcp.go +++ b/ais/tgtcp.go @@ -480,12 +480,16 @@ func (t *target) httpdaepost(w http.ResponseWriter, r *http.Request) { t.writeErrURL(w, r) return } - apiOp := apiItems[0] - if apiOp == apc.Mountpaths { + act := apiItems[0] + if act == apc.Mountpaths { t.handleMountpathReq(w, r) return } - if apiOp != apc.AdminJoin { + if act == apc.ActPrimaryForce { + t.prepForceJoin(w, r) + return + } + if act != apc.AdminJoin { t.writeErrURL(w, r) return } diff --git a/api/apc/urlpaths.go b/api/apc/urlpaths.go index 3391fa56c7..57fe6b969d 100644 --- a/api/apc/urlpaths.go +++ b/api/apc/urlpaths.go @@ -128,6 +128,7 @@ var ( URLPathDaeProxy = urlpath(Version, Daemon, Proxy) URLPathDaeSetConf = urlpath(Version, Daemon, ActSetConfig) URLPathDaeAdminJoin = urlpath(Version, Daemon, AdminJoin) + URLPathDaeForceJoin = urlpath(Version, Daemon, ActPrimaryForce) URLPathDaeBendDisable = urlpath(Version, Daemon, ActDisableBackend) URLPathDaeBendEnable = urlpath(Version, Daemon, ActEnableBackend)