Skip to content

Commit ede7c2c

Browse files
committed
[FLINK-38712][table] Move away distribution from table schema
1 parent 855a61c commit ede7c2c

File tree

15 files changed

+254
-73
lines changed

15 files changed

+254
-73
lines changed

flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
"org.apache.flink.sql.parser.ddl.resource.SqlResource"
3232
"org.apache.flink.sql.parser.ddl.resource.SqlResourceType"
3333
"org.apache.flink.sql.parser.ddl.SqlAddJar"
34+
"org.apache.flink.sql.parser.ddl.SqlAlterDistribution.SqlAddDistribution"
35+
"org.apache.flink.sql.parser.ddl.SqlAlterDistribution.SqlModifyDistribution"
3436
"org.apache.flink.sql.parser.ddl.SqlAddPartitions"
3537
"org.apache.flink.sql.parser.ddl.SqlAddPartitions.AlterTableAddPartitionContext"
3638
"org.apache.flink.sql.parser.ddl.SqlAlterCatalog"

flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -923,6 +923,10 @@ SqlAlterTable SqlAlterTable() :
923923
}
924924
|
925925
(
926+
<DISTRIBUTION>
927+
ctx.distribution = SqlDistribution(getPos())
928+
{return new SqlAddDistribution(getPos(), tableIdentifier, ctx.distribution);}
929+
|
926930
AlterTableAddOrModify(ctx)
927931
|
928932
<LPAREN>
@@ -939,13 +943,16 @@ SqlAlterTable SqlAlterTable() :
939943
new SqlNodeList(ctx.columnPositions, startPos.plus(getPos())),
940944
ctx.constraints,
941945
ctx.watermark,
942-
ctx.distribution,
943946
ifExists);
944947
}
945948
)
946949
|
947950
<MODIFY>
948951
(
952+
<DISTRIBUTION>
953+
ctx.distribution = SqlDistribution(getPos())
954+
{return new SqlModifyDistribution(getPos(), tableIdentifier, ctx.distribution);}
955+
|
949956
AlterTableAddOrModify(ctx)
950957
|
951958
<LPAREN>
@@ -962,10 +969,8 @@ SqlAlterTable SqlAlterTable() :
962969
new SqlNodeList(ctx.columnPositions, startPos.plus(getPos())),
963970
ctx.constraints,
964971
ctx.watermark,
965-
ctx.distribution,
966972
ifExists);
967973
}
968-
969974
|
970975
<DROP>
971976
(
@@ -1254,9 +1259,6 @@ void AlterTableAddOrModify(AlterTableContext context) :
12541259
}
12551260
|
12561261
Watermark(context)
1257-
|
1258-
<DISTRIBUTION>
1259-
context.distribution = SqlDistribution(getPos())
12601262
)
12611263
}
12621264

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.sql.parser.ddl;
20+
21+
import org.apache.calcite.sql.SqlIdentifier;
22+
import org.apache.calcite.sql.SqlNode;
23+
import org.apache.calcite.sql.SqlWriter;
24+
import org.apache.calcite.sql.parser.SqlParserPos;
25+
26+
import javax.annotation.Nonnull;
27+
28+
import java.util.ArrayList;
29+
import java.util.List;
30+
31+
/** ALTER Distribution DDL sql call for tables and materialized tables. */
32+
public abstract class SqlAlterDistribution extends SqlAlterTable {
33+
34+
private final SqlDistribution distribution;
35+
36+
public SqlAlterDistribution(
37+
SqlParserPos pos, SqlIdentifier tableName, SqlDistribution distribution) {
38+
super(pos, tableName, false);
39+
this.distribution = distribution;
40+
}
41+
42+
protected abstract String getAlterOperation();
43+
44+
public SqlDistribution getDistribution() {
45+
return distribution;
46+
}
47+
48+
@Nonnull
49+
@Override
50+
public List<SqlNode> getOperandList() {
51+
List<SqlNode> operands = new ArrayList<>();
52+
operands.add(tableIdentifier);
53+
operands.add(distribution);
54+
return operands;
55+
}
56+
57+
@Override
58+
public void unparseAlterOperation(SqlWriter writer, int leftPrec, int rightPrec) {
59+
super.unparseAlterOperation(writer, leftPrec, rightPrec);
60+
writer.keyword(getAlterOperation());
61+
distribution.unparseAlter(writer, leftPrec, rightPrec);
62+
}
63+
64+
public static class SqlAddDistribution extends SqlAlterDistribution {
65+
66+
public SqlAddDistribution(
67+
SqlParserPos pos, SqlIdentifier tableName, SqlDistribution distribution) {
68+
super(pos, tableName, distribution);
69+
}
70+
71+
@Override
72+
protected String getAlterOperation() {
73+
return "ADD";
74+
}
75+
}
76+
77+
public static class SqlModifyDistribution extends SqlAlterDistribution {
78+
79+
public SqlModifyDistribution(
80+
SqlParserPos pos, SqlIdentifier tableName, SqlDistribution distribution) {
81+
super(pos, tableName, distribution);
82+
}
83+
84+
@Override
85+
protected String getAlterOperation() {
86+
return "MODIFY";
87+
}
88+
}
89+
}

flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableAdd.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222

2323
import org.apache.calcite.sql.SqlIdentifier;
2424
import org.apache.calcite.sql.SqlNodeList;
25-
import org.apache.calcite.sql.SqlWriter;
2625
import org.apache.calcite.sql.parser.SqlParserPos;
2726

2827
import javax.annotation.Nullable;
@@ -56,16 +55,12 @@ public SqlAlterTableAdd(
5655
SqlNodeList addedColumns,
5756
List<SqlTableConstraint> constraint,
5857
@Nullable SqlWatermark sqlWatermark,
59-
@Nullable SqlDistribution distribution,
6058
boolean ifTableExists) {
61-
super(pos, tableName, addedColumns, constraint, sqlWatermark, distribution, ifTableExists);
59+
super(pos, tableName, addedColumns, constraint, sqlWatermark, ifTableExists);
6260
}
6361

6462
@Override
65-
public void unparseAlterOperation(SqlWriter writer, int leftPrec, int rightPrec) {
66-
super.unparseAlterOperation(writer, leftPrec, rightPrec);
67-
writer.keyword("ADD");
68-
// unparse table schema and distribution
69-
unparseSchemaAndDistribution(writer, leftPrec, rightPrec);
63+
protected String getAlterOperation() {
64+
return "ADD";
7065
}
7166
}

flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableModify.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222

2323
import org.apache.calcite.sql.SqlIdentifier;
2424
import org.apache.calcite.sql.SqlNodeList;
25-
import org.apache.calcite.sql.SqlWriter;
2625
import org.apache.calcite.sql.parser.SqlParserPos;
2726

2827
import javax.annotation.Nullable;
@@ -56,16 +55,12 @@ public SqlAlterTableModify(
5655
SqlNodeList modifiedColumns,
5756
List<SqlTableConstraint> constraints,
5857
@Nullable SqlWatermark watermark,
59-
@Nullable SqlDistribution distribution,
6058
boolean ifTableExists) {
61-
super(pos, tableName, modifiedColumns, constraints, watermark, distribution, ifTableExists);
59+
super(pos, tableName, modifiedColumns, constraints, watermark, ifTableExists);
6260
}
6361

6462
@Override
65-
public void unparseAlterOperation(SqlWriter writer, int leftPrec, int rightPrec) {
66-
super.unparseAlterOperation(writer, leftPrec, rightPrec);
67-
writer.keyword("MODIFY");
68-
// unparse table schema and distribution
69-
unparseSchemaAndDistribution(writer, leftPrec, rightPrec);
63+
protected String getAlterOperation() {
64+
return "MODIFY";
7065
}
7166
}

flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableSchema.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ public abstract class SqlAlterTableSchema extends SqlAlterTable implements Exten
4444

