Skip to content

Commit

Permalink
dsort: stateless shard readers/writers; refactor and simplify
Browse files Browse the repository at this point in the history
* there's no need to keep creating TAR, TGZ, etc. readers/writers
* make them all stateless, initialize at startup
  (all the 4 currently supported, that is)
* ref: consistency in the naming, shorter names, etc.
* part sixteen, prev. commit: feebeae

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Aug 14, 2023
1 parent feebeae commit 6c74cb9
Show file tree
Hide file tree
Showing 14 changed files with 199 additions and 192 deletions.
4 changes: 2 additions & 2 deletions ais/test/dsort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ outer:
_, err := api.GetObject(baseParams, bucket, shardName, &getArgs)
if err != nil {
herr, ok := err.(*cmn.ErrHTTP)
if ok && herr.Status == http.StatusNotFound && archive.IsCompressed(df.inputExt) && i > 0 {
if ok && herr.Status == http.StatusNotFound && shard.IsCompressed(df.inputExt) && i > 0 {
// check for NotFound a few more times, and then break; see also 'skipped == 0' check below
switch skipped {
case 0:
Expand Down Expand Up @@ -475,7 +475,7 @@ outer:
}
}

if archive.IsCompressed(df.inputExt) {
if shard.IsCompressed(df.inputExt) {
tlog.Logf("%s: computed output shard count (%d) vs resulting compressed (%d)\n",
df.job(), df.outputShardCnt, realOutputShardCnt)
}
Expand Down
14 changes: 4 additions & 10 deletions cmn/archive/mime.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@ import (
"strings"

"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/memsys"
)

// supported archive types (file extensions); see also archExts in cmd/cli/cli/const.go
// NOTE: when adding/removing update:
// - FileExtensions
// - IsCompressed
// - allMagics
// NOTE: when adding/removing formats - update:
// - FileExtensions
// - allMagics
// - ext/dsort/shard/rw.go
const (
ExtTar = ".tar"
ExtTgz = ".tgz"
Expand All @@ -48,11 +47,6 @@ type detect struct {

var FileExtensions = []string{ExtTar, ExtTgz, ExtTarGz, ExtZip, ExtTarLz4}

func IsCompressed(mime string) bool {
debug.Assert(cos.StringInSlice(mime, FileExtensions), mime)
return mime != ExtTar
}

// standard file signatures
var (
magicTar = detect{offset: 257, sig: []byte("ustar"), mime: ExtTar}
Expand Down
36 changes: 21 additions & 15 deletions ext/dsort/dsort.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (m *Manager) start() (err error) {
if curTargetIsFinal {
// assuming uniform distribution, estimate avg. output shard size
ratio := m.compressionRatio()
debug.Assertf(archive.IsCompressed(m.pars.InputExtension) || ratio == 1, "tar ratio=%f, ext=%q",
debug.Assertf(shard.IsCompressed(m.pars.InputExtension) || ratio == 1, "tar ratio=%f, ext=%q",
ratio, m.pars.InputExtension)

shardSize := int64(float64(m.pars.OutputShardSize) / ratio)
Expand Down Expand Up @@ -293,13 +293,18 @@ func (m *Manager) createShard(s *shard.Shard, lom *cluster.LOM) (err error) {
wg.Done()
}()

ec := m.ec
if m.pars.InputExtension != m.pars.OutputExtension {
// NOTE: resharding into a different format
ec = newExtractCreator(g.t, m.pars.OutputExtension)
// may reshard into a different format
shardRW := m.shardRW
//
// TODO -- FIXME: compare with extractShard._do()
//
if !m.pars.DryRun && m.pars.OutputExtension != m.pars.InputExtension {
debug.Assert(m.pars.OutputExtension != "")
shardRW = shard.RWs[m.pars.OutputExtension]
debug.Assert(shardRW != nil, m.pars.OutputExtension)
}

_, err = ec.Create(s, w, m.dsorter)
_, err = shardRW.Create(s, w, m.dsorter)
w.CloseWithError(err)
if err != nil {
r.CloseWithError(err)
Expand Down Expand Up @@ -682,7 +687,7 @@ func (m *Manager) generateShardsWithOrderingFile(maxSize int64) ([]*shard.Shard,
}

shards := shardsBuilder[shardNameFmt]
recordSize := r.TotalSize() + m.ec.MetadataSize()*int64(len(r.Objects))
recordSize := r.TotalSize() + m.shardRW.MetadataSize()*int64(len(r.Objects))
shardCount := len(shards)
if shardCount == 0 || shards[shardCount-1].Size > maxSize {
shard := &shard.Shard{
Expand Down Expand Up @@ -896,7 +901,7 @@ func (es *extractShard) _do(lom *cluster.LOM) error {
var (
m = es.m
estimateTotalRecordsSize uint64
warnPossibleOOM bool
warnOOM bool
)
if err := lom.InitBck(&m.pars.InputBck); err != nil {
return err
Expand All @@ -912,14 +917,15 @@ func (es *extractShard) _do(lom *cluster.LOM) error {
return err
}

ec := m.ec
if m.pars.InputExtension == "" {
shardRW := m.shardRW
if shardRW == nil {
debug.Assert(!m.pars.DryRun)
ext, err := archive.Mime("", lom.FQN)
if err != nil {
return nil // skip
}
// NOTE: extract-creator for _this_ shard (compare with createShard above)
ec = newExtractCreator(g.t, ext)
shardRW = shard.RWs[ext]
debug.Assert(shardRW != nil, ext)
}

phaseInfo := &m.extractionPhase
Expand Down Expand Up @@ -949,7 +955,7 @@ func (es *extractShard) _do(lom *cluster.LOM) error {

beforeExtraction := mono.NanoTime()

extractedSize, extractedCount, err := ec.Extract(lom, fh, m.recm, toDisk)
extractedSize, extractedCount, err := shardRW.Extract(lom, fh, m.recm, toDisk)
cos.Close(fh)

dur := mono.Since(beforeExtraction)
Expand Down Expand Up @@ -980,7 +986,7 @@ func (es *extractShard) _do(lom *cluster.LOM) error {
recordSize := int(m.recm.Records.RecordMemorySize())
estimateTotalRecordsSize = uint64(metrics.TotalCnt * int64(extractedCount*recordSize))
if estimateTotalRecordsSize > m.freeMemory() {
warnPossibleOOM = true
warnOOM = true
}
}
metrics.ExtractedSize += extractedSize
Expand All @@ -994,7 +1000,7 @@ func (es *extractShard) _do(lom *cluster.LOM) error {
}
metrics.mu.Unlock()

if warnPossibleOOM {
if warnOOM {
msg := fmt.Sprintf("(estimated) total size of records (%d) will possibly exceed available memory (%s) during sorting phase",
estimateTotalRecordsSize, m.pars.MaxMemUsage)
return m.react(cmn.WarnReaction, msg)
Expand Down
37 changes: 11 additions & 26 deletions ext/dsort/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/NVIDIA/aistore/cluster"
"github.com/NVIDIA/aistore/cluster/meta"
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/archive"
"github.com/NVIDIA/aistore/cmn/atomic"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
Expand Down Expand Up @@ -85,7 +84,7 @@ type (
mu sync.Mutex
smap *meta.Smap
recm *shard.RecordManager
ec shard.Creator
shardRW shard.RW
startShardCreation chan struct{}
pars *parsedReqSpec
client *http.Client // Client for sending records metadata
Expand Down Expand Up @@ -142,6 +141,8 @@ func Tinit(t cluster.Target, stats stats.Tracker, db kvdb.Driver) {
g.t = t
g.tstats = stats

shard.T = t

fs.CSM.Reg(ct.DSortFileType, &ct.DSortFile{})
fs.CSM.Reg(ct.DSortWorkfileType, &ct.DSortFile{})

Expand Down Expand Up @@ -186,7 +187,6 @@ func (m *Manager) init(pars *parsedReqSpec) error {
return err
}

// Set extract creator depending on extension provided by the user
if err := m.setRW(); err != nil {
return err
}
Expand Down Expand Up @@ -310,7 +310,7 @@ func (m *Manager) cleanup() {

debug.Assertf(!m.inProgress(), "%s: was still in progress", m.ManagerUUID)

m.ec = nil
m.shardRW = nil
m.client = nil

g.t.Sowner().Listeners().Unreg(m)
Expand Down Expand Up @@ -453,33 +453,18 @@ func (m *Manager) setRW() (err error) {
return errors.WithStack(err)
}

m.ec = newExtractCreator(g.t, m.pars.InputExtension)
if m.ec == nil {
m.shardRW = shard.RWs[m.pars.InputExtension]
if m.shardRW == nil {
debug.Assert(!m.pars.DryRun, "dry-run in combination with _any_ shard extension is not supported")
debug.Assert(m.pars.InputExtension == "", m.pars.InputExtension)
// NOTE: [feature] allow non-specified extension; assign default extract-creator;
// handle all shards we encounter - all supported formats
m.ec = shard.NewTarRW(g.t)
// TODO -- FIXME: not implemented yet
}
if m.pars.DryRun {
debug.Assert(m.ec != nil, "dry-run in combination with _any_ shard extension is not supported yet")
m.ec = shard.NopRW(m.ec)
m.shardRW = shard.NopRW(m.shardRW)
}
m.recm = shard.NewRecordManager(g.t, m.pars.InputBck, m.ec, ke, m.onDupRecs)
return nil
}

func newExtractCreator(t cluster.Target, ext string) (ec shard.Creator) {
switch ext {
case archive.ExtTar:
ec = shard.NewTarRW(t)
case archive.ExtTarGz, archive.ExtTgz:
ec = shard.NewTargzRW(t, ext)
case archive.ExtZip:
ec = shard.NewZipRW(t)
case archive.ExtTarLz4:
ec = shard.NewTarlz4RW(t)
}
return
m.recm = shard.NewRecordManager(m.pars.InputBck, m.shardRW, ke, m.onDupRecs)
return nil
}

// updateFinishedAck marks daemonID as finished. If all daemons ack then the
Expand Down
2 changes: 1 addition & 1 deletion ext/dsort/mem_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (mw *memoryWatcher) watchExcess(memStat sys.MemStat) {
}

storeType := shard.DiskStoreType
if mw.m.ec.SupportsOffset() {
if mw.m.shardRW.SupportsOffset() {
storeType = shard.OffsetStoreType
}
mw.m.recm.RecordContents().Range(func(key, value any) bool {
Expand Down
2 changes: 1 addition & 1 deletion ext/dsort/request_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (rs *RequestSpec) parse() (*parsedReqSpec, error) {
}
}
if rs.OutputExtension == "" {
pars.OutputExtension = pars.InputExtension
pars.OutputExtension = pars.InputExtension // default
} else {
pars.OutputExtension, err = archive.Mime(rs.OutputExtension, "")
if err != nil {
Expand Down
19 changes: 9 additions & 10 deletions ext/dsort/shard/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,22 @@ import (
)

// interface guard
var _ Creator = (*nopRW)(nil)
var _ RW = (*nopRW)(nil)

type nopRW struct {
internal Creator
internal RW
}

func NopRW(internal Creator) Creator {
return &nopRW{internal: internal}
}
func NopRW(internal RW) RW { return &nopRW{internal: internal} }

func (n *nopRW) IsCompressed() bool { return n.internal.IsCompressed() }
func (n *nopRW) SupportsOffset() bool { return n.internal.SupportsOffset() }
func (n *nopRW) MetadataSize() int64 { return n.internal.MetadataSize() }

// Extract reads the shard from r and extracts its metadata by delegating to the wrapped reader/writer.
func (t *nopRW) Extract(lom *cluster.LOM, r cos.ReadReaderAt, extractor RecordExtractor, toDisk bool) (extractedSize int64,
func (n *nopRW) Extract(lom *cluster.LOM, r cos.ReadReaderAt, extractor RecordExtractor, toDisk bool) (extractedSize int64,
extractedCount int, err error) {
return t.internal.Extract(lom, r, extractor, toDisk)
return n.internal.Extract(lom, r, extractor, toDisk)
}

// Create creates a new shard locally based on the Shard.
Expand All @@ -43,6 +45,3 @@ func (*nopRW) Create(s *Shard, w io.Writer, loader ContentLoader) (written int64
}
return written, nil
}

func (*nopRW) SupportsOffset() bool { return true }
func (t *nopRW) MetadataSize() int64 { return t.internal.MetadataSize() }
2 changes: 1 addition & 1 deletion ext/dsort/shard/rcb.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
// `archive.Reader` rcb context and callback; uses `extractor` to extract
type rcbCtx struct {
extractor RecordExtractor
parent Creator
parent RW
tw *tar.Writer
shardName string
buf []byte
Expand Down
24 changes: 6 additions & 18 deletions ext/dsort/shard/recm.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,27 +54,16 @@ type (
Load(w io.Writer, rec *Record, obj *RecordObj) (int64, error)
}

// Creator is interface which describes set of functions which each
// shard creator should implement.
Creator interface {
Extract(lom *cluster.LOM, r cos.ReadReaderAt, extractor RecordExtractor, toDisk bool) (int64, int, error)
Create(s *Shard, w io.Writer, loader ContentLoader) (int64, error)
SupportsOffset() bool
MetadataSize() int64
}

RecordExtractor interface {
RecordWithBuffer(args extractRecordArgs) (int64, error)
}

RecordManager struct {
Records *Records

t cluster.Target
Records *Records
bck cmn.Bck
onDuplicatedRecords func(string) error

extractCreator Creator
extractCreator RW
keyExtractor KeyExtractor
contents *sync.Map
extractionPaths *sync.Map // Keys correspond to all paths to record contents on disk.
Expand All @@ -86,11 +75,10 @@ type (
}
)

func NewRecordManager(t cluster.Target, bck cmn.Bck, extractCreator Creator,
func NewRecordManager(bck cmn.Bck, extractCreator RW,
keyExtractor KeyExtractor, onDuplicatedRecords func(string) error) *RecordManager {
return &RecordManager{
Records: NewRecords(1000),
t: t,
bck: bck,
onDuplicatedRecords: onDuplicatedRecords,
extractCreator: extractCreator,
Expand Down Expand Up @@ -130,7 +118,7 @@ func (recm *RecordManager) RecordWithBuffer(args extractRecordArgs) (size int64,
storeType = SGLStoreType
contentPath, fullContentPath = recm.encodeRecordName(storeType, args.shardName, args.recordName)

sgl := recm.t.PageMM().NewSGL(r.Size() + int64(len(args.metadata)))
sgl := T.PageMM().NewSGL(r.Size() + int64(len(args.metadata)))
// No need for `io.CopyBuffer` since SGL implements `io.ReaderFrom`.
if _, err = io.Copy(sgl, bytes.NewReader(args.metadata)); err != nil {
sgl.Free()
Expand Down Expand Up @@ -192,7 +180,7 @@ func (recm *RecordManager) RecordWithBuffer(args extractRecordArgs) (size int64,
recm.Records.Insert(&Record{
Key: key,
Name: recordUniqueName,
DaemonID: recm.t.SID(),
DaemonID: T.SID(),
Objects: []*RecordObj{{
ContentPath: contentPath,
ObjectFileType: args.fileType,
Expand Down Expand Up @@ -368,7 +356,7 @@ func (recm *RecordManager) Cleanup() {

// NOTE: forcefully free all MMSA memory to the OS
// TODO: another reason to use a separate MMSA for extractions
recm.t.PageMM().FreeSpec(memsys.FreeSpec{
T.PageMM().FreeSpec(memsys.FreeSpec{
Totally: true,
ToOS: true,
MinSize: 1, // force toGC to free all (even small) memory to system
Expand Down
Loading

0 comments on commit 6c74cb9

Please sign in to comment.