Skip to content

Commit

Permalink
🐛 [pagination] Fix stream pagination (#518)
Browse files Browse the repository at this point in the history
<!--
Copyright (C) 2020-2022 Arm Limited or its affiliates and Contributors.
All rights reserved.
SPDX-License-Identifier: Apache-2.0
-->
### Description
- Streams implementation had some bugs in that the browsing would stop
before reaching the end.
- Added a lot more specific stream tests



### Test Coverage

<!--
Please put an `x` in the correct box e.g. `[x]` to indicate the testing
coverage of this change.
-->

- [x]  This change is covered by existing or additional automated tests.
- [ ] Manual testing has been performed (and evidence provided) as
automated testing was not feasible.
- [ ] Additional tests are not required for this change (e.g.
documentation update).
  • Loading branch information
acabarbaye authored Nov 15, 2024
1 parent 10debd4 commit 37c3a62
Show file tree
Hide file tree
Showing 6 changed files with 411 additions and 41 deletions.
1 change: 1 addition & 0 deletions changes/20241114171420.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
:bug: `[pagination]` Fix stream pagination
4 changes: 2 additions & 2 deletions utils/collection/pagination/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type IIterator interface {
GetNext() (interface{}, error)
}

// IStaticPage defines a generic page for a collection.
// IStaticPage defines a generic page for a collection. A page is marked as static when it cannot retrieve next pages on its own.
type IStaticPage interface {
// HasNext states whether more pages are accessible.
HasNext() bool
Expand Down Expand Up @@ -78,7 +78,7 @@ type IPaginatorAndPageFetcher interface {
FetchNextPage(ctx context.Context, currentPage IStaticPage) (IStaticPage, error)
}

// IGenericStreamPaginator is an iterator over a stream. A stream is a collection without any know ending.
// IGenericStreamPaginator is an iterator over a stream. A stream is a collection without any known ending.
type IGenericStreamPaginator interface {
IGenericPaginator
// DryUp indicates to the stream that it will soon run out.
Expand Down
45 changes: 23 additions & 22 deletions utils/collection/pagination/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,29 @@ func (s *AbstractStreamPaginator) Close() error {
}

func (s *AbstractStreamPaginator) HasNext() bool {
if s.AbstractPaginator.HasNext() {
s.timeReachLast.Store(time.Now())
return true
}
page, err := s.AbstractPaginator.FetchCurrentPage()
if err != nil {
return false
}
stream, ok := page.(IStaticPageStream)
if !ok {
return false
}
if !stream.HasFuture() {
return false
}
if s.IsRunningDry() {
if time.Since(s.timeReachLast.Load()) >= s.timeOut {
for {
if s.AbstractPaginator.HasNext() {
s.timeReachLast.Store(time.Now())
return true
}
page, err := s.AbstractPaginator.FetchCurrentPage()
if err != nil {
return false
}
stream, ok := page.(IStaticPageStream)
if !ok {
return false
}
if !stream.HasFuture() {
return false
}
if s.IsRunningDry() {
if time.Since(s.timeReachLast.Load()) >= s.timeOut {
return false
}
} else {
s.timeReachLast.Store(time.Now())
}
future, err := s.FetchFuturePage(s.GetContext(), stream)
if err != nil {
return false
Expand All @@ -61,10 +65,9 @@ func (s *AbstractStreamPaginator) HasNext() bool {
if err != nil {
return false
}
} else {
s.timeReachLast.Store(time.Now())

parallelisation.SleepWithContext(s.GetContext(), s.backoff)
}
return s.AbstractPaginator.HasNext()
}

func (s *AbstractStreamPaginator) GetNext() (interface{}, error) {
Expand All @@ -78,9 +81,7 @@ func (s *AbstractStreamPaginator) GetNext() (interface{}, error) {
err = fmt.Errorf("%w: there is not any next item", commonerrors.ErrNotFound)
return nil, err
}

parallelisation.SleepWithContext(s.GetContext(), s.backoff)

}
}

Expand Down
153 changes: 153 additions & 0 deletions utils/collection/pagination/stream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package pagination

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/ARM-software/golang-utils/utils/commonerrors"
"github.com/ARM-software/golang-utils/utils/commonerrors/errortest"
)

func TestStreamPaginator(t *testing.T) {
tests := []struct {
paginator func(context.Context, IStaticPageStream) (IGenericStreamPaginator, error)
name string
generateFunc func() (firstPage IStream, itemTotal int64, err error)
dryOut bool
}{
{
paginator: func(ctx context.Context, collection IStaticPageStream) (IGenericStreamPaginator, error) {
paginator, err := NewStaticPageStreamPaginator(ctx, time.Second, 10*time.Millisecond, func(context.Context) (IStaticPageStream, error) {
return collection, nil
}, func(fCtx context.Context, current IStaticPage) (IStaticPage, error) {
c, err := toDynamicPage(current)
if err != nil {
return nil, err
}
return c.GetNext(fCtx)
}, func(fCtx context.Context, current IStaticPageStream) (IStaticPageStream, error) {
s, err := toDynamicStream(current)
if err != nil {
return nil, err
}
return s.GetFuture(fCtx)
})
return paginator, err
},
generateFunc: GenerateMockStreamWithEnding,
name: "stream paginator over a stream of static pages with known ending",
},
{
paginator: func(ctx context.Context, collection IStaticPageStream) (IGenericStreamPaginator, error) {
paginator, err := NewStreamPaginator(ctx, time.Second, 10*time.Millisecond, func(context.Context) (IStream, error) {
return toDynamicStream(collection)
})
return paginator, err
},
generateFunc: GenerateMockStreamWithEnding,
name: "stream paginator over a stream of dynamic pages but with a known ending",
},
{
paginator: func(ctx context.Context, collection IStaticPageStream) (IGenericStreamPaginator, error) {
paginator, err := NewStreamPaginator(ctx, time.Second, 10*time.Millisecond, func(context.Context) (IStream, error) {
return toDynamicStream(collection)
})
return paginator, err
},
name: "stream paginator over a running dry stream of dynamic pages",
generateFunc: GenerateMockStream,
dryOut: true,
},
{
paginator: func(ctx context.Context, collection IStaticPageStream) (IGenericStreamPaginator, error) {
paginator, err := NewStaticPageStreamPaginator(ctx, time.Second, 10*time.Millisecond, func(context.Context) (IStaticPageStream, error) {
return collection, nil
}, func(fCtx context.Context, current IStaticPage) (IStaticPage, error) {
c, err := toDynamicPage(current)
if err != nil {
return nil, err
}
return c.GetNext(fCtx)
}, func(fCtx context.Context, current IStaticPageStream) (IStaticPageStream, error) {
s, err := toDynamicStream(current)
if err != nil {
return nil, err
}
return s.GetFuture(fCtx)
})
if paginator != nil {
// Indicate the stream will run out.
err = paginator.DryUp()
}
return paginator, err
},
name: "stream paginator over a running dry stream of static pages",
generateFunc: GenerateMockStream,
dryOut: true,
},
}

for te := range tests {
test := tests[te]
for i := 0; i < 10; i++ {
mockPages, expectedCount, err := test.generateFunc()
require.NoError(t, err)
t.Run(fmt.Sprintf("%v-#%v-[%v items]", test.name, i, expectedCount), func(t *testing.T) {
paginator, err := test.paginator(context.TODO(), mockPages)
require.NoError(t, err)
count := int64(0)
for {
if !paginator.HasNext() {
break
}
count += 1
item, err := paginator.GetNext()
require.NoError(t, err)
require.NotNil(t, item)
mockItem, ok := item.(*MockItem)
require.True(t, ok)
assert.Equal(t, int(count-1), mockItem.Index)
if count >= expectedCount%2 {
require.NoError(t, paginator.DryUp())
}
}
assert.Equal(t, expectedCount, count)
})
}
}
}

func TestEmptyStream(t *testing.T) {
mockPages, expectedCount, err := GenerateMockEmptyStream()
require.NoError(t, err)
require.Zero(t, expectedCount)
require.NotNil(t, mockPages)
paginator, err := NewStreamPaginator(context.Background(), time.Second, 10*time.Millisecond, func(context.Context) (IStream, error) {
return toDynamicStream(mockPages)
})
require.NoError(t, err)
assert.False(t, paginator.HasNext())
assert.False(t, paginator.IsRunningDry())
item, err := paginator.GetNext()
errortest.AssertError(t, err, commonerrors.ErrNotFound)
assert.Nil(t, item)
}

func TestDryOutStream(t *testing.T) {
mockPages, expectedCount, err := GenerateMockEmptyStream()
require.NoError(t, err)
require.Zero(t, expectedCount)
require.NotNil(t, mockPages)
paginator, err := NewStreamPaginator(context.Background(), time.Millisecond, 10*time.Millisecond, func(context.Context) (IStream, error) {
return toDynamicStream(mockPages)
})
require.NoError(t, err)
require.NoError(t, paginator.DryUp())
assert.False(t, paginator.HasNext())
assert.True(t, paginator.IsRunningDry())
}
Loading

0 comments on commit 37c3a62

Please sign in to comment.