Skip to content

Commit 279f6bf

Browse files
authored
[service-bus] remove unnecessary wrapper in streaming receiver (Azure#12604)
Fixes Azure#12260 The code that's removed is no longer necessary since the `retry` method is being called in a loop and the loop only calls the user's processError handler after the retries in a single `retry` call are exhausted.
1 parent d16bebc commit 279f6bf

File tree

2 files changed

+3
-101
lines changed

2 files changed

+3
-101
lines changed

sdk/servicebus/service-bus/src/core/streamingReceiver.ts

Lines changed: 1 addition & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -360,26 +360,6 @@ export class StreamingReceiver extends MessageReceiver {
360360
await this._receiverHelper.suspend();
361361
}
362362

363-
/**
364-
* Returns a wrapped function that makes any thrown errors retryable (_except_ for AbortError) by
365-
* wrapping them in a StreamingReceiverError .
366-
*/
367-
private static wrapRetryOperation(fn: () => Promise<void>) {
368-
return async () => {
369-
try {
370-
await fn();
371-
} catch (err) {
372-
if (err.name === "AbortError") {
373-
throw err;
374-
}
375-
376-
// once we're at this point where we can spin up a connection we're past the point
377-
// of fatal errors like the connection string just outright being malformed, for instance.
378-
throw new StreamingReceiverError(err);
379-
}
380-
};
381-
}
382-
383363
/**
384364
* Initializes the link. This method will retry infinitely until a connection is established.
385365
*
@@ -404,7 +384,7 @@ export class StreamingReceiver extends MessageReceiver {
404384
++numRetryCycles;
405385

406386
const config: RetryConfig<void> = {
407-
operation: StreamingReceiver.wrapRetryOperation(() => this._initOnce(args)),
387+
operation: () => this._initOnce(args),
408388
connectionId: args.connectionId,
409389
operationType: RetryOperationType.receiverLink,
410390
// even though we're going to loop infinitely we allow them to control the pattern we use on each
@@ -417,11 +397,6 @@ export class StreamingReceiver extends MessageReceiver {
417397
await this._retry<void>(config);
418398
break;
419399
} catch (err) {
420-
if (StreamingReceiverError.isStreamingReceiverError(err)) {
421-
// report the original error to the user
422-
err = err.originalError;
423-
}
424-
425400
// we only report the error here - this avoids spamming the user with too many
426401
// redundant reports of errors while still providing them incremental status on failures.
427402
args.onError({
@@ -544,28 +519,3 @@ export class StreamingReceiver extends MessageReceiver {
544519
}
545520
}
546521
}
547-
548-
/**
549-
* Wraps an error thrown from the operation for retry<>.
550-
*
551-
* Used so we don't have to worry about modifying the errors that
552-
* also might have originated from the user's processMessage handler,
553-
* which they rightfully own.
554-
*
555-
* @internal
556-
* @ignore
557-
*/
558-
export class StreamingReceiverError extends Error {
559-
constructor(public originalError: Error | MessagingError) {
560-
super(originalError.message);
561-
this.name = "StreamingReceiverError";
562-
}
563-
564-
retryable: boolean = true;
565-
566-
static isStreamingReceiverError(
567-
err: Error | StreamingReceiverError
568-
): err is StreamingReceiverError {
569-
return err.name === "StreamingReceiverError";
570-
}
571-
}

sdk/servicebus/service-bus/test/internal/streamingReceiver.spec.ts

Lines changed: 2 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import chaiAsPromised from "chai-as-promised";
66
import { createConnectionContextForTests } from "./unittestUtils";
77
import { ProcessErrorArgs } from "../../src";
88
import { ReceiveMode } from "../../src/models";
9-
import { StreamingReceiver, StreamingReceiverError } from "../../src/core/streamingReceiver";
9+
import { StreamingReceiver } from "../../src/core/streamingReceiver";
1010
import sinon from "sinon";
1111
import { EventContext } from "rhea-promise";
1212
import { Constants, MessagingError, RetryConfig, RetryMode } from "@azure/core-amqp";
@@ -322,13 +322,7 @@ describe("StreamingReceiver unit tests", () => {
322322
);
323323

324324
++numTimesRetryFnCalled;
325-
if (numTimesRetryFnCalled === 0) {
326-
// add in a little variety - throwing this wrapper error should also
327-
// work just fine and it'll be properly extracted when it's reported to the user's onError handler.
328-
throw new StreamingReceiverError(
329-
new Error(`Error in retry cycle ${numTimesRetryFnCalled}`)
330-
);
331-
} else if (numTimesRetryFnCalled < 4) {
325+
if (numTimesRetryFnCalled < 4) {
332326
throw new Error(`Error in retry cycle ${numTimesRetryFnCalled}`);
333327
} else {
334328
// go ahead and let the retry loop succeed (and it should properly terminate!)
@@ -422,48 +416,6 @@ describe("StreamingReceiver unit tests", () => {
422416
retryable: undefined
423417
});
424418
});
425-
426-
describe("wrapped operation", () => {
427-
it("wrapped operation reports via onError for exceptions.", async () => {
428-
const wrappedOperation = StreamingReceiver["wrapRetryOperation"](async () => {
429-
throw new Error("Normal errors are logged and rethrown.");
430-
});
431-
432-
try {
433-
await wrappedOperation();
434-
assert.fail("Should have thrown");
435-
} catch (err) {
436-
assertError(err, {
437-
name: "StreamingReceiverError",
438-
message: "Normal errors are logged and rethrown.",
439-
// they're also marked as retryable.
440-
retryable: true
441-
});
442-
}
443-
});
444-
445-
it("wrapped operation does throw if it gets an abortError.", async () => {
446-
const wrappedOperation = StreamingReceiver["wrapRetryOperation"](async () => {
447-
//. the user is obviously welcome to pass in and abort their passed in abortSignal.
448-
// Another way this can happen is that LinkEntity.initLink will also throw an AbortError if the link has
449-
// been closed by the user.
450-
throw new AbortError(
451-
"AbortError's should propagate or the link has had .close() called on it"
452-
);
453-
});
454-
455-
try {
456-
await wrappedOperation();
457-
assert.fail("AbortError should have been thrown");
458-
} catch (err) {
459-
assertError(err, {
460-
name: "AbortError",
461-
message: "AbortError's should propagate or the link has had .close() called on it",
462-
retryable: undefined
463-
});
464-
}
465-
});
466-
});
467419
});
468420

469421
it("onDetach calls through to init", async () => {

0 commit comments

Comments
 (0)