Skip to content

Commit 81bf2eb

Browse files
committed
[FLINK-38712][table] Decouple table and TableSchemaContext
This closes #27264
1 parent ede7c2c commit 81bf2eb

File tree

5 files changed

+67
-44
lines changed

5 files changed

+67
-44
lines changed

flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@
5858
"org.apache.flink.sql.parser.ddl.SqlAlterModelReset"
5959
"org.apache.flink.sql.parser.ddl.SqlAlterModelSet"
6060
"org.apache.flink.sql.parser.ddl.SqlAlterTable"
61-
"org.apache.flink.sql.parser.ddl.SqlAlterTable.AlterTableContext"
61+
"org.apache.flink.sql.parser.ddl.TableSchemaContext"
62+
"org.apache.flink.sql.parser.ddl.TableSchemaContext.AlterTableSchemaContext"
6263
"org.apache.flink.sql.parser.ddl.SqlAlterTableAdd"
6364
"org.apache.flink.sql.parser.ddl.SqlAlterTableAddConstraint"
6465
"org.apache.flink.sql.parser.ddl.SqlAlterTableDropColumn"
@@ -82,7 +83,6 @@
8283
"org.apache.flink.sql.parser.ddl.SqlCreateModelAs"
8384
"org.apache.flink.sql.parser.ddl.SqlCreateOrAlterMaterializedTable"
8485
"org.apache.flink.sql.parser.ddl.SqlCreateTable"
85-
"org.apache.flink.sql.parser.ddl.SqlCreateTable.TableCreationContext"
8686
"org.apache.flink.sql.parser.ddl.SqlCreateTableAs"
8787
"org.apache.flink.sql.parser.ddl.SqlCreateTableLike"
8888
"org.apache.flink.sql.parser.ddl.SqlCreateView"

flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -857,7 +857,7 @@ SqlAlterTable SqlAlterTable() :
857857
SqlTableConstraint constraint;
858858
SqlIdentifier originColumnIdentifier;
859859
SqlIdentifier newColumnIdentifier;
860-
AlterTableContext ctx = new AlterTableContext();
860+
AlterTableSchemaContext ctx = new AlterTableSchemaContext();
861861
AlterTableAddPartitionContext addPartitionCtx = new AlterTableAddPartitionContext();
862862
AlterTableDropPartitionsContext dropPartitionsCtx = new AlterTableDropPartitionsContext();
863863
}
@@ -924,8 +924,7 @@ SqlAlterTable SqlAlterTable() :
924924
|
925925
(
926926
<DISTRIBUTION>
927-
ctx.distribution = SqlDistribution(getPos())
928-
{return new SqlAddDistribution(getPos(), tableIdentifier, ctx.distribution);}
927+
{return new SqlAddDistribution(getPos(), tableIdentifier, SqlDistribution(getPos()));}
929928
|
930929
AlterTableAddOrModify(ctx)
931930
|
@@ -950,8 +949,7 @@ SqlAlterTable SqlAlterTable() :
950949
<MODIFY>
951950
(
952951
<DISTRIBUTION>
953-
ctx.distribution = SqlDistribution(getPos())
954-
{return new SqlModifyDistribution(getPos(), tableIdentifier, ctx.distribution);}
952+
{return new SqlModifyDistribution(getPos(), tableIdentifier, SqlDistribution(getPos()));}
955953
|
956954
AlterTableAddOrModify(ctx)
957955
|
@@ -1062,7 +1060,7 @@ SqlNodeList PropertyKeys():
10621060
{ return new SqlNodeList(proKeyList, span.end(this)); }
10631061
}
10641062

1065-
void TableColumn(TableCreationContext context) :
1063+
void TableColumn(TableSchemaContext context) :
10661064
{
10671065
SqlTableConstraint constraint;
10681066
}
@@ -1081,7 +1079,7 @@ void TableColumn(TableCreationContext context) :
10811079
)
10821080
}
10831081

1084-
void Watermark(TableCreationContext context) :
1082+
void Watermark(TableSchemaContext context) :
10851083
{
10861084
SqlIdentifier eventTimeColumnName;
10871085
SqlParserPos pos;
@@ -1102,7 +1100,7 @@ void Watermark(TableCreationContext context) :
11021100
}
11031101

11041102
/** Parses {@code column_name column_data_type [...]}. */
1105-
SqlTableColumn TypedColumn(TableCreationContext context) :
1103+
SqlTableColumn TypedColumn(TableSchemaContext context) :
11061104
{
11071105
SqlTableColumn tableColumn;
11081106
SqlIdentifier name;
@@ -1123,7 +1121,7 @@ SqlTableColumn TypedColumn(TableCreationContext context) :
11231121
}
11241122

