dsort: remove dsort-context, rewrite initialization
* cleanup and simplify
* rm unit tests
* part fifteen, prev. commit: 393c709

Signed-off-by: Alex Aizman <[email protected]>
alex-aizman committed Aug 14, 2023
1 parent 393c709 commit feebeae
Showing 26 changed files with 240 additions and 1,195 deletions.
3 changes: 2 additions & 1 deletion ais/proxy.go
@@ -245,7 +245,8 @@ func (p *proxy) Run() error {
nlog.Infof("%s: [%s net] listening on: %s", p, cmn.NetIntraData, p.si.DataNet.URL)
}

dsort.RegisterNode(p.owner.smap, p.owner.bmd, p.si, nil, p.statsT)
dsort.Pinit(p)

return p.htrun.run()
}

11 changes: 3 additions & 8 deletions ais/target.go
@@ -302,12 +302,8 @@ func (t *target) Run() error {
}

// register object type and workfile type
if err := fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{}); err != nil {
cos.ExitLog(err)
}
if err := fs.CSM.Reg(fs.WorkfileType, &fs.WorkfileContentResolver{}); err != nil {
cos.ExitLog(err)
}
fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{})
fs.CSM.Reg(fs.WorkfileType, &fs.WorkfileContentResolver{})

// Init meta-owners and load local instances
if prev := t.owner.bmd.init(); prev {
@@ -372,8 +368,7 @@ func (t *target) Run() error {
go t.goreslver(marked.Interrupted)
}

dsort.InitManagers(db)
dsort.RegisterNode(t.owner.smap, t.owner.bmd, t.si, t, t.statsT)
dsort.Tinit(t, t.statsT, db)

err = t.htrun.run()

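On both node types the old two-step dsort setup goes away: the proxy's dsort.RegisterNode(...) becomes dsort.Pinit(p), and the target's RegisterNode plus InitManagers pair becomes a single dsort.Tinit(t, t.statsT, db). The new entry points themselves are not shown in this excerpt; the sketch below is only a guess at their shape, using local stand-in types instead of the real AIStore interfaces.

// Illustrative sketch only -- not the actual ext/dsort code; every type here
// is a stand-in defined just for this example.
package dsort

type (
	proxyNode  interface{ SID() string } // stand-in for the proxy's node handle
	targetNode interface{ SID() string } // stand-in for the target interface
	statsT     any                       // stand-in for the stats tracker
	kvDB       any                       // stand-in for the key-value DB driver
)

// Pinit: proxy-side initialization -- presumably registers the proxy-side
// dsort handlers and remembers the node.
func Pinit(p proxyNode) {
	_ = p // keep a reference in package-level state
}

// Tinit: target-side initialization -- presumably stores t and the stats
// tracker in package-level state (the "g" singleton seen in the dsort.go
// hunks below) and restores persisted dsort managers from db, the part
// previously done by InitManagers.
func Tinit(t targetNode, st statsT, db kvDB) {
	_, _, _ = t, st, db
}
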
8 changes: 4 additions & 4 deletions ais/test/common.go
@@ -1032,10 +1032,10 @@ func initFS() {
config.Backend = cfg.Backend
cmn.GCO.CommitUpdate(config)

_ = fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{})
_ = fs.CSM.Reg(fs.WorkfileType, &fs.WorkfileContentResolver{})
_ = fs.CSM.Reg(fs.ECSliceType, &fs.ECSliceContentResolver{})
_ = fs.CSM.Reg(fs.ECMetaType, &fs.ECMetaContentResolver{})
fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{})
fs.CSM.Reg(fs.WorkfileType, &fs.WorkfileContentResolver{})
fs.CSM.Reg(fs.ECSliceType, &fs.ECSliceContentResolver{})
fs.CSM.Reg(fs.ECMetaType, &fs.ECMetaContentResolver{})
}

func initMountpaths(t *testing.T, proxyURL string) {
1 change: 0 additions & 1 deletion ais/test/dsort_test.go
@@ -1364,7 +1364,6 @@ func TestDsortAbort(t *testing.T) {
err = api.AbortDSort(df.baseParams, df.managerUUID)
tassert.CheckFatal(t, err)

tlog.Logln("waiting for dsort to finish")
_, err = tools.WaitForDSortToFinish(m.proxyURL, df.managerUUID)
tassert.CheckFatal(t, err)

4 changes: 2 additions & 2 deletions ais/tgtobj_test.go
@@ -59,8 +59,8 @@ func TestMain(m *testing.M) {
defer os.RemoveAll(testMountpath)
fs.TestNew(nil)
fs.TestDisableValidation()
_ = fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{})
_ = fs.CSM.Reg(fs.WorkfileType, &fs.WorkfileContentResolver{})
fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{}, true)
fs.CSM.Reg(fs.WorkfileType, &fs.WorkfileContentResolver{}, true)

// target
config := cmn.GCO.Get()
4 changes: 2 additions & 2 deletions cluster/lom_test.go
@@ -70,8 +70,8 @@ var _ = Describe("LOM", func() {
_, _ = fs.Add(mpath, "daeID")
}

_ = fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{})
_ = fs.CSM.Reg(fs.WorkfileType, &fs.WorkfileContentResolver{})
fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{}, true)
fs.CSM.Reg(fs.WorkfileType, &fs.WorkfileContentResolver{}, true)

bmd := mock.NewBaseBownerMock(
meta.NewBck(
4 changes: 2 additions & 2 deletions cluster/lom_xattr_test.go
@@ -31,8 +31,8 @@ var _ = Describe("LOM Xattributes", func() {
localBck := cmn.Bck{Name: bucketLocal, Provider: apc.AIS, Ns: cmn.NsGlobal}
cachedBck := cmn.Bck{Name: bucketCached, Provider: apc.AIS, Ns: cmn.NsGlobal}

_ = fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{})
_ = fs.CSM.Reg(fs.WorkfileType, &fs.WorkfileContentResolver{})
fs.CSM.Reg(fs.ObjectType, &fs.ObjectContentResolver{}, true)
fs.CSM.Reg(fs.WorkfileType, &fs.WorkfileContentResolver{}, true)

var (
copyMpathInfo *fs.Mountpath
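
Across target.go and the test files above, fs.CSM.Reg call sites stop checking (or discarding) an error, and several unit-test call sites now pass an extra true argument. The fs-package side of this change is not in the excerpt, so the sketch below is only a guess consistent with those call sites; the receiver, field, and flag names are placeholders, not real fs identifiers.

// Assumed shape only -- a stand-in, not the real fs package.
package fsstub

type contentResolver any // stand-in for fs.ContentResolver implementations

type contentSpecMgr struct {
	registry map[string]contentResolver
}

// Reg mirrors the new call sites: no error return, plus an optional trailing
// flag that only unit tests pass (assumed here to tolerate re-registration).
func (csm *contentSpecMgr) Reg(contentType string, spec contentResolver, unitTest ...bool) {
	relaxed := len(unitTest) > 0 && unitTest[0]
	if _, ok := csm.registry[contentType]; ok && !relaxed {
		panic("duplicate content-type registration: " + contentType) // real code presumably exits/logs instead
	}
	if csm.registry == nil {
		csm.registry = make(map[string]contentResolver)
	}
	csm.registry[contentType] = spec
}
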
73 changes: 73 additions & 0 deletions ext/dsort/bcast.go
@@ -0,0 +1,73 @@
// Package dsort provides distributed massively parallel resharding for very large datasets.
/*
* Copyright (c) 2018-2023, NVIDIA CORPORATION. All rights reserved.
*/
package dsort

import (
"io"
"net/http"
"net/url"
"sync"

"github.com/NVIDIA/aistore/cluster/meta"
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
)

//
// common code used by both proxy and target
//

var bcastClient *http.Client

func bcast(method, path string, urlParams url.Values, body []byte, smap *meta.Smap, ignore ...*meta.Snode) []response {
var (
idx int
responses = make([]response, smap.CountActiveTs())
wg = &sync.WaitGroup{}
)
outer:
for _, node := range smap.Tmap {
if smap.InMaintOrDecomm(node) {
continue
}
for _, ignoreNode := range ignore {
if ignoreNode.Equals(node) {
continue outer
}
}
reqArgs := cmn.HreqArgs{
Method: method,
Base: node.URL(cmn.NetIntraControl),
Path: path,
Query: urlParams,
Body: body,
}
wg.Add(1)
go func(si *meta.Snode, args *cmn.HreqArgs, i int) {
responses[i] = call(args)
responses[i].si = si
wg.Done()
}(node, &reqArgs, idx)
idx++
}
wg.Wait()

return responses[:idx]
}

func call(reqArgs *cmn.HreqArgs) response {
req, err := reqArgs.Req()
if err != nil {
return response{err: err, statusCode: http.StatusInternalServerError}
}

resp, err := bcastClient.Do(req) //nolint:bodyclose // Closed inside `cos.Close`.
if err != nil {
return response{err: err, statusCode: http.StatusInternalServerError}
}
out, err := io.ReadAll(resp.Body)
cos.Close(resp.Body)
return response{res: out, err: err, statusCode: resp.StatusCode}
}
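
bcast.go is new: it holds the HTTP fan-out shared by proxy and target. bcast skips targets in maintenance or decommission, calls every remaining target concurrently over the intra-control network, and returns one response per contacted node; call depends on the package-level bcastClient, which is presumably wired up during Pinit/Tinit. A hypothetical usage, assuming it sits next to bcast.go in package dsort (the function name and URL path below are invented for illustration):

package dsort

import (
	"fmt"
	"net/http"

	"github.com/NVIDIA/aistore/cluster/meta"
)

// startAll broadcasts a (made-up) "start" request to all active targets and
// returns the first per-node failure, if any.
func startAll(smap *meta.Smap, body []byte) error {
	for _, r := range bcast(http.MethodPost, "/v1/sort/start", nil, body, smap) {
		if r.err != nil {
			return fmt.Errorf("target %s: status %d: %v", r.si.ID(), r.statusCode, r.err)
		}
	}
	return nil
}
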
4 changes: 4 additions & 0 deletions ext/dsort/conc_adjuster_test.go
@@ -18,6 +18,10 @@ import (
. "github.com/onsi/gomega"
)

const (
testingConfigDir = "/tmp/ais_tests"
)

func calcSemaLimit(acquire, release func()) int {
var i atomic.Int32
wg := &sync.WaitGroup{}
40 changes: 20 additions & 20 deletions ext/dsort/dsort.go
@@ -72,7 +72,7 @@ var js = jsoniter.ConfigFastest

func (m *Manager) finish() {
if m.config.FastV(4, cos.SmoduleDsort) {
nlog.Infof("%s: %s finished", m.ctx.t, m.ManagerUUID)
nlog.Infof("%s: %s finished", g.t, m.ManagerUUID)
}
m.lock()
m.setInProgressTo(false)
@@ -92,20 +92,20 @@ func (m *Manager) start() (err error) {
}

// Phase 1.
nlog.Infof("%s: %s started extraction stage", m.ctx.t, m.ManagerUUID)
nlog.Infof("%s: %s started extraction stage", g.t, m.ManagerUUID)
if err := m.extractLocalShards(); err != nil {
return err
}

s := binary.BigEndian.Uint64(m.pars.TargetOrderSalt)
targetOrder := randomTargetOrder(s, m.smap.Tmap)
if m.config.FastV(4, cos.SmoduleDsort) {
nlog.Infof("%s: %s final target in targetOrder => URL: %s, tid %s", m.ctx.t, m.ManagerUUID,
nlog.Infof("%s: %s final target in targetOrder => URL: %s, tid %s", g.t, m.ManagerUUID,
targetOrder[len(targetOrder)-1].PubNet.URL, targetOrder[len(targetOrder)-1].ID())
}

// Phase 2.
nlog.Infof("%s: %s started sort stage", m.ctx.t, m.ManagerUUID)
nlog.Infof("%s: %s started sort stage", g.t, m.ManagerUUID)
curTargetIsFinal, err := m.participateInRecordDistribution(targetOrder)
if err != nil {
return err
@@ -120,7 +120,7 @@

shardSize := int64(float64(m.pars.OutputShardSize) / ratio)

nlog.Infof("%s: %s started phase 3 distribution", m.ctx.t, m.ManagerUUID)
nlog.Infof("%s: %s started phase 3 distribution", g.t, m.ManagerUUID)
if err := m.phase3(shardSize); err != nil {
return err
}
@@ -139,12 +139,12 @@

// After each target participates in the cluster-wide record distribution,
// start listening for the signal to start creating shards locally.
nlog.Infof("%s: %s started creation stage", m.ctx.t, m.ManagerUUID)
nlog.Infof("%s: %s started creation stage", g.t, m.ManagerUUID)
if err := m.dsorter.createShardsLocally(); err != nil {
return err
}

nlog.Infof("%s: %s finished successfully", m.ctx.t, m.ManagerUUID)
nlog.Infof("%s: %s finished successfully", g.t, m.ManagerUUID)
return nil
}

@@ -153,7 +153,7 @@ func (m *Manager) startDSorter() error {
if err := m.initStreams(); err != nil {
return err
}
nlog.Infof("%s: %s starting with dsorter: %q", m.ctx.t, m.ManagerUUID, m.dsorter.name())
nlog.Infof("%s: %s starting with dsorter: %q", g.t, m.ManagerUUID, m.dsorter.name())
return m.dsorter.start()
}

@@ -281,7 +281,7 @@ func (m *Manager) createShard(s *shard.Shard, lom *cluster.LOM) (err error) {
// TODO: params.Xact - in part, to count PUTs and bytes in a generic fashion
// (vs metrics.ShardCreationStats.updateThroughput - see below)
}
err = m.ctx.t.PutObject(lom, params)
err = g.t.PutObject(lom, params)
cluster.FreePutObjParams(params)
if err == nil {
n = lom.SizeBytes()
@@ -296,7 +296,7 @@ func (m *Manager) createShard(s *shard.Shard, lom *cluster.LOM) (err error) {
ec := m.ec
if m.pars.InputExtension != m.pars.OutputExtension {
// NOTE: resharding into a different format
ec = newExtractCreator(m.ctx.t, m.pars.OutputExtension)
ec = newExtractCreator(g.t, m.pars.OutputExtension)
}

_, err = ec.Create(s, w, m.dsorter)
@@ -334,7 +334,7 @@ func (m *Manager) createShard(s *shard.Shard, lom *cluster.LOM) (err error) {
// according to HRW, send it there. Since it doesn't really matter
// if we have an extra copy of the object local to this target, we
// optimize for performance by not removing the object now.
if si.ID() != m.ctx.node.ID() && !m.pars.DryRun {
if si.ID() != g.t.SID() && !m.pars.DryRun {
lom.Lock(false)
defer lom.Unlock(false)

@@ -380,7 +380,7 @@ func (m *Manager) createShard(s *shard.Shard, lom *cluster.LOM) (err error) {
exit:
metrics.mu.Lock()
metrics.CreatedCnt++
if si.ID() != m.ctx.node.ID() {
if si.ID() != g.t.SID() {
metrics.MovedShardCnt++
}
if m.Metrics.extended {
@@ -434,7 +434,7 @@ func (m *Manager) participateInRecordDistribution(targetOrder meta.Nodes) (curre
}

for i, d = range targetOrder {
if d != dummyTarget && d.ID() == m.ctx.node.ID() {
if d != dummyTarget && d.ID() == g.t.SID() {
break
}
}
@@ -449,7 +449,7 @@ func (m *Manager) participateInRecordDistribution(targetOrder meta.Nodes) (curre
)
group.Go(func() error {
var (
buf, slab = mm.AllocSize(serializationBufSize)
buf, slab = g.mm.AllocSize(serializationBufSize)
msgpw = msgp.NewWriterBuf(w, buf)
)
defer slab.Free(buf)
@@ -610,7 +610,7 @@ func (m *Manager) generateShardsWithOrderingFile(maxSize int64) ([]*shard.Shard,
return nil, err
}
// is intra-call
tsi := m.ctx.t.Snode()
tsi := g.t.Snode()
req.Header.Set(apc.HdrCallerID, tsi.ID())
req.Header.Set(apc.HdrCallerName, tsi.String())

@@ -746,7 +746,7 @@ func (m *Manager) phase3(maxSize int64) error {
}

bck := meta.CloneBck(&m.pars.OutputBck)
if err := bck.Init(m.ctx.bmdOwner); err != nil {
if err := bck.Init(g.t.Bowner()); err != nil {
return err
}
for _, s := range shards {
@@ -790,7 +790,7 @@ func (m *Manager) phase3(maxSize int64) error {
for err := range errCh {
return errors.Errorf("error while sending shards: %v", err)
}
nlog.Infof("%s: %s finished sending all shards", m.ctx.t, m.ManagerUUID)
nlog.Infof("%s: %s finished sending all shards", g.t, m.ManagerUUID)
return nil
}

@@ -801,7 +801,7 @@ func (m *Manager) _dist(si *meta.Snode, s []*shard.Shard, order map[string]*shar
)
group.Go(func() error {
var (
buf, slab = mm.AllocSize(serializationBufSize)
buf, slab = g.mm.AllocSize(serializationBufSize)
msgpw = msgp.NewWriterBuf(w, buf)
md = &CreationPhaseMetadata{Shards: s, SendOrder: order}
)
@@ -873,7 +873,7 @@ func (es *extractShard) do() (err error) {
if errV == nil {
if !archive.EqExt(ext, m.pars.InputExtension) {
if m.config.FastV(4, cos.SmoduleDsort) {
nlog.Infof("%s: %s skipping %s: %q vs %q", m.ctx.t, m.ManagerUUID,
nlog.Infof("%s: %s skipping %s: %q vs %q", g.t, m.ManagerUUID,
es.name, ext, m.pars.InputExtension)
}
return
@@ -919,7 +919,7 @@ func (es *extractShard) _do(lom *cluster.LOM) error {
return nil // skip
}
// NOTE: extract-creator for _this_ shard (compare with createShard above)
ec = newExtractCreator(m.ctx.t, ext)
ec = newExtractCreator(g.t, ext)
}

phaseInfo := &m.extractionPhase
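
The rest of dsort.go is a mechanical substitution: m.ctx.t becomes g.t, m.ctx.node.ID() becomes g.t.SID(), m.ctx.bmdOwner becomes g.t.Bowner(), and the bare mm memory manager becomes g.mm. The declaration of g is not part of this excerpt; judging only from these call sites, it needs at least the fields below (the cluster.Target and memsys.MMSA types are assumptions about the surrounding AIStore code, not taken from this commit).

// Inferred sketch, not the actual declaration in ext/dsort.
package dsort

import (
	"github.com/NVIDIA/aistore/cluster"
	"github.com/NVIDIA/aistore/memsys"
)

var g struct {
	t  cluster.Target // local target: g.t.SID(), g.t.Snode(), g.t.Bowner(), g.t.PutObject(...)
	mm *memsys.MMSA   // memory manager behind g.mm.AllocSize(serializationBufSize)
}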