
Commit 454c17f

Remove field-id constraint on add_files (#2662)

# Rationale for this change

Closes #2131

This PR relaxes the constraint that prevented adding any file with field IDs, replacing it with a constraint that rejects files whose field IDs are inconsistent with the field IDs of the table. If the field IDs are compatible, the files can be added safely; if not, they are rejected.

## Are these changes tested?

Yes

## Are there any user-facing changes?

Yes
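For illustration (not part of this commit), a minimal sketch of the relaxed behavior, assuming a catalog named `default` and an existing table `default.events` with schema `(1: id int, 2: name string)`; the catalog name, table identifier, schema, and paths are hypothetical:

```python
# Hypothetical sketch: adding a Parquet file that already carries field IDs.
import pyarrow as pa
import pyarrow.parquet as pq
from pyiceberg.catalog import load_catalog

tbl = load_catalog("default").load_table("default.events")

# Parquet field IDs that match the Iceberg table schema are now accepted.
schema_with_ids = pa.schema(
    [
        pa.field("id", pa.int32(), metadata={"PARQUET:field_id": "1"}),
        pa.field("name", pa.string(), metadata={"PARQUET:field_id": "2"}),
    ]
)
pq.write_table(
    pa.table({"id": [1, 2], "name": ["a", "b"]}, schema=schema_with_ids),
    "/tmp/events-00000.parquet",
)

# Previously any file carrying field IDs raised NotImplementedError; now only
# files whose field IDs conflict with the table schema are rejected.
tbl.add_files(file_paths=["/tmp/events-00000.parquet"])
```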
1 parent abae20f commit 454c17f

File tree

3 files changed: +92 −12 lines changed


mkdocs/docs/api.md

Lines changed: 4 additions & 2 deletions

@@ -1006,8 +1006,10 @@ Expert Iceberg users may choose to commit existing parquet files to the Iceberg
 
 <!-- prettier-ignore-start -->
 
-!!! note "Name Mapping"
-    Because `add_files` uses existing files without writing new parquet files that are aware of the Iceberg's schema, it requires the Iceberg's table to have a [Name Mapping](https://iceberg.apache.org/spec/?h=name+mapping#name-mapping-serialization) (The Name mapping maps the field names within the parquet files to the Iceberg field IDs). Hence, `add_files` requires that there are no field IDs in the parquet file's metadata, and creates a new Name Mapping based on the table's current schema if the table doesn't already have one.
+!!! note "Name Mapping and Field IDs"
+    `add_files` can work with Parquet files both with and without field IDs in their metadata:
+
+    - **Files with field IDs**: When field IDs are present in the Parquet metadata, they must match the corresponding field IDs in the Iceberg table schema. This is common for files generated by Spark or other libraries that write explicit field ID metadata.
+    - **Files without field IDs**: When field IDs are absent, the table must have a [Name Mapping](https://iceberg.apache.org/spec/?h=name+mapping#name-mapping-serialization) to map field names to Iceberg field IDs. `add_files` will automatically create a Name Mapping based on the table's current schema if one doesn't already exist.
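As an aside (not part of the diff), a minimal sketch of the second case, assuming an existing table `default.events` and a Parquet file written without field IDs; the identifier and path are illustrative only:

```python
# Hypothetical sketch: adding a file without field IDs relies on the table's
# name mapping, which add_files derives from the current schema if unset.
from pyiceberg.catalog import load_catalog

tbl = load_catalog("default").load_table("default.events")
tbl.add_files(file_paths=["s3://warehouse/default/events/plain-names-0.parquet"])

# After the call, a name mapping (column name -> Iceberg field ID) is present.
print(tbl.name_mapping())
```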
 
 !!! note "Partitions"
     `add_files` only requires the client to read the existing parquet files' metadata footer to infer the partition value of each file. This implementation also supports adding files to Iceberg tables with partition transforms like `MonthTransform`, and `TruncateTransform` which preserve the order of the values after the transformation (Any Transform that has the `preserves_order` property set to True is supported). Please note that if the column statistics of the `PartitionField`'s source column are not present in the parquet metadata, the partition value is inferred as `None`.
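For the order-preserving requirement mentioned in the partitions note above, a transform can be checked via its `preserves_order` property; a small sketch (the transform parameters are arbitrary):

```python
# Transforms whose preserves_order property is True are supported by the
# partition-value inference in add_files; bucket hashing is not.
from pyiceberg.transforms import BucketTransform, MonthTransform, TruncateTransform

print(MonthTransform().preserves_order)                  # True
print(TruncateTransform(width=10).preserves_order)       # True
print(BucketTransform(num_buckets=16).preserves_order)   # False
```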

pyiceberg/io/pyarrow.py

Lines changed: 1 addition & 4 deletions

@@ -2708,6 +2708,7 @@ def _check_pyarrow_schema_compatible(
         ValueError: If the schemas are not compatible.
     """
     name_mapping = requested_schema.name_mapping
+
     try:
         provided_schema = pyarrow_to_schema(
             provided_schema,
@@ -2738,10 +2739,6 @@ def parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_pa
         parquet_metadata = pq.read_metadata(input_stream)
 
         arrow_schema = parquet_metadata.schema.to_arrow_schema()
-        if visit_pyarrow(arrow_schema, _HasIds()):
-            raise NotImplementedError(
-                f"Cannot add file {file_path} because it has field IDs. `add_files` only supports addition of files without field_ids"
-            )
 
         schema = table_metadata.schema()
         _check_pyarrow_schema_compatible(schema, arrow_schema, format_version=table_metadata.format_version)
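The removed guard relied on the internal `_HasIds` pyarrow visitor. For reference, a rough pyarrow-only way to see whether a file carries field IDs at all; this only inspects top-level fields (unlike the visitor, which also walked nested types), and the file path is a placeholder:

```python
# Rough check for field IDs in a Parquet file's Arrow schema.
import pyarrow.parquet as pq

arrow_schema = pq.read_metadata("data.parquet").schema.to_arrow_schema()  # placeholder path
has_ids = any(
    field.metadata is not None and b"PARQUET:field_id" in field.metadata
    for field in arrow_schema
)
print("file carries field IDs:", has_ids)
```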

tests/integration/test_add_files.py

Lines changed: 87 additions & 6 deletions

@@ -216,23 +216,45 @@ def test_add_files_to_unpartitioned_table_raises_file_not_found(
 
 
 @pytest.mark.integration
-def test_add_files_to_unpartitioned_table_raises_has_field_ids(
+def test_add_files_to_unpartitioned_table_with_field_ids(
     spark: SparkSession, session_catalog: Catalog, format_version: int
 ) -> None:
-    identifier = f"default.unpartitioned_raises_field_ids_v{format_version}"
+    identifier = f"default.unpartitioned_with_field_ids_v{format_version}"
     tbl = _create_table(session_catalog, identifier, format_version)
 
-    file_paths = [f"s3://warehouse/default/unpartitioned_raises_field_ids/v{format_version}/test-{i}.parquet" for i in range(5)]
-    # write parquet files
+    file_paths = [f"s3://warehouse/default/unpartitioned_with_field_ids/v{format_version}/test-{i}.parquet" for i in range(5)]
+    # write parquet files with field IDs matching the table schema
     for file_path in file_paths:
         fo = tbl.io.new_output(file_path)
         with fo.create(overwrite=True) as fos:
             with pq.ParquetWriter(fos, schema=ARROW_SCHEMA_WITH_IDS) as writer:
                 writer.write_table(ARROW_TABLE_WITH_IDS)
 
     # add the parquet files as data files
-    with pytest.raises(NotImplementedError):
-        tbl.add_files(file_paths=file_paths)
+    tbl.add_files(file_paths=file_paths)
+
+    # NameMapping should still be set even though files have field IDs
+    assert tbl.name_mapping() is not None
+
+    # Verify files were added successfully
+    rows = spark.sql(
+        f"""
+        SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count
+        FROM {identifier}.all_manifests
+        """
+    ).collect()
+
+    assert [row.added_data_files_count for row in rows] == [5]
+    assert [row.existing_data_files_count for row in rows] == [0]
+    assert [row.deleted_data_files_count for row in rows] == [0]
+
+    # Verify data can be read back correctly
+    df = spark.table(identifier).toPandas()
+    assert len(df) == 5
+    assert all(df["foo"] == True)  # noqa: E712
+    assert all(df["bar"] == "bar_string")
+    assert all(df["baz"] == 123)
+    assert all(df["qux"] == date(2024, 3, 7))
 
 
 @pytest.mark.integration
@@ -579,6 +601,65 @@ def test_add_files_fails_on_schema_mismatch(spark: SparkSession, session_catalog
         tbl.add_files(file_paths=[file_path])
 
 
+@pytest.mark.integration
+def test_add_files_with_field_ids_fails_on_schema_mismatch(
+    spark: SparkSession, session_catalog: Catalog, format_version: int
+) -> None:
+    """Test that files with mismatched field types (when field IDs match) are rejected."""
+    identifier = f"default.table_schema_mismatch_based_on_field_ids__fails_v{format_version}"
+
+    tbl = _create_table(session_catalog, identifier, format_version)
+
+    # All fields are renamed and reordered but have matching field IDs, so they should be compatible
+    # except for 'baz' which has the wrong type
+    WRONG_SCHEMA = pa.schema(
+        [
+            pa.field("qux_", pa.date32(), metadata={"PARQUET:field_id": "4"}),
+            pa.field("baz_", pa.string(), metadata={"PARQUET:field_id": "3"}),  # Wrong type: should be int32
+            pa.field("bar_", pa.string(), metadata={"PARQUET:field_id": "2"}),
+            pa.field("foo_", pa.bool_(), metadata={"PARQUET:field_id": "1"}),
+        ]
+    )
+    file_path = f"s3://warehouse/default/table_with_field_ids_schema_mismatch_fails/v{format_version}/test.parquet"
+    # write parquet files
+    fo = tbl.io.new_output(file_path)
+    with fo.create(overwrite=True) as fos:
+        with pq.ParquetWriter(fos, schema=WRONG_SCHEMA) as writer:
+            writer.write_table(
+                pa.Table.from_pylist(
+                    [
+                        {
+                            "qux_": date(2024, 3, 7),
+                            "baz_": "123",
+                            "bar_": "bar_string",
+                            "foo_": True,
+                        },
+                        {
+                            "qux_": date(2024, 3, 7),
+                            "baz_": "124",
+                            "bar_": "bar_string",
+                            "foo_": True,
+                        },
+                    ],
+                    schema=WRONG_SCHEMA,
+                )
+            )
+
+    expected = """Mismatch in fields:
+┏━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
+┃    ┃ Table field              ┃ Dataframe field           ┃
+┡━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
+│ ✅ │ 1: foo: optional boolean │ 1: foo_: optional boolean │
+│ ✅ │ 2: bar: optional string  │ 2: bar_: optional string  │
+│ ❌ │ 3: baz: optional int     │ 3: baz_: optional string  │
+│ ✅ │ 4: qux: optional date    │ 4: qux_: optional date    │
+└────┴──────────────────────────┴───────────────────────────┘
+"""
+
+    with pytest.raises(ValueError, match=expected):
+        tbl.add_files(file_paths=[file_path])
+
+
 @pytest.mark.integration
 def test_add_files_with_large_and_regular_schema(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
     identifier = f"default.unpartitioned_with_large_types{format_version}"
