Commit 271266a

[FLINK-38731][table] Add support for MULTI_JOIN hint
1 parent 1e0045a commit 271266a

16 files changed (+558, -39 lines changed)
docs/content/docs/dev/table/sql/queries/hints.md

Lines changed: 43 additions & 0 deletions
@@ -279,6 +279,49 @@ SELECT /*+ NEST_LOOP(t1) */ * FROM t1 JOIN t2 ON t1.id = t2.id;
 SELECT /*+ NEST_LOOP(t1, t3) */ * FROM t1 JOIN t2 ON t1.id = t2.id JOIN t3 ON t1.id = t3.id;
 ```

+#### MULTI_JOIN
+
+{{< label Streaming >}}
+
+`MULTI_JOIN` suggests that Flink uses the `MultiJoin operator` to process multiple regular joins simultaneously. This type of join hint is recommended when you have multiple joins that share at least one common join key and experience large intermediate state or record amplification. The MultiJoin operator eliminates intermediate state by processing joins across various input streams simultaneously, which can significantly reduce state size and improve performance in some cases.
+
+For more details on the MultiJoin operator, including when to use it and configuration options, see [Multiple Regular Joins]({{< ref "docs/dev/table/tuning" >}}#multiple-regular-joins).
+
+{{< hint info >}}
+Note:
+- The MULTI_JOIN hint can specify table names or table aliases. If a table has an alias, the hint must use the alias name.
+- At least one key must be shared between the join conditions for the MultiJoin operator to be applied.
+- When specified, the MULTI_JOIN hint applies to the tables listed in the hint within the current query block.
+{{< /hint >}}
+
+##### Examples
+
+```sql
+CREATE TABLE t1 (id BIGINT, name STRING, age INT) WITH (...);
+CREATE TABLE t2 (id BIGINT, name STRING, age INT) WITH (...);
+CREATE TABLE t3 (id BIGINT, name STRING, age INT) WITH (...);
+
+-- Flink will use the MultiJoin operator for the three-way join.
+SELECT /*+ MULTI_JOIN(t1, t2, t3) */ * FROM t1
+JOIN t2 ON t1.id = t2.id
+JOIN t3 ON t1.id = t3.id;
+
+-- Using table names instead of aliases.
+SELECT /*+ MULTI_JOIN(Users, Orders, Payments) */ * FROM Users
+INNER JOIN Orders ON Users.user_id = Orders.user_id
+INNER JOIN Payments ON Users.user_id = Payments.user_id;
+
+-- Partial match: only t1 and t2 will use MultiJoin, t3 will use regular join.
+SELECT /*+ MULTI_JOIN(t1, t2) */ * FROM t1
+JOIN t2 ON t1.id = t2.id
+JOIN t3 ON t1.id = t3.id;
+
+-- Combining MULTI_JOIN with STATE_TTL hint.
+SELECT /*+ MULTI_JOIN(t1, t2, t3), STATE_TTL('t1'='1d', 't2'='2d', 't3'='12h') */ * FROM t1
+JOIN t2 ON t1.id = t2.id
+JOIN t3 ON t1.id = t3.id;
+```
+
 #### LOOKUP

 {{< label Streaming >}}

docs/content/docs/dev/table/tuning.md

Lines changed: 26 additions & 8 deletions
@@ -307,9 +307,9 @@ MiniBatch optimization is disabled by default for regular join. In order to enab

 {{< label Streaming >}}

-Streaming Flink jobs with multiple non-temporal regular joins often experience operational instability and performance degradation due to large state sizes. This is often because the intermediate state created by a chain of joins is much larger than the input state itself. In Flink 2.1, we introduce a new multi-join operator, an optimization designed to significantly reduce state size and improve performance for join pipelines that involve record amplification and large intermediate state. This new operator eliminates the need to store intermediate state for joins across multiple tables by processing joins across various input streams simultaneously. This "zero intermediate state" approach primarily targets state reduction, offering substantial benefits in resource consumption and operational stability.
+Streaming Flink jobs with multiple non-temporal regular joins often experience operational instability and performance degradation due to large state sizes. This is often because the intermediate state created by a chain of joins is much larger than the input state itself. In Flink 2.1, we introduce a new multi-join operator, an optimization designed to significantly reduce state size and improve performance for join pipelines that involve record amplification and large intermediate state. This new operator eliminates the need to store intermediate state for joins across multiple tables by processing joins across various input streams simultaneously. This "zero intermediate state" approach primarily targets state reduction, offering substantial benefits in resource consumption and operational stability in some cases. This technique exchanges a reduction in storage requirements for a corresponding increase in computational effort, as intermediate states are re-evaluated upon necessity.

-In most joins, a significant portion of processing time is spent fetching records from the state. The efficiency of the MultiJoin operator largely depends on the size of this intermediate state. In a common scenario where a pipeline experiences record amplification—meaning each join produces more data and records than the previous one, the MultiJoin operator is more efficient. This is because it keeps the state on which the operator interacts much smaller, leading to a more stable operator. If a chain of joins actually produces less state than the original records, the MultiJoin operator will still use less state overall. However, in this specific case, binary joins might perform better because the state that the final joins need to operate on is smaller.
+In most joins, a significant portion of processing time is spent fetching records from the state. The efficiency of the MultiJoin operator largely depends on the size of this intermediate state and the selectivity of the common join key(s). In a common scenario where a pipeline experiences record amplification—meaning each join produces more data and records than the previous one, the MultiJoin operator is more efficient. This is because it keeps the state on which the operator interacts much smaller, leading to a more stable operator. If a chain of joins actually produces less state than the original records, the MultiJoin operator will still use less state overall. However, in this specific case, binary joins might perform better because the state that the final joins need to operate on is smaller.

 ### The MultiJoin Operator
 The main benefits of the MultiJoin operator are:
@@ -318,21 +318,39 @@ The main benefits of the MultiJoin operator are:
 2) Improved performance for chained joins with record amplification.
 3) Improved stability: linear state growth with amount of records processed, instead of polynomial growth with binary joins.

-Also, pipelines with MultiJoin instead of binary joins usually have faster initialization and recovery times due to smaller state and fewer amount of nodes.
+Also, pipelines with MultiJoin instead of binary joins usually have faster initialization and recovery times due to smaller state and fewer nodes.

 ### When to enable the MultiJoin?

-If your job has multiple joins that share at least one common join key, and you observe that the intermediate state in the intermediate joins is larger than the inputs sources, consider enabling the MultiJoin operator.
+If your job has multiple joins that share at least one common join key, and you observe that the intermediate state in the intermediate joins is larger than the input sources, consider enabling the MultiJoin operator.
+
+Recommended use cases:
+- The common join key(s) have a high selectivity (the number of records per key is small)
+- Statement with several chained joins and considerable intermediate state
+- No considerable data skew on the common join key(s)
+- Joins are generating large state (state 50+ GB)
+
+If your common join key(s) exhibit low selectivity (i.e., a high number of rows sharing the same key value), the MultiJoin operator's required recomputation of the intermediate state can severely impact performance. In such scenarios, binary joins are recommended, as these will partition the data using all join keys.

 ### How to enable the MultiJoin?

-To enable this optimization, set the following configuration
+To enable this optimization globally for all eligible joins, set the following configuration:

 ```sql
 SET 'table.optimizer.multi-join.enabled' = 'true';
 ```

-Important: This is currently in an experimental state - there are open optimizations and breaking changes might be implemented in this version. We currently support only streaming INNER/LEFT joins. Support for RIGHT joins will be added soon. Due to records partitioning, you need at least one key that is shared between the join conditions, see:
+Alternatively, you can enable the MultiJoin operator for specific tables using the `MULTI_JOIN` hint:
+
+```sql
+SELECT /*+ MULTI_JOIN(t1, t2, t3) */ * FROM t1
+JOIN t2 ON t1.id = t2.id
+JOIN t3 ON t1.id = t3.id;
+```
+
+The hint approach allows you to selectively apply the MultiJoin optimization to specific query blocks without enabling it globally. You can specify either table names or table aliases in the hint. For more details on the MULTI_JOIN hint, see [Join Hints]({{< ref "docs/dev/table/sql/queries/hints" >}}#multi_join).
+
+Important: This is currently in an experimental state - optimizations and breaking changes might be implemented. We currently support only streaming INNER/LEFT joins. Due to records partitioning, you need at least one key that is shared between the join conditions, see:

 - Supported: A JOIN B ON A.key = B.key JOIN C ON A.key = C.key (Partition by key)
 - Supported: A JOIN B ON A.key = B.key JOIN C ON B.key = C.key (Partition by key via transitivity)
@@ -349,9 +367,9 @@ For this 10-way join above, involving record amplification, we've observed signi
 - Performance: 2x to over 100x+ increase in processed records when both at 100% busyness.
 - State Size: 3x to over 1000x+ smaller as intermediate state grows.

-The total state is always smaller with the MultiJoin operator. In this case, the performance is initially the same, but as the intermediate state grows, the performance of binary joins degrade and the multi join remains stable and outperforms.
+The total state is always smaller with the MultiJoin operator. In this case, the performance is initially the same, but as the intermediate state grows, the performance of binary joins degrades and the multi join remains stable and outperforms.

-This general benchmark for the 10-way join was run with the following configuration: 10 upsert kafka topics, 10 parallelism, 1 record per second per topic. We used rocksdb with unaligned checkpoints and with incremental checkpoints. Each job ran in one TaskManager containing 8GB process memory, 1GB off-heap memory and 20% network memory. The JobManager had 4GB process memory. The host machine contained a M1 processor chip, 32GB RAM and 1TB SSD. The sink uses a blackhole connector so we only benchmark the joins. The SQL used to generate the benchmark data had this structure:
+This general benchmark for the 10-way join was run with the following configuration: 1 record per tenant_id (high selectivity), 10 upsert kafka topics, 10 parallelism, 1 record per second per topic. We used rocksdb with unaligned checkpoints and with incremental checkpoints. Each job ran in one TaskManager containing 8GB process memory, 1GB off-heap memory and 20% network memory. The JobManager had 4GB process memory. The host machine contained a M1 processor chip, 32GB RAM and 1TB SSD. The sink uses a blackhole connector so we only benchmark the joins. The SQL used to generate the benchmark data had this structure:

 ```sql
 INSERT INTO JoinResultsMJ

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHintStrategies.java

Lines changed: 5 additions & 0 deletions
@@ -121,6 +121,11 @@ public static HintStrategyTable createHintStrategyTable() {
                                         HintPredicates.CORRELATE, HintPredicates.JOIN))
                                 .optionChecker(LOOKUP_NON_EMPTY_KV_OPTION_CHECKER)
                                 .build())
+                .hintStrategy(
+                        JoinStrategy.MULTI_JOIN.getJoinHintName(),
+                        HintStrategy.builder(HintPredicates.JOIN)
+                                .optionChecker(NON_EMPTY_LIST_OPTION_CHECKER)
+                                .build())
                 .hintStrategy(
                         StateTtlHint.STATE_TTL.getHintName(),
                         HintStrategy.builder(

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/JoinStrategy.java

Lines changed: 10 additions & 1 deletion
@@ -48,7 +48,14 @@ public enum JoinStrategy {
     NEST_LOOP("NEST_LOOP"),

     /** Instructs the optimizer to use lookup join strategy. Only accept key-value hint options. */
-    LOOKUP("LOOKUP");
+    LOOKUP("LOOKUP"),
+
+    /**
+     * Instructs the optimizer to use multi-way join strategy for streaming queries. This hint
+     * allows specifying multiple tables to be joined together in a single {@link
+     * org.apache.flink.table.runtime.operators.join.stream.StreamingMultiJoinOperator}.
+     */
+    MULTI_JOIN("MULTI_JOIN");

     private final String joinHintName;

@@ -83,6 +90,8 @@ public static boolean validOptions(String hintName, List<String> options) {
                 return options.size() > 0;
             case LOOKUP:
                 return null == options || options.size() == 0;
+            case MULTI_JOIN:
+                return options.size() > 0;
         }
         return false;
     }
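
The option check added for `MULTI_JOIN` can be illustrated outside the planner. The following is a minimal sketch, not the Flink class: `MultiJoinHintOptionsSketch` is a hypothetical name, hint names are matched as plain strings, and a `null` list is treated as empty for every hint, which is an assumption of the sketch (the diff only null-checks the `LOOKUP` case).

```java
import java.util.List;

/** Hypothetical stand-in for the list-option check in JoinStrategy; not the Flink class. */
final class MultiJoinHintOptionsSketch {

    /**
     * Returns true when the list options are acceptable for the named hint.
     * Assumption of this sketch: a null list counts as empty for every hint.
     */
    static boolean validOptions(String hintName, List<String> options) {
        boolean hasTables = options != null && !options.isEmpty();
        switch (hintName) {
            case "MULTI_JOIN":
                // MULTI_JOIN(t1, t2, ...) needs at least one table name or alias.
                return hasTables;
            case "LOOKUP":
                // LOOKUP accepts key-value options only, so the list must be empty.
                return !hasTables;
            default:
                // Hint names this sketch does not model are rejected.
                return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(validOptions("MULTI_JOIN", List.of("t1", "t2", "t3")));
        System.out.println(validOptions("MULTI_JOIN", List.of()));
        System.out.println(validOptions("LOOKUP", List.of()));
    }
}
```

An empty `MULTI_JOIN()` is rejected here for the same reason as in the diff: without listed tables the hint names nothing to merge.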

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/JoinToMultiJoinRule.java

Lines changed: 39 additions & 1 deletion
@@ -18,12 +18,16 @@

 package org.apache.flink.table.planner.plan.rules.logical;

+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.hint.FlinkHints;
+import org.apache.flink.table.planner.hint.JoinStrategy;
 import org.apache.flink.table.planner.hint.StateTtlHint;
 import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalMultiJoin;
 import org.apache.flink.table.planner.plan.utils.IntervalJoinUtil;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
 import org.apache.flink.table.runtime.operators.join.stream.keyselector.AttributeBasedJoinKeyExtractor;
 import org.apache.flink.table.runtime.operators.join.stream.keyselector.JoinKeyExtractor;
 import org.apache.flink.table.types.logical.RowType;
@@ -170,7 +174,41 @@ public boolean matches(RelOptRuleCall call) {
             return false;
         }

-        return origJoin.getJoinType().projectsRight();
+        if (!origJoin.getJoinType().projectsRight()) {
+            return false;
+        }
+
+        // Enable multi-join if either config is enabled OR MULTI_JOIN hint is present
+        return isEnabledViaConfig(origJoin) || hasMultiJoinHint(origJoin);
+    }
+
+    /**
+     * Checks if multi-join optimization is enabled via configuration.
+     *
+     * @param join the join node
+     * @return true if TABLE_OPTIMIZER_MULTI_JOIN_ENABLED is set to true
+     */
+    private boolean isEnabledViaConfig(Join join) {
+        final TableConfig tableConfig = ShortcutUtils.unwrapTableConfig(join);
+        return tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED);
+    }
+
+    /**
+     * Checks if the MULTI_JOIN hint is present on the join node.
+     *
+     * <p>Note: By the time this rule sees the join, the QueryHintsResolver has already validated
+     * the hint. If the hint is present with valid options, it means both sides of this join were
+     * mentioned in the original hint and have been validated.
+     *
+     * @param join the join node
+     * @return true if MULTI_JOIN hint is present and valid
+     */
+    private boolean hasMultiJoinHint(Join join) {
+        return join.getHints().stream()
+                .anyMatch(
+                        hint ->
+                                JoinStrategy.MULTI_JOIN.getJoinHintName().equals(hint.hintName)
+                                        && !hint.listOptions.isEmpty());
     }

     @Override
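
The new gate in `matches` is a plain OR of the global option and a per-join hint check. A minimal, dependency-free sketch of that decision follows; `MultiJoinGateSketch` and its nested `Hint` class are hypothetical stand-ins for the planner types and carry only the two fields the check reads.

```java
import java.util.List;

/** Hypothetical sketch of the "config OR hint" gate; not planner code. */
final class MultiJoinGateSketch {

    /** Simplified stand-in for a planner hint: a name plus its list options. */
    static final class Hint {
        final String name;
        final List<String> listOptions;

        Hint(String name, List<String> listOptions) {
            this.name = name;
            this.listOptions = listOptions;
        }
    }

    /** True if a MULTI_JOIN hint with at least one listed table is attached. */
    static boolean hasMultiJoinHint(List<Hint> hints) {
        return hints.stream()
                .anyMatch(h -> "MULTI_JOIN".equals(h.name) && !h.listOptions.isEmpty());
    }

    /** The join is eligible when the global option is on or the hint is present. */
    static boolean multiJoinEnabled(boolean configEnabled, List<Hint> hints) {
        return configEnabled || hasMultiJoinHint(hints);
    }

    public static void main(String[] args) {
        List<Hint> hinted = List.of(new Hint("MULTI_JOIN", List.of("t1", "t2", "t3")));
        System.out.println(multiJoinEnabled(false, hinted));    // hint alone is enough
        System.out.println(multiJoinEnabled(false, List.of())); // neither config nor hint
        System.out.println(multiJoinEnabled(true, List.of()));  // config alone is enough
    }
}
```

Because the decision now lives in the rule, the optimizer program that hosts the rule has to run even when the global option is off, which is what the `FlinkStreamProgram.scala` change in this commit does.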

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala

Lines changed: 14 additions & 16 deletions
@@ -233,22 +233,20 @@ object FlinkStreamProgram {
     }

     // multi-join
-    if (tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED)) {
-      chainedProgram.addLast(
-        MULTI_JOIN,
-        FlinkGroupProgramBuilder
-          .newBuilder[StreamOptimizeContext]
-          .addProgram(
-            FlinkHepRuleSetProgramBuilder.newBuilder
-              .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
-              .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
-              .add(FlinkStreamRuleSets.MULTI_JOIN_RULES)
-              .build(),
-            "merge binary regular joins into MultiJoin"
-          )
-          .build()
-      )
-    }
+    chainedProgram.addLast(
+      MULTI_JOIN,
+      FlinkGroupProgramBuilder
+        .newBuilder[StreamOptimizeContext]
+        .addProgram(
+          FlinkHepRuleSetProgramBuilder.newBuilder
+            .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
+            .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+            .add(FlinkStreamRuleSets.MULTI_JOIN_RULES)
+            .build(),
+          "merge binary regular joins into MultiJoin"
+        )
+        .build()
+    )

     // project rewrite
     chainedProgram.addLast(

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java

Lines changed: 2 additions & 1 deletion
@@ -54,6 +54,7 @@ public List<TableTestProgram> programs() {
                 MultiJoinTestPrograms.MULTI_JOIN_WITH_TIME_ATTRIBUTES_IN_CONDITIONS_MATERIALIZATION,
                 MultiJoinTestPrograms.MULTI_JOIN_TWO_WAY_INNER_JOIN_WITH_WHERE_IN,
                 MultiJoinTestPrograms.MULTI_JOIN_THREE_WAY_INNER_JOIN_MULTI_KEY_TYPES,
-                MultiJoinTestPrograms.MULTI_JOIN_FOUR_WAY_MIXED_JOIN_MULTI_KEY_TYPES_SHUFFLED);
+                MultiJoinTestPrograms.MULTI_JOIN_FOUR_WAY_MIXED_JOIN_MULTI_KEY_TYPES_SHUFFLED,
+                MultiJoinTestPrograms.MULTI_JOIN_THREE_WAY_INNER_JOIN_WITH_HINT);
     }
 }

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java

Lines changed: 30 additions & 0 deletions
@@ -1823,4 +1823,34 @@ public class MultiJoinTestPrograms {
                                     + "LEFT JOIN Shipments4K AS S ON U.k3 = S.k3 AND U.k2 > 150 AND U.k4 = S.k4 "
                                     + "WHERE U.k2 > 50")
                     .build();
+
+    public static final TableTestProgram MULTI_JOIN_THREE_WAY_INNER_JOIN_WITH_HINT =
+            TableTestProgram.of(
+                            "three-way-inner-join-with-hint",
+                            "three way inner join using MULTI_JOIN hint")
+                    .setupConfig(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, false)
+                    .setupTableSource(USERS_SOURCE)
+                    .setupTableSource(ORDERS_SOURCE)
+                    .setupTableSource(PAYMENTS_SOURCE)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema(
+                                            "user_id STRING",
+                                            "name STRING",
+                                            "order_id STRING",
+                                            "payment_id STRING")
+                                    .consumedValues(
+                                            "+I[1, Gus, order1, payment1]",
+                                            "+I[2, Bob, order2, payment2]",
+                                            "+I[2, Bob, order3, payment2]",
+                                            "+I[1, Gus, order1, payment3]")
+                                    .testMaterializedData()
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink "
+                                    + "SELECT /*+ MULTI_JOIN(u, o, p) */ u.user_id, u.name, o.order_id, p.payment_id "
+                                    + "FROM Users u "
+                                    + "INNER JOIN Orders o ON u.user_id = o.user_id "
+                                    + "INNER JOIN Payments p ON u.user_id = p.user_id")
+                    .build();
 }
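
To see why the test expects four rows, the three-way inner join on the shared `user_id` key can be sketched in plain Java. This is a batch-style illustration only, not the streaming operator. The input rows are hypothetical (the contents of `USERS_SOURCE`, `ORDERS_SOURCE` and `PAYMENTS_SOURCE` are not shown in this diff) and were chosen so that the result matches the consumed values above. `ThreeWayJoinSketch` is a made-up name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical batch-style sketch of a three-way join on one shared key. */
final class ThreeWayJoinSketch {

    /** Groups rows by join key; each row is {key, value}. */
    static Map<String, List<String>> index(String[][] rows) {
        Map<String, List<String>> byKey = new HashMap<>();
        for (String[] row : rows) {
            byKey.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row[1]);
        }
        return byKey;
    }

    /**
     * Combines the matching rows of all three inputs per key in one pass.
     * Only the inputs are indexed; no two-way intermediate result is stored.
     */
    static List<String> join(String[][] users, String[][] orders, String[][] payments) {
        Map<String, List<String>> u = index(users);
        Map<String, List<String>> o = index(orders);
        Map<String, List<String>> p = index(payments);
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : u.entrySet()) {
            String key = e.getKey();
            for (String name : e.getValue()) {
                for (String order : o.getOrDefault(key, List.of())) {
                    for (String payment : p.getOrDefault(key, List.of())) {
                        out.add("+I[" + key + ", " + name + ", " + order + ", " + payment + "]");
                    }
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical input rows; the real test sources are defined elsewhere.
        String[][] users = {{"1", "Gus"}, {"2", "Bob"}};
        String[][] orders = {{"1", "order1"}, {"2", "order2"}, {"2", "order3"}};
        String[][] payments = {{"1", "payment1"}, {"2", "payment2"}, {"1", "payment3"}};
        join(users, orders, payments).forEach(System.out::println);
    }
}
```

The nested loops per key also show the trade-off described in the tuning docs: with many rows per key the recombination work grows quickly, which is why a highly selective common key is recommended.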
