Skip to content

Commit 3866111

Browse files
authored
[service-bus] pass abortSignal to link initialization and awaitableSender (Azure#15349)
* [service-bus] pass abortSignal to link initialization and awaitableSender * [service-bus] add changelog entry * [core-amqp] remove AsyncLock! * [service-bus] fix test after the great merge * npm run format * fix abort on send after great merge
1 parent 1fea6c0 commit 3866111

File tree

13 files changed

+164
-104
lines changed

13 files changed

+164
-104
lines changed

sdk/core/core-amqp/CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
# Release History
22

3-
## 3.0.0 (Unreleased)
3+
## 3.0.0 (2021-06-09)
44

55
### Breaking changes
66

77
- Updates the `rhea-promise` and `rhea` dependencies to version 2.x. `rhea` contains a breaking change that changes deserialization of timestamps from numbers to Date objects.
8+
- Removes the `AsyncLock` and `defaultLock` exports. `defaultCancellableLock` should be used instead.
89

910
## 2.3.0 (2021-04-29)
1011

sdk/core/core-amqp/package.json

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,6 @@
7272
"@azure/abort-controller": "^1.0.0",
7373
"@azure/core-auth": "^1.3.0",
7474
"@azure/logger": "^1.0.0",
75-
"@types/async-lock": "^1.1.0",
76-
"async-lock": "^1.1.3",
7775
"buffer": "^5.2.1",
7876
"events": "^3.0.0",
7977
"jssha": "^3.1.0",

sdk/core/core-amqp/review/core-amqp.api.md

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import { AbortSignalLike } from '@azure/abort-controller';
88
import { AccessToken } from '@azure/core-auth';
99
import { AmqpError } from 'rhea-promise';
10-
import AsyncLock from 'async-lock';
1110
import { Connection } from 'rhea-promise';
1211
import { Message } from 'rhea-promise';
1312
import { MessageHeader } from 'rhea-promise';
@@ -91,8 +90,6 @@ export const AmqpMessageProperties: {
9190
fromRheaMessageProperties(props: MessageProperties): AmqpMessageProperties;
9291
};
9392

94-
export { AsyncLock }
95-
9693
// @public
9794
export interface CancellableAsyncLock {
9895
acquire<T = void>(key: string, task: (...args: any[]) => Promise<T>, properties: AcquireLockProperties): Promise<T>;
@@ -368,9 +365,6 @@ export function createSasTokenProvider(data: {
368365
// @public
369366
export const defaultCancellableLock: CancellableAsyncLock;
370367

371-
// @public
372-
export const defaultLock: AsyncLock;
373-
374368
// @public
375369
export function delay<T>(delayInMs: number, abortSignal?: AbortSignalLike, abortErrorMsg?: string, value?: T): Promise<T | void>;
376370

sdk/core/core-amqp/src/index.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,7 @@ export {
3434
delay,
3535
parseConnectionString,
3636
defaultCancellableLock,
37-
defaultLock,
3837
ParsedOutput,
39-
AsyncLock,
4038
WebSocketOptions
4139
} from "./util/utils";
4240
export { AmqpAnnotatedMessage } from "./amqpAnnotatedMessage";

sdk/core/core-amqp/src/util/utils.ts

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT license.
33

4-
import AsyncLock from "async-lock";
54
import { AbortError, AbortSignalLike } from "@azure/abort-controller";
65
import { WebSocketImpl } from "rhea-promise";
76
import { isDefined } from "./typeGuards";
87
import { StandardAbortMessage } from "../errors";
98
import { CancellableAsyncLock, CancellableAsyncLockImpl } from "./lock";
109

11-
export { AsyncLock };
1210
/**
1311
* @internal
1412
*
@@ -113,22 +111,6 @@ export function parseConnectionString<T>(connectionString: string): ParsedOutput
113111
return output as any;
114112
}
115113

116-
/**
117-
* @internal
118-
*
119-
* Gets a new instance of the async lock with desired settings.
120-
* @param options - The async lock options.
121-
* @returns AsyncLock
122-
*/
123-
export function getNewAsyncLock(options?: AsyncLockOptions): AsyncLock {
124-
return new AsyncLock(options);
125-
}
126-
127-
/**
128-
* The async lock instance with default settings.
129-
*/
130-
export const defaultLock: AsyncLock = new AsyncLock({ maxPending: 10000 });
131-
132114
/**
133115
* The cancellable async lock instance.
134116
*/

sdk/core/core-amqp/test/cbs.spec.ts

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
import { assert } from "chai";
55
import { AbortController } from "@azure/abort-controller";
6-
import { CbsClient, defaultLock, TokenType } from "../src";
6+
import { CbsClient, defaultCancellableLock, TokenType } from "../src";
77
import { createConnectionStub } from "./utils/createConnectionStub";
88
import { Connection } from "rhea-promise";
99
import { stub } from "sinon";
@@ -38,14 +38,18 @@ describe("CbsClient", function() {
3838

3939
// Make the existing `init` invocation wait until the abortSignal
4040
// is aborted before acquiring it's lock.
41-
await defaultLock.acquire(lock, () => {
42-
return new Promise<void>((resolve) => {
43-
setTimeout(() => {
44-
controller.abort();
45-
resolve();
46-
}, 0);
47-
});
48-
});
41+
await defaultCancellableLock.acquire(
42+
lock,
43+
() => {
44+
return new Promise<void>((resolve) => {
45+
setTimeout(() => {
46+
controller.abort();
47+
resolve();
48+
}, 0);
49+
});
50+
},
51+
{ abortSignal: undefined, timeoutInMs: undefined }
52+
);
4953

5054
try {
5155
await cbsClient.init({ abortSignal: signal });

sdk/servicebus/service-bus/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
## 7.2.0-beta.2 (Unreleased)
44

5+
- Improves cancellation support when sending messages or initializing a connection to the service.
6+
Resolves [#15311](https://github.com/Azure/azure-sdk-for-js/issues/15311) and [#13504](https://github.com/Azure/azure-sdk-for-js/issues/13504).
7+
58
### Bug fixes
69

710
- ServiceBusSender could throw an error (`TypeError: Cannot read property 'maxMessageSize' of undefined`) if a link was being restarted while calling sendMessages().

sdk/servicebus/service-bus/src/core/linkEntity.ts

Lines changed: 76 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import {
55
Constants,
66
TokenType,
7-
defaultLock,
7+
defaultCancellableLock,
88
RequestResponseLink,
99
StandardAbortMessage,
1010
isSasTokenProvider
@@ -217,12 +217,19 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
217217
this._logger.verbose(
218218
`${this._logPrefix} Attempting to acquire lock token ${this._openLock} for initializing link`
219219
);
220-
return defaultLock.acquire(this._openLock, () => {
221-
this._logger.verbose(
222-
`${this._logPrefix} Lock ${this._openLock} acquired for initializing link`
223-
);
224-
return this._initLinkImpl(options, abortSignal);
225-
});
220+
return defaultCancellableLock.acquire(
221+
this._openLock,
222+
() => {
223+
this._logger.verbose(
224+
`${this._logPrefix} Lock ${this._openLock} acquired for initializing link`
225+
);
226+
return this._initLinkImpl(options, abortSignal);
227+
},
228+
{
229+
abortSignal: abortSignal,
230+
timeoutInMs: Constants.defaultOperationTimeoutInMs
231+
}
232+
);
226233
}
227234

228235
private async _initLinkImpl(
@@ -258,7 +265,11 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
258265
);
259266

260267
try {
261-
await this._negotiateClaim();
268+
await this._negotiateClaim({
269+
abortSignal,
270+
setTokenRenewal: false,
271+
timeoutInMs: Constants.defaultOperationTimeoutInMs
272+
});
262273

263274
checkAborted();
264275
this.checkIfConnectionReady();
@@ -324,10 +335,14 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
324335
this._logger.verbose(
325336
`${this._logPrefix} Attempting to acquire lock token ${this._openLock} for closing link`
326337
);
327-
return defaultLock.acquire(this._openLock, () => {
328-
this._logger.verbose(`${this._logPrefix} Lock ${this._openLock} acquired for closing link`);
329-
return this.closeLinkImpl();
330-
});
338+
return defaultCancellableLock.acquire(
339+
this._openLock,
340+
() => {
341+
this._logger.verbose(`${this._logPrefix} Lock ${this._openLock} acquired for closing link`);
342+
return this.closeLinkImpl();
343+
},
344+
{ abortSignal: undefined, timeoutInMs: undefined }
345+
);
331346
}
332347

333348
private async closeLinkImpl(): Promise<void> {
@@ -375,7 +390,15 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
375390
* Negotiates the cbs claim for the ClientEntity.
376391
* @param setTokenRenewal - Set the token renewal timer. Default false.
377392
*/
378-
private async _negotiateClaim(setTokenRenewal?: boolean): Promise<void> {
393+
private async _negotiateClaim({
394+
abortSignal,
395+
setTokenRenewal,
396+
timeoutInMs
397+
}: {
398+
setTokenRenewal: boolean;
399+
abortSignal: AbortSignalLike | undefined;
400+
timeoutInMs: number;
401+
}): Promise<void> {
379402
this._logger.verbose(`${this._logPrefix} negotiateclaim() has been called`);
380403

381404
// Wait for the connectionContext to be ready to open the link.
@@ -394,10 +417,22 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
394417
this.name,
395418
this.address
396419
);
397-
await defaultLock.acquire(this._context.cbsSession.cbsLock, async () => {
398-
this.checkIfConnectionReady();
399-
return this._context.cbsSession.init();
400-
});
420+
421+
const startTime = Date.now();
422+
if (!this._context.cbsSession.isOpen()) {
423+
await defaultCancellableLock.acquire(
424+
this._context.cbsSession.cbsLock,
425+
() => {
426+
this.checkIfConnectionReady();
427+
return this._context.cbsSession.init({ abortSignal, timeoutInMs });
428+
},
429+
{
430+
abortSignal,
431+
timeoutInMs: timeoutInMs - (Date.now() - startTime)
432+
}
433+
);
434+
}
435+
401436
let tokenObject: AccessToken;
402437
let tokenType: TokenType;
403438
if (isSasTokenProvider(this._context.tokenCredential)) {
@@ -433,10 +468,25 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
433468
if (!tokenObject) {
434469
throw new Error("Token cannot be null");
435470
}
436-
await defaultLock.acquire(this._context.negotiateClaimLock, () => {
437-
this.checkIfConnectionReady();
438-
return this._context.cbsSession.negotiateClaim(this.audience, tokenObject.token, tokenType);
439-
});
471+
await defaultCancellableLock.acquire(
472+
this._context.negotiateClaimLock,
473+
() => {
474+
this.checkIfConnectionReady();
475+
return this._context.cbsSession.negotiateClaim(
476+
this.audience,
477+
tokenObject.token,
478+
tokenType,
479+
{
480+
abortSignal,
481+
timeoutInMs: timeoutInMs - (Date.now() - startTime)
482+
}
483+
);
484+
},
485+
{
486+
abortSignal,
487+
timeoutInMs: timeoutInMs - (Date.now() - startTime)
488+
}
489+
);
440490
this._logger.verbose(
441491
"%s Negotiated claim for %s '%s' with with address: %s",
442492
this.logPrefix,
@@ -485,7 +535,11 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
485535
}
486536
this._tokenRenewalTimer = setTimeout(async () => {
487537
try {
488-
await this._negotiateClaim(true);
538+
await this._negotiateClaim({
539+
setTokenRenewal: true,
540+
abortSignal: undefined,
541+
timeoutInMs: Constants.defaultOperationTimeoutInMs
542+
});
489543
} catch (err) {
490544
this._logger.logError(
491545
err,

sdk/servicebus/service-bus/src/core/messageSender.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,8 @@ export class MessageSender extends LinkEntity<AwaitableSender> {
253253
try {
254254
const delivery = await this.link!.send(encodedMessage, {
255255
format: sendBatch ? 0x80013700 : 0,
256-
timeoutInSeconds: (timeoutInMs - timeTakenByInit - waitTimeForSendable) / 1000
256+
timeoutInSeconds: (timeoutInMs - timeTakenByInit - waitTimeForSendable) / 1000,
257+
abortSignal
257258
});
258259
logger.verbose(
259260
"%s Sender '%s', sent message with delivery id: %d",

0 commit comments

Comments
 (0)