Commit 7afeca8: [FLINK-38592] Native s3 FileSystem
1 parent 4072808 · 27 files changed (+4549, -0 lines)

# Native S3 FileSystem

This module provides a native S3 filesystem implementation for Apache Flink using AWS SDK v2.

## Overview

The Native S3 FileSystem is a direct implementation of Flink's FileSystem interface using AWS SDK v2, without Hadoop dependencies. It provides exactly-once semantics for checkpointing and file sinks through S3 multipart uploads.

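For context, the exactly-once writer builds on S3's multipart-upload primitives: parts can be uploaded and persisted independently, and the object only becomes visible once the upload is completed. The sketch below shows those raw AWS SDK v2 calls in isolation; it is illustrative only and not the module's actual writer code (bucket, key, and data are placeholders).

```java
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.*;

public class MultipartSketch {
    public static void main(String[] args) {
        S3Client s3 = S3Client.create();
        String bucket = "my-bucket";
        String key = "checkpoints/part-0"; // placeholder object key

        // 1. Start a multipart upload; the object stays invisible until completion.
        String uploadId = s3.createMultipartUpload(
                CreateMultipartUploadRequest.builder().bucket(bucket).key(key).build()).uploadId();

        // 2. Upload a part (real parts must be at least 5 MB, except the last one).
        byte[] data = "example bytes".getBytes();
        String eTag = s3.uploadPart(
                UploadPartRequest.builder()
                        .bucket(bucket).key(key).uploadId(uploadId).partNumber(1).build(),
                RequestBody.fromBytes(data)).eTag();

        // 3. Completing the upload atomically publishes the object. A recoverable
        //    writer persists the uploadId and part ETags in the checkpoint and
        //    completes the upload only on commit.
        s3.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
                .bucket(bucket).key(key).uploadId(uploadId)
                .multipartUpload(CompletedMultipartUpload.builder()
                        .parts(CompletedPart.builder().partNumber(1).eTag(eTag).build())
                        .build())
                .build());
    }
}
```

In the module itself this logic lives behind `NativeS3RecoverableWriter` and `NativeS3AccessHelper` (see Implementation Details below).
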
## Usage

Add this module to Flink's plugins directory:

```bash
mkdir -p $FLINK_HOME/plugins/s3-fs-native
cp flink-s3-fs-native-*.jar $FLINK_HOME/plugins/s3-fs-native/
```

Configure S3 credentials in `conf/config.yaml`:

```yaml
s3.access-key: YOUR_ACCESS_KEY
s3.secret-key: YOUR_SECRET_KEY
s3.endpoint: https://s3.amazonaws.com # Optional, defaults to AWS
```

Use S3 paths in your Flink application:

```java
env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/checkpoints");

DataStream<String> input = env.readTextFile("s3://my-bucket/input");
input.sinkTo(FileSink.forRowFormat(new Path("s3://my-bucket/output"),
        new SimpleStringEncoder<>()).build());
```

## Configuration Options

### Core Settings

| Key | Default | Description |
|-----|---------|-------------|
| s3.access-key | (none) | AWS access key |
| s3.secret-key | (none) | AWS secret key |
| s3.region | (auto-detect) | AWS region (auto-detected via AWS_REGION, ~/.aws/config, EC2 metadata) |
| s3.endpoint | (none) | Custom S3 endpoint (for MinIO, LocalStack, etc.) |
| s3.path-style-access | false | Use path-style access (auto-enabled for custom endpoints) |
| s3.upload.min.part.size | 5242880 | Minimum part size for multipart uploads (5MB) |
| s3.upload.max.concurrent.uploads | CPU cores | Maximum concurrent uploads per stream |
| s3.entropy.key | (none) | Key for entropy injection in paths |
| s3.entropy.length | 4 | Length of entropy string |
| s3.bulk-copy.enabled | true | Enable bulk copy operations |
| s3.async.enabled | true | Enable async read/write with TransferManager |
| s3.read.buffer.size | 262144 (256KB) | Read buffer size per stream (64KB - 4MB) |

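These keys normally go into `conf/config.yaml`. If you need to set them programmatically (for example in an integration test), a minimal sketch using Flink's generic `FileSystem.initialize` mechanism could look like the following. It is not specific to this module and assumes the plugin jar is already installed under `$FLINK_HOME/plugins/s3-fs-native`; the values are placeholders.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginUtils;

public class ProgrammaticS3Config {
    public static void main(String[] args) throws Exception {
        // Same keys as in config.yaml; values are placeholders.
        Configuration conf = new Configuration();
        conf.setString("s3.access-key", "YOUR_ACCESS_KEY");
        conf.setString("s3.secret-key", "YOUR_SECRET_KEY");
        conf.setString("s3.read.buffer.size", "262144");

        // Re-initialize the FileSystem registry with these settings; the plugin
        // manager discovers filesystem plugins from the Flink plugins directory.
        FileSystem.initialize(conf, PluginUtils.createPluginManagerFromRootFolder(conf));
    }
}
```
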
### Server-Side Encryption (SSE)

| Key | Default | Description |
|-----|---------|-------------|
| s3.sse.type | none | Encryption type: `none`, `sse-s3` (AES256), `sse-kms` (AWS KMS) |
| s3.sse.kms.key-id | (none) | KMS key ID/ARN/alias for SSE-KMS (uses default aws/s3 key if not specified) |

### IAM Assume Role

| Key | Default | Description |
|-----|---------|-------------|
| s3.assume-role.arn | (none) | ARN of the IAM role to assume |
| s3.assume-role.external-id | (none) | External ID for cross-account access |
| s3.assume-role.session-name | flink-s3-session | Session name for the assumed role |
| s3.assume-role.session-duration | 3600 | Session duration in seconds (900-43200) |

## Server-Side Encryption (SSE)

The filesystem supports server-side encryption for data at rest:

### SSE-S3 (S3-Managed Keys)

Amazon S3 manages the encryption keys. This is the simplest option and requires no configuration beyond:

```yaml
s3.sse.type: sse-s3
```

All objects will be encrypted with AES-256 using keys managed by S3.

### SSE-KMS (AWS KMS-Managed Keys)

Use AWS Key Management Service for encryption key management. It provides additional security features such as key rotation, audit trails, and fine-grained access control.

**Using the default aws/s3 key:**
```yaml
s3.sse.type: sse-kms
```

**Using a custom KMS key:**
```yaml
s3.sse.type: sse-kms
s3.sse.kms.key-id: arn:aws:kms:us-east-1:123456789012:key/12345678-1234-1234-1234-123456789abc
# Or use an alias:
# s3.sse.kms.key-id: alias/my-s3-encryption-key
```

**Note:** Ensure the IAM role/user has `kms:Encrypt` and `kms:GenerateDataKey` permissions on the KMS key.

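At the S3 request level, these settings correspond to the standard SSE parameters on write requests. A minimal AWS SDK v2 sketch follows; it is illustrative only, not the module's code, and the bucket, key, and KMS ARN are placeholders.

```java
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;

public class SseSketch {
    public static void main(String[] args) {
        S3Client s3 = S3Client.create();

        // Equivalent of s3.sse.type: sse-kms with a custom key id.
        PutObjectRequest request = PutObjectRequest.builder()
                .bucket("my-bucket")
                .key("checkpoints/data")
                .serverSideEncryption(ServerSideEncryption.AWS_KMS)
                .ssekmsKeyId("arn:aws:kms:us-east-1:123456789012:key/12345678-1234-1234-1234-123456789abc")
                // For sse-s3, use ServerSideEncryption.AES256 and omit the key id.
                .build();

        s3.putObject(request, RequestBody.fromString("example payload"));
    }
}
```
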
## IAM Assume Role

For cross-account access or temporary elevated permissions, configure an IAM role to assume:

### Basic Assume Role

```yaml
s3.assume-role.arn: arn:aws:iam::123456789012:role/S3AccessRole
```

### Cross-Account Access with External ID

For enhanced security when granting access to third parties:

```yaml
s3.assume-role.arn: arn:aws:iam::123456789012:role/CrossAccountS3Role
s3.assume-role.external-id: your-secret-external-id
s3.assume-role.session-name: flink-cross-account-session
s3.assume-role.session-duration: 3600 # 1 hour
```

### How It Works

1. Flink uses base credentials (access key, environment, or IAM role) to call STS AssumeRole
2. STS returns temporary credentials (access key, secret key, session token)
3. All S3 operations use the assumed role's permissions
4. Credentials are automatically refreshed before expiration (see the sketch after this list)

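The same credential chain can be reproduced directly with AWS SDK v2. The sketch below illustrates the mechanism (STS-backed, auto-refreshing credentials feeding an S3 client) rather than the module's actual wiring; the role ARN, external ID, and region are placeholders.

```java
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;

public class AssumeRoleSketch {
    public static void main(String[] args) {
        // Base credentials (default provider chain) are used for the STS call.
        StsClient sts = StsClient.builder().region(Region.US_EAST_1).build();

        AssumeRoleRequest assumeRole = AssumeRoleRequest.builder()
                .roleArn("arn:aws:iam::123456789012:role/CrossAccountS3Role")
                .roleSessionName("flink-cross-account-session")
                .externalId("your-secret-external-id")
                .durationSeconds(3600)
                .build();

        // The provider refreshes the temporary credentials before they expire.
        StsAssumeRoleCredentialsProvider provider = StsAssumeRoleCredentialsProvider.builder()
                .stsClient(sts)
                .refreshRequest(assumeRole)
                .build();

        // All S3 operations then run with the assumed role's permissions.
        S3Client s3 = S3Client.builder()
                .region(Region.US_EAST_1)
                .credentialsProvider(provider)
                .build();

        s3.listBuckets().buckets().forEach(b -> System.out.println(b.name()));
    }
}
```
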
**IAM Policy Example for the Assumed Role:**

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:DeleteObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::my-bucket",
        "arn:aws:s3:::my-bucket/*"
      ]
    }
  ]
}
```

**Trust Policy for Cross-Account Access:**

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::SOURCE_ACCOUNT_ID:root"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "your-secret-external-id"
        }
      }
    }
  ]
}
```

## MinIO and S3-Compatible Storage

The filesystem auto-detects custom endpoints and configures appropriate settings:

```yaml
s3.access-key: minioadmin
s3.secret-key: minioadmin
s3.endpoint: http://localhost:9000
s3.path-style-access: true # Auto-enabled for custom endpoints
```

MinIO-specific optimizations are applied automatically:
- Path-style access enabled
- Chunked encoding disabled (compatibility)
- Checksum validation disabled (compatibility)

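At the SDK level, a custom endpoint with path-style access corresponds to the client settings sketched below. This is a minimal illustration of what the auto-detection amounts to, not the module's client setup; the endpoint and credentials are the local MinIO defaults from above.

```java
import java.net.URI;

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;

public class MinioClientSketch {
    public static void main(String[] args) {
        S3Client s3 = S3Client.builder()
                .endpointOverride(URI.create("http://localhost:9000")) // s3.endpoint
                .forcePathStyle(true)                                  // s3.path-style-access
                .region(Region.US_EAST_1)                              // a region is still required by the SDK
                .credentialsProvider(StaticCredentialsProvider.create(
                        AwsBasicCredentials.create("minioadmin", "minioadmin")))
                .build();

        // With path-style access, buckets are addressed as http://localhost:9000/<bucket>.
        s3.listBuckets().buckets().forEach(b -> System.out.println(b.name()));
    }
}
```
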
## Memory Optimization for Large Files

The filesystem is optimized to handle large files without OOM errors:

### Streaming Reads (No Full-File Buffering)
- Files are **streamed** chunk-by-chunk, not loaded into memory
- A configurable read buffer (default 256KB) prevents memory spikes
- Only a small buffer is held in memory at any time

### Configuration for Memory Efficiency

```yaml
# Read buffer: smaller = less memory, larger = better throughput
s3.read.buffer.size: 262144 # 256KB (default)
# For memory-constrained environments: 65536 (64KB)
# For high-throughput: 1048576 (1MB)
```

**Memory Calculation Per Parallel Read:**
- Buffer size × concurrent reads = total memory
- Example: 256KB buffer × 16 parallel readers = 4MB total
- This allows processing GB-sized files with MB-sized memory

### OOM Prevention Strategies

1. **Use smaller read buffers** (64-128KB) for very large files
2. **Reduce parallelism** to limit concurrent S3 readers
3. **Enable managed memory** for the Flink state backend
4. **Monitor**: `s3.read.buffer.size` × `parallelism` = peak memory

## Async Operations with TransferManager

The filesystem uses the AWS SDK's TransferManager for high-performance async read/write operations:

**Benefits:**
- **Automatic multipart management**: TransferManager automatically handles multipart uploads for large files
- **Parallel transfers**: Multiple parts are uploaded concurrently for maximum throughput
- **Progress tracking**: Built-in progress monitoring and retry logic
- **Resource optimization**: Efficient connection pooling and memory management
- **Streaming uploads**: Data is streamed from disk, not buffered in memory

**Configuration:**
```yaml
s3.async.enabled: true # Default: enabled
```

When enabled, file uploads automatically use TransferManager for:
- Large file uploads (automatic multipart handling)
- Checkpoint data writes
- Recoverable output stream operations

**Performance Impact:**
- Up to 10x faster uploads for large files (>100MB)
- **Reduced memory pressure** through streaming
- Better utilization of available bandwidth
- Lower heap requirements for write operations

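For reference, a file upload through the SDK v2 Transfer Manager looks roughly like the sketch below. It illustrates the mechanism the module relies on, not its internal code; the bucket, key, and local path are placeholders, and the `s3-transfer-manager` (and typically `aws-crt`) artifacts must be on the classpath.

```java
import java.nio.file.Paths;

import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload;
import software.amazon.awssdk.transfer.s3.model.FileUpload;
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;

public class TransferManagerSketch {
    public static void main(String[] args) {
        // The CRT-based async client enables automatic multipart and parallel part uploads.
        S3AsyncClient asyncClient = S3AsyncClient.crtCreate();
        S3TransferManager transferManager = S3TransferManager.builder()
                .s3Client(asyncClient)
                .build();

        FileUpload upload = transferManager.uploadFile(UploadFileRequest.builder()
                .putObjectRequest(req -> req.bucket("my-bucket").key("output/large-file.bin"))
                .source(Paths.get("/tmp/large-file.bin"))
                .build());

        // Parts are uploaded concurrently; join() waits for the whole transfer.
        CompletedFileUpload result = upload.completionFuture().join();
        System.out.println("Uploaded, ETag: " + result.response().eTag());

        transferManager.close();
        asyncClient.close();
    }
}
```
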
## Checkpointing

Configure checkpoint storage in `conf/config.yaml`:

```yaml
state.checkpoints.dir: s3://my-bucket/checkpoints
execution.checkpointing.interval: 10s
execution.checkpointing.mode: EXACTLY_ONCE
```

Or programmatically:

```java
env.enableCheckpointing(10000);
env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/checkpoints");
```

## Entropy Injection

For high-throughput checkpointing, entropy injection helps avoid S3 hot partitions:

```yaml
s3.entropy.key: _entropy_
s3.entropy.length: 4
```

Paths like `s3://bucket/_entropy_/checkpoints` will be expanded to e.g. `s3://bucket/af7e/checkpoints`, with the entropy key replaced by random characters.

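Conceptually, the injection is a substitution of the configured entropy key with a random string of the configured length. The snippet below is a hypothetical illustration of that behavior, not the module's implementation; in particular, the hex alphabet is an assumption.

```java
import java.util.concurrent.ThreadLocalRandom;

public class EntropySketch {
    public static void main(String[] args) {
        String entropyKey = "_entropy_"; // s3.entropy.key
        int entropyLength = 4;           // s3.entropy.length
        String path = "s3://bucket/_entropy_/checkpoints";

        // Hypothetical alphabet; the real character set may differ.
        String alphabet = "0123456789abcdef";
        StringBuilder entropy = new StringBuilder(entropyLength);
        for (int i = 0; i < entropyLength; i++) {
            entropy.append(alphabet.charAt(ThreadLocalRandom.current().nextInt(alphabet.length())));
        }

        // e.g. s3://bucket/af7e/checkpoints
        System.out.println(path.replace(entropyKey, entropy.toString()));
    }
}
```
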
## Implementation Details

The filesystem uses:
- **AWS SDK v2** for all S3 operations
- **Multipart uploads** for recoverable writes and large files
- **S3 Transfer Manager** for bulk copy operations
- **Separate sync and async clients** for optimal performance

Key classes:
- `NativeS3FileSystem` - Main FileSystem implementation
- `NativeS3RecoverableWriter` - Exactly-once writer using multipart uploads
- `S3ClientProvider` - Manages S3 client lifecycle
- `NativeS3AccessHelper` - Low-level S3 operations

## Building

```bash
mvn clean package
```

## Testing with MinIO

```bash
# Start MinIO
docker run -d -p 9000:9000 -p 9001:9001 \
  -e "MINIO_ROOT_USER=minioadmin" \
  -e "MINIO_ROOT_PASSWORD=minioadmin" \
  minio/minio server /data --console-address ":9001"

# Create bucket
mc alias set local http://localhost:9000 minioadmin minioadmin
mc mb local/test-bucket

# Run Flink with MinIO
export FLINK_HOME=/path/to/flink
cat > $FLINK_HOME/conf/config.yaml <<EOF
s3.endpoint: http://localhost:9000
s3.access-key: minioadmin
s3.secret-key: minioadmin
s3.path-style-access: true
EOF

$FLINK_HOME/bin/flink run YourJob.jar
```

## Delegation Tokens

The filesystem supports delegation tokens for secure multi-tenant deployments. The delegation token service name is `s3-native` to avoid conflicts with other S3 filesystem implementations.

Configure delegation tokens:

```yaml
security.delegation.token.provider.s3-native.enabled: true
security.delegation.token.provider.s3-native.access-key: YOUR_KEY
security.delegation.token.provider.s3-native.secret-key: YOUR_SECRET
security.delegation.token.provider.s3-native.region: us-east-1
```
