Skip to content

Commit 5a778c3

Browse files
author
vjosa@cirus.com
committed
Fixed: Show new error message when output schema is not matching input schema
1 parent 52cbef1 commit 5a778c3

File tree

2 files changed

+9
-1
lines changed

2 files changed

+9
-1
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
<data.pipeline.parent>system:cdap-data-pipeline[6.0.0-SNAPSHOT,7.0.0-SNAPSHOT)</data.pipeline.parent>
3030
<data.stream.parent>system:cdap-data-streams[6.0.0-SNAPSHOT,7.0.0-SNAPSHOT)</data.stream.parent>
3131

32-
<cdap.version>6.2.0-SNAPSHOT</cdap.version>
32+
<cdap.version>6.2.0</cdap.version>
3333
<spark.version>2.1.3</spark.version>
3434
<logback.version>1.0.9</logback.version>
3535

src/main/java/io/cdap/plugin/spark/dynamic/ScalaSparkCompute.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.cdap.cdap.api.data.schema.Schema;
2525
import io.cdap.cdap.api.plugin.PluginConfig;
2626
import io.cdap.cdap.api.spark.sql.DataFrames;
27+
import io.cdap.cdap.etl.api.FailureCollector;
2728
import io.cdap.cdap.etl.api.PipelineConfigurer;
2829
import io.cdap.cdap.etl.api.StageConfigurer;
2930
import io.cdap.cdap.etl.api.batch.SparkCompute;
@@ -110,6 +111,13 @@ public JavaRDD<StructuredRecord> transform(SparkExecutionPluginContext context,
110111
// If there is no output schema configured, derive it from the DataFrame
111112
// Otherwise, assume the DataFrame has the correct schema already
112113
outputSchema = DataFrames.toSchema((DataType) invokeDataFrameMethod(result, "schema"));
114+
} else {
115+
Schema dataSchema = DataFrames.toSchema((DataType) invokeDataFrameMethod(result, "schema"));
116+
if (!dataSchema.isCompatible(outputSchema)) {
117+
FailureCollector collector = context.getFailureCollector();
118+
collector.addFailure("Schema mismatch.", "Output schema is not matching input schema.");
119+
collector.getOrThrowException();
120+
}
113121
}
114122
//noinspection unchecked
115123
return ((JavaRDD<Row>) invokeDataFrameMethod(result, "toJavaRDD")).map(new RowToRecord(outputSchema));

0 commit comments

Comments
 (0)