9090import java .net .URI ;
9191import java .net .URLEncoder ;
9292import java .nio .ByteBuffer ;
93+ import java .time .Duration ;
9394import java .time .Instant ;
9495import java .util .ArrayList ;
9596import java .util .Arrays ;
@@ -1001,57 +1002,86 @@ private <T> Flux<FeedResponse<T>> createQueryInternal(
10011002 }, Queues .SMALL_BUFFER_SIZE , 1 );
10021003 }
10031004
1005+ private static void applyExceptionToMergedDiagnostics (
1006+ CosmosQueryRequestOptions requestOptions ,
1007+ CosmosException exception ) {
1008+
1009+ List <CosmosDiagnostics > cancelledRequestDiagnostics =
1010+ ImplementationBridgeHelpers
1011+ .CosmosQueryRequestOptionsHelper
1012+ .getCosmosQueryRequestOptionsAccessor ()
1013+ .getCancelledRequestDiagnosticsTracker (requestOptions );
1014+
1015+ // if there is any cancelled requests, collect cosmos diagnostics
1016+ if (cancelledRequestDiagnostics != null && !cancelledRequestDiagnostics .isEmpty ()) {
1017+ // combine all the cosmos diagnostics
1018+ CosmosDiagnostics aggregratedCosmosDiagnostics =
1019+ cancelledRequestDiagnostics
1020+ .stream ()
1021+ .reduce ((first , toBeMerged ) -> {
1022+ ClientSideRequestStatistics clientSideRequestStatistics =
1023+ ImplementationBridgeHelpers
1024+ .CosmosDiagnosticsHelper
1025+ .getCosmosDiagnosticsAccessor ()
1026+ .getClientSideRequestStatisticsRaw (first );
1027+
1028+ ClientSideRequestStatistics toBeMergedClientSideRequestStatistics =
1029+ ImplementationBridgeHelpers
1030+ .CosmosDiagnosticsHelper
1031+ .getCosmosDiagnosticsAccessor ()
1032+ .getClientSideRequestStatisticsRaw (first );
1033+
1034+ if (clientSideRequestStatistics == null ) {
1035+ return toBeMerged ;
1036+ } else {
1037+ clientSideRequestStatistics .mergeClientSideRequestStatistics (toBeMergedClientSideRequestStatistics );
1038+ return first ;
1039+ }
1040+ })
1041+ .get ();
1042+
1043+ BridgeInternal .setCosmosDiagnostics (exception , aggregratedCosmosDiagnostics );
1044+ }
1045+ }
1046+
10041047 private static <T > Flux <FeedResponse <T >> getFeedResponseFluxWithTimeout (
10051048 Flux <FeedResponse <T >> feedResponseFlux ,
10061049 CosmosEndToEndOperationLatencyPolicyConfig endToEndPolicyConfig ,
10071050 CosmosQueryRequestOptions requestOptions ,
10081051 final AtomicBoolean isQueryCancelledOnTimeout ) {
10091052
1053+ Duration endToEndTimeout = endToEndPolicyConfig .getEndToEndOperationTimeout ();
1054+
1055+
1056+ Flux <FeedResponse <T >> flux ;
1057+ if (endToEndTimeout .isNegative ()) {
1058+ return feedResponseFlux
1059+ .timeout (endToEndTimeout )
1060+ .onErrorMap (throwable -> {
1061+ if (throwable instanceof TimeoutException ) {
1062+ CosmosException cancellationException = getNegativeTimeoutException (null , endToEndTimeout );
1063+ cancellationException .setStackTrace (throwable .getStackTrace ());
1064+
1065+ isQueryCancelledOnTimeout .set (true );
1066+
1067+ applyExceptionToMergedDiagnostics (requestOptions , cancellationException );
1068+
1069+ return cancellationException ;
1070+ }
1071+ return throwable ;
1072+ });
1073+ }
1074+
10101075 return feedResponseFlux
1011- .timeout (endToEndPolicyConfig . getEndToEndOperationTimeout () )
1076+ .timeout (endToEndTimeout )
10121077 .onErrorMap (throwable -> {
10131078 if (throwable instanceof TimeoutException ) {
10141079 CosmosException exception = new OperationCancelledException ();
10151080 exception .setStackTrace (throwable .getStackTrace ());
10161081
10171082 isQueryCancelledOnTimeout .set (true );
10181083
1019- List <CosmosDiagnostics > cancelledRequestDiagnostics =
1020- ImplementationBridgeHelpers
1021- .CosmosQueryRequestOptionsHelper
1022- .getCosmosQueryRequestOptionsAccessor ()
1023- .getCancelledRequestDiagnosticsTracker (requestOptions );
1024-
1025- // if there is any cancelled requests, collect cosmos diagnostics
1026- if (cancelledRequestDiagnostics != null && !cancelledRequestDiagnostics .isEmpty ()) {
1027- // combine all the cosmos diagnostics
1028- CosmosDiagnostics aggregratedCosmosDiagnostics =
1029- cancelledRequestDiagnostics
1030- .stream ()
1031- .reduce ((first , toBeMerged ) -> {
1032- ClientSideRequestStatistics clientSideRequestStatistics =
1033- ImplementationBridgeHelpers
1034- .CosmosDiagnosticsHelper
1035- .getCosmosDiagnosticsAccessor ()
1036- .getClientSideRequestStatisticsRaw (first );
1037-
1038- ClientSideRequestStatistics toBeMergedClientSideRequestStatistics =
1039- ImplementationBridgeHelpers
1040- .CosmosDiagnosticsHelper
1041- .getCosmosDiagnosticsAccessor ()
1042- .getClientSideRequestStatisticsRaw (first );
1043-
1044- if (clientSideRequestStatistics == null ) {
1045- return toBeMerged ;
1046- } else {
1047- clientSideRequestStatistics .mergeClientSideRequestStatistics (toBeMergedClientSideRequestStatistics );
1048- return first ;
1049- }
1050- })
1051- .get ();
1052-
1053- BridgeInternal .setCosmosDiagnostics (exception , aggregratedCosmosDiagnostics );
1054- }
1084+ applyExceptionToMergedDiagnostics (requestOptions , exception );
10551085
10561086 return exception ;
10571087 }
@@ -2022,9 +2052,15 @@ private static <T> Mono<T> getRxDocumentServiceResponseMonoWithE2ETimeout(RxDocu
20222052 CosmosEndToEndOperationLatencyPolicyConfig endToEndPolicyConfig ,
20232053 Mono <T > rxDocumentServiceResponseMono ) {
20242054 if (endToEndPolicyConfig != null && endToEndPolicyConfig .isEnabled ()) {
2055+
2056+ Duration endToEndTimeout = endToEndPolicyConfig .getEndToEndOperationTimeout ();
2057+ if (endToEndTimeout .isNegative ()) {
2058+ return Mono .error (getNegativeTimeoutException (request , endToEndTimeout ));
2059+ }
2060+
20252061 request .requestContext .setEndToEndOperationLatencyPolicyConfig (endToEndPolicyConfig );
20262062 return rxDocumentServiceResponseMono
2027- .timeout (endToEndPolicyConfig . getEndToEndOperationTimeout () )
2063+ .timeout (endToEndTimeout )
20282064 .onErrorMap (throwable -> getCancellationException (request , throwable ));
20292065 }
20302066 return rxDocumentServiceResponseMono ;
@@ -2045,6 +2081,23 @@ private static Throwable getCancellationException(RxDocumentServiceRequest reque
20452081 return throwable ;
20462082 }
20472083
2084+ private static CosmosException getNegativeTimeoutException (RxDocumentServiceRequest request , Duration negativeTimeout ) {
2085+ checkNotNull (negativeTimeout , "Argument 'negativeTimeout' must not be null" );
2086+ checkArgument (
2087+ negativeTimeout .isNegative (),
2088+ "This exception should only be used for negative timeouts" );
2089+
2090+ String message = String .format ("Negative timeout '%s' provided." , negativeTimeout );
2091+ CosmosException exception = new OperationCancelledException (message , null );
2092+ BridgeInternal .setSubStatusCode (exception , HttpConstants .SubStatusCodes .NEGATIVE_TIMEOUT_PROVIDED );
2093+ if (request != null && request .requestContext != null ) {
2094+ request .requestContext .setIsRequestCancelledOnTimeout (new AtomicBoolean (true ));
2095+ return BridgeInternal .setCosmosDiagnostics (exception , request .requestContext .cosmosDiagnostics );
2096+ }
2097+
2098+ return exception ;
2099+ }
2100+
20482101 @ Override
20492102 public Mono <ResourceResponse <Document >> upsertDocument (String collectionLink , Object document ,
20502103 RequestOptions options , boolean disableAutomaticIdGeneration ) {
0 commit comments