Skip to content

Commit b037b1d

Browse files
author
jojoliang
committed
add retry for put object
1 parent 48b30bc commit b037b1d

File tree

7 files changed

+373
-42
lines changed

7 files changed

+373
-42
lines changed

ci_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -612,6 +612,7 @@ func TestCIService_Put(t *testing.T) {
612612
&ObjectPutHeaderOptions{
613613
XOptionHeader: &http.Header{},
614614
},
615+
nil,
615616
}
616617
opt.XOptionHeader.Add("Pic-Operations", EncodePicOperations(pic))
617618
res, _, err := client.CI.Put(context.Background(), name, f, opt)
@@ -734,6 +735,7 @@ func TestCIService_PutFromFile(t *testing.T) {
734735
&ObjectPutHeaderOptions{
735736
XOptionHeader: &http.Header{},
736737
},
738+
nil,
737739
}
738740
opt.XOptionHeader.Add("Pic-Operations", EncodePicOperations(pic))
739741
res, _, err := client.CI.PutFromFile(context.Background(), name, filePath, opt)

cos.go

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,24 @@ func toSwitchHost(oldURL *url.URL) *url.URL {
373373
return newURL
374374
}
375375

376+
func (c *Client) CheckRetrieable(u *url.URL, resp *Response, err error) (*url.URL, bool) {
377+
res := u
378+
if err != nil && err != invalidBucketErr {
379+
// 不重试
380+
if resp != nil && resp.StatusCode < 500 {
381+
return res, false
382+
}
383+
if c.Conf.RetryOpt.AutoSwitchHost {
384+
// 收不到报文 或者 不存在RequestId
385+
if resp == nil || resp.Header.Get("X-Cos-Request-Id") == "" {
386+
res = toSwitchHost(u)
387+
}
388+
}
389+
return res, true
390+
}
391+
return res, false
392+
}
393+
376394
func (c *Client) doRetry(ctx context.Context, opt *sendOptions) (resp *Response, err error) {
377395
if opt.body != nil {
378396
if _, ok := opt.body.(io.Reader); ok {
@@ -384,29 +402,21 @@ func (c *Client) doRetry(ctx context.Context, opt *sendOptions) (resp *Response,
384402
if c.Conf.RetryOpt.Count > 0 {
385403
count = c.Conf.RetryOpt.Count
386404
}
387-
interval := c.Conf.RetryOpt.Interval
405+
var retrieable bool
388406
for nr := 0; nr < count; nr++ {
389407
resp, err = c.send(ctx, opt)
390-
if err != nil && err != invalidBucketErr {
391-
// 不重试
392-
if resp != nil && resp.StatusCode < 500 {
393-
break
394-
}
395-
if c.Conf.RetryOpt.AutoSwitchHost {
396-
// 收不到报文 或者 不存在RequestId
397-
if resp == nil || resp.Header.Get("X-Cos-Request-Id") == "" {
398-
opt.baseURL = toSwitchHost(opt.baseURL)
399-
}
400-
}
401-
if interval > 0 && nr+1 < count {
402-
time.Sleep(interval)
408+
opt.baseURL, retrieable = c.CheckRetrieable(opt.baseURL, resp, err)
409+
if retrieable {
410+
if c.Conf.RetryOpt.Interval > 0 && nr+1 < count {
411+
time.Sleep(c.Conf.RetryOpt.Interval)
403412
}
404413
continue
405414
}
406415
break
407416
}
408417
return
409418
}
419+
410420
func (c *Client) send(ctx context.Context, opt *sendOptions) (resp *Response, err error) {
411421
req, err := c.newRequest(ctx, opt.baseURL, opt.uri, opt.method, opt.body, opt.optQuery, opt.optHeader)
412422
if err != nil {

helper.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ func CloneObjectPutOptions(opt *ObjectPutOptions) *ObjectPutOptions {
233233
res := &ObjectPutOptions{
234234
&ACLHeaderOptions{},
235235
&ObjectPutHeaderOptions{},
236+
nil,
236237
}
237238
if opt != nil {
238239
if opt.ACLHeaderOptions != nil {
@@ -243,6 +244,9 @@ func CloneObjectPutOptions(opt *ObjectPutOptions) *ObjectPutOptions {
243244
res.XCosMetaXXX = cloneHeader(opt.XCosMetaXXX)
244245
res.XOptionHeader = cloneHeader(opt.XOptionHeader)
245246
}
247+
if opt.innerSwitchURL != nil {
248+
res.innerSwitchURL = opt.innerSwitchURL
249+
}
246250
}
247251
return res
248252
}

object.go

Lines changed: 62 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,9 @@ type ObjectPutHeaderOptions struct {
332332
type ObjectPutOptions struct {
333333
*ACLHeaderOptions `header:",omitempty" url:"-" xml:"-"`
334334
*ObjectPutHeaderOptions `header:",omitempty" url:"-" xml:"-"`
335+
336+
// PutFromFile 使用
337+
innerSwitchURL *url.URL `header:"-" url:"-" xml:"-"`
335338
}
336339

337340
// Put Object请求可以将一个文件(Oject)上传至指定Bucket。
@@ -358,27 +361,58 @@ func (s *ObjectService) Put(ctx context.Context, name string, r io.Reader, uopt
358361
opt.ContentLength = totalBytes
359362
}
360363
}
361-
reader := TeeReader(r, nil, totalBytes, nil)
362-
if s.client.Conf.EnableCRC {
363-
reader.writer = crc64.New(crc64.MakeTable(crc64.ECMA))
364-
}
365-
if opt != nil && opt.Listener != nil {
366-
reader.listener = opt.Listener
367-
}
368-
sendOpt := sendOptions{
369-
baseURL: s.client.BaseURL.BucketURL,
370-
uri: "/" + encodeURIComponent(name),
371-
method: http.MethodPut,
372-
body: reader,
373-
optHeader: opt,
364+
// 如果是io.Seeker,则重试
365+
count := 1
366+
var position int64
367+
if seeker, ok := r.(io.Seeker); ok {
368+
// 记录原始位置
369+
position, err = seeker.Seek(0, io.SeekCurrent)
370+
if err == nil && s.client.Conf.RetryOpt.Count > 0 {
371+
count = s.client.Conf.RetryOpt.Count
372+
}
373+
}
374+
var resp *Response
375+
var retrieable bool
376+
sUrl := s.client.BaseURL.BucketURL
377+
if opt.innerSwitchURL != nil {
378+
sUrl = opt.innerSwitchURL
379+
}
380+
for nr := 0; nr < count; nr++ {
381+
reader := TeeReader(r, nil, totalBytes, nil)
382+
if s.client.Conf.EnableCRC {
383+
reader.writer = crc64.New(crc64.MakeTable(crc64.ECMA))
384+
}
385+
if opt != nil && opt.Listener != nil {
386+
reader.listener = opt.Listener
387+
}
388+
sendOpt := sendOptions{
389+
baseURL: sUrl,
390+
uri: "/" + encodeURIComponent(name),
391+
method: http.MethodPut,
392+
body: reader,
393+
optHeader: opt,
394+
}
395+
resp, err = s.client.send(ctx, &sendOpt)
396+
sUrl, retrieable = s.client.CheckRetrieable(sUrl, resp, err)
397+
if retrieable && nr+1 < count {
398+
if seeker, ok := r.(io.Seeker); ok {
399+
_, e := seeker.Seek(position, io.SeekStart)
400+
if e != nil {
401+
break
402+
}
403+
continue
404+
}
405+
}
406+
break
374407
}
375-
resp, err := s.client.send(ctx, &sendOpt)
376-
377408
return resp, err
378409
}
379410

380411
// PutFromFile put object from local file
381412
func (s *ObjectService) PutFromFile(ctx context.Context, name string, filePath string, opt *ObjectPutOptions) (resp *Response, err error) {
413+
if opt == nil {
414+
opt = &ObjectPutOptions{}
415+
}
382416
nr := 0
383417
for nr < 3 {
384418
fd, e := os.Open(filePath)
@@ -390,6 +424,12 @@ func (s *ObjectService) PutFromFile(ctx context.Context, name string, filePath s
390424
if err != nil {
391425
nr++
392426
fd.Close()
427+
if s.client.Conf.RetryOpt.AutoSwitchHost {
428+
// 收不到报文 或者 不存在RequestId
429+
if resp == nil || resp.Header.Get("X-Cos-Request-Id") == "" {
430+
opt.innerSwitchURL = toSwitchHost(s.client.BaseURL.BucketURL)
431+
}
432+
}
393433
continue
394434
}
395435
fd.Close()
@@ -900,6 +940,12 @@ func worker(ctx context.Context, s *ObjectService, jobs <-chan *Jobs, results ch
900940
results <- &res
901941
break
902942
}
943+
if s.client.Conf.RetryOpt.AutoSwitchHost {
944+
// 收不到报文 或者 不存在RequestId
945+
if resp == nil || resp.Header.Get("X-Cos-Request-Id") == "" {
946+
j.Opt.innerSwitchURL = toSwitchHost(s.client.BaseURL.BucketURL)
947+
}
948+
}
903949
time.Sleep(time.Millisecond)
904950
continue
905951
}
@@ -1128,6 +1174,7 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string
11281174
opt0 = &ObjectPutOptions{
11291175
opt.OptIni.ACLHeaderOptions,
11301176
opt.OptIni.ObjectPutHeaderOptions,
1177+
nil,
11311178
}
11321179
}
11331180
rsp, err := s.PutFromFile(ctx, name, filepath, opt0)

object_part.go

Lines changed: 45 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ type ObjectUploadPartOptions struct {
6060
XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
6161
// 上传进度, ProgressCompleteEvent不能表示对应API调用成功,API是否调用成功的判断标准为返回err==nil
6262
Listener ProgressListener `header:"-" url:"-" xml:"-"`
63+
64+
// Upload方法使用
65+
innerSwitchURL *url.URL `header:"-" url:"-" xml:"-"`
6366
}
6467

6568
// UploadPart 请求实现在初始化以后的分块上传,支持的块的数量为1到10000,块的大小为1 MB 到5 GB。
@@ -93,22 +96,51 @@ func (s *ObjectService) UploadPart(ctx context.Context, name, uploadID string, p
9396
opt.ContentLength = totalBytes
9497
}
9598
}
96-
reader := TeeReader(r, nil, totalBytes, nil)
97-
if s.client.Conf.EnableCRC {
98-
reader.writer = crc64.New(crc64.MakeTable(crc64.ECMA))
99+
// 如果是io.Seeker,则重试
100+
count := 1
101+
var position int64
102+
if seeker, ok := r.(io.Seeker); ok {
103+
// 记录原始位置
104+
position, err = seeker.Seek(0, io.SeekCurrent)
105+
if err == nil && s.client.Conf.RetryOpt.Count > 0 {
106+
count = s.client.Conf.RetryOpt.Count
107+
}
99108
}
100-
if opt != nil && opt.Listener != nil {
101-
reader.listener = opt.Listener
109+
var resp *Response
110+
var retrieable bool
111+
sUrl := s.client.BaseURL.BucketURL
112+
if opt.innerSwitchURL != nil {
113+
sUrl = opt.innerSwitchURL
102114
}
103-
u := fmt.Sprintf("/%s?partNumber=%d&uploadId=%s", encodeURIComponent(name), partNumber, uploadID)
104-
sendOpt := sendOptions{
105-
baseURL: s.client.BaseURL.BucketURL,
106-
uri: u,
107-
method: http.MethodPut,
108-
optHeader: opt,
109-
body: reader,
115+
for nr := 0; nr < count; nr++ {
116+
reader := TeeReader(r, nil, totalBytes, nil)
117+
if s.client.Conf.EnableCRC {
118+
reader.writer = crc64.New(crc64.MakeTable(crc64.ECMA))
119+
}
120+
if opt != nil && opt.Listener != nil {
121+
reader.listener = opt.Listener
122+
}
123+
u := fmt.Sprintf("/%s?partNumber=%d&uploadId=%s", encodeURIComponent(name), partNumber, uploadID)
124+
sendOpt := sendOptions{
125+
baseURL: sUrl,
126+
uri: u,
127+
method: http.MethodPut,
128+
optHeader: opt,
129+
body: reader,
130+
}
131+
resp, err = s.client.send(ctx, &sendOpt)
132+
sUrl, retrieable = s.client.CheckRetrieable(sUrl, resp, err)
133+
if retrieable && nr+1 < count {
134+
if seeker, ok := r.(io.Seeker); ok {
135+
_, e := seeker.Seek(position, io.SeekStart)
136+
if e != nil {
137+
break
138+
}
139+
continue
140+
}
141+
}
142+
break
110143
}
111-
resp, err := s.client.send(ctx, &sendOpt)
112144
return resp, err
113145
}
114146

object_part_test.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,12 @@ import (
1010
"hash/crc64"
1111
"io/ioutil"
1212
"net/http"
13+
"os"
1314
"reflect"
1415
"strconv"
16+
"strings"
1517
"testing"
18+
"time"
1619
)
1720

1821
func TestObjectService_AbortMultipartUpload(t *testing.T) {
@@ -540,3 +543,79 @@ func TestObjectService_MultiCopy(t *testing.T) {
540543
}
541544

542545
}
546+
547+
func TestObjectService_UploadPartRetry(t *testing.T) {
548+
setup()
549+
defer teardown()
550+
551+
opt := &ObjectUploadPartOptions{
552+
Listener: &DefaultProgressListener{},
553+
}
554+
name := "test/hello.txt"
555+
data := make([]byte, 1024*1024*3)
556+
_, err := rand.Read(data)
557+
tb := crc64.MakeTable(crc64.ECMA)
558+
realcrc := crc64.Update(0, tb, data)
559+
uploadID := "xxxxx"
560+
partNumber := 1
561+
562+
nr, count := 0, 3
563+
mux.HandleFunc("/test/hello.txt", func(w http.ResponseWriter, r *http.Request) {
564+
testMethod(t, r, http.MethodPut)
565+
vs := values{
566+
"uploadId": uploadID,
567+
"partNumber": "1",
568+
}
569+
testFormValues(t, r, vs)
570+
571+
b, _ := ioutil.ReadAll(r.Body)
572+
crc := crc64.Update(0, tb, b)
573+
if !reflect.DeepEqual(crc, realcrc) {
574+
t.Errorf("Object.Put crc: %v, want: %v", crc, realcrc)
575+
}
576+
w.Header().Add("x-cos-hash-crc64ecma", strconv.FormatUint(crc, 10))
577+
nr++
578+
if nr < count {
579+
w.WriteHeader(http.StatusInternalServerError)
580+
}
581+
})
582+
583+
_, err = client.Object.UploadPart(context.Background(), name, uploadID, partNumber, bytes.NewReader(data), opt)
584+
if err != nil || nr != count {
585+
t.Errorf("Object.UploadPart failed: %v", err)
586+
}
587+
588+
nr, count = 0, 3
589+
_, err = client.Object.UploadPart(context.Background(), name, uploadID, partNumber, strings.NewReader(string(data)), opt)
590+
if err != nil || nr != count {
591+
t.Errorf("Object.UploadPart failed: %v", err)
592+
}
593+
594+
// 非io.Seeker不做重试
595+
nr, count = 0, 3
596+
_, err = client.Object.UploadPart(context.Background(), name, uploadID, partNumber, bytes.NewBuffer(data), opt)
597+
if err == nil || nr != 1 {
598+
t.Errorf("Object.UploadPart failed: %v", err)
599+
}
600+
601+
filePath := "tmpfile" + time.Now().Format(time.RFC3339)
602+
newfile, err := os.Create(filePath)
603+
if err != nil {
604+
t.Fatalf("create tmp file failed")
605+
}
606+
defer os.Remove(filePath)
607+
// 源文件内容
608+
newfile.Write(data)
609+
newfile.Close()
610+
611+
fd, err := os.Open(filePath)
612+
if err != nil {
613+
t.Fatalf("open file failed: %v", err)
614+
}
615+
// 文件关闭, 不做重试
616+
nr, count = 0, 3
617+
_, err = client.Object.UploadPart(context.Background(), name, uploadID, partNumber, fd, opt)
618+
if err == nil || nr != 1 {
619+
t.Errorf("Object.UploadPart failed: %v", err)
620+
}
621+
}

0 commit comments

Comments
 (0)