@@ -13,7 +13,7 @@ import {
1313 Message as RheaMessage
1414} from "rhea-promise" ;
1515import {
16- Constants ,
16+ delay ,
1717 ErrorNameConditionMapper ,
1818 RetryConfig ,
1919 RetryOperationType ,
@@ -29,10 +29,10 @@ import { EventHubProducerOptions } from "./models/private";
2929import { SendOptions } from "./models/public" ;
3030
3131import { getRetryAttemptTimeoutInMs } from "./util/retries" ;
32- import { AbortError , AbortSignalLike } from "@azure/abort-controller" ;
32+ import { AbortSignalLike } from "@azure/abort-controller" ;
3333import { EventDataBatch , isEventDataBatch } from "./eventDataBatch" ;
3434import { defaultDataTransformer } from "./dataTransformer" ;
35-
35+ import { waitForTimeoutOrAbortOrResolve } from "./util/timeoutAbortSignalUtils" ;
3636/**
3737 * Describes the EventHubSender that will send event data to EventHub.
3838 * @internal
@@ -217,68 +217,9 @@ export class EventHubSender extends LinkEntity {
217217 abortSignal ?: AbortSignalLike ;
218218 } = { }
219219 ) : Promise < number > {
220- const abortSignal = options . abortSignal ;
221- const retryOptions = options . retryOptions || { } ;
222- if ( this . isOpen ( ) ) {
223- return this . _sender ! . maxMessageSize ;
224- }
225- return new Promise < number > ( async ( resolve , reject ) => {
226- const rejectOnAbort = ( ) : void => {
227- const desc : string = `[${ this . _context . connectionId } ] The create batch operation has been cancelled by the user.` ;
228- // Cancellation is user-intented, so treat as info instead of warning.
229- logger . info ( desc ) ;
230- const error = new AbortError ( `The create batch operation has been cancelled by the user.` ) ;
231- reject ( error ) ;
232- } ;
233-
234- const onAbort = ( ) : void => {
235- if ( abortSignal ) {
236- abortSignal . removeEventListener ( "abort" , onAbort ) ;
237- }
238- rejectOnAbort ( ) ;
239- } ;
240-
241- if ( abortSignal ) {
242- // the aborter may have been triggered between request attempts
243- // so check if it was triggered and reject if needed.
244- if ( abortSignal . aborted ) {
245- return rejectOnAbort ( ) ;
246- }
247- abortSignal . addEventListener ( "abort" , onAbort ) ;
248- }
249- try {
250- logger . verbose (
251- "Acquiring lock %s for initializing the session, sender and " +
252- "possibly the connection." ,
253- this . senderLock
254- ) ;
255- const senderOptions = this . _createSenderOptions ( Constants . defaultOperationTimeoutInMs ) ;
256- await defaultLock . acquire ( this . senderLock , ( ) => {
257- const config : RetryConfig < void > = {
258- operation : ( ) => this . _init ( senderOptions ) ,
259- connectionId : this . _context . connectionId ,
260- operationType : RetryOperationType . senderLink ,
261- abortSignal : abortSignal ,
262- retryOptions : retryOptions
263- } ;
264-
265- return retry < void > ( config ) ;
266- } ) ;
267- resolve ( this . _sender ! . maxMessageSize ) ;
268- } catch ( err ) {
269- logger . warning (
270- "[%s] An error occurred while creating the sender %s" ,
271- this . _context . connectionId ,
272- this . name
273- ) ;
274- logErrorStackTrace ( err ) ;
275- reject ( err ) ;
276- } finally {
277- if ( abortSignal ) {
278- abortSignal . removeEventListener ( "abort" , onAbort ) ;
279- }
280- }
281- } ) ;
220+ await this . _createLinkIfNotOpen ( options ) ;
221+
222+ return this . _sender ! . maxMessageSize ;
282223 }
283224
284225 /**
@@ -390,141 +331,96 @@ export class EventHubSender extends LinkEntity {
390331 * @param rheaMessage - The message to be sent to EventHub.
391332 * @returns Promise<void>
392333 */
393- private _trySendBatch (
334+ private async _trySendBatch (
394335 rheaMessage : RheaMessage | Buffer ,
395336 options : SendOptions & EventHubProducerOptions = { }
396337 ) : Promise < void > {
397338 const abortSignal : AbortSignalLike | undefined = options . abortSignal ;
398339 const retryOptions = options . retryOptions || { } ;
399340 const timeoutInMs = getRetryAttemptTimeoutInMs ( retryOptions ) ;
400341 retryOptions . timeoutInMs = timeoutInMs ;
401- const sendEventPromise = ( ) : Promise < void > =>
402- new Promise < void > ( async ( resolve , reject ) => {
403- const rejectOnAbort = ( ) : void => {
404- const desc : string =
405- `[${ this . _context . connectionId } ] The send operation on the Sender "${ this . name } " with ` +
406- `address "${ this . address } " has been cancelled by the user.` ;
407- // Cancellation is user-intended, so log to info instead of warning.
408- logger . info ( desc ) ;
409- return reject ( new AbortError ( "The send operation has been cancelled by the user." ) ) ;
410- } ;
411342
412- if ( abortSignal && abortSignal . aborted ) {
413- // operation has been cancelled, so exit quickly
414- return rejectOnAbort ( ) ;
415- }
343+ const initStartTime = Date . now ( ) ;
344+ await this . _createLinkIfNotOpen ( options ) ;
345+ const timeTakenByInit = Date . now ( ) - initStartTime ;
416346
417- const removeListeners = ( ) : void => {
418- clearTimeout ( waitTimer ) ; // eslint-disable-line @typescript-eslint/no-use-before-define
419- if ( abortSignal ) {
420- abortSignal . removeEventListener ( "abort" , onAborted ) ; // eslint-disable-line @typescript-eslint/no-use-before-define
421- }
422- } ;
347+ const sendEventPromise = async ( ) : Promise < void > => {
348+ logger . verbose (
349+ "[%s] Sender '%s', credit: %d available: %d" ,
350+ this . _context . connectionId ,
351+ this . name ,
352+ this . _sender ! . credit ,
353+ this . _sender ! . session . outgoing . available ( )
354+ ) ;
423355
424- const onAborted = ( ) : void => {
425- removeListeners ( ) ;
426- return rejectOnAbort ( ) ;
427- } ;
356+ let waitTimeForSendable = 1000 ;
357+ if ( ! this . _sender ! . sendable ( ) && timeoutInMs - timeTakenByInit > waitTimeForSendable ) {
358+ logger . verbose (
359+ "%s Sender '%s', waiting for 1 second for sender to become sendable" ,
360+ this . _context . connectionId ,
361+ this . name
362+ ) ;
428363
429- if ( abortSignal ) {
430- abortSignal . addEventListener ( "abort" , onAborted ) ;
431- }
364+ await delay ( waitTimeForSendable ) ;
432365
433- const actionAfterTimeout = ( ) : void => {
434- removeListeners ( ) ;
435- const desc : string =
436- `[${ this . _context . connectionId } ] Sender "${ this . name } " with ` +
437- `address "${ this . address } ", was not able to send the message right now, due ` +
438- `to operation timeout.` ;
439- logger . warning ( desc ) ;
440- const e : Error = {
441- name : "OperationTimeoutError" ,
442- message : desc
443- } ;
444- return reject ( translate ( e ) ) ;
445- } ;
366+ logger . verbose (
367+ "%s Sender '%s' after waiting for a second, credit: %d available: %d" ,
368+ this . _context . connectionId ,
369+ this . name ,
370+ this . _sender ! . credit ,
371+ this . _sender ! . session ?. outgoing ?. available ( )
372+ ) ;
373+ } else {
374+ waitTimeForSendable = 0 ;
375+ }
446376
447- const waitTimer = setTimeout ( actionAfterTimeout , timeoutInMs ) ;
448- const initStartTime = Date . now ( ) ;
449- if ( ! this . isOpen ( ) ) {
450- logger . verbose (
451- "Acquiring lock %s for initializing the session, sender and " +
452- "possibly the connection." ,
453- this . senderLock
454- ) ;
377+ if ( ! this . _sender ! . sendable ( ) ) {
378+ // let us retry to send the message after some time.
379+ const msg =
380+ `[${ this . _context . connectionId } ] Sender "${ this . name } ", ` +
381+ `cannot send the message right now. Please try later.` ;
382+ logger . warning ( msg ) ;
383+ const amqpError : AmqpError = {
384+ condition : ErrorNameConditionMapper . SenderBusyError ,
385+ description : msg
386+ } ;
387+ throw translate ( amqpError ) ;
388+ }
455389
456- try {
457- const senderOptions = this . _createSenderOptions ( timeoutInMs ) ;
458- await defaultLock . acquire ( this . senderLock , ( ) => {
459- return this . _init ( senderOptions ) ;
460- } ) ;
461- } catch ( err ) {
462- removeListeners ( ) ;
463- const translatedError = translate ( err ) ;
464- logger . warning (
465- "[%s] An error occurred while creating the sender %s: %s" ,
466- this . _context . connectionId ,
467- this . name ,
468- `${ translatedError ?. name } : ${ translatedError ?. message } `
469- ) ;
470- logErrorStackTrace ( translatedError ) ;
471- return reject ( translatedError ) ;
472- }
473- }
474- const timeTakenByInit = Date . now ( ) - initStartTime ;
390+ logger . verbose (
391+ "[%s] Sender '%s', sending message with id '%s'." ,
392+ this . _context . connectionId ,
393+ this . name
394+ ) ;
395+ if ( timeoutInMs <= timeTakenByInit + waitTimeForSendable ) {
396+ const desc : string =
397+ `${ this . _context . connectionId } Sender "${ this . name } " ` +
398+ `with address "${ this . address } ", was not able to send the message right now, due ` +
399+ `to operation timeout.` ;
400+ logger . warning ( desc ) ;
401+ const e : AmqpError = {
402+ condition : ErrorNameConditionMapper . ServiceUnavailableError ,
403+ description : desc
404+ } ;
405+ throw translate ( e ) ;
406+ }
475407
476- logger . verbose (
477- "[%s] Sender '%s', credit: %d available: %d" ,
408+ this . _sender ! . sendTimeoutInSeconds =
409+ ( timeoutInMs - timeTakenByInit - waitTimeForSendable ) / 1000 ;
410+ try {
411+ const delivery = await this . _sender ! . send ( rheaMessage , undefined , 0x80013700 , {
412+ abortSignal
413+ } ) ;
414+ logger . info (
415+ "[%s] Sender '%s', sent message with delivery id: %d" ,
478416 this . _context . connectionId ,
479417 this . name ,
480- this . _sender ! . credit ,
481- this . _sender ! . session . outgoing . available ( )
418+ delivery . id
482419 ) ;
483- if ( this . _sender ! . sendable ( ) ) {
484- logger . verbose (
485- "[%s] Sender '%s', sending message with id '%s'." ,
486- this . _context . connectionId ,
487- this . name
488- ) ;
489- if ( timeoutInMs <= timeTakenByInit ) {
490- actionAfterTimeout ( ) ;
491- return ;
492- }
493- try {
494- this . _sender ! . sendTimeoutInSeconds = ( timeoutInMs - timeTakenByInit ) / 1000 ;
495- const delivery = await this . _sender ! . send ( rheaMessage , undefined , 0x80013700 ) ;
496- logger . info (
497- "[%s] Sender '%s', sent message with delivery id: %d" ,
498- this . _context . connectionId ,
499- this . name ,
500- delivery . id
501- ) ;
502- return resolve ( ) ;
503- } catch ( err ) {
504- const translatedError = translate ( err . innerError || err ) ;
505- logger . warning (
506- "[%s] An error occurred while sending the message %s" ,
507- this . _context . connectionId ,
508- `${ translatedError ?. name } : ${ translatedError ?. message } `
509- ) ;
510- logErrorStackTrace ( translatedError ) ;
511- return reject ( translatedError ) ;
512- } finally {
513- removeListeners ( ) ;
514- }
515- } else {
516- // let us retry to send the message after some time.
517- const msg =
518- `[${ this . _context . connectionId } ] Sender "${ this . name } ", ` +
519- `cannot send the message right now. Please try later.` ;
520- logger . warning ( msg ) ;
521- const amqpError : AmqpError = {
522- condition : ErrorNameConditionMapper . SenderBusyError ,
523- description : msg
524- } ;
525- reject ( translate ( amqpError ) ) ;
526- }
527- } ) ;
420+ } catch ( err ) {
421+ throw err . innerError || err ;
422+ }
423+ } ;
528424
529425 const config : RetryConfig < void > = {
530426 operation : sendEventPromise ,
@@ -533,7 +429,73 @@ export class EventHubSender extends LinkEntity {
533429 abortSignal : abortSignal ,
534430 retryOptions : retryOptions
535431 } ;
536- return retry < void > ( config ) ;
432+
433+ try {
434+ await retry < void > ( config ) ;
435+ } catch ( err ) {
436+ const translatedError = translate ( err ) ;
437+ logger . warning (
438+ "[%s] Sender '%s', An error occurred while sending the message %s" ,
439+ this . _context . connectionId ,
440+ this . name ,
441+ `${ translatedError ?. name } : ${ translatedError ?. message } `
442+ ) ;
443+ logErrorStackTrace ( translatedError ) ;
444+ throw translatedError ;
445+ }
446+ }
447+
448+ private async _createLinkIfNotOpen (
449+ options : {
450+ retryOptions ?: RetryOptions ;
451+ abortSignal ?: AbortSignalLike ;
452+ } = { }
453+ ) : Promise < void > {
454+ if ( this . isOpen ( ) ) {
455+ return ;
456+ }
457+ const retryOptions = options . retryOptions || { } ;
458+ const timeoutInMs = getRetryAttemptTimeoutInMs ( retryOptions ) ;
459+ retryOptions . timeoutInMs = timeoutInMs ;
460+ const senderOptions = this . _createSenderOptions ( timeoutInMs ) ;
461+
462+ const createLinkPromise = async ( ) : Promise < void > => {
463+ return waitForTimeoutOrAbortOrResolve ( {
464+ actionFn : ( ) => {
465+ return defaultLock . acquire ( this . senderLock , ( ) => {
466+ return this . _init ( senderOptions ) ;
467+ } ) ;
468+ } ,
469+ abortSignal : options ?. abortSignal ,
470+ timeoutMs : timeoutInMs ,
471+ timeoutMessage :
472+ `[${ this . _context . connectionId } ] Sender "${ this . name } " ` +
473+ `with address "${ this . address } ", cannot be created right now, due ` +
474+ `to operation timeout.`
475+ } ) ;
476+ } ;
477+
478+ const config : RetryConfig < void > = {
479+ operation : createLinkPromise ,
480+ connectionId : this . _context . connectionId ,
481+ operationType : RetryOperationType . senderLink ,
482+ abortSignal : options . abortSignal ,
483+ retryOptions : retryOptions
484+ } ;
485+
486+ try {
487+ await retry < void > ( config ) ;
488+ } catch ( err ) {
489+ const translatedError = translate ( err ) ;
490+ logger . warning (
491+ "[%s] An error occurred while creating the sender %s: %s" ,
492+ this . _context . connectionId ,
493+ this . name ,
494+ `${ translatedError ?. name } : ${ translatedError ?. message } `
495+ ) ;
496+ logErrorStackTrace ( translatedError ) ;
497+ throw translatedError ;
498+ }
537499 }
538500
539501 /**
0 commit comments