11251123
/** Parses {@code column_name AS expr [COMMENT 'comment']}. */
1126-
SqlTableColumn ComputedColumn(TableCreationContext context) :
1124+
SqlTableColumn ComputedColumn(TableSchemaContext context) :
11271125
{
11281126
SqlIdentifier name;
11291127
SqlParserPos pos;
@@ -1152,7 +1150,7 @@ SqlTableColumn ComputedColumn(TableCreationContext context) :
11521150
}
11531151

11541152
/** Parses {@code column_name column_data_type METADATA [FROM 'alias_name'] [VIRTUAL] [COMMENT 'comment']}. */
1155-
SqlTableColumn MetadataColumn(TableCreationContext context, SqlIdentifier name, SqlDataTypeSpec type) :
1153+
SqlTableColumn MetadataColumn(TableSchemaContext context, SqlIdentifier name, SqlDataTypeSpec type) :
11561154
{
11571155
SqlNode metadataAlias = null;
11581156
boolean isVirtual = false;
@@ -1189,7 +1187,7 @@ SqlTableColumn MetadataColumn(TableCreationContext context, SqlIdentifier name,
11891187
}
11901188

11911189
/** Parses {@code column_name column_data_type [constraint] [COMMENT 'comment']}. */
1192-
SqlTableColumn RegularColumn(TableCreationContext context, SqlIdentifier name, SqlDataTypeSpec type) :
1190+
SqlTableColumn RegularColumn(TableSchemaContext context, SqlIdentifier name, SqlDataTypeSpec type) :
11931191
{
11941192
SqlTableConstraint constraint = null;
11951193
SqlCharStringLiteral comment = null;
@@ -1245,8 +1243,8 @@ void AlterTableAddPartition(AlterTableAddPartitionContext context) :
12451243
}
12461244
}
12471245

1248-
/** Parses {@code ALTER TABLE table_name ADD/MODIFY [...]}. */
1249-
void AlterTableAddOrModify(AlterTableContext context) :
1246+
/** Parses {@code ALTER [MATERIALIZED ]TABLE table_name ADD/MODIFY [...]}. */
1247+
void AlterTableAddOrModify(AlterTableSchemaContext context) :
12501248
{
12511249
SqlTableConstraint constraint;
12521250
}
@@ -1263,7 +1261,7 @@ void AlterTableAddOrModify(AlterTableContext context) :
12631261
}
12641262

