Skip to content

Commit 1df17fb

Browse files
snuyanzinSamrat002
authored andcommitted
[FLINK-38766][table] Add support for ALTER MATERIALIZED TABLE ADD support
This closes #27302.
1 parent 9151088 commit 1df17fb

File tree

33 files changed

+1091
-1273
lines changed

33 files changed

+1091
-1273
lines changed

flink-table/flink-sql-client/src/test/resources/sql/table.q

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -536,7 +536,7 @@ WITH (
536536
alter table orders2 add product string first;
537537
[ERROR] Could not execute SQL statement. Reason:
538538
org.apache.flink.table.api.ValidationException: Failed to execute ALTER TABLE statement.
539-
Try to add a column `product` which already exists in the table.
539+
Column `product` already exists in the table.
540540
!error
541541
542542
# test alter table schema add a column after a nonexistent column

flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java

Lines changed: 31 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,15 @@
3232
import org.apache.flink.table.api.config.TableConfigOptions;
3333
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
3434
import org.apache.flink.table.catalog.CatalogMaterializedTable;
35+
import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
36+
import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshStatus;
3537
import org.apache.flink.table.catalog.Column;
3638
import org.apache.flink.table.catalog.IntervalFreshness;
3739
import org.apache.flink.table.catalog.ObjectIdentifier;
3840
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
3941
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
4042
import org.apache.flink.table.catalog.ResolvedSchema;
4143
import org.apache.flink.table.catalog.TableChange;
42-
import org.apache.flink.table.catalog.TableChange.MaterializedTableChange;
4344
import org.apache.flink.table.data.GenericMapData;
4445
import org.apache.flink.table.data.GenericRowData;
4546
import org.apache.flink.table.data.RowData;
@@ -198,7 +199,7 @@ private ResultFetcher callCreateMaterializedTableOperation(
198199
CreateMaterializedTableOperation createMaterializedTableOperation) {
199200
ResolvedCatalogMaterializedTable materializedTable =
200201
createMaterializedTableOperation.getCatalogMaterializedTable();
201-
if (CatalogMaterializedTable.RefreshMode.CONTINUOUS == materializedTable.getRefreshMode()) {
202+
if (RefreshMode.CONTINUOUS == materializedTable.getRefreshMode()) {
202203
createMaterializedTableInContinuousMode(
203204
operationExecutor, handle, createMaterializedTableOperation);
204205
} else {
@@ -283,7 +284,7 @@ private void createMaterializedTableInFullMode(
283284
handle,
284285
materializedTableIdentifier,
285286
catalogMaterializedTable,
286-
CatalogMaterializedTable.RefreshStatus.ACTIVATED,
287+
RefreshStatus.ACTIVATED,
287288
refreshHandler.asSummaryString(),
288289
serializedRefreshHandler);
289290
} catch (Exception e) {
@@ -308,15 +309,14 @@ private ResultFetcher callAlterMaterializedTableSuspend(
308309
getCatalogMaterializedTable(operationExecutor, tableIdentifier);
309310

310311
// Initialization phase doesn't support resume operation.
311-
if (CatalogMaterializedTable.RefreshStatus.INITIALIZING
312-
== materializedTable.getRefreshStatus()) {
312+
if (RefreshStatus.INITIALIZING == materializedTable.getRefreshStatus()) {
313313
throw new SqlExecutionException(
314314
String.format(
315315
"Materialized table %s is being initialized and does not support suspend operation.",
316316
tableIdentifier));
317317
}
318318

319-
if (CatalogMaterializedTable.RefreshMode.CONTINUOUS == materializedTable.getRefreshMode()) {
319+
if (RefreshMode.CONTINUOUS == materializedTable.getRefreshMode()) {
320320
suspendContinuousRefreshJob(
321321
operationExecutor, handle, tableIdentifier, materializedTable);
322322
} else {
@@ -334,8 +334,7 @@ private CatalogMaterializedTable suspendContinuousRefreshJob(
334334
ContinuousRefreshHandler refreshHandler =
335335
deserializeContinuousHandler(materializedTable.getSerializedRefreshHandler());
336336

337-
if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
338-
== materializedTable.getRefreshStatus()) {
337+
if (RefreshStatus.SUSPENDED == materializedTable.getRefreshStatus()) {
339338
throw new SqlExecutionException(
340339
String.format(
341340
"Materialized table %s continuous refresh job has been suspended, jobId is %s.",
@@ -356,7 +355,7 @@ private CatalogMaterializedTable suspendContinuousRefreshJob(
356355
handle,
357356
tableIdentifier,
358357
materializedTable,
359-
CatalogMaterializedTable.RefreshStatus.SUSPENDED,
358+
RefreshStatus.SUSPENDED,
360359
updateRefreshHandler.asSummaryString(),
361360
serializeContinuousHandler(updateRefreshHandler));
362361
} catch (Exception e) {
@@ -373,8 +372,7 @@ private void suspendRefreshWorkflow(
373372
OperationHandle handle,
374373
ObjectIdentifier tableIdentifier,
375374
CatalogMaterializedTable materializedTable) {
376-
if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
377-
== materializedTable.getRefreshStatus()) {
375+
if (RefreshStatus.SUSPENDED == materializedTable.getRefreshStatus()) {
378376
throw new SqlExecutionException(
379377
String.format(
380378
"Materialized table %s refresh workflow has been suspended.",
@@ -401,7 +399,7 @@ private void suspendRefreshWorkflow(
401399
handle,
402400
tableIdentifier,
403401
materializedTable,
404-
CatalogMaterializedTable.RefreshStatus.SUSPENDED,
402+
RefreshStatus.SUSPENDED,
405403
refreshHandler.asSummaryString(),
406404
materializedTable.getSerializedRefreshHandler());
407405
} catch (Exception e) {
@@ -422,16 +420,14 @@ private ResultFetcher callAlterMaterializedTableResume(
422420
getCatalogMaterializedTable(operationExecutor, tableIdentifier);
423421

424422
// Initialization phase doesn't support resume operation.
425-
if (CatalogMaterializedTable.RefreshStatus.INITIALIZING
426-
== catalogMaterializedTable.getRefreshStatus()) {
423+
if (RefreshStatus.INITIALIZING == catalogMaterializedTable.getRefreshStatus()) {
427424
throw new SqlExecutionException(
428425
String.format(
429426
"Materialized table %s is being initialized and does not support resume operation.",
430427
tableIdentifier));
431428
}
432429

433-
if (CatalogMaterializedTable.RefreshMode.CONTINUOUS
434-
== catalogMaterializedTable.getRefreshMode()) {
430+
if (RefreshMode.CONTINUOUS == catalogMaterializedTable.getRefreshMode()) {
435431
resumeContinuousRefreshJob(
436432
operationExecutor,
437433
handle,
@@ -461,8 +457,7 @@ private void resumeContinuousRefreshJob(
461457
catalogMaterializedTable.getSerializedRefreshHandler());
462458

463459
// Repeated resume continuous refresh job is not supported
464-
if (CatalogMaterializedTable.RefreshStatus.ACTIVATED
465-
== catalogMaterializedTable.getRefreshStatus()) {
460+
if (RefreshStatus.ACTIVATED == catalogMaterializedTable.getRefreshStatus()) {
466461
JobStatus jobStatus = getJobStatus(operationExecutor, handle, refreshHandler);
467462
if (!jobStatus.isGloballyTerminalState()) {
468463
throw new SqlExecutionException(
@@ -497,8 +492,7 @@ private void resumeRefreshWorkflow(
497492
CatalogMaterializedTable catalogMaterializedTable,
498493
Map<String, String> dynamicOptions) {
499494
// Repeated resume refresh workflow is not supported
500-
if (CatalogMaterializedTable.RefreshStatus.ACTIVATED
501-
== catalogMaterializedTable.getRefreshStatus()) {
495+
if (RefreshStatus.ACTIVATED == catalogMaterializedTable.getRefreshStatus()) {
502496
throw new SqlExecutionException(
503497
String.format(
504498
"Materialized table %s refresh workflow has been resumed.",
@@ -525,7 +519,7 @@ private void resumeRefreshWorkflow(
525519
handle,
526520
tableIdentifier,
527521
catalogMaterializedTable,
528-
CatalogMaterializedTable.RefreshStatus.ACTIVATED,
522+
RefreshStatus.ACTIVATED,
529523
refreshHandler.asSummaryString(),
530524
catalogMaterializedTable.getSerializedRefreshHandler());
531525
} catch (Exception e) {
@@ -584,7 +578,7 @@ private void executeContinuousRefreshJob(
584578
handle,
585579
materializedTableIdentifier,
586580
catalogMaterializedTable,
587-
CatalogMaterializedTable.RefreshStatus.ACTIVATED,
581+
RefreshStatus.ACTIVATED,
588582
continuousRefreshHandler.asSummaryString(),
589583
serializedBytes);
590584
}
@@ -820,7 +814,7 @@ private ResultFetcher callAlterMaterializedTableChangeOperation(
820814
ResolvedCatalogMaterializedTable oldMaterializedTable =
821815
getCatalogMaterializedTable(operationExecutor, tableIdentifier);
822816

823-
if (CatalogMaterializedTable.RefreshMode.FULL == oldMaterializedTable.getRefreshMode()) {
817+
if (RefreshMode.FULL == oldMaterializedTable.getRefreshMode()) {
824818
// directly apply the alter operation
825819
AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation =
826820
new AlterMaterializedTableChangeOperation(
@@ -831,8 +825,7 @@ private ResultFetcher callAlterMaterializedTableChangeOperation(
831825
handle, alterMaterializedTableChangeOperation);
832826
}
833827

834-
if (CatalogMaterializedTable.RefreshStatus.ACTIVATED
835-
== oldMaterializedTable.getRefreshStatus()) {
828+
if (RefreshStatus.ACTIVATED == oldMaterializedTable.getRefreshStatus()) {
836829
// 1. suspend the materialized table
837830
CatalogMaterializedTable suspendMaterializedTable =
838831
suspendContinuousRefreshJob(
@@ -895,11 +888,10 @@ private ResultFetcher callAlterMaterializedTableChangeOperation(
895888
tableIdentifier),
896889
e);
897890
}
898-
} else if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
899-
== oldMaterializedTable.getRefreshStatus()) {
891+
} else if (RefreshStatus.SUSPENDED == oldMaterializedTable.getRefreshStatus()) {
900892
// alter schema & definition query & refresh handler (reset savepoint path of refresh
901893
// handler)
902-
List<MaterializedTableChange> tableChanges = new ArrayList<>(op.getTableChanges());
894+
List<TableChange> tableChanges = new ArrayList<>(op.getTableChanges());
903895
TableChange.ModifyRefreshHandler modifyRefreshHandler =
904896
generateResetSavepointTableChange(
905897
oldMaterializedTable.getSerializedRefreshHandler());
@@ -930,8 +922,8 @@ private ResultFetcher callAlterMaterializedTableChangeOperation(
930922
private AlterMaterializedTableChangeOperation generateRollbackAlterMaterializedTableOperation(
931923
CatalogMaterializedTable oldMaterializedTable,
932924
AlterMaterializedTableChangeOperation op) {
933-
List<MaterializedTableChange> tableChanges = op.getTableChanges();
934-
List<MaterializedTableChange> rollbackChanges = new ArrayList<>();
925+
List<TableChange> tableChanges = op.getTableChanges();
926+
List<TableChange> rollbackChanges = new ArrayList<>();
935927

936928
for (TableChange tableChange : tableChanges) {
937929
if (tableChange instanceof TableChange.AddColumn) {
@@ -995,18 +987,17 @@ private ResultFetcher callDropMaterializedTableOperation(
995987

996988
ResolvedCatalogMaterializedTable materializedTable =
997989
getCatalogMaterializedTable(operationExecutor, tableIdentifier);
998-
CatalogMaterializedTable.RefreshStatus refreshStatus = materializedTable.getRefreshStatus();
999-
if (CatalogMaterializedTable.RefreshStatus.ACTIVATED == refreshStatus
1000-
|| CatalogMaterializedTable.RefreshStatus.SUSPENDED == refreshStatus) {
1001-
CatalogMaterializedTable.RefreshMode refreshMode = materializedTable.getRefreshMode();
1002-
if (CatalogMaterializedTable.RefreshMode.FULL == refreshMode) {
990+
RefreshStatus refreshStatus = materializedTable.getRefreshStatus();
991+
if (RefreshStatus.ACTIVATED == refreshStatus || RefreshStatus.SUSPENDED == refreshStatus) {
992+
RefreshMode refreshMode = materializedTable.getRefreshMode();
993+
if (RefreshMode.FULL == refreshMode) {
1003994
deleteRefreshWorkflow(tableIdentifier, materializedTable);
1004-
} else if (CatalogMaterializedTable.RefreshMode.CONTINUOUS == refreshMode
1005-
&& CatalogMaterializedTable.RefreshStatus.ACTIVATED == refreshStatus) {
995+
} else if (RefreshMode.CONTINUOUS == refreshMode
996+
&& RefreshStatus.ACTIVATED == refreshStatus) {
1006997
cancelContinuousRefreshJob(
1007998
operationExecutor, handle, tableIdentifier, materializedTable);
1008999
}
1009-
} else if (CatalogMaterializedTable.RefreshStatus.INITIALIZING == refreshStatus) {
1000+
} else if (RefreshStatus.INITIALIZING == refreshStatus) {
10101001
throw new ValidationException(
10111002
String.format(
10121003
"Current refresh status of materialized table %s is initializing, skip the drop operation.",
@@ -1211,13 +1202,13 @@ private CatalogMaterializedTable updateRefreshHandler(
12111202
OperationHandle operationHandle,
12121203
ObjectIdentifier materializedTableIdentifier,
12131204
CatalogMaterializedTable catalogMaterializedTable,
1214-
CatalogMaterializedTable.RefreshStatus refreshStatus,
1205+
RefreshStatus refreshStatus,
12151206
String refreshHandlerSummary,
12161207
byte[] serializedRefreshHandler) {
12171208
CatalogMaterializedTable updatedMaterializedTable =
12181209
catalogMaterializedTable.copy(
12191210
refreshStatus, refreshHandlerSummary, serializedRefreshHandler);
1220-
List<MaterializedTableChange> tableChanges = new ArrayList<>();
1211+
List<TableChange> tableChanges = new ArrayList<>();
12211212
tableChanges.add(TableChange.modifyRefreshStatus(refreshStatus));
12221213
tableChanges.add(
12231214
TableChange.modifyRefreshHandler(refreshHandlerSummary, serializedRefreshHandler));

0 commit comments

Comments
 (0)