|
48 | 48 | import java.io.IOException; |
49 | 49 | import java.io.PrintWriter; |
50 | 50 | import java.io.StringWriter; |
| 51 | +import java.lang.reflect.InvocationTargetException; |
51 | 52 | import java.lang.reflect.Method; |
52 | 53 | import java.lang.reflect.ParameterizedType; |
53 | 54 | import java.lang.reflect.Type; |
@@ -207,7 +208,7 @@ public JavaRDD<StructuredRecord> transform(SparkExecutionPluginContext context, |
207 | 208 | StructType rowType = DataFrames.toDataType(inputSchema); |
208 | 209 | JavaRDD<Row> rowRDD = javaRDD.map(new RecordToRow(rowType)); |
209 | 210 |
|
210 | | - Object dataFrame = sqlContext.createDataFrame(rowRDD, rowType); |
| 211 | + Object dataFrame = createDataFrame(sqlContext, rowRDD, rowType); |
211 | 212 | Object result = takeContext ? method.invoke(null, dataFrame, context) : method.invoke(null, dataFrame); |
212 | 213 |
|
213 | 214 | // Convert the DataFrame back to RDD<StructureRecord> |
@@ -373,6 +374,20 @@ private boolean isDataFrame(Type type) { |
373 | 374 | return false; |
374 | 375 | } |
375 | 376 |
|
| 377 | + private Object createDataFrame(SQLContext sqlContext, JavaRDD<Row> rdd, StructType type) { |
| 378 | + try { |
| 379 | + return sqlContext.getClass() |
| 380 | + .getMethod("createDataFrame", JavaRDD.class, StructType.class) |
| 381 | + .invoke(sqlContext, rdd, type); |
| 382 | + } catch (NoSuchMethodException e) { |
| 383 | + throw new RuntimeException("Unable to find SQLContext.createDataFrame(JavaRDD, StructType) method", e); |
| 384 | + } catch (IllegalAccessException e) { |
| 385 | + throw new RuntimeException("No permission to invoke SQLContext.createDataFrame(JavaRDD, StructType) method", e); |
| 386 | + } catch (InvocationTargetException e) { |
| 387 | + throw new RuntimeException("Failed to invoke SQLContext.createDataFrame(JavaRDD, StructType) method", e); |
| 388 | + } |
| 389 | + } |
| 390 | + |
376 | 391 | /** |
377 | 392 | * Configuration object for the plugin |
378 | 393 | */ |
|
0 commit comments