@Samrat002 Samrat002 commented Nov 3, 2025

DESCRIPTION

Native S3 FileSystem

This module provides a native S3 filesystem implementation for Apache Flink using AWS SDK v2.

Overview

The Native S3 FileSystem is a direct implementation of Flink's FileSystem interface using AWS SDK v2, without Hadoop dependencies. It provides exactly-once semantics for checkpointing and file sinks through S3 multipart uploads.

Usage

Add this module to Flink's plugins directory:

mkdir -p $FLINK_HOME/plugins/s3-fs-native
cp flink-s3-fs-native-*.jar $FLINK_HOME/plugins/s3-fs-native/

Configure S3 credentials in conf/config.yaml:

s3.access-key: YOUR_ACCESS_KEY
s3.secret-key: YOUR_SECRET_KEY
s3.endpoint: https://s3.amazonaws.com  # Optional, defaults to AWS

Use S3 paths in your Flink application:

env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/checkpoints");

DataStream<String> input = env.readTextFile("s3://my-bucket/input");
input.sinkTo(FileSink.forRowFormat(new Path("s3://my-bucket/output"), 
                                    new SimpleStringEncoder<>()).build());

Configuration Options

Core Settings

| Key | Default | Description |
|-----|---------|-------------|
| s3.access-key | (none) | AWS access key |
| s3.secret-key | (none) | AWS secret key |
| s3.region | (auto-detect) | AWS region (auto-detected via AWS_REGION, ~/.aws/config, EC2 metadata) |
| s3.endpoint | (none) | Custom S3 endpoint (for MinIO, LocalStack, etc.) |
| s3.path-style-access | false | Use path-style access (auto-enabled for custom endpoints) |
| s3.upload.min.part.size | 5242880 | Minimum part size for multipart uploads (5 MB) |
| s3.upload.max.concurrent.uploads | CPU cores | Maximum concurrent uploads per stream |
| s3.entropy.key | (none) | Key for entropy injection in paths |
| s3.entropy.length | 4 | Length of entropy string |
| s3.bulk-copy.enabled | true | Enable bulk copy operations |
| s3.async.enabled | true | Enable async read/write with TransferManager |
| s3.read.buffer.size | 262144 (256 KB) | Read buffer size per stream (64 KB - 4 MB) |
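The 5242880 default for s3.upload.min.part.size reflects two S3 hard limits: non-final multipart parts must be at least 5 MB, and any multipart upload may have at most 10,000 parts. A sketch of the part-sizing arithmetic this implies (an illustrative helper, not code from this patch):

```java
public class PartSizing {
    static final long MIN_PART_SIZE = 5L * 1024 * 1024; // 5 MiB, S3's minimum for non-final parts
    static final int MAX_PARTS = 10_000;                // S3's hard limit on parts per upload

    /** Smallest part size, at least the configured minimum, that stays under 10,000 parts. */
    static long effectivePartSize(long objectSize, long configuredMin) {
        long min = Math.max(configuredMin, MIN_PART_SIZE);
        // ceil(objectSize / MAX_PARTS): the part size below which we would exceed the part limit
        long needed = (objectSize + MAX_PARTS - 1) / MAX_PARTS;
        return Math.max(min, needed);
    }

    /** Number of parts an upload of this size produces at the given part size. */
    static long partCount(long objectSize, long partSize) {
        return (objectSize + partSize - 1) / partSize;
    }

    public static void main(String[] args) {
        long tenGiB = 10L * 1024 * 1024 * 1024;
        // At the 5 MiB default, a 10 GiB object needs 2048 parts, well under the limit
        System.out.println(effectivePartSize(tenGiB, MIN_PART_SIZE));
        System.out.println(partCount(tenGiB, MIN_PART_SIZE));
    }
}
```

Only objects above roughly 48.8 GiB (10,000 x 5 MiB) would force a part size larger than the configured minimum.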

Server-Side Encryption (SSE)

| Key | Default | Description |
|-----|---------|-------------|
| s3.sse.type | none | Encryption type: none, sse-s3 (AES256), sse-kms (AWS KMS) |
| s3.sse.kms.key-id | (none) | KMS key ID/ARN/alias for SSE-KMS (uses the default aws/s3 key if not specified) |

IAM Assume Role

| Key | Default | Description |
|-----|---------|-------------|
| s3.assume-role.arn | (none) | ARN of the IAM role to assume |
| s3.assume-role.external-id | (none) | External ID for cross-account access |
| s3.assume-role.session-name | flink-s3-session | Session name for the assumed role |
| s3.assume-role.session-duration | 3600 | Session duration in seconds (900-43200) |

Server-Side Encryption (SSE)

The filesystem supports server-side encryption for data at rest:

SSE-S3 (S3-Managed Keys)

Amazon S3 manages the encryption keys. Simplest option with no additional configuration.

s3.sse.type: sse-s3

All objects will be encrypted with AES-256 using keys managed by S3.

SSE-KMS (AWS KMS-Managed Keys)

Use AWS Key Management Service for encryption key management. Provides additional security features like key rotation, audit trails, and fine-grained access control.

Using the default aws/s3 key:

s3.sse.type: sse-kms

Using a custom KMS key:

s3.sse.type: sse-kms
s3.sse.kms.key-id: arn:aws:kms:us-east-1:123456789012:key/12345678-1234-1234-1234-123456789abc
# Or use an alias:
# s3.sse.kms.key-id: alias/my-s3-encryption-key

Note: Ensure the IAM role/user has kms:Encrypt and kms:GenerateDataKey permissions on the KMS key for writes, and kms:Decrypt for reads.

IAM Assume Role

For cross-account access or temporary elevated permissions, configure an IAM role to assume:

Basic Assume Role

s3.assume-role.arn: arn:aws:iam::123456789012:role/S3AccessRole

Cross-Account Access with External ID

For enhanced security when granting access to third parties:

s3.assume-role.arn: arn:aws:iam::123456789012:role/CrossAccountS3Role
s3.assume-role.external-id: your-secret-external-id
s3.assume-role.session-name: flink-cross-account-session
s3.assume-role.session-duration: 3600  # 1 hour

How It Works

  1. Flink uses base credentials (access key, environment, or IAM role) to call STS AssumeRole
  2. STS returns temporary credentials (access key, secret key, session token)
  3. All S3 operations use the assumed role's permissions
  4. Credentials are automatically refreshed before expiration
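Step 4 above amounts to a cached credential set that is re-fetched shortly before its expiration. In practice the module would lean on AWS SDK v2's StsAssumeRoleCredentialsProvider, which does this internally; the self-contained sketch below only models the refresh logic, and all names in it are illustrative:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

/** Illustrative cache that re-fetches temporary credentials before they expire. */
public class RefreshingCredentials {
    public record SessionCredentials(
            String accessKey, String secretKey, String sessionToken, Instant expiration) {}

    private final Supplier<SessionCredentials> stsAssumeRoleCall; // stand-in for the STS AssumeRole call
    private final Duration refreshWindow;                         // refresh this long before expiry
    private SessionCredentials cached;

    public RefreshingCredentials(Supplier<SessionCredentials> stsAssumeRoleCall, Duration refreshWindow) {
        this.stsAssumeRoleCall = stsAssumeRoleCall;
        this.refreshWindow = refreshWindow;
    }

    /** Returns valid credentials, calling STS again if the cached set is near expiry. */
    public synchronized SessionCredentials resolve(Instant now) {
        if (cached == null || now.isAfter(cached.expiration().minus(refreshWindow))) {
            cached = stsAssumeRoleCall.get(); // steps 1-2: exchange base credentials for temporary ones
        }
        return cached; // step 3: these are handed to the S3 client
    }

    public static void main(String[] args) {
        Instant expiry = Instant.now().plus(Duration.ofHours(1));
        RefreshingCredentials provider = new RefreshingCredentials(
                () -> new SessionCredentials("ASIA-dummy", "dummy-secret", "dummy-token", expiry),
                Duration.ofMinutes(5));
        System.out.println(provider.resolve(Instant.now()).accessKey());
    }
}
```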

IAM Policy Example for the Assumed Role:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:DeleteObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::my-bucket",
        "arn:aws:s3:::my-bucket/*"
      ]
    }
  ]
}

Trust Policy for Cross-Account Access:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::SOURCE_ACCOUNT_ID:root"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "your-secret-external-id"
        }
      }
    }
  ]
}

MinIO and S3-Compatible Storage

The filesystem auto-detects custom endpoints and configures appropriate settings:

s3.access-key: minioadmin
s3.secret-key: minioadmin  
s3.endpoint: http://localhost:9000
s3.path-style-access: true  # Auto-enabled for custom endpoints

MinIO-specific optimizations are applied automatically:

  • Path-style access enabled
  • Chunked encoding disabled (compatibility)
  • Checksum validation disabled (compatibility)

Memory Optimization for Large Files

The filesystem is optimized to handle large files without OOM errors:

Streaming Reads (No Buffering)

  • Files are streamed chunk-by-chunk, not loaded into memory
  • Configurable read buffer (default 256KB) prevents memory spikes
  • Only small buffer held in memory at any time
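The chunked-read pattern described above is the classic fixed-buffer copy loop: memory use is bounded by the buffer size, not by the object size. A generic sketch (not the actual NativeS3FileSystem code):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class ChunkedCopy {
    /** Copies a stream using one fixed-size buffer, so heap use stays at bufferSize. */
    static long copy(InputStream in, OutputStream out, int bufferSize) throws IOException {
        byte[] buffer = new byte[bufferSize]; // the only sizable allocation, regardless of object size
        long total = 0;
        int n;
        while ((n = in.read(buffer)) != -1) {
            out.write(buffer, 0, n);
            total += n;
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = new byte[1_000_000]; // stand-in for an S3 object body
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        long copied = copy(new ByteArrayInputStream(data), sink, 256 * 1024); // 256 KB, the default
        System.out.println(copied);
    }
}
```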

Configuration for Memory Efficiency

# Read buffer: smaller = less memory, larger = better throughput
s3.read.buffer.size: 262144  # 256KB (default)
# For memory-constrained environments: 65536 (64KB)
# For high-throughput: 1048576 (1MB)

Memory Calculation Per Parallel Read:

  • Buffer size × concurrent reads = total memory
  • Example: 256KB buffer × 16 parallel readers = 4MB total
  • This allows processing GB-sized files with MB-sized memory

OOM Prevention Strategies

  1. Use smaller read buffers (64-128KB) for very large files
  2. Reduce parallelism to limit concurrent S3 readers
  3. Enable managed memory for Flink state backend
  4. Monitor: s3.read.buffer.size × parallelism = peak memory

Async Uploads with TransferManager

Configuration:

s3.async.enabled: true  # Default: enabled

When enabled, file uploads automatically use TransferManager for:

  • Large file uploads (automatic multipart handling)
  • Checkpoint data writes
  • Recoverable output stream operations

Checkpointing

Configure checkpoint storage in conf/config.yaml:

state.checkpoints.dir: s3://my-bucket/checkpoints
execution.checkpointing.interval: 10s
execution.checkpointing.mode: EXACTLY_ONCE

Or programmatically:

env.enableCheckpointing(10000);
env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/checkpoints");

Entropy Injection

For high-throughput checkpointing to avoid S3 hot partitions:

s3.entropy.key: _entropy_
s3.entropy.length: 4

Paths like s3://bucket/_entropy_/checkpoints will be expanded to s3://bucket/af7e/checkpoints with random characters.
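The substitution can be sketched as a plain string replacement of the entropy key with random hex characters (illustrative only, not the module's actual implementation):

```java
import java.util.concurrent.ThreadLocalRandom;

public class EntropyInjection {
    /** Replaces the entropy marker in a path with `length` random hex characters. */
    static String inject(String path, String entropyKey, int length) {
        StringBuilder entropy = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            entropy.append(Character.forDigit(ThreadLocalRandom.current().nextInt(16), 16));
        }
        return path.replace(entropyKey, entropy.toString());
    }

    public static void main(String[] args) {
        // e.g. s3://bucket/_entropy_/checkpoints -> s3://bucket/3f0a/checkpoints
        System.out.println(inject("s3://bucket/_entropy_/checkpoints", "_entropy_", 4));
    }
}
```

Spreading checkpoint paths across random prefixes like this avoids concentrating request load on a single S3 key prefix.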

Implementation Details

The filesystem uses:

  • AWS SDK v2 for all S3 operations
  • Multipart uploads for recoverable writes and large files
  • S3 Transfer Manager for bulk copy operations
  • Separate sync and async clients for optimal performance

Key classes:

  • NativeS3FileSystem - Main FileSystem implementation
  • NativeS3RecoverableWriter - Exactly-once writer using multipart uploads
  • S3ClientProvider - Manages S3 client lifecycle
  • NativeS3AccessHelper - Low-level S3 operations

Building

mvn clean package

Testing with MinIO

# Start MinIO
docker run -d -p 9000:9000 -p 9001:9001 \
  -e "MINIO_ROOT_USER=minioadmin" \
  -e "MINIO_ROOT_PASSWORD=minioadmin" \
  minio/minio server /data --console-address ":9001"

# Create bucket
mc alias set local http://localhost:9000 minioadmin minioadmin
mc mb local/test-bucket

# Run Flink with MinIO
export FLINK_HOME=/path/to/flink
cat > $FLINK_HOME/conf/config.yaml <<EOF
s3.endpoint: http://localhost:9000
s3.access-key: minioadmin
s3.secret-key: minioadmin
s3.path-style-access: true
EOF

$FLINK_HOME/bin/flink run YourJob.jar

Delegation Tokens

The filesystem supports delegation tokens for secure multi-tenant deployments. The delegation token service name is s3-native to avoid conflicts with other S3 filesystem implementations.

Configure delegation tokens:

security.delegation.token.provider.s3-native.enabled: true
security.delegation.token.provider.s3-native.access-key: YOUR_KEY
security.delegation.token.provider.s3-native.secret-key: YOUR_SECRET
security.delegation.token.provider.s3-native.region: us-east-1

TESTING

Setup And Testing Details

  1. The POC was tested against both a real S3 bucket and a MinIO setup running in local Docker.
  2. Used the WordCount application to read from the S3 input bucket and write to the S3 output bucket.
  3. Ran in streaming mode with checkpointing enabled to the MinIO bucket.

Callouts:

  1. Only minimal unit tests are included in this patch.
  2. Tested only with the WordCount application.

Log Snippet from testing

Reading the input data

2025-11-09 13:53:15,111 INFO  LocalityAwareSplitAssigner - Assigning local split to requesting host 'localhost': FileSourceSplit: s3://flink-wordcount-test-1762676252/input/wordcount_test_30mb.txt [0, 31465656)
2025-11-09 13:53:15,113 INFO  StaticFileSplitEnumerator - Assigned split to subtask 4 : FileSourceSplit: s3://flink-wordcount-test-1762676252/input/wordcount_test_30mb.txt [0, 31465656)

Small File Async Upload (< 5MB)

2025-11-09 13:53:19,314 INFO  NativeS3AccessHelper - Starting async upload with TransferManager - key: output/wordcount_results/2025-11-09--13/part-8aff2425-e334-474d-b75c-5ae914972b52-1/.incomplete/.../04c8cbcd-de95-46ae-871e-b40178fc1819, size: 550505 bytes
2025-11-09 13:53:20,732 INFO  NativeS3AccessHelper - Async upload completed successfully - key: ..., eTag: "390b8dc43a25b579dc4299cd6cef5246"

Async upload completed in ~1.4 seconds for a 550 KB file

2025-11-09 13:53:19,797 INFO  NativeS3AccessHelper - Starting async upload with TransferManager - key: output/wordcount_results/2025-11-09--13/part-57ab617d-596c-47d1-989f-e8578b217202-1/.incomplete/.../7dd5f82d-676d-472a-a0f9-8a94c2a91404, size: 184842 bytes
2025-11-09 13:53:20,860 INFO  NativeS3AccessHelper - Async upload completed successfully - key: ..., eTag: "623fd4e5e820b0774d88838dc8e218f0"

Multipart Upload Operations (Large Files)

2025-11-09 13:53:22,640 INFO  NativeS3AccessHelper - Completing multipart upload - key: output/wordcount_results/2025-11-09--13/part-57ab617d-596c-47d1-989f-e8578b217202-0, parts: 1, size: 1048580 bytes
2025-11-09 13:53:22,868 INFO  NativeS3AccessHelper - Multipart upload completed - key: output/wordcount_results/2025-11-09--13/part-57ab617d-596c-47d1-989f-e8578b217202-0, eTag: "a15f5433e6cab5aabc4825f03dbe23c0-1"

Checkpoint Multipart Uploads to S3

2025-11-09 13:53:22,438 INFO  NativeS3AccessHelper - Completing multipart upload - key: checkpoints/762fa149908c89d1195f73b565722b64/chk-1/_metadata, parts: 1, size: 105667 bytes
2025-11-09 13:53:22,633 INFO  NativeS3AccessHelper - Multipart upload completed - key: checkpoints/762fa149908c89d1195f73b565722b64/chk-1/_metadata, eTag: "a789024af1446d2c797e223c34c86617-1"

2025-11-09 13:53:33,221 INFO  NativeS3AccessHelper - Completing multipart upload - key: checkpoints/762fa149908c89d1195f73b565722b64/chk-2/_metadata, parts: 1, size: 122489 bytes
2025-11-09 13:53:33,401 INFO  NativeS3AccessHelper - Multipart upload completed - key: checkpoints/762fa149908c89d1195f73b565722b64/chk-2/_metadata, eTag: "f5c2377f17dc7aee80622371d190be9c-1"

Outcome

Final Output Results

$ aws s3 ls s3://flink-wordcount-test-1762676252/output/wordcount_results/ --recursive | wc -l
65
$ aws s3 ls s3://flink-wordcount-test-1762676252/output/wordcount_results/ --recursive | awk '{sum+=$3} END {print "Total size:", sum/1024/1024 "MB"}'
Total size: 61.24 MB

Read, write, and checkpointing were all verified using the native S3 filesystem.

Jobmanager log: https://drive.google.com/file/d/1lfEJRn9F-r72YgU_uyyQFlEg2VAN_3qJ/view?usp=sharing

TaskManager log: taskmanager-1.log

(Screenshots of the test run are attached to the PR.)

@Samrat002 Samrat002 changed the title [FLINK-38592] Native Flink S3 FileSystem [FLINK-38592] [POC] Native Flink S3 FileSystem Nov 3, 2025
@Samrat002 (Contributor, Author) commented:
@pnowojski PTAL whenever a free cycle is available

Thank you

flinkbot commented Nov 3, 2025

CI report:

Bot commands: the @flinkbot bot supports the following command:

  • @flinkbot run azure: re-run the last Azure build

@Samrat002 Samrat002 marked this pull request as ready for review November 5, 2025 06:16
@Samrat002 Samrat002 force-pushed the native-fs-s3 branch 3 times, most recently from a556415 to bcdcf46 Compare November 8, 2025 17:28
@gaborgsomogyi (Contributor) commented:

I've just picked this up and intend to test it in depth. At first touch, without any special config entries, the app blows up with an invalid-region error. After some digging I found the following:

Hadoop Connector (S3A with AWS SDK v1)

The Hadoop connector uses AWS SDK v1 which has automatic region detection through a comprehensive fallback chain:

  1. Explicit Configuration: Checks fs.s3a.endpoint.region Hadoop configuration
  2. Environment Variables: AWS_REGION or AWS_DEFAULT_REGION
  3. EC2 Instance Metadata: Queries EC2 metadata service if running on AWS
  4. AWS Config/Credentials Files: ~/.aws/config region settings
  5. Bucket Location API: As a last resort, calls GetBucketLocation API to determine the bucket's region

This is handled by AWS SDK v1's DefaultAwsRegionProviderChain class, which is automatically used when no region is explicitly configured.

Native Connector (AWS SDK v2)

The Native connector uses AWS SDK v2 which is more strict:

  • Hardcoded default: us-east-1 (as we saw in NativeS3FileSystemFactory.java:63)
  • No automatic bucket location detection: SDK v2 does not call GetBucketLocation API by default
  • Requires explicit configuration: Must set s3.region in Flink config

I think we must use DefaultAwsRegionProviderChain in the native lib too.
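The fallback described in the comment above amounts to a first-non-null lookup chain. SDK v2 does ship an equivalent (software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain, covering system properties, environment variables, profile files, and EC2 metadata); the dependency-free sketch below only illustrates the shape of such a chain, with the individual providers stubbed out:

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

public class RegionChain {
    /** Returns the first region any provider can supply, mimicking a provider-chain lookup. */
    static Optional<String> resolve(List<Supplier<Optional<String>>> providers) {
        return providers.stream()
                .map(Supplier::get)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .findFirst();
    }

    public static void main(String[] args) {
        List<Supplier<Optional<String>>> chain = List.of(
                () -> Optional.empty(),                                 // 1. explicit Flink config (unset here)
                () -> Optional.ofNullable(System.getenv("AWS_REGION")), // 2. environment variable
                () -> Optional.empty(),                                 // 3. ~/.aws/config (stubbed out)
                () -> Optional.empty());                                // 4. EC2 instance metadata (stubbed out)
        System.out.println(resolve(chain).orElse("<no region configured>"));
    }
}
```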

litiliu commented Nov 28, 2025

Great job! It would be great to update/add the related docs within this PR.

gaborgsomogyi commented Dec 5, 2025

I think s3a FS support is missing. See the Hadoop factory:

/** Simple factory for the S3 file system, registered for the <tt>s3a://</tt> scheme. */
public class S3AFileSystemFactory extends S3FileSystemFactory {
    @Override
    public String getScheme() {
        return "s3a";
    }
}

@Samrat002 (Contributor, Author) commented:
Thanks, @gaborgsomogyi, for the feedback and for trying out the patch. I’ll address the review comments.

@gaborgsomogyi (Contributor) commented:

The auto region detection code looks good. I intend to test it on Monday.

@gaborgsomogyi (Contributor) commented:
Not sure all changes are in place but I think s3a://foo/bar is still not supported, right?
