Skip to content

Commit

Permalink
feat(dl): speed up resuming download
Browse files Browse the repository at this point in the history
  • Loading branch information
iyear committed Feb 3, 2023
1 parent 6f80444 commit 0cd0d85
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 17 deletions.
12 changes: 5 additions & 7 deletions app/internal/dliter/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func New(opts *Options) (*Iter, error) {
exclude: excludeMap,
curi: 0,
curj: -1,
preSum: preSum(dialogs),
finished: make(map[int]struct{}),
template: tpl,
manager: manager,
Expand All @@ -68,14 +69,15 @@ func (iter *Iter) Next(ctx context.Context) (*downloader.Item, error) {
}
iter.curj = 0
}
i, j := iter.curi, iter.curj
iter.mu.Unlock()

// check if finished
if _, ok := iter.finished[iter.ij2n(iter.curi, iter.curj)]; ok {
if _, ok := iter.finished[iter.ij2n(i, j)]; ok {
return nil, downloader.ErrSkip
}

return iter.item(ctx, iter.curi, iter.curj)
return iter.item(ctx, i, j)
}

func (iter *Iter) item(ctx context.Context, i, j int) (*downloader.Item, error) {
Expand Down Expand Up @@ -159,11 +161,7 @@ func (iter *Iter) Total(_ context.Context) int {
}

func (iter *Iter) ij2n(i, j int) int {
n := 0
for k := 0; k < i; k++ {
n += len(iter.dialogs[k].Messages)
}
return n + j
return iter.preSum[i] + j
}

func (iter *Iter) SetFinished(finished map[int]struct{}) {
Expand Down
69 changes: 69 additions & 0 deletions app/internal/dliter/iter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package dliter

import (
"reflect"
"testing"
)

func TestPreSum(t *testing.T) {
tests := []struct {
dialogs []*Dialog
want []int
}{
{
dialogs: []*Dialog{{Messages: []int{1, 2, 3}}, {Messages: []int{1, 2}}},
want: []int{0, 3, 5},
},
{
dialogs: []*Dialog{{Messages: []int{1, 2, 3}}, {Messages: []int{1, 2, 3}}, {Messages: []int{1, 2, 3, 4}}},
want: []int{0, 3, 6, 10},
},
{
dialogs: []*Dialog{{Messages: []int{1, 2, 3}}, {Messages: []int{1, 2, 3}}, {Messages: []int{1, 2, 3, 4}}, {Messages: []int{1}}},
want: []int{0, 3, 6, 10, 11},
},
}

for _, tt := range tests {
got := preSum(tt.dialogs)
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("preSum() = %v, want %v", got, tt.want)
}
}
}

func TestIter_ij2n(t *testing.T) {
tests := []struct {
dialogs []*Dialog
input []struct {
i, j int
}
want []int
}{
{
dialogs: []*Dialog{{Messages: []int{1, 2, 3}}, {Messages: []int{1, 2}}},
input: []struct {
i, j int
}{{0, 0}, {0, 1}, {0, 2}, {1, 0}, {1, 1}},
want: []int{0, 1, 2, 3, 4},
},
{
dialogs: []*Dialog{{Messages: []int{1, 2, 3}}, {Messages: []int{1, 2, 3}}, {Messages: []int{1, 2, 3, 4}}},
input: []struct {
i, j int
}{{0, 0}, {0, 1}, {0, 2}, {1, 0}, {1, 1}, {1, 2}, {2, 0}, {2, 1}, {2, 2}, {2, 3}},
want: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
},
}

for _, tt := range tests {
iter := &Iter{preSum: preSum(tt.dialogs), dialogs: tt.dialogs}

for i, input := range tt.input {
got := iter.ij2n(input.i, input.j)
if got != tt.want[i] {
t.Errorf("ij2n(%v, %v) = %v, want %v", input.i, input.j, got, tt.want[i])
}
}
}
}
1 change: 1 addition & 0 deletions app/internal/dliter/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Iter struct {
mu sync.Mutex
curi int
curj int
preSum []int
finished map[int]struct{}
template *template.Template
manager *peers.Manager
Expand Down
9 changes: 9 additions & 0 deletions app/internal/dliter/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,12 @@ func collectDialogs(dialogs [][]*Dialog) []*Dialog {
}
return res
}

// preSum of dialogs
func preSum(dialogs []*Dialog) []int {
sum := make([]int, len(dialogs)+1)
for i, m := range dialogs {
sum[i+1] = sum[i] + len(m.Messages)
}
return sum
}
21 changes: 11 additions & 10 deletions pkg/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,18 @@ func (d *Downloader) Download(ctx context.Context, limit int) error {
wg.SetLimit(limit)

for i := 0; i < total; i++ {
wg.Go(func() (rerr error) {
item, err := d.iter.Next(errctx)
if err != nil {
logger.From(errctx).Debug("Iter next failed",
zap.String("error", err.Error()))
// skip error means we don't need to log error
if !errors.Is(err, ErrSkip) && !errors.Is(err, context.Canceled) {
d.pw.Log(color.RedString("failed: %v", err))
}
return nil
item, err := d.iter.Next(errctx)
if err != nil {
logger.From(errctx).Debug("Iter next failed",
zap.Int("index", i), zap.String("error", err.Error()))
// skip error means we don't need to log error
if !errors.Is(err, ErrSkip) && !errors.Is(err, context.Canceled) {
d.pw.Log(color.RedString("failed: %v", err))
}
continue
}

wg.Go(func() error {
return d.download(errctx, item)
})
}
Expand Down

0 comments on commit 0cd0d85

Please sign in to comment.