12651263
/** Parses {@code ADD/MODIFY column_name column_data_type [...]}. */
1266-
void AddOrModifyColumn(AlterTableContext context) :
1264+
void AddOrModifyColumn(AlterTableSchemaContext context) :
12671265
{
12681266
SqlTableColumn column;
12691267
SqlIdentifier referencedColumn = null;
@@ -1568,7 +1566,7 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
15681566

15691567
tableName = CompoundIdentifier()
15701568
[
1571-
<LPAREN> { pos = getPos(); TableCreationContext ctx = new TableCreationContext();}
1569+
<LPAREN> { pos = getPos(); TableSchemaContext ctx = new TableSchemaContext();}
15721570
TableColumnsOrIdentifiers(pos, ctx)
15731571
{
15741572
pos = pos.plus(getPos());
@@ -1753,9 +1751,9 @@ SqlDrop SqlDropTable(Span s, boolean replace, boolean isTemporary) :
17531751
}
17541752
}
17551753

1756-
void TableColumnsOrIdentifiers(SqlParserPos pos, TableCreationContext ctx) :
1754+
void TableColumnsOrIdentifiers(SqlParserPos pos, TableSchemaContext ctx) :
17571755
{
1758-
final TableCreationContext tempCtx = new TableCreationContext();
1756+
final TableSchemaContext tempCtx = new TableSchemaContext();
17591757
final List<SqlNode> identifiers = new ArrayList<SqlNode>();
17601758
}
17611759
{
@@ -1812,7 +1810,7 @@ SqlNode SqlReplaceTable() :
18121810

18131811
tableName = CompoundIdentifier()
18141812
[
1815-
<LPAREN> { pos = getPos(); TableCreationContext ctx = new TableCreationContext();}
1813+
<LPAREN> { pos = getPos(); TableSchemaContext ctx = new TableSchemaContext();}
18161814
TableColumnsOrIdentifiers(pos, ctx)
18171815
{
18181816
pos = getPos();
@@ -1899,7 +1897,7 @@ SqlCreate SqlCreateOrAlterMaterializedTable(Span s, boolean replace, boolean isT
18991897
<TABLE>
19001898
tableName = CompoundIdentifier()
19011899
[
1902-
<LPAREN> { pos = getPos(); TableCreationContext ctx = new TableCreationContext();}
1900+
<LPAREN> { pos = getPos(); TableSchemaContext ctx = new TableSchemaContext();}
19031901
TableColumnsOrIdentifiers(pos, ctx) {
19041902
pos = pos.plus(getPos());
19051903
isColumnsIdentifiersOnly = ctx.isColumnsIdentifiersOnly();
@@ -2013,6 +2011,7 @@ SqlAlterMaterializedTable SqlAlterMaterializedTable() :
20132011
SqlNodeList partSpec = SqlNodeList.EMPTY;
20142012
SqlNode freshness = null;
20152013
SqlNode asQuery = null;
2014+
AlterTableSchemaContext ctx = new AlterTableSchemaContext();
20162015
}
20172016
{
20182017
<ALTER> <MATERIALIZED> <TABLE> { startPos = getPos();}
@@ -3461,7 +3460,7 @@ SqlCreate SqlCreateModel(Span s, boolean isTemporary) :
34613460

34623461
modelIdentifier = CompoundIdentifier()
34633462
[
3464-
<INPUT> <LPAREN> { pos = getPos(); TableCreationContext ctx = new TableCreationContext();}
3463+
<INPUT> <LPAREN> { pos = getPos(); TableSchemaContext ctx = new TableSchemaContext();}
34653464
TableColumn(ctx)
34663465
(
34673466
<COMMA> TableColumn(ctx)
@@ -3473,7 +3472,7 @@ SqlCreate SqlCreateModel(Span s, boolean isTemporary) :
34733472
<RPAREN>
34743473
]
34753474
[
3476-
<OUTPUT> <LPAREN> { pos = getPos(); TableCreationContext ctx = new TableCreationContext();}
3475+
<OUTPUT> <LPAREN> { pos = getPos(); TableSchemaContext ctx = new TableSchemaContext();}
34773476
TableColumn(ctx)
34783477
(
34793478
<COMMA> TableColumn(ctx)

flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTable.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.calcite.sql.SqlAlter;
2424
import org.apache.calcite.sql.SqlIdentifier;
2525
import org.apache.calcite.sql.SqlKind;
26-
import org.apache.calcite.sql.SqlNode;
2726
import org.apache.calcite.sql.SqlNodeList;
2827
import org.apache.calcite.sql.SqlOperator;
2928
import org.apache.calcite.sql.SqlSpecialOperator;
@@ -32,9 +31,7 @@
3231

3332
import javax.annotation.Nullable;
3433

35-
import java.util.ArrayList;
3634
import java.util.LinkedHashMap;
37-
import java.util.List;
3835

3936
import static java.util.Objects.requireNonNull;
4037

@@ -118,9 +115,4 @@ public LinkedHashMap<String, String> getPartitionKVs() {
118115
public boolean ifTableExists() {
119116
return ifTableExists;
120117
}
121-
122-
/** Alter table context. */
123-
public static class AlterTableContext extends SqlCreateTable.TableCreationContext {
124-
public List<SqlNode> columnPositions = new ArrayList<>();
125-
}
126118
}

flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import javax.annotation.Nonnull;
4242
import javax.annotation.Nullable;
4343

44-
import java.util.ArrayList;
4544
import java.util.List;
4645
import java.util.Optional;
4746

@@ -213,16 +212,4 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
213212
SqlUnparseUtils.unparsePartitionKeyList(partitionKeyList, writer, leftPrec, rightPrec);
214213
SqlUnparseUtils.unparseProperties(properties, writer, leftPrec, rightPrec);
215214
}
216-
217-
/** Table creation context. */
218-
public static class TableCreationContext {
219-
public List<SqlNode> columnList = new ArrayList<>();
220-
public List<SqlTableConstraint> constraints = new ArrayList<>();
221-
@Nullable public SqlWatermark watermark;
222-
@Nullable public SqlDistribution distribution;
223-
224-
public boolean isColumnsIdentifiersOnly() {
225-
return !columnList.isEmpty() && columnList.get(0) instanceof SqlIdentifier;
226-
}
227-
}
228215
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.sql.parser.ddl;
20+
21+
import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
22+
23+
import org.apache.calcite.sql.SqlIdentifier;
24+
import org.apache.calcite.sql.SqlNode;
25+
26+
import javax.annotation.Nullable;
27+
28+
import java.util.ArrayList;
29+
import java.util.List;
30+
31+
/** Table schema creation context. */
32+
public class TableSchemaContext {
33+
public List<SqlNode> columnList = new ArrayList<>();
34+
public List<SqlTableConstraint> constraints = new ArrayList<>();
35+
@Nullable public SqlWatermark watermark;
36+
37+
public boolean isColumnsIdentifiersOnly() {
38+
return !columnList.isEmpty() && columnList.get(0) instanceof SqlIdentifier;
39+
}
40+
41+
/** Alter table context. */
42+
public static class AlterTableSchemaContext extends TableSchemaContext {
43+
public List<SqlNode> columnPositions = new ArrayList<>();
44+
}
45+
}

0 commit comments

Comments
 (0)