2929import org .apache .flink .core .fs .RecoverableWriter ;
3030import org .apache .flink .fs .s3native .writer .NativeS3AccessHelper ;
3131import org .apache .flink .fs .s3native .writer .NativeS3RecoverableWriter ;
32+ import org .apache .flink .util .AutoCloseableAsync ;
3233import org .apache .flink .util .StringUtils ;
3334
3435import org .slf4j .Logger ;
5152import java .net .URI ;
5253import java .util .ArrayList ;
5354import java .util .List ;
55+ import java .util .concurrent .CompletableFuture ;
5456import java .util .concurrent .ThreadLocalRandom ;
57+ import java .util .concurrent .TimeUnit ;
58+ import java .util .concurrent .atomic .AtomicBoolean ;
5559
60+ /** Native S3 FileSystem implementation using AWS SDK v2. */
5661public class NativeS3FileSystem extends FileSystem
57- implements EntropyInjectingFileSystem , PathsCopyingFileSystem {
62+ implements EntropyInjectingFileSystem , PathsCopyingFileSystem , AutoCloseableAsync {
5863
5964 private static final Logger LOG = LoggerFactory .getLogger (NativeS3FileSystem .class );
6065
@@ -73,6 +78,7 @@ public class NativeS3FileSystem extends FileSystem
7378 @ Nullable private final NativeS3BulkCopyHelper bulkCopyHelper ;
7479 private final boolean useAsyncOperations ;
7580 private final int readBufferSize ;
81+ private final AtomicBoolean closed = new AtomicBoolean (false );
7682
7783 public NativeS3FileSystem (
7884 S3ClientProvider clientProvider ,
@@ -95,8 +101,6 @@ public NativeS3FileSystem(
95101 this .maxConcurrentUploadsPerStream = maxConcurrentUploadsPerStream ;
96102 this .useAsyncOperations = useAsyncOperations ;
97103 this .readBufferSize = readBufferSize ;
98-
99- // Create S3 Access Helper with async operations support
100104 this .s3AccessHelper =
101105 new NativeS3AccessHelper (
102106 clientProvider .getS3Client (),
@@ -136,6 +140,7 @@ public Path getHomeDirectory() {
136140
137141 @ Override
138142 public FileStatus getFileStatus (Path path ) throws IOException {
143+ checkNotClosed ();
139144 String key = NativeS3AccessHelper .extractKey (path );
140145 S3Client s3Client = clientProvider .getS3Client ();
141146
@@ -146,33 +151,24 @@ public FileStatus getFileStatus(Path path) throws IOException {
146151 HeadObjectRequest .builder ().bucket (bucketName ).key (key ).build ();
147152
148153 HeadObjectResponse response = s3Client .headObject (request );
149-
150- // Handle null fields (can happen with S3-compatible storage or during concurrent
151- // operations)
152154 Long contentLength = response .contentLength ();
153155
154- // IMPORTANT: In S3, a successful HeadObject with null contentLength means
156+ // In S3, a successful HeadObject with null contentLength means
155157 // this is a directory marker (prefix), not an actual file
156158 if (contentLength == null || contentLength == 0 ) {
157159 LOG .debug (
158160 "HeadObject returned null/zero content length, verifying if directory: {}" ,
159161 key );
160- // Verify it's actually a directory by listing with this prefix
161162 ListObjectsV2Request listRequest =
162163 ListObjectsV2Request .builder ()
163164 .bucket (bucketName )
164165 .prefix (key .endsWith ("/" ) ? key : key + "/" )
165166 .maxKeys (1 )
166167 .build ();
167-
168168 ListObjectsV2Response listResponse = s3Client .listObjectsV2 (listRequest );
169-
170169 if (listResponse .contents ().isEmpty () && !listResponse .hasCommonPrefixes ()) {
171- // Not a file and not a directory - doesn't exist
172170 throw new FileNotFoundException ("File not found: " + path );
173171 }
174-
175- LOG .debug ("Confirmed {} is a directory" , key );
176172 return new S3FileStatus (0 , 0 , 0 , 0 , true , path );
177173 }
178174
@@ -214,16 +210,13 @@ public FileStatus getFileStatus(Path path) throws IOException {
214210 ? e .awsErrorDetails ().errorMessage ()
215211 : e .getMessage ();
216212
217- // Log with appropriate context for troubleshooting
218213 LOG .error (
219214 "S3 error getting file status for s3://{}/{} - StatusCode: {}, ErrorCode: {}, Message: {}" ,
220215 bucketName ,
221216 key ,
222217 e .statusCode (),
223218 errorCode ,
224219 errorMsg );
225-
226- // Provide hints for common errors
227220 if (e .statusCode () == 403 ) {
228221 LOG .error (
229222 "Access denied (403). Check credentials, bucket policy, and bucket existence for s3://{}/{}" ,
@@ -255,21 +248,16 @@ public FSDataInputStream open(Path path, int bufferSize) throws IOException {
255248
256249 @ Override
257250 public FSDataInputStream open (Path path ) throws IOException {
251+ checkNotClosed ();
258252 String key = NativeS3AccessHelper .extractKey (path );
259253 S3Client s3Client = clientProvider .getS3Client ();
260254 long fileSize = getFileStatus (path ).getLen ();
261-
262- LOG .debug (
263- "Opening S3 file - key: {}, size: {} MB, buffer: {} KB" ,
264- key ,
265- fileSize / (1024 * 1024 ),
266- readBufferSize / 1024 );
267-
268255 return new NativeS3InputStream (s3Client , bucketName , key , fileSize , readBufferSize );
269256 }
270257
271258 @ Override
272259 public FileStatus [] listStatus (Path path ) throws IOException {
260+ checkNotClosed ();
273261 String key = NativeS3AccessHelper .extractKey (path );
274262 if (!key .isEmpty () && !key .endsWith ("/" )) {
275263 key = key + "/" ;
@@ -318,6 +306,7 @@ public FileStatus[] listStatus(Path path) throws IOException {
318306
319307 @ Override
320308 public boolean delete (Path path , boolean recursive ) throws IOException {
309+ checkNotClosed ();
321310 String key = NativeS3AccessHelper .extractKey (path );
322311 S3Client s3Client = clientProvider .getS3Client ();
323312
@@ -356,6 +345,7 @@ public boolean mkdirs(Path path) throws IOException {
356345
357346 @ Override
358347 public FSDataOutputStream create (Path path , WriteMode overwriteMode ) throws IOException {
348+ checkNotClosed ();
359349 if (overwriteMode == WriteMode .NO_OVERWRITE ) {
360350 try {
361351 if (exists (path )) {
@@ -376,10 +366,10 @@ public FSDataOutputStream create(Path path, WriteMode overwriteMode) throws IOEx
376366
377367 @ Override
378368 public boolean rename (Path src , Path dst ) throws IOException {
369+ checkNotClosed ();
379370 String srcKey = NativeS3AccessHelper .extractKey (src );
380371 String dstKey = NativeS3AccessHelper .extractKey (dst );
381372 S3Client s3Client = clientProvider .getS3Client ();
382-
383373 try {
384374 CopyObjectRequest copyRequest =
385375 CopyObjectRequest .builder ()
@@ -388,14 +378,10 @@ public boolean rename(Path src, Path dst) throws IOException {
388378 .destinationBucket (bucketName )
389379 .destinationKey (dstKey )
390380 .build ();
391-
392381 s3Client .copyObject (copyRequest );
393-
394382 DeleteObjectRequest deleteRequest =
395383 DeleteObjectRequest .builder ().bucket (bucketName ).key (srcKey ).build ();
396-
397384 s3Client .deleteObject (deleteRequest );
398-
399385 return true ;
400386 } catch (S3Exception e ) {
401387 throw new IOException ("Failed to rename " + src + " to " + dst , e );
@@ -429,6 +415,7 @@ public void copyFiles(
429415 List <CopyRequest > requests ,
430416 org .apache .flink .core .fs .ICloseableRegistry closeableRegistry )
431417 throws IOException {
418+ checkNotClosed ();
432419 if (bulkCopyHelper == null ) {
433420 throw new UnsupportedOperationException (
434421 "Bulk copy not enabled. Set s3.bulk-copy.enabled=true" );
@@ -438,11 +425,66 @@ public void copyFiles(
438425
439426 @ Override
440427 public RecoverableWriter createRecoverableWriter () throws IOException {
428+ checkNotClosed ();
441429 if (s3AccessHelper == null ) {
442430 throw new UnsupportedOperationException ("Recoverable writer not available" );
443431 }
444-
445432 return NativeS3RecoverableWriter .writer (
446433 s3AccessHelper , localTmpDir , s3uploadPartSize , maxConcurrentUploadsPerStream );
447434 }
435+
436+ @ Override
437+ public CompletableFuture <Void > closeAsync () {
438+ if (!closed .compareAndSet (false , true )) {
439+ return CompletableFuture .completedFuture (null );
440+ }
441+
442+ LOG .info ("Starting async close of Native S3 FileSystem for bucket: {}" , bucketName );
443+ return CompletableFuture .runAsync (
444+ () -> {
445+ if (bulkCopyHelper != null ) {
446+ try {
447+ bulkCopyHelper .close ();
448+ LOG .debug ("Bulk copy helper closed" );
449+ } catch (Exception e ) {
450+ LOG .warn ("Error closing bulk copy helper" , e );
451+ }
452+ }
453+
454+ LOG .info ("Native S3 FileSystem closed for bucket: {}" , bucketName );
455+ })
456+ .thenCompose (
457+ ignored -> {
458+ if (clientProvider != null ) {
459+ return clientProvider
460+ .closeAsync ()
461+ .whenComplete (
462+ (result , error ) -> {
463+ if (error != null ) {
464+ LOG .warn (
465+ "Error closing S3 client provider" ,
466+ error );
467+ } else {
468+ LOG .debug ("S3 client provider closed" );
469+ }
470+ });
471+ }
472+ return CompletableFuture .completedFuture (null );
473+ })
474+ .orTimeout (60 , TimeUnit .SECONDS )
475+ .exceptionally (
476+ ex -> {
477+ LOG .error (
478+ "FileSystem close timed out after 60 seconds for bucket: {}" ,
479+ bucketName ,
480+ ex );
481+ return null ;
482+ });
483+ }
484+
    /**
     * Guards public file-system operations against use after {@link #closeAsync()} has begun.
     *
     * @throws IOException if this file system has been closed
     */
    private void checkNotClosed() throws IOException {
        if (closed.get()) {
            throw new IOException("FileSystem has been closed");
        }
    }
448490}
0 commit comments