dsort: continued refactoring
* part three, prev. commit: dd35a91

Signed-off-by: Alex Aizman <[email protected]>
alex-aizman committed Aug 2, 2023
1 parent 958de6e commit cf73a58
Showing 9 changed files with 261 additions and 269 deletions.
12 changes: 12 additions & 0 deletions cmn/archive/mime.go
@@ -170,3 +170,15 @@ func _detect(file *os.File, archname string, buf []byte) (m string, n int, err e
err = fmt.Errorf("failed to detect file signature in %q", archname)
return
}

+ func EqExt(ext1, ext2 string) bool {
+ 	switch {
+ 	case ext1 == ext2:
+ 		return true
+ 	case ext1 == ExtTarGz && ext2 == ExtTgz:
+ 		return true
+ 	case ext2 == ExtTarGz && ext1 == ExtTgz:
+ 		return true
+ 	}
+ 	return false
+ }
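Note on the new helper: `EqExt` treats `.tar.gz` and `.tgz` as the same archive format. A minimal standalone sketch of how a caller might use it; the constant values below mirror what `cmn/archive` presumably defines and are assumptions:

```go
package main

import "fmt"

// Assumed values of the cmn/archive extension constants.
const (
	ExtTarGz = ".tar.gz"
	ExtTgz   = ".tgz"
)

// EqExt (copied from the diff above): two extensions are equal if they
// match literally, or if they are the two spellings of gzipped tar.
func EqExt(ext1, ext2 string) bool {
	switch {
	case ext1 == ext2:
		return true
	case ext1 == ExtTarGz && ext2 == ExtTgz:
		return true
	case ext2 == ExtTarGz && ext1 == ExtTgz:
		return true
	}
	return false
}

func main() {
	fmt.Println(EqExt(".tgz", ".tar.gz")) // true
	fmt.Println(EqExt(".tar", ".tgz"))    // false
}
```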
91 changes: 52 additions & 39 deletions ext/dsort/dsort.go
@@ -25,6 +25,7 @@ import (
"github.com/NVIDIA/aistore/cluster"
"github.com/NVIDIA/aistore/cluster/meta"
"github.com/NVIDIA/aistore/cmn"
+ "github.com/NVIDIA/aistore/cmn/archive"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/cmn/mono"
@@ -96,7 +97,7 @@ func (m *Manager) start() (err error) {
return err
}

- s := binary.BigEndian.Uint64(m.rs.TargetOrderSalt)
+ s := binary.BigEndian.Uint64(m.pars.TargetOrderSalt)
targetOrder := randomTargetOrder(s, m.smap.Tmap)
if m.config.FastV(4, cos.SmoduleDsort) {
nlog.Infof("%s: %s final target in targetOrder => URL: %s, tid %s", m.ctx.t, m.ManagerUUID,
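Aside: the target order is seeded from a salt carried in the request parameters, so every target can derive the same permutation independently. A hedged sketch of that idea (the actual `randomTargetOrder` implementation is not shown in this hunk):

```go
package main

import (
	"fmt"
	"math/rand"
)

func main() {
	targets := []string{"t1", "t2", "t3", "t4"}
	salt := uint64(0xBADC0FFEE) // cf. binary.BigEndian.Uint64(m.pars.TargetOrderSalt)

	// Same salt yields the same shuffle on every node.
	r := rand.New(rand.NewSource(int64(salt)))
	r.Shuffle(len(targets), func(i, j int) {
		targets[i], targets[j] = targets[j], targets[i]
	})
	fmt.Println(targets) // identical ordering cluster-wide
}
```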
@@ -112,14 +113,14 @@ func (m *Manager) start() (err error) {

// Phase 3. - run only by the final target
if curTargetIsFinal {
- shardSize := m.rs.OutputShardSize
+ shardSize := m.pars.OutputShardSize
if m.extractCreator.UsingCompression() {
// By making the assumption that the input content is reasonably
// uniform across all shards, the output shard size required (such
// that each gzip compressed output shard will have a size close to
// rs.ShardSizeBytes) can be estimated.
avgCompressRatio := m.avgCompressionRatio()
- shardSize = int64(float64(m.rs.OutputShardSize) / avgCompressRatio)
+ shardSize = int64(float64(m.pars.OutputShardSize) / avgCompressRatio)
if m.config.FastV(4, cos.SmoduleDsort) {
nlog.Infof("%s: %s estimated output shard size required before gzip compression: %d",
m.ctx.t, m.ManagerUUID, shardSize)
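To make the estimate concrete, a hedged numeric sketch: it assumes `avgCompressionRatio()` reports the compressed-to-uncompressed size ratio (well below 1 for compressible data), so dividing by it scales the requested shard size up to pre-compression bytes. The exact definition of the ratio is not shown in this hunk.

```go
package main

import "fmt"

func main() {
	// Requested (post-gzip) output shard size: 1 GiB.
	outputShardSize := int64(1 << 30)

	// Hypothetical observed ratio: compressed output is ~40% of the input.
	avgCompressRatio := 0.4

	// Uncompressed bytes to accumulate per shard so that the gzipped
	// result lands near the requested size (same arithmetic as the diff).
	shardSize := int64(float64(outputShardSize) / avgCompressRatio)
	fmt.Printf("pre-compression target: %d bytes (~2.5 GiB)\n", shardSize)
}
```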
@@ -156,12 +157,10 @@ func (m *Manager) start() (err error) {
}

func (m *Manager) startDSorter() error {
- defer m.markDSorterStarted()
-
+ defer m.markStarted()
if err := m.initStreams(); err != nil {
return err
}

nlog.Infof("%s: %s starting with dsorter: %q", m.ctx.t, m.ManagerUUID, m.dsorter.name())
return m.dsorter.start()
}
@@ -173,12 +172,12 @@ func (m *Manager) extractLocalShards() (err error) {
// compare with xact/xs/multiobj.go
group, ctx := errgroup.WithContext(context.Background())
switch {
- case m.rs.Pit.isRange():
+ case m.pars.Pit.isRange():
err = m.iterRange(ctx, group)
- case m.rs.Pit.isList():
+ case m.pars.Pit.isList():
err = m.iterList(ctx, group)
default:
- debug.Assert(m.rs.Pit.isPrefix())
+ debug.Assert(m.pars.Pit.isPrefix())
debug.Assert(false, "not implemented yet") // TODO -- FIXME
}

@@ -194,7 +193,7 @@ func (m *Manager) extractLocalShards() (err error) {
func (m *Manager) iterRange(ctx context.Context, group *errgroup.Group) error {
var (
metrics = m.Metrics.Extraction
- pt = m.rs.Pit.Template
+ pt = m.pars.Pit.Template
)
metrics.mu.Lock()
metrics.TotalCnt = pt.Count()
@@ -212,7 +211,7 @@ outer:
}

m.extractionPhase.adjuster.acquireGoroutineSema()
- es := &extractShard{m, metrics, name}
+ es := &extractShard{m, metrics, name, true /*is-range*/}
group.Go(es.do)
}
return group.Wait()
@@ -221,10 +220,10 @@ outer:
func (m *Manager) iterList(ctx context.Context, group *errgroup.Group) error {
metrics := m.Metrics.Extraction
metrics.mu.Lock()
- metrics.TotalCnt = int64(len(m.rs.Pit.ObjNames))
+ metrics.TotalCnt = int64(len(m.pars.Pit.ObjNames))
metrics.mu.Unlock()
outer:
- for _, name := range m.rs.Pit.ObjNames {
+ for _, name := range m.pars.Pit.ObjNames {
select {
case <-m.listenAborted():
group.Wait()
Expand All @@ -235,7 +234,7 @@ outer:
}

m.extractionPhase.adjuster.acquireGoroutineSema()
- es := &extractShard{m, metrics, name}
+ es := &extractShard{m, metrics, name, false /*is-range*/}
group.Go(es.do)
}
return group.Wait()
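Both `iterRange` and `iterList` above follow the same shape: check for abort, acquire a concurrency slot, then hand the shard off to an `errgroup`. A self-contained sketch of that pattern, with a plain buffered channel standing in for the manager's goroutine semaphore:

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	names := []string{"shard-0.tar", "shard-1.tar", "shard-2.tar"}
	sema := make(chan struct{}, 2) // at most 2 concurrent extractions
	group, ctx := errgroup.WithContext(context.Background())

outer:
	for _, name := range names {
		select {
		case <-ctx.Done(): // an earlier extraction failed; stop scheduling
			break outer
		default:
		}
		sema <- struct{}{} // acquire a slot (cf. acquireGoroutineSema)
		name := name
		group.Go(func() error {
			defer func() { <-sema }() // release the slot
			fmt.Println("extracting", name)
			return nil
		})
	}
	fmt.Println("done:", group.Wait())
}
```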
@@ -247,7 +246,7 @@ func (m *Manager) createShard(s *extract.Shard, lom *cluster.LOM) (err error) {
shardName = s.Name
errCh = make(chan error, 2)
)
- if err = lom.InitBck(&m.rs.OutputBck); err != nil {
+ if err = lom.InitBck(&m.pars.OutputBck); err != nil {
return
}
lom.SetAtimeUnix(time.Now().UnixNano())
@@ -276,7 +275,7 @@ func (m *Manager) createShard(s *extract.Shard, lom *cluster.LOM) (err error) {
wg.Add(1)
go func() {
var err error
- if !m.rs.DryRun {
+ if !m.pars.DryRun {
params := cluster.AllocPutObjParams()
{
params.WorkTag = "dsort"
@@ -335,7 +334,7 @@ func (m *Manager) createShard(s *extract.Shard, lom *cluster.LOM) (err error) {
// according to HRW, send it there. Since it doesn't really matter
// if we have an extra copy of the object local to this target, we
// optimize for performance by not removing the object now.
- if si.ID() != m.ctx.node.ID() && !m.rs.DryRun {
+ if si.ID() != m.ctx.node.ID() && !m.pars.DryRun {
lom.Lock(false)
defer lom.Unlock(false)

@@ -484,7 +483,7 @@ func (m *Manager) participateInRecordDistribution(targetOrder meta.Nodes) (curre
err := m.doWithAbort(reqArgs)
r.CloseWithError(err)
if err != nil {
- return errors.Errorf("failed to send SortedRecords to next target (%s), err: %v", sendTo.ID(), err)
+ return errors.Errorf("failed to send SortedRecords to next target (%s): %v",
+ 	sendTo.ID(), err)
}
return nil
})
@@ -533,7 +533,7 @@ func (m *Manager) participateInRecordDistribution(targetOrder meta.Nodes) (curre
m.recManager.MergeEnqueuedRecords()
}

- err = sortRecords(m.recManager.Records, m.rs.Algorithm)
+ err = sortRecords(m.recManager.Records, m.pars.Algorithm)
m.dsorter.postRecordDistribution()
return true, err
}
@@ -543,7 +543,7 @@ func (m *Manager) generateShardsWithTemplate(maxSize int64) ([]*extract.Shard, e
start int
curShardSize int64
n = m.recManager.Records.Len()
- pt = m.rs.Pot.Template
+ pt = m.pars.Pot.Template
shardCount = pt.Count()
shards = make([]*extract.Shard, 0)
numLocalRecords = make(map[string]int, m.smap.CountActiveTs())
@@ -568,7 +568,7 @@ func (m *Manager) generateShardsWithTemplate(maxSize int64) ([]*extract.Shard, e
return nil, errors.Errorf("number of shards to be created exceeds expected number of shards (%d)", shardCount)
}
shard := &extract.Shard{
- Name: name + m.rs.Extension,
+ Name: name + m.pars.Extension,
}

shard.Size = curShardSize
@@ -594,12 +594,12 @@ func (m *Manager) generateShardsWithOrderingFile(maxSize int64) ([]*extract.Shar
if maxSize <= 0 {
return nil, fmt.Errorf(fmtErrInvalidMaxSize, maxSize)
}
- parsedURL, err := url.Parse(m.rs.OrderFileURL)
+ parsedURL, err := url.Parse(m.pars.OrderFileURL)
if err != nil {
- return nil, fmt.Errorf(fmtErrOrderURL, m.rs.OrderFileURL, err)
+ return nil, fmt.Errorf(fmtErrOrderURL, m.pars.OrderFileURL, err)
}

- req, err := http.NewRequest(http.MethodGet, m.rs.OrderFileURL, http.NoBody)
+ req, err := http.NewRequest(http.MethodGet, m.pars.OrderFileURL, http.NoBody)
if err != nil {
return nil, err
}
@@ -616,7 +616,7 @@ func (m *Manager) generateShardsWithOrderingFile(maxSize int64) ([]*extract.Shar
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf(
"unexpected status code (%d) when requesting order file from %q",
- resp.StatusCode, m.rs.OrderFileURL,
+ resp.StatusCode, m.pars.OrderFileURL,
)
}

@@ -652,10 +652,10 @@ func (m *Manager) generateShardsWithOrderingFile(maxSize int64) ([]*extract.Shar
continue
}

- parts := strings.Split(line, m.rs.OrderFileSep)
+ parts := strings.Split(line, m.pars.OrderFileSep)
if len(parts) != 2 {
msg := fmt.Sprintf("malformed line (%d) in external key map: %s", idx, line)
- if err := m.react(m.rs.EKMMalformedLine, msg); err != nil {
+ if err := m.react(m.pars.EKMMalformedLine, msg); err != nil {
return nil, err
}
}
Expand All @@ -669,8 +669,8 @@ func (m *Manager) generateShardsWithOrderingFile(maxSize int64) ([]*extract.Shar
key := fmt.Sprintf("%v", r.Key)
shardNameFmt, ok := externalKeyMap[key]
if !ok {
- msg := fmt.Sprintf("extracted record %q which does not belong in external key map", key)
- if err := m.react(m.rs.EKMMissingKey, msg); err != nil {
+ msg := fmt.Sprintf("record %q doesn't belong in external key map", key)
+ if err := m.react(m.pars.EKMMissingKey, msg); err != nil {
return nil, err
}
}
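For context: judging by the parsing above, each line of the order ("external key map") file carries exactly two `OrderFileSep`-separated fields, a record key and the shard-name format it maps to. A hedged illustration; the tab separator and the field semantics are inferred, not confirmed by this diff:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Hypothetical order-file content: "<record-key><sep><shard-name-format>".
	lines := []string{
		"img-000041\tshard-%d.tar",
		"img-000042\tshard-%d.tar",
		"garbage-line-without-separator",
	}
	const sep = "\t" // assumed OrderFileSep

	externalKeyMap := make(map[string]string)
	for idx, line := range lines {
		parts := strings.Split(line, sep)
		if len(parts) != 2 {
			// the real code reacts per m.pars.EKMMalformedLine (warn/abort/ignore)
			fmt.Printf("malformed line (%d): %s\n", idx, line)
			continue
		}
		externalKeyMap[parts[0]] = parts[1]
	}
	fmt.Println(externalKeyMap)
}
```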
@@ -734,7 +734,7 @@ func (m *Manager) distributeShardRecords(maxSize int64) error {
sendOrder[d.ID()] = make(map[string]*extract.Shard, 100)
}
}
- if m.rs.OrderFileURL != "" {
+ if m.pars.OrderFileURL != "" {
shards, err = m.generateShardsWithOrderingFile(maxSize)
} else {
shards, err = m.generateShardsWithTemplate(maxSize)
@@ -760,7 +760,7 @@ func (m *Manager) distributeShardRecords(maxSize int64) error {
// // target.
// }

- bck := meta.CloneBck(&m.rs.OutputBck)
+ bck := meta.CloneBck(&m.pars.OutputBck)
if err := bck.Init(m.ctx.bmdOwner); err != nil {
return err
}
@@ -829,7 +829,7 @@ func (m *Manager) _dist(si *meta.Snode, s []*extract.Shard, order map[string]*ex
return err
})
group.Go(func() error {
- query := m.rs.Bck.AddToQuery(nil)
+ query := m.pars.Bck.AddToQuery(nil)
reqArgs := &cmn.HreqArgs{
Method: http.MethodPost,
Base: si.URL(cmn.NetIntraData),
@@ -877,19 +877,32 @@ type extractShard struct {
m *Manager
metrics *LocalExtraction
name string
+ isRange bool
}

func (es *extractShard) do() (err error) {
+ m := es.m
shardName := es.name
- if !strings.HasSuffix(es.name, es.m.rs.Extension) {
- 	shardName = es.name + es.m.rs.Extension
+ if es.isRange && m.pars.Extension != "" {
+ 	ext, errV := archive.Mime("", es.name) // from filename
+ 	if errV == nil {
+ 		if !archive.EqExt(ext, m.pars.Extension) {
+ 			if m.config.FastV(4, cos.SmoduleDsort) {
+ 				nlog.Infof("%s: %s skipping %s: %q vs %q", m.ctx.t, m.ManagerUUID,
+ 					es.name, ext, m.pars.Extension)
+ 			}
+ 			return
+ 		}
+ 	} else {
+ 		shardName = es.name + m.pars.Extension
+ 	}
}
lom := cluster.AllocLOM(shardName)

err = es._do(lom)

cluster.FreeLOM(lom)
- phaseInfo := &es.m.extractionPhase
+ phaseInfo := &m.extractionPhase
phaseInfo.adjuster.releaseGoroutineSema()
return
}
@@ -900,16 +913,16 @@ func (es *extractShard) _do(lom *cluster.LOM) error {
estimateTotalRecordsSize uint64
warnPossibleOOM bool
)
- if err := lom.InitBck(&m.rs.Bck); err != nil {
+ if err := lom.InitBck(&m.pars.Bck); err != nil {
return err
}
if _, local, err := lom.HrwTarget(m.smap); err != nil || !local {
return err
}
if err := lom.Load(false /*cache it*/, false /*locked*/); err != nil {
if cmn.IsErrObjNought(err) {
- msg := fmt.Sprintf("shard %q does not exist (is missing)", lom.Cname())
- return m.react(m.rs.MissingShards, msg)
+ msg := fmt.Sprintf("extract.do: %q does not exist", lom.Cname())
+ return m.react(m.pars.MissingShards, msg)
}
return err
}
@@ -993,7 +1006,7 @@ func (es *extractShard) _do(lom *cluster.LOM) error {

if warnPossibleOOM {
msg := fmt.Sprintf("(estimated) total size of records (%d) will possibly exceed available memory (%s) during sorting phase",
- estimateTotalRecordsSize, m.rs.MaxMemUsage)
+ estimateTotalRecordsSize, m.pars.MaxMemUsage)
return m.react(cmn.WarnReaction, msg)
}
return nil
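Stepping back, the main behavioral addition in `extractShard.do` is the extension check for range-defined inputs: a shard whose name carries a recognizable archive extension is skipped unless it matches the requested one, while an extension-less name gets the requested extension appended. A standalone approximation (the simplified `mimeFromName` below stands in for `archive.Mime`, whose exact rules are not shown here):

```go
package main

import (
	"fmt"
	"strings"
)

// mimeFromName approximates archive.Mime("", name): guess the archive
// extension from the filename, error if none is recognized.
func mimeFromName(name string) (string, error) {
	for _, ext := range []string{".tar.gz", ".tgz", ".tar", ".zip"} {
		if strings.HasSuffix(name, ext) {
			return ext, nil
		}
	}
	return "", fmt.Errorf("unknown archive format in %q", name)
}

func eqExt(e1, e2 string) bool { // cf. archive.EqExt in this commit
	tgz := func(e string) bool { return e == ".tar.gz" || e == ".tgz" }
	return e1 == e2 || (tgz(e1) && tgz(e2))
}

func main() {
	const requested = ".tar" // m.pars.Extension
	for _, name := range []string{"shard-1.tar", "shard-2.tgz", "shard-3"} {
		shardName := name
		if ext, err := mimeFromName(name); err == nil {
			if !eqExt(ext, requested) {
				fmt.Println("skipping", name) // wrong archive type for this run
				continue
			}
		} else {
			shardName = name + requested // extension-less name: append requested
		}
		fmt.Println("extracting", shardName)
	}
}
```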
12 changes: 6 additions & 6 deletions ext/dsort/dsort_general.go
@@ -84,7 +84,7 @@ func newDSorterGeneral(m *Manager) (*dsorterGeneral, error) {
if err := mem.Get(); err != nil {
return nil, err
}
- maxMemoryToUse := calcMaxMemoryUsage(m.rs.MaxMemUsage, &mem)
+ maxMemoryToUse := calcMaxMemoryUsage(m.pars.MaxMemUsage, &mem)
ds := &dsorterGeneral{
m: m,
mw: newMemoryWatcher(m, maxMemoryToUse),
@@ -117,7 +117,7 @@ func (*dsorterGeneral) name() string { return DSorterGeneralType }

func (ds *dsorterGeneral) init() error {
ds.creationPhase.adjuster = newConcAdjuster(
- ds.m.rs.CreateConcMaxLimit,
+ ds.m.pars.CreateConcMaxLimit,
1, /*goroutineLimitCoef*/
)
return nil
@@ -147,8 +147,8 @@ func (ds *dsorterGeneral) start() error {

trname = fmt.Sprintf(recvRespStreamNameFmt, ds.m.ManagerUUID)
streamMultiplier := config.DSort.SbundleMult
- if ds.m.rs.StreamMultiplier != 0 {
- 	streamMultiplier = ds.m.rs.StreamMultiplier
+ if ds.m.pars.StreamMultiplier != 0 {
+ 	streamMultiplier = ds.m.pars.StreamMultiplier
}
respSbArgs := bundle.Args{
Multiplier: streamMultiplier,
@@ -294,7 +294,7 @@ func (ds *dsorterGeneral) loadLocal(w io.Writer, obj *extract.RecordObj) (writte

fullContentPath := ds.m.recManager.FullContentPath(obj)

- if ds.m.rs.DryRun {
+ if ds.m.pars.DryRun {
r := cos.NopReader(obj.MetadataSize + obj.Size)
written, err = io.CopyBuffer(w, r, buf)
return
@@ -506,7 +506,7 @@ func (ds *dsorterGeneral) recvReq(hdr transport.ObjHdr, objReader io.Reader, err

fullContentPath := ds.m.recManager.FullContentPath(req.RecordObj)

- if ds.m.rs.DryRun {
+ if ds.m.pars.DryRun {
lr := cos.NopReader(req.RecordObj.MetadataSize + req.RecordObj.Size)
r := cos.NopOpener(io.NopCloser(lr))
o.Hdr.ObjAttrs.Size = req.RecordObj.MetadataSize + req.RecordObj.Size
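One pattern worth noting in both dry-run branches above: rather than reading the record from disk, the sorter substitutes a no-op reader of exactly `MetadataSize + Size` bytes, so the copy path and byte accounting run end to end without I/O. A minimal sketch, with a zero-filled reader approximating `cos.NopReader`:

```go
package main

import (
	"fmt"
	"io"
)

type zeroReader struct{}

func (zeroReader) Read(p []byte) (int, error) { return len(p), nil }

func main() {
	metadataSize, size := int64(512), int64(4096)

	// Stand-in for cos.NopReader(metadataSize + size).
	r := io.LimitReader(zeroReader{}, metadataSize+size)

	buf := make([]byte, 1024)
	written, err := io.CopyBuffer(io.Discard, r, buf)
	fmt.Println(written, err) // 4608 <nil>
}
```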