
Conversation

@parthchandra
Contributor

Which issue does this PR close?

Part of the changes needed for #2060
Mostly cleans up the native_iceberg_compat APIs so that they do not expose Parquet classes. As a bonus, it provides a utility class that allows ParquetMetadata to be serialized to and deserialized from the Thrift format. This will also be useful for passing ParquetMetadata from the JVM to native code (for all native scan implementations). Currently the native scans end up reading the Parquet metadata again (even though it has already been read on the JVM side), which can be a costly operation on object stores.
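A minimal sketch of what such a Thrift round trip could look like, built on parquet-mr's ParquetMetadataConverter and parquet-format's Util (an illustration of the technique; the PR's actual ParquetMetadataSerializer may differ in detail):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.parquet.format.FileMetaData;
import org.apache.parquet.format.Util;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

public class ParquetMetadataThriftRoundTrip {
  private static final ParquetMetadataConverter CONVERTER = new ParquetMetadataConverter();

  /** Convert ParquetMetadata to the Thrift FileMetaData struct and write it as bytes. */
  public static byte[] serialize(ParquetMetadata metadata) throws IOException {
    FileMetaData thriftMeta = CONVERTER.toParquetMetadata(1, metadata);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    Util.writeFileMetaData(thriftMeta, out);
    return out.toByteArray();
  }

  /** Read the Thrift struct back and convert it into parquet-mr's ParquetMetadata. */
  public static ParquetMetadata deserialize(byte[] bytes) throws IOException {
    FileMetaData thriftMeta = Util.readFileMetaData(new ByteArrayInputStream(bytes));
    return CONVERTER.fromParquetMetadata(thriftMeta);
  }
}
```

Because the footer is plain bytes after serialization, it can cross the JVM/native boundary without either side re-opening the file.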

@parthchandra marked this pull request as draft November 3, 2025 20:56
@codecov-commenter commented Nov 3, 2025

Codecov Report

❌ Patch coverage is 0% with 148 lines in your changes missing coverage. Please review.
✅ Project coverage is 58.31%. Comparing base (f09f8af) to head (25d8a37).
⚠️ Report is 692 commits behind head on main.

Files with missing lines Patch % Lines
...va/org/apache/comet/parquet/NativeBatchReader.java 0.00% 112 Missing ⚠️
...e/comet/parquet/IcebergCometNativeBatchReader.java 0.00% 22 Missing ⚠️
...pache/comet/parquet/ParquetMetadataSerializer.java 0.00% 13 Missing ⚠️
...org/apache/comet/parquet/AbstractColumnReader.java 0.00% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #2680      +/-   ##
============================================
+ Coverage     56.12%   58.31%   +2.18%     
- Complexity      976     1457     +481     
============================================
  Files           119      166      +47     
  Lines         11743    14130    +2387     
  Branches       2251     2395     +144     
============================================
+ Hits           6591     8240    +1649     
- Misses         4012     4690     +678     
- Partials       1140     1200      +60     

☔ View full report in Codecov by Sentry.
@parthchandra marked this pull request as ready for review November 3, 2025 23:56
 hdfs-sys = {version = "0.3", optional = true, features = ["hdfs_3_3"]}
-opendal = { version ="0.54.1", optional = true, features = ["services-hdfs"] }
 uuid = "1.0"
+opendal = { version ="0.54.0", optional = true, features = ["services-hdfs"] }
Member

Is there a reason for this change? Comet could still choose to use 0.54.1 since it is semver compatible.

Contributor Author

Looks like this happened due to rebasing. Reverted.

Member

@andygrove left a comment

LGTM. Thanks @parthchandra

filteredSchema = filteredSchema.add(sparkFields[i]);
}
}
sparkSchema = filteredSchema;
Member

Is it possible that the filtering done here may lead to an ArrayIndexOutOfBoundsException at https://github.com/parthchandra/datafusion-comet/blob/d73bcbab9f80836d7229207f309283942501e9ab/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java#L985?
Now that sparkSchema may have fewer fields than before, I see no new logic to protect the .fields()[i] call there.

Contributor Author

Yes, you're right. This is not entirely correct. Let me fix this.

Contributor Author

Yup. Fixed to match the fields by name.

import org.apache.spark.sql.types.StructType;

/**
* A specialized NativeBatchReader for Iceberg that accepts ParquetMetadata as a JSON string. This
Member

"accepts ParquetMetadata as a JSON string" - actually it accepts byte[] parquetMetadataBytes at https://github.com/apache/datafusion-comet/pull/2680/files#diff-e57878f6cd8036999500de5719f8f4bbe28e1ed5dcb79a02ad7d7eb206f37473R44, i.e. not a String but bytes.

Contributor Author

Thank you for catching this. The first version I did used JSON, but this is more efficient.
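For illustration, the consuming side would rebuild the footer from those bytes rather than re-reading it from the (possibly remote) file; roughly (a sketch only — deserialize is an assumed method name on the PR's ParquetMetadataSerializer):

```java
// Sketch: reconstruct the footer from the Thrift bytes handed over by Iceberg,
// avoiding a second metadata read against the object store.
ParquetMetadata footer = new ParquetMetadataSerializer().deserialize(parquetMetadataBytes);
```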

@andygrove changed the title from "chore: various refactoring changes for iceberg" to "chore: various refactoring changes for iceberg [iceberg]" Nov 6, 2025
@martin-g (Member) commented Nov 7, 2025

@parthchandra You said Done but I see no new commits in the PR. Did the push fail ?

@parthchandra
Contributor Author

> @parthchandra You said Done but I see no new commits in the PR. Did the push fail ?

Oops. I had pushed to the wrong branch :(. Corrected.

Member

@martin-g left a comment

Does it need unit tests for the new classes?

}

// String timeZoneId = conf.get("spark.sql.session.timeZone");
String timeZoneId = "UTC";
Member

Is this intentional?
If so, either move the comment one line up or add a new comment explaining why timeZoneId should always be UTC. The commented-out conf.get("spark.sql.session.timeZone"); could be removed too.

DataType dataType = null;
int sparkSchemaIndex = -1;
for (int j = 0; j < sparkFields.length; j++) {
if (sparkFields[j].name().equals(field.getName())) {
Member

Should this equality check take spark.sql.caseSensitive into account?
If it is case sensitive, it could be optimized by storing the sparkFields in a Map<String, Field> and looking up by name here instead of looping over them for each field.

Contributor Author

Done
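A minimal sketch of the map-based lookup suggested above (a hypothetical helper; the caseSensitive flag stands in for however the reader surfaces spark.sql.caseSensitive):

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

import org.apache.spark.sql.types.StructField;

// Build the name-keyed map once; each Parquet field then resolves in O(1)
// instead of a linear scan over sparkFields.
static Map<String, StructField> buildFieldMap(StructField[] sparkFields, boolean caseSensitive) {
  Map<String, StructField> byName = new HashMap<>();
  for (StructField f : sparkFields) {
    // Normalize the key when the session is case insensitive.
    String key = caseSensitive ? f.name() : f.name().toLowerCase(Locale.ROOT);
    byName.put(key, f);
  }
  return byName;
}
```

Lookups would normalize the Parquet field name the same way before calling byName.get(...).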

i < preInitializedReaders.length && preInitializedReaders[i] != null;
int finalI = i;
boolean existsInFileSchema =
fileFields.stream().anyMatch(f -> f.getName().equals(sparkFields[finalI].name()));
Member

Should this equality check take spark.sql.caseSensitive into account?

Contributor Author

Yes, it should. Fixed.

Path path = new Path(new URI(filePath));
try (FileReader fileReader =
new FileReader(
CometInputFile.fromPath(path, conf), footer, readOptions, cometReadOptions, metrics)) {
Member

Contributor Author

Got removed accidentally. Thanks for catching this!

this.sparkSchema = requiredSchema;
}

/** Initialize the reader using FileInfo instead of PartitionedFile. */
Member

Suggested change
-/** Initialize the reader using FileInfo instead of PartitionedFile. */
+/** Initialize the reader using FileInfo instead of PartitionedFile. */
+@Override

Contributor Author

This is not an override. The parent init method has a different signature.

ConstantColumnReader reader =
new ConstantColumnReader(nonPartitionFields[i], capacity, useDecimal128);
columnReaders[i] = reader;
if (preInitializedReaders != null && preInitializedReaders[i] != null) {
Member

Suggested change
-if (preInitializedReaders != null && preInitializedReaders[i] != null) {
+if (preInitializedReaders != null && i < preInitializedReaders.length && preInitializedReaders[i] != null) {

Contributor Author

Done

int columnIndex = getColumnIndexFromParquetColumn(column);
if (columnIndex == -1
|| preInitializedReaders == null
|| preInitializedReaders[columnIndex] == null) {
Member

This probably needs a check for boundaries before trying to access this index.

Contributor Author

Done
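A sketch of the kind of guard being asked for here (illustrative only; the fix that actually landed may differ):

```java
int columnIndex = getColumnIndexFromParquetColumn(column);
// Only use a pre-initialized reader when the index is in range and a reader
// actually exists at that position; otherwise fall through to a regular reader.
boolean usePreInitialized =
    preInitializedReaders != null
        && columnIndex >= 0
        && columnIndex < preInitializedReaders.length
        && preInitializedReaders[columnIndex] != null;
```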

return fileSize;
}

public URI pathUri() throws Exception {
Member

Suggested change
-public URI pathUri() throws Exception {
+public URI pathUri() throws URISyntaxException {

Contributor Author

Done

@parthchandra
Contributor Author

> Does it need unit tests for the new classes?

Functionality is mostly covered by Comet tests and running Iceberg tests with Comet enabled.

@parthchandra merged commit 35a99e0 into apache:main Nov 13, 2025
117 checks passed
@parthchandra
Contributor Author

Merged. Thanks @martin-g, @andygrove.
