[FLINK-38592] [POC] Native Flink S3 FileSystem #27187
Conversation
@pnowojski PTAL whenever a free cycle is available. Thank you!
Force-pushed from a556415 to bcdcf46.
I've just picked this up and intend to test it in depth. At first touch, without any special config entries, the app blows up with an invalid region. After some digging I've found out the following:

Hadoop Connector (S3A with AWS SDK v1): the Hadoop connector uses AWS SDK v1, which has automatic region detection through a comprehensive fallback chain; this is handled internally by AWS SDK v1.

Native Connector (AWS SDK v2): the Native connector uses AWS SDK v2, which is more strict.

I think we must use …
Great job! It would be great to update/add the related docs within this PR.
I think …
Thanks, @gaborgsomogyi, for the feedback and for trying out the patch. I'll address the review comments.
Force-pushed from 9644892 to b0ee595.
The auto region detection code looks good. I intend to test it on Monday.
Force-pushed from 481b28d to 7afeca8.
Not sure all changes are in place, but I think …
DESCRIPTION
Native S3 FileSystem
This module provides a native S3 filesystem implementation for Apache Flink using AWS SDK v2.
Overview
The Native S3 FileSystem is a direct implementation of Flink's FileSystem interface using AWS SDK v2, without Hadoop dependencies. It provides exactly-once semantics for checkpointing and file sinks through S3 multipart uploads.
Usage
1. Add this module to Flink's plugins directory.
2. Configure S3 credentials in conf/config.yaml (a sketch follows below).
3. Use s3:// paths in your Flink application.
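A credentials sketch for step 2; the key names are assumed to follow the existing Flink S3 filesystem convention and may differ in this module. Given the stricter region resolution of AWS SDK v2 noted in the review discussion, setting the region explicitly is likely needed:

```yaml
# Assumed key names (existing Flink S3 filesystem convention).
s3.access-key: <your-access-key>
s3.secret-key: <your-secret-key>
# AWS SDK v2 resolves the region more strictly than SDK v1, so pin it.
s3.region: us-east-1
```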
Configuration Options
Core Settings
Server-Side Encryption (SSE): supported values are none, sse-s3 (AES256), and sse-kms (AWS KMS).
IAM Assume Role
Server-Side Encryption (SSE)
The filesystem supports server-side encryption for data at rest:
SSE-S3 (S3-Managed Keys)
Amazon S3 manages the encryption keys; this is the simplest option, requiring no additional configuration.
All objects will be encrypted with AES-256 using keys managed by S3.
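A minimal sketch of enabling this mode, assuming a hypothetical s3.sse.type option (the exact key name is not shown in this excerpt):

```yaml
# Hypothetical key name; selects S3-managed AES-256 encryption.
s3.sse.type: sse-s3
```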
SSE-KMS (AWS KMS-Managed Keys)
Use AWS Key Management Service for encryption key management. Provides additional security features like key rotation, audit trails, and fine-grained access control.
Using the default aws/s3 key:
Using a custom KMS key:
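Both variants sketched with hypothetical option names (the exact keys are not shown in this excerpt):

```yaml
# Hypothetical key names; selects KMS-managed encryption.
s3.sse.type: sse-kms
# Omit the key id to use the default aws/s3 key, or set it for a custom key:
s3.sse.kms-key-id: arn:aws:kms:us-east-1:111122223333:key/EXAMPLE-KEY-ID
```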
Note: Ensure the IAM role/user has kms:Encrypt and kms:GenerateDataKey permissions on the KMS key.
IAM Assume Role
For cross-account access or temporary elevated permissions, configure an IAM role to assume:
Basic Assume Role
Cross-Account Access with External ID
For enhanced security when granting access to third parties:
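A sketch with hypothetical option names (the exact keys are not shown in this excerpt); the external id matches the trust policy example below:

```yaml
# Hypothetical key names. Basic assume role:
s3.assume-role.arn: arn:aws:iam::123456789012:role/flink-s3-access
# Cross-account access with an external id:
s3.assume-role.external-id: your-secret-external-id
```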
How It Works
IAM Policy Example for the Assumed Role:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:GetObject", "s3:PutObject", "s3:DeleteObject", "s3:ListBucket" ], "Resource": [ "arn:aws:s3:::my-bucket", "arn:aws:s3:::my-bucket/*" ] } ] }Trust Policy for Cross-Account Access:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::SOURCE_ACCOUNT_ID:root" }, "Action": "sts:AssumeRole", "Condition": { "StringEquals": { "sts:ExternalId": "your-secret-external-id" } } } ] }MinIO and S3-Compatible Storage
The filesystem auto-detects custom endpoints and configures appropriate settings:
MinIO-specific optimizations are applied automatically:
Memory Optimization for Large Files
The filesystem is optimized to handle large files without OOM errors:
Streaming Reads (No Buffering)
Configuration for Memory Efficiency
Memory Calculation Per Parallel Read:
OOM Prevention Strategies
s3.read.buffer.size × parallelism = peak memory
Configuration:
When enabled, file uploads automatically use TransferManager for:
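A sketch tying these settings together; s3.read.buffer.size is named above, while the TransferManager toggle is a hypothetical placeholder for whatever option "when enabled" refers to:

```yaml
s3.read.buffer.size: 1mb           # per parallel read; peak memory = buffer size x parallelism
s3.transfer-manager.enabled: true  # hypothetical key name for the TransferManager toggle
```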
Checkpointing
Configure checkpoint storage in conf/config.yaml (a sketch follows below), or set it programmatically on the execution environment:
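A config-file sketch; whether this module reads the long-standing state.checkpoints.dir key or the newer execution.checkpointing.dir key is not shown here, so the key name is an assumption:

```yaml
# Assumed key name; newer Flink releases use execution.checkpointing.dir instead.
state.checkpoints.dir: s3://my-bucket/checkpoints
```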
Entropy Injection
For high-throughput checkpointing to avoid S3 hot partitions:
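A sketch assuming the native filesystem reuses the entropy options of the existing Flink S3 filesystems:

```yaml
# Assumed to match the existing s3.entropy.* options.
s3.entropy.key: _entropy_   # marker substring to replace in paths
s3.entropy.length: 4        # number of random characters substituted
```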
Paths like s3://bucket/_entropy_/checkpoints will be expanded to s3://bucket/af7e/checkpoints with random characters.
Implementation Details
The filesystem uses:
Key classes:
- NativeS3FileSystem: Main FileSystem implementation
- NativeS3RecoverableWriter: Exactly-once writer using multipart uploads
- S3ClientProvider: Manages the S3 client lifecycle
- NativeS3AccessHelper: Low-level S3 operations
Building
Testing with MinIO
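The body of this section is not included in this excerpt; below is a minimal local-MinIO sketch, assuming the endpoint and path-style keys follow the existing Flink S3 filesystem option names:

```yaml
# Assumed option names, following the existing Flink S3 filesystems.
s3.endpoint: http://localhost:9000
s3.path.style.access: true    # MinIO addresses buckets by path, not subdomain
s3.access-key: minioadmin     # MinIO's default credentials
s3.secret-key: minioadmin
```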
Delegation Tokens
The filesystem supports delegation tokens for secure multi-tenant deployments. The delegation token service name is s3-native to avoid conflicts with other S3 filesystem implementations.
Configure delegation tokens:
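A sketch assuming the provider hooks into Flink's standard delegation token framework, whose switches follow the security.delegation.token.provider.&lt;name&gt;.enabled pattern:

```yaml
# Assumes the standard Flink delegation token framework.
security.delegation.tokens.enabled: true
security.delegation.token.provider.s3-native.enabled: true
```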
TESTING
Setup And Testing Details
Callout:
Log Snippet from testing
Reading the input data
Small File Async Upload (< 5MB)
Async upload completed in ~1.4 seconds for a 550 KB file
Multipart Upload Operations (Large Files)
Checkpoint Multipart Uploads to S3
Outcome
Final Output Results
Able to verify read, write, and checkpointing using the native-s3-filesystem.
JobManager log: https://drive.google.com/file/d/1lfEJRn9F-r72YgU_uyyQFlEg2VAN_3qJ/view?usp=sharing
TaskManager log: taskmanager-1.log