
Commit ca233d5

[CDAP-16290] Added FLL for ScalaSparkCompute and Sink plugins
1 parent b761cdc commit ca233d5

File tree

5 files changed: +66 -6 lines changed

- pom.xml
- src/main/java/io/cdap/plugin/spark/dynamic/ScalaSparkCompute.java
- src/main/java/io/cdap/plugin/spark/dynamic/ScalaSparkSink.java
- src/test/java/io/cdap/plugin/spark/dynamic/ScalaSparkTest.java
- widgets/ScalaSparkSink-sparksink.json

pom.xml

Lines changed: 6 additions & 1 deletion

@@ -29,7 +29,7 @@
     <data.pipeline.parent>system:cdap-data-pipeline[6.0.0-SNAPSHOT,7.0.0-SNAPSHOT)</data.pipeline.parent>
     <data.stream.parent>system:cdap-data-streams[6.0.0-SNAPSHOT,7.0.0-SNAPSHOT)</data.stream.parent>
-    <cdap.version>6.0.0-SNAPSHOT</cdap.version>
+    <cdap.version>6.2.0-SNAPSHOT</cdap.version>
     <spark.version>2.1.3</spark.version>
     <logback.version>1.0.9</logback.version>

@@ -100,6 +100,11 @@
       <version>4.11</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>io.cdap.plugin</groupId>
+      <artifactId>hydrator-common</artifactId>
+      <version>2.4.0-SNAPSHOT</version>
+    </dependency>
     <dependency>
       <groupId>io.cdap.cdap</groupId>
       <artifactId>hydrator-test</artifactId>

src/main/java/io/cdap/plugin/spark/dynamic/ScalaSparkCompute.java

Lines changed: 12 additions & 0 deletions

@@ -28,6 +28,8 @@
 import io.cdap.cdap.etl.api.StageConfigurer;
 import io.cdap.cdap.etl.api.batch.SparkCompute;
 import io.cdap.cdap.etl.api.batch.SparkExecutionPluginContext;
+import io.cdap.cdap.etl.api.batch.SparkPluginContext;
+import io.cdap.plugin.common.TransformLineageRecorderUtils;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.rdd.RDD;

@@ -82,6 +84,16 @@ public void initialize(SparkExecutionPluginContext context) throws Exception {
     isRDD = !codeExecutor.isDataFrame();
   }

+  @Override
+  public void prepareRun(SparkPluginContext context) throws Exception {
+    super.prepareRun(context);
+    Schema outSchema = config.schema == null ? context.getInputSchema() : Schema.parseJson(config.schema);
+    context.record(TransformLineageRecorderUtils.generateManyToMany(
+      TransformLineageRecorderUtils.getFields(context.getInputSchema()),
+      TransformLineageRecorderUtils.getFields(outSchema), "sparkCompute",
+      "Transformed fields according to spark computation"));
+  }
+
   @Override
   public JavaRDD<StructuredRecord> transform(SparkExecutionPluginContext context,
                                              JavaRDD<StructuredRecord> javaRDD) throws Exception {
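
For context, a small standalone sketch of the field lists behind the new many-to-many lineage operation recorded in prepareRun. The schemas, class name, and printed values below are illustrative only; the TransformLineageRecorderUtils calls are the ones used in the diff above, provided by the hydrator-common dependency added to pom.xml.

import java.util.List;
import java.util.stream.Collectors;

import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.common.TransformLineageRecorderUtils;

public class ComputeLineageSketch {
  public static void main(String[] args) {
    // Hypothetical input and output schemas; in the plugin they come from
    // context.getInputSchema() and config.schema respectively.
    Schema inSchema = Schema.recordOf("input",
        Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
        Schema.Field.of("age", Schema.of(Schema.Type.INT)));
    Schema outSchema = Schema.recordOf("output",
        Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
        Schema.Field.of("ageGroup", Schema.of(Schema.Type.STRING)));

    // Field names each schema contributes to the lineage operation.
    List<String> inNames = inSchema.getFields().stream().map(Schema.Field::getName).collect(Collectors.toList());
    List<String> outNames = outSchema.getFields().stream().map(Schema.Field::getName).collect(Collectors.toList());
    System.out.println(inNames + " -> " + outNames); // [name, age] -> [name, ageGroup]

    // The same many-to-many operation the new prepareRun passes to context.record(...);
    // generateManyToMany is assumed to map every input field to every output field.
    System.out.println(TransformLineageRecorderUtils.generateManyToMany(
        TransformLineageRecorderUtils.getFields(inSchema),
        TransformLineageRecorderUtils.getFields(outSchema),
        "sparkCompute", "Transformed fields according to spark computation"));
  }
}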

src/main/java/io/cdap/plugin/spark/dynamic/ScalaSparkSink.java

Lines changed: 29 additions & 3 deletions

@@ -21,16 +21,20 @@
 import io.cdap.cdap.api.annotation.Name;
 import io.cdap.cdap.api.annotation.Plugin;
 import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
 import io.cdap.cdap.api.plugin.PluginConfig;
 import io.cdap.cdap.etl.api.PipelineConfigurer;
 import io.cdap.cdap.etl.api.StageConfigurer;
 import io.cdap.cdap.etl.api.batch.SparkExecutionPluginContext;
 import io.cdap.cdap.etl.api.batch.SparkPluginContext;
 import io.cdap.cdap.etl.api.batch.SparkSink;
+import io.cdap.plugin.common.LineageRecorder;
 import org.apache.spark.api.java.JavaRDD;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.util.List;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;

 /**

@@ -64,7 +68,10 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws Ille

   @Override
   public void prepareRun(SparkPluginContext sparkPluginContext) throws Exception {
-    // no-op
+    Schema schema = sparkPluginContext.getInputSchema();
+    if (schema != null && schema.getFields() != null) {
+      recordLineage(sparkPluginContext, config.referenceName, schema, "Write", "Wrote to Scala Spark Sink.");
+    }
   }

   @Override

@@ -109,11 +116,16 @@ public static final class Config extends PluginConfig {
     @Nullable
     private final Boolean deployCompile;

-    public Config(String scalaCode, @Nullable String dependencies,
-                  @Nullable Boolean deployCompile) {
+    @Name("referenceName")
+    @Description("This will be used to uniquely identify this source/sink for lineage, annotating metadata, etc.")
+    public String referenceName;
+
+    public Config(String scalaCode, @Nullable String dependencies, @Nullable Boolean deployCompile,
+                  String referenceName) {
       this.scalaCode = scalaCode;
       this.dependencies = dependencies;
       this.deployCompile = deployCompile;
+      this.referenceName = referenceName;
     }

     public String getScalaCode() {

@@ -129,5 +141,19 @@ public String getDependencies() {
     public Boolean getDeployCompile() {
       return deployCompile;
     }
+
+    public String getReferenceName() {
+      return referenceName;
+    }
+  }
+
+  private void recordLineage(SparkPluginContext context, String outputName, Schema tableSchema, String operationName,
+                             String description) {
+    LineageRecorder lineageRecorder = new LineageRecorder(context, outputName);
+    lineageRecorder.createExternalDataset(tableSchema);
+    List<String> fieldNames = tableSchema.getFields().stream().map(Schema.Field::getName).collect(Collectors.toList());
+    if (!fieldNames.isEmpty()) {
+      lineageRecorder.recordWrite(operationName, description, fieldNames);
+    }
   }
 }
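
As a companion sketch, this shows the field names the new recordLineage helper hands to LineageRecorder.recordWrite("Write", "Wrote to Scala Spark Sink.", fieldNames), assuming a hypothetical word-count input schema; the schema and class name are made up for illustration.

import java.util.List;
import java.util.stream.Collectors;

import io.cdap.cdap.api.data.schema.Schema;

public class SinkLineageSketch {
  public static void main(String[] args) {
    // Hypothetical input schema for the sink stage; the real one comes from
    // sparkPluginContext.getInputSchema() at prepare-run time.
    Schema schema = Schema.recordOf("wordCount",
        Schema.Field.of("word", Schema.of(Schema.Type.STRING)),
        Schema.Field.of("count", Schema.of(Schema.Type.LONG)));

    // Same field-name extraction recordLineage performs before recording the write.
    List<String> fieldNames = schema.getFields().stream()
        .map(Schema.Field::getName)
        .collect(Collectors.toList());
    System.out.println(fieldNames); // [word, count]
  }
}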

src/test/java/io/cdap/plugin/spark/dynamic/ScalaSparkTest.java

Lines changed: 3 additions & 1 deletion

@@ -418,7 +418,9 @@ private void testWordCountSink(String code, File outputFolder) throws Exception
     ETLBatchConfig etlConfig = ETLBatchConfig.builder("* * * * *")
       .addStage(new ETLStage("source", MockSource.getPlugin(inputTable, inputSchema)))
       .addStage(new ETLStage("sink", new ETLPlugin("ScalaSparkSink", SparkSink.PLUGIN_TYPE,
-                                                   ImmutableMap.of("scalaCode", code))))
+                                                   ImmutableMap.of(
+                                                     "scalaCode", code,
+                                                     "referenceName", "sink"))))
       .addConnection("source", "sink")
       .build();

widgets/ScalaSparkSink-sparksink.json

Lines changed: 16 additions & 1 deletion

@@ -7,6 +7,14 @@
     {
      "label": "Spark Scala",
      "properties": [
+        {
+          "widget-type": "textbox",
+          "label": "Reference Name",
+          "name": "referenceName",
+          "widget-attributes": {
+            "placeholder": "Name used to identify this source for lineage"
+          }
+        },
         {
           "widget-type": "scala-editor",
           "label": "Scala",

@@ -31,5 +39,12 @@
         }
       ]
     }
-  ]
+  ],
+  "jump-config": {
+    "datasets": [
+      {
+        "ref-property-name": "referenceName"
+      }
+    ]
+  }
 }