4545
protected final SqlNodeList columnList;
4646
@Nullable protected final SqlWatermark watermark;
47-
@Nullable protected final SqlDistribution distribution;
4847
protected final List<SqlTableConstraint> constraints;
4948

5049
public SqlAlterTableSchema(
@@ -53,12 +52,10 @@ public SqlAlterTableSchema(
5352
SqlNodeList columnList,
5453
List<SqlTableConstraint> constraints,
5554
@Nullable SqlWatermark sqlWatermark,
56-
@Nullable SqlDistribution distribution,
5755
boolean ifTableExists) {
5856
super(pos, tableName, ifTableExists);
5957
this.columnList = columnList;
6058
this.constraints = constraints;
61-
this.distribution = distribution;
6259
this.watermark = sqlWatermark;
6360
}
6461

@@ -85,10 +82,6 @@ public Optional<SqlWatermark> getWatermark() {
8582
return Optional.ofNullable(watermark);
8683
}
8784

88-
public Optional<SqlDistribution> getDistribution() {
89-
return Optional.ofNullable(distribution);
90-
}
91-
9285
public List<SqlTableConstraint> getConstraints() {
9386
return constraints;
9487
}
@@ -107,11 +100,13 @@ private SqlNodeList getColumns() {
107100
SqlParserPos.ZERO);
108101
}
109102

