Skip to content

Commit 779a485

Browse files
authored
Cosmos Spark: Changing inferSchema.forceNullableProperties default to true (Azure#22049)
* Changing default
* Docs
* Tests
* new test
* doc update
* Change log
1 parent 940c22d commit 779a485

File tree

4 files changed

+112
-2
lines changed

4 files changed

+112
-2
lines changed

sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
## Release History
22

33
### 4.2.0-beta.1 (Unreleased)
4+
#### Configuration Changes
5+
* Changed the default value of `spark.cosmos.read.inferSchema.forceNullableProperties` from `false` to `true` based on user feedback, see [PR](https://github.com/Azure/azure-sdk-for-java/pull/22049).
46

57
### 4.1.0 (2021-05-27)
68
#### New Features

sdk/cosmos/azure-cosmos-spark_3-1_2-12/docs/configuration-reference.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ When doing read operations, users can specify a custom schema or allow the conne
4848
| `spark.cosmos.read.inferSchema.samplingSize` | `1000` | Sampling size to use when inferring schema and not using a query. |
4949
| `spark.cosmos.read.inferSchema.includeSystemProperties` | `false` | When schema inference is enabled, whether the resulting schema will include all [Cosmos DB system properties](https://docs.microsoft.com/azure/cosmos-db/account-databases-containers-items#properties-of-an-item). |
5050
| `spark.cosmos.read.inferSchema.includeTimestamp` | `false` | When schema inference is enabled, whether the resulting schema will include the document Timestamp (`_ts`). Not required if `spark.cosmos.read.inferSchema.includeSystemProperties` is enabled, as it will already include all system properties. |
51-
| `spark.cosmos.read.inferSchema.forceNullableProperties` | `false` | When schema inference is enabled, whether the resulting schema will make all columns nullable. By default whether inferred columns are treated as nullable or not will depend on whether any record in the sample set has null-values within a column. If set to `true` all columns will be treated as nullable even if all rows within the sample set have non-null values. |
51+
| `spark.cosmos.read.inferSchema.forceNullableProperties` | `true` | When schema inference is enabled, whether the resulting schema will make all columns nullable. By default, all columns (except cosmos system properties) will be treated as nullable even if all rows within the sample set have non-null values. When disabled, the inferred columns are treated as nullable or not depending on whether any record in the sample set has null-values within a column. |
5252

5353
#### Json conversion configuration
5454

sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -533,7 +533,7 @@ private object CosmosSchemaInferenceConfig {
533533
private val inferSchemaForceNullableProperties = CosmosConfigEntry[Boolean](
534534
key = CosmosConfigNames.ReadInferSchemaForceNullableProperties,
535535
mandatory = false,
536-
defaultValue = Some(false),
536+
defaultValue = Some(true),
537537
parseFromStringFunction = include => include.toBoolean,
538538
helpMessage = "Whether schema inference should enforce inferred properties to be nullable - even when no null-values are contained in the sample set")
539539

sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EQueryITest.scala

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,57 @@ class SparkE2EQueryITest
460460
fieldNames.contains(CosmosTableSchemaInferrer.AttachmentsAttributeName) shouldBe false
461461
}
462462

463+
"spark query" can "when forceNullableProperties is false and rows have different schema" in {
464+
val cosmosEndpoint = TestConfigurations.HOST
465+
val cosmosMasterKey = TestConfigurations.MASTER_KEY
466+
val samplingSize = 100
467+
val expectedResults = samplingSize * 2
468+
val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainer)
469+
470+
// Inserting documents with slightly different schema
471+
for( _ <- 1 to expectedResults) {
472+
val objectNode = Utils.getSimpleObjectMapper.createObjectNode()
473+
val arr = objectNode.putArray("object_array")
474+
val nested = Utils.getSimpleObjectMapper.createObjectNode()
475+
nested.put("A", "test")
476+
nested.put("B", "test")
477+
arr.add(nested)
478+
objectNode.put("id", UUID.randomUUID().toString)
479+
container.createItem(objectNode).block()
480+
}
481+
482+
for( _ <- 1 to samplingSize) {
483+
val objectNode2 = Utils.getSimpleObjectMapper.createObjectNode()
484+
val arr = objectNode2.putArray("object_array")
485+
val nested = Utils.getSimpleObjectMapper.createObjectNode()
486+
nested.put("A", "test")
487+
arr.add(nested)
488+
objectNode2.put("id", UUID.randomUUID().toString)
489+
container.createItem(objectNode2).block()
490+
}
491+
492+
val cfgWithInference = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
493+
"spark.cosmos.accountKey" -> cosmosMasterKey,
494+
"spark.cosmos.database" -> cosmosDatabase,
495+
"spark.cosmos.container" -> cosmosContainer,
496+
"spark.cosmos.read.inferSchema.enabled" -> "true",
497+
"spark.cosmos.read.inferSchema.forceNullableProperties" -> "false",
498+
"spark.cosmos.read.inferSchema.samplingSize" -> samplingSize.toString,
499+
"spark.cosmos.read.inferSchema.query" -> "SELECT * FROM c ORDER BY c._ts",
500+
"spark.cosmos.read.partitioning.strategy" -> "Restrictive"
501+
)
502+
503+
val dfWithInference = spark.read.format("cosmos.oltp").options(cfgWithInference).load()
504+
try {
505+
dfWithInference.collect()
506+
fail("Should have thrown an exception")
507+
}
508+
catch {
509+
case inner: Exception =>
510+
inner.toString.contains("The 1th field 'B' of input row cannot be null") shouldBe true
511+
}
512+
}
513+
463514
"spark query" can "use custom sampling size" in {
464515
val cosmosEndpoint = TestConfigurations.HOST
465516
val cosmosMasterKey = TestConfigurations.MASTER_KEY
@@ -580,6 +631,7 @@ class SparkE2EQueryITest
580631
"spark.cosmos.accountKey" -> cosmosMasterKey,
581632
"spark.cosmos.database" -> cosmosDatabase,
582633
"spark.cosmos.container" -> cosmosContainer,
634+
"spark.cosmos.read.inferSchema.forceNullableProperties" -> "false",
583635
"spark.cosmos.read.partitioning.strategy" -> "Restrictive"
584636
)
585637

@@ -652,6 +704,62 @@ class SparkE2EQueryITest
652704
fieldNames.contains(CosmosTableSchemaInferrer.AttachmentsAttributeName) shouldBe false
653705
}
654706

