2020
2121import static com .qcloud .cos .event .SDKProgressPublisher .publishProgress ;
2222
23+ import java .io .File ;
24+ import java .io .IOException ;
2325import java .io .InputStream ;
26+ import java .util .Arrays ;
2427import java .util .ArrayList ;
2528import java .util .HashMap ;
2629import java .util .List ;
2730import java .util .Map ;
31+ import java .util .Objects ;
2832import java .util .concurrent .Callable ;
2933import java .util .concurrent .CancellationException ;
3034import java .util .concurrent .ExecutorService ;
3539import com .qcloud .cos .event .COSProgressPublisher ;
3640import com .qcloud .cos .event .ProgressEventType ;
3741import com .qcloud .cos .event .ProgressListenerChain ;
42+ import com .qcloud .cos .exception .CosClientException ;
43+ import com .qcloud .cos .exception .CosServiceException ;
44+ import com .qcloud .cos .internal .InputSubstream ;
45+ import com .qcloud .cos .internal .ResettableInputStream ;
3846import com .qcloud .cos .internal .UploadPartRequestFactory ;
3947import com .qcloud .cos .model .AbortMultipartUploadRequest ;
4048import com .qcloud .cos .model .CompleteMultipartUploadRequest ;
5159import com .qcloud .cos .model .PutObjectResult ;
5260import com .qcloud .cos .model .UploadPartRequest ;
5361import com .qcloud .cos .model .UploadResult ;
62+ import com .qcloud .cos .model .ListMultipartUploadsRequest ;
63+ import com .qcloud .cos .model .MultipartUploadListing ;
64+ import com .qcloud .cos .model .MultipartUpload ;
5465import com .qcloud .cos .transfer .Transfer .TransferState ;
66+ import com .qcloud .cos .utils .BinaryUtils ;
67+ import com .qcloud .cos .utils .Md5Utils ;
5568
69+ import org .apache .commons .codec .DecoderException ;
5670import org .slf4j .Logger ;
5771import org .slf4j .LoggerFactory ;
5872
@@ -76,8 +90,12 @@ public class UploadCallable implements Callable<UploadResult> {
7690 */
7791 private final List <PartETag > eTagsToSkip = new ArrayList <PartETag >();
7892
93+ private Map <Integer , PartSummary > skipParts = new HashMap <Integer , PartSummary >();
94+
7995 private PersistableUpload persistableUpload ;
8096
97+ private boolean isResumableUpload = false ;
98+
8199 public UploadCallable (TransferManager transferManager , ExecutorService threadPool ,
82100 UploadImpl upload , PutObjectRequest origReq ,
83101 ProgressListenerChain progressListenerChain , String uploadId ,
@@ -179,7 +197,10 @@ private UploadResult uploadInParts() throws Exception {
179197 long optimalPartSize = getOptimalPartSize (isUsingEncryption );
180198 try {
181199 if (multipartUploadId == null ) {
182- multipartUploadId = initiateMultipartUpload (origReq , isUsingEncryption , optimalPartSize );
200+ isResumableUpload = getResumableUploadId (isUsingEncryption );
201+ if (!isResumableUpload ) {
202+ multipartUploadId = initiateMultipartUpload (origReq , isUsingEncryption , optimalPartSize );
203+ }
183204 }
184205
185206 UploadPartRequestFactory requestFactory =
@@ -194,7 +215,6 @@ private UploadResult uploadInParts() throws Exception {
194215 }
195216 } catch (Exception e ) {
196217 publishProgress (listener , ProgressEventType .TRANSFER_FAILED_EVENT );
197- performAbortMultipartUpload ();
198218 throw e ;
199219 } finally {
200220 if (origReq .getInputStream () != null ) {
@@ -207,6 +227,114 @@ private UploadResult uploadInParts() throws Exception {
207227 }
208228 }
209229
230+ private boolean getResumableUploadId (boolean isUsingEncryption ) throws CosServiceException , CosClientException {
231+ if (isUsingEncryption || origReq .getFile () == null || !origReq .isEnableResumableUpload ()) {
232+ return false ;
233+ }
234+ ListMultipartUploadsRequest listMultipartUploadsRequest = new ListMultipartUploadsRequest (origReq .getBucketName ());
235+ listMultipartUploadsRequest .setPrefix (origReq .getKey ());
236+ MultipartUploadListing uploadListing = cos .listMultipartUploads (listMultipartUploadsRequest );
237+ List <MultipartUpload > uploads = uploadListing .getMultipartUploads ();
238+ for (int index = uploads .size () - 1 ; index >= 0 ; index --) {
239+ if (Objects .equals (uploads .get (index ).getKey (), origReq .getKey ())) {
240+ multipartUploadId = uploads .get (index ).getUploadId ();
241+ break ;
242+ }
243+ }
244+
245+ if (multipartUploadId != null ) {
246+ return checkResumableUpload ();
247+ }
248+
249+ return false ;
250+ }
251+
252+ private boolean checkResumableUpload () {
253+ ListPartsRequest listPartsRequest = new ListPartsRequest (origReq .getBucketName (), origReq .getKey (), multipartUploadId );
254+ PartListing partListing = null ;
255+ List <PartSummary > partSummaries = new ArrayList <>();
256+ do {
257+ try {
258+ partListing = cos .listParts (listPartsRequest );
259+ } catch (CosServiceException cse ) {
260+ if (cse .getStatusCode () == 404 && Objects .equals (cse .getErrorCode (), "NoSuchUpload" )) {
261+ String warnMsg = String .format ("will not do resumable upload, because the uploadId[%s] is not exist, request id is %s" , multipartUploadId , cse .getRequestId ());
262+ log .warn (warnMsg );
263+ return false ;
264+ }
265+ throw cse ;
266+ } catch (CosClientException cce ) {
267+ throw cce ;
268+ }
269+ partSummaries .addAll (partListing .getParts ());
270+ listPartsRequest .setPartNumberMarker (partListing .getNextPartNumberMarker ());
271+ } while (partListing .isTruncated ());
272+
273+ long optimalPartSize = getOptimalPartSize (false );
274+ long contentLength = TransferManagerUtils .getContentLength (origReq );
275+ long lastPartSize = contentLength % optimalPartSize ;
276+ long partsNum = (contentLength / optimalPartSize );
277+ if (lastPartSize == 0 ) {
278+ lastPartSize = optimalPartSize ;
279+ } else {
280+ partsNum = partsNum + 1 ;
281+ }
282+
283+ for (PartSummary partSummary : partSummaries ) {
284+ int partNum = partSummary .getPartNumber ();
285+ if (partNum > partsNum ) {
286+ return false ;
287+ }
288+ long offset = (partNum - 1 ) * optimalPartSize ;
289+ long localPartSize = optimalPartSize ;
290+ boolean isLastPart = false ;
291+ if (partNum == partsNum ) {
292+ localPartSize = lastPartSize ;
293+ isLastPart = true ;
294+ }
295+ if (!checkSingleUploadPart (offset , localPartSize , partSummary , isLastPart )) {
296+ return false ;
297+ }
298+ skipParts .put (partNum , partSummary );
299+ }
300+
301+ return true ;
302+ }
303+
304+ private boolean checkSingleUploadPart (long offset , long loaclPartSize , PartSummary partSummary , boolean isLastPart ) {
305+ long remoteSize = partSummary .getSize ();
306+ if (loaclPartSize != remoteSize ) {
307+ String warnMsg = String .format ("The remote part size[%d] is not equal to the local part size[%d], will not do resumable upload,"
308+ + " uploadId[%s], partNum[%d]" , remoteSize , loaclPartSize , multipartUploadId , partSummary .getPartNumber ());
309+ log .warn (warnMsg );
310+ return false ;
311+ }
312+
313+ File fileOrig = origReq .getFile ();
314+ InputStream isCurr = null ;
315+ try {
316+ isCurr = new ResettableInputStream (fileOrig );
317+ } catch (IOException ie ) {
318+ log .warn ("Can not check single upload part due to the IO exception: " , ie );
319+ return false ;
320+ }
321+ isCurr = new InputSubstream (isCurr , offset , loaclPartSize , isLastPart );
322+ try {
323+ byte [] clientSideHash = Md5Utils .computeMD5Hash (isCurr );
324+ byte [] serverSideHash = BinaryUtils .fromHex (partSummary .getETag ());
325+ if (!Arrays .equals (clientSideHash , serverSideHash )) {
326+ return false ;
327+ }
328+ } catch (DecoderException de ) {
329+ log .warn ("Can not check single upload part due to the decode exception: " , de );
330+ return false ;
331+ } catch (IOException e ) {
332+ throw new RuntimeException (e );
333+ }
334+
335+ return true ;
336+ }
337+
210338 /**
211339 * Performs an {@link COS#abortMultipartUpload(AbortMultipartUploadRequest)} operation for the
212340 * given multi-part upload.
@@ -305,7 +433,7 @@ private UploadResult uploadPartsInSeries(UploadPartRequestFactory requestFactory
305433 */
306434 private void uploadPartsInParallel (UploadPartRequestFactory requestFactory , String uploadId ) {
307435
308- Map <Integer , PartSummary > partNumbers = identifyExistingPartsForResume (uploadId );
436+ Map <Integer , PartSummary > partNumbers = isResumableUpload ? skipParts : identifyExistingPartsForResume (uploadId );
309437
310438 while (requestFactory .hasMoreRequests ()) {
311439 if (threadPool .isShutdown ())
0 commit comments