110-
void unparseSchemaAndDistribution(SqlWriter writer, int leftPrec, int rightPrec) {
103+
protected abstract String getAlterOperation();
104+
105+
@Override
106+
public void unparseAlterOperation(SqlWriter writer, int leftPrec, int rightPrec) {
107+
super.unparseAlterOperation(writer, leftPrec, rightPrec);
108+
writer.keyword(getAlterOperation());
111109
SqlUnparseUtils.unparseTableSchema(
112110
columnList, constraints, watermark, writer, leftPrec, rightPrec);
113-
if (distribution != null) {
114-
distribution.unparseAlter(writer, leftPrec, rightPrec);
115-
}
116111
}
117112
}

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@
2222
import org.apache.flink.api.java.typeutils.TypeExtractor;
2323
import org.apache.flink.table.operations.Operation;
2424
import org.apache.flink.table.planner.operations.converters.SqlNodeConverter.ConvertContext;
25+
import org.apache.flink.table.planner.operations.converters.table.SqlAlterTableAddDistributionConverter;
2526
import org.apache.flink.table.planner.operations.converters.table.SqlAlterTableAddPartitionConverter;
2627
import org.apache.flink.table.planner.operations.converters.table.SqlAlterTableDropColumnConverter;
2728
import org.apache.flink.table.planner.operations.converters.table.SqlAlterTableDropConstraintConverter;
2829
import org.apache.flink.table.planner.operations.converters.table.SqlAlterTableDropDistributionConverter;
2930
import org.apache.flink.table.planner.operations.converters.table.SqlAlterTableDropPartitionConverter;
3031
import org.apache.flink.table.planner.operations.converters.table.SqlAlterTableDropPrimaryKeyConverter;
3132
import org.apache.flink.table.planner.operations.converters.table.SqlAlterTableDropWatermarkConverter;
33+
import org.apache.flink.table.planner.operations.converters.table.SqlAlterTableModifyDistributionConverter;
3234
import org.apache.flink.table.planner.operations.converters.table.SqlAlterTableOptionsConverter;
3335
import org.apache.flink.table.planner.operations.converters.table.SqlAlterTableRenameColumnConverter;
3436
import org.apache.flink.table.planner.operations.converters.table.SqlAlterTableRenameConverter;
@@ -106,6 +108,8 @@ public class SqlNodeConverters {
106108
register(new SqlAlterTableDropConstraintConverter());
107109
register(new SqlAlterTableDropWatermarkConverter());
108110
register(new SqlAlterTableDropDistributionConverter());
111+
register(new SqlAlterTableAddDistributionConverter());
112+
register(new SqlAlterTableModifyDistributionConverter());
109113
register(new SqlAlterTableRenameColumnConverter());
110114
register(new SqlAlterTableResetConverter());
111115
register(new SqlAlterTableOptionsConverter());

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/AbstractAlterTableConverter.java

Lines changed: 2 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.apache.flink.table.planner.operations.converters.table;
2020

2121
import org.apache.flink.sql.parser.ddl.SqlAlterTable;
22-
import org.apache.flink.sql.parser.ddl.SqlAlterTableSchema;
2322
import org.apache.flink.table.api.Schema;
2423
import org.apache.flink.table.api.ValidationException;
2524
import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -36,12 +35,10 @@
3635
import org.apache.flink.table.operations.ddl.AlterTableChangeOperation;
3736
import org.apache.flink.table.operations.utils.ValidationUtils;
3837
import org.apache.flink.table.planner.operations.converters.SqlNodeConverter;
39-
import org.apache.flink.table.planner.utils.OperationConverterUtils;
4038

4139
import org.apache.calcite.sql.SqlIdentifier;
4240

4341
import java.util.List;
44-
import java.util.Map;
4542
import java.util.Optional;
4643

4744
/** Abstract class for ALTER TABLE converters. */
@@ -52,31 +49,10 @@ public abstract class AbstractAlterTableConverter<T extends SqlAlterTable>
5249
protected abstract Operation convertToOperation(
5350
T sqlAlterTable, ResolvedCatalogTable oldTable, ConvertContext context);
5451

55-
protected final Schema getOldSchema(ResolvedCatalogTable resolvedCatalogTable) {
56-
return resolvedCatalogTable.getUnresolvedSchema();
57-
}
58-
59-
protected final TableDistribution getOldTableDistribution(
60-
ResolvedCatalogTable resolvedCatalogTable) {
61-
return resolvedCatalogTable.getDistribution().orElse(null);
62-
}
63-
64-
protected final List<String> getOldPartitionKeys(ResolvedCatalogTable resolvedCatalogTable) {
65-
return resolvedCatalogTable.getPartitionKeys();
66-
}
67-
68-
protected final String getOldComment(ResolvedCatalogTable resolvedCatalogTable) {
69-
return resolvedCatalogTable.getComment();
70-
}
71-
72-
protected final Map<String, String> getOldOptions(ResolvedCatalogTable resolvedCatalogTable) {
73-
return resolvedCatalogTable.getOptions();
74-
}
75-
7652
@Override
7753
public final Operation convertSqlNode(T sqlAlterTable, ConvertContext context) {
7854
CatalogManager catalogManager = context.getCatalogManager();
79-
final ObjectIdentifier tableIdentifier = getIdentifier(sqlAlterTable, context);
55+
final ObjectIdentifier tableIdentifier = resolveIdentifier(sqlAlterTable, context);
8056
Optional<ContextResolvedTable> optionalCatalogTable =
8157
catalogManager.getTable(tableIdentifier);
8258

@@ -133,22 +109,13 @@ protected static String getColumnName(SqlIdentifier identifier) {
133109
return identifier.getSimple();
134110
}
135111

136-
protected final ObjectIdentifier getIdentifier(SqlAlterTable node, ConvertContext context) {
112+
protected final ObjectIdentifier resolveIdentifier(SqlAlterTable node, ConvertContext context) {
137113
UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(node.fullTableName());
138114
return context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
139115
}
140116

141117
protected TableDistribution getTableDistribution(
142118
SqlAlterTable alterTable, ResolvedCatalogTable oldTable) {
143-
if (alterTable instanceof SqlAlterTableSchema) {
144-
final Optional<TableDistribution> tableDistribution =
145-
((SqlAlterTableSchema) alterTable)
146-
.getDistribution()
147-
.map(OperationConverterUtils::getDistributionFromSqlDistribution);
148-
if (tableDistribution.isPresent()) {
149-
return tableDistribution.get();
150-
}
151-
}
152119
return oldTable.getDistribution().orElse(null);
153120
}
154121
}

0 commit comments

Comments
 (0)