Skip to content

Commit bfd1420

Browse files
authored
Merge pull request #1046 from dolthub/zachmu/index-pushdown
Introduced OrderedIndex interface to deal with indexes that don't return ordered results
2 parents e6d4101 + 2a56fe9 commit bfd1420

File tree

7 files changed

+47
-24
lines changed

7 files changed

+47
-24
lines changed

enginetest/evaluation.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func RunQuery(t *testing.T, e *sqle.Engine, harness Harness, query string) {
4545
func RunQueryWithContext(t *testing.T, e *sqle.Engine, harness Harness, ctx *sql.Context, query string) {
4646
ctx = ctx.WithQuery(query)
4747
sch, iter, err := e.Query(ctx, query)
48-
require.NoError(t, err)
48+
require.NoError(t, err, "error running query %s: %v", query, err)
4949
_, err = sql.RowIterToRows(ctx, sch, iter)
5050
require.NoError(t, err)
5151
validateEngine(t, ctx, harness, e)

memory/index.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type Index struct {
3838

3939
var _ sql.Index = (*Index)(nil)
4040
var _ sql.FilteredIndex = (*Index)(nil)
41+
var _ sql.OrderedIndex = (*Index)(nil)
4142

4243
func (idx *Index) Database() string { return idx.DB }
4344
func (idx *Index) Driver() string { return idx.DriverName }
@@ -232,6 +233,10 @@ func getType(val interface{}) (interface{}, sql.Type) {
232233
}
233234
}
234235

236+
func (idx *Index) Order() sql.IndexOrder {
237+
return sql.IndexOrderAsc
238+
}
239+
235240
func or(expressions ...sql.Expression) sql.Expression {
236241
if len(expressions) == 1 {
237242
return expressions[0]

sql/analyzer/process.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ func trackProcess(ctx *sql.Context, a *Analyzer, n sql.Node, scope *Scope, sel R
5050
n, _, err := transform.Node(n, func(n sql.Node) (sql.Node, transform.TreeIdentity, error) {
5151
switch n := n.(type) {
5252
case *plan.IndexedTableAccess:
53-
// Only add a process table if ResolvedTable implements ParallelizedIndexAddressableTable
53+
// Only parallelize indexed table accesses if the underlying table supports it
5454
parallelizedTable, ok := n.ResolvedTable.Table.(sql.ParallelizedIndexAddressableTable)
55-
if !ok {
55+
if !ok || !parallelizedTable.ShouldParallelizeAccess() {
5656
return n, transform.SameTree, nil
5757
}
5858

@@ -83,11 +83,7 @@ func trackProcess(ctx *sql.Context, a *Analyzer, n sql.Node, scope *Scope, sel R
8383
}
8484
}
8585

86-
// Wrap with ProcessTable
87-
t := plan.NewProcessTable(parallelizedTable, onPartitionDone, onPartitionStart, onRowNext)
88-
89-
// Replace child
90-
n, err := n.WithTable(t)
86+
n, err := n.WithTable(plan.NewProcessTable(parallelizedTable, onPartitionDone, onPartitionStart, onRowNext))
9187
return n, transform.NewTree, err
9288
case *plan.ResolvedTable:
9389
switch n.Table.(type) {

sql/analyzer/pushdown.go

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -680,21 +680,10 @@ func pushdownIndexesToTable(a *Analyzer, tableNode NameableNode, indexes map[str
680680
if table == nil {
681681
return n, transform.SameTree, nil
682682
}
683-
if indexAddressableTable, ok := table.(sql.IndexAddressableTable); ok {
683+
if _, ok := table.(sql.IndexAddressableTable); ok {
684684
indexLookup, ok := indexes[tableNode.Name()]
685685
if ok {
686686
a.Log("table %q transformed with pushdown of index", tableNode.Name())
687-
688-
// Only pass lookup into resolved table if it's Parallelizable
689-
if _, ok := indexAddressableTable.(sql.ParallelizedIndexAddressableTable); ok {
690-
indexedTable := indexAddressableTable.WithIndexLookup(indexLookup.lookup)
691-
newResolvedTable, err := n.WithTable(indexedTable)
692-
if err != nil {
693-
return nil, transform.SameTree, err
694-
}
695-
return plan.NewStaticIndexedTableAccess(newResolvedTable, indexLookup.lookup, indexLookup.indexes[0], indexLookup.fields), transform.NewTree, nil
696-
697-
}
698687
return plan.NewStaticIndexedTableAccess(n, indexLookup.lookup, indexLookup.indexes[0], indexLookup.fields), transform.NewTree, nil
699688
}
700689
}
@@ -951,6 +940,10 @@ func replacePkSort(ctx *sql.Context, a *Analyzer, n sql.Node, scope *Scope, sel
951940
// Extract primary index
952941
var pkIndex sql.Index
953942
for _, idx := range idxs {
943+
oi, ok := idx.(sql.OrderedIndex)
944+
if !ok || oi.Order() != sql.IndexOrderAsc {
945+
continue
946+
}
954947
if idx.ID() == "PRIMARY" {
955948
pkIndex = idx
956949
break

sql/index.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,31 @@ type Index interface {
5151
ColumnExpressionTypes(ctx *Context) []ColumnExpressionType
5252
}
5353

54+
// FilteredIndex is an extension of |Index| that allows an index to declare certain filter predicates handled,
55+
// allowing them to be removed from the overall plan for greater execution efficiency
5456
type FilteredIndex interface {
5557
Index
5658
// HandledFilters returns a subset of |filters| that are satisfied
5759
// by index lookups to this index.
5860
HandledFilters(filters []Expression) (handled []Expression)
5961
}
6062

63+
type IndexOrder byte
64+
65+
const (
66+
IndexOrderNone IndexOrder = iota
67+
IndexOrderAsc
68+
IndexOrderDesc
69+
)
70+
71+
// OrderedIndex is an extension of |Index| that allows indexes to declare their return order. The query engine can
72+
// optimize certain queries if the order of an index is guaranteed, e.g. removing a sort operation.
73+
type OrderedIndex interface {
74+
Index
75+
// Order returns the order of results for reads from this index
76+
Order() IndexOrder
77+
}
78+
6179
// IndexLookup is the implementation-specific definition of an index lookup. The IndexLookup must contain all necessary
6280
// information to retrieve exactly the rows in the table as specified by the ranges given to their parent index.
6381
// Implementors are responsible for all semantics of correctly returning rows that match an index lookup.

sql/plan/exchange.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ func (e *Exchange) RowIter2(ctx *sql.Context, f *sql.RowFrame) (sql.RowIter2, er
173173

174174
func (e *Exchange) String() string {
175175
p := sql.NewTreePrinter()
176-
_ = p.WriteNode("Exchange(parallelism=%d)", e.Parallelism)
176+
_ = p.WriteNode("Exchange")
177177
_ = p.WriteChildren(e.Child.String())
178178
return p.String()
179179
}

sql/plan/indexed_table_access.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -305,16 +305,27 @@ func (i *IndexedTableAccess) Partitions(ctx *sql.Context) (sql.PartitionIter, er
305305
return i.ResolvedTable.Partitions(ctx)
306306
}
307307

308+
table := i.baseTable()
309+
return table.Partitions(ctx)
310+
}
311+
312+
// baseTable returns the underlying sql.Table with any static index lookup applied
313+
func (i *IndexedTableAccess) baseTable() sql.Table {
308314
table := i.ResolvedTable.Table
309-
if indexAddressableTable, ok := i.ResolvedTable.Table.(sql.IndexAddressable); ok {
315+
// This won't work if we add another layer of wrapping on top
316+
if tw, ok := table.(sql.TableWrapper); ok {
317+
table = tw.Underlying()
318+
}
319+
320+
if indexAddressableTable, ok := table.(sql.IndexAddressable); ok {
310321
table = indexAddressableTable.WithIndexLookup(i.lookup)
311322
}
312-
return table.Partitions(ctx)
323+
return table
313324
}
314325

315326
// PartitionRows implements sql.Table
316327
func (i *IndexedTableAccess) PartitionRows(ctx *sql.Context, partition sql.Partition) (sql.RowIter, error) {
317-
return i.ResolvedTable.PartitionRows(ctx, partition)
328+
return i.baseTable().PartitionRows(ctx, partition)
318329
}
319330

320331
// GetIndexLookup returns the sql.IndexLookup from an IndexedTableAccess.

0 commit comments

Comments
 (0)