diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBObjectDeleteIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBObjectDeleteIT.java new file mode 100644 index 0000000000000..8bfec578671eb --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/session/IoTDBObjectDeleteIT.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.relational.it.session; + +import org.apache.iotdb.isession.ITableSession; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.TableClusterIT; +import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; + +import com.google.common.io.BaseEncoding; +import org.apache.tsfile.enums.ColumnCategory; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.write.record.Tablet; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertNull; + +@RunWith(IoTDBTestRunner.class) +@Category({TableLocalStandaloneIT.class, TableClusterIT.class}) +public class IoTDBObjectDeleteIT { + + @BeforeClass + public static void classSetUp() throws Exception { + EnvFactory.getEnv().initClusterEnvironment(); + } + + @Before + public void setUp() throws Exception { + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { + session.executeNonQueryStatement("CREATE DATABASE IF NOT EXISTS db1"); + } + } + + @After + public void tearDown() throws Exception { + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { + session.executeNonQueryStatement("DROP DATABASE IF EXISTS db1"); + } + } + + @AfterClass + public static void classTearDown() { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void deleteObjectTest() + throws IoTDBConnectionException, StatementExecutionException, IOException { + String testObject = + System.getProperty("user.dir") + + File.separator + + "target" + + File.separator + + "test-classes" + + File.separator + + "object-example.pt"; + + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { + session.executeNonQueryStatement("USE \"db1\""); + // insert table data by tablet + List columnNameList = + Arrays.asList("region_id", "plant_id", "device_id", "temperature", "file"); + List dataTypeList = + Arrays.asList( + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.FLOAT, + TSDataType.OBJECT); + List columnTypeList = + new ArrayList<>( + Arrays.asList( + ColumnCategory.TAG, + ColumnCategory.TAG, + ColumnCategory.TAG, + ColumnCategory.FIELD, + ColumnCategory.FIELD)); + Tablet tablet = new Tablet("object_table", columnNameList, dataTypeList, columnTypeList, 1); + int rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, 1); + tablet.addValue(rowIndex, 0, "1"); + tablet.addValue(rowIndex, 1, "5"); + tablet.addValue(rowIndex, 2, "3"); + tablet.addValue(rowIndex, 3, 37.6F); + tablet.addValue(rowIndex, 4, true, 0, Files.readAllBytes(Paths.get(testObject))); + session.insert(tablet); + tablet.reset(); + + try (SessionDataSet dataSet = + session.executeQueryStatement( + "select READ_OBJECT(file) from object_table where time = 1")) { + SessionDataSet.DataIterator iterator = dataSet.iterator(); + while (iterator.next()) { + Binary binary = iterator.getBlob(1); + Assert.assertArrayEquals(Files.readAllBytes(Paths.get(testObject)), binary.getValues()); + } + session.executeNonQueryStatement("DROP TABLE IF EXISTS object_table"); + } + } + + // test object file path + boolean success = false; + for (DataNodeWrapper dataNodeWrapper : EnvFactory.getEnv().getDataNodeWrapperList()) { + String objectDirStr = dataNodeWrapper.getDataNodeObjectDir(); + File objectDir = new File(objectDirStr); + if (objectDir.exists() && objectDir.isDirectory()) { + File[] regionDirs = objectDir.listFiles(); + if (regionDirs != null) { + for (File regionDir : regionDirs) { + if (regionDir.isDirectory()) { + File objectFile = + new File( + regionDir, + convertPathString("object_table") + + File.separator + + convertPathString("1") + + File.separator + + convertPathString("5") + + File.separator + + convertPathString("3") + + File.separator + + convertPathString("file") + + File.separator + + "1.bin"); + if (objectFile.exists() && objectFile.isFile()) { + success = true; + } + } + } + } + } + } + Assert.assertFalse(success); + } + + @Test + public void deleteObjectSegmentsTest() + throws IoTDBConnectionException, StatementExecutionException, IOException { + String testObject = + System.getProperty("user.dir") + + File.separator + + "target" + + File.separator + + "test-classes" + + File.separator + + "object-example.pt"; + byte[] objectBytes = Files.readAllBytes(Paths.get(testObject)); + List objectSegments = new ArrayList<>(); + for (int i = 0; i < objectBytes.length; i += 512) { + objectSegments.add(Arrays.copyOfRange(objectBytes, i, Math.min(i + 512, objectBytes.length))); + } + + try (ITableSession session = EnvFactory.getEnv().getTableSessionConnection()) { + session.executeNonQueryStatement("USE \"db1\""); + // insert table data by tablet + List columnNameList = + Arrays.asList("region_id", "plant_id", "device_id", "temperature", "file"); + List dataTypeList = + Arrays.asList( + TSDataType.STRING, + TSDataType.STRING, + TSDataType.STRING, + TSDataType.FLOAT, + TSDataType.OBJECT); + List columnTypeList = + new ArrayList<>( + Arrays.asList( + ColumnCategory.TAG, + ColumnCategory.TAG, + ColumnCategory.TAG, + ColumnCategory.FIELD, + ColumnCategory.FIELD)); + Tablet tablet = new Tablet("object_table", columnNameList, dataTypeList, columnTypeList, 1); + for (int i = 0; i < objectSegments.size() - 1; i++) { + int rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, 1); + tablet.addValue(rowIndex, 0, "1"); + tablet.addValue(rowIndex, 1, "5"); + tablet.addValue(rowIndex, 2, "3"); + tablet.addValue(rowIndex, 3, 37.6F); + tablet.addValue(rowIndex, 4, false, i * 512L, objectSegments.get(i)); + session.insert(tablet); + tablet.reset(); + } + session.executeNonQueryStatement("DELETE FROM object_table where time = 1"); + + try (SessionDataSet dataSet = + session.executeQueryStatement("select file from object_table where time = 1")) { + SessionDataSet.DataIterator iterator = dataSet.iterator(); + while (iterator.next()) { + assertNull(iterator.getString(1)); + } + } + } + + // test object file path + boolean success = false; + for (DataNodeWrapper dataNodeWrapper : EnvFactory.getEnv().getDataNodeWrapperList()) { + String objectDirStr = dataNodeWrapper.getDataNodeObjectDir(); + File objectDir = new File(objectDirStr); + if (objectDir.exists() && objectDir.isDirectory()) { + File[] regionDirs = objectDir.listFiles(); + if (regionDirs != null) { + for (File regionDir : regionDirs) { + if (regionDir.isDirectory()) { + File objectTmpFile = + new File( + regionDir, + convertPathString("object_table") + + File.separator + + convertPathString("1") + + File.separator + + convertPathString("5") + + File.separator + + convertPathString("3") + + File.separator + + convertPathString("file") + + File.separator + + "1.bin.tmp"); + if (objectTmpFile.exists() && objectTmpFile.isFile()) { + success = true; + } + } + } + } + } + } + Assert.assertFalse(success); + } + + protected String convertPathString(String path) { + return BaseEncoding.base32().omitPadding().encode(path.getBytes(StandardCharsets.UTF_8)); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 09c9ee8430415..14a56e24acbff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -158,6 +158,7 @@ import org.apache.iotdb.db.utils.DateTimeUtils; import org.apache.iotdb.db.utils.EncryptDBUtils; import org.apache.iotdb.db.utils.ModificationUtils; +import org.apache.iotdb.db.utils.ObjectTypeUtils; import org.apache.iotdb.db.utils.ObjectWriter; import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.rpc.RpcUtils; @@ -165,11 +166,13 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.io.BaseEncoding; import org.apache.thrift.TException; import org.apache.tsfile.external.commons.io.FileUtils; import org.apache.tsfile.external.commons.lang3.tuple.Triple; import org.apache.tsfile.file.metadata.ChunkMetadata; import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.IDeviceID.Factory; import org.apache.tsfile.file.metadata.TableSchema; import org.apache.tsfile.fileSystem.FSFactoryProducer; import org.apache.tsfile.fileSystem.FSType; @@ -186,6 +189,7 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -2751,8 +2755,8 @@ public void deleteByTable(RelationalDeleteDataNode node) throws IOException { if (deleted) { return; } - TableDeviceSchemaCache.getInstance() - .invalidateLastCache(getDatabaseName(), modEntries.get(0).getTableName()); + String tableName = modEntries.get(0).getTableName(); + TableDeviceSchemaCache.getInstance().invalidateLastCache(getDatabaseName(), tableName); List walListeners = logDeletionInWAL(node); for (WALFlushListener walFlushListener : walListeners) { @@ -2762,6 +2766,23 @@ public void deleteByTable(RelationalDeleteDataNode node) throws IOException { } } + List objectTableDirs = + TierManager.getInstance().getAllMatchedObjectDirs(dataRegionIdString, tableName); + if (!objectTableDirs.isEmpty()) { + boolean droppingTable = false; + for (TableDeletionEntry entry : modEntries) { + if (entry.isDroppingTable()) { + for (File objectTableDir : objectTableDirs) { + droppingTable = true; + FileUtils.deleteQuietly(objectTableDir); + } + } + } + if (!droppingTable) { + deleteObjectFiles(objectTableDirs, modEntries); + } + } + List> sealedTsFileResourceLists = new ArrayList<>(modEntries.size()); for (TableDeletionEntry modEntry : modEntries) { List sealedTsFileResource = new ArrayList<>(); @@ -2930,6 +2951,59 @@ private List logDeletionInWAL( return walFlushListeners; } + private void deleteObjectFiles(List matchedObjectDirs, List modEntries) + throws IOException { + for (File matchedObjectDir : matchedObjectDirs) { + try (Stream paths = + Files.find( + matchedObjectDir.toPath(), + Integer.MAX_VALUE, + (path, attrs) -> + attrs.isRegularFile() + && (path.getFileName().toString().endsWith(".bin") + || path.getFileName().toString().endsWith(".tmp")))) { + paths.forEach( + path -> { + Path relativePath = matchedObjectDir.getParentFile().toPath().relativize(path); + String[] ideviceIdSegments = new String[relativePath.getNameCount() - 2]; + for (int i = 0; i < ideviceIdSegments.length; i++) { + ideviceIdSegments[i] = + CommonDescriptor.getInstance().getConfig().isRestrictObjectLimit() + ? relativePath.getName(i).toString() + : new String( + BaseEncoding.base32() + .omitPadding() + .decode(relativePath.getName(i).toString()), + StandardCharsets.UTF_8); + } + IDeviceID iDeviceID = Factory.DEFAULT_FACTORY.create(ideviceIdSegments); + String measurementId = + CommonDescriptor.getInstance().getConfig().isRestrictObjectLimit() + ? relativePath.getName(relativePath.getNameCount() - 2).toString() + : new String( + BaseEncoding.base32() + .omitPadding() + .decode( + relativePath.getName(relativePath.getNameCount() - 2).toString()), + StandardCharsets.UTF_8); + String fileName = path.getFileName().toString(); + long timestamp = Long.parseLong(fileName.substring(0, fileName.indexOf('.'))); + logger.info( + "timestamp {}, measurementId {}, ideviceId {}", + timestamp, + measurementId, + iDeviceID); + for (TableDeletionEntry modEntry : modEntries) { + if (modEntry.affects(iDeviceID, timestamp, timestamp) + && modEntry.affects(measurementId)) { + ObjectTypeUtils.deleteObjectPath(path.toFile()); + } + } + }); + } + } + } + /** * For IoTConsensus sync. See github pull * request for details. diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/DeletionPredicate.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/DeletionPredicate.java index 7e79e8f580dc5..0a22da2a90a9e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/DeletionPredicate.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/DeletionPredicate.java @@ -72,6 +72,10 @@ public void setIdPredicate(IDPredicate idPredicate) { this.idPredicate = idPredicate; } + public IDPredicate getIdPredicate() { + return idPredicate; + } + public String getTableName() { return tableName; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TableDeletionEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TableDeletionEntry.java index 858b6645b2f9a..61ec2e0d67854 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TableDeletionEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/TableDeletionEntry.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; +import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.IDPredicateType; import org.apache.iotdb.db.utils.ModificationUtils; import org.apache.tsfile.file.metadata.IDeviceID; @@ -136,6 +137,14 @@ public String getTableName() { return predicate.getTableName(); } + public boolean isDroppingTable() { + IDPredicate idPredicate = predicate.getIdPredicate(); + return idPredicate.type == IDPredicateType.NOP + && predicate.getMeasurementNames().isEmpty() + && timeRange.getMin() == Long.MIN_VALUE + && timeRange.getMax() == Long.MAX_VALUE; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java index a5fa8b54e7b10..36bbe1cf06318 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.db.storageengine.rescon.disk; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -28,6 +29,7 @@ import org.apache.iotdb.db.storageengine.rescon.disk.strategy.RandomOnDiskUsableSpaceStrategy; import org.apache.iotdb.metrics.utils.FileStoreUtils; +import com.google.common.io.BaseEncoding; import org.apache.tsfile.fileSystem.FSFactoryProducer; import org.apache.tsfile.fileSystem.FSType; import org.apache.tsfile.utils.FSUtils; @@ -36,7 +38,9 @@ import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.FileStore; +import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; @@ -172,6 +176,16 @@ public synchronized void initFolders() { } catch (DiskSpaceInsufficientException e) { logger.error("All disks of tier {} are full.", tierLevel, e); } + // try to remove empty objectDirs + for (String dir : objectDirs) { + File dirFile = FSFactoryProducer.getFSFactory().getFile(dir); + if (dirFile.isDirectory() && Objects.requireNonNull(dirFile.list()).length == 0) { + try { + Files.delete(dirFile.toPath()); + } catch (IOException ignore) { + } + } + } } tierDiskTotalSpace = getTierDiskSpace(DiskSpaceType.TOTAL); @@ -272,6 +286,41 @@ public Optional getAbsoluteObjectFilePath(String filePath, boolean needTem return Optional.empty(); } + public List getAllMatchedObjectDirs(String regionIdStr, String... path) { + List matchedDirs = new ArrayList<>(); + boolean hasObjectDir = false; + for (String objectDir : objectDirs) { + File objectDirPath = FSFactoryProducer.getFSFactory().getFile(objectDir); + if (objectDirPath.exists()) { + hasObjectDir = true; + break; + } + } + if (!hasObjectDir) { + return matchedDirs; + } + StringBuilder objectPath = new StringBuilder(); + objectPath.append(regionIdStr); + for (String str : path) { + objectPath + .append(File.separator) + .append( + CommonDescriptor.getInstance().getConfig().isRestrictObjectLimit() + ? str + : BaseEncoding.base32() + .omitPadding() + .encode(str.getBytes(StandardCharsets.UTF_8))); + } + for (String objectDir : objectDirs) { + File objectFilePath = + FSFactoryProducer.getFSFactory().getFile(objectDir, objectPath.toString()); + if (objectFilePath.exists()) { + matchedDirs.add(objectFilePath); + } + } + return matchedDirs; + } + public int getTiersNum() { return seqTiers.size(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java index ec1fd592617a4..15964ffaddd25 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectTypeUtils.java @@ -55,6 +55,7 @@ import java.nio.file.StandardOpenOption; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.Optional; public class ObjectTypeUtils { @@ -294,6 +295,37 @@ public static void deleteObjectPathFromBinary(Binary binary) { } } + public static void deleteObjectPath(File file) { + File tmpFile = new File(file.getPath() + ".tmp"); + File bakFile = new File(file.getPath() + ".back"); + for (int i = 0; i < 2; i++) { + if (file.exists()) { + FileMetrics.getInstance().decreaseObjectFileNum(1); + FileMetrics.getInstance().decreaseObjectFileSize(file.length()); + } + try { + deleteObjectFile(file); + deleteObjectFile(tmpFile); + deleteObjectFile(bakFile); + } catch (IOException e) { + logger.error("Failed to remove object file {}", file.getAbsolutePath(), e); + } + } + deleteEmptyParentDir(file); + } + + private static void deleteEmptyParentDir(File file) { + File dir = file.getParentFile(); + if (dir.isDirectory() && Objects.requireNonNull(dir.list()).length == 0) { + try { + Files.deleteIfExists(dir.toPath()); + deleteEmptyParentDir(dir); + } catch (IOException e) { + logger.error("Failed to remove empty object dir {}", dir.getAbsolutePath(), e); + } + } + } + private static void deleteObjectFile(File file) throws IOException { if (file.exists()) { logger.info("Remove object file {}, size is {}(byte)", file.getAbsolutePath(), file.length());