Skip to content

Commit 2e91823

Browse files
authored
add digest validation utils and examples (Azure#20887)
* add digest validation utils and examples
* update recording assets
* ci fix
* update upload chunk example and test
* refine with review
* update examples
* fix lint
* update example
1 parent e926d35 commit 2e91823

14 files changed

+595
-210
lines changed

sdk/containers/azcontainerregistry/assets.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@
22
"AssetsRepo": "Azure/azure-sdk-assets",
33
"AssetsRepoPrefixPath": "go",
44
"TagPrefix": "go/containers/azcontainerregistry",
5-
"Tag": "go/containers/azcontainerregistry_5bce238ccf"
5+
"Tag": "go/containers/azcontainerregistry_9579d04096"
66
}

sdk/containers/azcontainerregistry/blob_client_test.go

Lines changed: 58 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,21 @@ package azcontainerregistry
99
import (
1010
"bytes"
1111
"context"
12+
"fmt"
13+
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
1214
"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
1315
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
16+
"github.com/Azure/azure-sdk-for-go/sdk/internal/mock"
1417
"github.com/stretchr/testify/require"
1518
"io"
19+
"net/http"
20+
"strconv"
21+
"strings"
1622
"testing"
1723
)
1824

25+
const alpineBlobDigest = "sha256:042a816809aac8d0f7d7cacac7965782ee2ecac3f21bcf9f24b1de1a7387b769"
26+
1927
func TestBlobClient_CancelUpload(t *testing.T) {
2028
startRecording(t)
2129
endpoint, cred, options := getEndpointCredAndClientOptions(t)
@@ -44,10 +52,9 @@ func TestBlobClient_CheckBlobExists(t *testing.T) {
4452
ctx := context.Background()
4553
client, err := NewBlobClient(endpoint, cred, &BlobClientOptions{ClientOptions: options})
4654
require.NoError(t, err)
47-
digest := "sha256:042a816809aac8d0f7d7cacac7965782ee2ecac3f21bcf9f24b1de1a7387b769"
48-
res, err := client.CheckBlobExists(ctx, "alpine", digest, nil)
55+
res, err := client.CheckBlobExists(ctx, "alpine", alpineBlobDigest, nil)
4956
require.NoError(t, err)
50-
require.Equal(t, digest, *res.DockerContentDigest)
57+
require.Equal(t, alpineBlobDigest, *res.DockerContentDigest)
5158
}
5259

5360
func TestBlobClient_CheckBlobExists_fail(t *testing.T) {
@@ -76,8 +83,7 @@ func TestBlobClient_CheckChunkExists(t *testing.T) {
7683
ctx := context.Background()
7784
client, err := NewBlobClient(endpoint, cred, &BlobClientOptions{ClientOptions: options})
7885
require.NoError(t, err)
79-
digest := "sha256:042a816809aac8d0f7d7cacac7965782ee2ecac3f21bcf9f24b1de1a7387b769"
80-
res, err := client.CheckChunkExists(ctx, "alpine", digest, "bytes=0-299", nil)
86+
res, err := client.CheckChunkExists(ctx, "alpine", alpineBlobDigest, "bytes=0-299", nil)
8187
require.NoError(t, err)
8288
require.NotEmpty(t, *res.ContentLength)
8389
}
@@ -108,8 +114,7 @@ func TestBlobClient_completeUpload_wrongDigest(t *testing.T) {
108114
ctx := context.Background()
109115
client, err := NewBlobClient(endpoint, cred, &BlobClientOptions{ClientOptions: options})
110116
require.NoError(t, err)
111-
digest := "sha256:042a816809aac8d0f7d7cacac7965782ee2ecac3f21bcf9f24b1de1a7387b769"
112-
getRes, err := client.GetBlob(ctx, "alpine", digest, nil)
117+
getRes, err := client.GetBlob(ctx, "alpine", alpineBlobDigest, nil)
113118
require.NoError(t, err)
114119
blob, err := io.ReadAll(getRes.BlobData)
115120
require.NoError(t, err)
@@ -127,8 +132,7 @@ func TestBlobClient_DeleteBlob(t *testing.T) {
127132
ctx := context.Background()
128133
client, err := NewBlobClient(endpoint, cred, &BlobClientOptions{ClientOptions: options})
129134
require.NoError(t, err)
130-
digest := "sha256:042a816809aac8d0f7d7cacac7965782ee2ecac3f21bcf9f24b1de1a7387b769"
131-
_, err = client.DeleteBlob(ctx, "alpine", digest, nil)
135+
_, err = client.DeleteBlob(ctx, "alpine", alpineBlobDigest, nil)
132136
require.NoError(t, err)
133137
}
134138

@@ -158,10 +162,32 @@ func TestBlobClient_GetBlob(t *testing.T) {
158162
ctx := context.Background()
159163
client, err := NewBlobClient(endpoint, cred, &BlobClientOptions{ClientOptions: options})
160164
require.NoError(t, err)
161-
digest := "sha256:042a816809aac8d0f7d7cacac7965782ee2ecac3f21bcf9f24b1de1a7387b769"
162-
res, err := client.GetBlob(ctx, "alpine", digest, nil)
165+
res, err := client.GetBlob(ctx, "alpine", alpineBlobDigest, nil)
163166
require.NoError(t, err)
164167
require.NotEmpty(t, *res.ContentLength)
168+
reader, err := NewDigestValidationReader(alpineBlobDigest, res.BlobData)
169+
require.NoError(t, err)
170+
_, err = io.ReadAll(reader)
171+
require.NoError(t, err)
172+
}
173+
174+
func TestBlobClient_GetBlob_wrongDigest(t *testing.T) {
175+
srv, closeServer := mock.NewServer()
176+
defer closeServer()
177+
srv.AppendResponse(mock.WithStatusCode(http.StatusOK), mock.WithBody([]byte("test")))
178+
179+
pl := runtime.NewPipeline(moduleName, moduleVersion, runtime.PipelineOptions{}, &policy.ClientOptions{Transport: srv})
180+
client := &BlobClient{
181+
srv.URL(),
182+
pl,
183+
}
184+
ctx := context.Background()
185+
resp, err := client.GetBlob(ctx, "name", "sha256:9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08", nil)
186+
require.NoError(t, err)
187+
reader, err := NewDigestValidationReader("sha256:wrong", resp.BlobData)
188+
require.NoError(t, err)
189+
_, err = io.ReadAll(reader)
190+
require.Error(t, err, ErrMismatchedHash)
165191
}
166192

167193
func TestBlobClient_GetBlob_fail(t *testing.T) {
@@ -190,10 +216,27 @@ func TestBlobClient_GetChunk(t *testing.T) {
190216
ctx := context.Background()
191217
client, err := NewBlobClient(endpoint, cred, &BlobClientOptions{ClientOptions: options})
192218
require.NoError(t, err)
193-
digest := "sha256:042a816809aac8d0f7d7cacac7965782ee2ecac3f21bcf9f24b1de1a7387b769"
194-
res, err := client.GetChunk(ctx, "alpine", digest, "bytes=0-999", nil)
219+
chunkSize := 1000
220+
current := 0
221+
blob := bytes.NewBuffer(nil)
222+
for {
223+
res, err := client.GetChunk(ctx, "alpine", alpineBlobDigest, fmt.Sprintf("bytes=%d-%d", current, current+chunkSize-1), nil)
224+
require.NoError(t, err)
225+
chunk, err := io.ReadAll(res.ChunkData)
226+
require.NoError(t, err)
227+
_, err = blob.Write(chunk)
228+
require.NoError(t, err)
229+
totalSize, _ := strconv.Atoi(strings.Split(*res.ContentRange, "/")[1])
230+
currentRangeEnd, _ := strconv.Atoi(strings.Split(strings.Split(*res.ContentRange, "/")[0], "-")[1])
231+
if totalSize == currentRangeEnd+1 {
232+
break
233+
}
234+
current += chunkSize
235+
}
236+
reader, err := NewDigestValidationReader(alpineBlobDigest, blob)
237+
require.NoError(t, err)
238+
_, err = io.ReadAll(reader)
195239
require.NoError(t, err)
196-
require.Equal(t, int64(1000), *res.ContentLength)
197240
}
198241

199242
func TestBlobClient_GetChunk_fail(t *testing.T) {
@@ -247,8 +290,7 @@ func TestBlobClient_MountBlob(t *testing.T) {
247290
ctx := context.Background()
248291
client, err := NewBlobClient(endpoint, cred, &BlobClientOptions{ClientOptions: options})
249292
require.NoError(t, err)
250-
digest := "sha256:042a816809aac8d0f7d7cacac7965782ee2ecac3f21bcf9f24b1de1a7387b769"
251-
res, err := client.MountBlob(ctx, "hello-world", "alpine", digest, nil)
293+
res, err := client.MountBlob(ctx, "hello-world", "alpine", alpineBlobDigest, nil)
252294
require.NoError(t, err)
253295
require.NotEmpty(t, res.Location)
254296
}

sdk/containers/azcontainerregistry/blob_custom_client.go

Lines changed: 3 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,13 @@ package azcontainerregistry
88

99
import (
1010
"context"
11-
"crypto/sha256"
12-
"encoding"
1311
"errors"
1412
"fmt"
1513
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
1614
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
1715
"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
1816
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
1917
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
20-
"hash"
2118
"io"
2219
"reflect"
2320
)
@@ -61,59 +58,6 @@ func NewBlobClient(endpoint string, credential azcore.TokenCredential, options *
6158
}, nil
6259
}
6360

64-
// BlobDigestCalculator help to calculate blob digest when uploading blob.
65-
// Don't use this type directly, use NewBlobDigestCalculator() instead.
66-
type BlobDigestCalculator struct {
67-
h hash.Hash
68-
hashState []byte
69-
}
70-
71-
type wrappedReadSeeker struct {
72-
io.Reader
73-
io.Seeker
74-
}
75-
76-
// NewBlobDigestCalculator creates a new calculator to help to calculate blob digest when uploading blob.
77-
func NewBlobDigestCalculator() *BlobDigestCalculator {
78-
return &BlobDigestCalculator{
79-
h: sha256.New(),
80-
}
81-
}
82-
83-
func (b *BlobDigestCalculator) saveState() {
84-
b.hashState, _ = b.h.(encoding.BinaryMarshaler).MarshalBinary()
85-
}
86-
87-
func (b *BlobDigestCalculator) restoreState() {
88-
if b.hashState == nil {
89-
return
90-
}
91-
_ = b.h.(encoding.BinaryUnmarshaler).UnmarshalBinary(b.hashState)
92-
}
93-
94-
// newLimitTeeReader returns a Reader that writes to w what it reads from r with n bytes limit.
95-
func newLimitTeeReader(r io.Reader, w io.Writer, n int64) io.Reader {
96-
return &limitTeeReader{r, w, n}
97-
}
98-
99-
type limitTeeReader struct {
100-
r io.Reader
101-
w io.Writer
102-
n int64
103-
}
104-
105-
func (lt *limitTeeReader) Read(p []byte) (int, error) {
106-
n, err := lt.r.Read(p)
107-
if n > 0 && lt.n > 0 {
108-
wn, werr := lt.w.Write(p[:n])
109-
if werr != nil {
110-
return wn, werr
111-
}
112-
lt.n -= int64(wn)
113-
}
114-
return n, err
115-
}
116-
11761
// BlobClientUploadChunkOptions contains the optional parameters for the BlobClient.UploadChunk method.
11862
type BlobClientUploadChunkOptions struct {
11963
// Start of range for the blob to be uploaded.
@@ -130,15 +74,11 @@ type BlobClientUploadChunkOptions struct {
13074
// - options - BlobClientUploadChunkOptions contains the optional parameters for the BlobClient.UploadChunk method.
13175
func (client *BlobClient) UploadChunk(ctx context.Context, location string, chunkData io.ReadSeeker, blobDigestCalculator *BlobDigestCalculator, options *BlobClientUploadChunkOptions) (BlobClientUploadChunkResponse, error) {
13276
blobDigestCalculator.saveState()
133-
size, err := chunkData.Seek(0, io.SeekEnd) // Seek to the end to get the stream's size
134-
if err != nil {
135-
return BlobClientUploadChunkResponse{}, err
136-
}
137-
_, err = chunkData.Seek(0, io.SeekStart)
77+
reader, err := blobDigestCalculator.wrapReader(chunkData)
13878
if err != nil {
13979
return BlobClientUploadChunkResponse{}, err
14080
}
141-
wrappedChunkData := &wrappedReadSeeker{Reader: newLimitTeeReader(chunkData, blobDigestCalculator.h, size), Seeker: chunkData}
81+
wrappedChunkData := &wrappedReadSeeker{Reader: reader, Seeker: chunkData}
14282
var requestOptions *blobClientUploadChunkOptions
14383
if options != nil && options.RangeStart != nil && options.RangeEnd != nil {
14484
requestOptions = &blobClientUploadChunkOptions{ContentRange: to.Ptr(fmt.Sprintf("%d-%d", *options.RangeStart, *options.RangeEnd))}
@@ -157,5 +97,5 @@ func (client *BlobClient) UploadChunk(ctx context.Context, location string, chun
15797
// - blobDigestCalculator - Calculator that help to calculate blob digest
15898
// - options - BlobClientCompleteUploadOptions contains the optional parameters for the BlobClient.CompleteUpload method.
15999
func (client *BlobClient) CompleteUpload(ctx context.Context, location string, blobDigestCalculator *BlobDigestCalculator, options *BlobClientCompleteUploadOptions) (BlobClientCompleteUploadResponse, error) {
160-
return client.completeUpload(ctx, fmt.Sprintf("sha256:%x", blobDigestCalculator.h.Sum(nil)), location, options)
100+
return client.completeUpload(ctx, blobDigestCalculator.getDigest(), location, options)
161101
}

sdk/containers/azcontainerregistry/blob_custom_client_test.go

Lines changed: 24 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@ func TestBlobClient_CompleteUpload(t *testing.T) {
2828
ctx := context.Background()
2929
client, err := NewBlobClient(endpoint, cred, &BlobClientOptions{ClientOptions: options})
3030
require.NoError(t, err)
31-
digest := "sha256:042a816809aac8d0f7d7cacac7965782ee2ecac3f21bcf9f24b1de1a7387b769"
32-
getRes, err := client.GetBlob(ctx, "alpine", digest, nil)
31+
getRes, err := client.GetBlob(ctx, "alpine", alpineBlobDigest, nil)
3332
require.NoError(t, err)
3433
blob, err := io.ReadAll(getRes.BlobData)
3534
require.NoError(t, err)
@@ -49,8 +48,7 @@ func TestBlobClient_UploadChunk(t *testing.T) {
4948
ctx := context.Background()
5049
client, err := NewBlobClient(endpoint, cred, &BlobClientOptions{ClientOptions: options})
5150
require.NoError(t, err)
52-
digest := "sha256:042a816809aac8d0f7d7cacac7965782ee2ecac3f21bcf9f24b1de1a7387b769"
53-
getRes, err := client.GetBlob(ctx, "alpine", digest, nil)
51+
getRes, err := client.GetBlob(ctx, "alpine", alpineBlobDigest, nil)
5452
require.NoError(t, err)
5553
blob, err := io.ReadAll(getRes.BlobData)
5654
require.NoError(t, err)
@@ -70,24 +68,34 @@ func TestBlobClient_CompleteUpload_uploadByChunk(t *testing.T) {
7068
ctx := context.Background()
7169
client, err := NewBlobClient(endpoint, cred, &BlobClientOptions{ClientOptions: options})
7270
require.NoError(t, err)
73-
digest := "sha256:042a816809aac8d0f7d7cacac7965782ee2ecac3f21bcf9f24b1de1a7387b769"
74-
getRes, err := client.GetBlob(ctx, "alpine", digest, nil)
71+
getRes, err := client.GetBlob(ctx, "alpine", alpineBlobDigest, nil)
7572
require.NoError(t, err)
7673
blob, err := io.ReadAll(getRes.BlobData)
7774
require.NoError(t, err)
7875
startRes, err := client.StartUpload(ctx, "hello-world", nil)
7976
require.NoError(t, err)
8077
calculator := NewBlobDigestCalculator()
8178
oriReader := bytes.NewReader(blob)
82-
firstPart := io.NewSectionReader(oriReader, int64(0), int64(len(blob)/2))
83-
secondPart := io.NewSectionReader(oriReader, int64(len(blob)/2), int64(len(blob)-len(blob)/2))
84-
uploadResp, err := client.UploadChunk(ctx, *startRes.Location, firstPart, calculator, &BlobClientUploadChunkOptions{RangeStart: to.Ptr(int32(0)), RangeEnd: to.Ptr(int32(len(blob)/2 - 1))})
85-
require.NoError(t, err)
86-
require.NotEmpty(t, *uploadResp.Location)
87-
uploadResp, err = client.UploadChunk(ctx, *uploadResp.Location, secondPart, calculator, &BlobClientUploadChunkOptions{RangeStart: to.Ptr(int32(len(blob) / 2)), RangeEnd: to.Ptr(int32(len(blob) - 1))})
88-
require.NoError(t, err)
89-
require.NotEmpty(t, *uploadResp.Location)
90-
completeResp, err := client.CompleteUpload(ctx, *uploadResp.Location, calculator, nil)
79+
size := int64(len(blob))
80+
chunkSize := int64(736)
81+
current := int64(0)
82+
location := *startRes.Location
83+
for {
84+
end := current + chunkSize
85+
if end > size {
86+
end = size
87+
}
88+
chunkReader := io.NewSectionReader(oriReader, current, end-current)
89+
uploadResp, err := client.UploadChunk(ctx, location, chunkReader, calculator, &BlobClientUploadChunkOptions{RangeStart: to.Ptr(int32(current)), RangeEnd: to.Ptr(int32(end - 1))})
90+
require.NoError(t, err)
91+
require.NotEmpty(t, *uploadResp.Location)
92+
location = *uploadResp.Location
93+
current = end
94+
if current >= size {
95+
break
96+
}
97+
}
98+
completeResp, err := client.CompleteUpload(ctx, location, calculator, nil)
9199
require.NoError(t, err)
92100
require.NotEmpty(t, *completeResp.DockerContentDigest)
93101
}
@@ -103,28 +111,13 @@ func TestNewBlobClient(t *testing.T) {
103111
require.Errorf(t, err, "provided Cloud field is missing Azure Container Registry configuration")
104112
}
105113

106-
func TestBlobDigestCalculator_saveAndRestoreState(t *testing.T) {
107-
calculator := NewBlobDigestCalculator()
108-
calculator.restoreState()
109-
calculator.saveState()
110-
calculator.restoreState()
111-
calculator.h.Write([]byte("test1"))
112-
sum := calculator.h.Sum(nil)
113-
calculator.saveState()
114-
calculator.h.Write([]byte("test2"))
115-
require.NotEqual(t, sum, calculator.h.Sum(nil))
116-
calculator.restoreState()
117-
require.Equal(t, sum, calculator.h.Sum(nil))
118-
}
119-
120114
func TestBlobClient_CompleteUpload_uploadByChunkFailOver(t *testing.T) {
121115
startRecording(t)
122116
endpoint, cred, options := getEndpointCredAndClientOptions(t)
123117
ctx := context.Background()
124118
client, err := NewBlobClient(endpoint, cred, &BlobClientOptions{ClientOptions: options})
125119
require.NoError(t, err)
126-
digest := "sha256:042a816809aac8d0f7d7cacac7965782ee2ecac3f21bcf9f24b1de1a7387b769"
127-
getRes, err := client.GetBlob(ctx, "alpine", digest, nil)
120+
getRes, err := client.GetBlob(ctx, "alpine", alpineBlobDigest, nil)
128121
require.NoError(t, err)
129122
blob, err := io.ReadAll(getRes.BlobData)
130123
require.NoError(t, err)

0 commit comments

Comments
 (0)