@@ -474,21 +474,42 @@ impl RateLimiter {
474474 }
475475 }
476476
477- /// Updates the parameters of the token buckets associated with this RateLimiter.
478- // TODO: Please note that, right now, the buckets become full after being updated.
477+ /// Updates the parameters of the token buckets associated with this `RateLimiter`.
478+ ///
479+ /// When updating bucket parameters, the existing state (`budget`, `one_time_burst`, and
480+ /// `last_update` timestamp) is preserved from the current bucket. The `budget` and
481+ /// `one_time_burst` values are capped at the new bucket's limits if the new limits
482+ /// are smaller than the existing values.
479483 pub fn update_buckets ( & mut self , bytes : BucketUpdate , ops : BucketUpdate ) {
480484 match bytes {
481485 BucketUpdate :: Disabled => self . bandwidth = None ,
482- BucketUpdate :: Update ( tb) => self . bandwidth = Some ( tb) ,
486+ BucketUpdate :: Update ( tb) => {
487+ let updated = Self :: update_bucket ( tb, self . bandwidth . as_ref ( ) ) ;
488+ self . bandwidth = Some ( updated) ;
489+ }
483490 BucketUpdate :: None => ( ) ,
484491 } ;
485492 match ops {
486493 BucketUpdate :: Disabled => self . ops = None ,
487- BucketUpdate :: Update ( tb) => self . ops = Some ( tb) ,
494+ BucketUpdate :: Update ( tb) => {
495+ let updated = Self :: update_bucket ( tb, self . ops . as_ref ( ) ) ;
496+ self . ops = Some ( updated) ;
497+ }
488498 BucketUpdate :: None => ( ) ,
489499 } ;
490500 }
491501
502+ /// Updates a token bucket with preserved state from an existing bucket.
503+ fn update_bucket ( mut new_bucket : TokenBucket , existing : Option < & TokenBucket > ) -> TokenBucket {
504+ if let Some ( existing) = existing {
505+ new_bucket. budget = std:: cmp:: min ( existing. budget , new_bucket. size ) ;
506+ new_bucket. one_time_burst =
507+ std:: cmp:: min ( existing. one_time_burst , new_bucket. initial_one_time_burst ) ;
508+ new_bucket. last_update = existing. last_update ;
509+ }
510+ new_bucket
511+ }
512+
492513 /// Returns an immutable view of the inner bandwidth token bucket.
493514 pub fn bandwidth ( & self ) -> Option < & TokenBucket > {
494515 self . bandwidth . as_ref ( )
@@ -1149,16 +1170,40 @@ pub(crate) mod tests {
11491170 assert ! ( l. consume( 100 , TokenType :: Bytes ) ) ;
11501171 }
11511172
1173+ fn assert_bucket_updated ( current : & TokenBucket , initial : & TokenBucket , new : & TokenBucket ) {
1174+ assert_eq ! (
1175+ (
1176+ current. capacity( ) ,
1177+ current. refill_time_ms( ) ,
1178+ current. initial_one_time_burst( ) ,
1179+ ) ,
1180+ (
1181+ new. capacity( ) ,
1182+ new. refill_time_ms( ) ,
1183+ new. initial_one_time_burst( ) ,
1184+ )
1185+ ) ;
1186+ assert_eq ! (
1187+ current. budget( ) ,
1188+ std:: cmp:: min( initial. budget( ) , new. capacity( ) )
1189+ ) ;
1190+ assert_eq ! (
1191+ current. one_time_burst( ) ,
1192+ std:: cmp:: min( initial. one_time_burst( ) , new. initial_one_time_burst( ) )
1193+ ) ;
1194+ assert_eq ! ( current. get_last_update( ) , initial. get_last_update( ) ) ;
1195+ }
1196+
11521197 #[ test]
11531198 fn test_update_buckets ( ) {
11541199 let mut x = RateLimiter :: new ( 1000 , 2000 , 1000 , 10 , 20 , 1000 ) . unwrap ( ) ;
11551200
1156- let initial_bw = x. bandwidth . clone ( ) ;
1157- let initial_ops = x. ops . clone ( ) ;
1201+ let initial_bw = x. bandwidth . as_ref ( ) . unwrap ( ) . clone ( ) ;
1202+ let initial_ops = x. ops . as_ref ( ) . unwrap ( ) . clone ( ) ;
11581203
11591204 x. update_buckets ( BucketUpdate :: None , BucketUpdate :: None ) ;
1160- assert_eq ! ( x. bandwidth, initial_bw) ;
1161- assert_eq ! ( x. ops, initial_ops) ;
1205+ assert_eq ! ( x. bandwidth. as_ref ( ) , Some ( & initial_bw) ) ;
1206+ assert_eq ! ( x. ops. as_ref ( ) , Some ( & initial_ops) ) ;
11621207
11631208 let new_bw = TokenBucket :: new ( 123 , 0 , 57 ) . unwrap ( ) ;
11641209 let new_ops = TokenBucket :: new ( 321 , 12346 , 89 ) . unwrap ( ) ;
@@ -1167,20 +1212,62 @@ pub(crate) mod tests {
11671212 BucketUpdate :: Update ( new_ops. clone ( ) ) ,
11681213 ) ;
11691214
1170- // We have manually adjust the last_update field, because it changes when update_buckets()
1171- // constructs new buckets (and thus gets a different value for last_update). We do this so
1172- // it makes sense to test the following assertions.
1173- x. bandwidth . as_mut ( ) . unwrap ( ) . last_update = new_bw. last_update ;
1174- x. ops . as_mut ( ) . unwrap ( ) . last_update = new_ops. last_update ;
1175-
1176- assert_eq ! ( x. bandwidth, Some ( new_bw) ) ;
1177- assert_eq ! ( x. ops, Some ( new_ops) ) ;
1215+ let bw = x. bandwidth . as_ref ( ) . unwrap ( ) ;
1216+ let ops = x. ops . as_ref ( ) . unwrap ( ) ;
1217+ assert_bucket_updated ( bw, & initial_bw, & new_bw) ;
1218+ assert_bucket_updated ( ops, & initial_ops, & new_ops) ;
11781219
11791220 x. update_buckets ( BucketUpdate :: Disabled , BucketUpdate :: Disabled ) ;
11801221 assert_eq ! ( x. bandwidth, None ) ;
11811222 assert_eq ! ( x. ops, None ) ;
11821223 }
11831224
1225+ #[ test]
1226+ fn test_update_buckets_preserves_budget_after_consumption ( ) {
1227+ let mut x = RateLimiter :: new ( 1000 , 0 , 1000 , 500 , 0 , 1000 ) . unwrap ( ) ;
1228+
1229+ // Consume some tokens to reduce budget
1230+ assert ! ( x. consume( 300 , TokenType :: Bytes ) ) ;
1231+ assert ! ( x. consume( 200 , TokenType :: Ops ) ) ;
1232+
1233+ let expected_bw_budget = 700 ;
1234+ let expected_ops_budget = 300 ;
1235+ assert_eq ! ( x. bandwidth. as_ref( ) . unwrap( ) . budget( ) , expected_bw_budget) ;
1236+ assert_eq ! ( x. ops. as_ref( ) . unwrap( ) . budget( ) , expected_ops_budget) ;
1237+
1238+ // Update buckets with new parameters
1239+ let new_bw = TokenBucket :: new ( 2000 , 0 , 500 ) . unwrap ( ) ;
1240+ let new_ops = TokenBucket :: new ( 800 , 0 , 500 ) . unwrap ( ) ;
1241+ x. update_buckets (
1242+ BucketUpdate :: Update ( new_bw. clone ( ) ) ,
1243+ BucketUpdate :: Update ( new_ops. clone ( ) ) ,
1244+ ) ;
1245+
1246+ let bw = x. bandwidth . as_ref ( ) . unwrap ( ) ;
1247+ let ops = x. ops . as_ref ( ) . unwrap ( ) ;
1248+ assert_eq ! (
1249+ ( bw. capacity( ) , bw. budget( ) ) ,
1250+ ( new_bw. capacity( ) , expected_bw_budget)
1251+ ) ;
1252+ assert_eq ! (
1253+ ( ops. capacity( ) , ops. budget( ) ) ,
1254+ ( new_ops. capacity( ) , expected_ops_budget)
1255+ ) ;
1256+
1257+ // Test edge case: update with smaller size - budget should be capped
1258+ let mut x2 = RateLimiter :: new ( 1000 , 0 , 1000 , 500 , 0 , 1000 ) . unwrap ( ) ;
1259+ assert ! ( x2. consume( 300 , TokenType :: Bytes ) ) ;
1260+ assert_eq ! ( x2. bandwidth. as_ref( ) . unwrap( ) . budget( ) , 700 ) ;
1261+
1262+ // Update with smaller bucket size
1263+ let smaller_bw = TokenBucket :: new ( 500 , 0 , 1000 ) . unwrap ( ) ;
1264+ x2. update_buckets ( BucketUpdate :: Update ( smaller_bw. clone ( ) ) , BucketUpdate :: None ) ;
1265+
1266+ let bw = x2. bandwidth . as_ref ( ) . unwrap ( ) ;
1267+ assert_eq ! ( bw. capacity( ) , smaller_bw. capacity( ) ) ;
1268+ assert_eq ! ( bw. budget( ) , smaller_bw. capacity( ) ) ;
1269+ }
1270+
11841271 #[ test]
11851272 fn test_rate_limiter_debug ( ) {
11861273 let l = RateLimiter :: new ( 1 , 2 , 3 , 4 , 5 , 6 ) . unwrap ( ) ;
0 commit comments