99import com .azure .cosmos .implementation .MetadataDiagnosticsContext ;
1010import com .azure .cosmos .implementation .RxDocumentServiceRequest ;
1111import com .azure .cosmos .implementation .Utils ;
12+ import com .azure .cosmos .implementation .routing .HexConvert ;
13+ import com .azure .cosmos .implementation .routing .Int128 ;
14+ import com .azure .cosmos .implementation .routing .NumberPartitionKeyComponent ;
15+ import com .azure .cosmos .implementation .routing .PartitionKeyInternalHelper ;
1216import com .azure .cosmos .implementation .routing .Range ;
1317import com .azure .cosmos .models .FeedRange ;
18+ import com .azure .cosmos .models .PartitionKeyDefinition ;
19+ import com .azure .cosmos .models .PartitionKeyDefinitionVersion ;
20+ import com .azure .cosmos .models .PartitionKind ;
1421import com .fasterxml .jackson .databind .ObjectMapper ;
1522import com .fasterxml .jackson .databind .annotation .JsonDeserialize ;
1623import org .slf4j .Logger ;
1926
2027import java .io .IOException ;
2128import java .nio .charset .StandardCharsets ;
29+ import java .util .ArrayList ;
2230import java .util .Base64 ;
31+ import java .util .Collections ;
2332import java .util .List ;
2433
34+ import static com .azure .cosmos .implementation .guava25 .base .Preconditions .checkArgument ;
2535import static com .azure .cosmos .implementation .guava25 .base .Preconditions .checkNotNull ;
2636
2737@ JsonDeserialize (using = FeedRangeInternalDeserializer .class )
2838public abstract class FeedRangeInternal extends JsonSerializable implements FeedRange {
2939 private final static Logger LOGGER = LoggerFactory .getLogger (FeedRangeInternal .class );
40+ private final static Long UINT64_TO_DOUBLE_MASK = Long .parseUnsignedLong ("9223372036854775808" );
41+ private final static Long UINT_MAX_VALUE = Long .parseUnsignedLong ("4294967295" );
3042
3143 public static FeedRangeInternal convert (final FeedRange feedRange ) {
3244 checkNotNull (feedRange , "Argument 'feedRange' must not be null" );
@@ -63,11 +75,97 @@ public static FeedRangeInternal fromBase64EncodedJsonString(String base64Encoded
6375 return parsedRange ;
6476 }
6577
66- public abstract Mono <Range <String >> getEffectiveRange (
78+ protected abstract Mono <Range <String >> getEffectiveRange (
6779 IRoutingMapProvider routingMapProvider ,
6880 MetadataDiagnosticsContext metadataDiagnosticsCtx ,
6981 Mono <Utils .ValueHolder <DocumentCollection >> collectionResolutionMono );
7082
83+ // Will return a normalized range with minInclusive and maxExclusive boundaries
84+ public Mono <Range <String >> getNormalizedEffectiveRange (
85+ IRoutingMapProvider routingMapProvider ,
86+ MetadataDiagnosticsContext metadataDiagnosticsCtx ,
87+ Mono <Utils .ValueHolder <DocumentCollection >> collectionResolutionMono
88+ ) {
89+ return Mono .zip (
90+ this .getEffectiveRange (
91+ routingMapProvider ,
92+ metadataDiagnosticsCtx ,
93+ collectionResolutionMono ),
94+ collectionResolutionMono )
95+ .map (tuple -> {
96+ Range <String > effectiveRange = tuple .getT1 ();
97+
98+ if (effectiveRange .isMinInclusive () && !effectiveRange .isMaxInclusive ()) {
99+ return effectiveRange ;
100+ }
101+
102+ Utils .ValueHolder <DocumentCollection > collectionValueHolder = tuple .getT2 ();
103+
104+ if (collectionValueHolder .v == null ) {
105+ throw new IllegalStateException ("Collection should have been resolved." );
106+ }
107+
108+ PartitionKeyDefinition pkDefinition =
109+ collectionValueHolder .v .getPartitionKey ();
110+
111+ PartitionKeyDefinitionVersion effectivePKVersion =
112+ pkDefinition .getVersion () != null
113+ ? pkDefinition .getVersion ()
114+ : PartitionKeyDefinitionVersion .V1 ;
115+
116+ String min ;
117+ String max ;
118+
119+ if (effectiveRange .isMinInclusive ()) {
120+ min = effectiveRange .getMin ();
121+ } else {
122+ min = addToEffectivePartitionKey (effectiveRange .getMin (), -1 , effectivePKVersion );
123+ }
124+
125+ if (!effectiveRange .isMaxInclusive ()) {
126+ max = effectiveRange .getMax ();
127+ } else {
128+ max = addToEffectivePartitionKey (effectiveRange .getMax (), 1 , effectivePKVersion );
129+ }
130+
131+ return new Range <>(min , max , true , false );
132+ });
133+ }
134+
135+ private String addToEffectivePartitionKey (
136+ String effectivePartitionKey ,
137+ int value ,
138+ PartitionKeyDefinitionVersion version ) {
139+
140+ checkArgument (
141+ value == 1 || value == -1 ,
142+ "Argument 'value' has invalid value - only 1 and -1 are allowed" );
143+
144+ byte [] blob = hexBinaryToByteArray (effectivePartitionKey );
145+
146+ if (value == 1 ) {
147+ for (int i = blob .length - 1 ; i >= 0 ; i --) {
148+ if ((0xff & blob [i ]) < 255 ) {
149+ blob [i ] = (byte )((0xff & blob [i ]) + 1 );
150+ break ;
151+ } else {
152+ blob [i ] = 0 ;
153+ }
154+ }
155+ } else {
156+ for (int i = blob .length - 1 ; i >= 0 ; i --) {
157+ if ((0xff & blob [i ]) != 0 ) {
158+ blob [i ] = (byte )((0xff & blob [i ]) - 1 );
159+ break ;
160+ } else {
161+ blob [i ] = (byte )255 ;
162+ }
163+ }
164+ }
165+
166+ return HexConvert .bytesToHex (blob );
167+ }
168+
71169 public abstract Mono <List <String >> getPartitionKeyRanges (
72170 IRoutingMapProvider routingMapProvider ,
73171 RxDocumentServiceRequest request ,
@@ -82,17 +180,6 @@ public void populatePropertyBag() {
82180 setProperties (this , false );
83181 }
84182
85- public abstract void removeProperties (JsonSerializable serializable );
86-
87- public void setProperties (
88- JsonSerializable serializable ,
89- boolean populateProperties ) {
90-
91- if (populateProperties ) {
92- super .populatePropertyBag ();
93- }
94- }
95-
96183 @ Override
97184 public String toString () {
98185 String json = this .toJson ();
@@ -104,6 +191,17 @@ public String toString() {
104191 return Base64 .getUrlEncoder ().encodeToString (json .getBytes (StandardCharsets .UTF_8 ));
105192 }
106193
194+ public abstract void removeProperties (JsonSerializable serializable );
195+
196+ public void setProperties (
197+ JsonSerializable serializable ,
198+ boolean populateProperties ) {
199+
200+ if (populateProperties ) {
201+ super .populatePropertyBag ();
202+ }
203+ }
204+
107205 public static FeedRangeInternal tryParse (final String jsonString ) {
108206 checkNotNull (jsonString , "Argument 'jsonString' must not be null" );
109207 final ObjectMapper mapper = Utils .getSimpleObjectMapper ();
@@ -115,4 +213,217 @@ public static FeedRangeInternal tryParse(final String jsonString) {
115213 return null ;
116214 }
117215 }
216+
217+ public Mono <List <FeedRange >> trySplit (
218+ IRoutingMapProvider routingMapProvider ,
219+ MetadataDiagnosticsContext metadataDiagnosticsCtx ,
220+ Mono <Utils .ValueHolder <DocumentCollection >> collectionResolutionMono ,
221+ int targetedSplitCount ) {
222+
223+ return Mono .zip (
224+ this .getNormalizedEffectiveRange (
225+ routingMapProvider ,
226+ metadataDiagnosticsCtx ,
227+ collectionResolutionMono ),
228+ collectionResolutionMono )
229+ .map (tuple -> {
230+
231+ Range <String > effectiveRange = tuple .getT1 ();
232+ Utils .ValueHolder <DocumentCollection > collectionValueHolder = tuple .getT2 ();
233+
234+ if (collectionValueHolder .v == null ) {
235+ throw new IllegalStateException ("Collection should have been resolved." );
236+ }
237+
238+ PartitionKeyDefinition pkDefinition =
239+ collectionValueHolder .v .getPartitionKey ();
240+
241+ if (targetedSplitCount <= 1 ||
242+ effectiveRange .isSingleValue () ||
243+ // splitting ranges into sub ranges only possible for hash partitioning
244+ pkDefinition .getKind () != PartitionKind .HASH ) {
245+
246+ return Collections .singletonList (new FeedRangeEpkImpl (effectiveRange ));
247+ }
248+
249+ PartitionKeyDefinitionVersion effectivePKVersion =
250+ pkDefinition .getVersion () != null
251+ ? pkDefinition .getVersion ()
252+ : PartitionKeyDefinitionVersion .V1 ;
253+ switch (effectivePKVersion ) {
254+ case V1 :
255+ return trySplitWithHashV1 (effectiveRange , targetedSplitCount );
256+
257+ case V2 :
258+ return trySplitWithHashV2 (effectiveRange , targetedSplitCount );
259+
260+ default :
261+ return Collections .singletonList (new FeedRangeEpkImpl (effectiveRange ));
262+ }
263+ });
264+ }
265+
266+ static List <FeedRange > trySplitWithHashV1 (
267+ Range <String > effectiveRange ,
268+ int targetedSplitCount ) {
269+
270+ long min = 0 ;
271+ long max = UINT_MAX_VALUE ;
272+
273+ if (!effectiveRange .getMin ().equalsIgnoreCase (
274+ PartitionKeyInternalHelper .MinimumInclusiveEffectivePartitionKey )) {
275+
276+ min = fromHexEncodedBinaryString (effectiveRange .getMin ());
277+ }
278+
279+ if (!effectiveRange .getMax ().equalsIgnoreCase (
280+ PartitionKeyInternalHelper .MaximumExclusiveEffectivePartitionKey )) {
281+
282+ max = fromHexEncodedBinaryString (effectiveRange .getMax ());
283+ }
284+
285+ String minRange = effectiveRange .getMin ();
286+ long diff = max - min ;
287+ List <FeedRange > splitFeedRanges = new ArrayList <>(targetedSplitCount );
288+ for (int i = 1 ; i < targetedSplitCount ; i ++) {
289+ long splitPoint = min + (i * (diff / targetedSplitCount ));
290+ String maxRange = PartitionKeyInternalHelper .toHexEncodedBinaryString (
291+ new NumberPartitionKeyComponent [] {
292+ new NumberPartitionKeyComponent (splitPoint )
293+ });
294+ splitFeedRanges .add (
295+ new FeedRangeEpkImpl (
296+ new Range <>(
297+ minRange ,
298+ maxRange ,
299+ i > 1 || effectiveRange .isMinInclusive (),
300+ false )));
301+
302+ minRange = maxRange ;
303+ }
304+
305+ splitFeedRanges .add (
306+ new FeedRangeEpkImpl (
307+ new Range <>(
308+ minRange ,
309+ effectiveRange .getMax (),
310+ true ,
311+ effectiveRange .isMaxInclusive ())));
312+
313+ return splitFeedRanges ;
314+ }
315+
316+ static List <FeedRange > trySplitWithHashV2 (
317+ Range <String > effectiveRange ,
318+ int targetedSplitCount ) {
319+
320+ Int128 min = new Int128 (0 );
321+ if (!effectiveRange .getMin ().equalsIgnoreCase (
322+ PartitionKeyInternalHelper .MinimumInclusiveEffectivePartitionKey )) {
323+
324+ byte [] minBytes = hexBinaryToByteArray (effectiveRange .getMin ());
325+ min = new Int128 (minBytes );
326+ }
327+
328+ Int128 max = PartitionKeyInternalHelper .MaxHashV2Value ;
329+ if (!effectiveRange .getMax ().equalsIgnoreCase (
330+ PartitionKeyInternalHelper .MaximumExclusiveEffectivePartitionKey )) {
331+
332+ byte [] maxBytes = hexBinaryToByteArray (effectiveRange .getMax ());
333+ max = new Int128 (maxBytes );
334+ }
335+
336+ if (Int128 .lt (
337+ Int128 .subtract (max , min ),
338+ new Int128 (targetedSplitCount ))) {
339+
340+ return Collections .singletonList (new FeedRangeEpkImpl (effectiveRange ));
341+ }
342+
343+ String minRange = effectiveRange .getMin ();
344+ Int128 diff = Int128 .subtract (max , min );
345+ Int128 splitCountInt128 = new Int128 (targetedSplitCount );
346+ List <FeedRange > splitFeedRanges = new ArrayList <>(targetedSplitCount );
347+ for (int i = 1 ; i < targetedSplitCount ; i ++) {
348+ byte [] currentBlob = Int128 .add (
349+ min ,
350+ Int128 .multiply (new Int128 (i ), Int128 .div (diff , splitCountInt128 ))
351+ ).bytes ();
352+
353+ String maxRange = HexConvert .bytesToHex (currentBlob );
354+ splitFeedRanges .add (
355+ new FeedRangeEpkImpl (
356+ new Range <>(
357+ minRange ,
358+ maxRange ,
359+ i > 1 || effectiveRange .isMinInclusive (),
360+ false )));
361+
362+ minRange = maxRange ;
363+ }
364+
365+ splitFeedRanges .add (
366+ new FeedRangeEpkImpl (
367+ new Range <>(
368+ minRange ,
369+ effectiveRange .getMax (),
370+ true ,
371+ effectiveRange .isMaxInclusive ())));
372+
373+ return splitFeedRanges ;
374+ }
375+
376+ private static double decodeDoubleFromUInt64Long (long value ) {
377+ value = (value < UINT64_TO_DOUBLE_MASK ) ? -value : value ^ UINT64_TO_DOUBLE_MASK ;
378+ return Double .longBitsToDouble (value );
379+ }
380+
381+ private static long fromHexEncodedBinaryString (String hexBinary ) {
382+ byte [] byteString = hexBinaryToByteArray (hexBinary );
383+ if (byteString .length < 2 || byteString [0 ] != 5 ) {
384+ throw new IllegalStateException ("Invalid hex-byteString" );
385+ }
386+ int byteStringOffset = 1 ;
387+ int offset = 64 ;
388+ long payload = 0 ;
389+
390+ // Decode first 8-bit chunk
391+ offset -= 8 ;
392+ payload |= (((long )byteString [byteStringOffset ++]) & 0x00FF ) << offset ;
393+
394+ // Decode remaining 7-bit chunks
395+ while (true ) {
396+ if (byteStringOffset >= byteString .length ) {
397+ throw new IllegalStateException ("Incorrect byte string without termination" );
398+ }
399+
400+ byte currentByte = byteString [byteStringOffset ++];
401+
402+ offset -= 7 ;
403+ payload |= (((long )(currentByte >> 1 )) & 0x00FF ) << offset ;
404+
405+ if ((currentByte & 0x01 ) == 0 ) {
406+ break ;
407+ }
408+ }
409+
410+ return (long )decodeDoubleFromUInt64Long (payload );
411+ }
412+
413+ private static byte [] hexBinaryToByteArray (String hexBinary ) {
414+ checkNotNull (hexBinary , "Argument 'hexBinary' must not be null." );
415+
416+ int len = hexBinary .length ();
417+ checkArgument (
418+ (len & 0x01 ) == 0 ,
419+ "Argument 'hexBinary' must not have odd number of characters." );
420+
421+ byte [] blob = new byte [len / 2 ];
422+ for (int i = 0 ; i < len ; i += 2 ) {
423+ blob [i / 2 ] = (byte )((Character .digit (hexBinary .charAt (i ), 16 ) << 4 )
424+ + Character .digit (hexBinary .charAt (i + 1 ), 16 ));
425+ }
426+
427+ return blob ;
428+ }
118429}
0 commit comments