diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd index 6e93241af40ef..6daa33af71f74 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd @@ -39,6 +39,10 @@ "org.apache.flink.sql.parser.ddl.SqlAlterCatalogOptions" "org.apache.flink.sql.parser.ddl.SqlAlterCatalogReset" "org.apache.flink.sql.parser.ddl.SqlAlterCatalogComment" + "org.apache.flink.sql.parser.ddl.SqlAlterConnection" + "org.apache.flink.sql.parser.ddl.SqlAlterConnectionRename" + "org.apache.flink.sql.parser.ddl.SqlAlterConnectionReset" + "org.apache.flink.sql.parser.ddl.SqlAlterConnectionSet" "org.apache.flink.sql.parser.ddl.SqlAlterDatabase" "org.apache.flink.sql.parser.ddl.SqlAlterFunction" "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTable" @@ -76,6 +80,7 @@ "org.apache.flink.sql.parser.ddl.SqlAlterViewRename" "org.apache.flink.sql.parser.ddl.SqlCompilePlan" "org.apache.flink.sql.parser.ddl.SqlCreateCatalog" + "org.apache.flink.sql.parser.ddl.SqlCreateConnection" "org.apache.flink.sql.parser.ddl.SqlCreateDatabase" "org.apache.flink.sql.parser.ddl.SqlCreateFunction" "org.apache.flink.sql.parser.ddl.SqlCreateModel" @@ -87,6 +92,7 @@ "org.apache.flink.sql.parser.ddl.SqlCreateView" "org.apache.flink.sql.parser.ddl.SqlDistribution" "org.apache.flink.sql.parser.ddl.SqlDropCatalog" + "org.apache.flink.sql.parser.ddl.SqlDropConnection" "org.apache.flink.sql.parser.ddl.SqlDropDatabase" "org.apache.flink.sql.parser.ddl.SqlDropFunction" "org.apache.flink.sql.parser.ddl.SqlDropMaterializedTable" @@ -137,12 +143,15 @@ "org.apache.flink.sql.parser.dql.SqlShowTables" "org.apache.flink.sql.parser.dql.SqlShowTables.SqlTableKind" "org.apache.flink.sql.parser.dql.SqlShowColumns" + "org.apache.flink.sql.parser.dql.SqlShowConnections" "org.apache.flink.sql.parser.dql.SqlShowCreate" + "org.apache.flink.sql.parser.dql.SqlShowCreateConnection" "org.apache.flink.sql.parser.dql.SqlShowCreateMaterializedTable" "org.apache.flink.sql.parser.dql.SqlShowCreateModel" "org.apache.flink.sql.parser.dql.SqlShowCreateTable" "org.apache.flink.sql.parser.dql.SqlShowCreateView" "org.apache.flink.sql.parser.dql.SqlShowCreateCatalog" + "org.apache.flink.sql.parser.dql.SqlRichDescribeConnection" "org.apache.flink.sql.parser.dql.SqlRichDescribeFunction" "org.apache.flink.sql.parser.dql.SqlRichDescribeModel" "org.apache.flink.sql.parser.dql.SqlRichDescribeTable" @@ -185,6 +194,7 @@ "COMMENT" "COMPILE" "COMPUTE" + "CONNECTIONS", "CONTINUOUS" "DATABASES" "DISTRIBUTED" @@ -618,12 +628,14 @@ "SqlAlterFunction()" "SqlShowFunctions()" "SqlShowModels()" + "SqlShowConnections()" "SqlShowTables()" "SqlShowColumns()" "SqlShowCreate()" "SqlReplaceTable()" "SqlAlterMaterializedTable()" "SqlAlterModel()" + "SqlAlterConnection()" "SqlAlterTable()" "SqlAlterView()" "SqlShowModules()" @@ -648,6 +660,7 @@ "SqlDescribeJob()" "SqlRichDescribeFunction()" "SqlRichDescribeModel()" + "SqlRichDescribeConnection()" "SqlRichDescribeTable()" ] diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index bf2f07ce06084..50650cbc9b2c8 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -776,6 +776,13 @@ SqlShowCreate SqlShowCreate() : { return new SqlShowCreateModel(pos, sqlIdentifier); } + | + + { pos = getPos(); } + sqlIdentifier = CompoundIdentifier() + { + return new SqlShowCreateConnection(pos, sqlIdentifier); + } | { pos = getPos(); } @@ -787,7 +794,7 @@ SqlShowCreate SqlShowCreate() : } /** - * DESCRIBE | DESC FUNCTION [ EXTENDED] [[catalogName.] dataBasesName].functionName sql call. + * (DESCRIBE | DESC) FUNCTION [ EXTENDED] [[catalogName.] dataBasesName].functionName sql call. * Here we add Rich in className to match the naming of SqlRichDescribeTable. */ SqlRichDescribeFunction SqlRichDescribeFunction() : @@ -806,7 +813,7 @@ SqlRichDescribeFunction SqlRichDescribeFunction() : } /** - * DESCRIBE | DESC MODEL [ EXTENDED] [[catalogName.] dataBasesName].modelName sql call. + * (DESCRIBE | DESC) MODEL [ EXTENDED] [[catalogName.] dataBasesName].modelName sql call. * Here we add Rich in className to match the naming of SqlRichDescribeTable. */ SqlRichDescribeModel SqlRichDescribeModel() : @@ -825,7 +832,26 @@ SqlRichDescribeModel SqlRichDescribeModel() : } /** - * DESCRIBE | DESC [ EXTENDED] [[catalogName.] dataBasesName].tableName sql call. + * (DESCRIBE | DESC) CONNECTION [ EXTENDED] [[catalogName.] dataBasesName].connectionName sql call. + * Here we add Rich in className to match the naming of SqlRichDescribeTable. + */ +SqlRichDescribeConnection SqlRichDescribeConnection() : +{ + SqlIdentifier connectionName; + SqlParserPos pos; + boolean isExtended = false; +} +{ + ( | ) { pos = getPos();} + [ { isExtended = true;} ] + connectionName = CompoundIdentifier() + { + return new SqlRichDescribeConnection(pos, connectionName, isExtended); + } +} + +/** + * (DESCRIBE | DESC) [ EXTENDED] [[catalogName.] dataBasesName].tableName sql call. * Here we add Rich in className to distinguish from calcite's original SqlDescribeTable. */ SqlRichDescribeTable SqlRichDescribeTable() : @@ -2663,9 +2689,13 @@ SqlCreate SqlCreateExtended(Span s, boolean replace) : | create = SqlCreateDatabase(s, replace) | + create = SqlCreateModel(s, isTemporary) + | + // Lookahead to distinguish FUNCTION and + LOOKAHEAD(2) create = SqlCreateFunction(s, replace, isTemporary) | - create = SqlCreateModel(s, isTemporary) + create = SqlCreateConnection(s, isTemporary) ) { return create; @@ -2692,9 +2722,13 @@ SqlDrop SqlDropExtended(Span s, boolean replace) : | drop = SqlDropDatabase(s, replace) | + drop = SqlDropModel(s, isTemporary) + | + // Lookahead to distinguish FUNCTION and + LOOKAHEAD(2) drop = SqlDropFunction(s, replace, isTemporary) | - drop = SqlDropModel(s, isTemporary) + drop = SqlDropConnection(s, isTemporary) ) { return drop; @@ -3325,7 +3359,7 @@ SqlTruncateTable SqlTruncateTable() : } /** -* SHOW MODELS [FROM [catalog.] database] [[NOT] LIKE pattern]; sql call. +* SHOW MODELS [FROM [catalog.] database] [[NOT] LIKE pattern]; */ SqlShowModels SqlShowModels() : { @@ -3361,6 +3395,43 @@ SqlShowModels SqlShowModels() : } } +/** +* SHOW CONNECTIONS [LIKE 'pattern'] [FROM catalog_name.db_name]; +*/ +SqlShowConnections SqlShowConnections() : +{ + SqlIdentifier databaseName = null; + SqlCharStringLiteral likeLiteral = null; + String prep = null; + boolean notLike = false; + SqlParserPos pos; +} +{ + + { pos = getPos(); } + [ + ( { prep = "FROM"; } | { prep = "IN"; } ) + { pos = getPos(); } + databaseName = CompoundIdentifier() + ] + [ + [ + + { + notLike = true; + } + ] + + { + String likeCondition = SqlParserUtil.parseString(token.image); + likeLiteral = SqlLiteral.createCharString(likeCondition, getPos()); + } + ] + { + return new SqlShowConnections(pos, prep, databaseName, notLike, likeLiteral); + } +} + /** * ALTER MODEL [IF EXISTS] modelName SET (property_key = property_val, ...) * ALTER MODEL [IF EXISTS] modelName RENAME TO newModelName @@ -3413,6 +3484,59 @@ SqlAlterModel SqlAlterModel() : ) } +/** +* ALTER CONNECTION [IF EXISTS] connectionName SET (property_key = property_val, ...) +* ALTER CONNECTION [IF EXISTS] connectionName RENAME TO newConnectionName +* ALTER CONNECTION [IF EXISTS] connectionName RESET (property_key, ...) +* Alter temporary or system connection is not supported. +*/ +SqlAlterConnection SqlAlterConnection() : +{ + SqlParserPos startPos; + boolean ifExists = false; + SqlIdentifier connectionIdentifier; + SqlIdentifier newConnectionIdentifier = null; + SqlNodeList propertyList = SqlNodeList.EMPTY; + SqlNodeList propertyKeyList = SqlNodeList.EMPTY; +} +{ + { startPos = getPos(); } + ifExists = IfExistsOpt() + connectionIdentifier = CompoundIdentifier() + ( + LOOKAHEAD(2) + + newConnectionIdentifier = CompoundIdentifier() + { + return new SqlAlterConnectionRename( + startPos.plus(getPos()), + connectionIdentifier, + newConnectionIdentifier, + ifExists); + } + | + + propertyList = Properties() + { + return new SqlAlterConnectionSet( + startPos.plus(getPos()), + connectionIdentifier, + ifExists, + propertyList); + } + | + + propertyKeyList = PropertyKeys() + { + return new SqlAlterConnectionReset( + startPos.plus(getPos()), + connectionIdentifier, + ifExists, + propertyKeyList); + } + ) +} + /** * DROP MODEL [IF EXIST] modelName */ @@ -3433,6 +3557,38 @@ SqlDrop SqlDropModel(Span s, boolean isTemporary) : } } +/** +* DROP [TEMPORARY] [SYSTEM] CONNECTION [IF EXIST] connectionName +*/ +SqlDrop SqlDropConnection(Span s, boolean isTemporary) : +{ + SqlIdentifier connectionIdentifier = null; + boolean ifExists = false; + boolean isSystemConnection = false; +} +{ + [ + + { + if (!isTemporary){ + throw SqlUtil.newContextException(getPos(), + ParserResource.RESOURCE.dropSystemConnectionOnlySupportTemporary()); + } + isSystemConnection = true; + } + ] + + + + ifExists = IfExistsOpt() + + connectionIdentifier = CompoundIdentifier() + + { + return new SqlDropConnection(s.pos(), connectionIdentifier, ifExists, isTemporary, isSystemConnection); + } +} + /** * CREATE MODEL [IF NOT EXIST] modelName * [INPUT(col1 type1, col2 type2, ...)] @@ -3519,6 +3675,54 @@ SqlCreate SqlCreateModel(Span s, boolean isTemporary) : } } +/** +* CREATE [TEMPORARY] [SYSTEM] CONNECTION [IF NOT EXISTS] [catalog_name.][db_name.]connection_name +* [COMMENT connection_comment] +* WITH (property_key = property_val, ...) +*/ +SqlCreate SqlCreateConnection(Span s, boolean isTemporary) : +{ + final SqlParserPos startPos = s.pos(); + boolean ifNotExists = false; + boolean isSystem = false; + SqlIdentifier connectionIdentifier; + SqlCharStringLiteral comment = null; + SqlNodeList propertyList = SqlNodeList.EMPTY; +} +{ + [ + + { + if (!isTemporary){ + throw SqlUtil.newContextException(getPos(), + ParserResource.RESOURCE.createSystemConnectionOnlySupportTemporary()); + } + isSystem = true; + } + ] + + + ifNotExists = IfNotExistsOpt() + + connectionIdentifier = CompoundIdentifier() + [ + { + comment = Comment(); + } + ] + + propertyList = Properties() + { + return new SqlCreateConnection(startPos.plus(getPos()), + connectionIdentifier, + comment, + propertyList, + isTemporary, + isSystem, + ifNotExists); + } +} + SqlCharStringLiteral Comment() : { } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlParseUtils.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlParseUtils.java index 8f3a3831aec26..2a289d09bd879 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlParseUtils.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlParseUtils.java @@ -20,7 +20,6 @@ import org.apache.flink.sql.parser.ddl.SqlTableOption; -import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlLiteral; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; @@ -31,6 +30,8 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; /** Utils methods for parsing DDLs. */ @@ -89,12 +90,19 @@ public static Map extractMap(@Nullable SqlNodeList propList) { .collect(Collectors.toMap(k -> k.getKeyString(), SqlTableOption::getValueString)); } - public static List extractList(@Nullable SqlNodeList sqlNodeList) { + public static List extractList( + @Nullable SqlNodeList sqlNodeList, Function mapper) { if (sqlNodeList == null) { return List.of(); } - return sqlNodeList.getList().stream() - .map(p -> ((SqlIdentifier) p).getSimple()) - .collect(Collectors.toList()); + return sqlNodeList.getList().stream().map(mapper).collect(Collectors.toList()); + } + + public static Set extractSet( + @Nullable SqlNodeList sqlNodeList, Function mapper) { + if (sqlNodeList == null) { + return Set.of(); + } + return sqlNodeList.getList().stream().map(mapper).collect(Collectors.toSet()); } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnection.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnection.java new file mode 100644 index 0000000000000..d8c490c547a2c --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnection.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; + +/** + * Abstract class to describe statements like ALTER CONNECTION [IF EXISTS] [[catalogName.] + * dataBasesName.]connectionName ... + */ +public abstract class SqlAlterConnection extends SqlAlterObject { + + private static final SqlSpecialOperator OPERATOR = + new SqlSpecialOperator("ALTER CONNECTION", SqlKind.OTHER_DDL); + + protected final boolean ifConnectionExists; + + public SqlAlterConnection( + SqlParserPos pos, SqlIdentifier connectionName, boolean ifConnectionExists) { + super(OPERATOR, pos, "CONNECTION", connectionName); + this.ifConnectionExists = ifConnectionExists; + } + + /** + * Whether to ignore the error if the connection doesn't exist. + * + * @return true when IF EXISTS is specified. + */ + public boolean ifConnectionExists() { + return ifConnectionExists; + } + + @Override + public void unparseAlterOperation(SqlWriter writer, int leftPrec, int rightPrec) { + if (ifConnectionExists) { + writer.keyword("IF EXISTS"); + } + name.unparse(writer, leftPrec, rightPrec); + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionRename.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionRename.java new file mode 100644 index 0000000000000..ce50651c4c657 --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionRename.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * ALTER CONNECTION [IF EXISTS] [[catalogName.] dataBasesName.]connectionName RENAME TO + * newConnectionName. + */ +public class SqlAlterConnectionRename extends SqlAlterConnection { + + private final SqlIdentifier newConnectionName; + + public SqlAlterConnectionRename( + SqlParserPos pos, + SqlIdentifier connectionName, + SqlIdentifier newConnectionName, + boolean ifConnectionExists) { + super(pos, connectionName, ifConnectionExists); + this.newConnectionName = + requireNonNull(newConnectionName, "newConnectionName should not be null"); + } + + public SqlIdentifier getNewConnectionName() { + return newConnectionName; + } + + @Override + public List getOperandList() { + return List.of(name, newConnectionName); + } + + @Override + public void unparseAlterOperation(SqlWriter writer, int leftPrec, int rightPrec) { + super.unparseAlterOperation(writer, leftPrec, rightPrec); + writer.keyword("RENAME TO"); + newConnectionName.unparse(writer, leftPrec, rightPrec); + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionReset.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionReset.java new file mode 100644 index 0000000000000..23501f7552947 --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionReset.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.flink.sql.parser.SqlParseUtils; +import org.apache.flink.sql.parser.SqlUnparseUtils; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; + +import java.util.List; +import java.util.Set; + +import static java.util.Objects.requireNonNull; + +/** + * ALTER CONNECTION [IF EXISTS] [[catalogName.] dataBasesName.]connectionName RESET ( 'key1' [, + * 'key2']...). + */ +public class SqlAlterConnectionReset extends SqlAlterConnection { + private final SqlNodeList optionKeyList; + + public SqlAlterConnectionReset( + SqlParserPos pos, + SqlIdentifier connectionName, + boolean ifConnectionExists, + SqlNodeList optionKeyList) { + super(pos, connectionName, ifConnectionExists); + this.optionKeyList = requireNonNull(optionKeyList, "optionKeyList should not be null"); + } + + @Override + public List getOperandList() { + return List.of(name, optionKeyList); + } + + public Set getResetKeys() { + return SqlParseUtils.extractSet(optionKeyList, SqlParseUtils::extractString); + } + + @Override + public void unparseAlterOperation(SqlWriter writer, int leftPrec, int rightPrec) { + super.unparseAlterOperation(writer, leftPrec, rightPrec); + SqlUnparseUtils.unparseResetOptions(optionKeyList, writer, leftPrec, rightPrec); + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionSet.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionSet.java new file mode 100644 index 0000000000000..c738d9a753094 --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionSet.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.flink.sql.parser.SqlParseUtils; +import org.apache.flink.sql.parser.SqlUnparseUtils; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; + +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * ALTER CONNECTION [IF EXISTS] [[catalogName.] dataBasesName.]connectionName SET ( name=value [, + * name=value]*). + */ +public class SqlAlterConnectionSet extends SqlAlterConnection { + + private final SqlNodeList connectionOptionList; + + public SqlAlterConnectionSet( + SqlParserPos pos, + SqlIdentifier connectionName, + boolean ifConnectionExists, + SqlNodeList connectionOptionList) { + super(pos, connectionName, ifConnectionExists); + this.connectionOptionList = + requireNonNull(connectionOptionList, "connectionOptionList should not be null"); + } + + public Map getProperties() { + return SqlParseUtils.extractMap(connectionOptionList); + } + + @Override + public List getOperandList() { + return List.of(name, connectionOptionList); + } + + @Override + public void unparseAlterOperation(SqlWriter writer, int leftPrec, int rightPrec) { + super.unparseAlterOperation(writer, leftPrec, rightPrec); + SqlUnparseUtils.unparseSetOptions(connectionOptionList, writer, leftPrec, rightPrec); + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java index e9392daaa1a69..00bfbc1079e5c 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java @@ -29,7 +29,6 @@ import java.util.List; import java.util.Set; -import java.util.stream.Collectors; import static java.util.Objects.requireNonNull; @@ -54,9 +53,7 @@ public List getOperandList() { } public Set getResetKeys() { - return optionKeyList.getList().stream() - .map(SqlParseUtils::extractString) - .collect(Collectors.toSet()); + return SqlParseUtils.extractSet(optionKeyList, SqlParseUtils::extractString); } @Override diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableReset.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableReset.java index a76163b999ee8..b6581a5f41929 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableReset.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableReset.java @@ -30,7 +30,6 @@ import java.util.List; import java.util.Set; -import java.util.stream.Collectors; import static java.util.Objects.requireNonNull; @@ -58,9 +57,7 @@ public SqlNodeList getPropertyKeyList() { } public Set getResetKeys() { - return propertyKeyList.getList().stream() - .map(SqlParseUtils::extractString) - .collect(Collectors.toSet()); + return SqlParseUtils.extractSet(propertyKeyList, SqlParseUtils::extractString); } @Override diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAnalyzeTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAnalyzeTable.java index 4ab91890c19db..c90c56a7b8cde 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAnalyzeTable.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAnalyzeTable.java @@ -81,7 +81,7 @@ public LinkedHashMap getPartitions() { } public List getColumnNames() { - return SqlParseUtils.extractList(columns); + return SqlParseUtils.extractList(columns, p -> ((SqlIdentifier) p).getSimple()); } public boolean isAllColumns() { diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateConnection.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateConnection.java new file mode 100644 index 0000000000000..c9936893a1474 --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateConnection.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.flink.sql.parser.ExtendedSqlNode; +import org.apache.flink.sql.parser.SqlUnparseUtils; +import org.apache.flink.sql.parser.error.SqlValidateException; + +import org.apache.calcite.sql.SqlCharStringLiteral; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.ImmutableNullableList; + +import javax.annotation.Nonnull; + +import java.util.List; + +/** + * {@link SqlNode} to describe the CREATE CONNECTION syntax. CREATE [TEMPORARY] [SYSTEM] CONNECTION + * [IF NOT EXISTS] [[catalogName.] dataBasesName].connectionName [COMMENT connection_comment] WITH + * (name=value, [name=value]*). + */ +public class SqlCreateConnection extends SqlCreateObject implements ExtendedSqlNode { + + private static final SqlSpecialOperator OPERATOR = + new SqlSpecialOperator("CREATE CONNECTION", SqlKind.OTHER_DDL); + + private final boolean isSystem; + + public SqlCreateConnection( + SqlParserPos pos, + SqlIdentifier connectionName, + SqlCharStringLiteral comment, + SqlNodeList propertyList, + boolean isTemporary, + boolean isSystem, + boolean ifNotExists) { + super( + OPERATOR, + pos, + connectionName, + isTemporary, + false, + ifNotExists, + propertyList, + comment); + this.isSystem = isSystem; + } + + @Override + public @Nonnull List getOperandList() { + return ImmutableNullableList.of(name, comment, properties); + } + + public boolean isSystem() { + return isSystem; + } + + @Override + public void validate() throws SqlValidateException { + if (properties == null || properties.isEmpty()) { + throw new SqlValidateException( + getParserPosition(), "Connection property list can not be empty."); + } + } + + @Override + protected String getScope() { + return "CONNECTION"; + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("CREATE"); + if (isTemporary()) { + writer.keyword("TEMPORARY"); + } + if (isSystem) { + writer.keyword("SYSTEM"); + } + writer.keyword("CONNECTION"); + if (isIfNotExists()) { + writer.keyword("IF NOT EXISTS"); + } + name.unparse(writer, leftPrec, rightPrec); + SqlUnparseUtils.unparseComment(comment, true, writer, leftPrec, rightPrec); + SqlUnparseUtils.unparseProperties(properties, writer, leftPrec, rightPrec); + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java index 628571fdcb4bb..8039e28296866 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java @@ -119,7 +119,7 @@ public Optional getWatermark() { } public List getPartitionKeyList() { - return SqlParseUtils.extractList(partitionKeyList); + return SqlParseUtils.extractList(partitionKeyList, p -> ((SqlIdentifier) p).getSimple()); } @Nullable diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java index 2d6769b225288..8fa0a4c7ca537 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java @@ -136,7 +136,7 @@ public final SqlDistribution getDistribution() { } public List getPartitionKeyList() { - return SqlParseUtils.extractList(partitionKeyList); + return SqlParseUtils.extractList(partitionKeyList, p -> ((SqlIdentifier) p).getSimple()); } public List getTableConstraints() { diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropConnection.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropConnection.java new file mode 100644 index 0000000000000..3af65a4ba119c --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropConnection.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; + +/** + * {@link org.apache.calcite.sql.SqlNode} to describe the DROP CONNECTION [IF EXISTS] + * [[catalogName.] dataBasesName].connectionName syntax. + */ +public class SqlDropConnection extends SqlDropObject { + private static final SqlOperator OPERATOR = + new SqlSpecialOperator("DROP CONNECTION", SqlKind.OTHER_DDL); + + private final boolean isTemporary; + private final boolean isSystemConnection; + + public SqlDropConnection( + SqlParserPos pos, + SqlIdentifier connectionName, + boolean ifExists, + boolean isTemporary, + boolean isSystemConnection) { + super(OPERATOR, pos, connectionName, ifExists); + this.isTemporary = isTemporary; + this.isSystemConnection = isSystemConnection; + } + + public boolean isTemporary() { + return isTemporary; + } + + public boolean isSystemConnection() { + return isSystemConnection; + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("DROP"); + if (isTemporary) { + writer.keyword("TEMPORARY"); + } + if (isSystemConnection) { + writer.keyword("SYSTEM"); + } + writer.keyword("CONNECTION"); + if (ifExists) { + writer.keyword("IF EXISTS"); + } + name.unparse(writer, leftPrec, rightPrec); + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java index 53b6417f7f5d9..8aeb082a54f24 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java @@ -42,13 +42,16 @@ public SqlDropModel( this.isTemporary = isTemporary; } - public boolean getIsTemporary() { - return this.isTemporary; + public boolean isTemporary() { + return isTemporary; } @Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { writer.keyword("DROP"); + if (isTemporary) { + writer.keyword("TEMPORARY"); + } writer.keyword("MODEL"); if (ifExists) { writer.keyword("IF EXISTS"); diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichDescribeConnection.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichDescribeConnection.java new file mode 100644 index 0000000000000..9fb9bb9b79cfd --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichDescribeConnection.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.dql; + +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; + +import java.util.Collections; +import java.util.List; + +/** + * DESCRIBE CONNECTION [EXTENDED] [[catalogName.] dataBasesName].sqlIdentifier sql call. Here we add + * Rich in className to follow the convention of {@link org.apache.calcite.sql.SqlDescribeTable}, + * which only had it to distinguish from calcite's original SqlDescribeTable, even though calcite + * does not have SqlDescribeConnection. + */ +public class SqlRichDescribeConnection extends SqlCall { + + public static final SqlSpecialOperator OPERATOR = + new SqlSpecialOperator("DESCRIBE CONNECTION", SqlKind.OTHER); + protected final SqlIdentifier connectionNameIdentifier; + private final boolean isExtended; + + public SqlRichDescribeConnection( + SqlParserPos pos, SqlIdentifier connectionNameIdentifier, boolean isExtended) { + super(pos); + this.connectionNameIdentifier = connectionNameIdentifier; + this.isExtended = isExtended; + } + + @Override + public SqlOperator getOperator() { + return OPERATOR; + } + + @Override + public List getOperandList() { + return Collections.singletonList(connectionNameIdentifier); + } + + public boolean isExtended() { + return isExtended; + } + + public String[] fullConnectionName() { + return connectionNameIdentifier.names.toArray(new String[0]); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("DESCRIBE CONNECTION"); + if (isExtended) { + writer.keyword("EXTENDED"); + } + connectionNameIdentifier.unparse(writer, leftPrec, rightPrec); + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowConnections.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowConnections.java new file mode 100644 index 0000000000000..a0dd89f99e639 --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowConnections.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.dql; + +import org.apache.calcite.sql.SqlCharStringLiteral; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.parser.SqlParserPos; + +/** {@link SqlNode} to describe the SHOW CONNECTIONS syntax. */ +public class SqlShowConnections extends SqlShowCall { + + public static final SqlSpecialOperator OPERATOR = + new SqlSpecialOperator("SHOW CONNECTIONS", SqlKind.OTHER); + + public SqlShowConnections( + SqlParserPos pos, + String preposition, + SqlIdentifier databaseName, + boolean notLike, + SqlCharStringLiteral likeLiteral) { + // only LIKE currently supported for SHOW CONNECTIONS + super( + pos, + preposition, + databaseName, + likeLiteral == null ? null : "LIKE", + likeLiteral, + notLike); + } + + @Override + public SqlOperator getOperator() { + return OPERATOR; + } + + @Override + String getOperationName() { + return "SHOW CONNECTIONS"; + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateConnection.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateConnection.java new file mode 100644 index 0000000000000..0434bbb6dc109 --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateConnection.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.dql; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; + +import java.util.Collections; +import java.util.List; + +/** SHOW CREATE CONNECTION sql call. */ +public class SqlShowCreateConnection extends SqlShowCreate { + + public static final SqlSpecialOperator OPERATOR = + new SqlSpecialOperator("SHOW CREATE CONNECTION", SqlKind.OTHER_DDL); + + public SqlShowCreateConnection(SqlParserPos pos, SqlIdentifier connectionName) { + super(pos, connectionName); + } + + public SqlIdentifier getConnectionName() { + return sqlIdentifier; + } + + @Override + public SqlOperator getOperator() { + return OPERATOR; + } + + @Override + public List getOperandList() { + return Collections.singletonList(sqlIdentifier); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("SHOW CREATE CONNECTION"); + sqlIdentifier.unparse(writer, leftPrec, rightPrec); + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java index 9c29119ba9e08..57e2e55c17d76 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java @@ -35,7 +35,15 @@ public interface ParserResource { Resources.ExInst overwriteIsOnlyUsedWithInsert(); @Resources.BaseMessage( - "CREATE SYSTEM FUNCTION is not supported, system functions can only be registered as temporary function, you can use CREATE TEMPORARY SYSTEM FUNCTION instead.") + "CREATE SYSTEM CONNECTION is not supported, system connections can only be registered as temporary connections, you can use CREATE TEMPORARY SYSTEM CONNECTION instead.") + Resources.ExInst createSystemConnectionOnlySupportTemporary(); + + @Resources.BaseMessage( + "DROP SYSTEM CONNECTION is not supported, system connections can only be dropped as temporary connections, you can use DROP TEMPORARY SYSTEM CONNECTION instead.") + Resources.ExInst dropSystemConnectionOnlySupportTemporary(); + + @Resources.BaseMessage( + "CREATE SYSTEM FUNCTION is not supported, system functions can only be registered as temporary functions, you can use CREATE TEMPORARY SYSTEM FUNCTION instead.") Resources.ExInst createSystemFunctionOnlySupportTemporary(); @Resources.BaseMessage("Duplicate EXPLAIN DETAIL is not allowed.") diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java index a5dbf34d45bac..a951bd165e028 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java @@ -2482,7 +2482,7 @@ void testCreateFunction() { .fails( "CREATE SYSTEM FUNCTION is not supported, " + "system functions can only be registered as temporary " - + "function, you can use CREATE TEMPORARY SYSTEM FUNCTION instead."); + + "functions, you can use CREATE TEMPORARY SYSTEM FUNCTION instead."); // test create function using jar sql("create temporary function function1 as 'org.apache.flink.function.function1' language java using jar 'file:///path/to/test.jar'") @@ -3265,6 +3265,12 @@ void testDropModel() { sql("drop model catalog1.db1.m1").ok("DROP MODEL `CATALOG1`.`DB1`.`M1`"); } + @Test + void testDropTemporaryModel() { + sql("drop temporary model m1").ok("DROP TEMPORARY MODEL `M1`"); + sql("drop temporary model if exists m1").ok("DROP TEMPORARY MODEL IF EXISTS `M1`"); + } + @Test void testDropModelIfExists() { sql("drop model if exists catalog1.db1.m1") @@ -3483,6 +3489,254 @@ void testModelInFunctionNamedArgs() { + "FROM TABLE(`ML_PREDICT`(`INPUT` => (TABLE `MY_TABLE`), `MODEL` => (MODEL `MY_MODEL`)))"); } + // ===================================================================================== + // Connection DDL/DQL Tests + // ===================================================================================== + + @Test + void testCreateConnection() { + sql("create connection conn1\n" + + " COMMENT 'connection_comment'\n" + + " WITH (\n" + + " 'type'='basic',\n" + + " 'url'='http://example.com',\n" + + " 'username'='user1',\n" + + " 'password'='pass1'\n" + + " )\n") + .ok( + "CREATE CONNECTION `CONN1`\n" + + "COMMENT 'connection_comment'\n" + + "WITH (\n" + + " 'type' = 'basic',\n" + + " 'url' = 'http://example.com',\n" + + " 'username' = 'user1',\n" + + " 'password' = 'pass1'\n" + + ")"); + } + + @Test + void testCreateConnectionIfNotExists() { + sql("create connection if not exists conn1\n" + + " WITH (\n" + + " 'type'='bearer',\n" + + " 'token'='my_token'\n" + + " )\n") + .ok( + "CREATE CONNECTION IF NOT EXISTS `CONN1`\n" + + "WITH (\n" + + " 'type' = 'bearer',\n" + + " 'token' = 'my_token'\n" + + ")"); + } + + @Test + void testCreateTemporaryConnection() { + sql("create temporary connection conn1\n" + + " WITH (\n" + + " 'type'='oauth',\n" + + " 'client_id'='client1'\n" + + " )\n") + .ok( + "CREATE TEMPORARY CONNECTION `CONN1`\n" + + "WITH (\n" + + " 'type' = 'oauth',\n" + + " 'client_id' = 'client1'\n" + + ")"); + } + + @Test + void testCreateSystemConnection() { + sql("create ^system^ connection conn1\n" + + " WITH (\n" + + " 'type'='basic',\n" + + " 'url'='http://example.com'\n" + + " )\n") + .fails( + "(?s)CREATE SYSTEM CONNECTION is not supported, " + + "system connections can only be registered as temporary " + + "connections, you can use CREATE TEMPORARY SYSTEM CONNECTION " + + "instead\\..*"); + } + + @Test + void testCreateTemporarySystemConnection() { + sql("create temporary system connection conn1\n" + + " WITH (\n" + + " 'type'='custom_type',\n" + + " 'api_key'='key123'\n" + + " )\n") + .ok( + "CREATE TEMPORARY SYSTEM CONNECTION `CONN1`\n" + + "WITH (\n" + + " 'type' = 'custom_type',\n" + + " 'api_key' = 'key123'\n" + + ")"); + } + + @Test + void testCreateConnectionWithQualifiedName() { + sql("create connection catalog1.db1.conn1\n" + + " WITH ('type'='basic', 'url'='http://example.com')\n") + .ok( + "CREATE CONNECTION `CATALOG1`.`DB1`.`CONN1`\n" + + "WITH (\n" + + " 'type' = 'basic',\n" + + " 'url' = 'http://example.com'\n" + + ")"); + } + + @Test + void testDropConnection() { + sql("drop connection conn1").ok("DROP CONNECTION `CONN1`"); + sql("drop connection db1.conn1").ok("DROP CONNECTION `DB1`.`CONN1`"); + sql("drop connection catalog1.db1.conn1").ok("DROP CONNECTION `CATALOG1`.`DB1`.`CONN1`"); + } + + @Test + void testDropConnectionIfExists() { + sql("drop connection if exists catalog1.db1.conn1") + .ok("DROP CONNECTION IF EXISTS `CATALOG1`.`DB1`.`CONN1`"); + } + + @Test + void testDropTemporaryConnection() { + sql("drop temporary connection conn1").ok("DROP TEMPORARY CONNECTION `CONN1`"); + sql("drop temporary connection if exists conn1") + .ok("DROP TEMPORARY CONNECTION IF EXISTS `CONN1`"); + } + + @Test + void testDropTemporarySystemConnection() { + sql("drop temporary system connection conn1") + .ok("DROP TEMPORARY SYSTEM CONNECTION `CONN1`"); + sql("drop temporary system connection if exists conn1") + .ok("DROP TEMPORARY SYSTEM CONNECTION IF EXISTS `CONN1`"); + } + + @Test + void testDropSystemConnection() { + sql("drop ^system^ connection conn1") + .fails( + "(?s)DROP SYSTEM CONNECTION is not supported, " + + "system connections can only be dropped as temporary " + + "connections, you can use DROP TEMPORARY SYSTEM CONNECTION " + + "instead\\..*"); + } + + @Test + void testAlterConnectionSet() { + final String sql = + "alter connection conn1 set ('password' = 'new_password','url' = 'http://new.com')"; + final String expected = + "ALTER CONNECTION `CONN1` SET (\n" + + " 'password' = 'new_password',\n" + + " 'url' = 'http://new.com'\n" + + ")"; + sql(sql).ok(expected); + } + + @Test + void testAlterConnectionSetWithQualifiedName() { + final String sql = "alter connection catalog1.db1.conn1 set ('token' = 'new_token')"; + final String expected = + "ALTER CONNECTION `CATALOG1`.`DB1`.`CONN1` SET (\n" + + " 'token' = 'new_token'\n" + + ")"; + sql(sql).ok(expected); + } + + @Test + void testAlterConnectionRename() { + final String sql = "alter connection conn1 rename to conn2"; + final String expected = "ALTER CONNECTION `CONN1` RENAME TO `CONN2`"; + sql(sql).ok(expected); + } + + @Test + void testAlterConnectionRenameWithQualifiedName() { + final String sql = "alter connection catalog1.db1.conn1 rename to conn2"; + final String expected = "ALTER CONNECTION `CATALOG1`.`DB1`.`CONN1` RENAME TO `CONN2`"; + sql(sql).ok(expected); + } + + @Test + void testAlterConnectionReset() { + final String sql = "alter connection conn1 reset ('password', 'url')"; + final String expected = "ALTER CONNECTION `CONN1` RESET (\n 'password',\n 'url'\n)"; + sql(sql).ok(expected); + } + + @Test + void testAlterConnectionResetWithQualifiedName() { + final String sql = "alter connection catalog1.db1.conn1 reset ('token')"; + final String expected = "ALTER CONNECTION `CATALOG1`.`DB1`.`CONN1` RESET (\n 'token'\n)"; + sql(sql).ok(expected); + } + + @Test + void testAlterConnectionIfExists() { + final String sql = + "alter connection if exists conn1 set ('password' = 'new_password','url' = 'http://new.com')"; + final String expected = + "ALTER CONNECTION IF EXISTS `CONN1` SET (\n" + + " 'password' = 'new_password',\n" + + " 'url' = 'http://new.com'\n" + + ")"; + sql(sql).ok(expected); + } + + @Test + void testAlterConnectionRenameIfExists() { + final String sql = "alter connection if exists conn1 rename to conn2"; + final String expected = "ALTER CONNECTION IF EXISTS `CONN1` RENAME TO `CONN2`"; + sql(sql).ok(expected); + } + + @Test + void testAlterConnectionResetIfExists() { + final String sql = "alter connection if exists conn1 reset ('password', 'url')"; + final String expected = + "ALTER CONNECTION IF EXISTS `CONN1` RESET (\n 'password',\n 'url'\n)"; + sql(sql).ok(expected); + } + + @Test + void testShowConnections() { + sql("show connections").ok("SHOW CONNECTIONS"); + sql("show connections from db1").ok("SHOW CONNECTIONS FROM `DB1`"); + sql("show connections from catalog1.db1").ok("SHOW CONNECTIONS FROM `CATALOG1`.`DB1`"); + sql("show connections in db1").ok("SHOW CONNECTIONS IN `DB1`"); + sql("show connections in catalog1.db1").ok("SHOW CONNECTIONS IN `CATALOG1`.`DB1`"); + } + + @Test + void testShowConnectionsLike() { + sql("show connections like '%conn%'").ok("SHOW CONNECTIONS LIKE '%CONN%'"); + sql("show connections from db1 like 'my_%'").ok("SHOW CONNECTIONS FROM `DB1` LIKE 'MY_%'"); + sql("show connections not like 'temp_%'").ok("SHOW CONNECTIONS NOT LIKE 'TEMP_%'"); + } + + @Test + void testShowCreateConnection() { + sql("show create connection conn1").ok("SHOW CREATE CONNECTION `CONN1`"); + sql("show create connection catalog1.db1.conn1") + .ok("SHOW CREATE CONNECTION `CATALOG1`.`DB1`.`CONN1`"); + } + + @Test + void testDescribeConnection() { + sql("describe connection conn1").ok("DESCRIBE CONNECTION `CONN1`"); + sql("describe connection catalog1.db1.conn1") + .ok("DESCRIBE CONNECTION `CATALOG1`.`DB1`.`CONN1`"); + sql("describe connection extended conn1").ok("DESCRIBE CONNECTION EXTENDED `CONN1`"); + + sql("desc connection conn1").ok("DESCRIBE CONNECTION `CONN1`"); + sql("desc connection catalog1.db1.conn1") + .ok("DESCRIBE CONNECTION `CATALOG1`.`DB1`.`CONN1`"); + sql("desc connection extended catalog1.db1.conn1") + .ok("DESCRIBE CONNECTION EXTENDED `CATALOG1`.`DB1`.`CONN1`"); + } + /* * This test was backported from Calcite 1.38 (CALCITE-6266). * Remove it together with upgrade to Calcite 1.38. diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDropModelConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDropModelConverter.java index 90a45c5a4254c..4e0d85c6dbce7 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDropModelConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDropModelConverter.java @@ -35,6 +35,6 @@ public Operation convertSqlNode(SqlDropModel sqlDropModel, ConvertContext contex context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier); return new DropModelOperation( - identifier, sqlDropModel.getIfExists(), sqlDropModel.getIsTemporary()); + identifier, sqlDropModel.getIfExists(), sqlDropModel.isTemporary()); } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java index b9992b43dd883..ff25ff6b0e068 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java @@ -24,6 +24,7 @@ import org.apache.flink.table.planner.calcite.FlinkPlannerImpl; import org.apache.calcite.sql.SqlDialect; +import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.SqlNumericLiteral; @@ -52,7 +53,8 @@ public static TableDistribution getDistributionFromSqlDistribution( } SqlNodeList columns = distribution.getBucketColumns(); - List bucketColumns = SqlParseUtils.extractList(columns); + List bucketColumns = + SqlParseUtils.extractList(columns, p -> ((SqlIdentifier) p).getSimple()); return TableDistribution.of(kind, bucketCount, bucketColumns); }