Skip to content

Commit 37c3a62

Browse files
authored
🐛 [pagination] Fix stream pagination (#518)
<!-- Copyright (C) 2020-2022 Arm Limited or its affiliates and Contributors. All rights reserved. SPDX-License-Identifier: Apache-2.0 --> ### Description - Streams implementation had some bugs in that the browsing would stop before reaching the end. - Added a lot more specific stream tests ### Test Coverage <!-- Please put an `x` in the correct box e.g. `[x]` to indicate the testing coverage of this change. --> - [x] This change is covered by existing or additional automated tests. - [ ] Manual testing has been performed (and evidence provided) as automated testing was not feasible. - [ ] Additional tests are not required for this change (e.g. documentation update).
1 parent 10debd4 commit 37c3a62

File tree

6 files changed

+411
-41
lines changed

6 files changed

+411
-41
lines changed

changes/20241114171420.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
:bug: `[pagination]` Fix stream pagination

utils/collection/pagination/interfaces.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ type IIterator interface {
2222
GetNext() (interface{}, error)
2323
}
2424

25-
// IStaticPage defines a generic page for a collection.
25+
// IStaticPage defines a generic page for a collection. A page is marked as static when it cannot retrieve next pages on its own.
2626
type IStaticPage interface {
2727
// HasNext states whether more pages are accessible.
2828
HasNext() bool
@@ -78,7 +78,7 @@ type IPaginatorAndPageFetcher interface {
7878
FetchNextPage(ctx context.Context, currentPage IStaticPage) (IStaticPage, error)
7979
}
8080

81-
// IGenericStreamPaginator is an iterator over a stream. A stream is a collection without any know ending.
81+
// IGenericStreamPaginator is an iterator over a stream. A stream is a collection without any known ending.
8282
type IGenericStreamPaginator interface {
8383
IGenericPaginator
8484
// DryUp indicates to the stream that it will soon run out.

utils/collection/pagination/stream.go

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -34,25 +34,29 @@ func (s *AbstractStreamPaginator) Close() error {
3434
}
3535

3636
func (s *AbstractStreamPaginator) HasNext() bool {
37-
if s.AbstractPaginator.HasNext() {
38-
s.timeReachLast.Store(time.Now())
39-
return true
40-
}
41-
page, err := s.AbstractPaginator.FetchCurrentPage()
42-
if err != nil {
43-
return false
44-
}
45-
stream, ok := page.(IStaticPageStream)
46-
if !ok {
47-
return false
48-
}
49-
if !stream.HasFuture() {
50-
return false
51-
}
52-
if s.IsRunningDry() {
53-
if time.Since(s.timeReachLast.Load()) >= s.timeOut {
37+
for {
38+
if s.AbstractPaginator.HasNext() {
39+
s.timeReachLast.Store(time.Now())
40+
return true
41+
}
42+
page, err := s.AbstractPaginator.FetchCurrentPage()
43+
if err != nil {
5444
return false
5545
}
46+
stream, ok := page.(IStaticPageStream)
47+
if !ok {
48+
return false
49+
}
50+
if !stream.HasFuture() {
51+
return false
52+
}
53+
if s.IsRunningDry() {
54+
if time.Since(s.timeReachLast.Load()) >= s.timeOut {
55+
return false
56+
}
57+
} else {
58+
s.timeReachLast.Store(time.Now())
59+
}
5660
future, err := s.FetchFuturePage(s.GetContext(), stream)
5761
if err != nil {
5862
return false
@@ -61,10 +65,9 @@ func (s *AbstractStreamPaginator) HasNext() bool {
6165
if err != nil {
6266
return false
6367
}
64-
} else {
65-
s.timeReachLast.Store(time.Now())
68+
69+
parallelisation.SleepWithContext(s.GetContext(), s.backoff)
6670
}
67-
return s.AbstractPaginator.HasNext()
6871
}
6972

7073
func (s *AbstractStreamPaginator) GetNext() (interface{}, error) {
@@ -78,9 +81,7 @@ func (s *AbstractStreamPaginator) GetNext() (interface{}, error) {
7881
err = fmt.Errorf("%w: there is not any next item", commonerrors.ErrNotFound)
7982
return nil, err
8083
}
81-
8284
parallelisation.SleepWithContext(s.GetContext(), s.backoff)
83-
8485
}
8586
}
8687

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
package pagination
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"testing"
7+
"time"
8+
9+
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/require"
11+
12+
"github.com/ARM-software/golang-utils/utils/commonerrors"
13+
"github.com/ARM-software/golang-utils/utils/commonerrors/errortest"
14+
)
15+
16+
func TestStreamPaginator(t *testing.T) {
17+
tests := []struct {
18+
paginator func(context.Context, IStaticPageStream) (IGenericStreamPaginator, error)
19+
name string
20+
generateFunc func() (firstPage IStream, itemTotal int64, err error)
21+
dryOut bool
22+
}{
23+
{
24+
paginator: func(ctx context.Context, collection IStaticPageStream) (IGenericStreamPaginator, error) {
25+
paginator, err := NewStaticPageStreamPaginator(ctx, time.Second, 10*time.Millisecond, func(context.Context) (IStaticPageStream, error) {
26+
return collection, nil
27+
}, func(fCtx context.Context, current IStaticPage) (IStaticPage, error) {
28+
c, err := toDynamicPage(current)
29+
if err != nil {
30+
return nil, err
31+
}
32+
return c.GetNext(fCtx)
33+
}, func(fCtx context.Context, current IStaticPageStream) (IStaticPageStream, error) {
34+
s, err := toDynamicStream(current)
35+
if err != nil {
36+
return nil, err
37+
}
38+
return s.GetFuture(fCtx)
39+
})
40+
return paginator, err
41+
},
42+
generateFunc: GenerateMockStreamWithEnding,
43+
name: "stream paginator over a stream of static pages with known ending",
44+
},
45+
{
46+
paginator: func(ctx context.Context, collection IStaticPageStream) (IGenericStreamPaginator, error) {
47+
paginator, err := NewStreamPaginator(ctx, time.Second, 10*time.Millisecond, func(context.Context) (IStream, error) {
48+
return toDynamicStream(collection)
49+
})
50+
return paginator, err
51+
},
52+
generateFunc: GenerateMockStreamWithEnding,
53+
name: "stream paginator over a stream of dynamic pages but with a known ending",
54+
},
55+
{
56+
paginator: func(ctx context.Context, collection IStaticPageStream) (IGenericStreamPaginator, error) {
57+
paginator, err := NewStreamPaginator(ctx, time.Second, 10*time.Millisecond, func(context.Context) (IStream, error) {
58+
return toDynamicStream(collection)
59+
})
60+
return paginator, err
61+
},
62+
name: "stream paginator over a running dry stream of dynamic pages",
63+
generateFunc: GenerateMockStream,
64+
dryOut: true,
65+
},
66+
{
67+
paginator: func(ctx context.Context, collection IStaticPageStream) (IGenericStreamPaginator, error) {
68+
paginator, err := NewStaticPageStreamPaginator(ctx, time.Second, 10*time.Millisecond, func(context.Context) (IStaticPageStream, error) {
69+
return collection, nil
70+
}, func(fCtx context.Context, current IStaticPage) (IStaticPage, error) {
71+
c, err := toDynamicPage(current)
72+
if err != nil {
73+
return nil, err
74+
}
75+
return c.GetNext(fCtx)
76+
}, func(fCtx context.Context, current IStaticPageStream) (IStaticPageStream, error) {
77+
s, err := toDynamicStream(current)
78+
if err != nil {
79+
return nil, err
80+
}
81+
return s.GetFuture(fCtx)
82+
})
83+
if paginator != nil {
84+
// Indicate the stream will run out.
85+
err = paginator.DryUp()
86+
}
87+
return paginator, err
88+
},
89+
name: "stream paginator over a running dry stream of static pages",
90+
generateFunc: GenerateMockStream,
91+
dryOut: true,
92+
},
93+
}
94+
95+
for te := range tests {
96+
test := tests[te]
97+
for i := 0; i < 10; i++ {
98+
mockPages, expectedCount, err := test.generateFunc()
99+
require.NoError(t, err)
100+
t.Run(fmt.Sprintf("%v-#%v-[%v items]", test.name, i, expectedCount), func(t *testing.T) {
101+
paginator, err := test.paginator(context.TODO(), mockPages)
102+
require.NoError(t, err)
103+
count := int64(0)
104+
for {
105+
if !paginator.HasNext() {
106+
break
107+
}
108+
count += 1
109+
item, err := paginator.GetNext()
110+
require.NoError(t, err)
111+
require.NotNil(t, item)
112+
mockItem, ok := item.(*MockItem)
113+
require.True(t, ok)
114+
assert.Equal(t, int(count-1), mockItem.Index)
115+
if count >= expectedCount%2 {
116+
require.NoError(t, paginator.DryUp())
117+
}
118+
}
119+
assert.Equal(t, expectedCount, count)
120+
})
121+
}
122+
}
123+
}
124+
125+
func TestEmptyStream(t *testing.T) {
126+
mockPages, expectedCount, err := GenerateMockEmptyStream()
127+
require.NoError(t, err)
128+
require.Zero(t, expectedCount)
129+
require.NotNil(t, mockPages)
130+
paginator, err := NewStreamPaginator(context.Background(), time.Second, 10*time.Millisecond, func(context.Context) (IStream, error) {
131+
return toDynamicStream(mockPages)
132+
})
133+
require.NoError(t, err)
134+
assert.False(t, paginator.HasNext())
135+
assert.False(t, paginator.IsRunningDry())
136+
item, err := paginator.GetNext()
137+
errortest.AssertError(t, err, commonerrors.ErrNotFound)
138+
assert.Nil(t, item)
139+
}
140+
141+
func TestDryOutStream(t *testing.T) {
142+
mockPages, expectedCount, err := GenerateMockEmptyStream()
143+
require.NoError(t, err)
144+
require.Zero(t, expectedCount)
145+
require.NotNil(t, mockPages)
146+
paginator, err := NewStreamPaginator(context.Background(), time.Millisecond, 10*time.Millisecond, func(context.Context) (IStream, error) {
147+
return toDynamicStream(mockPages)
148+
})
149+
require.NoError(t, err)
150+
require.NoError(t, paginator.DryUp())
151+
assert.False(t, paginator.HasNext())
152+
assert.True(t, paginator.IsRunningDry())
153+
}

0 commit comments

Comments
 (0)