Commit 1c47518

Cosmos-Spark: Ported the NYC-Taxi-Data end-to-end sample from pyspark to scala (Azure#26969)
1 parent 0d749b0 commit 1c47518

File tree

3 files changed: +399 -0 lines changed

sdk/cosmos/azure-cosmos-spark_3_2-12/Samples/Python/NYC-Taxi-Data/01_Batch.ipynb

Lines changed: 1 addition & 0 deletions
@@ -438,6 +438,7 @@
     "\n",
     "print(\"Starting count validation via query: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\n",
     "count_query_schema=StructType(fields=[StructField(\"Count\", LongType(), True)])\n",
+    "readCfg[\"spark.cosmos.read.customQuery\"] = \"SELECT COUNT(0) AS Count FROM c\"\n",
     "query_df = spark.read.format(\"cosmos.oltp\").schema(count_query_schema).options(**readCfg).load()\n",
     "count_query = query_df.select(F.sum(\"Count\").alias(\"TotalCount\")).first()[\"TotalCount\"]\n",
     "print(\"Number of records retrieved via query: \", count_query) \n",
Lines changed: 354 additions & 0 deletions
@@ -0,0 +1,354 @@
// Databricks notebook source
// MAGIC %md
// MAGIC **Secrets**
// MAGIC
// MAGIC The secrets below, like the Cosmos account key, are retrieved from a secret scope. If you haven't defined a secret scope for the Cosmos account you want to use when going through this sample, you can find instructions on how to create one here:
// MAGIC - Here you can [Create a new secret scope](./#secrets/createScope) for the current Databricks workspace
// MAGIC - See how you can create an [Azure Key Vault backed secret scope](https://docs.microsoft.com/azure/databricks/security/secrets/secret-scopes#--create-an-azure-key-vault-backed-secret-scope)
// MAGIC - See how you can create a [Databricks backed secret scope](https://docs.microsoft.com/azure/databricks/security/secrets/secret-scopes#create-a-databricks-backed-secret-scope)
// MAGIC - And here you can find information on how to [add secrets to your Spark configuration](https://docs.microsoft.com/azure/databricks/security/secrets/secrets#read-a-secret)
// MAGIC
// MAGIC If you don't want to use secrets at all you can of course also just assign the values in clear text below, but for obvious reasons we recommend using secrets.

// COMMAND ----------

val cosmosEndpoint = spark.conf.get("spark.cosmos.accountEndpoint")
val cosmosMasterKey = spark.conf.get("spark.cosmos.accountKey")
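If the endpoint and key are not already wired into the Spark configuration, a minimal sketch of reading them straight from a secret scope could look like the following; the scope name "cosmos-scope" and the key names are placeholders for illustration and are not part of the committed notebook:

// Hypothetical alternative - read the account settings directly from a Databricks secret scope.
// "cosmos-scope", "cosmos-endpoint" and "cosmos-key" are placeholder names; replace them with
// the scope and keys you created for your workspace.
val cosmosEndpoint = dbutils.secrets.get(scope = "cosmos-scope", key = "cosmos-endpoint")
val cosmosMasterKey = dbutils.secrets.get(scope = "cosmos-scope", key = "cosmos-key")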
// COMMAND ----------

// MAGIC %md
// MAGIC **Preparation - creating the Cosmos DB container to ingest the data into**
// MAGIC
// MAGIC Configure the Catalog API to be used

// COMMAND ----------

spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.views.repositoryPath", "/viewDefinitions")

// COMMAND ----------

// MAGIC %md
// MAGIC And execute the command to create the new container with a throughput of up to 100,000 RU (autoscale, so 10,000 - 100,000 RU based on scale) and only system properties (like /id) being indexed. We also create a change feed sink container with the same settings, as well as a container that will be used to store metadata for the global throughput control.

// COMMAND ----------

// MAGIC %sql
// MAGIC CREATE DATABASE IF NOT EXISTS cosmosCatalog.SampleDatabase;
// MAGIC
// MAGIC CREATE TABLE IF NOT EXISTS cosmosCatalog.SampleDatabase.GreenTaxiRecords
// MAGIC USING cosmos.oltp
// MAGIC TBLPROPERTIES(partitionKeyPath = '/id', autoScaleMaxThroughput = '100000', indexingPolicy = 'OnlySystemProperties');
// MAGIC
// MAGIC CREATE TABLE IF NOT EXISTS cosmosCatalog.SampleDatabase.GreenTaxiRecordsCFSink
// MAGIC USING cosmos.oltp
// MAGIC TBLPROPERTIES(partitionKeyPath = '/id', autoScaleMaxThroughput = '100000', indexingPolicy = 'OnlySystemProperties');
// MAGIC
// MAGIC /* NOTE: It is important to enable TTL (can be off/-1 by default) on the throughput control container */
// MAGIC CREATE TABLE IF NOT EXISTS cosmosCatalog.SampleDatabase.ThroughputControl
// MAGIC USING cosmos.oltp
// MAGIC OPTIONS(spark.cosmos.database = 'SampleDatabase')
// MAGIC TBLPROPERTIES(partitionKeyPath = '/groupId', autoScaleMaxThroughput = '4000', indexingPolicy = 'AllProperties', defaultTtlInSeconds = '-1');
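The same DDL can also be issued from Scala via spark.sql instead of the %sql magic. A minimal sketch (not part of the committed notebook, shown here only for the first container) assuming the cosmosCatalog configuration from the earlier cell is already in place:

// Sketch: issuing the container DDL through spark.sql instead of the %sql magic.
spark.sql("CREATE DATABASE IF NOT EXISTS cosmosCatalog.SampleDatabase")
spark.sql(
  """CREATE TABLE IF NOT EXISTS cosmosCatalog.SampleDatabase.GreenTaxiRecords
    |USING cosmos.oltp
    |TBLPROPERTIES(partitionKeyPath = '/id', autoScaleMaxThroughput = '100000', indexingPolicy = 'OnlySystemProperties')""".stripMargin)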
// COMMAND ----------

// MAGIC %md
// MAGIC **Preparation - loading data source "[NYC Taxi & Limousine Commission - green taxi trip records](https://azure.microsoft.com/services/open-datasets/catalog/nyc-taxi-limousine-commission-green-taxi-trip-records/)"**
// MAGIC
// MAGIC The green taxi trip records include fields capturing pick-up and drop-off dates/times, pick-up and drop-off locations, trip distances, itemized fares, rate types, payment types, and driver-reported passenger counts. This data set has over 80 million records (>8 GB) of data and is available via a publicly accessible Azure Blob Storage account located in the East-US Azure region.

// COMMAND ----------

import java.time.Instant
import java.time.ZoneId
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter
import java.util.UUID
import org.apache.spark.sql.functions.udf

val formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.from(ZoneOffset.UTC))

println(s"Starting preparation: ${formatter.format(Instant.now)}")

// Azure storage access info
val blob_account_name = "azureopendatastorage"
val blob_container_name = "nyctlc"
val blob_relative_path = "green"
val blob_sas_token = ""

// Allow Spark to read from the blob container remotely
val wasbs_path = s"wasbs://${blob_container_name}@${blob_account_name}.blob.core.windows.net/${blob_relative_path}"
spark.conf.set(
  s"fs.azure.sas.${blob_container_name}.${blob_account_name}.blob.core.windows.net",
  blob_sas_token)
println(s"Remote blob path: ${wasbs_path}")

// Spark reads the parquet files - note that no data is loaded yet at this point
// NOTE - if you want to experiment with larger dataset sizes, consider switching to Option B
// (comment out the code for Option A and uncomment the code for Option B below) or increase
// the value passed into the limit function restricting the dataset size below

// ------------------------------------------------------------------------------------
// Option A - with limited dataset size
// ------------------------------------------------------------------------------------
val df_rawInputWithoutLimit = spark.read.parquet(wasbs_path)
val partitionCount = df_rawInputWithoutLimit.rdd.getNumPartitions
val df_rawInput = df_rawInputWithoutLimit.limit(1000 * 1000).repartition(partitionCount)
df_rawInput.persist()

// ------------------------------------------------------------------------------------
// Option B - entire dataset
// ------------------------------------------------------------------------------------
// val df_rawInput = spark.read.parquet(wasbs_path)

// Adding an id column with unique values
val uuidUdf = udf[String](() => UUID.randomUUID().toString)
val df_input_withId = df_rawInput.withColumn("id", uuidUdf())

println("Register the DataFrame as a SQL temporary view: source")
df_input_withId.createOrReplaceTempView("source")
println(s"Finished preparation: ${formatter.format(Instant.now)}")
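As a quick sanity check before ingesting (not part of the committed notebook), you can peek at the inferred parquet schema and the effective row count of the prepared DataFrame:

// Sketch: quick sanity check of the prepared input before ingestion.
df_input_withId.printSchema()
println(s"Prepared input rows: ${df_input_withId.count()}")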
// COMMAND ----------

// MAGIC %md
// MAGIC **Sample - ingesting the NYC Green Taxi data into Cosmos DB**
// MAGIC
// MAGIC By setting the target throughput threshold to 0.95 (95%) we reduce throttling but still allow the ingestion to consume most of the provisioned throughput. For scenarios where ingestion should only take a smaller subset of the available throughput, this threshold can be reduced accordingly.

// COMMAND ----------

println(s"Starting ingestion: ${formatter.format(Instant.now)}")

val writeCfg = Map(
  "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
  "spark.cosmos.accountKey" -> cosmosMasterKey,
  "spark.cosmos.database" -> "SampleDatabase",
  "spark.cosmos.container" -> "GreenTaxiRecords",
  "spark.cosmos.write.strategy" -> "ItemOverwrite",
  "spark.cosmos.write.bulk.enabled" -> "true",
  "spark.cosmos.throughputControl.enabled" -> "true",
  "spark.cosmos.throughputControl.name" -> "NYCGreenTaxiDataIngestion",
  "spark.cosmos.throughputControl.targetThroughputThreshold" -> "0.95",
  "spark.cosmos.throughputControl.globalControl.database" -> "SampleDatabase",
  "spark.cosmos.throughputControl.globalControl.container" -> "ThroughputControl"
)

val df_NYCGreenTaxi_Input = spark.sql("SELECT * FROM source")

df_NYCGreenTaxi_Input
  .write
  .format("cosmos.oltp")
  .mode("Append")
  .options(writeCfg)
  .save()

println(s"Finished ingestion: ${formatter.format(Instant.now)}")
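If you prefer to cap ingestion at an absolute RU budget rather than a percentage of the provisioned throughput, a hedged sketch (assuming the connector's spark.cosmos.throughputControl.targetThroughput option; not part of the committed notebook) could swap the threshold setting as follows:

// Sketch: cap ingestion at an absolute RU budget instead of a percentage of the provisioned
// throughput. Assumes the connector's targetThroughput option; everything else is taken over
// unchanged from the writeCfg map above.
val writeCfgAbsoluteTarget =
  (writeCfg - "spark.cosmos.throughputControl.targetThroughputThreshold") +
    ("spark.cosmos.throughputControl.targetThroughput" -> "10000")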
// COMMAND ----------

// MAGIC %md
// MAGIC **Getting the reference record count**

// COMMAND ----------

val count_source = spark.sql("SELECT * FROM source").count()
println(s"Number of records in source: ${count_source}")

// COMMAND ----------

// MAGIC %md
// MAGIC **Sample - validating the record count via query**

// COMMAND ----------

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

println(s"Starting validation via query: ${formatter.format(Instant.now)}")
val readCfg = Map(
  "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
  "spark.cosmos.accountKey" -> cosmosMasterKey,
  "spark.cosmos.database" -> "SampleDatabase",
  "spark.cosmos.container" -> "GreenTaxiRecords",
  "spark.cosmos.read.partitioning.strategy" -> "Restrictive", // IMPORTANT - any other partitioning strategy would result in the index not being used for the count - so latency and RU charge would spike
  "spark.cosmos.read.inferSchema.enabled" -> "false",
  "spark.cosmos.read.customQuery" -> "SELECT COUNT(0) AS Count FROM c"
)

val count_query_schema = StructType(Array(StructField("Count", LongType, true)))
val query_df = spark.read.format("cosmos.oltp").schema(count_query_schema).options(readCfg).load()
val count_query = query_df.agg(sum("Count").as("TotalCount")).first.getLong(0)
println(s"Number of records retrieved via query: ${count_query}")
println(s"Finished validation via query: ${formatter.format(Instant.now)}")

assert(count_source == count_query)
// COMMAND ----------

// MAGIC %md
// MAGIC **Sample - validating the record count via change feed**

// COMMAND ----------

println(s"Starting validation via change feed: ${formatter.format(Instant.now)}")
val changeFeedCfg = Map(
  "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
  "spark.cosmos.accountKey" -> cosmosMasterKey,
  "spark.cosmos.database" -> "SampleDatabase",
  "spark.cosmos.container" -> "GreenTaxiRecords",
  "spark.cosmos.read.partitioning.strategy" -> "Default",
  "spark.cosmos.read.inferSchema.enabled" -> "false",
  "spark.cosmos.changeFeed.startFrom" -> "Beginning",
  "spark.cosmos.changeFeed.mode" -> "Incremental"
)
val changeFeed_df = spark.read.format("cosmos.oltp.changeFeed").options(changeFeedCfg).load()
val count_changeFeed = changeFeed_df.count()
println(s"Number of records retrieved via change feed: ${count_changeFeed}")
println(s"Finished validation via change feed: ${formatter.format(Instant.now)}")

assert(count_source == count_changeFeed)
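The GreenTaxiRecordsCFSink container created earlier is not used by the batch read above. As a hedged sketch (not part of the committed notebook, and only one possible way to use that container), the same change feed source could also be consumed as a structured stream and copied into that sink; the checkpoint path below is a placeholder:

// Sketch: consume the change feed as a structured stream and copy the documents into the
// GreenTaxiRecordsCFSink container created earlier. The checkpoint path is a placeholder.
val cfSinkWriteCfg = writeCfg + ("spark.cosmos.container" -> "GreenTaxiRecordsCFSink")
val changeFeedStream_df = spark.readStream.format("cosmos.oltp.changeFeed").options(changeFeedCfg).load()
changeFeedStream_df
  .writeStream
  .format("cosmos.oltp")
  .options(cfSinkWriteCfg)
  .option("checkpointLocation", "/tmp/nyc-taxi-changefeed-checkpoint")
  .outputMode("append")
  .start()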
// COMMAND ----------

// MAGIC %md
// MAGIC **Sample - bulk deleting documents and validating the document count afterwards**

// COMMAND ----------

import scala.math._

println(s"Starting to identify the documents to be deleted: ${formatter.format(Instant.now)}")
val readCfg = Map(
  "spark.cosmos.accountEndpoint" -> cosmosEndpoint,
  "spark.cosmos.accountKey" -> cosmosMasterKey,
  "spark.cosmos.database" -> "SampleDatabase",
  "spark.cosmos.container" -> "GreenTaxiRecords",
  "spark.cosmos.read.partitioning.strategy" -> "Default",
  "spark.cosmos.read.inferSchema.enabled" -> "false"
)

val toBeDeleted_df = spark.read.format("cosmos.oltp").options(readCfg).load().limit(100000)
println(s"Number of records to be deleted: ${toBeDeleted_df.count}")

println(s"Starting to bulk delete documents: ${formatter.format(Instant.now)}")
val deleteCfg = writeCfg + ("spark.cosmos.write.strategy" -> "ItemDelete")
toBeDeleted_df
  .write
  .format("cosmos.oltp")
  .mode("Append")
  .options(deleteCfg)
  .save()
println(s"Finished deleting documents: ${formatter.format(Instant.now)}")

println(s"Starting count validation via query: ${formatter.format(Instant.now)}")
val countCfg = readCfg + ("spark.cosmos.read.customQuery" -> "SELECT COUNT(0) AS Count FROM c")
val count_query_schema = StructType(Array(StructField("Count", LongType, true)))
val query_df = spark.read.format("cosmos.oltp").schema(count_query_schema).options(countCfg).load()
val count_query = query_df.agg(sum("Count").as("TotalCount")).first.getLong(0)
println(s"Number of records retrieved via query: ${count_query}")
println(s"Finished count validation via query: ${formatter.format(Instant.now)}")

assert(math.max(0, count_source - 100000) == count_query)
// COMMAND ----------

// MAGIC %md
// MAGIC **Sample - showing the existing Containers**

// COMMAND ----------

// MAGIC %sql
// MAGIC SHOW TABLES FROM cosmosCatalog.SampleDatabase

// COMMAND ----------

val df_Tables = spark.sql("SHOW TABLES FROM cosmosCatalog.SampleDatabase")
assert(df_Tables.count() == 3)

// COMMAND ----------

// MAGIC %md
// MAGIC **Sample - querying a Cosmos Container via Spark Catalog**

// COMMAND ----------

// MAGIC %sql
// MAGIC SELECT * FROM cosmosCatalog.SampleDatabase.GreenTaxiRecords LIMIT 10

// COMMAND ----------

// MAGIC %md
// MAGIC **Sample - querying a Cosmos Container with custom settings via Spark Catalog**

// COMMAND ----------

// MAGIC %md
// MAGIC Creating the view with custom settings (in this case adding a projection, disabling schema inference and switching to the aggressive partitioning strategy)

// COMMAND ----------

// MAGIC %sql
// MAGIC CREATE TABLE cosmosCatalog.SampleDatabase.GreenTaxiRecordsView
// MAGIC (id STRING, _ts TIMESTAMP, vendorID INT, totalAmount DOUBLE)
// MAGIC USING cosmos.oltp
// MAGIC TBLPROPERTIES(isCosmosView = 'True')
// MAGIC OPTIONS (
// MAGIC   spark.cosmos.database = 'SampleDatabase',
// MAGIC   spark.cosmos.container = 'GreenTaxiRecords',
// MAGIC   spark.cosmos.read.inferSchema.enabled = 'False',
// MAGIC   spark.cosmos.read.inferSchema.includeSystemProperties = 'True',
// MAGIC   spark.cosmos.read.partitioning.strategy = 'Aggressive');
// MAGIC
// MAGIC SELECT * FROM cosmosCatalog.SampleDatabase.GreenTaxiRecordsView LIMIT 10
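Because the view declares typed columns, it can be queried like any other table. A small hedged example (not part of the committed notebook) aggregating over the projected columns from Scala:

// Sketch: use the typed columns projected by the view for a simple aggregation from Scala.
val df_byVendor = spark.sql(
  """SELECT vendorID, COUNT(*) AS trips, AVG(totalAmount) AS avgTotalAmount
    |FROM cosmosCatalog.SampleDatabase.GreenTaxiRecordsView
    |GROUP BY vendorID""".stripMargin)
df_byVendor.show()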
// COMMAND ----------

// MAGIC %md
// MAGIC Creating another view with custom settings (in this case enabling schema inference and switching to the restrictive partitioning strategy)

// COMMAND ----------

// MAGIC %sql
// MAGIC CREATE TABLE cosmosCatalog.SampleDatabase.GreenTaxiRecordsAnotherView
// MAGIC USING cosmos.oltp
// MAGIC TBLPROPERTIES(isCosmosView = 'True')
// MAGIC OPTIONS (
// MAGIC   spark.cosmos.database = 'SampleDatabase',
// MAGIC   spark.cosmos.container = 'GreenTaxiRecords',
// MAGIC   spark.cosmos.read.inferSchema.enabled = 'True',
// MAGIC   spark.cosmos.read.inferSchema.includeSystemProperties = 'False',
// MAGIC   spark.cosmos.read.partitioning.strategy = 'Restrictive');
// MAGIC
// MAGIC SELECT * FROM cosmosCatalog.SampleDatabase.GreenTaxiRecordsAnotherView LIMIT 10

// COMMAND ----------

// MAGIC %md
// MAGIC Show all tables in the Cosmos catalog to verify that both the "real" containers and the views show up

// COMMAND ----------

// MAGIC %sql
// MAGIC SHOW TABLES FROM cosmosCatalog.SampleDatabase

// COMMAND ----------

val df_Tables = spark.sql("SHOW TABLES FROM cosmosCatalog.SampleDatabase")
assert(df_Tables.count() == 5)

// COMMAND ----------

// MAGIC %md
// MAGIC **Clean up the views again**

// COMMAND ----------

// MAGIC %sql
// MAGIC DROP TABLE IF EXISTS cosmosCatalog.SampleDatabase.GreenTaxiRecordsView;
// MAGIC DROP TABLE IF EXISTS cosmosCatalog.SampleDatabase.GreenTaxiRecordsAnotherView;
// MAGIC SHOW TABLES FROM cosmosCatalog.SampleDatabase

// COMMAND ----------

val df_Tables = spark.sql("SHOW TABLES FROM cosmosCatalog.SampleDatabase")
assert(df_Tables.count() == 3)
