Skip to content

Commit c2c5265

Browse files
authored
Add Try/catch around runAsync() for tests so we always print results and dispose of progress (Azure#22352)
* Add Try/catch around runAsync() for tests so we always print results. * Always dispose of progress.
1 parent f4a0ced commit c2c5265

File tree

1 file changed

+50
-46
lines changed

1 file changed

+50
-46
lines changed

common/perf-test-core/src/main/java/com/azure/perf/test/core/PerfStressProgram.java

Lines changed: 50 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,15 @@
33

44
package com.azure.perf.test.core;
55

6+
import com.beust.jcommander.JCommander;
7+
import com.fasterxml.jackson.core.JsonGenerator;
8+
import com.fasterxml.jackson.databind.ObjectMapper;
9+
import com.fasterxml.jackson.databind.SerializationFeature;
10+
import reactor.core.Disposable;
11+
import reactor.core.publisher.Flux;
12+
import reactor.core.publisher.Mono;
13+
import reactor.core.scheduler.Schedulers;
14+
615
import java.io.IOException;
716
import java.lang.reflect.InvocationTargetException;
817
import java.time.Duration;
@@ -14,16 +23,6 @@
1423
import java.util.function.Supplier;
1524
import java.util.stream.IntStream;
1625

17-
import com.beust.jcommander.JCommander;
18-
import com.fasterxml.jackson.core.JsonGenerator;
19-
import com.fasterxml.jackson.databind.ObjectMapper;
20-
import com.fasterxml.jackson.databind.SerializationFeature;
21-
22-
import reactor.core.Disposable;
23-
import reactor.core.publisher.Flux;
24-
import reactor.core.publisher.Mono;
25-
import reactor.core.scheduler.Schedulers;
26-
2726
/**
2827
* Represents the main program class which reflectively runs and manages the performance tests.
2928
*/
@@ -46,9 +45,10 @@ private static double getOperationsPerSecond() {
4645
/**
4746
* Runs the performance tests passed to be executed.
4847
*
49-
* @throws RuntimeException if the execution fails.
5048
* @param classes the performance test classes to execute.
5149
* @param args the command line arguments ro run performance tests with.
50+
*
51+
* @throws RuntimeException if the execution fails.
5252
*/
5353
public static void run(Class<?>[] classes, String[] args) {
5454
List<Class<?>> classList = new ArrayList<>(Arrays.asList(classes));
@@ -62,13 +62,13 @@ public static void run(Class<?>[] classes, String[] args) {
6262
}
6363

6464
String[] commands = classList.stream().map(c -> getCommandName(c.getSimpleName()))
65-
.toArray(i -> new String[i]);
65+
.toArray(i -> new String[i]);
6666

6767
PerfStressOptions[] options = classList.stream().map(c -> {
6868
try {
6969
return c.getConstructors()[0].getParameterTypes()[0].getConstructors()[0].newInstance();
7070
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException
71-
| InvocationTargetException | SecurityException e) {
71+
| InvocationTargetException | SecurityException e) {
7272
throw new RuntimeException(e);
7373
}
7474
}).toArray(i -> new PerfStressOptions[i]);
@@ -99,9 +99,10 @@ private static String getCommandName(String testName) {
9999
/**
100100
* Run the performance test passed to be executed.
101101
*
102-
* @throws RuntimeException if the execution fails.
103102
* @param testClass the performance test class to execute.
104103
* @param options the configuration ro run performance test with.
104+
*
105+
* @throws RuntimeException if the execution fails.
105106
*/
106107
public static void run(Class<?> testClass, PerfStressOptions options) {
107108
System.out.println("=== Options ===");
@@ -125,7 +126,7 @@ public static void run(Class<?> testClass, PerfStressOptions options) {
125126
try {
126127
tests[i] = (PerfStressTest<?>) testClass.getConstructor(options.getClass()).newInstance(options);
127128
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException
128-
| InvocationTargetException | SecurityException | NoSuchMethodException e) {
129+
| InvocationTargetException | SecurityException | NoSuchMethodException e) {
129130
throw new RuntimeException(e);
130131
}
131132
}
@@ -149,9 +150,7 @@ public static void run(Class<?> testClass, PerfStressOptions options) {
149150
}
150151
} finally {
151152
if (!options.isNoCleanup()) {
152-
if (cleanupStatus == null) {
153-
cleanupStatus = printStatus("=== Cleanup ===", () -> ".", false, false);
154-
}
153+
cleanupStatus = printStatus("=== Cleanup ===", () -> ".", false, false);
155154

156155
Flux.just(tests).flatMap(t -> t.cleanupAsync()).blockLast();
157156
}
@@ -174,12 +173,13 @@ public static void run(Class<?> testClass, PerfStressOptions options) {
174173
/**
175174
* Runs the performance tests passed to be executed.
176175
*
177-
* @throws RuntimeException if the execution fails.
178176
* @param tests the performance tests to be executed.
179177
* @param sync indicate if synchronous test should be run.
180178
* @param parallel the number of parallel threads to run the performance test on.
181179
* @param durationSeconds the duration for which performance test should be run on.
182180
* @param title the title of the performance tests.
181+
*
182+
* @throws RuntimeException if the execution fails.
183183
* @throws IllegalStateException if zero operations completed of the performance test.
184184
*/
185185
public static void runTests(PerfStressTest<?>[] tests, boolean sync, int parallel, int durationSeconds, String title) {
@@ -188,9 +188,9 @@ public static void runTests(PerfStressTest<?>[] tests, boolean sync, int paralle
188188

189189
long endNanoTime = System.nanoTime() + ((long) durationSeconds * 1000000000);
190190

191-
int[] lastCompleted = new int[] { 0 };
191+
int[] lastCompleted = new int[]{0};
192192
Disposable progressStatus = printStatus(
193-
"=== " + title + " ===" + System.lineSeparator() + "Current\t\tTotal\t\tAverage", () -> {
193+
"=== " + title + " ===" + System.lineSeparator() + "Current\t\tTotal\t\tAverage", () -> {
194194
int totalCompleted = getCompletedOperations();
195195
int currentCompleted = totalCompleted - lastCompleted[0];
196196
double averageCompleted = getOperationsPerSecond();
@@ -199,35 +199,39 @@ public static void runTests(PerfStressTest<?>[] tests, boolean sync, int paralle
199199
return String.format("%d\t\t%d\t\t%.2f", currentCompleted, totalCompleted, averageCompleted);
200200
}, true, true);
201201

202-
if (sync) {
203-
ForkJoinPool forkJoinPool = new ForkJoinPool(parallel);
204-
try {
202+
try {
203+
if (sync) {
204+
ForkJoinPool forkJoinPool = new ForkJoinPool(parallel);
205205
forkJoinPool.submit(() -> {
206206
IntStream.range(0, parallel).parallel().forEach(i -> runLoop(tests[i], i, endNanoTime));
207207
}).get();
208-
} catch (InterruptedException | ExecutionException e) {
209-
throw new RuntimeException(e);
208+
209+
} else {
210+
// Exceptions like OutOfMemoryError are handled differently by the default Reactor schedulers. Instead of terminating the
211+
// Flux, the Flux will hang and the exception is only sent to the thread's uncaughtExceptionHandler and the Reactor
212+
// Schedulers.onHandleError. This handler ensures the perf framework will fail fast on any such exceptions.
213+
Schedulers.onHandleError((t, e) -> {
214+
System.err.print(t + " threw exception: ");
215+
e.printStackTrace();
216+
System.exit(1);
217+
});
218+
219+
Flux.range(0, parallel)
220+
.parallel()
221+
.runOn(Schedulers.boundedElastic())
222+
.flatMap(i -> runLoopAsync(tests[i], i, endNanoTime))
223+
.then()
224+
.block();
210225
}
211-
} else {
212-
// Exceptions like OutOfMemoryError are handled differently by the default Reactor schedulers. Instead of terminating the
213-
// Flux, the Flux will hang and the exception is only sent to the thread's uncaughtExceptionHandler and the Reactor
214-
// Schedulers.onHandleError. This handler ensures the perf framework will fail fast on any such exceptions.
215-
Schedulers.onHandleError((t, e) -> {
216-
System.err.print(t + " threw exception: ");
217-
e.printStackTrace();
218-
System.exit(1);
219-
});
220-
221-
Flux.range(0, parallel)
222-
.parallel()
223-
.runOn(Schedulers.boundedElastic())
224-
.flatMap(i -> runLoopAsync(tests[i], i, endNanoTime))
225-
.then()
226-
.block();
226+
} catch (InterruptedException | ExecutionException e) {
227+
System.err.println("Error occurred when submitting jobs to ForkJoinPool. " + System.lineSeparator() + e);
228+
throw new RuntimeException(e);
229+
} catch (Exception e) {
230+
System.err.println("Error occurred running tests: " + System.lineSeparator() + e);
231+
} finally {
232+
progressStatus.dispose();
227233
}
228234

229-
progressStatus.dispose();
230-
231235
System.out.println("=== Results ===");
232236

233237
int totalOperations = getCompletedOperations();
@@ -239,7 +243,7 @@ public static void runTests(PerfStressTest<?>[] tests, boolean sync, int paralle
239243
double weightedAverageSeconds = totalOperations / operationsPerSecond;
240244

241245
System.out.printf("Completed %,d operations in a weighted-average of %,.2fs (%,.2f ops/s, %,.3f s/op)%n",
242-
totalOperations, weightedAverageSeconds, operationsPerSecond, secondsPerOperation);
246+
totalOperations, weightedAverageSeconds, operationsPerSecond, secondsPerOperation);
243247
System.out.println();
244248
}
245249

@@ -269,7 +273,7 @@ private static Mono<Void> runLoopAsync(PerfStressTest<?> test, int index, long e
269273
private static Disposable printStatus(String header, Supplier<Object> status, boolean newLine, boolean printFinalStatus) {
270274
System.out.println(header);
271275

272-
boolean[] needsExtraNewline = new boolean[] { false };
276+
boolean[] needsExtraNewline = new boolean[]{false};
273277

274278
return Flux.interval(Duration.ofSeconds(1)).doFinally(s -> {
275279
if (printFinalStatus) {

0 commit comments

Comments
 (0)