2020
2121import static com .qcloud .cos .event .SDKProgressPublisher .publishProgress ;
2222
23+ import java .io .File ;
24+ import java .io .IOException ;
2325import java .io .InputStream ;
26+ import java .util .Arrays ;
2427import java .util .ArrayList ;
2528import java .util .HashMap ;
2629import java .util .List ;
2730import java .util .Map ;
31+ import java .util .Objects ;
2832import java .util .concurrent .Callable ;
2933import java .util .concurrent .CancellationException ;
3034import java .util .concurrent .ExecutorService ;
3539import com .qcloud .cos .event .COSProgressPublisher ;
3640import com .qcloud .cos .event .ProgressEventType ;
3741import com .qcloud .cos .event .ProgressListenerChain ;
42+ import com .qcloud .cos .exception .CosClientException ;
43+ import com .qcloud .cos .exception .CosServiceException ;
44+ import com .qcloud .cos .internal .InputSubstream ;
45+ import com .qcloud .cos .internal .ResettableInputStream ;
3846import com .qcloud .cos .internal .UploadPartRequestFactory ;
3947import com .qcloud .cos .model .AbortMultipartUploadRequest ;
4048import com .qcloud .cos .model .CompleteMultipartUploadRequest ;
5159import com .qcloud .cos .model .PutObjectResult ;
5260import com .qcloud .cos .model .UploadPartRequest ;
5361import com .qcloud .cos .model .UploadResult ;
62+ import com .qcloud .cos .model .ListMultipartUploadsRequest ;
63+ import com .qcloud .cos .model .MultipartUploadListing ;
64+ import com .qcloud .cos .model .MultipartUpload ;
5465import com .qcloud .cos .transfer .Transfer .TransferState ;
66+ import com .qcloud .cos .utils .BinaryUtils ;
67+ import com .qcloud .cos .utils .Md5Utils ;
5568
69+ import org .apache .commons .codec .DecoderException ;
5670import org .slf4j .Logger ;
5771import org .slf4j .LoggerFactory ;
5872
@@ -76,8 +90,12 @@ public class UploadCallable implements Callable<UploadResult> {
7690 */
7791 private final List <PartETag > eTagsToSkip = new ArrayList <PartETag >();
7892
93+ private Map <Integer , PartSummary > skipParts = new HashMap <Integer , PartSummary >();
94+
7995 private PersistableUpload persistableUpload ;
8096
97+ private boolean isResumableUpload = false ;
98+
8199 public UploadCallable (TransferManager transferManager , ExecutorService threadPool ,
82100 UploadImpl upload , PutObjectRequest origReq ,
83101 ProgressListenerChain progressListenerChain , String uploadId ,
@@ -179,7 +197,10 @@ private UploadResult uploadInParts() throws Exception {
179197 long optimalPartSize = getOptimalPartSize (isUsingEncryption );
180198 try {
181199 if (multipartUploadId == null ) {
182- multipartUploadId = initiateMultipartUpload (origReq , isUsingEncryption , optimalPartSize );
200+ isResumableUpload = getResumableUploadId (isUsingEncryption );
201+ if (!isResumableUpload ) {
202+ multipartUploadId = initiateMultipartUpload (origReq , isUsingEncryption , optimalPartSize );
203+ }
183204 }
184205
185206 UploadPartRequestFactory requestFactory =
@@ -194,7 +215,6 @@ private UploadResult uploadInParts() throws Exception {
194215 }
195216 } catch (Exception e ) {
196217 publishProgress (listener , ProgressEventType .TRANSFER_FAILED_EVENT );
197- performAbortMultipartUpload ();
198218 throw e ;
199219 } finally {
200220 if (origReq .getInputStream () != null ) {
@@ -207,6 +227,112 @@ private UploadResult uploadInParts() throws Exception {
207227 }
208228 }
209229
230+ private boolean getResumableUploadId (boolean isUsingEncryption ) throws CosServiceException , CosClientException {
231+ if (isUsingEncryption || origReq .getFile () == null || !origReq .isEnableResumableUpload ()) {
232+ return false ;
233+ }
234+ ListMultipartUploadsRequest listMultipartUploadsRequest = new ListMultipartUploadsRequest (origReq .getBucketName ());
235+ listMultipartUploadsRequest .setPrefix (origReq .getKey ());
236+ MultipartUploadListing uploadListing = cos .listMultipartUploads (listMultipartUploadsRequest );
237+ List <MultipartUpload > uploads = uploadListing .getMultipartUploads ();
238+ for (int index = uploads .size () - 1 ; index >= 0 ; index --) {
239+ if (Objects .equals (uploads .get (index ).getKey (), origReq .getKey ())) {
240+ multipartUploadId = uploads .get (index ).getUploadId ();
241+ break ;
242+ }
243+ }
244+
245+ if (multipartUploadId != null ) {
246+ return checkResumableUpload ();
247+ }
248+
249+ return false ;
250+ }
251+
252+ private boolean checkResumableUpload () {
253+ ListPartsRequest listPartsRequest = new ListPartsRequest (origReq .getBucketName (), origReq .getKey (), multipartUploadId );
254+ PartListing partListing = null ;
255+ List <PartSummary > partSummaries = new ArrayList <>();
256+ do {
257+ try {
258+ partListing = cos .listParts (listPartsRequest );
259+ } catch (CosServiceException cse ) {
260+ if (cse .getStatusCode () == 404 && Objects .equals (cse .getErrorCode (), "NoSuchUpload" )) {
261+ return false ;
262+ }
263+ throw cse ;
264+ } catch (CosClientException cce ) {
265+ throw cce ;
266+ }
267+ partSummaries .addAll (partListing .getParts ());
268+ listPartsRequest .setPartNumberMarker (partListing .getNextPartNumberMarker ());
269+ } while (partListing .isTruncated ());
270+
271+ long optimalPartSize = getOptimalPartSize (false );
272+ long contentLength = TransferManagerUtils .getContentLength (origReq );
273+ long lastPartSize = contentLength % optimalPartSize ;
274+ long partsNum = (contentLength / optimalPartSize );
275+ if (lastPartSize == 0 ) {
276+ lastPartSize = optimalPartSize ;
277+ } else {
278+ partsNum = partsNum + 1 ;
279+ }
280+
281+ for (PartSummary partSummary : partSummaries ) {
282+ int partNum = partSummary .getPartNumber ();
283+ if (partNum > partsNum ) {
284+ return false ;
285+ }
286+ long offset = (partNum - 1 ) * optimalPartSize ;
287+ long localPartSize = optimalPartSize ;
288+ boolean isLastPart = false ;
289+ if (partNum == partsNum ) {
290+ localPartSize = lastPartSize ;
291+ isLastPart = true ;
292+ }
293+ if (!checkSingleUploadPart (offset , localPartSize , partSummary , isLastPart )) {
294+ return false ;
295+ }
296+ skipParts .put (partNum , partSummary );
297+ }
298+
299+ return true ;
300+ }
301+
302+ private boolean checkSingleUploadPart (long offset , long loaclPartSize , PartSummary partSummary , boolean isLastPart ) {
303+ long remoteSize = partSummary .getSize ();
304+ if (loaclPartSize != remoteSize ) {
305+ String warnMsg = String .format ("The remote part size[%d] is not equal to the local part size[%d], will not do resumable upload,"
306+ + " uploadId[%s], partNum[%d]" , remoteSize , loaclPartSize , multipartUploadId , partSummary .getPartNumber ());
307+ log .warn (warnMsg );
308+ return false ;
309+ }
310+
311+ File fileOrig = origReq .getFile ();
312+ InputStream isCurr = null ;
313+ try {
314+ isCurr = new ResettableInputStream (fileOrig );
315+ } catch (IOException ie ) {
316+ log .warn ("Can not check single upload part due to the IO exception: " , ie );
317+ return false ;
318+ }
319+ isCurr = new InputSubstream (isCurr , offset , loaclPartSize , isLastPart );
320+ try {
321+ byte [] clientSideHash = Md5Utils .computeMD5Hash (isCurr );
322+ byte [] serverSideHash = BinaryUtils .fromHex (partSummary .getETag ());
323+ if (!Arrays .equals (clientSideHash , serverSideHash )) {
324+ return false ;
325+ }
326+ } catch (DecoderException de ) {
327+ log .warn ("Can not check single upload part due to the decode exception: " , de );
328+ return false ;
329+ } catch (IOException e ) {
330+ throw new RuntimeException (e );
331+ }
332+
333+ return true ;
334+ }
335+
210336 /**
211337 * Performs an {@link COS#abortMultipartUpload(AbortMultipartUploadRequest)} operation for the
212338 * given multi-part upload.
@@ -305,7 +431,7 @@ private UploadResult uploadPartsInSeries(UploadPartRequestFactory requestFactory
305431 */
306432 private void uploadPartsInParallel (UploadPartRequestFactory requestFactory , String uploadId ) {
307433
308- Map <Integer , PartSummary > partNumbers = identifyExistingPartsForResume (uploadId );
434+ Map <Integer , PartSummary > partNumbers = isResumableUpload ? skipParts : identifyExistingPartsForResume (uploadId );
309435
310436 while (requestFactory .hasMoreRequests ()) {
311437 if (threadPool .isShutdown ())
0 commit comments