Skip to content

Commit

Permalink
fix(dl): wait group add miss
Browse files Browse the repository at this point in the history
  • Loading branch information
iyear committed Dec 3, 2022
1 parent 3027d60 commit ada1ead
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 26 deletions.
31 changes: 16 additions & 15 deletions app/dl/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dl
import (
"bytes"
"context"
"errors"
"fmt"
"github.com/gotd/td/telegram/peers"
"github.com/gotd/td/telegram/query"
Expand All @@ -11,13 +12,15 @@ import (
"github.com/iyear/tdl/pkg/kv"
"github.com/iyear/tdl/pkg/storage"
"github.com/iyear/tdl/pkg/utils"
"sync"
"text/template"
"time"
)

type iter struct {
client *tg.Client
dialogs []*dialog
mu sync.Mutex
curi int
curj int
template *template.Template
Expand Down Expand Up @@ -68,34 +71,25 @@ func newIter(client *tg.Client, kvd kv.KV, tmpl string, items ...[]*dialog) (*it
}, nil
}

func (i *iter) Next(ctx context.Context) bool {
func (i *iter) Next(ctx context.Context) (*downloader.Item, error) {
select {
case <-ctx.Done():
return false
return nil, ctx.Err()
default:
}

i.mu.Lock()
i.curj++
if i.curj >= len(i.dialogs[i.curi].msgs) {
if i.curi++; i.curi >= len(i.dialogs) {
return false
return nil, errors.New("no more items")
}
i.curj = 0
return true
}

return true
}

func (i *iter) Value(ctx context.Context) (*downloader.Item, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

curi := i.dialogs[i.curi]
cur := curi.msgs[i.curj]
i.mu.Unlock()

return i.item(ctx, curi.peer, cur)
}
Expand Down Expand Up @@ -143,5 +137,12 @@ func (i *iter) item(ctx context.Context, peer tg.InputPeerClass, msg int) (*down
}

func (i *iter) Total(_ context.Context) int {
return len(i.dialogs)
i.mu.Lock()
defer i.mu.Unlock()

total := 0
for _, m := range i.dialogs {
total += len(m.msgs)
}
return total
}
17 changes: 8 additions & 9 deletions pkg/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ func New(client *tg.Client, partSize int, threads int, iter Iter) *Downloader {
func (d *Downloader) Download(ctx context.Context, limit int) error {
d.pw.Log(color.GreenString("All files will be downloaded to '%s' dir", consts.DownloadPath))

d.pw.SetNumTrackersExpected(d.iter.Total(ctx))
total := d.iter.Total(ctx)
d.pw.SetNumTrackersExpected(total)

go d.pw.Render()

Expand All @@ -53,15 +54,13 @@ func (d *Downloader) Download(ctx context.Context, limit int) error {

go runPS(errctx, d.pw)

for d.iter.Next(ctx) {
item, err := d.iter.Value(ctx)
if err != nil {
d.pw.Log(color.RedString("Get item failed: %v, skip...", err))
continue
}

for i := 0; i < total; i++ {
wg.Go(func() error {
// d.pw.Log(color.MagentaString("name: %s,size: %s", item.Name, utils.Byte.FormatBinaryBytes(item.Size)))
item, err := d.iter.Next(errctx)
if err != nil {
d.pw.Log(color.RedString("Get item failed: %v, skip...", err))
return nil
}
return d.download(errctx, item)
})
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/downloader/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import (
)

type Iter interface {
Next(ctx context.Context) bool
Value(ctx context.Context) (*Item, error)
Next(ctx context.Context) (*Item, error)
Total(ctx context.Context) int
}

Expand Down

0 comments on commit ada1ead

Please sign in to comment.