
Commit 39d22c1

Added the TPC-DS connector package code as a real-world example and added the relevant description to the README of the minimal connector
1 parent b76ad05 commit 39d22c1

32 files changed (+2,100 -0 lines)

GlueCustomConnectors/development/Spark/glue-3.0/README.md

Lines changed: 2 additions & 0 deletions
@@ -3,6 +3,8 @@
## Introduction
This document shows how to develop a connector that supports Glue 3.0 and Spark 3.1.1 for reading and writing data. We'll use a simple example to give an overview of the DataSourceV2 interface implementations in Spark 3. The full code example is [MinimalSpark3Connector](./MinimalSpark3Connector.scala) in the same folder.

+In addition to the minimal connector, please refer to the [TPC-DS connector for Glue 3.0](./tpcds-custom-connector-for-glue3.0) in the same folder, which is the actual Glue custom connector package published on AWS Marketplace.
+
## Setup Environment
Build a local Scala environment with the local Glue ETL Maven library: [Developing Locally with Scala](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-libraries.html). You may also refer to [GlueSparkRuntime](https://github.com/aws-samples/aws-glue-samples/blob/master/GlueCustomConnectors/development/GlueSparkRuntime/README.md) for more details on customizing the local environment for advanced testing.
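If you prefer sbt for the local environment, a minimal build.sbt along the following lines should work. This sketch is not part of the commit: the Scala version, the Glue artifact coordinates, and the artifact repository URL are assumptions; confirm them against the "Developing Locally with Scala" guide linked above.

// build.sbt -- minimal local-development sketch (NOT from this commit).
// The Scala version, repository URL, and artifact coordinates below are assumptions;
// verify them against the "Developing Locally with Scala" guide before use.
name := "glue3-connector-dev"
scalaVersion := "2.12.15" // assumed: the Scala line used by Spark 3.1.x

// assumed AWS Glue ETL artifact repository
resolvers += "aws-glue-etl-artifacts" at "https://aws-glue-etl-artifacts.s3.amazonaws.com/release/"

// assumed coordinates of the Glue 3.0 ETL library
libraryDependencies += "com.amazonaws" % "AWSGlueETL" % "3.0.0" % Provided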
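To make the Introduction above more concrete, here is a compact sketch of the Spark 3.1 DataSourceV2 read-side interfaces that a connector implements (TableProvider, Table, ScanBuilder, Scan/Batch, PartitionReaderFactory, PartitionReader). It is an independent illustration, not the MinimalSpark3Connector from this repository and not the TPC-DS connector added in this commit; all class names (SimpleRangeSource and friends) are made up.

// A minimal, self-contained sketch of the Spark 3.1 DataSourceV2 read path.
// Illustration only: not the MinimalSpark3Connector or the TPC-DS connector from this commit.
import java.util

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReader, PartitionReaderFactory, Scan, ScanBuilder}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Entry point: Spark instantiates this class when the format name resolves to it.
class SimpleRangeSource extends TableProvider {
  override def inferSchema(options: CaseInsensitiveStringMap): StructType = SimpleRangeTable.schema
  override def getTable(schema: StructType, partitioning: Array[Transform],
                        properties: util.Map[String, String]): Table = new SimpleRangeTable
}

object SimpleRangeTable {
  val schema: StructType = StructType(Seq(StructField("value", IntegerType)))
}

// A batch-readable table with a fixed single-column schema.
class SimpleRangeTable extends Table with SupportsRead {
  override def name(): String = "simple_range"
  override def schema(): StructType = SimpleRangeTable.schema
  override def capabilities(): util.Set[TableCapability] = Set(TableCapability.BATCH_READ).asJava
  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new ScanBuilder {
    override def build(): Scan = new SimpleRangeScan
  }
}

// The Scan doubles as the Batch; it plans two partitions of five rows each.
class SimpleRangeScan extends Scan with Batch {
  override def readSchema(): StructType = SimpleRangeTable.schema
  override def toBatch: Batch = this
  override def planInputPartitions(): Array[InputPartition] =
    Array(RangePartition(0, 5), RangePartition(5, 10))
  override def createReaderFactory(): PartitionReaderFactory = new RangeReaderFactory
}

case class RangePartition(start: Int, end: Int) extends InputPartition

// Runs on the executors: each reader emits the integers of its own partition.
class RangeReaderFactory extends PartitionReaderFactory {
  override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
    val p = partition.asInstanceOf[RangePartition]
    new PartitionReader[InternalRow] {
      private var current = p.start - 1
      override def next(): Boolean = { current += 1; current < p.end }
      override def get(): InternalRow = InternalRow(current)
      override def close(): Unit = ()
    }
  }
}

Reading such a source from plain Spark would look like spark.read.format("SimpleRangeSource").load() (use the fully qualified class name if the class lives in a package); in a Glue job the connector class is registered with the connector/connection instead, which the validation scripts later in this commit reference through connectionName.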

GlueCustomConnectors/development/Spark/glue-3.0/tpcds-custom-connector-for-glue3.0/README.md

Lines changed: 245 additions & 0 deletions
Large diffs are not rendered by default.
383 KB binary file (preview not rendered)
@@ -0,0 +1,90 @@
# Copyright 2016-2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0

from pyspark.context import SparkContext
from awsglue.context import GlueContext


glue_context = GlueContext(SparkContext(), minPartitions=1, targetPartitions=20)
logger = glue_context.get_logger()
connection_name = "GlueTPCDSConnection"
connection_type = "marketplace.spark"  # use `custom.spark` when running job validation tests with a self-built custom connector

'''
1. Partition count - generated single chunk data and row count is more than numPartitions
'''
connection_options_1 = {
    "table": "customer",
    "scale": "1",
    "numPartitions": "5",
    "connectionName": connection_name
}

# read data from data source
dyf_1 = glue_context.create_dynamic_frame_from_options(
    connection_type=connection_type,
    connection_options=connection_options_1)

# validate number of partitions and row count
expected_partitions = 5
result_partitions = dyf_1.getNumPartitions()
assert result_partitions == expected_partitions
logger.info(f'Expected partition count: {expected_partitions}, result partition count: {result_partitions}')

expected_count = 100000
result_count = dyf_1.count()
assert result_count == expected_count
logger.info(f'Expected record count: {expected_count}, result record count: {result_count}')

'''
2. Partition count - generated single chunk data and row count is less than numPartitions
'''
connection_options_2 = {
    "table": "call_center",
    "scale": "1",
    "numPartitions": "100",
    "connectionName": connection_name
}

# read data from data source
dyf_2 = glue_context.create_dynamic_frame_from_options(
    connection_type=connection_type,
    connection_options=connection_options_2)

# validate number of partitions and row count
expected_partitions = 1
result_partitions = dyf_2.getNumPartitions()
assert result_partitions == expected_partitions
logger.info(f'Expected partition count: {expected_partitions}, result partition count: {result_partitions}')

expected_count = 6
result_count = dyf_2.count()
assert result_count == expected_count
logger.info(f'Expected record count: {expected_count}, result record count: {result_count}')


'''
3. Partition count - generated multiple chunk data - in parallel
'''
connection_options_3 = {
    "table": "customer",
    "scale": "100",
    "numPartitions": "100",
    "connectionName": connection_name
}

# read data from data source
dyf_3 = glue_context.create_dynamic_frame_from_options(
    connection_type=connection_type,
    connection_options=connection_options_3)

# validate number of partitions and row count
expected_partitions = 100
result_partitions = dyf_3.getNumPartitions()
assert result_partitions == expected_partitions
logger.info(f'Expected partition count: {expected_partitions}, result partition count: {result_partitions}')

expected_count = 2000000
result_count = dyf_3.count()
assert result_count == expected_count
logger.info(f'Expected record count: {expected_count}, result record count: {result_count}')
@@ -0,0 +1,64 @@
# Copyright 2016-2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0

from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.gluetypes import Field, IntegerType, LongType, StructType, StringType, DecimalType, DateType


glue_context = GlueContext(SparkContext())
logger = glue_context.get_logger()

connection_options = {
    "table": "call_center",
    "scale": "1",
    "numPartitions": "1",
    "connectionName": "GlueTPCDSConnection"
}

# read data from data source
datasource0 = glue_context.create_dynamic_frame_from_options(
    connection_type="marketplace.spark",  # use `custom.spark` when running job validation tests with a self-built custom connector
    connection_options=connection_options)

# validate data schema
expected_schema = StructType([
    Field("cc_call_center_sk", LongType()),
    Field("cc_call_center_id", StringType()),
    Field("cc_rec_start_date", DateType()),
    Field("cc_rec_end_date", DateType()),
    Field("cc_closed_date_sk", IntegerType()),
    Field("cc_open_date_sk", IntegerType()),
    Field("cc_name", StringType()),
    Field("cc_class", StringType()),
    Field("cc_employees", IntegerType()),
    Field("cc_sq_ft", IntegerType()),
    Field("cc_hours", StringType()),
    Field("cc_manager", StringType()),
    Field("cc_mkt_id", IntegerType()),
    Field("cc_mkt_class", StringType()),
    Field("cc_mkt_desc", StringType()),
    Field("cc_market_manager", StringType()),
    Field("cc_division", IntegerType()),
    Field("cc_division_name", StringType()),
    Field("cc_company", IntegerType()),
    Field("cc_company_name", StringType()),
    Field("cc_street_number", StringType()),
    Field("cc_street_name", StringType()),
    Field("cc_street_type", StringType()),
    Field("cc_suite_number", StringType()),
    Field("cc_city", StringType()),
    Field("cc_county", StringType()),
    Field("cc_state", StringType()),
    Field("cc_zip", StringType()),
    Field("cc_country", StringType()),
    Field("cc_gmt_offset", DecimalType(precision=5, scale=2)),
    Field("cc_tax_percentage", DecimalType(precision=5, scale=2))
])

result_schema = datasource0.schema()
assert result_schema == expected_schema
logger.info(f'Expected schema: {expected_schema.jsonValue()}')
logger.info(f'Result schema: {result_schema.jsonValue()}')
logger.info("Result schema in tree structure: ")
datasource0.printSchema()
@@ -0,0 +1,27 @@
# Copyright 2016-2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0

from pyspark.context import SparkContext
from awsglue.context import GlueContext


glue_context = GlueContext(SparkContext())
logger = glue_context.get_logger()

connection_options = {
    "table": "customer",
    "scale": "1",
    "numPartitions": "1",
    "connectionName": "GlueTPCDSConnection"
}

# read data from data source
datasource0 = glue_context.create_dynamic_frame_from_options(
    connection_type="marketplace.spark",
    connection_options=connection_options)

# validate data reading and row count
expected_count = 100000
result_count = datasource0.count()
assert result_count == expected_count
logger.info(f'Expected record count: {expected_count}, result record count: {result_count}')
@@ -0,0 +1,79 @@
/*
 * Copyright 2016-2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: MIT-0
 */

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.schema.{Schema, TypeCode}
import com.amazonaws.services.glue.schema.builders.SchemaBuilder
import com.amazonaws.services.glue.schema.types.DecimalType
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.SparkContext

import scala.collection.JavaConverters._

object GlueJobValidationDataPartitioningTest {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark, 1, 20) // override minPartitions and targetPartitions so the GlueContext default config does not repartition the data

    val connectionName = "GlueTPCDSConnection"
    val connectionType = "marketplace.spark"

    // 1. Partition count - generated single chunk data and row count is more than numPartitions
    val options_1 = Map(
      "table" -> "customer",
      "scale" -> "1",
      "numPartitions" -> "5",
      "connectionName" -> connectionName
    )

    val dyf_1 = glueContext.getSource(
      connectionType = connectionType,
      connectionOptions = JsonOptions(options_1),
      transformationContext = "dyf"
    ).getDynamicFrame()

    assert(dyf_1.getNumPartitions == 5)
    assert(dyf_1.count == 100000)


    // 2. Partition count - generated single chunk data and row count is less than numPartitions
    val options_2 = Map(
      "table" -> "call_center",
      "scale" -> "1",
      "numPartitions" -> "100",
      "connectionName" -> connectionName
    )

    val dyf_2 = glueContext.getSource(
      connectionType = connectionType,
      connectionOptions = JsonOptions(options_2),
      transformationContext = "dyf"
    ).getDynamicFrame()

    assert(dyf_2.getNumPartitions == 1)
    assert(dyf_2.count == 6)


    // 3. Partition count - generated multiple chunk data - in parallel
    val options_3 = Map(
      "table" -> "customer",
      "scale" -> "100",
      "numPartitions" -> "100",
      "connectionName" -> connectionName
    )

    val dyf_3 = glueContext.getSource(
      connectionType = connectionType,
      connectionOptions = JsonOptions(options_3),
      transformationContext = "dyf"
    ).getDynamicFrame()

    assert(dyf_3.getNumPartitions == 100)
    assert(dyf_3.count == 2000000)
  }
}
@@ -0,0 +1,75 @@
/*
 * Copyright 2016-2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: MIT-0
 */

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.schema.{Schema, TypeCode}
import com.amazonaws.services.glue.schema.builders.SchemaBuilder
import com.amazonaws.services.glue.schema.types.DecimalType
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.SparkContext

import scala.collection.JavaConverters._

object GlueJobValidationDataSchemaTest {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)

    val optionsMap = Map(
      "table" -> "call_center",
      "scale" -> "1",
      "numPartitions" -> "1",
      "connectionName" -> "GlueTPCDSConnection"
    )

    // create DataSource and read data
    val customSource = glueContext.getSource(
      connectionType = "marketplace.spark",
      connectionOptions = JsonOptions(optionsMap),
      transformationContext = "customSource")
    val dyf = customSource.getDynamicFrame()

    // verify schema of the 'call_center' table
    val expectedSchema = new Schema(new SchemaBuilder()
      .beginStruct()
      .atomicField("cc_call_center_sk", TypeCode.LONG)
      .atomicField("cc_call_center_id", TypeCode.STRING)
      .atomicField("cc_rec_start_date", TypeCode.DATE)
      .atomicField("cc_rec_end_date", TypeCode.DATE)
      .atomicField("cc_closed_date_sk", TypeCode.INT)
      .atomicField("cc_open_date_sk", TypeCode.INT)
      .atomicField("cc_name", TypeCode.STRING)
      .atomicField("cc_class", TypeCode.STRING)
      .atomicField("cc_employees", TypeCode.INT)
      .atomicField("cc_sq_ft", TypeCode.INT)
      .atomicField("cc_hours", TypeCode.STRING)
      .atomicField("cc_manager", TypeCode.STRING)
      .atomicField("cc_mkt_id", TypeCode.INT)
      .atomicField("cc_mkt_class", TypeCode.STRING)
      .atomicField("cc_mkt_desc", TypeCode.STRING)
      .atomicField("cc_market_manager", TypeCode.STRING)
      .atomicField("cc_division", TypeCode.INT)
      .atomicField("cc_division_name", TypeCode.STRING)
      .atomicField("cc_company", TypeCode.INT)
      .atomicField("cc_company_name", TypeCode.STRING)
      .atomicField("cc_street_number", TypeCode.STRING)
      .atomicField("cc_street_name", TypeCode.STRING)
      .atomicField("cc_street_type", TypeCode.STRING)
      .atomicField("cc_suite_number", TypeCode.STRING)
      .atomicField("cc_city", TypeCode.STRING)
      .atomicField("cc_county", TypeCode.STRING)
      .atomicField("cc_state", TypeCode.STRING)
      .atomicField("cc_zip", TypeCode.STRING)
      .atomicField("cc_country", TypeCode.STRING)
      .atomicField("cc_gmt_offset", new DecimalType(5, 2))
      .atomicField("cc_tax_percentage", new DecimalType(5, 2))
      .endStruct().build())

    assert(dyf.schema == expectedSchema)
  }
}
@@ -0,0 +1,37 @@
/*
 * Copyright 2016-2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: MIT-0
 */

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._

object GlueJobValidationDataSourceTest {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)

    val optionsMap = Map(
      "table" -> "customer",
      "scale" -> "1",
      "numPartitions" -> "1",
      "connectionName" -> "GlueTPCDSConnection"
    )

    // create DataSource
    val customSource = glueContext.getSource(
      connectionType = "marketplace.spark",
      connectionOptions = JsonOptions(optionsMap),
      transformationContext = "customSource")
    val dyf = customSource.getDynamicFrame()

    // verify data
    val expectedRowCount = 100000
    assert(dyf.count == expectedRowCount)
  }
}
