
Commit 831771f

Merge pull request #18 from data-integrations/add-scala-spark-sink
Add a Scala SparkSink plugin type
2 parents 136753d + c50f3df commit 831771f

File tree

7 files changed: +747 −335 lines


docs/ScalaSparkSink-sparksink.md

Lines changed: 60 additions & 0 deletions
@@ -0,0 +1,60 @@
# Spark Sink in Scala

Description
-----------
Executes user-provided Spark code in Scala that operates on an input RDD or DataFrame with full
access to all Spark features.

Use Case
--------
This plugin can be used when you want complete control over the Spark computation.
For example, you may want to join the input RDD with another Dataset and select a subset
of the join result using Spark SQL before writing the results out to files in Parquet format.

Properties
----------
**scalaCode** Spark code in Scala defining how to write out the input records.
The code must implement a function called ``sink``, whose signature should be one of:

    def sink(df: DataFrame) : Unit

    def sink(df: DataFrame, context: SparkExecutionPluginContext) : Unit

The input ``DataFrame`` has the same schema as the input schema to this stage.
Using the ``SparkExecutionPluginContext``, you can access CDAP entities such as Datasets,
as well as the underlying ``SparkContext`` in use.
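
For instance, here is a minimal sketch of the ``DataFrame`` form that mirrors the use case
above: join the input with another dataset, keep a subset of the join result, and write it
out as Parquet. The lookup path, join key, column names, and output path are illustrative
values, not part of the plugin contract:

    def sink(df: DataFrame, context: SparkExecutionPluginContext) : Unit = {
      // Load another dataset to join against (example path).
      val lookup = df.sqlContext.read.json("lookup.json")
      df.join(lookup, "id")            // join on an example shared column
        .select("id", "name")          // keep a subset of the join result
        .write
        .parquet("output-parquet")     // write the results out as Parquet
    }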

Operating on the lower-level ``RDD`` is also possible by using one of the following forms of the ``sink`` method:

    def sink(rdd: RDD[StructuredRecord]) : Unit

    def sink(rdd: RDD[StructuredRecord], context: SparkExecutionPluginContext) : Unit

For example:

    def sink(rdd: RDD[StructuredRecord], context: SparkExecutionPluginContext) : Unit = {
      val outputSchema = context.getOutputSchema
      rdd
        .flatMap(_.get[String]("body").split("\\s+"))
        .map(s => (s, 1))
        .reduceByKey(_ + _)
        .saveAsTextFile("output")
    }

This will perform a word count on the input field ``'body'``, then write out the results as a text file.

The following imports are included automatically and are ready for the user code to use:

    import co.cask.cdap.api.data.format._
    import co.cask.cdap.api.data.schema._
    import co.cask.cdap.etl.api.batch._
    import org.apache.spark._
    import org.apache.spark.api.java._
    import org.apache.spark.rdd._
    import org.apache.spark.sql._
    import org.apache.spark.SparkContext._
    import scala.collection.JavaConversions._

**deployCompile** Specifies whether the code will be validated at pipeline creation time. Setting this to
``false`` will skip the validation.
RecordToRow.java

Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
/*
 * Copyright © 2018 Cask Data, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package co.cask.hydrator.plugin.spark.dynamic;

import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.spark.sql.DataFrames;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;

/**
 * Function to map from {@link StructuredRecord} to {@link Row}.
 */
public class RecordToRow implements Function<StructuredRecord, Row> {
  private final StructType rowType;

  public RecordToRow(StructType rowType) {
    this.rowType = rowType;
  }

  @Override
  public Row call(StructuredRecord record) throws Exception {
    return DataFrames.toRow(record, rowType);
  }
}
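
For context, here is a sketch of how a function like this might be used to turn a
``JavaRDD[StructuredRecord]`` into a ``DataFrame``. This usage is an illustration, not code
from this commit; the helper name and the ``SQLContext`` wiring are assumptions:

    import co.cask.cdap.api.data.format.StructuredRecord
    import co.cask.hydrator.plugin.spark.dynamic.RecordToRow
    import org.apache.spark.api.java.JavaRDD
    import org.apache.spark.sql.{DataFrame, Row, SQLContext}
    import org.apache.spark.sql.types.StructType

    // Hypothetical helper: map each record to a Row with RecordToRow, then
    // build a DataFrame against the given schema.
    def toDataFrame(records: JavaRDD[StructuredRecord],
                    sqlContext: SQLContext,
                    rowType: StructType): DataFrame = {
      val rows: JavaRDD[Row] = records.map(new RecordToRow(rowType))
      sqlContext.createDataFrame(rows, rowType)
    }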
