Skip to content

Commit d2f6ad6

Browse files
authored
Cosmos DB: Add single partition query support (Azure#17657)
Adding single partition query support
1 parent cf2a514 commit d2f6ad6

18 files changed

+1293
-86
lines changed

.github/CODEOWNERS

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
/sdk/appconfig/ @antkmsft @seankane-msft @jhendrixMSFT
3333

3434
# PRLabel: %Cosmos
35-
/sdk/data/azcosmos/ @ealsur @ausfeldt
35+
/sdk/data/azcosmos/ @ealsur @kirankumarkolli
3636

3737
# PRLabel: %Tables
3838
/sdk/data/aztables/ @seankane-msft @jhendrixMSFT

sdk/data/azcosmos/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
## 0.2.1 (Unreleased)
44

55
### Features Added
6+
* Added single partition query support.
67

78
### Breaking Changes
9+
* This module now requires Go 1.18
810

911
### Bugs Fixed
1012

sdk/data/azcosmos/ci.yml

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,10 @@ stages:
3232

3333
strategy:
3434
matrix:
35-
Windows_Go116:
35+
Windows_Go118:
3636
pool.name: azsdk-pool-mms-win-2019-general
3737
image.name: MMS2019
38-
go.version: '1.16.7'
39-
Windows_Go117:
40-
pool.name: azsdk-pool-mms-win-2019-general
41-
image.name: MMS2019
42-
go.version: '1.17'
38+
go.version: '1.18'
4339
pool:
4440
name: $(pool.name)
4541
vmImage: $(image.name)

sdk/data/azcosmos/cosmos_client_test.go

Lines changed: 35 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"encoding/json"
99
"io/ioutil"
1010
"net/http"
11+
"net/url"
1112
"testing"
1213

1314
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
@@ -224,8 +225,8 @@ func TestSendDelete(t *testing.T) {
224225
t.Fatal(err)
225226
}
226227

227-
if verifier.method != http.MethodDelete {
228-
t.Errorf("Expected %v, but got %v", http.MethodDelete, verifier.method)
228+
if verifier.requests[0].method != http.MethodDelete {
229+
t.Errorf("Expected %v, but got %v", http.MethodDelete, verifier.requests[0].method)
229230
}
230231
}
231232

@@ -247,8 +248,8 @@ func TestSendGet(t *testing.T) {
247248
t.Fatal(err)
248249
}
249250

250-
if verifier.method != http.MethodGet {
251-
t.Errorf("Expected %v, but got %v", http.MethodGet, verifier.method)
251+
if verifier.requests[0].method != http.MethodGet {
252+
t.Errorf("Expected %v, but got %v", http.MethodGet, verifier.requests[0].method)
252253
}
253254
}
254255

@@ -276,12 +277,12 @@ func TestSendPut(t *testing.T) {
276277
t.Fatal(err)
277278
}
278279

279-
if verifier.method != http.MethodPut {
280-
t.Errorf("Expected %v, but got %v", http.MethodPut, verifier.method)
280+
if verifier.requests[0].method != http.MethodPut {
281+
t.Errorf("Expected %v, but got %v", http.MethodPut, verifier.requests[0].method)
281282
}
282283

283-
if verifier.body != string(marshalled) {
284-
t.Errorf("Expected %v, but got %v", string(marshalled), verifier.body)
284+
if verifier.requests[0].body != string(marshalled) {
285+
t.Errorf("Expected %v, but got %v", string(marshalled), verifier.requests[0].body)
285286
}
286287
}
287288

@@ -309,12 +310,12 @@ func TestSendPost(t *testing.T) {
309310
t.Fatal(err)
310311
}
311312

312-
if verifier.method != http.MethodPost {
313-
t.Errorf("Expected %v, but got %v", http.MethodPost, verifier.method)
313+
if verifier.requests[0].method != http.MethodPost {
314+
t.Errorf("Expected %v, but got %v", http.MethodPost, verifier.requests[0].method)
314315
}
315316

316-
if verifier.body != string(marshalled) {
317-
t.Errorf("Expected %v, but got %v", string(marshalled), verifier.body)
317+
if verifier.requests[0].body != string(marshalled) {
318+
t.Errorf("Expected %v, but got %v", string(marshalled), verifier.requests[0].body)
318319
}
319320
}
320321

@@ -336,37 +337,47 @@ func TestSendQuery(t *testing.T) {
336337
t.Fatal(err)
337338
}
338339

339-
if verifier.method != http.MethodPost {
340-
t.Errorf("Expected %v, but got %v", http.MethodPost, verifier.method)
340+
if verifier.requests[0].method != http.MethodPost {
341+
t.Errorf("Expected %v, but got %v", http.MethodPost, verifier.requests[0].method)
341342
}
342343

343-
if verifier.isQuery != true {
344-
t.Errorf("Expected %v, but got %v", true, verifier.isQuery)
344+
if verifier.requests[0].isQuery != true {
345+
t.Errorf("Expected %v, but got %v", true, verifier.requests[0].isQuery)
345346
}
346347

347-
if verifier.contentType != cosmosHeaderValuesQuery {
348-
t.Errorf("Expected %v, but got %v", cosmosHeaderValuesQuery, verifier.contentType)
348+
if verifier.requests[0].contentType != cosmosHeaderValuesQuery {
349+
t.Errorf("Expected %v, but got %v", cosmosHeaderValuesQuery, verifier.requests[0].contentType)
349350
}
350351

351-
if verifier.body != "{\"query\":\"SELECT * FROM c\"}" {
352-
t.Errorf("Expected %v, but got %v", "{\"query\":\"SELECT * FROM c\"}", verifier.body)
352+
if verifier.requests[0].body != "{\"query\":\"SELECT * FROM c\"}" {
353+
t.Errorf("Expected %v, but got %v", "{\"query\":\"SELECT * FROM c\"}", verifier.requests[0].body)
353354
}
354355
}
355356

356357
type pipelineVerifier struct {
358+
requests []pipelineVerifierRequest
359+
}
360+
361+
type pipelineVerifierRequest struct {
357362
method string
358363
body string
359364
contentType string
360365
isQuery bool
366+
url *url.URL
367+
headers http.Header
361368
}
362369

363370
func (p *pipelineVerifier) Do(req *policy.Request) (*http.Response, error) {
364-
p.method = req.Raw().Method
371+
pr := pipelineVerifierRequest{}
372+
pr.method = req.Raw().Method
373+
pr.url = req.Raw().URL
365374
if req.Body() != nil {
366375
readBody, _ := ioutil.ReadAll(req.Body())
367-
p.body = string(readBody)
376+
pr.body = string(readBody)
368377
}
369-
p.contentType = req.Raw().Header.Get(headerContentType)
370-
p.isQuery = req.Raw().Method == http.MethodPost && req.Raw().Header.Get(cosmosHeaderQuery) == "True"
378+
pr.contentType = req.Raw().Header.Get(headerContentType)
379+
pr.headers = req.Raw().Header
380+
pr.isQuery = req.Raw().Method == http.MethodPost && req.Raw().Header.Get(cosmosHeaderQuery) == "True"
381+
p.requests = append(p.requests, pr)
371382
return req.Next()
372383
}

sdk/data/azcosmos/cosmos_container.go

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77
"context"
88

99
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
10+
"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
11+
"github.com/Azure/azure-sdk-for-go/sdk/internal/uuid"
1012
)
1113

1214
// ContainerClient lets you perform read, update, change throughput, and delete container operations.
@@ -199,7 +201,7 @@ func (c *ContainerClient) CreateItem(
199201
isWriteOperation: true,
200202
headerOptionsOverride: &h}
201203

202-
path, err := generatePathForNameBased(resourceTypeDocument, c.link, true)
204+
path, err := generatePathForNameBased(resourceTypeDocument, operationContext.resourceAddress, true)
203205
if err != nil {
204206
return ItemResponse{}, err
205207
}
@@ -248,7 +250,7 @@ func (c *ContainerClient) UpsertItem(
248250
isWriteOperation: true,
249251
headerOptionsOverride: &h}
250252

251-
path, err := generatePathForNameBased(resourceTypeDocument, c.link, true)
253+
path, err := generatePathForNameBased(resourceTypeDocument, operationContext.resourceAddress, true)
252254
if err != nil {
253255
return ItemResponse{}, err
254256
}
@@ -399,6 +401,61 @@ func (c *ContainerClient) DeleteItem(
399401
return newItemResponse(azResponse)
400402
}
401403

404+
// NewQueryItemsPager executes a single partition query in a Cosmos container.
405+
// ctx - The context for the request.
406+
// query - The SQL query to execute.
407+
// partitionKey - The partition key to scope the query on.
408+
// o - Options for the operation.
409+
func (c *ContainerClient) NewQueryItemsPager(query string, partitionKey PartitionKey, o *QueryOptions) *runtime.Pager[QueryItemsResponse] {
410+
correlatedActivityId, _ := uuid.New()
411+
h := headerOptionsOverride{
412+
partitionKey: &partitionKey,
413+
correlatedActivityId: &correlatedActivityId,
414+
}
415+
416+
queryOptions := &QueryOptions{}
417+
if o != nil {
418+
originalOptions := *o
419+
queryOptions = &originalOptions
420+
}
421+
422+
operationContext := pipelineRequestOptions{
423+
resourceType: resourceTypeDocument,
424+
resourceAddress: c.link,
425+
headerOptionsOverride: &h,
426+
}
427+
428+
path, _ := generatePathForNameBased(resourceTypeDocument, operationContext.resourceAddress, true)
429+
430+
return runtime.NewPager(runtime.PageProcessor[QueryItemsResponse]{
431+
More: func(page QueryItemsResponse) bool {
432+
return page.ContinuationToken != ""
433+
},
434+
Fetcher: func(ctx context.Context, page *QueryItemsResponse) (QueryItemsResponse, error) {
435+
if page != nil {
436+
if page.ContinuationToken != "" {
437+
// Use the previous page continuation if available
438+
queryOptions.ContinuationToken = page.ContinuationToken
439+
}
440+
}
441+
442+
azResponse, err := c.database.client.sendQueryRequest(
443+
path,
444+
ctx,
445+
query,
446+
operationContext,
447+
queryOptions,
448+
nil)
449+
450+
if err != nil {
451+
return QueryItemsResponse{}, err
452+
}
453+
454+
return newQueryResponse(azResponse)
455+
},
456+
})
457+
}
458+
402459
func (c *ContainerClient) getRID(ctx context.Context) (string, error) {
403460
containerResponse, err := c.Read(ctx, nil)
404461
if err != nil {

0 commit comments

Comments
 (0)