From feebeae5e09b1231bf67c59449ac0c51b0882933 Mon Sep 17 00:00:00 2001
From: Alex Aizman
Date: Sun, 13 Aug 2023 20:29:16 -0400
Subject: [PATCH] dsort: remove dsort-context, rewrite initialization

* cleanup and simplify
* rm unit tests
* part fifteen, prev. commit: 393c709dcebad4c50

Signed-off-by: Alex Aizman
---
 ais/proxy.go                    |   3 +-
 ais/target.go                   |  11 +-
 ais/test/common.go              |   8 +-
 ais/test/dsort_test.go          |   1 -
 ais/tgtobj_test.go              |   4 +-
 cluster/lom_test.go             |   4 +-
 cluster/lom_xattr_test.go       |   4 +-
 ext/dsort/bcast.go              |  73 +++++
 ext/dsort/conc_adjuster_test.go |   4 +
 ext/dsort/dsort.go              |  40 +--
 ext/dsort/dsort_general.go      |  18 +-
 ext/dsort/dsort_mem.go          |  27 +-
 ext/dsort/dsort_test.go         | 522 --------------------------------
 ext/dsort/handler.go            |  97 +-----
 ext/dsort/manager.go            | 136 ++++-----
 ext/dsort/manager_group.go      |   4 -
 ext/dsort/manager_group_test.go | 153 ----------
 ext/dsort/manager_test.go       | 283 -----------------
 ext/dsort/mem_watcher.go        |   2 +-
 fs/content.go                   |  11 +-
 fs/fqn_test.go                  |   8 +-
 fs/walk_test.go                 |   4 +-
 mirror/utils_test.go            |   4 +-
 space/space_test.go             |   4 +-
 tools/client.go                 |   2 +-
 tools/file.go                   |   8 +-
 26 files changed, 240 insertions(+), 1195 deletions(-)
 create mode 100644 ext/dsort/bcast.go
 delete mode 100644 ext/dsort/dsort_test.go
 delete mode 100644 ext/dsort/manager_group_test.go
 delete mode 100644 ext/dsort/manager_test.go

diff --git a/ais/proxy.go b/ais/proxy.go
index 472a865ef4..fb700fe5c9 100644
--- a/ais/proxy.go
+++ b/ais/proxy.go
@@ -245,7 +245,8 @@ func (p *proxy) Run() error {
 		nlog.Infof("%s: [%s net] listening on: %s", p, cmn.NetIntraData, p.si.DataNet.URL)
 	}
 
-	dsort.RegisterNode(p.owner.smap, p.owner.bmd, p.si, nil, p.statsT)
+	dsort.Pinit(p)
+
 	return p.htrun.run()
 }
 
diff --git a/ais/target.go b/ais/target.go
index 2f79c173e1..16ea9cc86d 100644
--- a/ais/target.go
+++ b/ais/target.go
@@ -302,12 +302,8 @@ func (t *target) Run() error {
 	}
 
 	// register object type and workfile type
