Skip to content

Commit 2c167dd

Browse files
committed
Avoid polling intermediate levels.
1 parent ce0da89 commit 2c167dd

File tree

4 files changed

+88
-42
lines changed

4 files changed

+88
-42
lines changed

litestream/api.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ func NewReplica(name string, client litestream.ReplicaClient, options ReplicaOpt
4444
if options.PollInterval <= 0 {
4545
options.PollInterval = DefaultPollInterval
4646
}
47-
options.MinLevel = max(0, min(options.MinLevel, litestream.SnapshotLevel))
4847

4948
liteMtx.Lock()
5049
defer liteMtx.Unlock()

litestream/example_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package litestream_test
2+
3+
import (
4+
"log"
5+
"time"
6+
7+
"github.com/benbjohnson/litestream/s3"
8+
"github.com/ncruces/go-sqlite3/driver"
9+
_ "github.com/ncruces/go-sqlite3/embed"
10+
"github.com/ncruces/go-sqlite3/litestream"
11+
)
12+
13+
func ExampleNewReplica() {
14+
client := s3.NewReplicaClient()
15+
client.Bucket = "test-bucket"
16+
client.Path = "fruits.db"
17+
18+
litestream.NewReplica("fruits.db", client, litestream.ReplicaOptions{
19+
PollInterval: 5 * time.Second,
20+
})
21+
22+
db, err := driver.Open("file:fruits.db?vfs=litestream")
23+
if err != nil {
24+
log.Fatalln(err)
25+
}
26+
defer db.Close()
27+
28+
for {
29+
time.Sleep(time.Second)
30+
rows, err := db.Query("SELECT * FROM fruits")
31+
if err != nil {
32+
log.Fatalln(err)
33+
}
34+
35+
for rows.Next() {
36+
var name, color string
37+
err := rows.Scan(&name, &color)
38+
if err != nil {
39+
log.Fatalln(err)
40+
}
41+
log.Println(name, color)
42+
}
43+
44+
log.Println("===")
45+
rows.Close()
46+
}
47+
}

litestream/vfs.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -214,10 +214,7 @@ func (f *liteDB) pollReplica(ctx context.Context) (*pageIndex, ltx.TXID, error)
214214
return f.pages, f.txids[0], nil
215215
}
216216

217-
// Updating from MinLevel to SnapshotLevel is non-racy,
218-
// since LTX files are compacted into higher levels
219-
// before the lower level LTX files are deleted.
220-
for level := f.opts.MinLevel; level <= litestream.SnapshotLevel; level++ {
217+
for level := range pollLevels(f.opts.MinLevel) {
221218
if err := f.updateLevel(ctx, level); err != nil {
222219
f.opts.Logger.Error("cannot poll replica", "error", err)
223220
return nil, 0, err
@@ -285,6 +282,23 @@ func (f *liteDB) updateInfo(ctx context.Context, info *ltx.FileInfo) error {
285282
return nil
286283
}
287284

285+
func pollLevels(minLevel int) (r []int) {
286+
// Updating from lower to upper levels is non-racy,
287+
// since LTX files are compacted into higher levels
288+
// before the lower level LTX files are deleted.
289+
290+
// Also, only level 0 compactions and snapshots delete files,
291+
// so the intermediate levels never need to be updated.
292+
293+
if minLevel <= 0 {
294+
return append(r, 0, 1, litestream.SnapshotLevel)
295+
}
296+
if minLevel >= litestream.SnapshotLevel {
297+
return append(r, litestream.SnapshotLevel)
298+
}
299+
return append(r, minLevel, litestream.SnapshotLevel)
300+
}
301+
288302
// Type aliases; these are a mouthful.
289303
type pageIndex = wbt.Tree[uint32, ltx.PageIndexElem]
290304
type levelTXIDs = [litestream.SnapshotLevel + 1]ltx.TXID

litestream/vfs_test.go

Lines changed: 23 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,33 @@
1-
package litestream_test
1+
package litestream
22

33
import (
4-
"log"
5-
"time"
4+
"slices"
5+
"strconv"
6+
"testing"
67

7-
"github.com/benbjohnson/litestream/s3"
8-
"github.com/ncruces/go-sqlite3/driver"
8+
"github.com/benbjohnson/litestream"
99
_ "github.com/ncruces/go-sqlite3/embed"
10-
"github.com/ncruces/go-sqlite3/litestream"
1110
)
1211

13-
func ExampleNewReplica() {
14-
client := s3.NewReplicaClient()
15-
client.Bucket = "test-bucket"
16-
client.Path = "fruits.db"
17-
18-
litestream.NewReplica("fruits.db", client, litestream.ReplicaOptions{
19-
PollInterval: 5 * time.Second,
20-
})
21-
22-
db, err := driver.Open("file:fruits.db?vfs=litestream")
23-
if err != nil {
24-
log.Fatalln(err)
12+
func Test_pollLevels(t *testing.T) {
13+
tests := []struct {
14+
minLevel int
15+
want []int
16+
}{
17+
{minLevel: -1, want: []int{0, 1, litestream.SnapshotLevel}},
18+
{minLevel: 0, want: []int{0, 1, litestream.SnapshotLevel}},
19+
{minLevel: 1, want: []int{1, litestream.SnapshotLevel}},
20+
{minLevel: 2, want: []int{2, litestream.SnapshotLevel}},
21+
{minLevel: 3, want: []int{3, litestream.SnapshotLevel}},
22+
{minLevel: litestream.SnapshotLevel, want: []int{litestream.SnapshotLevel}},
23+
{minLevel: litestream.SnapshotLevel + 1, want: []int{litestream.SnapshotLevel}},
2524
}
26-
defer db.Close()
27-
28-
for {
29-
time.Sleep(time.Second)
30-
rows, err := db.Query("SELECT * FROM fruits")
31-
if err != nil {
32-
log.Fatalln(err)
33-
}
34-
35-
for rows.Next() {
36-
var name, color string
37-
err := rows.Scan(&name, &color)
38-
if err != nil {
39-
log.Fatalln(err)
25+
for _, tt := range tests {
26+
t.Run(strconv.Itoa(tt.minLevel), func(t *testing.T) {
27+
got := pollLevels(tt.minLevel)
28+
if !slices.Equal(got, tt.want) {
29+
t.Errorf("pollLevels() = %v, want %v", got, tt.want)
4030
}
41-
log.Println(name, color)
42-
}
43-
44-
log.Println("===")
45-
rows.Close()
31+
})
4632
}
4733
}

0 commit comments

Comments
 (0)