707+
"spark query" can "return proper Cosmos specific query plan on explain with nullable properties" in {
708+
val cosmosEndpoint = TestConfigurations.HOST
709+
val cosmosMasterKey = TestConfigurations.MASTER_KEY
710+
711+
val id = UUID.randomUUID().toString
712+
713+
val rawItem = s"""
714+
| {
715+
| "id" : "${id}",
716+
| "nestedObject" : {
717+
| "prop1" : 5,
718+
| "prop2" : "6"
719+
| }
720+
| }
721+
|""".stripMargin
722+
723+
val objectNode = objectMapper.readValue(rawItem, classOf[ObjectNode])
724+
725+
val container = cosmosClient.getDatabase(cosmosDatabase).getContainer(cosmosContainer)
726+
container.createItem(objectNode).block()
727+
728+
val cfg = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
729+
"spark.cosmos.accountKey" -> cosmosMasterKey,
730+
"spark.cosmos.database" -> cosmosDatabase,
731+
"spark.cosmos.container" -> cosmosContainer,
732+
"spark.cosmos.read.inferSchema.forceNullableProperties" -> "true",
733+
"spark.cosmos.read.partitioning.strategy" -> "Restrictive"
734+
)
735+
736+
val df = spark.read.format("cosmos.oltp").options(cfg).load()
737+
val rowsArray = df.where("nestedObject.prop2 = '6'").collect()
738+
rowsArray should have size 1
739+
740+
var output = new java.io.ByteArrayOutputStream()
741+
Console.withOut(output) {
742+
df.explain()
743+
}
744+
var queryPlan = output.toString.replaceAll("#\\d+", "#x")
745+
logInfo(s"Query Plan: $queryPlan")
746+
queryPlan.contains("Cosmos Query: SELECT * FROM r") shouldEqual true
747+
748+
output = new java.io.ByteArrayOutputStream()
749+
Console.withOut(output) {
750+
df.where("nestedObject.prop2 = '6'").explain()
751+
}
752+
queryPlan = output.toString.replaceAll("#\\d+", "#x")
753+
logInfo(s"Query Plan: $queryPlan")
754+
val expected = s"Cosmos Query: SELECT * FROM r WHERE NOT(IS_NULL(r['nestedObject'])) " +
755+
s"AND r['nestedObject']['prop2']=" +
756+
s"@param0${System.getProperty("line.separator")} > param: @param0 = 6"
757+
queryPlan.contains(expected) shouldEqual true
758+
759+
val item = rowsArray(0)
760+
item.getAs[String]("id") shouldEqual id
761+
}
762+
655763
//scalastyle:on magic.number
656764
//scalastyle:on multiple.string.literals
657765
}

0 commit comments

Comments (0)