-	if err := fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{}); err != nil {
-		cos.ExitLog(err)
-	}
-	if err := fs.CSM.Reg(fs.WorkfileType, &fs.WorkfileContentResolver{}); err != nil {
-		cos.ExitLog(err)
-	}
+	fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{})
+	fs.CSM.Reg(fs.WorkfileType, &fs.WorkfileContentResolver{})
 
 	// Init meta-owners and load local instances
 	if prev := t.owner.bmd.init(); prev {
@@ -372,8 +368,7 @@ func (t *target) Run() error {
 		go t.goreslver(marked.Interrupted)
 	}
 
-	dsort.InitManagers(db)
-	dsort.RegisterNode(t.owner.smap, t.owner.bmd, t.si, t, t.statsT)
+	dsort.Tinit(t, t.statsT, db)
 
 	err = t.htrun.run()
 
diff --git a/ais/test/common.go b/ais/test/common.go
index 151de93d52..8c623bfa4f 100644
--- a/ais/test/common.go
+++ b/ais/test/common.go
@@ -1032,10 +1032,10 @@ func initFS() {
 	config.Backend = cfg.Backend
 	cmn.GCO.CommitUpdate(config)
 
-	_ = fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{})
-	_ = fs.CSM.Reg(fs.WorkfileType, &fs.WorkfileContentResolver{})
-	_ = fs.CSM.Reg(fs.ECSliceType, &fs.ECSliceContentResolver{})
-	_ = fs.CSM.Reg(fs.ECMetaType, &fs.ECMetaContentResolver{})
+	fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{})
+	fs.CSM.Reg(fs.WorkfileType, &fs.WorkfileContentResolver{})
+	fs.CSM.Reg(fs.ECSliceType, &fs.ECSliceContentResolver{})
+	fs.CSM.Reg(fs.ECMetaType, &fs.ECMetaContentResolver{})
 }
 
 func initMountpaths(t *testing.T, proxyURL string) {
diff --git a/ais/test/dsort_test.go b/ais/test/dsort_test.go
index d384eb04a8..4840ec225f 100644
--- a/ais/test/dsort_test.go
+++ b/ais/test/dsort_test.go
@@ -1364,7 +1364,6 @@ func TestDsortAbort(t
*testing.T) { err = api.AbortDSort(df.baseParams, df.managerUUID) tassert.CheckFatal(t, err) - tlog.Logln("waiting for dsort to finish") _, err = tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID) tassert.CheckFatal(t, err) diff --git a/ais/tgtobj_test.go b/ais/tgtobj_test.go index 933580ab3c..bad4940b7f 100644 --- a/ais/tgtobj_test.go +++ b/ais/tgtobj_test.go @@ -59,8 +59,8 @@ func TestMain(m *testing.M) { defer os.RemoveAll(testMountpath) fs.TestNew(nil) fs.TestDisableValidation() - _ = fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{}) - _ = fs.CSM.Reg(fs.WorkfileType, &fs.WorkfileContentResolver{}) + fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{}, true) + fs.CSM.Reg(fs.WorkfileType, &fs.WorkfileContentResolver{}, true) // target config := cmn.GCO.Get() diff --git a/cluster/lom_test.go b/cluster/lom_test.go index 047d4f0110..307f234c55 100644 --- a/cluster/lom_test.go +++ b/cluster/lom_test.go @@ -70,8 +70,8 @@ var _ = Describe("LOM", func() { _, _ = fs.Add(mpath, "daeID") } - _ = fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{}) - _ = fs.CSM.Reg(fs.WorkfileType, &fs.WorkfileContentResolver{}) + fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{}, true) + fs.CSM.Reg(fs.WorkfileType, &fs.WorkfileContentResolver{}, true) bmd := mock.NewBaseBownerMock( meta.NewBck( diff --git a/cluster/lom_xattr_test.go b/cluster/lom_xattr_test.go index 869c265bde..5b94666753 100644 --- a/cluster/lom_xattr_test.go +++ b/cluster/lom_xattr_test.go @@ -31,8 +31,8 @@ var _ = Describe("LOM Xattributes", func() { localBck := cmn.Bck{Name: bucketLocal, Provider: apc.AIS, Ns: cmn.NsGlobal} cachedBck := cmn.Bck{Name: bucketCached, Provider: apc.AIS, Ns: cmn.NsGlobal} - _ = fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{}) - _ = fs.CSM.Reg(fs.WorkfileType, &fs.WorkfileContentResolver{}) + fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{}, true) + fs.CSM.Reg(fs.WorkfileType, &fs.WorkfileContentResolver{}, true) var ( copyMpathInfo *fs.Mountpath diff --git a/ext/dsort/bcast.go b/ext/dsort/bcast.go new file mode 100644 index 0000000000..0808ff140d --- /dev/null +++ b/ext/dsort/bcast.go @@ -0,0 +1,73 @@ +// Package dsort provides distributed massively parallel resharding for very large datasets. +/* + * Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved. 
+ */ +package dsort + +import ( + "io" + "net/http" + "net/url" + "sync" + + "github.com/NVIDIA/aistore/cluster/meta" + "github.com/NVIDIA/aistore/cmn" + "github.com/NVIDIA/aistore/cmn/cos" +) + +// +// common code used by proxy _and_ target, both +// + +var bcastClient *http.Client + +func bcast(method, path string, urlParams url.Values, body []byte, smap *meta.Smap, ignore ...*meta.Snode) []response { + var ( + idx int + responses = make([]response, smap.CountActiveTs()) + wg = &sync.WaitGroup{} + ) +outer: + for _, node := range smap.Tmap { + if smap.InMaintOrDecomm(node) { + continue + } + for _, ignoreNode := range ignore { + if ignoreNode.Equals(node) { + continue outer + } + } + reqArgs := cmn.HreqArgs{ + Method: method, + Base: node.URL(cmn.NetIntraControl), + Path: path, + Query: urlParams, + Body: body, + } + wg.Add(1) + go func(si *meta.Snode, args *cmn.HreqArgs, i int) { + responses[i] = call(args) + responses[i].si = si + wg.Done() + }(node, &reqArgs, idx) + idx++ + } + wg.Wait() + + return responses[:idx] +} + +func call(reqArgs *cmn.HreqArgs) response { + req, err := reqArgs.Req() + if err != nil { + return response{err: err, statusCode: http.StatusInternalServerError} + } + + resp, err := bcastClient.Do(req) //nolint:bodyclose // Closed inside `cos.Close`. + if err != nil { + return response{err: err, statusCode: http.StatusInternalServerError} + } + out, err := io.ReadAll(resp.Body) + cos.Close(resp.Body) + return response{res: out, err: err, statusCode: resp.StatusCode} +} diff --git a/ext/dsort/conc_adjuster_test.go b/ext/dsort/conc_adjuster_test.go index e7dc960a04..f346767d82 100644 --- a/ext/dsort/conc_adjuster_test.go +++ b/ext/dsort/conc_adjuster_test.go @@ -18,6 +18,10 @@ import ( . "github.com/onsi/gomega" ) +const ( + testingConfigDir = "/tmp/ais_tests" +) + func calcSemaLimit(acquire, release func()) int { var i atomic.Int32 wg := &sync.WaitGroup{} diff --git a/ext/dsort/dsort.go b/ext/dsort/dsort.go index f21cbfd6e7..364bfcca21 100644 --- a/ext/dsort/dsort.go +++ b/ext/dsort/dsort.go @@ -72,7 +72,7 @@ var js = jsoniter.ConfigFastest func (m *Manager) finish() { if m.config.FastV(4, cos.SmoduleDsort) { - nlog.Infof("%s: %s finished", m.ctx.t, m.ManagerUUID) + nlog.Infof("%s: %s finished", g.t, m.ManagerUUID) } m.lock() m.setInProgressTo(false) @@ -92,7 +92,7 @@ func (m *Manager) start() (err error) { } // Phase 1. - nlog.Infof("%s: %s started extraction stage", m.ctx.t, m.ManagerUUID) + nlog.Infof("%s: %s started extraction stage", g.t, m.ManagerUUID) if err := m.extractLocalShards(); err != nil { return err } @@ -100,12 +100,12 @@ func (m *Manager) start() (err error) { s := binary.BigEndian.Uint64(m.pars.TargetOrderSalt) targetOrder := randomTargetOrder(s, m.smap.Tmap) if m.config.FastV(4, cos.SmoduleDsort) { - nlog.Infof("%s: %s final target in targetOrder => URL: %s, tid %s", m.ctx.t, m.ManagerUUID, + nlog.Infof("%s: %s final target in targetOrder => URL: %s, tid %s", g.t, m.ManagerUUID, targetOrder[len(targetOrder)-1].PubNet.URL, targetOrder[len(targetOrder)-1].ID()) } // Phase 2. 
- nlog.Infof("%s: %s started sort stage", m.ctx.t, m.ManagerUUID) + nlog.Infof("%s: %s started sort stage", g.t, m.ManagerUUID) curTargetIsFinal, err := m.participateInRecordDistribution(targetOrder) if err != nil { return err @@ -120,7 +120,7 @@ func (m *Manager) start() (err error) { shardSize := int64(float64(m.pars.OutputShardSize) / ratio) - nlog.Infof("%s: %s started phase 3 distribution", m.ctx.t, m.ManagerUUID) + nlog.Infof("%s: %s started phase 3 distribution", g.t, m.ManagerUUID) if err := m.phase3(shardSize); err != nil { return err } @@ -139,12 +139,12 @@ func (m *Manager) start() (err error) { // After each target participates in the cluster-wide record distribution, // start listening for the signal to start creating shards locally. - nlog.Infof("%s: %s started creation stage", m.ctx.t, m.ManagerUUID) + nlog.Infof("%s: %s started creation stage", g.t, m.ManagerUUID) if err := m.dsorter.createShardsLocally(); err != nil { return err } - nlog.Infof("%s: %s finished successfully", m.ctx.t, m.ManagerUUID) + nlog.Infof("%s: %s finished successfully", g.t, m.ManagerUUID) return nil } @@ -153,7 +153,7 @@ func (m *Manager) startDSorter() error { if err := m.initStreams(); err != nil { return err } - nlog.Infof("%s: %s starting with dsorter: %q", m.ctx.t, m.ManagerUUID, m.dsorter.name()) + nlog.Infof("%s: %s starting with dsorter: %q", g.t, m.ManagerUUID, m.dsorter.name()) return m.dsorter.start() } @@ -281,7 +281,7 @@ func (m *Manager) createShard(s *shard.Shard, lom *cluster.LOM) (err error) { // TODO: params.Xact - in part, to count PUTs and bytes in a generic fashion // (vs metrics.ShardCreationStats.updateThroughput - see below) } - err = m.ctx.t.PutObject(lom, params) + err = g.t.PutObject(lom, params) cluster.FreePutObjParams(params) if err == nil { n = lom.SizeBytes() @@ -296,7 +296,7 @@ func (m *Manager) createShard(s *shard.Shard, lom *cluster.LOM) (err error) { ec := m.ec if m.pars.InputExtension != m.pars.OutputExtension { // NOTE: resharding into a different format - ec = newExtractCreator(m.ctx.t, m.pars.OutputExtension) + ec = newExtractCreator(g.t, m.pars.OutputExtension) } _, err = ec.Create(s, w, m.dsorter) @@ -334,7 +334,7 @@ func (m *Manager) createShard(s *shard.Shard, lom *cluster.LOM) (err error) { // according to HRW, send it there. Since it doesn't really matter // if we have an extra copy of the object local to this target, we // optimize for performance by not removing the object now. 
- if si.ID() != m.ctx.node.ID() && !m.pars.DryRun { + if si.ID() != g.t.SID() && !m.pars.DryRun { lom.Lock(false) defer lom.Unlock(false) @@ -380,7 +380,7 @@ func (m *Manager) createShard(s *shard.Shard, lom *cluster.LOM) (err error) { exit: metrics.mu.Lock() metrics.CreatedCnt++ - if si.ID() != m.ctx.node.ID() { + if si.ID() != g.t.SID() { metrics.MovedShardCnt++ } if m.Metrics.extended { @@ -434,7 +434,7 @@ func (m *Manager) participateInRecordDistribution(targetOrder meta.Nodes) (curre } for i, d = range targetOrder { - if d != dummyTarget && d.ID() == m.ctx.node.ID() { + if d != dummyTarget && d.ID() == g.t.SID() { break } } @@ -449,7 +449,7 @@ func (m *Manager) participateInRecordDistribution(targetOrder meta.Nodes) (curre ) group.Go(func() error { var ( - buf, slab = mm.AllocSize(serializationBufSize) + buf, slab = g.mm.AllocSize(serializationBufSize) msgpw = msgp.NewWriterBuf(w, buf) ) defer slab.Free(buf) @@ -610,7 +610,7 @@ func (m *Manager) generateShardsWithOrderingFile(maxSize int64) ([]*shard.Shard, return nil, err } // is intra-call - tsi := m.ctx.t.Snode() + tsi := g.t.Snode() req.Header.Set(apc.HdrCallerID, tsi.ID()) req.Header.Set(apc.HdrCallerName, tsi.String()) @@ -746,7 +746,7 @@ func (m *Manager) phase3(maxSize int64) error { } bck := meta.CloneBck(&m.pars.OutputBck) - if err := bck.Init(m.ctx.bmdOwner); err != nil { + if err := bck.Init(g.t.Bowner()); err != nil { return err } for _, s := range shards { @@ -790,7 +790,7 @@ func (m *Manager) phase3(maxSize int64) error { for err := range errCh { return errors.Errorf("error while sending shards: %v", err) } - nlog.Infof("%s: %s finished sending all shards", m.ctx.t, m.ManagerUUID) + nlog.Infof("%s: %s finished sending all shards", g.t, m.ManagerUUID) return nil } @@ -801,7 +801,7 @@ func (m *Manager) _dist(si *meta.Snode, s []*shard.Shard, order map[string]*shar ) group.Go(func() error { var ( - buf, slab = mm.AllocSize(serializationBufSize) + buf, slab = g.mm.AllocSize(serializationBufSize) msgpw = msgp.NewWriterBuf(w, buf) md = &CreationPhaseMetadata{Shards: s, SendOrder: order} ) @@ -873,7 +873,7 @@ func (es *extractShard) do() (err error) { if errV == nil { if !archive.EqExt(ext, m.pars.InputExtension) { if m.config.FastV(4, cos.SmoduleDsort) { - nlog.Infof("%s: %s skipping %s: %q vs %q", m.ctx.t, m.ManagerUUID, + nlog.Infof("%s: %s skipping %s: %q vs %q", g.t, m.ManagerUUID, es.name, ext, m.pars.InputExtension) } return @@ -919,7 +919,7 @@ func (es *extractShard) _do(lom *cluster.LOM) error { return nil // skip } // NOTE: extract-creator for _this_ shard (compare with createShard above) - ec = newExtractCreator(m.ctx.t, ext) + ec = newExtractCreator(g.t, ext) } phaseInfo := &m.extractionPhase diff --git a/ext/dsort/dsort_general.go b/ext/dsort/dsort_general.go index 5bfca89257..c14b4782a8 100644 --- a/ext/dsort/dsort_general.go +++ b/ext/dsort/dsort_general.go @@ -154,15 +154,15 @@ func (ds *dsorterGeneral) start() error { Extra: &transport.Extra{ Compression: config.DSort.Compression, Config: config, - MMSA: mm, + MMSA: g.mm, }, } if err := transport.HandleObjStream(trname, ds.recvResp); err != nil { return errors.WithStack(err) } - ds.streams.request = bundle.New(ds.m.ctx.smapOwner, ds.m.ctx.node, client, reqSbArgs) - ds.streams.response = bundle.New(ds.m.ctx.smapOwner, ds.m.ctx.node, client, respSbArgs) + ds.streams.request = bundle.New(g.t.Sowner(), g.t.Snode(), client, reqSbArgs) + ds.streams.response = bundle.New(g.t.Sowner(), g.t.Snode(), client, respSbArgs) // start watching memory return ds.mw.watch() @@ 
-264,7 +264,7 @@ func (ds *dsorterGeneral) Load(w io.Writer, rec *shard.Record, obj *shard.Record if ds.m.aborted() { return 0, newDSortAbortedError(ds.m.ManagerUUID) } - if rec.DaemonID != ds.m.ctx.node.ID() { + if rec.DaemonID != g.t.SID() { return ds.loadRemote(w, rec, obj) } return ds.loadLocal(w, obj) @@ -278,7 +278,7 @@ func (ds *dsorterGeneral) loadLocal(w io.Writer, obj *shard.RecordObj) (written ) if storeType != shard.SGLStoreType { // SGL does not need buffer as it is buffer itself - buf, slab = mm.AllocSize(obj.Size) + buf, slab = g.mm.AllocSize(obj.Size) } defer func() { @@ -379,7 +379,7 @@ func (ds *dsorterGeneral) loadRemote(w io.Writer, rec *shard.Record, obj *shard. metrics.RequestStats.updateTime(delta) metrics.mu.Unlock() - ds.m.ctx.stats.AddMany( + g.tstats.AddMany( cos.NamedVal64{Name: stats.DSortCreationReqCount, Value: 1}, cos.NamedVal64{Name: stats.DSortCreationReqLatency, Value: int64(delta)}, ) @@ -422,7 +422,7 @@ func (ds *dsorterGeneral) loadRemote(w io.Writer, rec *shard.Record, obj *shard. metrics.ResponseStats.updateTime(delta) metrics.mu.Unlock() - ds.m.ctx.stats.AddMany( + g.tstats.AddMany( cos.NamedVal64{Name: stats.DSortCreationRespCount, Value: 1}, cos.NamedVal64{Name: stats.DSortCreationRespLatency, Value: int64(delta)}, ) @@ -440,7 +440,7 @@ func (ds *dsorterGeneral) loadRemote(w io.Writer, rec *shard.Record, obj *shard. err = cmn.NewErrAborted("wait for remote content", "", nil) case timed: err = errors.Errorf("wait for remote content has timed out (%q was waiting for %q)", - ds.m.ctx.node.ID(), daemonID) + g.t.SID(), daemonID) default: debug.Assert(false, "pulled but not stopped or timed?") } @@ -601,7 +601,7 @@ func (ds *dsorterGeneral) recvResp(hdr transport.ObjHdr, object io.Reader, err e beforeSend = mono.NanoTime() } - buf, slab := mm.AllocSize(hdr.ObjAttrs.Size) + buf, slab := g.mm.AllocSize(hdr.ObjAttrs.Size) writer.n, writer.err = io.CopyBuffer(writer.w, object, buf) writer.wg.Done() slab.Free(buf) diff --git a/ext/dsort/dsort_mem.go b/ext/dsort/dsort_mem.go index b1129c7950..0b2088df0b 100644 --- a/ext/dsort/dsort_mem.go +++ b/ext/dsort/dsort_mem.go @@ -138,7 +138,7 @@ func (c *rwConnector) connectReader(key string, r io.Reader, size int64) (err er c.mu.Unlock() if !all { - rw.sgl = mm.NewSGL(size) + rw.sgl = g.mm.NewSGL(size) _, err = io.Copy(rw.sgl, r) rw.wgr.Done() return @@ -157,9 +157,10 @@ func (c *rwConnector) connectWriter(key string, w io.Writer) (int64, error) { timed, stopped := rw.wgr.WaitTimeoutWithStop(c.m.callTimeout, c.m.listenAborted()) // wait for reader if timed { - return 0, errors.Errorf("wait for remote content has timed out (%q was waiting)", c.m.ctx.node.ID()) - } else if stopped { - return 0, errors.Errorf("wait for remote content was aborted") + return 0, errors.Errorf("%s: timed out waiting for remote content", g.t) + } + if stopped { + return 0, errors.Errorf("%s: aborted waiting for remote content", g.t) } if all { // reader connected and left SGL with the content @@ -228,15 +229,15 @@ func (ds *dsorterMem) start() error { Extra: &transport.Extra{ Compression: config.DSort.Compression, Config: config, - MMSA: mm, + MMSA: g.mm, }, } if err := transport.HandleObjStream(trname, ds.recvResp); err != nil { return errors.WithStack(err) } - ds.streams.builder = bundle.New(ds.m.ctx.smapOwner, ds.m.ctx.node, client, reqSbArgs) - ds.streams.records = bundle.New(ds.m.ctx.smapOwner, ds.m.ctx.node, client, respSbArgs) + ds.streams.builder = bundle.New(g.t.Sowner(), g.t.Snode(), client, reqSbArgs) + ds.streams.records = 
bundle.New(g.t.Sowner(), g.t.Snode(), client, respSbArgs) return nil } @@ -287,8 +288,8 @@ func (ds *dsorterMem) preShardCreation(shardName string, mi *fs.Mountpath) error shardName: shardName, } o := transport.AllocSend() - o.Hdr.Opaque = bsi.NewPack(ds.m.ctx.t.ByteMM()) - if ds.m.smap.HasActiveTs(ds.m.ctx.t.SID() /*except*/) { + o.Hdr.Opaque = bsi.NewPack(g.t.ByteMM()) + if ds.m.smap.HasActiveTs(g.t.SID() /*except*/) { if err := ds.streams.builder.Send(o, nil); err != nil { return err } @@ -444,7 +445,7 @@ outer: func (ds *dsorterMem) sendRecordObj(rec *shard.Record, obj *shard.RecordObj, toNode *meta.Snode) (err error) { var ( - local = toNode.ID() == ds.m.ctx.node.ID() + local = toNode.ID() == g.t.SID() req = RemoteResponse{ Record: rec, RecordObj: obj, @@ -467,7 +468,7 @@ func (ds *dsorterMem) sendRecordObj(rec *shard.Record, obj *shard.RecordObj, toN beforeSend = mono.NanoTime() } - debug.Assert(ds.m.ctx.node.ID() == rec.DaemonID, ds.m.ctx.node.ID()+" vs "+rec.DaemonID) + debug.Assert(g.t.SID() == rec.DaemonID, g.t.SID()+" vs "+rec.DaemonID) if local { defer ds.m.decrementRef(1) @@ -639,10 +640,10 @@ func (es *dsmExtractShard) do() error { defer ds.creationPhase.adjuster.read.releaseGoroutineSema() bck := meta.NewBck(ds.m.pars.OutputBck.Name, ds.m.pars.OutputBck.Provider, cmn.NsGlobal) - if err := bck.Init(ds.m.ctx.bmdOwner); err != nil { + if err := bck.Init(g.t.Bowner()); err != nil { return err } - toNode, err := cluster.HrwTarget(bck.MakeUname(shard.Name), ds.m.ctx.smapOwner.Get()) + toNode, err := cluster.HrwTarget(bck.MakeUname(shard.Name), g.t.Sowner().Get()) if err != nil { return err } diff --git a/ext/dsort/dsort_test.go b/ext/dsort/dsort_test.go deleted file mode 100644 index 90cfd88d57..0000000000 --- a/ext/dsort/dsort_test.go +++ /dev/null @@ -1,522 +0,0 @@ -// Package dsort provides APIs for distributed archive file shuffling. -/* - * Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved. - */ -package dsort - -import ( - "fmt" - "io" - "net" - "net/http" - "os" - "sync" - "time" - - "github.com/NVIDIA/aistore/3rdparty/golang/mux" - "github.com/NVIDIA/aistore/api/apc" - "github.com/NVIDIA/aistore/cluster" - "github.com/NVIDIA/aistore/cluster/meta" - "github.com/NVIDIA/aistore/cluster/mock" - "github.com/NVIDIA/aistore/cmn" - "github.com/NVIDIA/aistore/cmn/archive" - "github.com/NVIDIA/aistore/cmn/cos" - "github.com/NVIDIA/aistore/ext/dsort/shard" - "github.com/NVIDIA/aistore/fs" - "github.com/NVIDIA/aistore/memsys" - "github.com/NVIDIA/aistore/stats" - "github.com/NVIDIA/aistore/transport" - . "github.com/onsi/ginkgo" - . "github.com/onsi/ginkgo/extensions/table" - . 
"github.com/onsi/gomega" -) - -const ( - testIP = "127.0.0.1" - testDir = "/tmp/" + DSortName + "_tests" - testBucket = DSortName + "_tests" - globalManagerUUID = "6ba7b810-9dad-11d1-80b4-00c04fd430c8" -) - -// interface guard -var ( - _ shard.Creator = (*extractCreatorMock)(nil) - _ meta.SmapListeners = (*testSmapListeners)(nil) -) - -// -// MISC FUNCTIONS -// - -func getFreePorts(count int) ([]int, error) { - ports := make([]int, 0, count) - for i := 0; i < count; i++ { - addr, err := net.ResolveTCPAddr("tcp", "localhost:0") - if err != nil { - return nil, err - } - - l, err := net.ListenTCP("tcp", addr) - if err != nil { - return nil, err - } - defer l.Close() - ports = append(ports, l.Addr().(*net.TCPAddr).Port) - } - return ports, nil -} - -// -// TEST SMAP -// - -type testSmapListeners struct { - sync.RWMutex -} - -func (*testSmapListeners) Reg(meta.Slistener) {} -func (*testSmapListeners) Unreg(meta.Slistener) {} - -type testSmap struct { - *meta.Smap - a *testSmapListeners -} - -func newTestSmap(targets ...string) *testSmap { - smap := &testSmap{ - &meta.Smap{}, - &testSmapListeners{}, - } - smap.Tmap = make(meta.NodeMap) - for _, target := range targets { - smap.Tmap[target] = &meta.Snode{} - } - return smap -} - -func (tm *testSmap) addTarget(si *meta.Snode) { - tm.Tmap[si.ID()] = si -} - -func (tm *testSmap) Get() *meta.Smap { - return tm.Smap -} - -func (tm *testSmap) Listeners() meta.SmapListeners { - return tm.a -} - -// -// MOCKs -// - -type extractCreatorMock struct { - createShard func(s *shard.Shard, w io.Writer, loader shard.ContentLoader) // func to hijack CreateShard function -} - -func (*extractCreatorMock) Extract(*cluster.LOM, cos.ReadReaderAt, shard.RecordExtractor, bool) (int64, int, error) { - return 0, 0, nil -} - -func (ec *extractCreatorMock) Create(s *shard.Shard, w io.Writer, loader shard.ContentLoader) (int64, error) { - ec.createShard(s, w, loader) - return 0, nil -} - -func (*extractCreatorMock) SupportsOffset() bool { return true } -func (*extractCreatorMock) MetadataSize() int64 { return 0 } - -type targetNodeMock struct { - daemonID string - mux *mux.ServeMux - s *http.Server - controlCh chan error - managers *ManagerGroup -} - -type testContext struct { - targetCnt int - targets []*targetNodeMock - smap *testSmap - errCh chan error - wg *sync.WaitGroup -} - -func newTargetMock(daemonID string, smap *testSmap) *targetNodeMock { - // Initialize dsort manager - rs := &parsedReqSpec{ - InputExtension: archive.ExtTar, - Algorithm: &Algorithm{ - ContentKeyType: shard.ContentKeyString, - }, - MaxMemUsage: cos.ParsedQuantity{Type: cos.QuantityPercent, Value: 0}, - DSorterType: DSorterGeneralType, - } - - db := mock.NewDBDriver() - dsortManagers := NewManagerGroup(db, true /*skip hk*/) - dsortManager, err := dsortManagers.Add(globalManagerUUID) - Expect(err).ShouldNot(HaveOccurred()) - ctx.node = smap.Get().Tmap[daemonID] - dsortManager.init(rs) - dsortManager.unlock() - - net := smap.GetTarget(daemonID).PubNet - return &targetNodeMock{ - daemonID: daemonID, - s: &http.Server{ - Addr: fmt.Sprintf("%s:%s", net.Hostname, net.Port), - Handler: http.NewServeMux(), - }, - controlCh: make(chan error, 1), - managers: dsortManagers, - } -} - -func (t *targetNodeMock) setHandlers(handlers map[string]http.HandlerFunc) { - mux := mux.NewServeMux() - for path, handler := range handlers { - mux.HandleFunc(path, handler) - } - - t.mux = mux - t.s.Handler = mux -} - -func (t *targetNodeMock) setup() { - // set default handlers - defaultHandlers := 
map[string]http.HandlerFunc{ - apc.URLPathdSortRecords.S + "/": func(w http.ResponseWriter, r *http.Request) { - manager, _ := t.managers.Get(globalManagerUUID) - manager.incrementReceived() - }, - } - t.setHandlers(defaultHandlers) - - go func() { - t.controlCh <- t.s.ListenAndServe() - }() -} - -func (t *targetNodeMock) beforeTeardown() { - manager, _ := t.managers.Get(globalManagerUUID) - manager.lock() - manager.setInProgressTo(false) - manager.unlock() - manager.cleanup() -} - -func (t *targetNodeMock) teardown() { - t.s.Close() - Expect(<-t.controlCh).To(Equal(http.ErrServerClosed)) -} - -func (tctx *testContext) setup() { - tctx.errCh = make(chan error, tctx.targetCnt) - tctx.wg = &sync.WaitGroup{} - - mm = memsys.PageMM() - - fs.TestNew(nil) - err := cos.CreateDir(testDir) - Expect(err).NotTo(HaveOccurred()) - _, err = fs.Add(testDir, "daeID") - Expect(err).NotTo(HaveOccurred()) - - fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{}) - - config := cmn.GCO.BeginUpdate() - config.HostNet.UseIntraControl = false - config.HostNet.UseIntraData = false - config.Disk.IostatTimeShort = cos.Duration(10 * time.Millisecond) - cmn.GCO.CommitUpdate(config) - - genNodeID := func(i int) string { - return fmt.Sprintf("target:%d", i) - } - - // Initialize smap. - smap := newTestSmap() - ports, err := getFreePorts(tctx.targetCnt) - Expect(err).ShouldNot(HaveOccurred()) - for i := 0; i < tctx.targetCnt; i++ { - targetPort := ports[i] - ni := meta.NetInfo{ - Hostname: testIP, - Port: fmt.Sprintf("%d", targetPort), - URL: fmt.Sprintf("http://%s:%d", testIP, targetPort), - } - di := &meta.Snode{ - DaeID: genNodeID(i), - PubNet: ni, - ControlNet: ni, - DataNet: ni, - } - smap.addTarget(di) - } - smap.InitDigests() - - // Create and setup target mocks. - targets := make([]*targetNodeMock, tctx.targetCnt) - for i := 0; i < tctx.targetCnt; i++ { - target := newTargetMock(genNodeID(i), smap) - target.setup() - targets[i] = target - } - - // Wait for all targets to start servers. Without this sleep it may happen - // that we close server faster than it starts and many bad things can happen. - time.Sleep(time.Millisecond * 100) - - ctx.smapOwner = smap - - // Initialize BMD owner. 
- bmdMock := mock.NewBaseBownerMock( - meta.NewBck( - testBucket, apc.AIS, cmn.NsGlobal, - &cmn.BucketProps{Cksum: cmn.CksumConf{Type: cos.ChecksumXXHash}}, - ), - ) - ctx.t = mock.NewTarget(bmdMock) - - tctx.smap = smap - tctx.targets = targets -} - -func (tctx *testContext) teardown() { - for _, target := range tctx.targets { - target.beforeTeardown() - } - for _, target := range tctx.targets { - target.teardown() - } - - os.RemoveAll(testDir) -} - -var _ = Describe("Distributed Sort", func() { - config := cmn.GCO.BeginUpdate() - config.Transport.MaxHeaderSize = memsys.PageSize - config.Log.Level = "3" - cmn.GCO.CommitUpdate(config) - sc := transport.Init(&stats.Trunner{}, config) - go sc.Run() - - Describe("participateInRecordDistribution", func() { - Describe("Simple smoke tests", func() { - runSmokeRecordDistribution := func(targetCnt int) { - ctx := &testContext{ - targetCnt: targetCnt, - } - ctx.setup() - defer ctx.teardown() - - for _, target := range ctx.targets { - ctx.wg.Add(1) - go func(target *targetNodeMock) { - defer ctx.wg.Done() - - targetOrder := randomTargetOrder(1, ctx.smap.Tmap) - finalTarget := targetOrder[len(targetOrder)-1] - manager, exists := target.managers.Get(globalManagerUUID) - Expect(exists).To(BeTrue()) - isFinal, err := manager.participateInRecordDistribution(targetOrder) - if err != nil { - ctx.errCh <- err - return - } - - if target.daemonID == finalTarget.ID() { - if !isFinal { - ctx.errCh <- fmt.Errorf("last target %q is not final", finalTarget.ID()) - return - } - } else { - if isFinal { - ctx.errCh <- fmt.Errorf("non-last %q target is final %q", target.daemonID, finalTarget.ID()) - return - } - } - }(target) - } - - ctx.wg.Wait() - close(ctx.errCh) - for err := range ctx.errCh { - Expect(err).ShouldNot(HaveOccurred()) - } - } - - DescribeTable( - "testing with different number of targets", - runSmokeRecordDistribution, - Entry("should work with 0 targets", 0), - Entry("should work with 2 targets", 2), - Entry("should work with 8 targets", 8), - Entry("should work with 32 targets", 32), - Entry("should work with 1 target", 1), - Entry("should work with 3 targets", 3), - Entry("should work with 31 targets", 31), - Entry("should work with 100 targets", 100), - ) - - Context("Checking for SortedRecords", func() { - var tctx *testContext - - BeforeEach(func() { - tctx = &testContext{ - targetCnt: 10, - } - tctx.setup() - - for _, target := range tctx.targets { - handlers := map[string]http.HandlerFunc{ - apc.URLPathdSortRecords.S + "/": target.managers.recordsHandler, - } - target.setHandlers(handlers) - } - }) - - AfterEach(func() { - tctx.teardown() - }) - - createRecords := func(keys ...string) *shard.Records { - records := shard.NewRecords(len(keys)) - for _, key := range keys { - records.Insert(&shard.Record{ - Key: key, - Name: key, - }) - } - return records - } - - It("should report that final target has all the sorted records", func() { - srecordsCh := make(chan *shard.Records, 1) - for _, target := range tctx.targets { - manager, exists := target.managers.Get(globalManagerUUID) - Expect(exists).To(BeTrue()) - manager.lock() - manager.setInProgressTo(true) - manager.unlock() - } - - for _, target := range tctx.targets { - tctx.wg.Add(1) - go func(target *targetNodeMock) { - defer tctx.wg.Done() - - // For each target add sorted record - manager, exists := target.managers.Get(globalManagerUUID) - Expect(exists).To(BeTrue()) - manager.recm.Records = createRecords(target.daemonID) - - targetOrder := randomTargetOrder(1, tctx.smap.Tmap) - isFinal, err 
:= manager.participateInRecordDistribution(targetOrder) - if err != nil { - tctx.errCh <- err - return - } - - if isFinal { - srecordsCh <- manager.recm.Records - } - }(target) - } - - tctx.wg.Wait() - close(tctx.errCh) - for err := range tctx.errCh { - Expect(err).ShouldNot(HaveOccurred()) - } - - // Get SortedRecrods - close(srecordsCh) - srecords := <-srecordsCh - Expect(srecords.Len()).To(Equal(len(tctx.targets))) - - // Created expected slice of records - keys := make([]string, len(tctx.targets)) - for idx, target := range tctx.targets { - keys[idx] = target.daemonID - } - - expectedRecords := createRecords(keys...) - Expect(srecords.All()).Should(ConsistOf(expectedRecords.All())) - }) - - It("should report that final target has all the records sorted in decreasing order", func() { - srecordsCh := make(chan *shard.Records, 1) - for _, target := range tctx.targets { - manager, exists := target.managers.Get(globalManagerUUID) - Expect(exists).To(BeTrue()) - - rs := &parsedReqSpec{ - Algorithm: &Algorithm{ - Decreasing: true, - ContentKeyType: shard.ContentKeyString, - }, - InputExtension: archive.ExtTar, - MaxMemUsage: cos.ParsedQuantity{Type: cos.QuantityPercent, Value: 0}, - DSorterType: DSorterGeneralType, - } - ctx.node = ctx.smapOwner.Get().Tmap[target.daemonID] - manager.lock() - err := manager.init(rs) - manager.unlock() - if err != nil { - tctx.errCh <- err - return - } - - // For each target add sorted record - manager.recm.Records = createRecords(target.daemonID) - } - - for _, target := range tctx.targets { - tctx.wg.Add(1) - go func(target *targetNodeMock) { - manager, exists := target.managers.Get(globalManagerUUID) - Expect(exists).To(BeTrue()) - - defer tctx.wg.Done() - targetOrder := randomTargetOrder(1, tctx.smap.Tmap) - isFinal, err := manager.participateInRecordDistribution(targetOrder) - if err != nil { - tctx.errCh <- err - return - } - - if isFinal { - manager, exists := target.managers.Get(globalManagerUUID) - Expect(exists).To(BeTrue()) - srecordsCh <- manager.recm.Records - } - }(target) - } - - tctx.wg.Wait() - close(tctx.errCh) - for err := range tctx.errCh { - Expect(err).ShouldNot(HaveOccurred()) - } - - // Get SortedRecrods - close(srecordsCh) - srecords := <-srecordsCh - Expect(srecords.Len()).To(Equal(len(tctx.targets))) - - // Created expected slice of records - keys := make([]string, len(tctx.targets)) - for idx, target := range tctx.targets { - keys[idx] = target.daemonID - } - - expectedRecords := createRecords(keys...) 
- Expect(srecords.All()).Should(ConsistOf(expectedRecords.All())) - }) - }) - }) - }) -}) diff --git a/ext/dsort/handler.go b/ext/dsort/handler.go index 7a6a67503c..56e734f168 100644 --- a/ext/dsort/handler.go +++ b/ext/dsort/handler.go @@ -11,9 +11,9 @@ import ( "net/url" "regexp" "strconv" - "sync" "github.com/NVIDIA/aistore/api/apc" + "github.com/NVIDIA/aistore/cluster" "github.com/NVIDIA/aistore/cluster/meta" "github.com/NVIDIA/aistore/cmn" "github.com/NVIDIA/aistore/cmn/cos" @@ -38,6 +38,8 @@ type response struct { ///// PROXY ////// ////////////////// +var psi cluster.Node + // POST /v1/sort func PstartHandler(w http.ResponseWriter, r *http.Request, parsc *ParsedReq) { var ( @@ -66,7 +68,7 @@ func PstartHandler(w http.ResponseWriter, r *http.Request, parsc *ParsedReq) { var ( managerUUID = PrefixJobID + cos.GenUUID() // compare w/ p.httpdlpost - smap = ctx.smapOwner.Get() + smap = psi.Sowner().Get() ) // Starting dsort has two phases: @@ -147,7 +149,7 @@ func plistHandler(w http.ResponseWriter, r *http.Request, query url.Values) { return } } - responses := bcast(http.MethodGet, path, query, nil, ctx.smapOwner.Get()) + responses := bcast(http.MethodGet, path, query, nil, psi.Sowner().Get()) resultList := make([]*JobInfo, 0) for _, r := range responses { @@ -185,7 +187,7 @@ func plistHandler(w http.ResponseWriter, r *http.Request, query url.Values) { // GET /v1/sort?id=... func pmetricsHandler(w http.ResponseWriter, r *http.Request, query url.Values) { var ( - smap = ctx.smapOwner.Get() + smap = psi.Sowner().Get() allMetrics = make(map[string]*Metrics, smap.CountActiveTs()) managerUUID = query.Get(apc.QparamUUID) path = apc.URLPathdSortMetrics.Join(managerUUID) @@ -238,7 +240,7 @@ func PabortHandler(w http.ResponseWriter, r *http.Request) { query = r.URL.Query() managerUUID = query.Get(apc.QparamUUID) path = apc.URLPathdSortAbort.Join(managerUUID) - responses = bcast(http.MethodDelete, path, nil, nil, ctx.smapOwner.Get()) + responses = bcast(http.MethodDelete, path, nil, nil, psi.Sowner().Get()) ) allNotFound := true for _, resp := range responses { @@ -270,7 +272,7 @@ func PremoveHandler(w http.ResponseWriter, r *http.Request) { } var ( - smap = ctx.smapOwner.Get() + smap = psi.Sowner().Get() query = r.URL.Query() managerUUID = query.Get(apc.QparamUUID) path = apc.URLPathdSortMetrics.Join(managerUUID) @@ -341,7 +343,7 @@ func dsorterType(pars *parsedReqSpec) (string, error) { query := make(url.Values) query.Add(apc.QparamWhat, apc.WhatNodeStatsAndStatus) - responses := bcast(http.MethodGet, path, query, nil, ctx.smapOwner.Get()) + responses := bcast(http.MethodGet, path, query, nil, psi.Sowner().Get()) for _, response := range responses { if response.err != nil { return "", response.err @@ -364,7 +366,7 @@ func dsorterType(pars *parsedReqSpec) (string, error) { // // baseParams := &api.BaseParams{ // Client: http.DefaultClient, - // URL: ctx.smap.Get().Primary.URL(cmn.NetIntraControl), + // URL: g.smap.Get().Primary.URL(cmn.NetIntraControl), // } // msg := &apc.LsoMsg{Props: "size,status"} // objList, err := api.ListObjects(baseParams, pars.Bucket, msg, 0) @@ -505,8 +507,8 @@ func (m *Manager) startDSort() { } nlog.Infof("[dsort] %s broadcasting finished ack to other targets", m.ManagerUUID) - path := apc.URLPathdSortAck.Join(m.ManagerUUID, m.ctx.node.ID()) - bcast(http.MethodPut, path, nil, nil, ctx.smapOwner.Get(), ctx.node) + path := apc.URLPathdSortAck.Join(m.ManagerUUID, g.t.SID()) + bcast(http.MethodPut, path, nil, nil, g.t.Sowner().Get(), g.t.Snode()) } func (m *Manager) 
errHandler(err error) { @@ -526,7 +528,7 @@ func (m *Manager) errHandler(err error) { nlog.Warningln("broadcasting abort to other targets") path := apc.URLPathdSortAbort.Join(m.ManagerUUID) - bcast(http.MethodDelete, path, nil, nil, ctx.smapOwner.Get(), ctx.node) + bcast(http.MethodDelete, path, nil, nil, g.t.Sowner().Get(), g.t.Snode()) } } @@ -559,7 +561,7 @@ func (managers *ManagerGroup) shardsHandler(w http.ResponseWriter, r *http.Reque } var ( - buf, slab = mm.AllocSize(serializationBufSize) + buf, slab = g.mm.AllocSize(serializationBufSize) tmpMetadata = &CreationPhaseMetadata{} ) defer slab.Free(buf) @@ -630,7 +632,7 @@ func (managers *ManagerGroup) recordsHandler(w http.ResponseWriter, r *http.Requ } var ( - buf, slab = mm.AllocSize(serializationBufSize) + buf, slab = g.mm.AllocSize(serializationBufSize) records = shard.NewRecords(int(d)) ) defer slab.Free(buf) @@ -776,76 +778,9 @@ func tfiniHandler(w http.ResponseWriter, r *http.Request) { } // -// common: PROXY and TARGET +// http helpers // -func bcast(method, path string, urlParams url.Values, body []byte, smap *meta.Smap, ignore ...*meta.Snode) []response { - var ( - responses = make([]response, smap.CountActiveTs()) - wg = &sync.WaitGroup{} - ) - - call := func(idx int, node *meta.Snode) { - defer wg.Done() - - reqArgs := cmn.HreqArgs{ - Method: method, - Base: node.URL(cmn.NetIntraControl), - Path: path, - Query: urlParams, - Body: body, - } - req, err := reqArgs.Req() - if err != nil { - responses[idx] = response{ - si: node, - err: err, - statusCode: http.StatusInternalServerError, - } - return - } - - resp, err := ctx.client.Do(req) //nolint:bodyclose // Closed inside `cos.Close`. - if err != nil { - responses[idx] = response{ - si: node, - err: err, - statusCode: http.StatusInternalServerError, - } - return - } - out, err := io.ReadAll(resp.Body) - cos.Close(resp.Body) - - responses[idx] = response{ - si: node, - res: out, - err: err, - statusCode: resp.StatusCode, - } - } - - idx := 0 -outer: - for _, node := range smap.Tmap { - if smap.InMaintOrDecomm(node) { - continue - } - for _, ignoreNode := range ignore { - if ignoreNode.Equals(node) { - continue outer - } - } - - wg.Add(1) - go call(idx, node) - idx++ - } - wg.Wait() - - return responses[:idx] -} - func checkHTTPMethod(w http.ResponseWriter, r *http.Request, expected string) bool { if r.Method != expected { s := fmt.Sprintf("invalid method: %s to %s, should be %s", r.Method, r.URL.String(), expected) diff --git a/ext/dsort/manager.go b/ext/dsort/manager.go index e78722cabf..2914595eb6 100644 --- a/ext/dsort/manager.go +++ b/ext/dsort/manager.go @@ -19,6 +19,7 @@ import ( "github.com/NVIDIA/aistore/cmn/atomic" "github.com/NVIDIA/aistore/cmn/cos" "github.com/NVIDIA/aistore/cmn/debug" + "github.com/NVIDIA/aistore/cmn/kvdb" "github.com/NVIDIA/aistore/cmn/mono" "github.com/NVIDIA/aistore/cmn/nlog" "github.com/NVIDIA/aistore/ext/dsort/ct" @@ -51,26 +52,11 @@ const ( serializationBufSize = 10 * cos.MiB ) -var ( - ctx dsortContext - mm *memsys.MMSA -) - -// interface guard -var ( - _ meta.Slistener = (*Manager)(nil) - _ cos.Packer = (*buildingShardInfo)(nil) - _ cos.Unpacker = (*buildingShardInfo)(nil) -) - type ( - dsortContext struct { - smapOwner meta.Sowner - bmdOwner meta.Bowner - t cluster.Target // Set only on target. - stats stats.Tracker - node *meta.Snode - client *http.Client // Client for broadcast. 
+ global struct { + t cluster.Target + tstats stats.Tracker + mm *memsys.MMSA } buildingShardInfo struct { @@ -92,40 +78,33 @@ type ( // Manager maintains all the state required for a single run of a distributed archive file shuffle. Manager struct { - // Fields with json tags are the only fields which are persisted - // into the disk once the dsort finishes. - ManagerUUID string `json:"manager_uuid"` - Metrics *Metrics `json:"metrics"` - - mg *ManagerGroup // parent - - mu sync.Mutex - ctx dsortContext - smap *meta.Smap - - recm *shard.RecordManager - ec shard.Creator - + // tagged fields are the only fields persisted once dsort finishes + ManagerUUID string `json:"manager_uuid"` + Metrics *Metrics `json:"metrics"` + mg *ManagerGroup // parent + mu sync.Mutex + smap *meta.Smap + recm *shard.RecordManager + ec shard.Creator startShardCreation chan struct{} pars *parsedReqSpec - - client *http.Client // Client for sending records metadata - compression struct { + client *http.Client // Client for sending records metadata + compression struct { totalShardSize atomic.Int64 totalExtractedSize atomic.Int64 } received struct { - count atomic.Int32 // Number of FileMeta slices received, defining what step in the sort a target is in. + count atomic.Int32 // Number of FileMeta slices received, defining what step in the sort target is in. ch chan int32 } - refCount atomic.Int64 // Reference counter used to determine if we can do cleanup. - inFlight atomic.Int64 // Reference counter that counts number of in-flight stream requests. + refCount atomic.Int64 // Refcount to cleanup. + inFlight atomic.Int64 // Refcount in-flight stream requests state progressState extractionPhase struct { adjuster *concAdjuster } streams struct { - shards *bundle.Streams // streams for pushing streams to other targets if the fqn is non-local + shards *bundle.Streams } creationPhase struct { metadata CreationPhaseMetadata @@ -136,33 +115,46 @@ type ( } dsorter dsorter dsorterStarted sync.WaitGroup - callTimeout time.Duration // max time to wait for other node to respond + callTimeout time.Duration // max time to wait for another node to respond config *cmn.Config } ) -func RegisterNode(smapOwner meta.Sowner, bmdOwner meta.Bowner, snode *meta.Snode, t cluster.Target, stats stats.Tracker) { - ctx.smapOwner = smapOwner - ctx.bmdOwner = bmdOwner - ctx.node = snode - ctx.t = t - ctx.stats = stats - debug.Assert(mm == nil) +var g global +// interface guard +var ( + _ meta.Slistener = (*Manager)(nil) + _ cos.Packer = (*buildingShardInfo)(nil) + _ cos.Unpacker = (*buildingShardInfo)(nil) +) + +func Pinit(si cluster.Node) { + psi = si + newClient() +} + +func Tinit(t cluster.Target, stats stats.Tracker, db kvdb.Driver) { + Managers = NewManagerGroup(db, false) + + debug.Assert(g.mm == nil) // only once + g.mm = t.PageMM() + g.t = t + g.tstats = stats + + fs.CSM.Reg(ct.DSortFileType, &ct.DSortFile{}) + fs.CSM.Reg(ct.DSortWorkfileType, &ct.DSortFile{}) + + newClient() +} + +func newClient() { config := cmn.GCO.Get() - ctx.client = cmn.NewClient(cmn.TransportArgs{ + bcastClient = cmn.NewClient(cmn.TransportArgs{ Timeout: config.Timeout.MaxHostBusy.D(), UseHTTPS: config.Net.HTTP.UseHTTPS, SkipVerify: config.Net.HTTP.SkipVerify, }) - - if ctx.node.IsTarget() { - mm = t.PageMM() - err := fs.CSM.Reg(ct.DSortFileType, &ct.DSortFile{}) - debug.AssertNoErr(err) - err = fs.CSM.Reg(ct.DSortWorkfileType, &ct.DSortFile{}) - debug.AssertNoErr(err) - } } ///////////// @@ -176,8 +168,7 @@ func (m *Manager) unlock() { m.mu.Unlock() } // init 
initializes all necessary fields. // PRECONDITION: `m.mu` must be locked. func (m *Manager) init(pars *parsedReqSpec) error { - m.ctx = ctx - m.smap = m.ctx.smapOwner.Get() + m.smap = g.t.Sowner().Get() targetCount := m.smap.CountActiveTs() @@ -185,7 +176,7 @@ func (m *Manager) init(pars *parsedReqSpec) error { m.Metrics = newMetrics(pars.Description, pars.ExtendedMetrics) m.startShardCreation = make(chan struct{}, 1) - m.ctx.smapOwner.Listeners().Reg(m) + g.t.Sowner().Listeners().Reg(m) if err := m.setDSorter(); err != nil { return err @@ -249,9 +240,8 @@ func (m *Manager) init(pars *parsedReqSpec) error { return nil } -// TODO: Currently we create streams for each dsort job but maybe we should -// create streams once and have them available for all the dsort jobs so they -// would share the resource rather than competing for it. +// TODO -- FIXME: create on demand and reuse streams across jobs +// (in re: global rebalance and EC) func (m *Manager) initStreams() error { config := cmn.GCO.Get() @@ -267,14 +257,14 @@ func (m *Manager) initStreams() error { Extra: &transport.Extra{ Compression: config.DSort.Compression, Config: config, - MMSA: mm, + MMSA: g.mm, }, } if err := transport.HandleObjStream(trname, m.recvShard); err != nil { return errors.WithStack(err) } client := transport.NewIntraDataClient() - m.streams.shards = bundle.New(m.ctx.smapOwner, m.ctx.node, client, shardsSbArgs) + m.streams.shards = bundle.New(g.t.Sowner(), g.t.Snode(), client, shardsSbArgs) return nil } @@ -323,10 +313,10 @@ func (m *Manager) cleanup() { m.ec = nil m.client = nil - m.ctx.smapOwner.Listeners().Unreg(m) + g.t.Sowner().Listeners().Unreg(m) if !m.aborted() { - m.updateFinishedAck(m.ctx.node.ID()) + m.updateFinishedAck(g.t.SID()) } } @@ -403,7 +393,7 @@ func (m *Manager) abort(errs ...error) { m.Metrics.unlock() } - nlog.Infof("%s: %s aborted", m.ctx.t, m.ManagerUUID) + nlog.Infof("%s: %s aborted", g.t, m.ManagerUUID) m.setAbortedTo(true) inProgress := m.inProgress() m.unlock() @@ -463,18 +453,18 @@ func (m *Manager) setRW() (err error) { return errors.WithStack(err) } - m.ec = newExtractCreator(m.ctx.t, m.pars.InputExtension) + m.ec = newExtractCreator(g.t, m.pars.InputExtension) if m.ec == nil { debug.Assert(m.pars.InputExtension == "", m.pars.InputExtension) // NOTE: [feature] allow non-specified extension; assign default extract-creator; // handle all shards we encounter - all supported formats - m.ec = shard.NewTarRW(m.ctx.t) + m.ec = shard.NewTarRW(g.t) } if m.pars.DryRun { debug.Assert(m.ec != nil, "dry-run in combination with _any_ shard extension is not supported yet") m.ec = shard.NopRW(m.ec) } - m.recm = shard.NewRecordManager(m.ctx.t, m.pars.InputBck, m.ec, ke, m.onDupRecs) + m.recm = shard.NewRecordManager(g.t, m.pars.InputBck, m.ec, ke, m.onDupRecs) return nil } @@ -673,7 +663,7 @@ func (m *Manager) recvShard(hdr transport.ObjHdr, objReader io.Reader, err error params.Cksum = nil params.Atime = started } - erp := m.ctx.t.PutObject(lom, params) + erp := g.t.PutObject(lom, params) cluster.FreePutObjParams(params) if erp != nil { m.abort(err) @@ -729,7 +719,7 @@ func (m *Manager) doWithAbort(reqArgs *cmn.HreqArgs) error { } func (m *Manager) ListenSmapChanged() { - newSmap := m.ctx.smapOwner.Get() + newSmap := g.t.Sowner().Get() if newSmap.Version <= m.smap.Version { return } diff --git a/ext/dsort/manager_group.go b/ext/dsort/manager_group.go index d636496847..74df7ab723 100644 --- a/ext/dsort/manager_group.go +++ b/ext/dsort/manager_group.go @@ -33,10 +33,6 @@ type ManagerGroup struct { db 
kvdb.Driver } -func InitManagers(db kvdb.Driver) { - Managers = NewManagerGroup(db, false) -} - // NewManagerGroup returns new, initialized manager group. func NewManagerGroup(db kvdb.Driver, skipHk bool) *ManagerGroup { mg := &ManagerGroup{ diff --git a/ext/dsort/manager_group_test.go b/ext/dsort/manager_group_test.go deleted file mode 100644 index e47b8da5b3..0000000000 --- a/ext/dsort/manager_group_test.go +++ /dev/null @@ -1,153 +0,0 @@ -// Package dsort provides distributed massively parallel resharding for very large datasets. -/* - * Copyright (c) 2018-2021, NVIDIA CORPORATION. All rights reserved. - */ -package dsort - -import ( - "os" - "time" - - "github.com/NVIDIA/aistore/cluster/mock" - "github.com/NVIDIA/aistore/cmn" - "github.com/NVIDIA/aistore/cmn/archive" - "github.com/NVIDIA/aistore/cmn/cos" - "github.com/NVIDIA/aistore/fs" - "github.com/NVIDIA/aistore/hk" - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" -) - -const ( - testingConfigDir = "/tmp/ais_tests" -) - -var _ = Describe("ManagerGroup", func() { - var ( - mgrp *ManagerGroup - validRS = &parsedReqSpec{InputExtension: archive.ExtTar, Algorithm: &Algorithm{Kind: None}, - MaxMemUsage: cos.ParsedQuantity{Type: cos.QuantityPercent, Value: 0}, DSorterType: DSorterGeneralType} - ) - - BeforeEach(func() { - err := cos.CreateDir(testingConfigDir) - Expect(err).ShouldNot(HaveOccurred()) - - config := cmn.GCO.BeginUpdate() - config.ConfigDir = testingConfigDir - cmn.GCO.CommitUpdate(config) - db := mock.NewDBDriver() - mgrp = NewManagerGroup(db, false /* skip hk*/) - - fs.TestNew(nil) - fs.Add(testingConfigDir, "daeID") - }) - - AfterEach(func() { - err := os.RemoveAll(testingConfigDir) - Expect(err).ShouldNot(HaveOccurred()) - hk.Unreg(DSortName + hk.NameSuffix) - }) - - Context("add", func() { - It("should add a manager without an error", func() { - m, err := mgrp.Add("uuid") - m.unlock() - Expect(err).ShouldNot(HaveOccurred()) - Expect(m).ToNot(BeNil()) - Expect(m.ManagerUUID).To(Equal("uuid")) - Expect(mgrp.managers).To(HaveLen(1)) - }) - - It("should not add a manager when other manager with same uuid already exists", func() { - m, err := mgrp.Add("uuid") - m.unlock() - Expect(err).ShouldNot(HaveOccurred()) - _, err = mgrp.Add("uuid") - Expect(err).Should(HaveOccurred()) - Expect(mgrp.managers).To(HaveLen(1)) - }) - }) - - Context("get", func() { - It("should return 'not exists' when getting manager with non-existing uuid", func() { - _, exists := mgrp.Get("uuid") - Expect(exists).To(BeFalse()) - }) - - It("should return manager when manager with given uuid exists", func() { - m, err := mgrp.Add("uuid") - m.unlock() - Expect(err).ShouldNot(HaveOccurred()) - m, exists := mgrp.Get("uuid") - Expect(exists).To(BeTrue()) - Expect(m).ToNot(BeNil()) - Expect(m.ManagerUUID).To(Equal("uuid")) - }) - }) - - Context("persist", func() { - ctx.smapOwner = newTestSmap("target") - ctx.node = ctx.smapOwner.Get().Tmap["target"] - - It("should persist manager but not return by default", func() { - m, err := mgrp.Add("uuid") - Expect(err).ShouldNot(HaveOccurred()) - - Expect(m.init(validRS)).NotTo(HaveOccurred()) - m.setInProgressTo(false) - m.unlock() - - mgrp.persist("uuid") - m, exists := mgrp.Get("uuid") - Expect(exists).To(BeFalse()) - Expect(m).To(BeNil()) - }) - - It("should persist manager and return it when requested", func() { - m, err := mgrp.Add("uuid") - Expect(err).ShouldNot(HaveOccurred()) - - Expect(m.init(validRS)).ToNot(HaveOccurred()) - m.setInProgressTo(false) - m.unlock() - - mgrp.persist("uuid") - m, exists := 
mgrp.Get("uuid", true /*allowPersisted*/) - Expect(exists).To(BeTrue()) - Expect(m).ToNot(BeNil()) - Expect(m.ManagerUUID).To(Equal("uuid")) - }) - }) - - Context("housekeep", func() { - persistManager := func(uuid string, finishedAgo time.Duration) { - m, err := mgrp.Add(uuid) - Expect(err).ShouldNot(HaveOccurred()) - err = m.init(validRS) - Expect(err).ShouldNot(HaveOccurred()) - m.Metrics.Extraction.End = time.Now().Add(-finishedAgo) - m.unlock() - mgrp.persist(m.ManagerUUID) - } - - It("should not clean anything when manager group is empty", func() { - Expect(mgrp.housekeep()).To(Equal(hk.DayInterval)) - jobs := mgrp.List(nil, false /*onlyActive*/) - Expect(jobs).To(HaveLen(0)) - }) - - It("should clean managers which are old", func() { - persistManager("uuid1", 48*time.Hour) - persistManager("uuid2", 24*time.Hour) - persistManager("uuid3", 23*time.Hour) - persistManager("uuid4", 1*time.Hour) - - Expect(mgrp.housekeep()).To(Equal(hk.DayInterval)) - jobs := mgrp.List(nil, false /*onlyActive*/) - Expect(jobs).To(HaveLen(2)) - Expect(jobs[0].ID).To(Equal("uuid3")) - Expect(jobs[1].ID).To(Equal("uuid4")) - }) - }) -}) diff --git a/ext/dsort/manager_test.go b/ext/dsort/manager_test.go deleted file mode 100644 index 2fd6784c9c..0000000000 --- a/ext/dsort/manager_test.go +++ /dev/null @@ -1,283 +0,0 @@ -// Package dsort provides distributed massively parallel resharding for very large datasets. -/* - * Copyright (c) 2018-2022, NVIDIA CORPORATION. All rights reserved. - * - */ -package dsort - -import ( - "bytes" - "fmt" - "io" - "math/rand" - "testing" - - "github.com/NVIDIA/aistore/cluster/mock" - "github.com/NVIDIA/aistore/cmn/archive" - "github.com/NVIDIA/aistore/cmn/cos" - "github.com/NVIDIA/aistore/cmn/debug" - "github.com/NVIDIA/aistore/ext/dsort/shard" - "github.com/NVIDIA/aistore/fs" - "github.com/NVIDIA/aistore/tools/trand" - . "github.com/onsi/ginkgo" - . 
"github.com/onsi/gomega" - "github.com/tinylib/msgp/msgp" -) - -var _ = Describe("Init", func() { - BeforeEach(func() { - ctx.smapOwner = newTestSmap("target") - ctx.node = ctx.smapOwner.Get().Tmap["target"] - fs.TestNew(nil) - }) - - It("should init with tar extension", func() { - m := &Manager{ctx: dsortContext{t: mock.NewTarget(nil)}} - m.lock() - defer m.unlock() - sr := &parsedReqSpec{InputExtension: archive.ExtTar, Algorithm: &Algorithm{Kind: None}, - MaxMemUsage: cos.ParsedQuantity{Type: cos.QuantityPercent, Value: 0}, DSorterType: DSorterGeneralType} - Expect(m.init(sr)).NotTo(HaveOccurred()) - }) - - It("should init with tgz extension", func() { - m := &Manager{ctx: dsortContext{t: mock.NewTarget(nil)}} - m.lock() - defer m.unlock() - sr := &parsedReqSpec{InputExtension: archive.ExtTarGz, Algorithm: &Algorithm{Kind: None}, - MaxMemUsage: cos.ParsedQuantity{Type: cos.QuantityPercent, Value: 0}, DSorterType: DSorterGeneralType} - Expect(m.init(sr)).NotTo(HaveOccurred()) - }) - - It("should init with tar.gz extension", func() { - m := &Manager{ctx: dsortContext{t: mock.NewTarget(nil)}} - m.lock() - defer m.unlock() - sr := &parsedReqSpec{InputExtension: archive.ExtTgz, Algorithm: &Algorithm{Kind: None}, - MaxMemUsage: cos.ParsedQuantity{Type: cos.QuantityPercent, Value: 0}, DSorterType: DSorterGeneralType} - Expect(m.init(sr)).NotTo(HaveOccurred()) - }) - - It("should init with zip extension", func() { - m := &Manager{ctx: dsortContext{t: mock.NewTarget(nil)}} - m.lock() - defer m.unlock() - sr := &parsedReqSpec{InputExtension: archive.ExtZip, Algorithm: &Algorithm{Kind: None}, - MaxMemUsage: cos.ParsedQuantity{Type: cos.QuantityPercent, Value: 0}, DSorterType: DSorterGeneralType} - Expect(m.init(sr)).NotTo(HaveOccurred()) - }) -}) - -func BenchmarkRecordsMarshal(b *testing.B) { - benches := []struct { - recordCnt int - recordObjCnt int - }{ - {recordCnt: 100, recordObjCnt: 1}, - {recordCnt: 100, recordObjCnt: 3}, - {recordCnt: 100, recordObjCnt: 5}, - - {recordCnt: 1_000, recordObjCnt: 1}, - {recordCnt: 1_000, recordObjCnt: 3}, - {recordCnt: 1_000, recordObjCnt: 5}, - - {recordCnt: 10_000, recordObjCnt: 1}, - {recordCnt: 10_000, recordObjCnt: 3}, - {recordCnt: 10_000, recordObjCnt: 5}, - - {recordCnt: 100_000, recordObjCnt: 1}, - {recordCnt: 100_000, recordObjCnt: 3}, - {recordCnt: 100_000, recordObjCnt: 5}, - } - - buf := make([]byte, 0, serializationBufSize) - for _, bench := range benches { - name := fmt.Sprintf("r:%d_o:%d", bench.recordCnt, bench.recordObjCnt) - b.Run(name, func(b *testing.B) { - b.ReportAllocs() - records := generateRecords(bench.recordCnt, bench.recordObjCnt) - b.ResetTimer() - - for i := 0; i < b.N; i++ { - w := io.Discard - err := records.EncodeMsg(msgp.NewWriterBuf(w, buf)) - debug.AssertNoErr(err) - } - }) - } -} - -func BenchmarkRecordsUnmarshal(b *testing.B) { - benches := []struct { - recordCnt int - recordObjCnt int - }{ - {recordCnt: 100, recordObjCnt: 1}, - {recordCnt: 100, recordObjCnt: 3}, - {recordCnt: 100, recordObjCnt: 5}, - - {recordCnt: 1_000, recordObjCnt: 1}, - {recordCnt: 1_000, recordObjCnt: 3}, - {recordCnt: 1_000, recordObjCnt: 5}, - - {recordCnt: 10_000, recordObjCnt: 1}, - {recordCnt: 10_000, recordObjCnt: 3}, - {recordCnt: 10_000, recordObjCnt: 5}, - - {recordCnt: 100_000, recordObjCnt: 1}, - {recordCnt: 100_000, recordObjCnt: 3}, - {recordCnt: 100_000, recordObjCnt: 5}, - } - - buf := make([]byte, 0, serializationBufSize) - for _, bench := range benches { - name := fmt.Sprintf("r:%d_o:%d", bench.recordCnt, bench.recordObjCnt) - 
-		b.Run(name, func(b *testing.B) {
-			b.ReportAllocs()
-			var (
-				records = generateRecords(bench.recordCnt, bench.recordObjCnt)
-				network = bytes.NewBuffer(nil)
-				w = msgp.NewWriter(network)
-			)
-
-			err := records.EncodeMsg(w)
-			debug.AssertNoErr(err)
-			debug.AssertNoErr(w.Flush())
-
-			b.ResetTimer()
-			for i := 0; i < b.N; i++ {
-				b.StopTimer()
-				var (
-					r = bytes.NewReader(network.Bytes())
-					newRecords = shard.NewRecords(bench.recordCnt)
-				)
-				b.StartTimer()
-
-				err := newRecords.DecodeMsg(msgp.NewReaderBuf(r, buf))
-				debug.AssertNoErr(err)
-			}
-		})
-	}
-}
-
-func BenchmarkCreationPhaseMetadataMarshal(b *testing.B) {
-	benches := []struct {
-		shardCnt int
-		recordCnt int
-		recordObjCnt int
-	}{
-		{shardCnt: 10, recordCnt: 100, recordObjCnt: 1},
-		{shardCnt: 10, recordCnt: 100, recordObjCnt: 3},
-		{shardCnt: 1_0000, recordCnt: 100, recordObjCnt: 1},
-		{shardCnt: 1_0000, recordCnt: 100, recordObjCnt: 3},
-
-		{shardCnt: 10, recordCnt: 10_000, recordObjCnt: 1},
-		{shardCnt: 10, recordCnt: 10_000, recordObjCnt: 3},
-		{shardCnt: 1_000, recordCnt: 1_000, recordObjCnt: 1},
-		{shardCnt: 1_000, recordCnt: 1_000, recordObjCnt: 3},
-	}
-
-	for _, bench := range benches {
-		name := fmt.Sprintf("s:%d_r:%d_o:%d", bench.shardCnt, bench.recordCnt, bench.recordObjCnt)
-		b.Run(name, func(b *testing.B) {
-			b.ReportAllocs()
-
-			md := CreationPhaseMetadata{
-				Shards: generateShards(bench.shardCnt, bench.recordCnt, bench.recordObjCnt),
-			}
-
-			b.ResetTimer()
-			for i := 0; i < b.N; i++ {
-				w := io.Discard
-				err := md.EncodeMsg(msgp.NewWriterSize(w, serializationBufSize))
-				debug.AssertNoErr(err)
-			}
-		})
-	}
-}
-
-func BenchmarkCreationPhaseMetadataUnmarshal(b *testing.B) {
-	benches := []struct {
-		shardCnt int
-		recordCnt int
-		recordObjCnt int
-	}{
-		{shardCnt: 10, recordCnt: 100, recordObjCnt: 1},
-		{shardCnt: 10, recordCnt: 100, recordObjCnt: 3},
-		{shardCnt: 1_0000, recordCnt: 100, recordObjCnt: 1},
-		{shardCnt: 1_0000, recordCnt: 100, recordObjCnt: 3},
-
-		{shardCnt: 10, recordCnt: 10_000, recordObjCnt: 1},
-		{shardCnt: 10, recordCnt: 10_000, recordObjCnt: 3},
-		{shardCnt: 1_000, recordCnt: 1_000, recordObjCnt: 1},
-		{shardCnt: 1_000, recordCnt: 1_000, recordObjCnt: 3},
-	}
-
-	buf := make([]byte, 0, serializationBufSize)
-	for _, bench := range benches {
-		name := fmt.Sprintf("s:%d_r:%d_o:%d", bench.shardCnt, bench.recordCnt, bench.recordObjCnt)
-		b.Run(name, func(b *testing.B) {
-			b.ReportAllocs()
-
-			var (
-				md = CreationPhaseMetadata{
-					Shards: generateShards(bench.shardCnt, bench.recordCnt, bench.recordObjCnt),
-				}
-				network = bytes.NewBuffer(nil)
-				w = msgp.NewWriter(network)
-			)
-			debug.AssertNoErr(md.EncodeMsg(w))
-			debug.AssertNoErr(w.Flush())
-
-			b.ResetTimer()
-			for i := 0; i < b.N; i++ {
-				b.StopTimer()
-				var (
-					r = bytes.NewReader(network.Bytes())
-					newMD = &CreationPhaseMetadata{}
-				)
-				b.StartTimer()
-
-				err := newMD.DecodeMsg(msgp.NewReaderBuf(r, buf))
-				debug.AssertNoErr(err)
-			}
-		})
-	}
-}
-
-func generateShards(shardCnt, recordCnt, recordObjCnt int) []*shard.Shard {
-	shards := make([]*shard.Shard, 0, shardCnt)
-	for i := 0; i < shardCnt; i++ {
-		s := &shard.Shard{
-			Name: fmt.Sprintf("shard-%d", i),
-			Size: rand.Int63(),
-			Records: generateRecords(recordCnt, recordObjCnt),
-		}
-		shards = append(shards, s)
-	}
-	return shards
-}
-
-func generateRecords(recordCnt, recordObjCnt int) *shard.Records {
-	records := shard.NewRecords(recordCnt)
-	for i := 0; i < recordCnt; i++ {
-		r := &shard.Record{
-			Key: trand.String(20),
-			Name: trand.String(30),
-			DaemonID: trand.String(10),
-		}
-		for j := 0; j < recordObjCnt; j++ {
-			r.Objects = append(r.Objects, &shard.RecordObj{
-				ContentPath: trand.String(50),
-				ObjectFileType: "abc",
-				StoreType: "ab",
-				Offset: rand.Int63(),
-				MetadataSize: archive.TarBlockSize,
-				Size: rand.Int63(),
-				Extension: "." + trand.String(4),
-			})
-		}
-		records.Insert(r)
-	}
-	return records
-}
diff --git a/ext/dsort/mem_watcher.go b/ext/dsort/mem_watcher.go
index 3b8ffba988..f0670aa563 100644
--- a/ext/dsort/mem_watcher.go
+++ b/ext/dsort/mem_watcher.go
@@ -126,7 +126,7 @@ func (mw *memoryWatcher) watchReserved() {
 func (mw *memoryWatcher) watchExcess(memStat sys.MemStat) {
 	defer mw.excess.wg.Done()
 
-	buf, slab := mm.Alloc()
+	buf, slab := g.mm.Alloc()
 	defer slab.Free(buf)
 
 	lastMemoryUsage := memStat.ActualUsed
diff --git a/fs/content.go b/fs/content.go
index 151aef2f73..5c059af6c0 100644
--- a/fs/content.go
+++ b/fs/content.go
@@ -12,6 +12,7 @@ import (
 
 	"github.com/NVIDIA/aistore/cmn"
 	"github.com/NVIDIA/aistore/cmn/cos"
+	"github.com/NVIDIA/aistore/cmn/debug"
 	"github.com/NVIDIA/aistore/cmn/nlog"
 )
 
@@ -85,7 +86,15 @@ func (f *contentSpecMgr) Resolver(contentType string) ContentResolver {
 
 // Reg registers new content type with a given content resolver.
 // NOTE: all content type registrations must happen at startup.
-func (f *contentSpecMgr) Reg(contentType string, spec ContentResolver) error {
+func (f *contentSpecMgr) Reg(contentType string, spec ContentResolver, unitTest ...bool) {
+	err := f._reg(contentType, spec)
+	if err != nil && len(unitTest) == 0 {
+		debug.Assert(false)
+		cos.ExitLog(err)
+	}
+}
+
+func (f *contentSpecMgr) _reg(contentType string, spec ContentResolver) error {
 	if strings.ContainsRune(contentType, filepath.Separator) {
 		return fmt.Errorf("%s content type cannot contain %q", contentType, filepath.Separator)
 	}
diff --git a/fs/fqn_test.go b/fs/fqn_test.go
index 9f2da55dbb..33b1c91be5 100644
--- a/fs/fqn_test.go
+++ b/fs/fqn_test.go
@@ -267,8 +267,8 @@ func TestParseFQN(t *testing.T) {
 					tassert.CheckFatal(t, err)
 				}
 			}
-			fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{})
-			fs.CSM.Reg(fs.WorkfileType, &fs.WorkfileContentResolver{})
+			fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{}, true)
+			fs.CSM.Reg(fs.WorkfileType, &fs.WorkfileContentResolver{}, true)
 
 			parsedFQN, err := fs.ParseFQN(tt.fqn)
 			if (err != nil) != tt.wantErr {
@@ -360,8 +360,8 @@ func TestMakeAndParseFQN(t *testing.T) {
 			_, err := fs.Add(tt.mpath, "daeID")
 			tassert.CheckFatal(t, err)
 
-			fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{})
-			fs.CSM.Reg(fs.WorkfileType, &fs.WorkfileContentResolver{})
+			fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{}, true)
+			fs.CSM.Reg(fs.WorkfileType, &fs.WorkfileContentResolver{}, true)
 
 			mpaths := fs.GetAvail()
 			fqn := mpaths[tt.mpath].MakePathFQN(&tt.bck, tt.contentType, tt.objName)
diff --git a/fs/walk_test.go b/fs/walk_test.go
index 2a6bd03f6f..dfc5bc011f 100644
--- a/fs/walk_test.go
+++ b/fs/walk_test.go
@@ -38,7 +38,7 @@ func TestWalkBck(t *testing.T) {
 		t.Run(test.name, func(t *testing.T) {
 			fs.TestNew(mock.NewIOS())
 			fs.TestDisableValidation()
-			_ = fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{})
+			fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{}, true)
 
 			mpaths := make([]string, 0, test.mpathCnt)
 			defer func() {
@@ -125,7 +125,7 @@ func TestWalkBckSkipDir(t *testing.T) {
 
 	fs.TestNew(mock.NewIOS())
 	fs.TestDisableValidation()
-	_ = fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{})
+	fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{}, true)
 
 	defer func() {
 		for mpath := range mpaths {
diff --git a/mirror/utils_test.go b/mirror/utils_test.go
index 7a4b39b513..7c3782c79b 100644
--- a/mirror/utils_test.go
+++ b/mirror/utils_test.go
@@ -43,8 +43,8 @@ var _ = Describe("Mirror", func() {
 		fs.TestDisableValidation()
 		_, _ = fs.Add(mpath, "daeID")
 		_, _ = fs.Add(mpath2, "daeID")
-		_ = fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{})
-		_ = fs.CSM.Reg(fs.WorkfileType, &fs.WorkfileContentResolver{})
+		fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{}, true)
+		fs.CSM.Reg(fs.WorkfileType, &fs.WorkfileContentResolver{}, true)
 
 		var (
 			props = &cmn.BucketProps{
diff --git a/space/space_test.go b/space/space_test.go
index 1e8b36b582..aac9423a1c 100644
--- a/space/space_test.go
+++ b/space/space_test.go
@@ -381,8 +381,8 @@ func createAndAddMountpath(path string) {
 	fs.TestNew(nil)
 	fs.Add(path, "daeID")
 
-	fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{})
-	fs.CSM.Reg(fs.WorkfileType, &fs.WorkfileContentResolver{})
+	fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{}, true)
+	fs.CSM.Reg(fs.WorkfileType, &fs.WorkfileContentResolver{}, true)
 }
 
 func getRandomFileName(fileCounter int) string {
diff --git a/tools/client.go b/tools/client.go
index 19d56d086a..cc0cbd3f6b 100644
--- a/tools/client.go
+++ b/tools/client.go
@@ -424,7 +424,7 @@ func GetObjectAtime(t *testing.T, bp api.BaseParams, bck cmn.Bck, object, timeFo
 // WaitForDSortToFinish waits until all dSorts jobs finished without failure or
 // all jobs abort.
 func WaitForDSortToFinish(proxyURL, managerUUID string) (allAborted bool, err error) {
-	tlog.Logf("waiting for dsort[%s] to finish\n", managerUUID)
+	tlog.Logf("waiting for dsort[%s]\n", managerUUID)
 	bp := BaseAPIParams(proxyURL)
 	deadline := time.Now().Add(DsortFinishTimeout)
 
diff --git a/tools/file.go b/tools/file.go
index 986f6cbf18..f70e12ac68 100644
--- a/tools/file.go
+++ b/tools/file.go
@@ -149,10 +149,10 @@ func PrepareObjects(t *testing.T, desc ObjectsDesc) *ObjectsOut {
 
 	fs.TestNew(mios)
 	fs.TestDisableValidation()
-	_ = fs.CSM.Reg(fs.WorkfileType, &fs.WorkfileContentResolver{})
-	_ = fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{})
-	_ = fs.CSM.Reg(fs.ECSliceType, &fs.ECSliceContentResolver{})
-	_ = fs.CSM.Reg(fs.ECMetaType, &fs.ECMetaContentResolver{})
+	fs.CSM.Reg(fs.WorkfileType, &fs.WorkfileContentResolver{}, true)
+	fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{}, true)
+	fs.CSM.Reg(fs.ECSliceType, &fs.ECSliceContentResolver{}, true)
+	fs.CSM.Reg(fs.ECMetaType, &fs.ECMetaContentResolver{}, true)
 
 	dir := t.TempDir()