Skip to content

Commit 2f16ed7

Browse files
committed
Use reflection to call SQLContext.createDataFrame
- Spark 1 and Spark 2 are not binary compatible for the method
1 parent 22454f5 commit 2f16ed7

File tree

2 files changed

+17
-2
lines changed

2 files changed

+17
-2
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
<groupId>co.cask.hydrator</groupId>
2424
<artifactId>dynamic-spark</artifactId>
25-
<version>2.0.5</version>
25+
<version>2.0.6</version>
2626

2727
<properties>
2828
<!-- properties for script build step that creates the config files for the artifacts -->

src/main/java/co/cask/hydrator/plugin/spark/dynamic/ScalaSparkCompute.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import java.io.IOException;
4949
import java.io.PrintWriter;
5050
import java.io.StringWriter;
51+
import java.lang.reflect.InvocationTargetException;
5152
import java.lang.reflect.Method;
5253
import java.lang.reflect.ParameterizedType;
5354
import java.lang.reflect.Type;
@@ -207,7 +208,7 @@ public JavaRDD<StructuredRecord> transform(SparkExecutionPluginContext context,
207208
StructType rowType = DataFrames.toDataType(inputSchema);
208209
JavaRDD<Row> rowRDD = javaRDD.map(new RecordToRow(rowType));
209210

210-
Object dataFrame = sqlContext.createDataFrame(rowRDD, rowType);
211+
Object dataFrame = createDataFrame(sqlContext, rowRDD, rowType);
211212
Object result = takeContext ? method.invoke(null, dataFrame, context) : method.invoke(null, dataFrame);
212213

213214
// Convert the DataFrame back to RDD<StructureRecord>
@@ -373,6 +374,20 @@ private boolean isDataFrame(Type type) {
373374
return false;
374375
}
375376

377+
private Object createDataFrame(SQLContext sqlContext, JavaRDD<Row> rdd, StructType type) {
378+
try {
379+
return sqlContext.getClass()
380+
.getMethod("createDataFrame", JavaRDD.class, StructType.class)
381+
.invoke(sqlContext, rdd, type);
382+
} catch (NoSuchMethodException e) {
383+
throw new RuntimeException("Unable to find SQLContext.createDataFrame(JavaRDD, StructType) method", e);
384+
} catch (IllegalAccessException e) {
385+
throw new RuntimeException("No permission to invoke SQLContext.createDataFrame(JavaRDD, StructType) method", e);
386+
} catch (InvocationTargetException e) {
387+
throw new RuntimeException("Failed to invoke SQLContext.createDataFrame(JavaRDD, StructType) method", e);
388+
}
389+
}
390+
376391
/**
377392
* Configuration object for the plugin
378393
*/

0 commit comments

Comments
 (0)