From a40714fb31a049d0471099e22f1a5e68ba29bc42 Mon Sep 17 00:00:00 2001 From: Hao Li <1127478+lihaosky@users.noreply.github.com> Date: Mon, 17 Nov 2025 17:40:42 -0800 Subject: [PATCH 1/6] [FLINK-38260] Add parser changes for connection sql --- .../src/main/codegen/data/Parser.tdd | 14 +- .../src/main/codegen/includes/parserImpls.ftl | 197 ++++++++++++++++- .../sql/parser/ddl/SqlAlterConnection.java | 66 ++++++ .../parser/ddl/SqlAlterConnectionRename.java | 61 ++++++ .../parser/ddl/SqlAlterConnectionReset.java | 70 ++++++ .../sql/parser/ddl/SqlAlterConnectionSet.java | 68 ++++++ .../sql/parser/ddl/SqlCreateConnection.java | 162 ++++++++++++++ .../sql/parser/ddl/SqlDropConnection.java | 107 +++++++++ .../parser/dql/SqlRichDescribeConnection.java | 79 +++++++ .../sql/parser/dql/SqlShowConnections.java | 60 ++++++ .../parser/dql/SqlShowCreateConnection.java | 65 ++++++ .../sql/parser/utils/ParserResource.java | 4 + .../sql/parser/FlinkSqlParserImplTest.java | 203 ++++++++++++++++++ 13 files changed, 1152 insertions(+), 4 deletions(-) create mode 100644 flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnection.java create mode 100644 flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionRename.java create mode 100644 flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionReset.java create mode 100644 flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionSet.java create mode 100644 flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateConnection.java create mode 100644 flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropConnection.java create mode 100644 flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichDescribeConnection.java create mode 100644 flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowConnections.java create mode 100644 flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateConnection.java diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd index 6e93241af40ef..ef2c129573755 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd @@ -39,6 +39,10 @@ "org.apache.flink.sql.parser.ddl.SqlAlterCatalogOptions" "org.apache.flink.sql.parser.ddl.SqlAlterCatalogReset" "org.apache.flink.sql.parser.ddl.SqlAlterCatalogComment" + "org.apache.flink.sql.parser.ddl.SqlAlterConnection" + "org.apache.flink.sql.parser.ddl.SqlAlterConnectionRename" + "org.apache.flink.sql.parser.ddl.SqlAlterConnectionReset" + "org.apache.flink.sql.parser.ddl.SqlAlterConnectionSet" "org.apache.flink.sql.parser.ddl.SqlAlterDatabase" "org.apache.flink.sql.parser.ddl.SqlAlterFunction" "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTable" @@ -76,6 +80,7 @@ "org.apache.flink.sql.parser.ddl.SqlAlterViewRename" "org.apache.flink.sql.parser.ddl.SqlCompilePlan" "org.apache.flink.sql.parser.ddl.SqlCreateCatalog" + "org.apache.flink.sql.parser.ddl.SqlCreateConnection" "org.apache.flink.sql.parser.ddl.SqlCreateDatabase" "org.apache.flink.sql.parser.ddl.SqlCreateFunction" "org.apache.flink.sql.parser.ddl.SqlCreateModel" @@ -87,6 +92,7 @@ "org.apache.flink.sql.parser.ddl.SqlCreateView" "org.apache.flink.sql.parser.ddl.SqlDistribution" "org.apache.flink.sql.parser.ddl.SqlDropCatalog" + "org.apache.flink.sql.parser.ddl.SqlDropConnection" "org.apache.flink.sql.parser.ddl.SqlDropDatabase" "org.apache.flink.sql.parser.ddl.SqlDropFunction" "org.apache.flink.sql.parser.ddl.SqlDropMaterializedTable" @@ -137,12 +143,15 @@ "org.apache.flink.sql.parser.dql.SqlShowTables" "org.apache.flink.sql.parser.dql.SqlShowTables.SqlTableKind" "org.apache.flink.sql.parser.dql.SqlShowColumns" + "org.apache.flink.sql.parser.dql.SqlShowConnections" "org.apache.flink.sql.parser.dql.SqlShowCreate" + "org.apache.flink.sql.parser.dql.SqlShowCreateConnection" "org.apache.flink.sql.parser.dql.SqlShowCreateMaterializedTable" "org.apache.flink.sql.parser.dql.SqlShowCreateModel" "org.apache.flink.sql.parser.dql.SqlShowCreateTable" "org.apache.flink.sql.parser.dql.SqlShowCreateView" "org.apache.flink.sql.parser.dql.SqlShowCreateCatalog" + "org.apache.flink.sql.parser.dql.SqlRichDescribeConnection" "org.apache.flink.sql.parser.dql.SqlRichDescribeFunction" "org.apache.flink.sql.parser.dql.SqlRichDescribeModel" "org.apache.flink.sql.parser.dql.SqlRichDescribeTable" @@ -185,6 +194,7 @@ "COMMENT" "COMPILE" "COMPUTE" + "CONNECTIONS", "CONTINUOUS" "DATABASES" "DISTRIBUTED" @@ -283,7 +293,6 @@ "COMPILE" "CONDITIONAL" "CONDITION_NUMBER" - "CONNECTION" "CONNECTION_NAME" "CONSTRAINT_CATALOG" "CONSTRAINT_NAME" @@ -618,12 +627,14 @@ "SqlAlterFunction()" "SqlShowFunctions()" "SqlShowModels()" + "SqlShowConnections()" "SqlShowTables()" "SqlShowColumns()" "SqlShowCreate()" "SqlReplaceTable()" "SqlAlterMaterializedTable()" "SqlAlterModel()" + "SqlAlterConnection()" "SqlAlterTable()" "SqlAlterView()" "SqlShowModules()" @@ -648,6 +659,7 @@ "SqlDescribeJob()" "SqlRichDescribeFunction()" "SqlRichDescribeModel()" + "SqlRichDescribeConnection()" "SqlRichDescribeTable()" ] diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index bf2f07ce06084..febb849cc68db 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -776,6 +776,13 @@ SqlShowCreate SqlShowCreate() : { return new SqlShowCreateModel(pos, sqlIdentifier); } + | + + { pos = getPos(); } + sqlIdentifier = CompoundIdentifier() + { + return new SqlShowCreateConnection(pos, sqlIdentifier); + } | { pos = getPos(); } @@ -824,6 +831,25 @@ SqlRichDescribeModel SqlRichDescribeModel() : } } +/** + * DESCRIBE | DESC CONNECTION [ EXTENDED] [[catalogName.] dataBasesName].connectionName sql call. + * Here we add Rich in className to match the naming of SqlRichDescribeTable. + */ +SqlRichDescribeConnection SqlRichDescribeConnection() : +{ + SqlIdentifier connectionName; + SqlParserPos pos; + boolean isExtended = false; +} +{ + ( | ) { pos = getPos();} + [ { isExtended = true;} ] + connectionName = CompoundIdentifier() + { + return new SqlRichDescribeConnection(pos, connectionName, isExtended); + } +} + /** * DESCRIBE | DESC [ EXTENDED] [[catalogName.] dataBasesName].tableName sql call. * Here we add Rich in className to distinguish from calcite's original SqlDescribeTable. @@ -2663,9 +2689,13 @@ SqlCreate SqlCreateExtended(Span s, boolean replace) : | create = SqlCreateDatabase(s, replace) | + create = SqlCreateModel(s, isTemporary) + | + // Lookahead to distinguish FUNCTION and + LOOKAHEAD(2) create = SqlCreateFunction(s, replace, isTemporary) | - create = SqlCreateModel(s, isTemporary) + create = SqlCreateConnection(s, isTemporary) ) { return create; @@ -2692,9 +2722,13 @@ SqlDrop SqlDropExtended(Span s, boolean replace) : | drop = SqlDropDatabase(s, replace) | + drop = SqlDropModel(s, isTemporary) + | + // Lookahead to distinguish FUNCTION and + LOOKAHEAD(2) drop = SqlDropFunction(s, replace, isTemporary) | - drop = SqlDropModel(s, isTemporary) + drop = SqlDropConnection(s, isTemporary) ) { return drop; @@ -3325,7 +3359,7 @@ SqlTruncateTable SqlTruncateTable() : } /** -* SHOW MODELS [FROM [catalog.] database] [[NOT] LIKE pattern]; sql call. +* SHOW MODELS [FROM [catalog.] database] [[NOT] LIKE pattern]; */ SqlShowModels SqlShowModels() : { @@ -3361,6 +3395,43 @@ SqlShowModels SqlShowModels() : } } +/** +* SHOW CONNECTIONS [LIKE 'pattern'] [FROM catalog_name.db_name]; +*/ +SqlShowConnections SqlShowConnections() : +{ + SqlIdentifier databaseName = null; + SqlCharStringLiteral likeLiteral = null; + String prep = null; + boolean notLike = false; + SqlParserPos pos; +} +{ + + { pos = getPos(); } + [ + ( { prep = "FROM"; } | { prep = "IN"; } ) + { pos = getPos(); } + databaseName = CompoundIdentifier() + ] + [ + [ + + { + notLike = true; + } + ] + + { + String likeCondition = SqlParserUtil.parseString(token.image); + likeLiteral = SqlLiteral.createCharString(likeCondition, getPos()); + } + ] + { + return new SqlShowConnections(pos, prep, databaseName, notLike, likeLiteral); + } +} + /** * ALTER MODEL [IF EXISTS] modelName SET (property_key = property_val, ...) * ALTER MODEL [IF EXISTS] modelName RENAME TO newModelName @@ -3413,6 +3484,54 @@ SqlAlterModel SqlAlterModel() : ) } +/** +* ALTER CONNECTION connectionName SET (property_key = property_val, ...) +* ALTER CONNECTION connectionName RENAME TO newConnectionName +* ALTER CONNECTION connectionName RESET (property_key, ...) +* Alter temporary or system connection is not supported. +*/ +SqlAlterConnection SqlAlterConnection() : +{ + SqlParserPos startPos; + SqlIdentifier connectionIdentifier; + SqlIdentifier newConnectionIdentifier = null; + SqlNodeList propertyList = SqlNodeList.EMPTY; + SqlNodeList propertyKeyList = SqlNodeList.EMPTY; +} +{ + { startPos = getPos(); } + connectionIdentifier = CompoundIdentifier() + ( + LOOKAHEAD(2) + + newConnectionIdentifier = CompoundIdentifier() + { + return new SqlAlterConnectionRename( + startPos.plus(getPos()), + connectionIdentifier, + newConnectionIdentifier); + } + | + + propertyList = Properties() + { + return new SqlAlterConnectionSet( + startPos.plus(getPos()), + connectionIdentifier, + propertyList); + } + | + + propertyKeyList = PropertyKeys() + { + return new SqlAlterConnectionReset( + startPos.plus(getPos()), + connectionIdentifier, + propertyKeyList); + } + ) +} + /** * DROP MODEL [IF EXIST] modelName */ @@ -3433,6 +3552,29 @@ SqlDrop SqlDropModel(Span s, boolean isTemporary) : } } +/** +* DROP [TEMPORARY] [SYSTEM] CONNECTION [IF EXIST] connectionName +*/ +SqlDrop SqlDropConnection(Span s, boolean isTemporary) : +{ + SqlIdentifier connectionIdentifier = null; + boolean ifExists = false; + boolean isSystemConnection = false; +} +{ + [ { isSystemConnection = true; } ] + + + + ifExists = IfExistsOpt() + + connectionIdentifier = CompoundIdentifier() + + { + return new SqlDropConnection(s.pos(), connectionIdentifier, ifExists, isTemporary, isSystemConnection); + } +} + /** * CREATE MODEL [IF NOT EXIST] modelName * [INPUT(col1 type1, col2 type2, ...)] @@ -3519,6 +3661,55 @@ SqlCreate SqlCreateModel(Span s, boolean isTemporary) : } } +/** +* CREATE [TEMPORARY] [SYSTEM] CONNECTION [IF NOT EXISTS] [catalog_name.][db_name.]connection_name +* [COMMENT connection_comment] +* WITH (property_key = property_val, ...) +*/ +SqlCreate SqlCreateConnection(Span s, boolean isTemporary) : +{ + final SqlParserPos startPos = s.pos(); + boolean ifNotExists = false; + boolean isSystem = false; + SqlIdentifier connectionIdentifier; + SqlCharStringLiteral comment = null; + SqlNodeList propertyList = SqlNodeList.EMPTY; +} +{ + [ + + { + if (!isTemporary){ + throw SqlUtil.newContextException(getPos(), + ParserResource.RESOURCE.createSystemConnectionOnlySupportTemporary()); + } + isSystem = true; + } + ] + + + ifNotExists = IfNotExistsOpt() + + connectionIdentifier = CompoundIdentifier() + [ + { + String p = SqlParserUtil.parseString(token.image); + comment = SqlLiteral.createCharString(p, getPos()); + } + ] + + propertyList = Properties() + { + return new SqlCreateConnection(startPos.plus(getPos()), + connectionIdentifier, + comment, + propertyList, + isTemporary, + isSystem, + ifNotExists); + } +} + SqlCharStringLiteral Comment() : { } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnection.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnection.java new file mode 100644 index 0000000000000..d7bde1731016a --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnection.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + + +import static java.util.Objects.requireNonNull; + +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; + +/** + * Abstract class to describe statements like ALTER CONNECTION [[catalogName.] + * dataBasesName.]connectionName ... + */ +public abstract class SqlAlterConnection extends SqlCall { + + public static final SqlSpecialOperator OPERATOR = + new SqlSpecialOperator("ALTER CONNECTION", SqlKind.OTHER_DDL); + + protected final SqlIdentifier connectionName; + + public SqlAlterConnection(SqlParserPos pos, SqlIdentifier connectionName) { + super(pos); + this.connectionName = requireNonNull(connectionName, "connectionName should not be null"); + } + + @Override + public SqlOperator getOperator() { + return OPERATOR; + } + + public SqlIdentifier getConnectionName() { + return connectionName; + } + + public String[] fullConnectionName() { + return connectionName.names.toArray(new String[0]); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("ALTER CONNECTION"); + connectionName.unparse(writer, leftPrec, rightPrec); + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionRename.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionRename.java new file mode 100644 index 0000000000000..d40f3d16d1538 --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionRename.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** ALTER CONNECTION [[catalogName.] dataBasesName.]connectionName RENAME TO newConnectionName. */ +public class SqlAlterConnectionRename extends SqlAlterConnection { + + private final SqlIdentifier newConnectionName; + + public SqlAlterConnectionRename( + SqlParserPos pos, SqlIdentifier connectionName, SqlIdentifier newConnectionName) { + super(pos, connectionName); + this.newConnectionName = + requireNonNull(newConnectionName, "newConnectionName should not be null"); + } + + public SqlIdentifier getNewConnectionName() { + return newConnectionName; + } + + public String[] fullNewConnectionName() { + return newConnectionName.names.toArray(new String[0]); + } + + @Override + public List getOperandList() { + return List.of(connectionName, newConnectionName); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + super.unparse(writer, leftPrec, rightPrec); + writer.keyword("RENAME TO"); + newConnectionName.unparse(writer, leftPrec, rightPrec); + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionReset.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionReset.java new file mode 100644 index 0000000000000..2a9cee0285dd0 --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionReset.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.flink.sql.parser.SqlUnparseUtils; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.NlsString; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static java.util.Objects.requireNonNull; + +/** ALTER CONNECTION [[catalogName.] dataBasesName.]connectionName RESET ( 'key1' [, 'key2']...). */ +public class SqlAlterConnectionReset extends SqlAlterConnection { + private final SqlNodeList optionKeyList; + + public SqlAlterConnectionReset( + SqlParserPos pos, SqlIdentifier connectionName, SqlNodeList optionKeyList) { + super(pos, connectionName); + this.optionKeyList = requireNonNull(optionKeyList, "optionKeyList should not be null"); + } + + @Override + public List getOperandList() { + return List.of(connectionName, optionKeyList); + } + + public Set getResetKeys() { + return optionKeyList.getList().stream() + .map(key -> ((NlsString) SqlLiteral.value(key)).getValue()) + .collect(Collectors.toSet()); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + super.unparse(writer, leftPrec, rightPrec); + writer.keyword("RESET"); + SqlWriter.Frame withFrame = writer.startList("(", ")"); + for (SqlNode optionKey : optionKeyList) { + SqlUnparseUtils.printIndent(writer); + optionKey.unparse(writer, leftPrec, rightPrec); + } + writer.newlineAndIndent(); + writer.endList(withFrame); + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionSet.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionSet.java new file mode 100644 index 0000000000000..0d4e7e9714112 --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionSet.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.flink.sql.parser.SqlUnparseUtils; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * ALTER CONNECTION [[catalogName.] dataBasesName.]connectionName SET ( name=value [, name=value]*). + */ +public class SqlAlterConnectionSet extends SqlAlterConnection { + + private final SqlNodeList connectionOptionList; + + public SqlAlterConnectionSet( + SqlParserPos pos, SqlIdentifier connectionName, SqlNodeList connectionOptionList) { + super(pos, connectionName); + this.connectionOptionList = + requireNonNull(connectionOptionList, "connectionOptionList should not be null"); + } + + public SqlNodeList getOptionList() { + return connectionOptionList; + } + + @Override + public List getOperandList() { + return List.of(connectionName, connectionOptionList); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + super.unparse(writer, leftPrec, rightPrec); + writer.keyword("SET"); + SqlWriter.Frame withFrame = writer.startList("(", ")"); + for (SqlNode connectionOption : connectionOptionList) { + SqlUnparseUtils.printIndent(writer); + connectionOption.unparse(writer, leftPrec, rightPrec); + } + writer.newlineAndIndent(); + writer.endList(withFrame); + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateConnection.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateConnection.java new file mode 100644 index 0000000000000..6fbe77104eddd --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateConnection.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.flink.sql.parser.ExtendedSqlNode; +import org.apache.flink.sql.parser.SqlUnparseUtils; +import org.apache.flink.sql.parser.error.SqlValidateException; + +import org.apache.calcite.sql.SqlCharStringLiteral; +import org.apache.calcite.sql.SqlCreate; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.ImmutableNullableList; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +/** + * {@link SqlNode} to describe the CREATE CONNECTION syntax. CREATE [TEMPORARY] [SYSTEM] CONNECTION + * [IF NOT EXISTS] [[catalogName.] dataBasesName].connectionName [COMMENT connection_comment] WITH + * (name=value, [name=value]*). + */ +public class SqlCreateConnection extends SqlCreate implements ExtendedSqlNode { + + public static final SqlSpecialOperator OPERATOR = + new SqlSpecialOperator("CREATE CONNECTION", SqlKind.OTHER_DDL); + + private final SqlIdentifier connectionName; + + @Nullable private final SqlCharStringLiteral comment; + + private final SqlNodeList propertyList; + + private final boolean isTemporary; + + private final boolean isSystem; + + private final boolean ifNotExists; + + public SqlCreateConnection( + SqlParserPos pos, + SqlIdentifier connectionName, + SqlCharStringLiteral comment, + SqlNodeList propertyList, + boolean isTemporary, + boolean isSystem, + boolean ifNotExists) { + super(OPERATOR, pos, false, ifNotExists); + this.connectionName = requireNonNull(connectionName, "connectionName should not be null"); + this.comment = comment; + this.propertyList = requireNonNull(propertyList, "propertyList should not be null"); + this.isTemporary = isTemporary; + this.isSystem = isSystem; + this.ifNotExists = ifNotExists; + } + + @Override + public @Nonnull SqlOperator getOperator() { + return OPERATOR; + } + + @Override + public @Nonnull List getOperandList() { + return ImmutableNullableList.of(connectionName, comment, propertyList); + } + + public SqlIdentifier getConnectionName() { + return connectionName; + } + + public Optional getComment() { + return Optional.ofNullable(comment); + } + + public SqlNodeList getPropertyList() { + return propertyList; + } + + public boolean isTemporary() { + return isTemporary; + } + + public boolean isSystem() { + return isSystem; + } + + public boolean isIfNotExists() { + return ifNotExists; + } + + @Override + public void validate() throws SqlValidateException { + if (propertyList.isEmpty()) { + throw new SqlValidateException( + getParserPosition(), "Connection property list can not be empty."); + } + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("CREATE"); + if (isTemporary) { + writer.keyword("TEMPORARY"); + } + if (isSystem) { + writer.keyword("SYSTEM"); + } + writer.keyword("CONNECTION"); + if (isIfNotExists()) { + writer.keyword("IF NOT EXISTS"); + } + connectionName.unparse(writer, leftPrec, rightPrec); + + if (comment != null) { + writer.newlineAndIndent(); + writer.keyword("COMMENT"); + comment.unparse(writer, leftPrec, rightPrec); + } + + if (!this.propertyList.isEmpty()) { + writer.keyword("WITH"); + SqlWriter.Frame withFrame = writer.startList("(", ")"); + for (SqlNode connectionProperty : propertyList) { + SqlUnparseUtils.printIndent(writer); + connectionProperty.unparse(writer, leftPrec, rightPrec); + } + writer.newlineAndIndent(); + writer.endList(withFrame); + } + } + + public String[] fullConnectionName() { + return connectionName.names.toArray(new String[0]); + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropConnection.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropConnection.java new file mode 100644 index 0000000000000..1e8bba4fa615e --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropConnection.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.calcite.sql.SqlDrop; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.ImmutableNullableList; + +import java.util.List; + +/** + * {@link SqlNode} to describe the DROP CONNECTION [IF EXISTS] [[catalogName.] + * dataBasesName].connectionName syntax. + */ +public class SqlDropConnection extends SqlDrop { + private static final SqlOperator OPERATOR = + new SqlSpecialOperator("DROP CONNECTION", SqlKind.OTHER_DDL); + + private SqlIdentifier connectionName; + private boolean ifExists; + private final boolean isTemporary; + private final boolean isSystemConnection; + + public SqlDropConnection( + SqlParserPos pos, + SqlIdentifier connectionName, + boolean ifExists, + boolean isTemporary, + boolean isSystemConnection) { + super(OPERATOR, pos, ifExists); + this.connectionName = connectionName; + this.ifExists = ifExists; + this.isTemporary = isTemporary; + this.isSystemConnection = isSystemConnection; + } + + @Override + public List getOperandList() { + return ImmutableNullableList.of(connectionName); + } + + public SqlIdentifier getConnectionName() { + return connectionName; + } + + public void setConnectionName(SqlIdentifier connectionName) { + this.connectionName = connectionName; + } + + public boolean getIfExists() { + return this.ifExists; + } + + public boolean getIsTemporary() { + return this.isTemporary; + } + + public boolean getIsSystemConnection() { + return this.isSystemConnection; + } + + public void setIfExists(boolean ifExists) { + this.ifExists = ifExists; + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("DROP"); + if (isTemporary) { + writer.keyword("TEMPORARY"); + } + if (isSystemConnection) { + writer.keyword("SYSTEM"); + } + writer.keyword("CONNECTION"); + if (ifExists) { + writer.keyword("IF EXISTS"); + } + connectionName.unparse(writer, leftPrec, rightPrec); + } + + public String[] fullConnectionName() { + return connectionName.names.toArray(new String[0]); + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichDescribeConnection.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichDescribeConnection.java new file mode 100644 index 0000000000000..9fb9bb9b79cfd --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichDescribeConnection.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.dql; + +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; + +import java.util.Collections; +import java.util.List; + +/** + * DESCRIBE CONNECTION [EXTENDED] [[catalogName.] dataBasesName].sqlIdentifier sql call. Here we add + * Rich in className to follow the convention of {@link org.apache.calcite.sql.SqlDescribeTable}, + * which only had it to distinguish from calcite's original SqlDescribeTable, even though calcite + * does not have SqlDescribeConnection. + */ +public class SqlRichDescribeConnection extends SqlCall { + + public static final SqlSpecialOperator OPERATOR = + new SqlSpecialOperator("DESCRIBE CONNECTION", SqlKind.OTHER); + protected final SqlIdentifier connectionNameIdentifier; + private final boolean isExtended; + + public SqlRichDescribeConnection( + SqlParserPos pos, SqlIdentifier connectionNameIdentifier, boolean isExtended) { + super(pos); + this.connectionNameIdentifier = connectionNameIdentifier; + this.isExtended = isExtended; + } + + @Override + public SqlOperator getOperator() { + return OPERATOR; + } + + @Override + public List getOperandList() { + return Collections.singletonList(connectionNameIdentifier); + } + + public boolean isExtended() { + return isExtended; + } + + public String[] fullConnectionName() { + return connectionNameIdentifier.names.toArray(new String[0]); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("DESCRIBE CONNECTION"); + if (isExtended) { + writer.keyword("EXTENDED"); + } + connectionNameIdentifier.unparse(writer, leftPrec, rightPrec); + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowConnections.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowConnections.java new file mode 100644 index 0000000000000..a0dd89f99e639 --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowConnections.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.dql; + +import org.apache.calcite.sql.SqlCharStringLiteral; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.parser.SqlParserPos; + +/** {@link SqlNode} to describe the SHOW CONNECTIONS syntax. */ +public class SqlShowConnections extends SqlShowCall { + + public static final SqlSpecialOperator OPERATOR = + new SqlSpecialOperator("SHOW CONNECTIONS", SqlKind.OTHER); + + public SqlShowConnections( + SqlParserPos pos, + String preposition, + SqlIdentifier databaseName, + boolean notLike, + SqlCharStringLiteral likeLiteral) { + // only LIKE currently supported for SHOW CONNECTIONS + super( + pos, + preposition, + databaseName, + likeLiteral == null ? null : "LIKE", + likeLiteral, + notLike); + } + + @Override + public SqlOperator getOperator() { + return OPERATOR; + } + + @Override + String getOperationName() { + return "SHOW CONNECTIONS"; + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateConnection.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateConnection.java new file mode 100644 index 0000000000000..0be99d9f7ebd4 --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateConnection.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.dql; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; + +import java.util.Collections; +import java.util.List; + +/** SHOW CREATE CONNECTION sql call. */ +public class SqlShowCreateConnection extends SqlShowCreate { + + public static final SqlSpecialOperator OPERATOR = + new SqlSpecialOperator("SHOW CREATE CONNECTION", SqlKind.OTHER_DDL); + + public SqlShowCreateConnection(SqlParserPos pos, SqlIdentifier connectionName) { + super(pos, connectionName); + } + + public SqlIdentifier getConnectionName() { + return sqlIdentifier; + } + + public String[] getFullConnectionName() { + return sqlIdentifier.names.toArray(new String[0]); + } + + @Override + public SqlOperator getOperator() { + return OPERATOR; + } + + @Override + public List getOperandList() { + return Collections.singletonList(sqlIdentifier); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("SHOW CREATE CONNECTION"); + sqlIdentifier.unparse(writer, leftPrec, rightPrec); + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java index 9c29119ba9e08..3ef7419601895 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java @@ -34,6 +34,10 @@ public interface ParserResource { @Resources.BaseMessage("OVERWRITE expression is only used with INSERT statement.") Resources.ExInst overwriteIsOnlyUsedWithInsert(); + @Resources.BaseMessage( + "CREATE SYSTEM CONNECTION is not supported, system connection can only be registered as temporary connection, you can use CREATE TEMPORARY SYSTEM CONNECTION instead.") + Resources.ExInst createSystemConnectionOnlySupportTemporary(); + @Resources.BaseMessage( "CREATE SYSTEM FUNCTION is not supported, system functions can only be registered as temporary function, you can use CREATE TEMPORARY SYSTEM FUNCTION instead.") Resources.ExInst createSystemFunctionOnlySupportTemporary(); diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java index a5dbf34d45bac..aa8a44b0d0f9d 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java @@ -3483,6 +3483,209 @@ void testModelInFunctionNamedArgs() { + "FROM TABLE(`ML_PREDICT`(`INPUT` => (TABLE `MY_TABLE`), `MODEL` => (MODEL `MY_MODEL`)))"); } + // ===================================================================================== + // Connection DDL/DQL Tests + // ===================================================================================== + + @Test + void testCreateConnection() { + sql("create connection conn1\n" + + " COMMENT 'connection_comment'\n" + + " WITH (\n" + + " 'type'='basic',\n" + + " 'url'='http://example.com',\n" + + " 'username'='user1',\n" + + " 'password'='pass1'\n" + + " )\n") + .ok( + "CREATE CONNECTION `CONN1`\n" + + "COMMENT 'connection_comment' WITH (\n" + + " 'type' = 'basic',\n" + + " 'url' = 'http://example.com',\n" + + " 'username' = 'user1',\n" + + " 'password' = 'pass1'\n" + + ")"); + } + + @Test + void testCreateConnectionIfNotExists() { + sql("create connection if not exists conn1\n" + + " WITH (\n" + + " 'type'='bearer',\n" + + " 'token'='my_token'\n" + + " )\n") + .ok( + "CREATE CONNECTION IF NOT EXISTS `CONN1` WITH (\n" + + " 'type' = 'bearer',\n" + + " 'token' = 'my_token'\n" + + ")"); + } + + @Test + void testCreateTemporaryConnection() { + sql("create temporary connection conn1\n" + + " WITH (\n" + + " 'type'='oauth',\n" + + " 'client_id'='client1'\n" + + " )\n") + .ok( + "CREATE TEMPORARY CONNECTION `CONN1` WITH (\n" + + " 'type' = 'oauth',\n" + + " 'client_id' = 'client1'\n" + + ")"); + } + + @Test + void testCreateSystemConnection() { + sql("create ^system^ connection conn1\n" + + " WITH (\n" + + " 'type'='basic',\n" + + " 'url'='http://example.com'\n" + + " )\n") + .fails( + "(?s)CREATE SYSTEM CONNECTION is not supported, system connection can only be registered as temporary connection, you can use CREATE TEMPORARY SYSTEM CONNECTION instead\\..*"); + } + + @Test + void testCreateTemporarySystemConnection() { + sql("create temporary system connection conn1\n" + + " WITH (\n" + + " 'type'='custom_type',\n" + + " 'api_key'='key123'\n" + + " )\n") + .ok( + "CREATE TEMPORARY SYSTEM CONNECTION `CONN1` WITH (\n" + + " 'type' = 'custom_type',\n" + + " 'api_key' = 'key123'\n" + + ")"); + } + + @Test + void testCreateConnectionWithQualifiedName() { + sql("create connection catalog1.db1.conn1\n" + + " WITH ('type'='basic', 'url'='http://example.com')\n") + .ok( + "CREATE CONNECTION `CATALOG1`.`DB1`.`CONN1` WITH (\n" + + " 'type' = 'basic',\n" + + " 'url' = 'http://example.com'\n" + + ")"); + } + + @Test + void testDropConnection() { + sql("drop connection conn1").ok("DROP CONNECTION `CONN1`"); + sql("drop connection db1.conn1").ok("DROP CONNECTION `DB1`.`CONN1`"); + sql("drop connection catalog1.db1.conn1").ok("DROP CONNECTION `CATALOG1`.`DB1`.`CONN1`"); + } + + @Test + void testDropConnectionIfExists() { + sql("drop connection if exists catalog1.db1.conn1") + .ok("DROP CONNECTION IF EXISTS `CATALOG1`.`DB1`.`CONN1`"); + } + + @Test + void testDropTemporaryConnection() { + sql("drop temporary connection conn1").ok("DROP TEMPORARY CONNECTION `CONN1`"); + sql("drop temporary connection if exists conn1") + .ok("DROP TEMPORARY CONNECTION IF EXISTS `CONN1`"); + } + + @Test + void testDropTemporarySystemConnection() { + sql("drop temporary system connection conn1") + .ok("DROP TEMPORARY SYSTEM CONNECTION `CONN1`"); + sql("drop temporary system connection if exists conn1") + .ok("DROP TEMPORARY SYSTEM CONNECTION IF EXISTS `CONN1`"); + } + + @Test + void testAlterConnectionSet() { + final String sql = + "alter connection conn1 set ('password' = 'new_password','url' = 'http://new.com')"; + final String expected = + "ALTER CONNECTION `CONN1` SET (\n" + + " 'password' = 'new_password',\n" + + " 'url' = 'http://new.com'\n" + + ")"; + sql(sql).ok(expected); + } + + @Test + void testAlterConnectionSetWithQualifiedName() { + final String sql = "alter connection catalog1.db1.conn1 set ('token' = 'new_token')"; + final String expected = + "ALTER CONNECTION `CATALOG1`.`DB1`.`CONN1` SET (\n" + + " 'token' = 'new_token'\n" + + ")"; + sql(sql).ok(expected); + } + + @Test + void testAlterConnectionRename() { + final String sql = "alter connection conn1 rename to conn2"; + final String expected = "ALTER CONNECTION `CONN1` RENAME TO `CONN2`"; + sql(sql).ok(expected); + } + + @Test + void testAlterConnectionRenameWithQualifiedName() { + final String sql = "alter connection catalog1.db1.conn1 rename to conn2"; + final String expected = "ALTER CONNECTION `CATALOG1`.`DB1`.`CONN1` RENAME TO `CONN2`"; + sql(sql).ok(expected); + } + + @Test + void testAlterConnectionReset() { + final String sql = "alter connection conn1 reset ('password', 'url')"; + final String expected = "ALTER CONNECTION `CONN1` RESET (\n 'password',\n 'url'\n)"; + sql(sql).ok(expected); + } + + @Test + void testAlterConnectionResetWithQualifiedName() { + final String sql = "alter connection catalog1.db1.conn1 reset ('token')"; + final String expected = "ALTER CONNECTION `CATALOG1`.`DB1`.`CONN1` RESET (\n 'token'\n)"; + sql(sql).ok(expected); + } + + @Test + void testShowConnections() { + sql("show connections").ok("SHOW CONNECTIONS"); + sql("show connections from db1").ok("SHOW CONNECTIONS FROM `DB1`"); + sql("show connections from catalog1.db1").ok("SHOW CONNECTIONS FROM `CATALOG1`.`DB1`"); + sql("show connections in db1").ok("SHOW CONNECTIONS IN `DB1`"); + sql("show connections in catalog1.db1").ok("SHOW CONNECTIONS IN `CATALOG1`.`DB1`"); + } + + @Test + void testShowConnectionsLike() { + sql("show connections like '%conn%'").ok("SHOW CONNECTIONS LIKE '%CONN%'"); + sql("show connections from db1 like 'my_%'").ok("SHOW CONNECTIONS FROM `DB1` LIKE 'MY_%'"); + sql("show connections not like 'temp_%'").ok("SHOW CONNECTIONS NOT LIKE 'TEMP_%'"); + } + + @Test + void testShowCreateConnection() { + sql("show create connection conn1").ok("SHOW CREATE CONNECTION `CONN1`"); + sql("show create connection catalog1.db1.conn1") + .ok("SHOW CREATE CONNECTION `CATALOG1`.`DB1`.`CONN1`"); + } + + @Test + void testDescribeConnection() { + sql("describe connection conn1").ok("DESCRIBE CONNECTION `CONN1`"); + sql("describe connection catalog1.db1.conn1") + .ok("DESCRIBE CONNECTION `CATALOG1`.`DB1`.`CONN1`"); + sql("describe connection extended conn1").ok("DESCRIBE CONNECTION EXTENDED `CONN1`"); + + sql("desc connection conn1").ok("DESCRIBE CONNECTION `CONN1`"); + sql("desc connection catalog1.db1.conn1") + .ok("DESCRIBE CONNECTION `CATALOG1`.`DB1`.`CONN1`"); + sql("desc connection extended catalog1.db1.conn1") + .ok("DESCRIBE CONNECTION EXTENDED `CATALOG1`.`DB1`.`CONN1`"); + } + /* * This test was backported from Calcite 1.38 (CALCITE-6266). * Remove it together with upgrade to Calcite 1.38. From 2613c65ee9c759bc2be44884980065f0a497b659 Mon Sep 17 00:00:00 2001 From: Hao Li <1127478+lihaosky@users.noreply.github.com> Date: Tue, 18 Nov 2025 16:57:31 -0800 Subject: [PATCH 2/6] fix --- flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd | 1 + 1 file changed, 1 insertion(+) diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd index ef2c129573755..6daa33af71f74 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd @@ -293,6 +293,7 @@ "COMPILE" "CONDITIONAL" "CONDITION_NUMBER" + "CONNECTION" "CONNECTION_NAME" "CONSTRAINT_CATALOG" "CONSTRAINT_NAME" From 7dc3582ef1962aaae7b2bab519f7d303be0aba7a Mon Sep 17 00:00:00 2001 From: Hao Li <1127478+lihaosky@users.noreply.github.com> Date: Thu, 20 Nov 2025 11:59:49 -0800 Subject: [PATCH 3/6] comments --- .../src/main/codegen/includes/parserImpls.ftl | 19 ++++++++++++++----- .../sql/parser/ddl/SqlCreateConnection.java | 9 --------- .../flink/sql/parser/ddl/SqlCreateModel.java | 12 ++++++------ .../sql/parser/ddl/SqlDropConnection.java | 6 ------ .../flink/sql/parser/ddl/SqlDropTable.java | 4 ---- .../sql/parser/utils/ParserResource.java | 8 ++++++-- .../sql/parser/FlinkSqlParserImplTest.java | 17 +++++++++++++++-- 7 files changed, 41 insertions(+), 34 deletions(-) diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index febb849cc68db..9535b7f561c5e 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -794,7 +794,7 @@ SqlShowCreate SqlShowCreate() : } /** - * DESCRIBE | DESC FUNCTION [ EXTENDED] [[catalogName.] dataBasesName].functionName sql call. + * (DESCRIBE | DESC) FUNCTION [ EXTENDED] [[catalogName.] dataBasesName].functionName sql call. * Here we add Rich in className to match the naming of SqlRichDescribeTable. */ SqlRichDescribeFunction SqlRichDescribeFunction() : @@ -813,7 +813,7 @@ SqlRichDescribeFunction SqlRichDescribeFunction() : } /** - * DESCRIBE | DESC MODEL [ EXTENDED] [[catalogName.] dataBasesName].modelName sql call. + * (DESCRIBE | DESC) MODEL [ EXTENDED] [[catalogName.] dataBasesName].modelName sql call. * Here we add Rich in className to match the naming of SqlRichDescribeTable. */ SqlRichDescribeModel SqlRichDescribeModel() : @@ -832,7 +832,7 @@ SqlRichDescribeModel SqlRichDescribeModel() : } /** - * DESCRIBE | DESC CONNECTION [ EXTENDED] [[catalogName.] dataBasesName].connectionName sql call. + * (DESCRIBE | DESC) CONNECTION [ EXTENDED] [[catalogName.] dataBasesName].connectionName sql call. * Here we add Rich in className to match the naming of SqlRichDescribeTable. */ SqlRichDescribeConnection SqlRichDescribeConnection() : @@ -851,7 +851,7 @@ SqlRichDescribeConnection SqlRichDescribeConnection() : } /** - * DESCRIBE | DESC [ EXTENDED] [[catalogName.] dataBasesName].tableName sql call. + * (DESCRIBE | DESC) [ EXTENDED] [[catalogName.] dataBasesName].tableName sql call. * Here we add Rich in className to distinguish from calcite's original SqlDescribeTable. */ SqlRichDescribeTable SqlRichDescribeTable() : @@ -3562,7 +3562,16 @@ SqlDrop SqlDropConnection(Span s, boolean isTemporary) : boolean isSystemConnection = false; } { - [ { isSystemConnection = true; } ] + [ + + { + if (!isTemporary){ + throw SqlUtil.newContextException(getPos(), + ParserResource.RESOURCE.dropSystemConnectionOnlySupportTemporary()); + } + isSystemConnection = true; + } + ] diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateConnection.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateConnection.java index 6fbe77104eddd..c88e39cff9853 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateConnection.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateConnection.java @@ -28,7 +28,6 @@ import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; -import org.apache.calcite.sql.SqlOperator; import org.apache.calcite.sql.SqlSpecialOperator; import org.apache.calcite.sql.SqlWriter; import org.apache.calcite.sql.parser.SqlParserPos; @@ -62,8 +61,6 @@ public class SqlCreateConnection extends SqlCreate implements ExtendedSqlNode { private final boolean isSystem; - private final boolean ifNotExists; - public SqlCreateConnection( SqlParserPos pos, SqlIdentifier connectionName, @@ -78,12 +75,6 @@ public SqlCreateConnection( this.propertyList = requireNonNull(propertyList, "propertyList should not be null"); this.isTemporary = isTemporary; this.isSystem = isSystem; - this.ifNotExists = ifNotExists; - } - - @Override - public @Nonnull SqlOperator getOperator() { - return OPERATOR; } @Override diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateModel.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateModel.java index a588f5d04edf9..e226dd2444947 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateModel.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateModel.java @@ -18,9 +18,7 @@ package org.apache.flink.sql.parser.ddl; -import org.apache.flink.sql.parser.ExtendedSqlNode; -import org.apache.flink.sql.parser.SqlUnparseUtils; -import org.apache.flink.sql.parser.error.SqlValidateException; +import static java.util.Objects.requireNonNull; import org.apache.calcite.sql.SqlCharStringLiteral; import org.apache.calcite.sql.SqlIdentifier; @@ -31,12 +29,13 @@ import org.apache.calcite.sql.SqlWriter; import org.apache.calcite.sql.parser.SqlParserPos; import org.apache.calcite.util.ImmutableNullableList; - -import javax.annotation.Nonnull; +import org.apache.flink.sql.parser.ExtendedSqlNode; +import org.apache.flink.sql.parser.SqlUnparseUtils; +import org.apache.flink.sql.parser.error.SqlValidateException; import java.util.List; -import static java.util.Objects.requireNonNull; +import javax.annotation.Nonnull; /** * {@link SqlNode} to describe the CREATE MODEL syntax. CREATE MODEL [IF NOT EXISTS] [[catalogName.] @@ -50,6 +49,7 @@ public class SqlCreateModel extends SqlCreateObject implements ExtendedSqlNode { private final SqlNodeList inputColumnList; private final SqlNodeList outputColumnList; + public SqlCreateModel( SqlParserPos pos, SqlIdentifier modelName, diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropConnection.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropConnection.java index 1e8bba4fa615e..0e367f7b2caee 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropConnection.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropConnection.java @@ -39,7 +39,6 @@ public class SqlDropConnection extends SqlDrop { new SqlSpecialOperator("DROP CONNECTION", SqlKind.OTHER_DDL); private SqlIdentifier connectionName; - private boolean ifExists; private final boolean isTemporary; private final boolean isSystemConnection; @@ -51,7 +50,6 @@ public SqlDropConnection( boolean isSystemConnection) { super(OPERATOR, pos, ifExists); this.connectionName = connectionName; - this.ifExists = ifExists; this.isTemporary = isTemporary; this.isSystemConnection = isSystemConnection; } @@ -81,10 +79,6 @@ public boolean getIsSystemConnection() { return this.isSystemConnection; } - public void setIfExists(boolean ifExists) { - this.ifExists = ifExists; - } - @Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { writer.keyword("DROP"); diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropTable.java index bb26e37b38d0c..54000ee04df7d 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropTable.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropTable.java @@ -38,10 +38,6 @@ public SqlDropTable( this.isTemporary = isTemporary; } - public boolean isTemporary() { - return this.isTemporary; - } - @Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { writer.keyword("DROP"); diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java index 3ef7419601895..57e2e55c17d76 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java @@ -35,11 +35,15 @@ public interface ParserResource { Resources.ExInst overwriteIsOnlyUsedWithInsert(); @Resources.BaseMessage( - "CREATE SYSTEM CONNECTION is not supported, system connection can only be registered as temporary connection, you can use CREATE TEMPORARY SYSTEM CONNECTION instead.") + "CREATE SYSTEM CONNECTION is not supported, system connections can only be registered as temporary connections, you can use CREATE TEMPORARY SYSTEM CONNECTION instead.") Resources.ExInst createSystemConnectionOnlySupportTemporary(); @Resources.BaseMessage( - "CREATE SYSTEM FUNCTION is not supported, system functions can only be registered as temporary function, you can use CREATE TEMPORARY SYSTEM FUNCTION instead.") + "DROP SYSTEM CONNECTION is not supported, system connections can only be dropped as temporary connections, you can use DROP TEMPORARY SYSTEM CONNECTION instead.") + Resources.ExInst dropSystemConnectionOnlySupportTemporary(); + + @Resources.BaseMessage( + "CREATE SYSTEM FUNCTION is not supported, system functions can only be registered as temporary functions, you can use CREATE TEMPORARY SYSTEM FUNCTION instead.") Resources.ExInst createSystemFunctionOnlySupportTemporary(); @Resources.BaseMessage("Duplicate EXPLAIN DETAIL is not allowed.") diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java index aa8a44b0d0f9d..a4a4785fa0c67 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java @@ -2482,7 +2482,7 @@ void testCreateFunction() { .fails( "CREATE SYSTEM FUNCTION is not supported, " + "system functions can only be registered as temporary " - + "function, you can use CREATE TEMPORARY SYSTEM FUNCTION instead."); + + "functions, you can use CREATE TEMPORARY SYSTEM FUNCTION instead."); // test create function using jar sql("create temporary function function1 as 'org.apache.flink.function.function1' language java using jar 'file:///path/to/test.jar'") @@ -3543,7 +3543,10 @@ void testCreateSystemConnection() { + " 'url'='http://example.com'\n" + " )\n") .fails( - "(?s)CREATE SYSTEM CONNECTION is not supported, system connection can only be registered as temporary connection, you can use CREATE TEMPORARY SYSTEM CONNECTION instead\\..*"); + "(?s)CREATE SYSTEM CONNECTION is not supported, " + + "system connections can only be registered as temporary " + + "connections, you can use CREATE TEMPORARY SYSTEM CONNECTION " + + "instead\\..*"); } @Test @@ -3599,6 +3602,16 @@ void testDropTemporarySystemConnection() { .ok("DROP TEMPORARY SYSTEM CONNECTION IF EXISTS `CONN1`"); } + @Test + void testDropSystemConnection() { + sql("drop ^system^ connection conn1") + .fails( + "(?s)DROP SYSTEM CONNECTION is not supported, " + + "system connections can only be dropped as temporary " + + "connections, you can use DROP TEMPORARY SYSTEM CONNECTION " + + "instead\\..*"); + } + @Test void testAlterConnectionSet() { final String sql = From c2336ec21d786d088e72d010f154373672ffc526 Mon Sep 17 00:00:00 2001 From: Hao Li <1127478+lihaosky@users.noreply.github.com> Date: Fri, 21 Nov 2025 23:22:46 -0800 Subject: [PATCH 4/6] comment --- .../src/main/codegen/includes/parserImpls.ftl | 13 ++-- .../sql/parser/ddl/SqlAlterConnection.java | 46 ++++++------ .../parser/ddl/SqlAlterConnectionRename.java | 18 +++-- .../parser/ddl/SqlAlterConnectionReset.java | 32 ++++----- .../sql/parser/ddl/SqlAlterConnectionSet.java | 31 ++++---- .../sql/parser/ddl/SqlCreateConnection.java | 72 ++++++------------- .../flink/sql/parser/ddl/SqlCreateModel.java | 12 ++-- .../sql/parser/ddl/SqlDropConnection.java | 46 +++--------- .../flink/sql/parser/ddl/SqlDropModel.java | 7 +- .../flink/sql/parser/ddl/SqlDropTable.java | 4 ++ .../sql/parser/FlinkSqlParserImplTest.java | 33 +++++++++ .../converters/SqlDropModelConverter.java | 2 +- 12 files changed, 151 insertions(+), 165 deletions(-) diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index 9535b7f561c5e..7be96330ccd12 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -3485,14 +3485,15 @@ SqlAlterModel SqlAlterModel() : } /** -* ALTER CONNECTION connectionName SET (property_key = property_val, ...) -* ALTER CONNECTION connectionName RENAME TO newConnectionName -* ALTER CONNECTION connectionName RESET (property_key, ...) +* ALTER CONNECTION [IF EXISTS] connectionName SET (property_key = property_val, ...) +* ALTER CONNECTION [IF EXISTS] connectionName RENAME TO newConnectionName +* ALTER CONNECTION [IF EXISTS] connectionName RESET (property_key, ...) * Alter temporary or system connection is not supported. */ SqlAlterConnection SqlAlterConnection() : { SqlParserPos startPos; + boolean ifExists = false; SqlIdentifier connectionIdentifier; SqlIdentifier newConnectionIdentifier = null; SqlNodeList propertyList = SqlNodeList.EMPTY; @@ -3500,6 +3501,7 @@ SqlAlterConnection SqlAlterConnection() : } { { startPos = getPos(); } + ifExists = IfExistsOpt() connectionIdentifier = CompoundIdentifier() ( LOOKAHEAD(2) @@ -3509,7 +3511,8 @@ SqlAlterConnection SqlAlterConnection() : return new SqlAlterConnectionRename( startPos.plus(getPos()), connectionIdentifier, - newConnectionIdentifier); + newConnectionIdentifier, + ifExists); } | @@ -3518,6 +3521,7 @@ SqlAlterConnection SqlAlterConnection() : return new SqlAlterConnectionSet( startPos.plus(getPos()), connectionIdentifier, + ifExists, propertyList); } | @@ -3527,6 +3531,7 @@ SqlAlterConnection SqlAlterConnection() : return new SqlAlterConnectionReset( startPos.plus(getPos()), connectionIdentifier, + ifExists, propertyKeyList); } ) diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnection.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnection.java index d7bde1731016a..d8c490c547a2c 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnection.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnection.java @@ -18,49 +18,43 @@ package org.apache.flink.sql.parser.ddl; - -import static java.util.Objects.requireNonNull; - -import org.apache.calcite.sql.SqlCall; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.sql.SqlOperator; import org.apache.calcite.sql.SqlSpecialOperator; import org.apache.calcite.sql.SqlWriter; import org.apache.calcite.sql.parser.SqlParserPos; /** - * Abstract class to describe statements like ALTER CONNECTION [[catalogName.] + * Abstract class to describe statements like ALTER CONNECTION [IF EXISTS] [[catalogName.] * dataBasesName.]connectionName ... */ -public abstract class SqlAlterConnection extends SqlCall { +public abstract class SqlAlterConnection extends SqlAlterObject { - public static final SqlSpecialOperator OPERATOR = + private static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("ALTER CONNECTION", SqlKind.OTHER_DDL); - protected final SqlIdentifier connectionName; - - public SqlAlterConnection(SqlParserPos pos, SqlIdentifier connectionName) { - super(pos); - this.connectionName = requireNonNull(connectionName, "connectionName should not be null"); - } - - @Override - public SqlOperator getOperator() { - return OPERATOR; - } + protected final boolean ifConnectionExists; - public SqlIdentifier getConnectionName() { - return connectionName; + public SqlAlterConnection( + SqlParserPos pos, SqlIdentifier connectionName, boolean ifConnectionExists) { + super(OPERATOR, pos, "CONNECTION", connectionName); + this.ifConnectionExists = ifConnectionExists; } - public String[] fullConnectionName() { - return connectionName.names.toArray(new String[0]); + /** + * Whether to ignore the error if the connection doesn't exist. + * + * @return true when IF EXISTS is specified. + */ + public boolean ifConnectionExists() { + return ifConnectionExists; } @Override - public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { - writer.keyword("ALTER CONNECTION"); - connectionName.unparse(writer, leftPrec, rightPrec); + public void unparseAlterOperation(SqlWriter writer, int leftPrec, int rightPrec) { + if (ifConnectionExists) { + writer.keyword("IF EXISTS"); + } + name.unparse(writer, leftPrec, rightPrec); } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionRename.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionRename.java index d40f3d16d1538..5f0a4827ecca6 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionRename.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionRename.java @@ -27,14 +27,20 @@ import static java.util.Objects.requireNonNull; -/** ALTER CONNECTION [[catalogName.] dataBasesName.]connectionName RENAME TO newConnectionName. */ +/** + * ALTER CONNECTION [IF EXISTS] [[catalogName.] dataBasesName.]connectionName RENAME TO + * newConnectionName. + */ public class SqlAlterConnectionRename extends SqlAlterConnection { private final SqlIdentifier newConnectionName; public SqlAlterConnectionRename( - SqlParserPos pos, SqlIdentifier connectionName, SqlIdentifier newConnectionName) { - super(pos, connectionName); + SqlParserPos pos, + SqlIdentifier connectionName, + SqlIdentifier newConnectionName, + boolean ifConnectionExists) { + super(pos, connectionName, ifConnectionExists); this.newConnectionName = requireNonNull(newConnectionName, "newConnectionName should not be null"); } @@ -49,12 +55,12 @@ public String[] fullNewConnectionName() { @Override public List getOperandList() { - return List.of(connectionName, newConnectionName); + return List.of(name, newConnectionName); } @Override - public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { - super.unparse(writer, leftPrec, rightPrec); + public void unparseAlterOperation(SqlWriter writer, int leftPrec, int rightPrec) { + super.unparseAlterOperation(writer, leftPrec, rightPrec); writer.keyword("RENAME TO"); newConnectionName.unparse(writer, leftPrec, rightPrec); } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionReset.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionReset.java index 2a9cee0285dd0..f400f299976c2 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionReset.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionReset.java @@ -18,15 +18,14 @@ package org.apache.flink.sql.parser.ddl; +import org.apache.flink.sql.parser.SqlParseUtils; import org.apache.flink.sql.parser.SqlUnparseUtils; import org.apache.calcite.sql.SqlIdentifier; -import org.apache.calcite.sql.SqlLiteral; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.SqlWriter; import org.apache.calcite.sql.parser.SqlParserPos; -import org.apache.calcite.util.NlsString; import java.util.List; import java.util.Set; @@ -34,37 +33,36 @@ import static java.util.Objects.requireNonNull; -/** ALTER CONNECTION [[catalogName.] dataBasesName.]connectionName RESET ( 'key1' [, 'key2']...). */ +/** + * ALTER CONNECTION [IF EXISTS] [[catalogName.] dataBasesName.]connectionName RESET ( 'key1' [, + * 'key2']...). + */ public class SqlAlterConnectionReset extends SqlAlterConnection { private final SqlNodeList optionKeyList; public SqlAlterConnectionReset( - SqlParserPos pos, SqlIdentifier connectionName, SqlNodeList optionKeyList) { - super(pos, connectionName); + SqlParserPos pos, + SqlIdentifier connectionName, + boolean ifConnectionExists, + SqlNodeList optionKeyList) { + super(pos, connectionName, ifConnectionExists); this.optionKeyList = requireNonNull(optionKeyList, "optionKeyList should not be null"); } @Override public List getOperandList() { - return List.of(connectionName, optionKeyList); + return List.of(name, optionKeyList); } public Set getResetKeys() { return optionKeyList.getList().stream() - .map(key -> ((NlsString) SqlLiteral.value(key)).getValue()) + .map(SqlParseUtils::extractString) .collect(Collectors.toSet()); } @Override - public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { - super.unparse(writer, leftPrec, rightPrec); - writer.keyword("RESET"); - SqlWriter.Frame withFrame = writer.startList("(", ")"); - for (SqlNode optionKey : optionKeyList) { - SqlUnparseUtils.printIndent(writer); - optionKey.unparse(writer, leftPrec, rightPrec); - } - writer.newlineAndIndent(); - writer.endList(withFrame); + public void unparseAlterOperation(SqlWriter writer, int leftPrec, int rightPrec) { + super.unparseAlterOperation(writer, leftPrec, rightPrec); + SqlUnparseUtils.unparseResetOptions(optionKeyList, writer, leftPrec, rightPrec); } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionSet.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionSet.java index 0d4e7e9714112..c738d9a753094 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionSet.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionSet.java @@ -18,6 +18,7 @@ package org.apache.flink.sql.parser.ddl; +import org.apache.flink.sql.parser.SqlParseUtils; import org.apache.flink.sql.parser.SqlUnparseUtils; import org.apache.calcite.sql.SqlIdentifier; @@ -27,42 +28,40 @@ import org.apache.calcite.sql.parser.SqlParserPos; import java.util.List; +import java.util.Map; import static java.util.Objects.requireNonNull; /** - * ALTER CONNECTION [[catalogName.] dataBasesName.]connectionName SET ( name=value [, name=value]*). + * ALTER CONNECTION [IF EXISTS] [[catalogName.] dataBasesName.]connectionName SET ( name=value [, + * name=value]*). */ public class SqlAlterConnectionSet extends SqlAlterConnection { private final SqlNodeList connectionOptionList; public SqlAlterConnectionSet( - SqlParserPos pos, SqlIdentifier connectionName, SqlNodeList connectionOptionList) { - super(pos, connectionName); + SqlParserPos pos, + SqlIdentifier connectionName, + boolean ifConnectionExists, + SqlNodeList connectionOptionList) { + super(pos, connectionName, ifConnectionExists); this.connectionOptionList = requireNonNull(connectionOptionList, "connectionOptionList should not be null"); } - public SqlNodeList getOptionList() { - return connectionOptionList; + public Map getProperties() { + return SqlParseUtils.extractMap(connectionOptionList); } @Override public List getOperandList() { - return List.of(connectionName, connectionOptionList); + return List.of(name, connectionOptionList); } @Override - public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { - super.unparse(writer, leftPrec, rightPrec); - writer.keyword("SET"); - SqlWriter.Frame withFrame = writer.startList("(", ")"); - for (SqlNode connectionOption : connectionOptionList) { - SqlUnparseUtils.printIndent(writer); - connectionOption.unparse(writer, leftPrec, rightPrec); - } - writer.newlineAndIndent(); - writer.endList(withFrame); + public void unparseAlterOperation(SqlWriter writer, int leftPrec, int rightPrec) { + super.unparseAlterOperation(writer, leftPrec, rightPrec); + SqlUnparseUtils.unparseSetOptions(connectionOptionList, writer, leftPrec, rightPrec); } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateConnection.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateConnection.java index c88e39cff9853..a7cc767e401fe 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateConnection.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateConnection.java @@ -23,7 +23,6 @@ import org.apache.flink.sql.parser.error.SqlValidateException; import org.apache.calcite.sql.SqlCharStringLiteral; -import org.apache.calcite.sql.SqlCreate; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlNode; @@ -34,31 +33,19 @@ import org.apache.calcite.util.ImmutableNullableList; import javax.annotation.Nonnull; -import javax.annotation.Nullable; import java.util.List; -import java.util.Optional; - -import static java.util.Objects.requireNonNull; /** * {@link SqlNode} to describe the CREATE CONNECTION syntax. CREATE [TEMPORARY] [SYSTEM] CONNECTION * [IF NOT EXISTS] [[catalogName.] dataBasesName].connectionName [COMMENT connection_comment] WITH * (name=value, [name=value]*). */ -public class SqlCreateConnection extends SqlCreate implements ExtendedSqlNode { +public class SqlCreateConnection extends SqlCreateObject implements ExtendedSqlNode { - public static final SqlSpecialOperator OPERATOR = + private static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE CONNECTION", SqlKind.OTHER_DDL); - private final SqlIdentifier connectionName; - - @Nullable private final SqlCharStringLiteral comment; - - private final SqlNodeList propertyList; - - private final boolean isTemporary; - private final boolean isSystem; public SqlCreateConnection( @@ -69,55 +56,44 @@ public SqlCreateConnection( boolean isTemporary, boolean isSystem, boolean ifNotExists) { - super(OPERATOR, pos, false, ifNotExists); - this.connectionName = requireNonNull(connectionName, "connectionName should not be null"); - this.comment = comment; - this.propertyList = requireNonNull(propertyList, "propertyList should not be null"); - this.isTemporary = isTemporary; + super( + OPERATOR, + pos, + connectionName, + isTemporary, + false, + ifNotExists, + propertyList, + comment); this.isSystem = isSystem; } @Override public @Nonnull List getOperandList() { - return ImmutableNullableList.of(connectionName, comment, propertyList); - } - - public SqlIdentifier getConnectionName() { - return connectionName; - } - - public Optional getComment() { - return Optional.ofNullable(comment); - } - - public SqlNodeList getPropertyList() { - return propertyList; - } - - public boolean isTemporary() { - return isTemporary; + return ImmutableNullableList.of(name, comment, properties); } public boolean isSystem() { return isSystem; } - public boolean isIfNotExists() { - return ifNotExists; - } - @Override public void validate() throws SqlValidateException { - if (propertyList.isEmpty()) { + if (properties == null || properties.isEmpty()) { throw new SqlValidateException( getParserPosition(), "Connection property list can not be empty."); } } + @Override + protected String getScope() { + return "CONNECTION"; + } + @Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { writer.keyword("CREATE"); - if (isTemporary) { + if (isTemporary()) { writer.keyword("TEMPORARY"); } if (isSystem) { @@ -127,7 +103,7 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { if (isIfNotExists()) { writer.keyword("IF NOT EXISTS"); } - connectionName.unparse(writer, leftPrec, rightPrec); + name.unparse(writer, leftPrec, rightPrec); if (comment != null) { writer.newlineAndIndent(); @@ -135,10 +111,10 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { comment.unparse(writer, leftPrec, rightPrec); } - if (!this.propertyList.isEmpty()) { + if (properties != null && !properties.isEmpty()) { writer.keyword("WITH"); SqlWriter.Frame withFrame = writer.startList("(", ")"); - for (SqlNode connectionProperty : propertyList) { + for (SqlNode connectionProperty : properties) { SqlUnparseUtils.printIndent(writer); connectionProperty.unparse(writer, leftPrec, rightPrec); } @@ -146,8 +122,4 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { writer.endList(withFrame); } } - - public String[] fullConnectionName() { - return connectionName.names.toArray(new String[0]); - } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateModel.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateModel.java index e226dd2444947..a588f5d04edf9 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateModel.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateModel.java @@ -18,7 +18,9 @@ package org.apache.flink.sql.parser.ddl; -import static java.util.Objects.requireNonNull; +import org.apache.flink.sql.parser.ExtendedSqlNode; +import org.apache.flink.sql.parser.SqlUnparseUtils; +import org.apache.flink.sql.parser.error.SqlValidateException; import org.apache.calcite.sql.SqlCharStringLiteral; import org.apache.calcite.sql.SqlIdentifier; @@ -29,13 +31,12 @@ import org.apache.calcite.sql.SqlWriter; import org.apache.calcite.sql.parser.SqlParserPos; import org.apache.calcite.util.ImmutableNullableList; -import org.apache.flink.sql.parser.ExtendedSqlNode; -import org.apache.flink.sql.parser.SqlUnparseUtils; -import org.apache.flink.sql.parser.error.SqlValidateException; + +import javax.annotation.Nonnull; import java.util.List; -import javax.annotation.Nonnull; +import static java.util.Objects.requireNonNull; /** * {@link SqlNode} to describe the CREATE MODEL syntax. CREATE MODEL [IF NOT EXISTS] [[catalogName.] @@ -49,7 +50,6 @@ public class SqlCreateModel extends SqlCreateObject implements ExtendedSqlNode { private final SqlNodeList inputColumnList; private final SqlNodeList outputColumnList; - public SqlCreateModel( SqlParserPos pos, SqlIdentifier modelName, diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropConnection.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropConnection.java index 0e367f7b2caee..3af65a4ba119c 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropConnection.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropConnection.java @@ -18,27 +18,21 @@ package org.apache.flink.sql.parser.ddl; -import org.apache.calcite.sql.SqlDrop; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlOperator; import org.apache.calcite.sql.SqlSpecialOperator; import org.apache.calcite.sql.SqlWriter; import org.apache.calcite.sql.parser.SqlParserPos; -import org.apache.calcite.util.ImmutableNullableList; - -import java.util.List; /** - * {@link SqlNode} to describe the DROP CONNECTION [IF EXISTS] [[catalogName.] - * dataBasesName].connectionName syntax. + * {@link org.apache.calcite.sql.SqlNode} to describe the DROP CONNECTION [IF EXISTS] + * [[catalogName.] dataBasesName].connectionName syntax. */ -public class SqlDropConnection extends SqlDrop { +public class SqlDropConnection extends SqlDropObject { private static final SqlOperator OPERATOR = new SqlSpecialOperator("DROP CONNECTION", SqlKind.OTHER_DDL); - private SqlIdentifier connectionName; private final boolean isTemporary; private final boolean isSystemConnection; @@ -48,35 +42,17 @@ public SqlDropConnection( boolean ifExists, boolean isTemporary, boolean isSystemConnection) { - super(OPERATOR, pos, ifExists); - this.connectionName = connectionName; + super(OPERATOR, pos, connectionName, ifExists); this.isTemporary = isTemporary; this.isSystemConnection = isSystemConnection; } - @Override - public List getOperandList() { - return ImmutableNullableList.of(connectionName); - } - - public SqlIdentifier getConnectionName() { - return connectionName; - } - - public void setConnectionName(SqlIdentifier connectionName) { - this.connectionName = connectionName; + public boolean isTemporary() { + return isTemporary; } - public boolean getIfExists() { - return this.ifExists; - } - - public boolean getIsTemporary() { - return this.isTemporary; - } - - public boolean getIsSystemConnection() { - return this.isSystemConnection; + public boolean isSystemConnection() { + return isSystemConnection; } @Override @@ -92,10 +68,6 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { if (ifExists) { writer.keyword("IF EXISTS"); } - connectionName.unparse(writer, leftPrec, rightPrec); - } - - public String[] fullConnectionName() { - return connectionName.names.toArray(new String[0]); + name.unparse(writer, leftPrec, rightPrec); } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java index 53b6417f7f5d9..8aeb082a54f24 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java @@ -42,13 +42,16 @@ public SqlDropModel( this.isTemporary = isTemporary; } - public boolean getIsTemporary() { - return this.isTemporary; + public boolean isTemporary() { + return isTemporary; } @Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { writer.keyword("DROP"); + if (isTemporary) { + writer.keyword("TEMPORARY"); + } writer.keyword("MODEL"); if (ifExists) { writer.keyword("IF EXISTS"); diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropTable.java index 54000ee04df7d..bb26e37b38d0c 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropTable.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropTable.java @@ -38,6 +38,10 @@ public SqlDropTable( this.isTemporary = isTemporary; } + public boolean isTemporary() { + return this.isTemporary; + } + @Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { writer.keyword("DROP"); diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java index a4a4785fa0c67..0b7ec6251bb5e 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java @@ -3265,6 +3265,12 @@ void testDropModel() { sql("drop model catalog1.db1.m1").ok("DROP MODEL `CATALOG1`.`DB1`.`M1`"); } + @Test + void testDropTemporaryModel() { + sql("drop temporary model m1").ok("DROP TEMPORARY MODEL `M1`"); + sql("drop temporary model if exists m1").ok("DROP TEMPORARY MODEL IF EXISTS `M1`"); + } + @Test void testDropModelIfExists() { sql("drop model if exists catalog1.db1.m1") @@ -3662,6 +3668,33 @@ void testAlterConnectionResetWithQualifiedName() { sql(sql).ok(expected); } + @Test + void testAlterConnectionIfExists() { + final String sql = + "alter connection if exists conn1 set ('password' = 'new_password','url' = 'http://new.com')"; + final String expected = + "ALTER CONNECTION IF EXISTS `CONN1` SET (\n" + + " 'password' = 'new_password',\n" + + " 'url' = 'http://new.com'\n" + + ")"; + sql(sql).ok(expected); + } + + @Test + void testAlterConnectionRenameIfExists() { + final String sql = "alter connection if exists conn1 rename to conn2"; + final String expected = "ALTER CONNECTION IF EXISTS `CONN1` RENAME TO `CONN2`"; + sql(sql).ok(expected); + } + + @Test + void testAlterConnectionResetIfExists() { + final String sql = "alter connection if exists conn1 reset ('password', 'url')"; + final String expected = + "ALTER CONNECTION IF EXISTS `CONN1` RESET (\n 'password',\n 'url'\n)"; + sql(sql).ok(expected); + } + @Test void testShowConnections() { sql("show connections").ok("SHOW CONNECTIONS"); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDropModelConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDropModelConverter.java index 90a45c5a4254c..4e0d85c6dbce7 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDropModelConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDropModelConverter.java @@ -35,6 +35,6 @@ public Operation convertSqlNode(SqlDropModel sqlDropModel, ConvertContext contex context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier); return new DropModelOperation( - identifier, sqlDropModel.getIfExists(), sqlDropModel.getIsTemporary()); + identifier, sqlDropModel.getIfExists(), sqlDropModel.isTemporary()); } } From 1ecf149612fe504cafb0987a891bded4c36e2f3c Mon Sep 17 00:00:00 2001 From: Hao Li <1127478+lihaosky@users.noreply.github.com> Date: Mon, 1 Dec 2025 10:57:10 -0800 Subject: [PATCH 5/6] comments --- .../src/main/codegen/includes/parserImpls.ftl | 3 +-- .../flink/sql/parser/SqlParseUtils.java | 18 +++++++++++++----- .../parser/ddl/SqlAlterConnectionRename.java | 4 ---- .../parser/ddl/SqlAlterConnectionReset.java | 5 +---- .../sql/parser/ddl/SqlAlterModelReset.java | 5 +---- .../flink/sql/parser/ddl/SqlAnalyzeTable.java | 2 +- .../sql/parser/ddl/SqlCreateConnection.java | 19 ++----------------- .../ddl/SqlCreateMaterializedTable.java | 2 +- .../flink/sql/parser/ddl/SqlCreateTable.java | 2 +- .../parser/dql/SqlShowCreateConnection.java | 4 ---- .../sql/parser/FlinkSqlParserImplTest.java | 15 ++++++++++----- .../utils/OperationConverterUtils.java | 4 +++- 12 files changed, 34 insertions(+), 49 deletions(-) diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index 7be96330ccd12..50650cbc9b2c8 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -3707,8 +3707,7 @@ SqlCreate SqlCreateConnection(Span s, boolean isTemporary) : connectionIdentifier = CompoundIdentifier() [ { - String p = SqlParserUtil.parseString(token.image); - comment = SqlLiteral.createCharString(p, getPos()); + comment = Comment(); } ] diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlParseUtils.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlParseUtils.java index 8f3a3831aec26..2a289d09bd879 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlParseUtils.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlParseUtils.java @@ -20,7 +20,6 @@ import org.apache.flink.sql.parser.ddl.SqlTableOption; -import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlLiteral; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; @@ -31,6 +30,8 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; /** Utils methods for parsing DDLs. */ @@ -89,12 +90,19 @@ public static Map extractMap(@Nullable SqlNodeList propList) { .collect(Collectors.toMap(k -> k.getKeyString(), SqlTableOption::getValueString)); } - public static List extractList(@Nullable SqlNodeList sqlNodeList) { + public static List extractList( + @Nullable SqlNodeList sqlNodeList, Function mapper) { if (sqlNodeList == null) { return List.of(); } - return sqlNodeList.getList().stream() - .map(p -> ((SqlIdentifier) p).getSimple()) - .collect(Collectors.toList()); + return sqlNodeList.getList().stream().map(mapper).collect(Collectors.toList()); + } + + public static Set extractSet( + @Nullable SqlNodeList sqlNodeList, Function mapper) { + if (sqlNodeList == null) { + return Set.of(); + } + return sqlNodeList.getList().stream().map(mapper).collect(Collectors.toSet()); } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionRename.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionRename.java index 5f0a4827ecca6..ce50651c4c657 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionRename.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionRename.java @@ -49,10 +49,6 @@ public SqlIdentifier getNewConnectionName() { return newConnectionName; } - public String[] fullNewConnectionName() { - return newConnectionName.names.toArray(new String[0]); - } - @Override public List getOperandList() { return List.of(name, newConnectionName); diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionReset.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionReset.java index f400f299976c2..23501f7552947 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionReset.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionReset.java @@ -29,7 +29,6 @@ import java.util.List; import java.util.Set; -import java.util.stream.Collectors; import static java.util.Objects.requireNonNull; @@ -55,9 +54,7 @@ public List getOperandList() { } public Set getResetKeys() { - return optionKeyList.getList().stream() - .map(SqlParseUtils::extractString) - .collect(Collectors.toSet()); + return SqlParseUtils.extractSet(optionKeyList, SqlParseUtils::extractString); } @Override diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java index e9392daaa1a69..00bfbc1079e5c 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java @@ -29,7 +29,6 @@ import java.util.List; import java.util.Set; -import java.util.stream.Collectors; import static java.util.Objects.requireNonNull; @@ -54,9 +53,7 @@ public List getOperandList() { } public Set getResetKeys() { - return optionKeyList.getList().stream() - .map(SqlParseUtils::extractString) - .collect(Collectors.toSet()); + return SqlParseUtils.extractSet(optionKeyList, SqlParseUtils::extractString); } @Override diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAnalyzeTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAnalyzeTable.java index 4ab91890c19db..c90c56a7b8cde 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAnalyzeTable.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAnalyzeTable.java @@ -81,7 +81,7 @@ public LinkedHashMap getPartitions() { } public List getColumnNames() { - return SqlParseUtils.extractList(columns); + return SqlParseUtils.extractList(columns, p -> ((SqlIdentifier) p).getSimple()); } public boolean isAllColumns() { diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateConnection.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateConnection.java index a7cc767e401fe..c9936893a1474 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateConnection.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateConnection.java @@ -104,22 +104,7 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { writer.keyword("IF NOT EXISTS"); } name.unparse(writer, leftPrec, rightPrec); - - if (comment != null) { - writer.newlineAndIndent(); - writer.keyword("COMMENT"); - comment.unparse(writer, leftPrec, rightPrec); - } - - if (properties != null && !properties.isEmpty()) { - writer.keyword("WITH"); - SqlWriter.Frame withFrame = writer.startList("(", ")"); - for (SqlNode connectionProperty : properties) { - SqlUnparseUtils.printIndent(writer); - connectionProperty.unparse(writer, leftPrec, rightPrec); - } - writer.newlineAndIndent(); - writer.endList(withFrame); - } + SqlUnparseUtils.unparseComment(comment, true, writer, leftPrec, rightPrec); + SqlUnparseUtils.unparseProperties(properties, writer, leftPrec, rightPrec); } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java index 628571fdcb4bb..8039e28296866 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java @@ -119,7 +119,7 @@ public Optional getWatermark() { } public List getPartitionKeyList() { - return SqlParseUtils.extractList(partitionKeyList); + return SqlParseUtils.extractList(partitionKeyList, p -> ((SqlIdentifier) p).getSimple()); } @Nullable diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java index 2d6769b225288..8fa0a4c7ca537 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java @@ -136,7 +136,7 @@ public final SqlDistribution getDistribution() { } public List getPartitionKeyList() { - return SqlParseUtils.extractList(partitionKeyList); + return SqlParseUtils.extractList(partitionKeyList, p -> ((SqlIdentifier) p).getSimple()); } public List getTableConstraints() { diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateConnection.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateConnection.java index 0be99d9f7ebd4..0434bbb6dc109 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateConnection.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateConnection.java @@ -43,10 +43,6 @@ public SqlIdentifier getConnectionName() { return sqlIdentifier; } - public String[] getFullConnectionName() { - return sqlIdentifier.names.toArray(new String[0]); - } - @Override public SqlOperator getOperator() { return OPERATOR; diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java index 0b7ec6251bb5e..a951bd165e028 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java @@ -3505,7 +3505,8 @@ void testCreateConnection() { + " )\n") .ok( "CREATE CONNECTION `CONN1`\n" - + "COMMENT 'connection_comment' WITH (\n" + + "COMMENT 'connection_comment'\n" + + "WITH (\n" + " 'type' = 'basic',\n" + " 'url' = 'http://example.com',\n" + " 'username' = 'user1',\n" @@ -3521,7 +3522,8 @@ void testCreateConnectionIfNotExists() { + " 'token'='my_token'\n" + " )\n") .ok( - "CREATE CONNECTION IF NOT EXISTS `CONN1` WITH (\n" + "CREATE CONNECTION IF NOT EXISTS `CONN1`\n" + + "WITH (\n" + " 'type' = 'bearer',\n" + " 'token' = 'my_token'\n" + ")"); @@ -3535,7 +3537,8 @@ void testCreateTemporaryConnection() { + " 'client_id'='client1'\n" + " )\n") .ok( - "CREATE TEMPORARY CONNECTION `CONN1` WITH (\n" + "CREATE TEMPORARY CONNECTION `CONN1`\n" + + "WITH (\n" + " 'type' = 'oauth',\n" + " 'client_id' = 'client1'\n" + ")"); @@ -3563,7 +3566,8 @@ void testCreateTemporarySystemConnection() { + " 'api_key'='key123'\n" + " )\n") .ok( - "CREATE TEMPORARY SYSTEM CONNECTION `CONN1` WITH (\n" + "CREATE TEMPORARY SYSTEM CONNECTION `CONN1`\n" + + "WITH (\n" + " 'type' = 'custom_type',\n" + " 'api_key' = 'key123'\n" + ")"); @@ -3574,7 +3578,8 @@ void testCreateConnectionWithQualifiedName() { sql("create connection catalog1.db1.conn1\n" + " WITH ('type'='basic', 'url'='http://example.com')\n") .ok( - "CREATE CONNECTION `CATALOG1`.`DB1`.`CONN1` WITH (\n" + "CREATE CONNECTION `CATALOG1`.`DB1`.`CONN1`\n" + + "WITH (\n" + " 'type' = 'basic',\n" + " 'url' = 'http://example.com'\n" + ")"); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java index b9992b43dd883..ff25ff6b0e068 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java @@ -24,6 +24,7 @@ import org.apache.flink.table.planner.calcite.FlinkPlannerImpl; import org.apache.calcite.sql.SqlDialect; +import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.SqlNumericLiteral; @@ -52,7 +53,8 @@ public static TableDistribution getDistributionFromSqlDistribution( } SqlNodeList columns = distribution.getBucketColumns(); - List bucketColumns = SqlParseUtils.extractList(columns); + List bucketColumns = + SqlParseUtils.extractList(columns, p -> ((SqlIdentifier) p).getSimple()); return TableDistribution.of(kind, bucketCount, bucketColumns); } From 96c6340acbf390d4c037175d7628675007d54555 Mon Sep 17 00:00:00 2001 From: Hao Li <1127478+lihaosky@users.noreply.github.com> Date: Mon, 1 Dec 2025 11:01:11 -0800 Subject: [PATCH 6/6] fix table --- .../org/apache/flink/sql/parser/ddl/SqlAlterTableReset.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableReset.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableReset.java index a76163b999ee8..b6581a5f41929 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableReset.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableReset.java @@ -30,7 +30,6 @@ import java.util.List; import java.util.Set; -import java.util.stream.Collectors; import static java.util.Objects.requireNonNull; @@ -58,9 +57,7 @@ public SqlNodeList getPropertyKeyList() { } public Set getResetKeys() { - return propertyKeyList.getList().stream() - .map(SqlParseUtils::extractString) - .collect(Collectors.toSet()); + return SqlParseUtils.extractSet(propertyKeyList, SqlParseUtils::extractString); } @Override