Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5471,12 +5471,13 @@ public void testInsertRowConcurrently()
int threads = 4;
CyclicBarrier barrier = new CyclicBarrier(threads);
ExecutorService executor = newFixedThreadPool(threads);
long insertTimeoutMillis = getConcurrentInsertTimeout().toMillis();
try (TestTable table = createTableWithOneIntegerColumn("test_insert")) {
String tableName = table.getName();

List<Future<OptionalInt>> futures = IntStream.range(0, threads)
.mapToObj(threadNumber -> executor.submit(() -> {
barrier.await(10, SECONDS);
barrier.await(insertTimeoutMillis, MILLISECONDS);
try {
getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (" + threadNumber + ")");
return OptionalInt.of(threadNumber);
Expand All @@ -5498,7 +5499,7 @@ public void testInsertRowConcurrently()
.collect(toImmutableList());

List<Integer> values = futures.stream()
.map(future -> tryGetFutureValue(future, 10, SECONDS).orElseThrow(() -> new RuntimeException("Wait timed out")))
.map(future -> tryGetFutureValue(future, (int) insertTimeoutMillis, MILLISECONDS).orElseThrow(() -> new RuntimeException("Wait timed out")))
.filter(OptionalInt::isPresent)
.map(OptionalInt::getAsInt)
.collect(toImmutableList());
Expand All @@ -5516,7 +5517,7 @@ public void testInsertRowConcurrently()
}
finally {
executor.shutdownNow();
executor.awaitTermination(10, SECONDS);
executor.awaitTermination(insertTimeoutMillis, MILLISECONDS);
}
}

Expand All @@ -5526,6 +5527,14 @@ protected void verifyConcurrentInsertFailurePermissible(Exception e)
throw new AssertionError("Unexpected concurrent insert failure", e);
}

protected Duration getConcurrentInsertTimeout()
{
// most often we experienced this test failing due to timeout waiting on the barrier,
// the reason behind this for pooled connectors is connection timeout,
// making this timeout adjustable according to connection timeout settings
return new Duration(10, SECONDS);
}

// Repeat test with invocationCount for better test coverage, since the tested aspect is inherently non-deterministic.
@RepeatedTest(4)
@Timeout(60)
Expand Down
Loading