Skip to content

Commit ba9caf0

Browse files
committed
Shared page cache.
1 parent 2c167dd commit ba9caf0

File tree

3 files changed

+100
-8
lines changed

3 files changed

+100
-8
lines changed

litestream/api.go

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,13 @@ import (
1010
"github.com/ncruces/go-sqlite3/vfs"
1111
)
1212

13-
// The default poll interval.
14-
const DefaultPollInterval = 1 * time.Second
13+
const (
14+
// The default poll interval.
15+
DefaultPollInterval = 1 * time.Second
16+
17+
// The default cache size: 10 MiB.
18+
DefaultCacheSize = 10 * 1024 * 1024
19+
)
1520

1621
func init() {
1722
vfs.Register("litestream", liteVFS{})
@@ -27,11 +32,18 @@ var (
2732
type ReplicaOptions struct {
2833
// Where to log error messages. May be nil.
2934
Logger *slog.Logger
30-
// Minimum compaction level to track.
31-
MinLevel int
32-
// Replica poll interval. Must be less than the compaction interval
35+
36+
// Replica poll interval.
37+
// Should be less than the compaction interval
3338
// used by the replica at MinLevel+1.
3439
PollInterval time.Duration
40+
41+
// Minimum compaction level to track.
42+
MinLevel int
43+
44+
// CacheSize is the maximum size of the page cache in bytes.
45+
// Zero means DefaultCacheSize, negative disables caching.
46+
CacheSize int
3547
}
3648

3749
// NewReplica creates a read-replica from a Litestream client.
@@ -44,12 +56,16 @@ func NewReplica(name string, client litestream.ReplicaClient, options ReplicaOpt
4456
if options.PollInterval <= 0 {
4557
options.PollInterval = DefaultPollInterval
4658
}
59+
if options.CacheSize == 0 {
60+
options.CacheSize = DefaultCacheSize
61+
}
4762

4863
liteMtx.Lock()
4964
defer liteMtx.Unlock()
5065
liteDBs[name] = &liteDB{
5166
client: client,
52-
opts: &options,
67+
opts: options,
68+
cache: pageCache{size: options.CacheSize},
5369
}
5470
}
5571

litestream/cache.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package litestream
2+
3+
import (
4+
"encoding/binary"
5+
"sync"
6+
7+
"golang.org/x/sync/singleflight"
8+
9+
"github.com/superfly/ltx"
10+
)
11+
12+
type pageCache struct {
13+
single singleflight.Group
14+
pages map[uint32]cachedPage // +checklocks:mtx
15+
size int
16+
mtx sync.Mutex
17+
}
18+
19+
type cachedPage struct {
20+
data []byte
21+
txid ltx.TXID
22+
}
23+
24+
func (c *pageCache) getOrFetch(pgno uint32, maxTXID ltx.TXID, fetch func() (any, error)) ([]byte, error) {
25+
if c.size >= 0 {
26+
c.mtx.Lock()
27+
if c.pages == nil {
28+
c.pages = map[uint32]cachedPage{}
29+
}
30+
page := c.pages[pgno]
31+
c.mtx.Unlock()
32+
33+
if page.txid == maxTXID {
34+
return page.data, nil
35+
}
36+
}
37+
38+
var key [12]byte
39+
binary.LittleEndian.PutUint32(key[0:], pgno)
40+
binary.LittleEndian.PutUint64(key[4:], uint64(maxTXID))
41+
v, err, _ := c.single.Do(string(key[:]), fetch)
42+
43+
if err != nil {
44+
return nil, err
45+
}
46+
47+
page := cachedPage{v.([]byte), maxTXID}
48+
if c.size >= 0 {
49+
c.mtx.Lock()
50+
c.evict(len(page.data))
51+
c.pages[pgno] = page
52+
c.mtx.Unlock()
53+
}
54+
return page.data, nil
55+
}
56+
57+
// +checklocks:c.mtx
58+
func (c *pageCache) evict(pageSize int) {
59+
// Evict random keys until we're under the maximum size.
60+
// SQLite has its own page cache, which it will use for each connection.
61+
// Since this is a second layer of shared cache,
62+
// random eviction is probably good enough.
63+
if pageSize*len(c.pages) < c.size {
64+
return
65+
}
66+
for key := range c.pages {
67+
delete(c.pages, key)
68+
if pageSize*len(c.pages) < c.size {
69+
return
70+
}
71+
}
72+
}

litestream/vfs.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,10 @@ func (f *liteFile) ReadAt(p []byte, off int64) (n int, err error) {
8686
return 0, io.EOF
8787
}
8888

89-
_, data, err := litestream.FetchPage(ctx, f.db.client, elem.Level, elem.MinTXID, elem.MaxTXID, elem.Offset, elem.Size)
89+
data, err := f.db.cache.getOrFetch(pgno, elem.MaxTXID, func() (any, error) {
90+
_, data, err := litestream.FetchPage(ctx, f.db.client, elem.Level, elem.MinTXID, elem.MaxTXID, elem.Offset, elem.Size)
91+
return data, err
92+
})
9093
if err != nil {
9194
f.db.opts.Logger.Error("fetch page", "error", err)
9295
return 0, err
@@ -169,7 +172,8 @@ func (f *liteFile) context() context.Context {
169172

170173
type liteDB struct {
171174
client litestream.ReplicaClient
172-
opts *ReplicaOptions
175+
opts ReplicaOptions
176+
cache pageCache
173177
pages *pageIndex // +checklocks:mtx
174178
lastPoll time.Time // +checklocks:mtx
175179
txids levelTXIDs // +checklocks:mtx

0 commit comments

Comments
 (0)