Skip to content

Commit 481b28d

Browse files
committed
Address concurrency issues.
1 parent 3628600 commit 481b28d

File tree

4 files changed

+928
-4
lines changed

4 files changed

+928
-4
lines changed

flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -236,28 +236,32 @@ public S3ClientProvider build() {
236236
disableCertCheck = true;
237237
}
238238
if (region == null || region.isEmpty()) {
239-
throw new IllegalArgumentException("Region is required for AWS S3");
239+
region = "us-east-1";
240+
LOG.debug(
241+
"Setting default region to us-east-1 for S3-compatible storage (required by AWS SDK)");
240242
}
241243
}
242244

243245
Region awsRegion;
244246
if (region != null && !region.isEmpty()) {
245247
awsRegion = Region.of(region);
248+
LOG.debug("Using explicitly configured region: {}", region);
249+
} else if (!isS3Compatible) {
246250
try {
247251
awsRegion = DefaultAwsRegionProviderChain.builder().build().getRegion();
248252
LOG.info(
249253
"Automatically detected AWS region: {} (via DefaultAwsRegionProviderChain)",
250254
awsRegion.id());
251255
} catch (Exception e) {
252-
LOG.error(
256+
awsRegion = Region.US_EAST_1;
257+
LOG.warn(
253258
"Failed to automatically detect AWS region, falling back to us-east-1. "
254259
+ "Consider setting the s3.region configuration or AWS_REGION environment variable. "
255260
+ "Error: {}",
256261
e.getMessage());
257-
throw new IllegalArgumentException("Region is required for AWS S3");
258262
}
259263
} else {
260-
throw new IllegalArgumentException("Region is required for AWS S3");
264+
awsRegion = Region.US_EAST_1;
261265
}
262266

263267
LOG.info(
Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.fs.s3native.commit;
20+
21+
import org.apache.flink.annotation.Internal;
22+
23+
import javax.annotation.concurrent.Immutable;
24+
25+
import java.io.Serializable;
26+
import java.util.Collections;
27+
import java.util.List;
28+
import java.util.Objects;
29+
30+
import static org.apache.flink.util.Preconditions.checkArgument;
31+
import static org.apache.flink.util.Preconditions.checkNotNull;
32+
33+
/**
34+
* Commit marker for atomic visibility of multipart uploads on S3.
35+
36+
*
37+
* <p>Key principles:
38+
*
39+
* <ul>
40+
* <li>MPU completion is a non-visible operation until marker is written
41+
* <li>Commit marker is the single source of truth for upload completion
42+
* <li>Never rely on S3 listings to determine completeness
43+
* <li>Enables deterministic recovery from partial uploads
44+
* </ul>
45+
46+
*/
47+
@Immutable
48+
@Internal
49+
public class S3CommitMarker implements Serializable {
50+
51+
private static final long serialVersionUID = 1L;
52+
53+
private static final int VERSION = 1;
54+
private final String bucket;
55+
private final String key;
56+
private final String uploadId;
57+
private final List<String> eTags;
58+
private final String checksum;
59+
private final long uploadStartTime;
60+
private final long uploadCompleteTime;
61+
private final long objectSize;
62+
private final long checkpointId;
63+
64+
public S3CommitMarker(
65+
String bucket,
66+
String key,
67+
String uploadId,
68+
List<String> eTags,
69+
String checksum,
70+
long uploadStartTime,
71+
long uploadCompleteTime,
72+
long objectSize,
73+
long checkpointId) {
74+
this.bucket = checkNotNull(bucket, "bucket must not be null");
75+
this.key = checkNotNull(key, "key must not be null");
76+
this.uploadId = checkNotNull(uploadId, "uploadId must not be null");
77+
this.eTags = Collections.unmodifiableList(checkNotNull(eTags, "eTags must not be null"));
78+
checkArgument(!eTags.isEmpty(), "eTags list must not be empty");
79+
checkArgument(
80+
uploadCompleteTime >= uploadStartTime,
81+
"uploadCompleteTime must be >= uploadStartTime");
82+
checkArgument(objectSize >= 0, "objectSize must be non-negative");
83+
checkArgument(checkpointId >= 0, "checkpointId must be non-negative");
84+
85+
this.checksum = checksum;
86+
this.uploadStartTime = uploadStartTime;
87+
this.uploadCompleteTime = uploadCompleteTime;
88+
this.objectSize = objectSize;
89+
this.checkpointId = checkpointId;
90+
}
91+
92+
public String getBucket() {
93+
return bucket;
94+
}
95+
96+
public String getKey() {
97+
return key;
98+
}
99+
100+
public String getUploadId() {
101+
return uploadId;
102+
}
103+
104+
public List<String> getETags() {
105+
return eTags;
106+
}
107+
108+
public String getChecksum() {
109+
return checksum;
110+
}
111+
112+
public long getUploadStartTime() {
113+
return uploadStartTime;
114+
}
115+
116+
public long getUploadCompleteTime() {
117+
return uploadCompleteTime;
118+
}
119+
120+
public long getObjectSize() {
121+
return objectSize;
122+
}
123+
124+
public long getCheckpointId() {
125+
return checkpointId;
126+
}
127+
128+
public int getVersion() {
129+
return VERSION;
130+
}
131+
132+
/**
133+
* Gets the S3 path for this commit marker file.
134+
*
135+
* <p>Convention: {original-key}.commit
136+
*/
137+
public String getMarkerPath() {
138+
return key + ".commit";
139+
}
140+
141+
/**
142+
* Validates that this commit marker is complete and consistent.
143+
*
144+
* @return true if the marker is valid
145+
*/
146+
public boolean isValid() {
147+
return bucket != null
148+
&& !bucket.isEmpty()
149+
&& key != null
150+
&& !key.isEmpty()
151+
&& uploadId != null
152+
&& !uploadId.isEmpty()
153+
&& eTags != null
154+
&& !eTags.isEmpty()
155+
&& uploadCompleteTime >= uploadStartTime
156+
&& objectSize >= 0;
157+
}
158+
159+
@Override
160+
public boolean equals(Object o) {
161+
if (this == o) {
162+
return true;
163+
}
164+
if (o == null || getClass() != o.getClass()) {
165+
return false;
166+
}
167+
S3CommitMarker that = (S3CommitMarker) o;
168+
return uploadStartTime == that.uploadStartTime
169+
&& uploadCompleteTime == that.uploadCompleteTime
170+
&& objectSize == that.objectSize
171+
&& checkpointId == that.checkpointId
172+
&& Objects.equals(bucket, that.bucket)
173+
&& Objects.equals(key, that.key)
174+
&& Objects.equals(uploadId, that.uploadId)
175+
&& Objects.equals(eTags, that.eTags)
176+
&& Objects.equals(checksum, that.checksum);
177+
}
178+
179+
@Override
180+
public int hashCode() {
181+
return Objects.hash(
182+
bucket,
183+
key,
184+
uploadId,
185+
eTags,
186+
checksum,
187+
uploadStartTime,
188+
uploadCompleteTime,
189+
objectSize,
190+
checkpointId);
191+
}
192+
193+
@Override
194+
public String toString() {
195+
return "S3CommitMarker{"
196+
+ "bucket='"
197+
+ bucket
198+
+ '\''
199+
+ ", key='"
200+
+ key
201+
+ '\''
202+
+ ", uploadId='"
203+
+ uploadId
204+
+ '\''
205+
+ ", parts="
206+
+ eTags.size()
207+
+ ", checkpointId="
208+
+ checkpointId
209+
+ ", size="
210+
+ objectSize
211+
+ '}';
212+
}
213+
}

0 commit comments

Comments
 (0)