{
+ * NOTE: this should be called before reading a new Parquet column chunk, and after {@link
+ * CometColumnReader#reset} is called.
+ */
+- public void setPageReader(PageReader pageReader) throws IOException {
++ public void setPageReader(RowGroupReader pageStore) throws IOException {
+ Preconditions.checkState(initialized, "Invalid state: 'reset' should be called first");
+- ((ColumnReader) delegate).setPageReader(pageReader);
++ ((ColumnReader) delegate).setRowGroupReader(pageStore, spec);
+ }
+
+ @Override
+diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java
+index 04ac69476..916face2b 100644
+--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java
++++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java
+@@ -22,8 +22,12 @@ import java.io.IOException;
+ import java.io.UncheckedIOException;
+ import java.util.List;
+ import java.util.Map;
++import org.apache.comet.CometRuntimeException;
+ import org.apache.comet.parquet.AbstractColumnReader;
+-import org.apache.comet.parquet.BatchReader;
++import org.apache.comet.parquet.IcebergCometBatchReader;
++import org.apache.comet.parquet.RowGroupReader;
++import org.apache.comet.vector.CometSelectionVector;
++import org.apache.comet.vector.CometVector;
+ import org.apache.iceberg.Schema;
+ import org.apache.iceberg.data.DeleteFilter;
+ import org.apache.iceberg.parquet.VectorizedReader;
+@@ -55,7 +59,7 @@ class CometColumnarBatchReader implements VectorizedReader {
+ // calling BatchReader.nextBatch, the isDeleted value is not yet available, so
+ // DeleteColumnReader.readBatch must be called explicitly later, after the isDeleted value is
+ // available.
+- private final BatchReader delegate;
++ private final IcebergCometBatchReader delegate;
+ private DeleteFilter deletes = null;
+ private long rowStartPosInBatch = 0;
+
+@@ -65,9 +69,7 @@ class CometColumnarBatchReader implements VectorizedReader {
+ this.hasIsDeletedColumn =
+ readers.stream().anyMatch(reader -> reader instanceof CometDeleteColumnReader);
+
+- AbstractColumnReader[] abstractColumnReaders = new AbstractColumnReader[readers.size()];
+- this.delegate = new BatchReader(abstractColumnReaders);
+- delegate.setSparkSchema(SparkSchemaUtil.convert(schema));
++ this.delegate = new IcebergCometBatchReader(readers.size(), SparkSchemaUtil.convert(schema));
+ }
+
+ @Override
+@@ -79,19 +81,22 @@ class CometColumnarBatchReader implements VectorizedReader {
+ && !(readers[i] instanceof CometPositionColumnReader)
+ && !(readers[i] instanceof CometDeleteColumnReader)) {
+ readers[i].reset();
+- readers[i].setPageReader(pageStore.getPageReader(readers[i].descriptor()));
++ readers[i].setPageReader((RowGroupReader) pageStore);
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to setRowGroupInfo for Comet vectorization", e);
+ }
+ }
+
++ AbstractColumnReader[] delegateReaders = new AbstractColumnReader[readers.length];
+ for (int i = 0; i < readers.length; i++) {
+- delegate.getColumnReaders()[i] = this.readers[i].delegate();
++ delegateReaders[i] = readers[i].delegate();
+ }
+
++ delegate.init(delegateReaders);
++
+ this.rowStartPosInBatch =
+- pageStore
++ ((RowGroupReader) pageStore)
+ .getRowIndexOffset()
+ .orElseThrow(
+ () ->
+@@ -148,9 +153,17 @@ class CometColumnarBatchReader implements VectorizedReader {
+ Pair pair = buildRowIdMapping(vectors);
+ if (pair != null) {
+ int[] rowIdMapping = pair.first();
+- numLiveRows = pair.second();
+- for (int i = 0; i < vectors.length; i++) {
+- vectors[i] = new ColumnVectorWithFilter(vectors[i], rowIdMapping);
++ if (pair.second() != null) {
++ numLiveRows = pair.second();
++ for (int i = 0; i < vectors.length; i++) {
++ if (vectors[i] instanceof CometVector) {
++ vectors[i] =
++ new CometSelectionVector((CometVector) vectors[i], rowIdMapping, numLiveRows);
++ } else {
++ throw new CometRuntimeException(
++ "Unsupported column vector type: " + vectors[i].getClass());
++ }
++ }
+ }
+ }
+ }
+diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
+index 047c96314..88d691a60 100644
+--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
++++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java
+@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.data.vectorized;
+ import java.math.BigDecimal;
+ import java.nio.ByteBuffer;
+ import org.apache.comet.parquet.ConstantColumnReader;
++import org.apache.iceberg.parquet.CometTypeUtils;
+ import org.apache.iceberg.types.Types;
+ import org.apache.spark.sql.types.DataType;
+ import org.apache.spark.sql.types.DataTypes;
+@@ -34,7 +35,11 @@ class CometConstantColumnReader extends CometColumnReader {
+ super(field);
+ // use delegate to set constant value on the native side to be consumed by native execution.
+ setDelegate(
+- new ConstantColumnReader(sparkType(), descriptor(), convertToSparkValue(value), false));
++ new ConstantColumnReader(
++ sparkType(),
++ CometTypeUtils.descriptorToParquetColumnSpec(descriptor()),
++ convertToSparkValue(value),
++ false));
+ }
+
+ @Override
+diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
+index 6235bfe48..cba108e43 100644
+--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
++++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java
+@@ -51,10 +51,10 @@ class CometDeleteColumnReader extends CometColumnReader {
+ DeleteColumnReader() {
+ super(
+ DataTypes.BooleanType,
+- TypeUtil.convertToParquet(
++ TypeUtil.convertToParquetSpec(
+ new StructField("_deleted", DataTypes.BooleanType, false, Metadata.empty())),
+ false /* useDecimal128 = false */,
+- false /* isConstant = false */);
++ false /* isConstant */);
+ this.isDeleted = new boolean[0];
+ }
+
+diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
+index bcc0e514c..98e80068c 100644
+--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
++++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java
+@@ -20,6 +20,7 @@ package org.apache.iceberg.spark.data.vectorized;
+
+ import org.apache.comet.parquet.MetadataColumnReader;
+ import org.apache.comet.parquet.Native;
++import org.apache.iceberg.parquet.CometTypeUtils;
+ import org.apache.iceberg.types.Types;
+ import org.apache.parquet.column.ColumnDescriptor;
+ import org.apache.spark.sql.types.DataTypes;
+@@ -44,7 +45,7 @@ class CometPositionColumnReader extends CometColumnReader {
+ PositionColumnReader(ColumnDescriptor descriptor) {
+ super(
+ DataTypes.LongType,
+- descriptor,
++ CometTypeUtils.descriptorToParquetColumnSpec(descriptor),
+ false /* useDecimal128 = false */,
+ false /* isConstant = false */);
+ }
+diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
+index d36f1a727..56f8c9bff 100644
+--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
++++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java
+@@ -142,6 +142,7 @@ class CometVectorizedReaderBuilder extends TypeWithSchemaVisitor extends BaseReader taskGroup = (ScanTaskGroup>) task;
++ return taskGroup.tasks().stream().allMatch(this::supportsCometBatchReads);
++
++ } else if (task.isFileScanTask() && !task.isDataTask()) {
++ FileScanTask fileScanTask = task.asFileScanTask();
++ // Comet can't handle delete files for now
++ return fileScanTask.file().format() == FileFormat.PARQUET;
++
++ } else {
++ return false;
++ }
++ }
++
+ // conditions for using ORC batch reads:
+ // - ORC vectorization is enabled
+ // - all tasks are of type FileScanTask and read only ORC files with no delete files
+diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+index 106b296de..967b0d41d 100644
+--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
++++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+@@ -24,6 +24,7 @@ import java.util.Map;
+ import java.util.Optional;
+ import java.util.function.Supplier;
+ import java.util.stream.Collectors;
++import org.apache.comet.parquet.SupportsComet;
+ import org.apache.iceberg.BlobMetadata;
+ import org.apache.iceberg.ScanTask;
+ import org.apache.iceberg.ScanTaskGroup;
+@@ -95,7 +96,7 @@ import org.apache.spark.sql.types.StructType;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+-abstract class SparkScan implements Scan, SupportsReportStatistics {
++abstract class SparkScan implements Scan, SupportsReportStatistics, SupportsComet {
+ private static final Logger LOG = LoggerFactory.getLogger(SparkScan.class);
+ private static final String NDV_KEY = "ndv";
+
+@@ -351,4 +352,10 @@ abstract class SparkScan implements Scan, SupportsReportStatistics {
+ return splitSize;
+ }
+ }
++
++ @Override
++ public boolean isCometEnabled() {
++ SparkBatch batch = (SparkBatch) this.toBatch();
++ return batch.useCometBatchReads();
++ }
+ }
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java
+index 404ba7284..00e97e96f 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java
+@@ -90,6 +90,16 @@ public abstract class SparkDistributedDataScanTestBase
+ .master("local[2]")
+ .config("spark.serializer", serializer)
+ .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4")
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.comet.scan.icebergNative.enabled", "true")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ }
+ }
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java
+index 659507e4c..9076ec24d 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java
+@@ -73,6 +73,16 @@ public class TestSparkDistributedDataScanDeletes
+ .master("local[2]")
+ .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4")
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.comet.scan.icebergNative.enabled", "true")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ }
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java
+index a218f965e..eca0125ac 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java
+@@ -62,6 +62,16 @@ public class TestSparkDistributedDataScanFilterFiles
+ .master("local[2]")
+ .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4")
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.comet.scan.icebergNative.enabled", "true")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ }
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
+index 2665d7ba8..2381d0aa1 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
+@@ -63,6 +63,16 @@ public class TestSparkDistributedDataScanReporting
+ .master("local[2]")
+ .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4")
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.comet.scan.icebergNative.enabled", "true")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ }
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java
+index daf4e29ac..cffe2944e 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java
+@@ -79,6 +79,17 @@ public abstract class TestBase extends SparkTestHelperBase {
+ .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
+ .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname))
+ .config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true")
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.comet.scan.icebergNative.enabled", "true")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
++ .config("spark.comet.exec.broadcastExchange.enabled", "false")
+ .enableHiveSupport()
+ .getOrCreate();
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java
+index 973a17c9a..4490c219d 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java
+@@ -65,6 +65,16 @@ public class TestParquetDictionaryEncodedVectorizedReads extends TestParquetVect
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.comet.scan.icebergNative.enabled", "true")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ }
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java
+index 1c5905744..543edf3b2 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java
+@@ -61,6 +61,16 @@ public abstract class ScanTestBase extends AvroDataTestBase {
+ ScanTestBase.spark =
+ SparkSession.builder()
+ .config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.comet.scan.icebergNative.enabled", "true")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .master("local[2]")
+ .getOrCreate();
+ ScanTestBase.sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
+index 19ec6d13d..e2d0a84d0 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java
+@@ -144,7 +144,20 @@ public class TestCompressionSettings extends CatalogTestBase {
+
+ @BeforeAll
+ public static void startSpark() {
+- TestCompressionSettings.spark = SparkSession.builder().master("local[2]").getOrCreate();
++ TestCompressionSettings.spark =
++ SparkSession.builder()
++ .master("local[2]")
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.comet.scan.icebergNative.enabled", "true")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
++ .getOrCreate();
+ }
+
+ @BeforeEach
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+index a7702b169..e1fdafae8 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+@@ -74,7 +74,20 @@ public class TestDataSourceOptions extends TestBaseWithCatalog {
+
+ @BeforeAll
+ public static void startSpark() {
+- TestDataSourceOptions.spark = SparkSession.builder().master("local[2]").getOrCreate();
++ TestDataSourceOptions.spark =
++ SparkSession.builder()
++ .master("local[2]")
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.comet.scan.icebergNative.enabled", "true")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
++ .getOrCreate();
+ }
+
+ @AfterAll
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
+index fd7d52178..c8aab2996 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
+@@ -114,6 +114,16 @@ public class TestFilteredScan {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.comet.scan.icebergNative.enabled", "true")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ }
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
+index 153564f7d..264179459 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
+@@ -98,6 +98,16 @@ public class TestForwardCompatibility {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.comet.scan.icebergNative.enabled", "true")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ }
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java
+index f4f57157e..7dd2ed811 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java
+@@ -51,6 +51,16 @@ public class TestIcebergSpark {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.comet.scan.icebergNative.enabled", "true")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ }
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
+index e1402396f..8b60b8bc4 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java
+@@ -118,6 +118,16 @@ public class TestPartitionPruning {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.comet.scan.icebergNative.enabled", "true")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ TestPartitionPruning.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
+index 0b6ab2052..dec4026f3 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
+@@ -112,6 +112,16 @@ public class TestPartitionValues {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.comet.scan.icebergNative.enabled", "true")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ }
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
+index 11865db7f..bdb9ae32a 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
+@@ -91,6 +91,16 @@ public class TestSnapshotSelection {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.comet.scan.icebergNative.enabled", "true")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ }
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
+index 3051e27d7..22899c233 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
+@@ -125,6 +125,16 @@ public class TestSparkDataFile {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.comet.scan.icebergNative.enabled", "true")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ TestSparkDataFile.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+ }
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+index 4ccbf86f1..2089bac57 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+@@ -100,6 +100,16 @@ public class TestSparkDataWrite {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.comet.scan.icebergNative.enabled", "true")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ }
+
+@@ -144,7 +154,7 @@ public class TestSparkDataWrite {
+ Dataset result = spark.read().format("iceberg").load(targetLocation);
+
+ List actual =
+- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
++ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
+ for (ManifestFile manifest :
+ SnapshotUtil.latestSnapshot(table, branch).allManifests(table.io())) {
+@@ -213,7 +223,7 @@ public class TestSparkDataWrite {
+ Dataset result = spark.read().format("iceberg").load(targetLocation);
+
+ List actual =
+- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
++ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
+ }
+
+@@ -258,7 +268,7 @@ public class TestSparkDataWrite {
+ Dataset result = spark.read().format("iceberg").load(targetLocation);
+
+ List actual =
+- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
++ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
+ }
+
+@@ -310,7 +320,7 @@ public class TestSparkDataWrite {
+ Dataset result = spark.read().format("iceberg").load(targetLocation);
+
+ List actual =
+- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
++ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
+ }
+
+@@ -352,7 +362,7 @@ public class TestSparkDataWrite {
+ Dataset result = spark.read().format("iceberg").load(targetLocation);
+
+ List actual =
+- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
++ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
+ }
+
+@@ -391,7 +401,7 @@ public class TestSparkDataWrite {
+ Dataset result = spark.read().format("iceberg").load(targetLocation);
+
+ List actual =
+- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
++ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
+
+ List files = Lists.newArrayList();
+@@ -459,7 +469,7 @@ public class TestSparkDataWrite {
+ Dataset result = spark.read().format("iceberg").load(targetLocation);
+
+ List actual =
+- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
++ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ assertThat(actual).hasSameSizeAs(expected).isEqualTo(expected);
+ }
+
+@@ -706,7 +716,7 @@ public class TestSparkDataWrite {
+ // Since write and commit succeeded, the rows should be readable
+ Dataset result = spark.read().format("iceberg").load(targetLocation);
+ List actual =
+- result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
++ result.orderBy("id", "data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ assertThat(actual)
+ .hasSize(records.size() + records2.size())
+ .containsExactlyInAnyOrder(
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
+index 596d05d30..e681d8170 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
+@@ -88,6 +88,16 @@ public class TestSparkReadProjection extends TestReadProjection {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.comet.scan.icebergNative.enabled", "true")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ ImmutableMap config =
+ ImmutableMap.of(
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+index 42699f466..c5e077d7a 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+@@ -138,6 +138,16 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
+ .config("spark.ui.liveUpdate.period", 0)
+ .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
+ .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname))
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.comet.scan.icebergNative.enabled", "true")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .enableHiveSupport()
+ .getOrCreate();
+
+@@ -205,9 +215,10 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
+ catalog.dropTable(TableIdentifier.of("default", name));
+ }
+
+- protected boolean countDeletes() {
+- return true;
+- }
++ protected boolean countDeletes() {
++ // TODO: Enable once iceberg-rust exposes delete count metrics to Comet
++ return false;
++ }
+
+ @Override
+ protected long deleteCount() {
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java
+index baf7fa8f8..150cc8969 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java
+@@ -182,6 +182,16 @@ public class TestSparkReaderWithBloomFilter {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname))
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.comet.scan.icebergNative.enabled", "true")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .enableHiveSupport()
+ .getOrCreate();
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
+index 54048bbf2..c87216824 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
+@@ -69,6 +69,16 @@ public class TestStructuredStreaming {
+ .master("local[2]")
+ .config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
+ .config("spark.sql.shuffle.partitions", 4)
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.comet.scan.icebergNative.enabled", "true")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ }
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
+index 8b1e3fbfc..b4385c395 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
+@@ -75,7 +75,20 @@ public class TestTimestampWithoutZone extends TestBase {
+
+ @BeforeAll
+ public static void startSpark() {
+- TestTimestampWithoutZone.spark = SparkSession.builder().master("local[2]").getOrCreate();
++ TestTimestampWithoutZone.spark =
++ SparkSession.builder()
++ .master("local[2]")
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.comet.scan.icebergNative.enabled", "true")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
++ .getOrCreate();
+ }
+
+ @AfterAll
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
+index c3fac70dd..b6c66f3f8 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
+@@ -84,6 +84,16 @@ public class TestWriteMetricsConfig {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.driver.host", InetAddress.getLoopbackAddress().getHostAddress())
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.comet.scan.icebergNative.enabled", "true")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .getOrCreate();
+ TestWriteMetricsConfig.sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+ }
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
+index 5ce56b4fe..df1f914da 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java
+@@ -63,6 +63,16 @@ public class TestAggregatePushDown extends CatalogTestBase {
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.sql.iceberg.aggregate_pushdown", "true")
++ .config("spark.plugins", "org.apache.spark.CometPlugin")
++ .config(
++ "spark.shuffle.manager",
++ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
++ .config("spark.comet.explainFallback.enabled", "true")
++ .config("spark.comet.scan.icebergNative.enabled", "true")
++ .config("spark.memory.offHeap.enabled", "true")
++ .config("spark.memory.offHeap.size", "10g")
++ .config("spark.comet.use.lazyMaterialization", "false")
++ .config("spark.comet.schemaEvolution.enabled", "true")
+ .enableHiveSupport()
+ .getOrCreate();
+
+diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
+index 9d2ce2b38..5e2336884 100644
+--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java
+@@ -598,9 +598,7 @@ public class TestFilterPushDown extends TestBaseWithCatalog {
+ String planAsString = sparkPlan.toString().replaceAll("#(\\d+L?)", "");
+
+ if (sparkFilter != null) {
+- assertThat(planAsString)
+- .as("Post scan filter should match")
+- .contains("Filter (" + sparkFilter + ")");
++ assertThat(planAsString).as("Post scan filter should match").contains("CometFilter");
+ } else {
+ assertThat(planAsString).as("Should be no post scan filter").doesNotContain("Filter (");
+ }
diff --git a/dev/diffs/iceberg-rust/1.8.1.diff b/dev/diffs/iceberg-rust/1.8.1.diff
new file mode 100644
index 0000000000..e7b58902ca
--- /dev/null
+++ b/dev/diffs/iceberg-rust/1.8.1.diff
@@ -0,0 +1,1902 @@
+diff --git a/build.gradle b/build.gradle
+index 7327b3890..7967109f0 100644
+--- a/build.gradle
++++ b/build.gradle
+@@ -780,6 +780,13 @@ project(':iceberg-parquet') {
+ implementation project(':iceberg-core')
+ implementation project(':iceberg-common')
+
++ implementation("org.apache.datafusion:comet-spark-spark${sparkVersionsString}_${scalaVersion}:${libs.versions.comet.get()}") {
++ exclude group: 'org.apache.arrow'
++ exclude group: 'org.apache.parquet'
++ exclude group: 'org.apache.spark'
++ exclude group: 'org.apache.iceberg'
++ }
++
+ implementation(libs.parquet.avro) {
+ exclude group: 'org.apache.avro', module: 'avro'
+ // already shaded by Parquet
+diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
+index 04ffa8f4e..3a57af315 100644
+--- a/gradle/libs.versions.toml
++++ b/gradle/libs.versions.toml
+@@ -34,6 +34,7 @@ azuresdk-bom = "1.2.31"
+ awssdk-s3accessgrants = "2.3.0"
+ caffeine = "2.9.3"
+ calcite = "1.10.0"
++comet = "0.12.0-SNAPSHOT"
+ datasketches = "6.2.0"
+ delta-standalone = "3.3.0"
+ delta-spark = "3.3.0"
+@@ -81,7 +82,7 @@ slf4j = "2.0.16"
+ snowflake-jdbc = "3.22.0"
+ spark-hive33 = "3.3.4"
+ spark-hive34 = "3.4.4"
+-spark-hive35 = "3.5.4"
++spark-hive35 = "3.5.6"
+ sqlite-jdbc = "3.48.0.0"
+ testcontainers = "1.20.4"
+ tez010 = "0.10.4"
+diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java b/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java
+new file mode 100644
+index 000000000..ddf6c7de5
+--- /dev/null
++++ b/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java
+@@ -0,0 +1,255 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++package org.apache.iceberg.parquet;
++
++import java.util.Map;
++import org.apache.comet.parquet.ParquetColumnSpec;
++import org.apache.iceberg.relocated.com.google.common.collect.Maps;
++import org.apache.parquet.column.ColumnDescriptor;
++import org.apache.parquet.schema.LogicalTypeAnnotation;
++import org.apache.parquet.schema.PrimitiveType;
++import org.apache.parquet.schema.Type;
++import org.apache.parquet.schema.Types;
++
++public class CometTypeUtils {
++
++ private CometTypeUtils() {}
++
++ public static ParquetColumnSpec descriptorToParquetColumnSpec(ColumnDescriptor descriptor) {
++
++ String[] path = descriptor.getPath();
++ PrimitiveType primitiveType = descriptor.getPrimitiveType();
++ String physicalType = primitiveType.getPrimitiveTypeName().name();
++
++ int typeLength =
++ primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
++ ? primitiveType.getTypeLength()
++ : 0;
++
++ boolean isRepeated = primitiveType.getRepetition() == Type.Repetition.REPEATED;
++
++ // ToDo: extract this into a Util method
++ String logicalTypeName = null;
++ Map<String, String> logicalTypeParams = Maps.newHashMap();
++ LogicalTypeAnnotation logicalType = primitiveType.getLogicalTypeAnnotation();
++
++ if (logicalType != null) {
++ logicalTypeName = logicalType.getClass().getSimpleName();
++
++ // Handle specific logical types
++ if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
++ LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimal =
++ (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType;
++ logicalTypeParams.put("precision", String.valueOf(decimal.getPrecision()));
++ logicalTypeParams.put("scale", String.valueOf(decimal.getScale()));
++ } else if (logicalType instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
++ LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestamp =
++ (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType;
++ logicalTypeParams.put("isAdjustedToUTC", String.valueOf(timestamp.isAdjustedToUTC()));
++ logicalTypeParams.put("unit", timestamp.getUnit().name());
++ } else if (logicalType instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) {
++ LogicalTypeAnnotation.TimeLogicalTypeAnnotation time =
++ (LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalType;
++ logicalTypeParams.put("isAdjustedToUTC", String.valueOf(time.isAdjustedToUTC()));
++ logicalTypeParams.put("unit", time.getUnit().name());
++ } else if (logicalType instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation) {
++ LogicalTypeAnnotation.IntLogicalTypeAnnotation intType =
++ (LogicalTypeAnnotation.IntLogicalTypeAnnotation) logicalType;
++ logicalTypeParams.put("isSigned", String.valueOf(intType.isSigned()));
++ logicalTypeParams.put("bitWidth", String.valueOf(intType.getBitWidth()));
++ }
++ }
++
++ return new ParquetColumnSpec(
++ 1, // ToDo: pass in the correct id
++ path,
++ physicalType,
++ typeLength,
++ isRepeated,
++ descriptor.getMaxDefinitionLevel(),
++ descriptor.getMaxRepetitionLevel(),
++ logicalTypeName,
++ logicalTypeParams);
++ }
++
++ public static ColumnDescriptor buildColumnDescriptor(ParquetColumnSpec columnSpec) {
++ PrimitiveType.PrimitiveTypeName primType =
++ PrimitiveType.PrimitiveTypeName.valueOf(columnSpec.getPhysicalType());
++
++ Type.Repetition repetition;
++ if (columnSpec.getMaxRepetitionLevel() > 0) {
++ repetition = Type.Repetition.REPEATED;
++ } else if (columnSpec.getMaxDefinitionLevel() > 0) {
++ repetition = Type.Repetition.OPTIONAL;
++ } else {
++ repetition = Type.Repetition.REQUIRED;
++ }
++
++ String name = columnSpec.getPath()[columnSpec.getPath().length - 1];
++ // Reconstruct the logical type from parameters
++ LogicalTypeAnnotation logicalType = null;
++ if (columnSpec.getLogicalTypeName() != null) {
++ logicalType =
++ reconstructLogicalType(
++ columnSpec.getLogicalTypeName(), columnSpec.getLogicalTypeParams());
++ }
++
++ PrimitiveType primitiveType;
++ if (primType == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
++ primitiveType =
++ org.apache.parquet.schema.Types.primitive(primType, repetition)
++ .length(columnSpec.getTypeLength())
++ .as(logicalType)
++ .id(columnSpec.getFieldId())
++ .named(name);
++ } else {
++ primitiveType =
++ Types.primitive(primType, repetition)
++ .as(logicalType)
++ .id(columnSpec.getFieldId())
++ .named(name);
++ }
++
++ return new ColumnDescriptor(
++ columnSpec.getPath(),
++ primitiveType,
++ columnSpec.getMaxRepetitionLevel(),
++ columnSpec.getMaxDefinitionLevel());
++ }
++
++ private static LogicalTypeAnnotation reconstructLogicalType(
++ String logicalTypeName, java.util.Map<String, String> params) {
++
++ switch (logicalTypeName) {
++ // MAP
++ case "MapLogicalTypeAnnotation":
++ return LogicalTypeAnnotation.mapType();
++
++ // LIST
++ case "ListLogicalTypeAnnotation":
++ return LogicalTypeAnnotation.listType();
++
++ // STRING
++ case "StringLogicalTypeAnnotation":
++ return LogicalTypeAnnotation.stringType();
++
++ // MAP_KEY_VALUE
++ case "MapKeyValueLogicalTypeAnnotation":
++ return LogicalTypeAnnotation.MapKeyValueTypeAnnotation.getInstance();
++
++ // ENUM
++ case "EnumLogicalTypeAnnotation":
++ return LogicalTypeAnnotation.enumType();
++
++ // DECIMAL
++ case "DecimalLogicalTypeAnnotation":
++ if (!params.containsKey("scale") || !params.containsKey("precision")) {
++ throw new IllegalArgumentException(
++ "Missing required parameters for DecimalLogicalTypeAnnotation: " + params);
++ }
++ int scale = Integer.parseInt(params.get("scale"));
++ int precision = Integer.parseInt(params.get("precision"));
++ return LogicalTypeAnnotation.decimalType(scale, precision);
++
++ // DATE
++ case "DateLogicalTypeAnnotation":
++ return LogicalTypeAnnotation.dateType();
++
++ // TIME
++ case "TimeLogicalTypeAnnotation":
++ if (!params.containsKey("isAdjustedToUTC") || !params.containsKey("unit")) {
++ throw new IllegalArgumentException(
++ "Missing required parameters for TimeLogicalTypeAnnotation: " + params);
++ }
++
++ boolean isUTC = Boolean.parseBoolean(params.get("isAdjustedToUTC"));
++ String timeUnitStr = params.get("unit");
++
++ LogicalTypeAnnotation.TimeUnit timeUnit;
++ switch (timeUnitStr) {
++ case "MILLIS":
++ timeUnit = LogicalTypeAnnotation.TimeUnit.MILLIS;
++ break;
++ case "MICROS":
++ timeUnit = LogicalTypeAnnotation.TimeUnit.MICROS;
++ break;
++ case "NANOS":
++ timeUnit = LogicalTypeAnnotation.TimeUnit.NANOS;
++ break;
++ default:
++ throw new IllegalArgumentException("Unknown time unit: " + timeUnitStr);
++ }
++ return LogicalTypeAnnotation.timeType(isUTC, timeUnit);
++
++ // TIMESTAMP
++ case "TimestampLogicalTypeAnnotation":
++ if (!params.containsKey("isAdjustedToUTC") || !params.containsKey("unit")) {
++ throw new IllegalArgumentException(
++ "Missing required parameters for TimestampLogicalTypeAnnotation: " + params);
++ }
++ boolean isAdjustedToUTC = Boolean.parseBoolean(params.get("isAdjustedToUTC"));
++ String unitStr = params.get("unit");
++
++ LogicalTypeAnnotation.TimeUnit unit;
++ switch (unitStr) {
++ case "MILLIS":
++ unit = LogicalTypeAnnotation.TimeUnit.MILLIS;
++ break;
++ case "MICROS":
++ unit = LogicalTypeAnnotation.TimeUnit.MICROS;
++ break;
++ case "NANOS":
++ unit = LogicalTypeAnnotation.TimeUnit.NANOS;
++ break;
++ default:
++ throw new IllegalArgumentException("Unknown timestamp unit: " + unitStr);
++ }
++ return LogicalTypeAnnotation.timestampType(isAdjustedToUTC, unit);
++
++ // INTEGER
++ case "IntLogicalTypeAnnotation":
++ if (!params.containsKey("isSigned") || !params.containsKey("bitWidth")) {
++ throw new IllegalArgumentException(
++ "Missing required parameters for IntLogicalTypeAnnotation: " + params);
++ }
++ boolean isSigned = Boolean.parseBoolean(params.get("isSigned"));
++ int bitWidth = Integer.parseInt(params.get("bitWidth"));
++ return LogicalTypeAnnotation.intType(bitWidth, isSigned);
++
++ // JSON
++ case "JsonLogicalTypeAnnotation":
++ return LogicalTypeAnnotation.jsonType();
++
++ // BSON
++ case "BsonLogicalTypeAnnotation":
++ return LogicalTypeAnnotation.bsonType();
++
++ // UUID
++ case "UUIDLogicalTypeAnnotation":
++ return LogicalTypeAnnotation.uuidType();
++
++ // INTERVAL
++ case "IntervalLogicalTypeAnnotation":
++ return LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance();
++
++ default:
++ throw new IllegalArgumentException("Unknown logical type: " + logicalTypeName);
++ }
++ }
++}
+diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java
+new file mode 100644
+index 000000000..a3cba4018
+--- /dev/null
++++ b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java
+@@ -0,0 +1,260 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++package org.apache.iceberg.parquet;
++
++import java.io.IOException;
++import java.io.UncheckedIOException;
++import java.nio.ByteBuffer;
++import java.util.List;
++import java.util.Map;
++import java.util.NoSuchElementException;
++import java.util.function.Function;
++import org.apache.comet.parquet.FileReader;
++import org.apache.comet.parquet.ParquetColumnSpec;
++import org.apache.comet.parquet.ReadOptions;
++import org.apache.comet.parquet.RowGroupReader;
++import org.apache.comet.parquet.WrappedInputFile;
++import org.apache.hadoop.conf.Configuration;
++import org.apache.iceberg.Schema;
++import org.apache.iceberg.exceptions.RuntimeIOException;
++import org.apache.iceberg.expressions.Expression;
++import org.apache.iceberg.expressions.Expressions;
++import org.apache.iceberg.io.CloseableGroup;
++import org.apache.iceberg.io.CloseableIterable;
++import org.apache.iceberg.io.CloseableIterator;
++import org.apache.iceberg.io.InputFile;
++import org.apache.iceberg.mapping.NameMapping;
++import org.apache.iceberg.relocated.com.google.common.collect.Lists;
++import org.apache.iceberg.util.ByteBuffers;
++import org.apache.parquet.ParquetReadOptions;
++import org.apache.parquet.column.ColumnDescriptor;
++import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
++import org.apache.parquet.hadoop.metadata.ColumnPath;
++import org.apache.parquet.schema.MessageType;
++
++public class CometVectorizedParquetReader<T> extends CloseableGroup
++ implements CloseableIterable<T> {
++ private final InputFile input;
++ private final ParquetReadOptions options;
++ private final Schema expectedSchema;
++ private final Function<MessageType, VectorizedReader<?>> batchReaderFunc;
++ private final Expression filter;
++ private final boolean reuseContainers;
++ private final boolean caseSensitive;
++ private final int batchSize;
++ private final NameMapping nameMapping;
++ private final Map<String, String> properties;
++ private Long start = null;
++ private Long length = null;
++ private ByteBuffer fileEncryptionKey = null;
++ private ByteBuffer fileAADPrefix = null;
++
++ public CometVectorizedParquetReader(
++ InputFile input,
++ Schema expectedSchema,
++ ParquetReadOptions options,
++ Function<MessageType, VectorizedReader<?>> readerFunc,
++ NameMapping nameMapping,
++ Expression filter,
++ boolean reuseContainers,
++ boolean caseSensitive,
++ int maxRecordsPerBatch,
++ Map<String, String> properties,
++ Long start,
++ Long length,
++ ByteBuffer fileEncryptionKey,
++ ByteBuffer fileAADPrefix) {
++ this.input = input;
++ this.expectedSchema = expectedSchema;
++ this.options = options;
++ this.batchReaderFunc = readerFunc;
++ // replace alwaysTrue with null to avoid extra work evaluating a trivial filter
++ this.filter = filter == Expressions.alwaysTrue() ? null : filter;
++ this.reuseContainers = reuseContainers;
++ this.caseSensitive = caseSensitive;
++ this.batchSize = maxRecordsPerBatch;
++ this.nameMapping = nameMapping;
++ this.properties = properties;
++ this.start = start;
++ this.length = length;
++ this.fileEncryptionKey = fileEncryptionKey;
++ this.fileAADPrefix = fileAADPrefix;
++ }
++
++ private ReadConf<T> conf = null;
++
++ private ReadConf<T> init() {
++ if (conf == null) {
++ ReadConf<T> readConf =
++ new ReadConf<>(
++ input,
++ options,
++ expectedSchema,
++ filter,
++ null,
++ batchReaderFunc,
++ nameMapping,
++ reuseContainers,
++ caseSensitive,
++ batchSize);
++ this.conf = readConf.copy();
++ return readConf;
++ }
++ return conf;
++ }
++
++ @Override
++ public CloseableIterator<T> iterator() {
++ FileIterator<T> iter =
++ new FileIterator<>(init(), properties, start, length, fileEncryptionKey, fileAADPrefix);
++ addCloseable(iter);
++ return iter;
++ }
++
++ private static class FileIterator<T> implements CloseableIterator<T> {
++ // private final ParquetFileReader reader;
++ private final boolean[] shouldSkip;
++ private final VectorizedReader<T> model;
++ private final long totalValues;
++ private final int batchSize;
++ private final List