Skip to content

Commit

Permalink
dsort: no need to block when sending shard records
Browse files Browse the repository at this point in the history
* dsort-general (above); TODO: review all _similar_ cases
* TODO: remove dsort metrics mutex - use atomics
* with minor refactoring
* update CLI (bump its aistore module dependency)

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
alex-aizman committed Aug 28, 2023
1 parent 7e1e7c9 commit 4ea8602
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 48 deletions.
3 changes: 2 additions & 1 deletion ais/test/dsort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1392,13 +1392,14 @@ func TestDsortAbort(t *testing.T) {
)
}

// TODO -- FIXME: pass asXaction = true
func TestDsortAbortDuringPhases(t *testing.T) {
tools.CheckSkip(t, tools.SkipTestArgs{Long: true})

runDSortTest(
t, dsortTestSpec{p: true, types: dsorterTypes, phases: dsortPhases},
func(dsorterType, phase string, t *testing.T) {
for _, asXaction := range []bool{false, true} {
for _, asXaction := range []bool{false} {
test := dsorterType + "/" + fmt.Sprintf("as-xaction=%t", asXaction)
t.Run(test, func(t *testing.T) {
var (
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.20

// direct
require (
github.com/NVIDIA/aistore v1.3.19-0.20230824200641-cfe8329d6f64
github.com/NVIDIA/aistore v1.3.19-0.20230828144453-a2902adbb750
github.com/fatih/color v1.14.1
github.com/json-iterator/go v1.1.12
github.com/onsi/ginkgo v1.16.5
Expand Down
4 changes: 2 additions & 2 deletions cmd/cli/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
code.cloudfoundry.org/bytefmt v0.0.0-20190710193110-1eb035ffe2b6/go.mod h1:wN/zk7mhREp/oviagqUXY3EwuHhWyOvAdsn5Y4CzOrc=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/toml v1.2.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/NVIDIA/aistore v1.3.19-0.20230824200641-cfe8329d6f64 h1:i/xALZmLd722rlncwOeVYXzrlHXpSQn5dY4r+GvVlOQ=
github.com/NVIDIA/aistore v1.3.19-0.20230824200641-cfe8329d6f64/go.mod h1:tZvUalPk4wL/+5+5psJkZRHBqu3i2KV9g97HYyHvwc4=
github.com/NVIDIA/aistore v1.3.19-0.20230828144453-a2902adbb750 h1:1CMjOKzGOls/ne66mcnx8bTrD6lS5jcxXaP6EquL0qs=
github.com/NVIDIA/aistore v1.3.19-0.20230828144453-a2902adbb750/go.mod h1:tZvUalPk4wL/+5+5psJkZRHBqu3i2KV9g97HYyHvwc4=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
Expand Down
11 changes: 8 additions & 3 deletions ext/dsort/dsort.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,17 @@ func (m *Manager) start() (err error) {
if curTargetIsFinal {
// assuming uniform distribution estimate avg. output shard size
ratio := m.compressionRatio()
if m.config.FastV(4, cos.SmoduleDsort) {
nlog.Infof("%s [dsort] %s phase3: ratio=%f", g.t, m.ManagerUUID, ratio)
}
debug.Assertf(shard.IsCompressed(m.Pars.InputExtension) || ratio == 1, "tar ratio=%f, ext=%q",
ratio, m.Pars.InputExtension)

shardSize := int64(float64(m.Pars.OutputShardSize) / ratio)

nlog.Infof("%s: %s started phase 3 distribution", g.t, m.ManagerUUID)
nlog.Infof("%s: [dsort] %s started phase 3: ratio=%f, shard size (%d, %d)",
g.t, m.ManagerUUID, shardSize, m.Pars.OutputShardSize)
if err := m.phase3(shardSize); err != nil {
nlog.Errorf("%s: [dsort] %s phase3 err: %v", g.t, m.ManagerUUID, err)
return err
}
}
Expand Down Expand Up @@ -810,9 +814,10 @@ func (m *Manager) phase3(maxSize int64) error {
close(errCh)

for err := range errCh {
nlog.Errorf("%s: [dsort] %s err while sending shards: %v", g.t, m.ManagerUUID, err)
return err
}
nlog.Infof("%s: %s finished sending all shards", g.t, m.ManagerUUID)
nlog.Infof("%s: [dsort] %s finished sending shards", g.t, m.ManagerUUID)
return nil
}

Expand Down
62 changes: 21 additions & 41 deletions ext/dsort/dsort_general.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/cmn/mono"
"github.com/NVIDIA/aistore/cmn/nlog"
"github.com/NVIDIA/aistore/ext/dsort/shard"
"github.com/NVIDIA/aistore/fs"
"github.com/NVIDIA/aistore/memsys"
Expand Down Expand Up @@ -344,20 +345,14 @@ func (ds *dsorterGeneral) loadLocal(w io.Writer, obj *shard.RecordObj) (written

func (ds *dsorterGeneral) loadRemote(w io.Writer, rec *shard.Record, obj *shard.RecordObj) (int64, error) {
var (
cbErr error
beforeRecv int64
beforeSend int64

daemonID = rec.DaemonID
twg = cos.NewTimeoutGroup()
writer = ds.newStreamWriter(rec.MakeUniqueName(obj), w)
metrics = ds.m.Metrics.Creation

toNode = ds.m.smap.GetTarget(daemonID)
daemonID = rec.DaemonID
writer = ds.newStreamWriter(rec.MakeUniqueName(obj), w)
metrics = ds.m.Metrics.Creation
toNode = ds.m.smap.GetTarget(daemonID)
)

if toNode == nil {
return 0, errors.Errorf("tried to send request to node %q which is not present in the smap", daemonID)
return 0, errors.Errorf("cannot send request to node %q - not present in %s", daemonID, ds.m.smap)
}

req := remoteRequest{
Expand All @@ -367,40 +362,12 @@ func (ds *dsorterGeneral) loadRemote(w io.Writer, rec *shard.Record, obj *shard.
opaque := cos.MustMarshal(req)
o := transport.AllocSend()
o.Hdr = transport.ObjHdr{Opaque: opaque}
if ds.m.Metrics.extended {
beforeSend = mono.NanoTime()
}
o.Callback = func(hdr transport.ObjHdr, r io.ReadCloser, _ any, err error) {
if err != nil {
cbErr = err
}
if ds.m.Metrics.extended {
delta := mono.Since(beforeSend)
metrics.mu.Lock()
metrics.RequestStats.updateTime(delta)
metrics.mu.Unlock()

g.tstats.AddMany(
cos.NamedVal64{Name: stats.DSortCreationReqCount, Value: 1},
cos.NamedVal64{Name: stats.DSortCreationReqLatency, Value: int64(delta)},
)
}
twg.Done()
}
o.Callback, o.CmplArg = ds.sentCallback, &req

twg.Add(1)
if err := ds.streams.request.Send(o, nil, toNode); err != nil {
return 0, errors.WithStack(err)
}

// Send should be synchronous to make sure that 'wait timeout' is
// calculated only for the receive side.
twg.Wait()

if cbErr != nil {
return 0, errors.WithStack(cbErr)
}

if ds.m.Metrics.extended {
beforeRecv = mono.NanoTime()
}
Expand All @@ -412,7 +379,7 @@ func (ds *dsorterGeneral) loadRemote(w io.Writer, rec *shard.Record, obj *shard.
var pulled bool
timed, stopped := writer.wg.WaitTimeoutWithStop(ds.m.callTimeout, ds.m.listenAborted())
if timed || stopped {
// In case of time out or abort we need to pull the writer to
// In case of timeout or abort we need to pull the writer to
// avoid concurrent Close and Write on `writer.w`.
pulled = ds.pullStreamWriter(rec.MakeUniqueName(obj)) != nil
}
Expand Down Expand Up @@ -454,6 +421,19 @@ func (ds *dsorterGeneral) loadRemote(w io.Writer, rec *shard.Record, obj *shard.
return writer.n, writer.err
}

// sentCallback is the transport callback that loadRemote attaches to each
// outgoing remote-record request (together with the request itself as the
// completion argument).
//
// When err is non-nil it logs the name of the record that could not be sent
// and does nothing else. Otherwise, and only when extended metrics are
// enabled, it increments the dsort creation-request counter by one.
//
// The header and reader arguments are ignored. arg is type-asserted to
// *remoteRequest on the error path only; any other dynamic type panics there.
func (ds *dsorterGeneral) sentCallback(_ transport.ObjHdr, _ io.ReadCloser, arg any, err error) {
	switch {
	case err != nil:
		// Failure is only reported here; nothing is propagated to a caller.
		failed := arg.(*remoteRequest)
		nlog.Errorf("%s: [dsort] %s failed to send record name=%q: %v", g.t, ds.m.ManagerUUID, failed.Record.Name, err)
	case ds.m.Metrics.extended:
		reqCount := cos.NamedVal64{Name: stats.DSortCreationReqCount, Value: 1}
		g.tstats.AddMany(reqCount)
	}
}

func (ds *dsorterGeneral) errHandler(err error, node *meta.Snode, o *transport.Obj) {
*o = transport.Obj{Hdr: o.Hdr}
o.Hdr.Opaque = []byte(err.Error())
Expand Down

0 comments on commit 4ea8602

Please sign in to comment.