
Commit 0d16e49

Merge branch 'release/2.0' into develop

2 parents: 846abc4 + be9e555

File tree

1 file changed: +35 -7


src/main/java/co/cask/hydrator/plugin/spark/dynamic/ScalaSparkCompute.java

Lines changed: 35 additions & 7 deletions
@@ -31,9 +31,12 @@
 import co.cask.cdap.etl.api.batch.SparkCompute;
 import co.cask.cdap.etl.api.batch.SparkExecutionPluginContext;
 import org.apache.spark.SparkContext;
+import org.apache.spark.SparkFirehoseListener;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.rdd.RDD;
+import org.apache.spark.scheduler.SparkListenerApplicationEnd;
+import org.apache.spark.scheduler.SparkListenerEvent;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.types.DataType;
@@ -134,16 +137,41 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws Ille
   public void initialize(SparkExecutionPluginContext context) throws Exception {
     String className = generateClassName(context.getStageName());
     interpreter = context.createSparkInterpreter();
-    File dir = config.getDependencies() == null ? null : Files.createTempDirectory("sparkprogram").toFile();
-    try {
-      if (config.getDependencies() != null) {
-        SparkCompilers.addDependencies(dir, interpreter, config.getDependencies());
+    File tempDir = null;
+    if (config.getDependencies() != null) {
+      tempDir = Files.createTempDirectory("sparkprogram").toFile();
+      SparkCompilers.addDependencies(tempDir, interpreter, config.getDependencies());
+    }
+    // Release resources on application completion.
+    final File finalTempDir = tempDir;
+    SparkFirehoseListener sparkListener = new SparkFirehoseListener() {
+      @Override
+      public void onEvent(SparkListenerEvent event) {
+        if (event instanceof SparkListenerApplicationEnd) {
+          LOG.info("Releasing resources on Spark application completion.");
+          interpreter.close();
+          if (finalTempDir != null) {
+            SparkCompilers.deleteDir(finalTempDir);
+          }
+        }
       }
+    };
+    // Need to use reflection to find and call the addSparkListener() method, due to incompatible changes
+    // between Spark1 (SparkListener) and Spark2 (SparkListenerInterface).
+    SparkContext sc = context.getSparkContext().sc();
+    for (Method method : sc.getClass().getMethods()) {
+      if (method.getName().equals("addSparkListener")) {
+        Class<?>[] paramTypes = method.getParameterTypes();
+        if (paramTypes.length == 1 && paramTypes[0].isAssignableFrom(sparkListener.getClass())) {
+          method.invoke(sc, sparkListener);
+          break;
+        }
+      }
+    }
+
     interpreter.compile(generateSourceClass(className));
     method = getTransformMethod(interpreter.getClassLoader(), className);
-    } finally {
-      SparkCompilers.deleteDir(dir);
-    }
+
     isDataFrame = method.getParameterTypes()[0].equals(DATAFRAME_TYPE);
     takeContext = method.getParameterTypes().length == 2;

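The substance of this change is a shift from eager to deferred cleanup: the Scala interpreter and the dependency temp directory used to be torn down in a finally block as soon as initialize() returned, and are now kept alive until the Spark application ends, presumably because they are still needed while the pipeline runs. The standalone sketch below illustrates the same two ideas, a SparkFirehoseListener that fires cleanup on SparkListenerApplicationEnd, and a reflective lookup of addSparkListener() that works against both Spark versions. The ApplicationEndCleanup class and its Cleanup callback are hypothetical stand-ins for the plugin's interpreter.close() and SparkCompilers.deleteDir() calls, not part of the plugin itself.

import java.lang.reflect.Method;

import org.apache.spark.SparkContext;
import org.apache.spark.SparkFirehoseListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerEvent;

/**
 * Minimal sketch of the deferred-cleanup pattern in this commit.
 * Cleanup is a hypothetical stand-in for the plugin's
 * interpreter.close() + SparkCompilers.deleteDir(finalTempDir) calls.
 */
public final class ApplicationEndCleanup {

  /** Hypothetical callback invoked once the Spark application ends. */
  public interface Cleanup {
    void run();
  }

  public static void register(SparkContext sc, final Cleanup cleanup) throws Exception {
    // SparkFirehoseListener funnels every scheduler event through onEvent(),
    // so only the application-end event needs to be filtered out here.
    final SparkFirehoseListener listener = new SparkFirehoseListener() {
      @Override
      public void onEvent(SparkListenerEvent event) {
        if (event instanceof SparkListenerApplicationEnd) {
          cleanup.run();
        }
      }
    };

    // addSparkListener() takes a SparkListener in Spark 1 and a
    // SparkListenerInterface in Spark 2, so the method is located
    // reflectively instead of binding against either signature at
    // compile time.
    for (Method method : sc.getClass().getMethods()) {
      if (method.getName().equals("addSparkListener")) {
        Class<?>[] paramTypes = method.getParameterTypes();
        if (paramTypes.length == 1 && paramTypes[0].isAssignableFrom(listener.getClass())) {
          method.invoke(sc, listener);
          break;
        }
      }
    }
  }

  private ApplicationEndCleanup() { }
}

SparkFirehoseListener is the natural choice for this trick because it is a plain Java class that compiles identically against Spark 1 and Spark 2 jars; the version-specific listener types only ever appear in the reflective method signature check, never in the source.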