From 037b9ed60be2ab32eadbbbe9c5b3ec186223bb9a Mon Sep 17 00:00:00 2001 From: HTHou Date: Thu, 21 Aug 2025 22:46:41 +0800 Subject: [PATCH 1/7] dev --- .../rescon/memory/PrimitiveArrayManager.java | 3 ++ .../db/utils/datastructure/AlignedTVList.java | 53 +++++++++---------- 2 files changed, 29 insertions(+), 27 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java index e8a2a9bb8374..9b66fdf51601 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java @@ -265,6 +265,9 @@ private static Object createPrimitiveArray(TSDataType dataType) { */ public static void release(Object array) { int order; + if (array == null) { + return; + } if (array instanceof boolean[]) { order = TSDataType.BOOLEAN.serialize(); } else if (array instanceof int[]) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java index 80649eb182d1..a396c84f1b20 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java @@ -357,32 +357,7 @@ public void extendColumn(TSDataType dataType) { List columnValue = new ArrayList<>(timestamps.size()); List columnBitMaps = new ArrayList<>(timestamps.size()); for (int i = 0; i < timestamps.size(); i++) { - switch (dataType) { - case TEXT: - case STRING: - case BLOB: - columnValue.add(getPrimitiveArraysByType(TSDataType.TEXT)); - break; - case FLOAT: - columnValue.add(getPrimitiveArraysByType(TSDataType.FLOAT)); - break; - case INT32: - case DATE: - columnValue.add(getPrimitiveArraysByType(TSDataType.INT32)); - break; - case INT64: - case TIMESTAMP: - columnValue.add(getPrimitiveArraysByType(TSDataType.INT64)); - break; - case DOUBLE: - columnValue.add(getPrimitiveArraysByType(TSDataType.DOUBLE)); - break; - case BOOLEAN: - columnValue.add(getPrimitiveArraysByType(TSDataType.BOOLEAN)); - break; - default: - break; - } + columnValue.add(null); BitMap bitMap = new BitMap(ARRAY_SIZE); // The following code is for these 2 kinds of scenarios. @@ -708,7 +683,7 @@ protected void expandValues() { indices.add((int[]) getPrimitiveArraysByType(TSDataType.INT32)); } for (int i = 0; i < dataTypes.size(); i++) { - values.get(i).add(getPrimitiveArraysByType(dataTypes.get(i))); + values.get(i).add(null); if (bitMaps != null && bitMaps.get(i) != null) { bitMaps.get(i).add(null); } @@ -835,6 +810,10 @@ private void arrayCopy(Object[] value, int idx, int arrayIndex, int elementIndex case BLOB: case STRING: Binary[] arrayT = ((Binary[]) columnValues.get(arrayIndex)); + if (arrayT == null) { + arrayT = (Binary[]) getPrimitiveArraysByType(TSDataType.TEXT); + columnValues.set(arrayIndex, arrayT); + } System.arraycopy(value[i], idx, arrayT, elementIndex, remaining); // update raw size of Text chunk @@ -845,24 +824,44 @@ private void arrayCopy(Object[] value, int idx, int arrayIndex, int elementIndex break; case FLOAT: float[] arrayF = ((float[]) columnValues.get(arrayIndex)); + if (arrayF == null) { + arrayF = (float[]) getPrimitiveArraysByType(TSDataType.FLOAT); + columnValues.set(arrayIndex, arrayF); + } System.arraycopy(value[i], idx, arrayF, elementIndex, remaining); break; case INT32: case DATE: int[] arrayI = ((int[]) columnValues.get(arrayIndex)); + if (arrayI == null) { + arrayI = (int[]) getPrimitiveArraysByType(TSDataType.INT32); + columnValues.set(arrayIndex, arrayI); + } System.arraycopy(value[i], idx, arrayI, elementIndex, remaining); break; case INT64: case TIMESTAMP: long[] arrayL = ((long[]) columnValues.get(arrayIndex)); + if (arrayL == null) { + arrayL = (long[]) getPrimitiveArraysByType(TSDataType.INT64); + columnValues.set(arrayIndex, arrayL); + } System.arraycopy(value[i], idx, arrayL, elementIndex, remaining); break; case DOUBLE: double[] arrayD = ((double[]) columnValues.get(arrayIndex)); + if (arrayD == null) { + arrayD = (double[]) getPrimitiveArraysByType(TSDataType.DOUBLE); + columnValues.set(arrayIndex, arrayD); + } System.arraycopy(value[i], idx, arrayD, elementIndex, remaining); break; case BOOLEAN: boolean[] arrayB = ((boolean[]) columnValues.get(arrayIndex)); + if (arrayB == null) { + arrayB = (boolean[]) getPrimitiveArraysByType(TSDataType.BOOLEAN); + columnValues.set(arrayIndex, arrayB); + } System.arraycopy(value[i], idx, arrayB, elementIndex, remaining); break; default: From a2cc0adeb17479f6cba7bf4b890a68698769cb70 Mon Sep 17 00:00:00 2001 From: HTHou Date: Thu, 21 Aug 2025 23:33:01 +0800 Subject: [PATCH 2/7] dev more --- .../rescon/memory/PrimitiveArrayManager.java | 3 - .../db/utils/datastructure/AlignedTVList.java | 115 +++++++++++------- 2 files changed, 73 insertions(+), 45 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java index 9b66fdf51601..e8a2a9bb8374 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java @@ -265,9 +265,6 @@ private static Object createPrimitiveArray(TSDataType dataType) { */ public static void release(Object array) { int order; - if (array == null) { - return; - } if (array instanceof boolean[]) { order = TSDataType.BOOLEAN.serialize(); } else if (array instanceof int[]) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java index a396c84f1b20..a4d967d1f93b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java @@ -196,39 +196,64 @@ public synchronized void putAlignedValue(long timestamp, Object[] value) { List columnValues = values.get(i); if (columnValue == null) { markNullValue(i, arrayIndex, elementIndex); + if (dataTypes.get(i).isBinary()) { + memoryBinaryChunkSize[i] += getBinarySize(Binary.EMPTY_VALUE); + } + continue; } switch (dataTypes.get(i)) { case TEXT: case BLOB: case STRING: - ((Binary[]) columnValues.get(arrayIndex))[elementIndex] = - columnValue != null ? (Binary) columnValue : Binary.EMPTY_VALUE; - memoryBinaryChunkSize[i] += - columnValue != null - ? getBinarySize((Binary) columnValue) - : getBinarySize(Binary.EMPTY_VALUE); + Binary[] arrayT = (Binary[]) columnValues.get(arrayIndex); + if (arrayT == null) { + arrayT = (Binary[]) getPrimitiveArraysByType(TSDataType.TEXT); + columnValues.set(arrayIndex, arrayT); + } + arrayT[elementIndex] = (Binary) columnValue; + memoryBinaryChunkSize[i] += getBinarySize((Binary) columnValue); break; case FLOAT: - ((float[]) columnValues.get(arrayIndex))[elementIndex] = - columnValue != null ? (float) columnValue : Float.MIN_VALUE; + float[] arrayF = (float[]) columnValues.get(arrayIndex); + if (arrayF == null) { + arrayF = (float[]) getPrimitiveArraysByType(TSDataType.FLOAT); + columnValues.set(arrayIndex, arrayF); + } + arrayF[elementIndex] = (float) columnValue; break; case INT32: case DATE: - ((int[]) columnValues.get(arrayIndex))[elementIndex] = - columnValue != null ? (int) columnValue : Integer.MIN_VALUE; + float[] arrayI = (float[]) columnValues.get(arrayIndex); + if (arrayI == null) { + arrayI = (float[]) getPrimitiveArraysByType(TSDataType.INT32); + columnValues.set(arrayIndex, arrayI); + } + arrayI[elementIndex] = (int) columnValue; break; case INT64: case TIMESTAMP: - ((long[]) columnValues.get(arrayIndex))[elementIndex] = - columnValue != null ? (long) columnValue : Long.MIN_VALUE; + long[] arrayL = (long[]) columnValues.get(arrayIndex); + if (arrayL == null) { + arrayL = (long[]) getPrimitiveArraysByType(TSDataType.INT64); + columnValues.set(arrayIndex, arrayL); + } + arrayL[elementIndex] = (long) columnValue; break; case DOUBLE: - ((double[]) columnValues.get(arrayIndex))[elementIndex] = - columnValue != null ? (double) columnValue : Double.MIN_VALUE; + double[] arrayD = (double[]) columnValues.get(arrayIndex); + if (arrayD == null) { + arrayD = (double[]) getPrimitiveArraysByType(TSDataType.DOUBLE); + columnValues.set(arrayIndex, arrayD); + } + arrayD[elementIndex] = (double) columnValue; break; case BOOLEAN: - ((boolean[]) columnValues.get(arrayIndex))[elementIndex] = - columnValue != null && (boolean) columnValue; + boolean[] arrayB = (boolean[]) columnValues.get(arrayIndex); + if (arrayB == null) { + arrayB = (boolean[]) getPrimitiveArraysByType(TSDataType.BOOLEAN); + columnValues.set(arrayIndex, arrayB); + } + arrayB[elementIndex] = (boolean) columnValue; break; default: break; @@ -485,25 +510,24 @@ public boolean isNullValue(int unsortedRowIndex, int columnIndex) { if (allValueColDeletedMap != null && allValueColDeletedMap.isMarked(unsortedRowIndex)) { return true; } + int arrayIndex = unsortedRowIndex / ARRAY_SIZE; + int elementIndex = unsortedRowIndex % ARRAY_SIZE; - if (columnIndex < 0 || columnIndex >= values.size() || values.get(columnIndex) == null) { + if (columnIndex < 0 + || columnIndex >= values.size() + || values.get(columnIndex) == null + || values.get(columnIndex).get(arrayIndex) == null) { return true; } if (bitMaps == null || bitMaps.get(columnIndex) == null - || bitMaps.get(columnIndex).get(unsortedRowIndex / ARRAY_SIZE) == null) { + || bitMaps.get(columnIndex).get(arrayIndex) == null) { return false; } - int arrayIndex = unsortedRowIndex / ARRAY_SIZE; - int elementIndex = unsortedRowIndex % ARRAY_SIZE; List columnBitMaps = bitMaps.get(columnIndex); return columnBitMaps.get(arrayIndex).isMarked(elementIndex); } - public List> getValues() { - return values; - } - public List getTsDataTypes() { return dataTypes; } @@ -611,6 +635,9 @@ public void deleteColumn(int columnIndex) { } protected Object cloneValue(TSDataType type, Object value) { + if (value == null) { + return null; + } switch (type) { case TEXT: case BLOB: @@ -657,7 +684,9 @@ protected void clearValue() { List columnValues = values.get(i); if (columnValues != null) { for (Object dataArray : columnValues) { - PrimitiveArrayManager.release(dataArray); + if (dataArray != null) { + PrimitiveArrayManager.release(dataArray); + } } columnValues.clear(); } @@ -1254,50 +1283,52 @@ public void serializeToWAL(IWALByteBufferView buffer) { int arrayIndex = rowIndex / ARRAY_SIZE; int elementIndex = rowIndex % ARRAY_SIZE; // value + boolean isNull = isNullValue(rowIndex, columnIndex); switch (dataTypes.get(columnIndex)) { case TEXT: case BLOB: case STRING: - Binary valueT = ((Binary[]) columnValues.get(arrayIndex))[elementIndex]; // In some scenario, the Binary in AlignedTVList will be null if this field is empty in // current row. We need to handle this scenario to get rid of NPE. See the similar issue // here: https://github.com/apache/iotdb/pull/9884 // Furthermore, we use an empty Binary as a placeholder here. It won't lead to data // error because whether this field is null or not is decided by the bitMap rather than // the object's value here. - if (valueT != null) { - WALWriteUtils.write(valueT, buffer); - } else { - WALWriteUtils.write(new Binary(new byte[0]), buffer); - } + WALWriteUtils.write( + isNull + ? Binary.EMPTY_VALUE + : ((Binary[]) columnValues.get(arrayIndex))[elementIndex], + buffer); break; case FLOAT: - float valueF = ((float[]) columnValues.get(arrayIndex))[elementIndex]; - buffer.putFloat(valueF); + buffer.putFloat( + isNull ? Float.MIN_VALUE : ((float[]) columnValues.get(arrayIndex))[elementIndex]); break; case INT32: case DATE: - int valueI = ((int[]) columnValues.get(arrayIndex))[elementIndex]; - buffer.putInt(valueI); + buffer.putInt( + isNull ? Integer.MIN_VALUE : ((int[]) columnValues.get(arrayIndex))[elementIndex]); break; case INT64: case TIMESTAMP: - long valueL = ((long[]) columnValues.get(arrayIndex))[elementIndex]; - buffer.putLong(valueL); + buffer.putLong( + isNull ? Long.MIN_VALUE : ((long[]) columnValues.get(arrayIndex))[elementIndex]); break; case DOUBLE: - double valueD = ((double[]) columnValues.get(arrayIndex))[elementIndex]; - buffer.putDouble(valueD); + buffer.putDouble( + isNull + ? Double.MIN_VALUE + : ((double[]) columnValues.get(arrayIndex))[elementIndex]); break; case BOOLEAN: - boolean valueB = ((boolean[]) columnValues.get(arrayIndex))[elementIndex]; - WALWriteUtils.write(valueB, buffer); + WALWriteUtils.write( + !isNull && ((boolean[]) columnValues.get(arrayIndex))[elementIndex], buffer); break; default: throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT); } // bitmap - WALWriteUtils.write(isNullValue(rowIndex, columnIndex), buffer); + WALWriteUtils.write(isNull, buffer); } } From c532311b14b19d3b3b2ded5432abc2b718ea5c35 Mon Sep 17 00:00:00 2001 From: HTHou Date: Thu, 21 Aug 2025 23:43:12 +0800 Subject: [PATCH 3/7] deving mem_control --- .../storageengine/dataregion/memtable/TsFileProcessor.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index e90fc264e790..35651499b918 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -773,7 +773,6 @@ private long[] checkAlignedMemCostAndAddToTspInfoForRow( } else { // For existed device of this mem table AlignedWritableMemChunk alignedMemChunk = (AlignedWritableMemChunk) memChunk; - List dataTypesInTVList = new ArrayList<>(); for (int i = 0; i < dataTypes.length; i++) { // Skip failed Measurements if (dataTypes[i] == null @@ -790,12 +789,10 @@ private long[] checkAlignedMemCostAndAddToTspInfoForRow( ? 1 : 0); memTableIncrement += currentArrayNum * AlignedTVList.valueListArrayMemCost(dataTypes[i]); - dataTypesInTVList.add(dataTypes[i]); } } // this insertion will result in a new array if ((alignedMemChunk.alignedListSize() % PrimitiveArrayManager.ARRAY_SIZE) == 0) { - dataTypesInTVList.addAll(alignedMemChunk.getWorkingTVList().getTsDataTypes()); memTableIncrement += alignedMemChunk.getWorkingTVList().alignedTvListArrayMemCost(); } } @@ -1067,7 +1064,6 @@ private void updateAlignedMemCost( numArraysToAdd * AlignedTVList.alignedTvListArrayMemCost(dataTypes, columnCategories); } else { AlignedWritableMemChunk alignedMemChunk = (AlignedWritableMemChunk) memChunk; - List dataTypesInTVList = new ArrayList<>(); int currentPointNum = alignedMemChunk.alignedListSize(); int newPointNum = currentPointNum + incomingPointNum; for (int i = 0; i < dataTypes.length; i++) { @@ -1086,7 +1082,6 @@ private void updateAlignedMemCost( memIncrements[0] += (currentPointNum / PrimitiveArrayManager.ARRAY_SIZE + 1) * AlignedTVList.valueListArrayMemCost(dataType); - dataTypesInTVList.add(dataType); } } @@ -1101,7 +1096,6 @@ private void updateAlignedMemCost( if (acquireArray != 0) { // memory of extending the TVList - dataTypesInTVList.addAll(alignedMemChunk.getWorkingTVList().getTsDataTypes()); memIncrements[0] += acquireArray * alignedMemChunk.getWorkingTVList().alignedTvListArrayMemCost(); } From cdac597412baef19e75c35100cd73dc716563c61 Mon Sep 17 00:00:00 2001 From: HTHou Date: Fri, 22 Aug 2025 17:32:45 +0800 Subject: [PATCH 4/7] deving --- .../dataregion/memtable/TsFileProcessor.java | 13 ++++++++----- .../db/utils/datastructure/AlignedTVList.java | 15 +++++---------- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index 35651499b918..c66a0f7091e0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -788,7 +788,7 @@ private long[] checkAlignedMemCostAndAddToTspInfoForRow( + (alignedMemChunk.alignedListSize() % PrimitiveArrayManager.ARRAY_SIZE > 0 ? 1 : 0); - memTableIncrement += currentArrayNum * AlignedTVList.valueListArrayMemCost(dataTypes[i]); + memTableIncrement += currentArrayNum * AlignedTVList.emptyValueListArrayMemCost(); } } // this insertion will result in a new array @@ -875,7 +875,7 @@ private long[] checkAlignedMemCostAndAddToTspInfoForRows(List ins ? 1 : 0); memTableIncrement += - currentArrayNum * AlignedTVList.valueListArrayMemCost(dataTypes[i]); + currentArrayNum * AlignedTVList.emptyValueListArrayMemCost(); } } int addingPointNum = addingPointNumInfo.right; @@ -1066,6 +1066,7 @@ private void updateAlignedMemCost( AlignedWritableMemChunk alignedMemChunk = (AlignedWritableMemChunk) memChunk; int currentPointNum = alignedMemChunk.alignedListSize(); int newPointNum = currentPointNum + incomingPointNum; + List insertingTypes = new ArrayList<>(); for (int i = 0; i < dataTypes.length; i++) { TSDataType dataType = dataTypes[i]; String measurement = measurementIds[i]; @@ -1076,12 +1077,13 @@ private void updateAlignedMemCost( || (columnCategories != null && columnCategories[i] != TsTableColumnCategory.FIELD)) { continue; } + insertingTypes.add(dataType); if (!alignedMemChunk.containsMeasurement(measurementIds[i])) { // add a new column in the TVList, the new column should be as long as existing ones memIncrements[0] += (currentPointNum / PrimitiveArrayManager.ARRAY_SIZE + 1) - * AlignedTVList.valueListArrayMemCost(dataType); + * AlignedTVList.emptyValueListArrayMemCost(); } } @@ -1096,8 +1098,9 @@ private void updateAlignedMemCost( if (acquireArray != 0) { // memory of extending the TVList - memIncrements[0] += - acquireArray * alignedMemChunk.getWorkingTVList().alignedTvListArrayMemCost(); + memIncrements[0] += acquireArray * alignedMemChunk.getWorkingTVList().alignedTvListArrayMemCost(insertingTypes); + } else { + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java index a4d967d1f93b..00c171f136ef 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java @@ -975,7 +975,7 @@ public static long alignedTvListArrayMemCost( * * @return AlignedTvListArrayMemSize */ - public long alignedTvListArrayMemCost() { + public long alignedTvListArrayMemCost(List insertingTypes) { long size = 0; // value & bitmap array mem size for (int column = 0; column < dataTypes.size(); column++) { @@ -1003,19 +1003,14 @@ public long alignedTvListArrayMemCost() { } /** - * Get the single column array mem cost by give type. + * Get the single empty column array mem cost. * - * @param type the type of the value column - * @return valueListArrayMemCost + * @return emptyValueListArrayMemCost */ - public static long valueListArrayMemCost(TSDataType type) { + public static long emptyValueListArrayMemCost() { long size = 0; - // value array mem size - size += (long) PrimitiveArrayManager.ARRAY_SIZE * (long) type.getDataTypeSize(); // bitmap array mem size - size += (long) PrimitiveArrayManager.ARRAY_SIZE / 8 + 1; - // array headers mem size - size += NUM_BYTES_ARRAY_HEADER; + size += PrimitiveArrayManager.ARRAY_SIZE / 8 + 1; // Object references size in ArrayList size += NUM_BYTES_OBJECT_REF; return size; From ead01cd0a4c4751402a88094b5393e4d7c2de438 Mon Sep 17 00:00:00 2001 From: HTHou Date: Wed, 27 Aug 2025 17:16:26 +0800 Subject: [PATCH 5/7] dev --- .../memtable/AlignedWritableMemChunk.java | 61 +++++++++++++++++++ .../dataregion/memtable/TsFileProcessor.java | 29 +++++---- .../db/utils/datastructure/AlignedTVList.java | 18 +++++- 3 files changed, 93 insertions(+), 15 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java index d41ef855d368..d7b7b4b027ce 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils; +import org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager; import org.apache.iotdb.db.utils.datastructure.AlignedTVList; import org.apache.iotdb.db.utils.datastructure.BatchEncodeInfo; import org.apache.iotdb.db.utils.datastructure.MemPointIterator; @@ -55,6 +56,8 @@ import java.util.concurrent.BlockingQueue; import static org.apache.iotdb.db.utils.ModificationUtils.isPointDeleted; +import static org.apache.tsfile.utils.RamUsageEstimator.NUM_BYTES_ARRAY_HEADER; +import static org.apache.tsfile.utils.RamUsageEstimator.NUM_BYTES_OBJECT_REF; public class AlignedWritableMemChunk extends AbstractWritableMemChunk { @@ -860,4 +863,62 @@ public int getAvgPointSizeOfLargestColumn() { } return avgPointSizeOfLargestColumn; } + + public long getTvListArrayMemCostIncrement1( + List insertingMeasurements, List insertingTypes) { + long size = 0; + List bitMaps = list.getBitMap(); + // value & bitmap array mem size + for (int column = 0; column < dataTypes.size(); column++) { + TSDataType type = dataTypes.get(column); + if (type != null) { + if (bitMaps != null && bitMaps.get(column) != null) { + size += (long) PrimitiveArrayManager.ARRAY_SIZE / 8 + 1; + } + } + } + int newMeasurementCount = 0; + for (int i = 0; i < insertingMeasurements.size(); i++) { + String measurementName = insertingMeasurements.get(i); + TSDataType type = insertingTypes.get(i); + size += (long) PrimitiveArrayManager.ARRAY_SIZE * (long) type.getDataTypeSize(); + if (!measurementIndexMap.containsKey(measurementName)) { + newMeasurementCount++; + } + } + // size is 0 when all types are null + if (size == 0) { + return size; + } + // time array mem size + size += PrimitiveArrayManager.ARRAY_SIZE * 8L; + // index array mem size + size += (list.getIndices() != null) ? PrimitiveArrayManager.ARRAY_SIZE * 4L : 0; + // array headers mem size + size += (long) NUM_BYTES_ARRAY_HEADER * (2 + insertingTypes.size()); + // Object references size in ArrayList + size += (long) NUM_BYTES_OBJECT_REF * (2 + dataTypes.size() + newMeasurementCount); + return size; + } + + public long getTvListArrayMemCostIncrement( + List insertingMeasurements, List insertingTypes) { + long memCostIncrement = 0; + for (int i = 0; i < insertingMeasurements.size(); i++) { + String measurementName = insertingMeasurements.get(i); + TSDataType dataType = insertingTypes.get(i); + Integer columIndex = measurementIndexMap.get(measurementName); + if (columIndex == null) { + memCostIncrement += + (long) PrimitiveArrayManager.ARRAY_SIZE * (long) dataType.getDataTypeSize(); + } else { + List columnArries = list.getValues().get(columIndex); + if (columnArries.get(columnArries.size() - 1) == null) { + memCostIncrement += + (long) PrimitiveArrayManager.ARRAY_SIZE * (long) dataType.getDataTypeSize(); + } + } + } + return memCostIncrement; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index 6602a3b07bb6..d28880fd4cf6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -792,9 +792,9 @@ private long[] checkAlignedMemCostAndAddToTspInfoForRow( } } // this insertion will result in a new array - if ((alignedMemChunk.alignedListSize() % PrimitiveArrayManager.ARRAY_SIZE) == 0) { - memTableIncrement += alignedMemChunk.getWorkingTVList().alignedTvListArrayMemCost(); - } + // if ((alignedMemChunk.alignedListSize() % PrimitiveArrayManager.ARRAY_SIZE) == 0) { + // memTableIncrement += alignedMemChunk.getWorkingTVList().alignedTvListArrayMemCost(); + // } } for (int i = 0; i < dataTypes.length; i++) { @@ -874,8 +874,7 @@ private long[] checkAlignedMemCostAndAddToTspInfoForRows(List ins > 0 ? 1 : 0); - memTableIncrement += - currentArrayNum * AlignedTVList.emptyValueListArrayMemCost(); + memTableIncrement += currentArrayNum * AlignedTVList.emptyValueListArrayMemCost(); } } int addingPointNum = addingPointNumInfo.right; @@ -885,11 +884,11 @@ private long[] checkAlignedMemCostAndAddToTspInfoForRows(List ins dataTypesInTVList.addAll(alignedMemChunk.getWorkingTVList().getTsDataTypes()); } dataTypesInTVList.addAll(addingPointNumInfo.left.values()); - memTableIncrement += - alignedMemChunk != null - ? alignedMemChunk.getWorkingTVList().alignedTvListArrayMemCost() - : AlignedTVList.alignedTvListArrayMemCost( - dataTypesInTVList.toArray(new TSDataType[0]), null); + // memTableIncrement += + // alignedMemChunk != null + // ? alignedMemChunk.getWorkingTVList().alignedTvListArrayMemCost() + // : AlignedTVList.alignedTvListArrayMemCost( + // dataTypesInTVList.toArray(new TSDataType[0]), null); } addingPointNumInfo.setRight(addingPointNum + 1); } @@ -1066,6 +1065,7 @@ private void updateAlignedMemCost( AlignedWritableMemChunk alignedMemChunk = (AlignedWritableMemChunk) memChunk; int currentPointNum = alignedMemChunk.alignedListSize(); int newPointNum = currentPointNum + incomingPointNum; + List insertingMeasurements = new ArrayList<>(); List insertingTypes = new ArrayList<>(); for (int i = 0; i < dataTypes.length; i++) { TSDataType dataType = dataTypes[i]; @@ -1077,6 +1077,7 @@ private void updateAlignedMemCost( || (columnCategories != null && columnCategories[i] != TsTableColumnCategory.FIELD)) { continue; } + insertingMeasurements.add(measurement); insertingTypes.add(dataType); if (!alignedMemChunk.containsMeasurement(measurementIds[i])) { @@ -1098,9 +1099,13 @@ private void updateAlignedMemCost( if (acquireArray != 0) { // memory of extending the TVList - memIncrements[0] += acquireArray * alignedMemChunk.getWorkingTVList().alignedTvListArrayMemCost(insertingTypes); + memIncrements[0] += + acquireArray + * alignedMemChunk.getTvListArrayMemCostIncrement1( + insertingMeasurements, insertingTypes); } else { - + memIncrements[0] += + alignedMemChunk.getTvListArrayMemCostIncrement(insertingMeasurements, insertingTypes); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java index 996eb1282a87..2f346de64a24 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java @@ -528,10 +528,18 @@ public boolean isNullValue(int unsortedRowIndex, int columnIndex) { return columnBitMaps.get(arrayIndex).isMarked(elementIndex); } + public List> getValues() { + return values; + } + public List getTsDataTypes() { return dataTypes; } + public List getIndices() { + return indices; + } + @Override public int delete(long lowerBound, long upperBound) { int deletedNumber = 0; @@ -983,7 +991,9 @@ public TSDataType getDataType() { @Override public long calculateRamSize() { - return timestamps.size() * alignedTvListArrayMemCost(); + // return timestamps.size() * alignedTvListArrayMemCost(); + // FIXME:(Haonan) + return 0; } /** @@ -1030,12 +1040,14 @@ public long alignedTvListArrayMemCost(List insertingTypes) { for (int column = 0; column < dataTypes.size(); column++) { TSDataType type = dataTypes.get(column); if (type != null) { - size += (long) PrimitiveArrayManager.ARRAY_SIZE * (long) type.getDataTypeSize(); if (bitMaps != null && bitMaps.get(column) != null) { size += (long) PrimitiveArrayManager.ARRAY_SIZE / 8 + 1; } } } + for (TSDataType type : insertingTypes) { + size += (long) PrimitiveArrayManager.ARRAY_SIZE * (long) type.getDataTypeSize(); + } // size is 0 when all types are null if (size == 0) { return size; @@ -1045,7 +1057,7 @@ public long alignedTvListArrayMemCost(List insertingTypes) { // index array mem size size += (indices != null) ? PrimitiveArrayManager.ARRAY_SIZE * 4L : 0; // array headers mem size - size += (long) NUM_BYTES_ARRAY_HEADER * (2 + dataTypes.size()); + size += (long) NUM_BYTES_ARRAY_HEADER * (2 + insertingTypes.size()); // Object references size in ArrayList size += (long) NUM_BYTES_OBJECT_REF * (2 + dataTypes.size()); return size; From c186fd4300a1494b9a8fea72f150be6ec5ae39a0 Mon Sep 17 00:00:00 2001 From: HTHou Date: Mon, 1 Sep 2025 19:03:28 +0800 Subject: [PATCH 6/7] dev --- .../AlignedTimeseriesSessionExample.java | 87 ++++++++++++------- .../dataregion/memtable/AbstractMemTable.java | 1 + .../memtable/AlignedWritableMemChunk.java | 9 +- .../dataregion/memtable/TsFileProcessor.java | 19 ++-- 4 files changed, 67 insertions(+), 49 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java index 288bd65ddcb4..84e570bef23c 100644 --- a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java @@ -63,45 +63,45 @@ public static void main(String[] args) // set session fetchSize session.setFetchSize(10000); - // createTemplate(); - createAlignedTimeseries(); - createAlignedTimeseriesWithNullPartical(); - - insertAlignedRecord(); + // // createTemplate(); + // createAlignedTimeseries(); + // createAlignedTimeseriesWithNullPartical(); + // + // insertAlignedRecord(); // insertAlignedRecords(); // insertAlignedRecordsOfOneDevice(); // insertAlignedStringRecord(); // insertAlignedStringRecords(); - // insertTabletWithAlignedTimeseriesMethod1(); + insertTabletWithAlignedTimeseriesMethod1(); // insertTabletWithAlignedTimeseriesMethod2(); // insertNullableTabletWithAlignedTimeseries(); // insertTabletsWithAlignedTimeseries(); - session.executeNonQueryStatement(FLUSH); - selectTest(); - selectWithValueFilterTest(); - selectWithLastTest(); - selectWithLastTestWithoutValueFilter(); - session.executeNonQueryStatement("delete from root.sg_1.d1.s1 where time <= 5"); - System.out.println("execute sql delete from root.sg_1.d1.s1 where time <= 5"); - selectTest(); - selectWithValueFilterTest(); - selectWithLastTest(); - selectWithLastTestWithoutValueFilter(); - session.executeNonQueryStatement("delete from root.sg_1.d1.s2 where time <= 3"); - System.out.println("execute sql delete from root.sg_1.d1.s2 where time <= 3"); - - selectTest(); - selectWithValueFilterTest(); - selectWithLastTest(); - selectWithLastTestWithoutValueFilter(); - session.executeNonQueryStatement("delete from root.sg_1.d1.s1 where time <= 10"); - System.out.println("execute sql delete from root.sg_1.d1.s1 where time <= 10"); - selectTest(); - selectWithValueFilterTest(); - selectWithLastTest(); - selectWithLastTestWithoutValueFilter(); + // session.executeNonQueryStatement(FLUSH); + // selectTest(); + // selectWithValueFilterTest(); + // selectWithLastTest(); + // selectWithLastTestWithoutValueFilter(); + // session.executeNonQueryStatement("delete from root.sg_1.d1.s1 where time <= 5"); + // System.out.println("execute sql delete from root.sg_1.d1.s1 where time <= 5"); + // selectTest(); + // selectWithValueFilterTest(); + // selectWithLastTest(); + // selectWithLastTestWithoutValueFilter(); + // session.executeNonQueryStatement("delete from root.sg_1.d1.s2 where time <= 3"); + // System.out.println("execute sql delete from root.sg_1.d1.s2 where time <= 3"); + // + // selectTest(); + // selectWithValueFilterTest(); + // selectWithLastTest(); + // selectWithLastTestWithoutValueFilter(); + // session.executeNonQueryStatement("delete from root.sg_1.d1.s1 where time <= 10"); + // System.out.println("execute sql delete from root.sg_1.d1.s1 where time <= 10"); + // selectTest(); + // selectWithValueFilterTest(); + // selectWithLastTest(); + // selectWithLastTestWithoutValueFilter(); // selectWithValueFilterTest(); // selectWithGroupByTest(); @@ -347,7 +347,6 @@ private static void insertTabletWithAlignedTimeseriesMethod1() // only measurementId and data type in MeasurementSchema take effects in Tablet List schemaList = new ArrayList<>(); schemaList.add(new MeasurementSchema("s1", TSDataType.INT64)); - schemaList.add(new MeasurementSchema("s2", TSDataType.INT32)); Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList); long timestamp = 1; @@ -357,8 +356,30 @@ private static void insertTabletWithAlignedTimeseriesMethod1() tablet.addTimestamp(rowIndex, timestamp); tablet.addValue( schemaList.get(0).getMeasurementName(), rowIndex, new SecureRandom().nextLong()); + + if (tablet.getRowSize() == tablet.getMaxRowNumber()) { + session.insertAlignedTablet(tablet, true); + tablet.reset(); + } + timestamp++; + } + + if (tablet.getRowSize() != 0) { + session.insertAlignedTablet(tablet); + tablet.reset(); + } + + schemaList = new ArrayList<>(); + schemaList.add(new MeasurementSchema("s2", TSDataType.INT32)); + + tablet = new Tablet(ROOT_SG1_D1, schemaList); + timestamp = 1; + + for (long row = 1; row < 100; row++) { + int rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, timestamp); tablet.addValue( - schemaList.get(1).getMeasurementName(), rowIndex, new SecureRandom().nextInt()); + schemaList.get(0).getMeasurementName(), rowIndex, new SecureRandom().nextInt()); if (tablet.getRowSize() == tablet.getMaxRowNumber()) { session.insertAlignedTablet(tablet, true); @@ -372,7 +393,7 @@ private static void insertTabletWithAlignedTimeseriesMethod1() tablet.reset(); } - session.executeNonQueryStatement(FLUSH); + // session.executeNonQueryStatement(FLUSH); } /** Method 2 for insert tablet with aligned timeseries */ diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java index f2f4d5e29439..72c673831bc2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java @@ -787,6 +787,7 @@ public long delete(ModEntry modEntry) { @Override public void addTVListRamCost(long cost) { this.tvListRamCost += cost; + System.out.println(tvListRamCost); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java index d7b7b4b027ce..a48692ba737f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java @@ -867,14 +867,11 @@ public int getAvgPointSizeOfLargestColumn() { public long getTvListArrayMemCostIncrement1( List insertingMeasurements, List insertingTypes) { long size = 0; - List bitMaps = list.getBitMap(); + List> bitMaps = list.getBitMaps(); // value & bitmap array mem size for (int column = 0; column < dataTypes.size(); column++) { - TSDataType type = dataTypes.get(column); - if (type != null) { - if (bitMaps != null && bitMaps.get(column) != null) { - size += (long) PrimitiveArrayManager.ARRAY_SIZE / 8 + 1; - } + if (bitMaps != null && bitMaps.get(column) != null) { + size += (long) PrimitiveArrayManager.ARRAY_SIZE / 8 + 1; } } int newMeasurementCount = 0; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index d28880fd4cf6..67ae375b15af 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -1065,6 +1065,14 @@ private void updateAlignedMemCost( AlignedWritableMemChunk alignedMemChunk = (AlignedWritableMemChunk) memChunk; int currentPointNum = alignedMemChunk.alignedListSize(); int newPointNum = currentPointNum + incomingPointNum; + // calculate how many new arrays will be added after this insertion + int currentArrayCnt = + currentPointNum / PrimitiveArrayManager.ARRAY_SIZE + + (currentPointNum % PrimitiveArrayManager.ARRAY_SIZE > 0 ? 1 : 0); + int newArrayCnt = + newPointNum / PrimitiveArrayManager.ARRAY_SIZE + + (newPointNum % PrimitiveArrayManager.ARRAY_SIZE > 0 ? 1 : 0); + long acquireArray = newArrayCnt - currentArrayCnt; List insertingMeasurements = new ArrayList<>(); List insertingTypes = new ArrayList<>(); for (int i = 0; i < dataTypes.length; i++) { @@ -1083,20 +1091,11 @@ private void updateAlignedMemCost( if (!alignedMemChunk.containsMeasurement(measurementIds[i])) { // add a new column in the TVList, the new column should be as long as existing ones memIncrements[0] += - (currentPointNum / PrimitiveArrayManager.ARRAY_SIZE + 1) + newArrayCnt * AlignedTVList.emptyValueListArrayMemCost(); } } - // calculate how many new arrays will be added after this insertion - int currentArrayCnt = - currentPointNum / PrimitiveArrayManager.ARRAY_SIZE - + (currentPointNum % PrimitiveArrayManager.ARRAY_SIZE > 0 ? 1 : 0); - int newArrayCnt = - newPointNum / PrimitiveArrayManager.ARRAY_SIZE - + (newPointNum % PrimitiveArrayManager.ARRAY_SIZE > 0 ? 1 : 0); - long acquireArray = newArrayCnt - currentArrayCnt; - if (acquireArray != 0) { // memory of extending the TVList memIncrements[0] += From 583edfe356bc6dbaf88abec5a75a5c366b3cf05b Mon Sep 17 00:00:00 2001 From: HTHou Date: Fri, 28 Nov 2025 11:23:52 +0800 Subject: [PATCH 7/7] recover example --- .../AlignedTimeseriesSessionExample.java | 87 +++++++------------ 1 file changed, 33 insertions(+), 54 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java index 84e570bef23c..288bd65ddcb4 100644 --- a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java @@ -63,45 +63,45 @@ public static void main(String[] args) // set session fetchSize session.setFetchSize(10000); - // // createTemplate(); - // createAlignedTimeseries(); - // createAlignedTimeseriesWithNullPartical(); - // - // insertAlignedRecord(); + // createTemplate(); + createAlignedTimeseries(); + createAlignedTimeseriesWithNullPartical(); + + insertAlignedRecord(); // insertAlignedRecords(); // insertAlignedRecordsOfOneDevice(); // insertAlignedStringRecord(); // insertAlignedStringRecords(); - insertTabletWithAlignedTimeseriesMethod1(); + // insertTabletWithAlignedTimeseriesMethod1(); // insertTabletWithAlignedTimeseriesMethod2(); // insertNullableTabletWithAlignedTimeseries(); // insertTabletsWithAlignedTimeseries(); - // session.executeNonQueryStatement(FLUSH); - // selectTest(); - // selectWithValueFilterTest(); - // selectWithLastTest(); - // selectWithLastTestWithoutValueFilter(); - // session.executeNonQueryStatement("delete from root.sg_1.d1.s1 where time <= 5"); - // System.out.println("execute sql delete from root.sg_1.d1.s1 where time <= 5"); - // selectTest(); - // selectWithValueFilterTest(); - // selectWithLastTest(); - // selectWithLastTestWithoutValueFilter(); - // session.executeNonQueryStatement("delete from root.sg_1.d1.s2 where time <= 3"); - // System.out.println("execute sql delete from root.sg_1.d1.s2 where time <= 3"); - // - // selectTest(); - // selectWithValueFilterTest(); - // selectWithLastTest(); - // selectWithLastTestWithoutValueFilter(); - // session.executeNonQueryStatement("delete from root.sg_1.d1.s1 where time <= 10"); - // System.out.println("execute sql delete from root.sg_1.d1.s1 where time <= 10"); - // selectTest(); - // selectWithValueFilterTest(); - // selectWithLastTest(); - // selectWithLastTestWithoutValueFilter(); + session.executeNonQueryStatement(FLUSH); + selectTest(); + selectWithValueFilterTest(); + selectWithLastTest(); + selectWithLastTestWithoutValueFilter(); + session.executeNonQueryStatement("delete from root.sg_1.d1.s1 where time <= 5"); + System.out.println("execute sql delete from root.sg_1.d1.s1 where time <= 5"); + selectTest(); + selectWithValueFilterTest(); + selectWithLastTest(); + selectWithLastTestWithoutValueFilter(); + session.executeNonQueryStatement("delete from root.sg_1.d1.s2 where time <= 3"); + System.out.println("execute sql delete from root.sg_1.d1.s2 where time <= 3"); + + selectTest(); + selectWithValueFilterTest(); + selectWithLastTest(); + selectWithLastTestWithoutValueFilter(); + session.executeNonQueryStatement("delete from root.sg_1.d1.s1 where time <= 10"); + System.out.println("execute sql delete from root.sg_1.d1.s1 where time <= 10"); + selectTest(); + selectWithValueFilterTest(); + selectWithLastTest(); + selectWithLastTestWithoutValueFilter(); // selectWithValueFilterTest(); // selectWithGroupByTest(); @@ -347,6 +347,7 @@ private static void insertTabletWithAlignedTimeseriesMethod1() // only measurementId and data type in MeasurementSchema take effects in Tablet List schemaList = new ArrayList<>(); schemaList.add(new MeasurementSchema("s1", TSDataType.INT64)); + schemaList.add(new MeasurementSchema("s2", TSDataType.INT32)); Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList); long timestamp = 1; @@ -356,30 +357,8 @@ private static void insertTabletWithAlignedTimeseriesMethod1() tablet.addTimestamp(rowIndex, timestamp); tablet.addValue( schemaList.get(0).getMeasurementName(), rowIndex, new SecureRandom().nextLong()); - - if (tablet.getRowSize() == tablet.getMaxRowNumber()) { - session.insertAlignedTablet(tablet, true); - tablet.reset(); - } - timestamp++; - } - - if (tablet.getRowSize() != 0) { - session.insertAlignedTablet(tablet); - tablet.reset(); - } - - schemaList = new ArrayList<>(); - schemaList.add(new MeasurementSchema("s2", TSDataType.INT32)); - - tablet = new Tablet(ROOT_SG1_D1, schemaList); - timestamp = 1; - - for (long row = 1; row < 100; row++) { - int rowIndex = tablet.getRowSize(); - tablet.addTimestamp(rowIndex, timestamp); tablet.addValue( - schemaList.get(0).getMeasurementName(), rowIndex, new SecureRandom().nextInt()); + schemaList.get(1).getMeasurementName(), rowIndex, new SecureRandom().nextInt()); if (tablet.getRowSize() == tablet.getMaxRowNumber()) { session.insertAlignedTablet(tablet, true); @@ -393,7 +372,7 @@ private static void insertTabletWithAlignedTimeseriesMethod1() tablet.reset(); } - // session.executeNonQueryStatement(FLUSH); + session.executeNonQueryStatement(FLUSH); } /** Method 2 for insert tablet with aligned timeseries */