Skip to content

Commit f6895cb

Browse files
serkaneripmurgatroid99
authored andcommitted
Send halfClose immediately after messages to prevent late halfClose issues with Envoy
1 parent 37f2817 commit f6895cb

File tree

1 file changed

+48
-8
lines changed

1 file changed

+48
-8
lines changed

packages/grpc-js/src/retrying-call.ts

Lines changed: 48 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,15 @@ interface UnderlyingCall {
123123
state: UnderlyingCallState;
124124
call: LoadBalancingCall;
125125
nextMessageToSend: number;
126+
/**
127+
* Tracks the highest message index that has been sent to the underlying call.
128+
* This is different from nextMessageToSend which tracks completion/acknowledgment.
129+
*/
130+
highestSentMessageIndex: number;
131+
/**
132+
* Tracks whether halfClose has been sent to this child call.
133+
*/
134+
halfCloseSent: boolean;
126135
startTime: Date;
127136
}
128137

@@ -695,6 +704,8 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
695704
state: 'ACTIVE',
696705
call: child,
697706
nextMessageToSend: 0,
707+
highestSentMessageIndex: -1,
708+
halfCloseSent: false,
698709
startTime: new Date(),
699710
});
700711
const previousAttempts = this.attempts - 1;
@@ -778,6 +789,7 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
778789
const bufferEntry = this.getBufferEntry(childCall.nextMessageToSend);
779790
switch (bufferEntry.entryType) {
780791
case 'MESSAGE':
792+
childCall.highestSentMessageIndex = childCall.nextMessageToSend;
781793
childCall.call.sendMessageWithContext(
782794
{
783795
callback: error => {
@@ -787,10 +799,26 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
787799
},
788800
bufferEntry.message!.message
789801
);
802+
// Optimization: if the next entry is HALF_CLOSE, send it immediately
803+
// without waiting for the message callback. This is safe because the message
804+
// has already been passed to the underlying transport.
805+
const nextEntry = this.getBufferEntry(childCall.nextMessageToSend + 1);
806+
if (nextEntry.entryType === 'HALF_CLOSE' && !childCall.halfCloseSent) {
807+
this.trace(
808+
'Sending halfClose immediately after message to child [' +
809+
childCall.call.getCallNumber() +
810+
'] - optimizing for unary/final message'
811+
);
812+
childCall.halfCloseSent = true;
813+
childCall.call.halfClose();
814+
}
790815
break;
791816
case 'HALF_CLOSE':
792-
childCall.nextMessageToSend += 1;
793-
childCall.call.halfClose();
817+
if (!childCall.halfCloseSent) {
818+
childCall.nextMessageToSend += 1;
819+
childCall.halfCloseSent = true;
820+
childCall.call.halfClose();
821+
}
794822
break;
795823
case 'FREED':
796824
// Should not be possible
@@ -819,6 +847,7 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
819847
call.state === 'ACTIVE' &&
820848
call.nextMessageToSend === messageIndex
821849
) {
850+
call.highestSentMessageIndex = messageIndex;
822851
call.call.sendMessageWithContext(
823852
{
824853
callback: error => {
@@ -839,6 +868,7 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
839868
const call = this.underlyingCalls[this.committedCallIndex];
840869
bufferEntry.callback = context.callback;
841870
if (call.state === 'ACTIVE' && call.nextMessageToSend === messageIndex) {
871+
call.highestSentMessageIndex = messageIndex;
842872
call.call.sendMessageWithContext(
843873
{
844874
callback: error => {
@@ -868,12 +898,22 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
868898
allocated: false,
869899
});
870900
for (const call of this.underlyingCalls) {
871-
if (
872-
call?.state === 'ACTIVE' &&
873-
call.nextMessageToSend === halfCloseIndex
874-
) {
875-
call.nextMessageToSend += 1;
876-
call.call.halfClose();
901+
if (call?.state === 'ACTIVE' && !call.halfCloseSent) {
902+
// Send halfClose immediately if all messages have been sent to this call
903+
// We check highestSentMessageIndex >= halfCloseIndex - 1 because:
904+
// - If halfCloseIndex is 0, there are no messages, so send immediately
905+
// - If halfCloseIndex is N, the last message is at index N-1
906+
// - If highestSentMessageIndex >= N-1, all messages have been sent
907+
if (halfCloseIndex === 0 || call.highestSentMessageIndex >= halfCloseIndex - 1) {
908+
this.trace(
909+
'Sending halfClose immediately to child [' +
910+
call.call.getCallNumber() +
911+
'] - all messages already sent'
912+
);
913+
call.halfCloseSent = true;
914+
call.call.halfClose();
915+
}
916+
// Otherwise, halfClose will be sent by sendNextChildMessage when messages complete
877917
}
878918
}
879919
}

0 commit comments

Comments
 (0)