Skip to content

Commit 4d387d4

Browse files
serkaneripmurgatroid99
authored andcommitted
Use nextMessageToSend for early half-close
1 parent 88a083d commit 4d387d4

File tree

2 files changed

+37
-17
lines changed

2 files changed

+37
-17
lines changed

packages/grpc-js/src/retrying-call.ts

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -123,11 +123,6 @@ interface UnderlyingCall {
123123
state: UnderlyingCallState;
124124
call: LoadBalancingCall;
125125
nextMessageToSend: number;
126-
/**
127-
* Tracks the highest message index that has been sent to the underlying call.
128-
* This is different from nextMessageToSend which tracks completion/acknowledgment.
129-
*/
130-
highestSentMessageIndex: number;
131126
startTime: Date;
132127
}
133128

@@ -700,7 +695,6 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
700695
state: 'ACTIVE',
701696
call: child,
702697
nextMessageToSend: 0,
703-
highestSentMessageIndex: -1,
704698
startTime: new Date(),
705699
});
706700
const previousAttempts = this.attempts - 1;
@@ -793,7 +787,6 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
793787
},
794788
bufferEntry.message!.message
795789
);
796-
childCall.highestSentMessageIndex = messageIndex;
797790
// Optimization: if the next entry is HALF_CLOSE, send it immediately
798791
// without waiting for the message callback. This is safe because the message
799792
// has already been passed to the underlying transport.
@@ -833,7 +826,11 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
833826
};
834827
this.writeBuffer.push(bufferEntry);
835828
if (bufferEntry.allocated) {
836-
context.callback?.();
829+
// Run this in next tick to avoid suspending the current execution context
830+
// otherwise it might cause half closing the call before sending message
831+
process.nextTick(() => {
832+
context.callback?.();
833+
});
837834
for (const [callIndex, call] of this.underlyingCalls.entries()) {
838835
if (
839836
call.state === 'ACTIVE' &&
@@ -848,7 +845,6 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
848845
},
849846
message
850847
);
851-
call.highestSentMessageIndex = messageIndex;
852848
}
853849
}
854850
} else {
@@ -860,7 +856,6 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
860856
const call = this.underlyingCalls[this.committedCallIndex];
861857
bufferEntry.callback = context.callback;
862858
if (call.state === 'ACTIVE' && call.nextMessageToSend === messageIndex) {
863-
call.highestSentMessageIndex = messageIndex;
864859
call.call.sendMessageWithContext(
865860
{
866861
callback: error => {
@@ -891,11 +886,11 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
891886
});
892887
for (const call of this.underlyingCalls) {
893888
if (call?.state === 'ACTIVE') {
894-
// Send halfClose immediately if all messages have been sent to this call
895-
// We check highestSentMessageIndex >= halfCloseIndex - 1 because:
896-
// - If halfCloseIndex is N, the last message is at index N-1
897-
// - If highestSentMessageIndex >= N-1, all messages have been sent
898-
if (call.highestSentMessageIndex >= halfCloseIndex - 1) {
889+
// Send halfClose to call when either:
890+
// - nextMessageToSend === halfCloseIndex - 1: last message sent, callback pending (optimization)
891+
// - nextMessageToSend === halfCloseIndex: all messages sent and acknowledged
892+
if (call.nextMessageToSend === halfCloseIndex
893+
|| call.nextMessageToSend === halfCloseIndex - 1) {
899894
this.trace(
900895
'Sending halfClose immediately to child [' +
901896
call.call.getCallNumber() +
@@ -904,7 +899,7 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
904899
call.nextMessageToSend += 1;
905900
call.call.halfClose();
906901
}
907-
// Otherwise, halfClose will be sent by sendNextChildMessage when messages complete
902+
// Otherwise, halfClose will be sent by sendNextChildMessage when message callbacks complete
908903
}
909904
}
910905
}

packages/grpc-js/test/test-end-to-end.ts

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import * as assert from 'assert';
1919
import * as path from 'path';
2020
import { loadProtoFile } from './common';
21-
import { Metadata, Server, ServerDuplexStream, ServerUnaryCall, ServiceClientConstructor, ServiceError, experimental, sendUnaryData } from '../src';
21+
import { Metadata, Server, ServerCredentials, ServerDuplexStream, ServerReadableStream, ServerUnaryCall, ServiceClientConstructor, ServiceError, credentials, experimental, sendUnaryData } from '../src';
2222
import { ServiceClient } from '../src/make-client';
2323

2424
const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
@@ -36,6 +36,15 @@ const echoServiceImplementation = {
3636
call.end();
3737
});
3838
},
39+
echoClientStream(call: ServerReadableStream<any, any>, callback: sendUnaryData<any>) {
40+
const messages: any[] = [];
41+
call.on('data', (message: any) => {
42+
messages.push(message);
43+
});
44+
call.on('end', () => {
45+
callback(null, { value: messages.map(m => m.value).join(','), value2: messages.length });
46+
});
47+
},
3948
};
4049

4150
describe('Client should successfully communicate with server', () => {
@@ -77,4 +86,20 @@ describe('Client should successfully communicate with server', () => {
7786
});
7887
});
7988
}).timeout(5000);
89+
90+
it('Client streaming with one message should work', done => {
91+
server = new Server();
92+
server.addService(EchoService.service, echoServiceImplementation);
93+
server.bindAsync('localhost:0', ServerCredentials.createInsecure(), (error, port) => {
94+
assert.ifError(error);
95+
client = new EchoService(`localhost:${port}`, credentials.createInsecure());
96+
const call = client.echoClientStream((error: ServiceError, response: any) => {
97+
assert.ifError(error);
98+
assert.deepStrictEqual(response, { value: 'test value', value2: 1 });
99+
done();
100+
});
101+
call.write({ value: 'test value', value2: 42 });
102+
call.end();
103+
});
104+
});
80105
});

0 commit comments

Comments
 (0)