
Commit c22dcad

CDAP-20488: Cache CDAP schema objects to improve performance (#234)
* CDAP-20488: Cache schema objects to improve performance
* Make schema mapping cache non static
1 parent e8535b3 commit c22dcad

File tree

5 files changed: +87, -17 lines

delta-plugins-common/src/main/java/io/cdap/delta/plugin/common/Records.java

Lines changed: 24 additions & 9 deletions
@@ -35,6 +35,8 @@
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.math.BigDecimal;
 import java.time.Instant;
@@ -55,6 +57,8 @@
  * Utilities for converting Records and Schemas.
  */
 public class Records {
+  private static final Logger LOG = LoggerFactory.getLogger(Records.class);
+
   private static final String PRECISION_NAME = "connect.decimal.precision";
   private static final String SCALE_NAME = "scale";
 
@@ -149,10 +153,20 @@ public static StructuredRecord keepSelectedColumns(StructuredRecord record, Set<
    * @param struct
    * @return
    */
-  public static StructuredRecord convert(Struct struct) {
-    Schema schema = convert(struct.schema());
-    schema = schema.isNullable() ? schema.getNonNullable() : schema;
+  public static StructuredRecord convert(Struct struct, SchemaMappingCache schemaMappingCache) {
+    org.apache.kafka.connect.data.Schema schema = struct.schema();
+    Schema mappedSchema = schemaMappingCache.get(schema);
+    if (mappedSchema == null) {
+      LOG.info("Creating CDAP schema from source DB schema");
+      mappedSchema = convert(struct.schema());
+      mappedSchema = mappedSchema.isNullable() ? mappedSchema.getNonNullable() : mappedSchema;
+      schemaMappingCache.put(schema, mappedSchema);
+    }
+    return getStructuredRecord(struct, mappedSchema, schemaMappingCache);
+  }
 
+  private static StructuredRecord getStructuredRecord(Struct struct, Schema schema,
+                                                      SchemaMappingCache schemaMappingCache) {
     StructuredRecord.Builder builder = StructuredRecord.builder(schema);
     if (schema.getFields() == null) {
       return builder.build();
@@ -162,7 +176,7 @@ public static StructuredRecord convert(Struct struct) {
       String fieldName = field.getName();
       Field debeziumField = struct.schema().field(fieldName);
       String debeziumSchemaName = debeziumField.schema().name();
-      Object val = convert(debeziumField.schema(), struct.get(fieldName));
+      Object val = convert(debeziumField.schema(), struct.get(fieldName), schemaMappingCache);
       Schema fieldSchema = field.getSchema();
       fieldSchema = fieldSchema.isNullable() ? fieldSchema.getNonNullable() : fieldSchema;
       Schema.LogicalType logicalType = fieldSchema.getLogicalType();
@@ -257,7 +271,8 @@ private static ZonedDateTime getZonedDateTime(long epochSecond, long fraction, T
   }
 
 
-  private static Object convert(org.apache.kafka.connect.data.Schema schema, Object val) {
+  private static Object convert(org.apache.kafka.connect.data.Schema schema, Object val,
+                                SchemaMappingCache schemaMappingCache) {
     if (val == null) {
       return null;
     }
@@ -275,15 +290,15 @@ private static Object convert(org.apache.kafka.connect.data.Schema schema, Objec
         return ((Short) val).intValue();
       case ARRAY:
         return ((List<?>) val).stream()
-          .map(o -> convert(schema.valueSchema(), o))
+          .map(o -> convert(schema.valueSchema(), o, schemaMappingCache))
           .collect(Collectors.toList());
       case MAP:
         return ((Map<?, ?>) val).entrySet().stream()
           .collect(Collectors.toMap(
-            mapKey -> convert(schema.keySchema(), mapKey),
-            mapVal -> convert(schema.valueSchema(), mapVal)));
+            mapKey -> convert(schema.keySchema(), mapKey, schemaMappingCache),
+            mapVal -> convert(schema.valueSchema(), mapVal, schemaMappingCache)));
       case STRUCT:
-        return convert((Struct) val);
+        return convert((Struct) val, schemaMappingCache);
     }
     // should never happen, all values are listed above
     throw new IllegalStateException(String.format("Kafka type '%s' is not supported.", schema.type()));
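
The reworked convert(Struct, SchemaMappingCache) probes the cache with the record's Kafka Connect Schema instance, performs the Debezium-to-CDAP schema mapping only on a miss, and delegates record building to the new getStructuredRecord helper. A minimal caller-side sketch of the intended hit pattern; the table name and row values are hypothetical, and the cache only pays off if the source hands back the same Schema instance per table, which is the premise of this change:

import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.delta.plugin.common.Records;
import io.cdap.delta.plugin.common.SchemaMappingCache;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class ConvertWithCacheExample {
  public static void main(String[] args) {
    // One long-lived cache per consumer; the identity lookup hits from the
    // second record onward as long as the Schema instance is reused.
    SchemaMappingCache cache = new SchemaMappingCache();

    Schema tableSchema = SchemaBuilder.struct().name("inventory.customers")
      .field("id", Schema.INT32_SCHEMA)
      .field("name", Schema.STRING_SCHEMA)
      .build();

    for (int i = 0; i < 3; i++) {
      Struct row = new Struct(tableSchema).put("id", i).put("name", "row-" + i);
      // Logs "Creating CDAP schema from source DB schema" only on the first
      // iteration; later iterations reuse the cached CDAP schema.
      StructuredRecord record = Records.convert(row, cache);
      String name = record.get("name");
      System.out.println(name);
    }
  }
}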
delta-plugins-common/src/main/java/io/cdap/delta/plugin/common/SchemaMappingCache.java

Lines changed: 48 additions & 0 deletions
@@ -0,0 +1,48 @@
+/*
+ * Copyright © 2023 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.delta.plugin.common;
+
+import com.google.common.collect.MapMaker;
+import io.cdap.cdap.api.data.schema.Schema;
+
+import java.util.Map;
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * Cache which stores mapping of Debezium schema to CDAP schema
+ * Uses reference equality (==) for keys for performance reasons as Schema objects are immutable
+ * Weak references are used for keys to ensure entries are cleaned up by GC when no longer used by the program
+ */
+@ThreadSafe
+public class SchemaMappingCache {
+
+  private final Map<org.apache.kafka.connect.data.Schema, Schema> cache = new MapMaker()
+    .weakKeys().makeMap();
+
+
+  public void reset() {
+    cache.clear();
+  }
+
+  public void put(org.apache.kafka.connect.data.Schema key, Schema mappedSchema) {
+    cache.put(key, mappedSchema);
+  }
+
+  public Schema get(org.apache.kafka.connect.data.Schema key) {
+    return cache.get(key);
+  }
+}
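
Guava's MapMaker.weakKeys() switches key comparison to identity (==) and lets the GC reclaim an entry once its key schema becomes unreachable, which is exactly what the class comment relies on. A small sketch of the identity semantics; the CDAP schema used as the value is an arbitrary placeholder:

import io.cdap.delta.plugin.common.SchemaMappingCache;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class IdentityKeysExample {
  public static void main(String[] args) {
    SchemaMappingCache cache = new SchemaMappingCache();

    // Two structurally equal Connect schemas, built independently.
    Schema a = SchemaBuilder.struct().field("id", Schema.INT32_SCHEMA).build();
    Schema b = SchemaBuilder.struct().field("id", Schema.INT32_SCHEMA).build();

    cache.put(a, io.cdap.cdap.api.data.schema.Schema.of(
      io.cdap.cdap.api.data.schema.Schema.Type.INT));

    System.out.println(a.equals(b));          // true: structural equality
    System.out.println(cache.get(a) != null); // true: same instance as the key
    System.out.println(cache.get(b) != null); // false: identity lookup misses
  }
}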

delta-plugins-common/src/test/java/io/cdap/delta/plugin/common/RecordsTest.java

Lines changed: 2 additions & 2 deletions
@@ -44,7 +44,7 @@ public void testConvertTinyInt() {
     Struct struct = new Struct(dataSchema);
     Short val = 1;
     struct.put(fieldName, val);
-    StructuredRecord convert = Records.convert(struct);
+    StructuredRecord convert = Records.convert(struct, new SchemaMappingCache());
     io.cdap.cdap.api.data.schema.Schema.Field priority = convert.getSchema().getField(fieldName);
     Assert.assertNotNull(priority);
     Assert.assertEquals(priority.getSchema(), io.cdap.cdap.api.data.schema.Schema.of(
@@ -61,7 +61,7 @@ public void testConvertTimeStamp() {
     Struct struct = new Struct(dataSchema);
     String val = "2011-12-03T10:15:30.030431+01:00";
     struct.put(fieldName, val);
-    StructuredRecord converted = Records.convert(struct);
+    StructuredRecord converted = Records.convert(struct, new SchemaMappingCache());
     Assert.assertEquals(Schema.of(Schema.LogicalType.TIMESTAMP_MICROS),
       converted.getSchema().getField(fieldName).getSchema());
     Assert.assertEquals(converted.getTimestamp(fieldName),
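
The updated tests only thread a fresh cache through convert(); a follow-up test along these lines (not part of this commit, names hypothetical) could pin down the caching contract itself:

import io.cdap.delta.plugin.common.Records;
import io.cdap.delta.plugin.common.SchemaMappingCache;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.junit.Assert;
import org.junit.Test;

public class SchemaMappingCacheTest {
  @Test
  public void testConvertPopulatesAndResetClearsCache() {
    org.apache.kafka.connect.data.Schema dataSchema = SchemaBuilder.struct()
      .field("id", org.apache.kafka.connect.data.Schema.INT32_SCHEMA)
      .build();
    Struct struct = new Struct(dataSchema).put("id", 1);

    SchemaMappingCache cache = new SchemaMappingCache();
    Records.convert(struct, cache);

    // convert() keys the mapping by the exact schema instance of the struct.
    Assert.assertNotNull(cache.get(dataSchema));

    cache.reset();
    Assert.assertNull(cache.get(dataSchema));
  }
}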

mysql-delta-plugins/src/main/java/io/cdap/delta/mysql/MySqlRecordConsumer.java

Lines changed: 8 additions & 4 deletions
@@ -26,6 +26,7 @@
 import io.cdap.delta.api.Offset;
 import io.cdap.delta.api.SourceTable;
 import io.cdap.delta.plugin.common.Records;
+import io.cdap.delta.plugin.common.SchemaMappingCache;
 import io.debezium.connector.mysql.MySqlValueConverters;
 import io.debezium.embedded.StopConnectorException;
 import io.debezium.relational.Table;
@@ -49,6 +50,7 @@
  */
 public class MySqlRecordConsumer implements Consumer<SourceRecord> {
   private static final Logger LOG = LoggerFactory.getLogger(MySqlRecordConsumer.class);
+  private static final String TRX_ID_SEP = ":";
 
   private final DeltaSourceContext context;
   private final EventEmitter emitter;
@@ -57,6 +59,7 @@ public class MySqlRecordConsumer implements Consumer<SourceRecord> {
   private final Tables tables;
   private final Map<String, SourceTable> sourceTableMap;
   private final boolean replicateExistingData;
+  private final SchemaMappingCache schemaMappingCache;
 
   public MySqlRecordConsumer(DeltaSourceContext context, EventEmitter emitter,
                              DdlParser ddlParser, MySqlValueConverters mySqlValueConverters,
@@ -68,6 +71,7 @@ public MySqlRecordConsumer(DeltaSourceContext context, EventEmitter emitter,
     this.tables = tables;
     this.sourceTableMap = sourceTableMap;
     this.replicateExistingData = replicateExistingData;
+    this.schemaMappingCache = new SchemaMappingCache();
   }
 
   @Override
@@ -123,7 +127,7 @@ public void accept(SourceRecord sourceRecord) {
     Map<String, String> deltaOffset = generateCdapOffsets(sourceRecord);
     Offset recordOffset = new Offset(deltaOffset);
 
-    StructuredRecord val = Records.convert((Struct) sourceRecord.value());
+    StructuredRecord val = Records.convert((Struct) sourceRecord.value(), schemaMappingCache);
     String ddl = val.get("ddl");
     StructuredRecord source = val.get("source");
     if (source == null) {
@@ -184,9 +188,8 @@ private void handleDML(StructuredRecord source, StructuredRecord val, Offset rec
     String transactionId = source.get("gtid");
     if (transactionId == null) {
       // this is not really a transaction id, but we don't get an event when a transaction started/ended
-      transactionId = String.format("%s:%d",
-        source.get(MySqlConstantOffsetBackingStore.FILE),
-        source.get(MySqlConstantOffsetBackingStore.POS));
+      transactionId = source.get(MySqlConstantOffsetBackingStore.FILE) + TRX_ID_SEP +
+        source.get(MySqlConstantOffsetBackingStore.POS);
     }
 
     StructuredRecord before = val.get("before");
@@ -240,6 +243,7 @@ private void handleDDL(String ddlStatement, Offset recordOffset,
     // CREATE_TABLE and TRUNCATE_TABLE.
     switch (event.type()) {
       case ALTER_TABLE:
+        schemaMappingCache.reset();
         DdlParserListener.TableAlteredEvent alteredEvent = (DdlParserListener.TableAlteredEvent) event;
         TableId tableId = alteredEvent.tableId();
         Table table = tables.forTable(tableId);
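
Beyond threading the per-consumer cache through Records.convert, two details here are worth noting: the cache is reset on ALTER_TABLE, since an altered table's previously cached mapping is stale, and the transaction id on the hot DML path is now built by plain concatenation through TRX_ID_SEP instead of String.format, which re-parses its format string on every call. A quick sketch with hypothetical binlog coordinates showing the two forms produce the same id:

public class TrxIdExample {
  private static final String TRX_ID_SEP = ":";

  public static void main(String[] args) {
    // Hypothetical stand-ins for the values read from the Debezium "source"
    // struct via MySqlConstantOffsetBackingStore.FILE and POS.
    String file = "mysql-bin.000003";
    long pos = 154L;

    String viaFormat = String.format("%s:%d", file, pos); // old form
    String viaConcat = file + TRX_ID_SEP + pos;           // new form

    System.out.println(viaFormat.equals(viaConcat));      // true: "mysql-bin.000003:154"
  }
}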

sqlserver-delta-plugins/src/main/java/io/cdap/delta/sqlserver/SqlServerRecordConsumer.java

Lines changed: 5 additions & 2 deletions
@@ -28,6 +28,7 @@
 import io.cdap.delta.api.Offset;
 import io.cdap.delta.api.SourceTable;
 import io.cdap.delta.plugin.common.Records;
+import io.cdap.delta.plugin.common.SchemaMappingCache;
 import io.debezium.embedded.StopConnectorException;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -57,6 +58,7 @@ public class SqlServerRecordConsumer implements Consumer<SourceRecord> {
   private final Map<String, SourceTable> sourceTableMap;
   private final boolean replicateExistingData;
   private final Offset latestOffset;
+  private final SchemaMappingCache schemaMappingCache;
 
 
   SqlServerRecordConsumer(DeltaSourceContext context, EventEmitter emitter, String databaseName,
@@ -69,6 +71,7 @@ public class SqlServerRecordConsumer implements Consumer<SourceRecord> {
     this.sourceTableMap = sourceTableMap;
     this.latestOffset = latestOffset;
     this.replicateExistingData = replicateExistingData;
+    this.schemaMappingCache = new SchemaMappingCache();
   }
 
   @Override
@@ -93,7 +96,7 @@ public void accept(SourceRecord sourceRecord) {
       return;
     }
 
-    StructuredRecord val = Records.convert((Struct) sourceRecord.value());
+    StructuredRecord val = Records.convert((Struct) sourceRecord.value(), schemaMappingCache);
     DMLOperation.Type op;
     String opStr = val.get("op");
     if ("c".equals(opStr) || "r".equals(opStr)) {
@@ -158,7 +161,7 @@ public void accept(SourceRecord sourceRecord) {
       .setSnapshot(ddlRecordOffset.isSnapshot())
       .setOffset(ddlRecordOffset.getAsOffset());
 
-    StructuredRecord key = Records.convert((Struct) sourceRecord.key());
+    StructuredRecord key = Records.convert((Struct) sourceRecord.key(), schemaMappingCache);
     List<Schema.Field> fields = key.getSchema().getFields();
     List<String> primaryFields = new ArrayList<>();
     if (fields != null && !fields.isEmpty()) {
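
The SQL Server consumer gets the same wiring as the MySQL one. Per the commit message the cache was made non-static, presumably so each consumer instance owns its mappings and a DDL-triggered reset in one instance cannot evict entries another is still using. A minimal sketch of that isolation, with placeholder schemas:

import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.delta.plugin.common.SchemaMappingCache;
import org.apache.kafka.connect.data.SchemaBuilder;

public class PerConsumerCacheExample {
  public static void main(String[] args) {
    // One cache per consumer instance, mirroring the non-static field above.
    SchemaMappingCache mysqlConsumerCache = new SchemaMappingCache();
    SchemaMappingCache sqlServerConsumerCache = new SchemaMappingCache();

    org.apache.kafka.connect.data.Schema key = SchemaBuilder.struct()
      .field("id", org.apache.kafka.connect.data.Schema.INT32_SCHEMA)
      .build();
    Schema mapped = Schema.of(Schema.Type.INT);

    mysqlConsumerCache.put(key, mapped);
    sqlServerConsumerCache.put(key, mapped);

    mysqlConsumerCache.reset(); // e.g. an ALTER TABLE seen by the MySQL consumer

    System.out.println(mysqlConsumerCache.get(key));     // null: cleared
    System.out.println(sqlServerConsumerCache.get(key)); // still the mapped schema
  }
}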
