Skip to content

Commit 2e82317

Browse files
committed
[FLINK-38218] Rely on stable order of assigned splits
1 parent 7d311d2 commit 2e82317

File tree

3 files changed

+19
-48
lines changed

3 files changed

+19
-48
lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,10 @@
3333

3434
import java.util.ArrayList;
3535
import java.util.Collection;
36-
import java.util.Comparator;
3736
import java.util.HashMap;
3837
import java.util.List;
3938
import java.util.Map;
4039
import java.util.Optional;
41-
import java.util.stream.Collectors;
4240

4341
/**
4442
* A {@link MySqlSplitAssigner} that splits tables into small chunk splits based on primary key
@@ -208,9 +206,7 @@ public void close() {
208206

209207
private MySqlBinlogSplit createBinlogSplit() {
210208
final List<MySqlSchemalessSnapshotSplit> assignedSnapshotSplit =
211-
snapshotSplitAssigner.getAssignedSplits().values().stream()
212-
.sorted(Comparator.comparing(MySqlSplit::splitId))
213-
.collect(Collectors.toList());
209+
new ArrayList<>(snapshotSplitAssigner.getAssignedSplits().values());
214210

215211
Map<String, BinlogOffset> splitFinishedOffsets =
216212
snapshotSplitAssigner.getSplitFinishedOffsets();

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -169,17 +169,7 @@ private MySqlSnapshotSplitAssigner(
169169
this.currentParallelism = currentParallelism;
170170
this.alreadyProcessedTables = alreadyProcessedTables;
171171
this.remainingSplits = new CopyOnWriteArrayList<>(remainingSplits);
172-
// When job restore from savepoint, sort the existing tables and newly added tables
173-
// to let enumerator only send newly added tables' BinlogSplitMetaEvent
174-
this.assignedSplits =
175-
assignedSplits.entrySet().stream()
176-
.sorted(Entry.comparingByKey())
177-
.collect(
178-
Collectors.toMap(
179-
Entry::getKey,
180-
Entry::getValue,
181-
(o, o2) -> o,
182-
LinkedHashMap::new));
172+
this.assignedSplits = assignedSplits;
183173
this.tableSchemas = tableSchemas;
184174
this.splitFinishedOffsets = splitFinishedOffsets;
185175
this.assignerStatus = assignerStatus;

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java

Lines changed: 17 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,8 @@
6161
import java.util.ArrayList;
6262
import java.util.Collections;
6363
import java.util.HashMap;
64-
import java.util.HashSet;
6564
import java.util.List;
6665
import java.util.Map;
67-
import java.util.Set;
6866
import java.util.function.Supplier;
6967
import java.util.stream.Collectors;
7068

@@ -423,15 +421,24 @@ private void fillMetadataForBinlogSplit(BinlogSplitMetaEvent metadataEvent) {
423421
binlogSplit, receivedTotalFinishedSplitSize);
424422
uncompletedBinlogSplits.put(binlogSplit.splitId(), binlogSplit);
425423
} else if (receivedMetaGroupId == expectedMetaGroupId) {
426-
List<FinishedSnapshotSplitInfo> newAddedMetadataGroup;
427-
Set<String> existedSplitsOfLastGroup =
428-
getExistedSplitsOfLastGroup(
429-
binlogSplit.getFinishedSnapshotSplitInfos(),
430-
sourceConfig.getSplitMetaGroupSize());
431-
newAddedMetadataGroup =
432-
metadataEvent.getMetaGroup().stream()
424+
int expectedNumberOfAlreadyRetrievedElements =
425+
binlogSplit.getFinishedSnapshotSplitInfos().size()
426+
% sourceConfig.getSplitMetaGroupSize();
427+
List<byte[]> metaGroup = metadataEvent.getMetaGroup();
428+
if (expectedNumberOfAlreadyRetrievedElements > 0) {
429+
LOG.info(
430+
"Source reader {} is discarding the first {} out of {} elements of meta group {}.",
431+
subtaskId,
432+
expectedNumberOfAlreadyRetrievedElements,
433+
metaGroup.size(),
434+
receivedMetaGroupId);
435+
metaGroup =
436+
metaGroup.subList(
437+
expectedNumberOfAlreadyRetrievedElements, metaGroup.size());
438+
}
439+
List<FinishedSnapshotSplitInfo> newAddedMetadataGroup =
440+
metaGroup.stream()
433441
.map(FinishedSnapshotSplitInfo::deserialize)
434-
.filter(r -> !existedSplitsOfLastGroup.contains(r.getSplitId()))
435442
.collect(Collectors.toList());
436443

437444
uncompletedBinlogSplits.put(
@@ -501,28 +508,6 @@ private MySqlBinlogSplit discoverTableSchemasForBinlogSplit(
501508
}
502509
}
503510

504-
private Set<String> getExistedSplitsOfLastGroup(
505-
List<FinishedSnapshotSplitInfo> finishedSnapshotSplits, int metaGroupSize) {
506-
int splitsNumOfLastGroup =
507-
finishedSnapshotSplits.size() % sourceConfig.getSplitMetaGroupSize();
508-
if (splitsNumOfLastGroup != 0) {
509-
int lastGroupStart =
510-
((int) (finishedSnapshotSplits.size() / sourceConfig.getSplitMetaGroupSize()))
511-
* metaGroupSize;
512-
// Keep same order with MySqlHybridSplitAssigner.createBinlogSplit() to avoid
513-
// 'invalid request meta group id' error
514-
List<String> sortedFinishedSnapshotSplits =
515-
finishedSnapshotSplits.stream()
516-
.map(FinishedSnapshotSplitInfo::getSplitId)
517-
.sorted()
518-
.collect(Collectors.toList());
519-
return new HashSet<>(
520-
sortedFinishedSnapshotSplits.subList(
521-
lastGroupStart, lastGroupStart + splitsNumOfLastGroup));
522-
}
523-
return new HashSet<>();
524-
}
525-
526511
private void logCurrentBinlogOffsets(List<MySqlSplit> splits, long checkpointId) {
527512
if (!LOG.isInfoEnabled()) {
528513
return;

0 commit comments

Comments (0)