Skip to content

Commit 3f63694

Browse files
committed
Added Unit Test and fixed bugs
1 parent 0bedf73 commit 3f63694

File tree

6 files changed

+249
-89
lines changed

6 files changed

+249
-89
lines changed

build.gradle

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,6 @@ dependencies {
3030
compileOnly group: 'org.projectlombok', name: 'lombok', version: lombokVersion
3131
annotationProcessor group: 'org.projectlombok', name: 'lombok', version: lombokVersion
3232

33-
// Mono Reactor
34-
compileOnly group: 'io.projectreactor', name: 'reactor-core', version: monoReactorVersion
35-
3633
// AWS SDK v1
3734
compileOnly group: 'com.amazonaws', name: 'aws-java-sdk', version: aws1SdkVersion
3835

@@ -49,11 +46,9 @@ dependencies {
4946

5047
// Testcontainers
5148
testImplementation platform(group: 'org.testcontainers', name: 'testcontainers-bom', version: testContainersVersion)
49+
testImplementation group: 'org.testcontainers', name: 'junit-jupiter', version: testContainersVersion
5250
testImplementation group: 'org.testcontainers', name: 'localstack', version: testContainersVersion
5351

54-
// Mono Reactor for running Tests
55-
testImplementation group: 'io.projectreactor', name: 'reactor-core', version: monoReactorVersion
56-
5752
// AWS SDK v1 for running Tests
5853
testImplementation group: 'com.amazonaws', name: 'aws-java-sdk', version: aws1SdkVersion
5954

src/main/java/me/aneeshneelam/lib/aws/s3/v1/S3ObjectSummaryIterator.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,26 @@ public class S3ObjectSummaryIterator implements Iterator<S3ObjectSummary> {
1616
private ListObjectsV2Result listObjectsV2Result;
1717
private Iterator<S3ObjectSummary> s3ObjectSummaryIterator;
1818

19-
public S3ObjectSummaryIterator(AmazonS3 amazonS3, ListObjectsV2Request listObjectsV2Request) {
19+
public S3ObjectSummaryIterator(AmazonS3 amazonS3,
20+
ListObjectsV2Request listObjectsV2Request) {
21+
2022
this.amazonS3 = amazonS3;
2123
this.listObjectsV2Request = listObjectsV2Request;
2224
}
2325

2426
private void checkListObjectsV2ResponseState() {
2527
if (this.s3ObjectSummaryIterator != null) {
26-
if (this.listObjectsV2Result.isTruncated()) {
28+
if (!this.s3ObjectSummaryIterator.hasNext() && this.listObjectsV2Result.isTruncated()) {
2729
ListObjectsV2Request listObjectsV2ContinuationRequest = ((ListObjectsV2Request) this.listObjectsV2Request.clone())
28-
.withContinuationToken(this.listObjectsV2Result.getContinuationToken());
30+
.withContinuationToken(this.listObjectsV2Result.getNextContinuationToken());
2931
this.listObjectsV2Result = this.amazonS3.listObjectsV2(listObjectsV2ContinuationRequest);
3032
this.s3ObjectSummaryIterator = this.listObjectsV2Result.getObjectSummaries()
31-
.listIterator();
33+
.iterator();
3234
}
3335
} else {
3436
this.listObjectsV2Result = this.amazonS3.listObjectsV2(this.listObjectsV2Request);
37+
this.s3ObjectSummaryIterator = this.listObjectsV2Result.getObjectSummaries()
38+
.iterator();
3539
}
3640
}
3741

src/main/java/me/aneeshneelam/lib/aws/s3/v2/AsyncObjectIterator.java

Lines changed: 0 additions & 75 deletions
This file was deleted.
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package me.aneeshneelam.lib.aws.s3.v2;
2+
3+
import software.amazon.awssdk.services.s3.S3AsyncClient;
4+
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
5+
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
6+
import software.amazon.awssdk.services.s3.model.S3Object;
7+
8+
import java.time.Duration;
9+
import java.util.Iterator;
10+
import java.util.concurrent.CompletableFuture;
11+
import java.util.concurrent.ExecutionException;
12+
import java.util.concurrent.TimeUnit;
13+
import java.util.concurrent.TimeoutException;
14+
15+
16+
public class AsyncS3ObjectIterator implements Iterator<S3Object> {
17+
18+
private final S3AsyncClient s3AsyncClient;
19+
private final ListObjectsV2Request listObjectsV2Request;
20+
private final Duration requestTimeoutDuration;
21+
22+
private boolean listObjectsV2ResponseTruncated;
23+
private String listObjectsV2ResponseNextContinuationToken;
24+
private Iterator<S3Object> s3ObjectIterator;
25+
26+
public AsyncS3ObjectIterator(S3AsyncClient s3AsyncClient,
27+
ListObjectsV2Request listObjectsV2Request,
28+
Duration requestTimeoutDuration) {
29+
30+
this.requestTimeoutDuration = requestTimeoutDuration;
31+
this.s3AsyncClient = s3AsyncClient;
32+
this.listObjectsV2Request = listObjectsV2Request;
33+
34+
this.listObjectsV2ResponseTruncated = false;
35+
this.listObjectsV2ResponseNextContinuationToken = null;
36+
this.s3ObjectIterator = null;
37+
}
38+
39+
private CompletableFuture<Void> checkListObjectsV2ResponseState() {
40+
if (this.s3ObjectIterator != null) {
41+
if (!this.s3ObjectIterator.hasNext() && this.listObjectsV2ResponseTruncated) {
42+
ListObjectsV2Request listObjectsV2ContinuationRequest = this.listObjectsV2Request.toBuilder()
43+
.continuationToken(this.listObjectsV2ResponseNextContinuationToken)
44+
.build();
45+
46+
return this.s3AsyncClient.listObjectsV2(listObjectsV2ContinuationRequest)
47+
.thenAccept(this::setListObjectsV2ResponsePaginationState);
48+
}
49+
return CompletableFuture.completedFuture(null);
50+
} else {
51+
return this.s3AsyncClient.listObjectsV2(this.listObjectsV2Request)
52+
.thenAccept(this::setListObjectsV2ResponsePaginationState);
53+
}
54+
}
55+
56+
private void setListObjectsV2ResponsePaginationState(ListObjectsV2Response listObjectsV2Response) {
57+
this.listObjectsV2ResponseTruncated = listObjectsV2Response.isTruncated();
58+
this.listObjectsV2ResponseNextContinuationToken = listObjectsV2Response.nextContinuationToken();
59+
this.s3ObjectIterator = listObjectsV2Response.contents()
60+
.iterator();
61+
}
62+
63+
@Override
64+
public boolean hasNext() {
65+
try {
66+
return this.checkListObjectsV2ResponseState()
67+
.thenApply(aVoid -> this.s3ObjectIterator.hasNext())
68+
.get(requestTimeoutDuration.getSeconds(), TimeUnit.SECONDS);
69+
} catch (InterruptedException | ExecutionException | TimeoutException e) {
70+
throw new RuntimeException(e);
71+
}
72+
}
73+
74+
@Override
75+
public S3Object next() {
76+
try {
77+
return this.checkListObjectsV2ResponseState()
78+
.thenApply(aVoid -> this.s3ObjectIterator.next())
79+
.get(requestTimeoutDuration.getSeconds(), TimeUnit.SECONDS);
80+
} catch (InterruptedException | ExecutionException | TimeoutException e) {
81+
throw new RuntimeException(e);
82+
}
83+
}
84+
}

src/main/java/me/aneeshneelam/lib/aws/s3/v2/S3ObjectIterator.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,28 @@ public class S3ObjectIterator implements Iterator<S3Object> {
1616
private ListObjectsV2Response listObjectsV2Response;
1717
private Iterator<S3Object> s3ObjectIterator;
1818

19-
public S3ObjectIterator(S3Client s3Client, ListObjectsV2Request listObjectsV2Request) {
19+
public S3ObjectIterator(S3Client s3Client,
20+
ListObjectsV2Request listObjectsV2Request) {
21+
2022
this.s3Client = s3Client;
2123
this.listObjectsV2Request = listObjectsV2Request;
2224
}
2325

2426
private void checkListObjectsV2ResponseState() {
2527
if (this.s3ObjectIterator != null) {
26-
if (this.s3ObjectIterator.hasNext()) {
27-
return;
28-
} else if (this.listObjectsV2Response.isTruncated()) {
28+
if (!this.s3ObjectIterator.hasNext() && this.listObjectsV2Response.isTruncated()) {
2929
ListObjectsV2Request listObjectsV2ContinuationRequest = this.listObjectsV2Request.toBuilder()
3030
.continuationToken(this.listObjectsV2Response.nextContinuationToken())
3131
.build();
32+
3233
this.listObjectsV2Response = this.s3Client.listObjectsV2(listObjectsV2ContinuationRequest);
3334
this.s3ObjectIterator = this.listObjectsV2Response.contents()
3435
.iterator();
3536
}
3637
} else {
3738
this.listObjectsV2Response = this.s3Client.listObjectsV2(this.listObjectsV2Request);
39+
this.s3ObjectIterator = this.listObjectsV2Response.contents()
40+
.iterator();
3841
}
3942
}
4043

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
import com.amazonaws.auth.AWSStaticCredentialsProvider;
2+
import com.amazonaws.auth.BasicAWSCredentials;
3+
import com.amazonaws.client.builder.AwsClientBuilder;
4+
import com.amazonaws.services.s3.AmazonS3;
5+
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
6+
import me.aneeshneelam.lib.aws.s3.v1.S3ObjectSummaryIterator;
7+
import me.aneeshneelam.lib.aws.s3.v2.AsyncS3ObjectIterator;
8+
import me.aneeshneelam.lib.aws.s3.v2.S3ObjectIterator;
9+
import org.junit.jupiter.api.Assertions;
10+
import org.junit.jupiter.api.BeforeAll;
11+
import org.junit.jupiter.api.BeforeEach;
12+
import org.junit.jupiter.api.Test;
13+
import org.junit.jupiter.api.parallel.Execution;
14+
import org.junit.jupiter.api.parallel.ExecutionMode;
15+
import org.testcontainers.containers.localstack.LocalStackContainer;
16+
import org.testcontainers.junit.jupiter.Container;
17+
import org.testcontainers.junit.jupiter.Testcontainers;
18+
import org.testcontainers.utility.DockerImageName;
19+
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
20+
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
21+
import software.amazon.awssdk.core.sync.RequestBody;
22+
import software.amazon.awssdk.regions.Region;
23+
import software.amazon.awssdk.services.s3.S3AsyncClient;
24+
import software.amazon.awssdk.services.s3.S3Client;
25+
import software.amazon.awssdk.services.s3.model.*;
26+
27+
import java.net.URI;
28+
import java.nio.charset.StandardCharsets;
29+
import java.time.Duration;
30+
import java.util.Spliterator;
31+
import java.util.Spliterators;
32+
import java.util.stream.LongStream;
33+
import java.util.stream.StreamSupport;
34+
35+
36+
@Execution(ExecutionMode.SAME_THREAD)
37+
@Testcontainers(disabledWithoutDocker = true, parallel = true)
38+
public class S3ObjectIteratorContainerTest {
39+
private static final long s3ObjectCount = 3000;
40+
private static final String s3BucketName = "aneesh-test-bucket";
41+
private static final String s3KeyPrefix = "files/";
42+
private static final String fileNameFormat = "file-%d.txt";
43+
private static final String fileContentFormat = "This is file %d";
44+
45+
public static final String localStackImageName = "localstack/localstack";
46+
public static final String localStackImageVersion = "3.6.0";
47+
public static final String localStackFullImageName = String.format("%s:%s", localStackImageName, localStackImageVersion);
48+
public static final DockerImageName localStackDockerImageName = DockerImageName.parse(localStackFullImageName);
49+
50+
@Container
51+
public static LocalStackContainer localStackContainer = new LocalStackContainer(localStackDockerImageName)
52+
.withServices(LocalStackContainer.Service.S3);
53+
54+
private static boolean createS3Bucket;
55+
private static boolean loadS3Objects;
56+
57+
private AmazonS3 amazonS3;
58+
private S3Client s3Client;
59+
private S3AsyncClient s3AsyncClient;
60+
61+
@BeforeAll
62+
public static void beforeAll() {
63+
createS3Bucket = true;
64+
loadS3Objects = true;
65+
}
66+
67+
@BeforeEach
68+
public void setUp() {
69+
String awsAccessKey = localStackContainer.getAccessKey();
70+
String awsSecretKey = localStackContainer.getSecretKey();
71+
String awsRegion = localStackContainer.getRegion();
72+
URI awsS3Uri = localStackContainer.getEndpointOverride(LocalStackContainer.Service.S3);
73+
74+
this.amazonS3 = AmazonS3ClientBuilder.standard()
75+
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(awsS3Uri.toString(), awsRegion))
76+
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(awsAccessKey, awsSecretKey)))
77+
.build();
78+
this.s3Client = S3Client.builder()
79+
.endpointOverride(awsS3Uri)
80+
.region(Region.of(awsRegion))
81+
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(awsAccessKey, awsSecretKey)))
82+
.build();
83+
this.s3AsyncClient = S3AsyncClient.builder()
84+
.endpointOverride(awsS3Uri)
85+
.region(Region.of(awsRegion))
86+
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(awsAccessKey, awsSecretKey)))
87+
.build();
88+
89+
if (createS3Bucket) {
90+
CreateBucketResponse response = this.s3Client.createBucket(r -> r.bucket(s3BucketName));
91+
System.out.println("Created S3 Bucket, Response: " + response.toString());
92+
createS3Bucket = false;
93+
}
94+
95+
if (loadS3Objects) {
96+
System.out.println("Uploading " + s3ObjectCount + " S3 Objects to Bucket: " + s3BucketName);
97+
LongStream.rangeClosed(1L, s3ObjectCount).forEach(i -> {
98+
String key = s3KeyPrefix + String.format(fileNameFormat, i);
99+
String content = String.format(fileContentFormat, i);
100+
PutObjectRequest request = PutObjectRequest.builder()
101+
.bucket(s3BucketName)
102+
.key(key)
103+
.build();
104+
PutObjectResponse response = this.s3Client.putObject(request, RequestBody.fromString(content, StandardCharsets.UTF_8));
105+
System.out.println("Uploaded S3 Object to Bucket: " + s3BucketName + ", Key: " + key + ", Content: " + content + ", Response: " + response.toString());
106+
});
107+
loadS3Objects = false;
108+
System.out.println("Uploaded " + s3ObjectCount + " S3 Objects to Bucket: " + s3BucketName);
109+
}
110+
}
111+
112+
@Test
113+
public void testS3ObjectIterator() {
114+
System.out.println("Testing: " + S3ObjectIterator.class.getCanonicalName());
115+
ListObjectsV2Request listObjectsV2Request = ListObjectsV2Request.builder()
116+
.bucket(s3BucketName)
117+
.prefix(s3KeyPrefix)
118+
.build();
119+
S3ObjectIterator s3ObjectIterator = new S3ObjectIterator(this.s3Client, listObjectsV2Request);
120+
long count = StreamSupport.stream(Spliterators.spliteratorUnknownSize(s3ObjectIterator, Spliterator.ORDERED), false)
121+
.count();
122+
Assertions.assertEquals(s3ObjectCount, count);
123+
}
124+
125+
@Test
126+
public void testS3ObjectIteratorAsync() {
127+
System.out.println("Testing: " + AsyncS3ObjectIterator.class.getCanonicalName());
128+
ListObjectsV2Request listObjectsV2Request = ListObjectsV2Request.builder()
129+
.bucket(s3BucketName)
130+
.prefix(s3KeyPrefix)
131+
.build();
132+
AsyncS3ObjectIterator asyncS3ObjectIterator = new AsyncS3ObjectIterator(this.s3AsyncClient, listObjectsV2Request, Duration.ofSeconds(30));
133+
long count = StreamSupport.stream(Spliterators.spliteratorUnknownSize(asyncS3ObjectIterator, Spliterator.ORDERED), false)
134+
.count();
135+
Assertions.assertEquals(s3ObjectCount, count);
136+
}
137+
138+
@Test
139+
public void testS3ObjectSummaryIterator() {
140+
System.out.println("Testing: " + S3ObjectSummaryIterator.class.getCanonicalName());
141+
com.amazonaws.services.s3.model.ListObjectsV2Request listObjectsV2Request = new com.amazonaws.services.s3.model.ListObjectsV2Request()
142+
.withBucketName(s3BucketName)
143+
.withPrefix(s3KeyPrefix);
144+
S3ObjectSummaryIterator s3ObjectSummaryIterator = new S3ObjectSummaryIterator(this.amazonS3, listObjectsV2Request);
145+
long count = StreamSupport.stream(Spliterators.spliteratorUnknownSize(s3ObjectSummaryIterator, Spliterator.ORDERED), false)
146+
.count();
147+
Assertions.assertEquals(s3ObjectCount, count);
148+
}
149+
}

0 commit comments

Comments
 (0)