@@ -474,21 +474,42 @@ impl RateLimiter {
474474 }
475475 }
476476
477- /// Updates the parameters of the token buckets associated with this RateLimiter.
478- // TODO: Please note that, right now, the buckets become full after being updated.
477+ /// Updates the parameters of the token buckets associated with this `RateLimiter`.
478+ ///
479+ /// When updating bucket parameters, the existing state (`budget`, `one_time_burst`, and
480+ /// `last_update` timestamp) is preserved from the current bucket. The `budget` and
481+ /// `one_time_burst` values are capped at the new bucket's limits if the new limits
482+ /// are smaller than the existing values.
479483 pub fn update_buckets ( & mut self , bytes : BucketUpdate , ops : BucketUpdate ) {
480484 match bytes {
481485 BucketUpdate :: Disabled => self . bandwidth = None ,
482- BucketUpdate :: Update ( tb) => self . bandwidth = Some ( tb) ,
486+ BucketUpdate :: Update ( tb) => {
487+ let updated = Self :: update_bucket ( tb, self . bandwidth . as_ref ( ) ) ;
488+ self . bandwidth = Some ( updated) ;
489+ }
483490 BucketUpdate :: None => ( ) ,
484491 } ;
485492 match ops {
486493 BucketUpdate :: Disabled => self . ops = None ,
487- BucketUpdate :: Update ( tb) => self . ops = Some ( tb) ,
494+ BucketUpdate :: Update ( tb) => {
495+ let updated = Self :: update_bucket ( tb, self . ops . as_ref ( ) ) ;
496+ self . ops = Some ( updated) ;
497+ }
488498 BucketUpdate :: None => ( ) ,
489499 } ;
490500 }
491501
502+ /// Updates a token bucket with preserved state from an existing bucket.
503+ fn update_bucket ( mut new_bucket : TokenBucket , existing : Option < & TokenBucket > ) -> TokenBucket {
504+ if let Some ( existing) = existing {
505+ new_bucket. budget = std:: cmp:: min ( existing. budget , new_bucket. size ) ;
506+ new_bucket. one_time_burst =
507+ std:: cmp:: min ( existing. one_time_burst , new_bucket. initial_one_time_burst ) ;
508+ new_bucket. last_update = existing. last_update ;
509+ }
510+ new_bucket
511+ }
512+
492513 /// Returns an immutable view of the inner bandwidth token bucket.
493514 pub fn bandwidth ( & self ) -> Option < & TokenBucket > {
494515 self . bandwidth . as_ref ( )
@@ -1167,20 +1188,112 @@ pub(crate) mod tests {
11671188 BucketUpdate :: Update ( new_ops. clone ( ) ) ,
11681189 ) ;
11691190
1170- // We have manually adjust the last_update field, because it changes when update_buckets()
1171- // constructs new buckets (and thus gets a different value for last_update). We do this so
1172- // it makes sense to test the following assertions.
1173- x. bandwidth . as_mut ( ) . unwrap ( ) . last_update = new_bw. last_update ;
1174- x. ops . as_mut ( ) . unwrap ( ) . last_update = new_ops. last_update ;
1191+ // Verify that new bucket parameters are applied
1192+ assert_eq ! ( x. bandwidth. as_ref( ) . unwrap( ) . capacity( ) , new_bw. capacity( ) ) ;
1193+ assert_eq ! (
1194+ x. bandwidth. as_ref( ) . unwrap( ) . refill_time_ms( ) ,
1195+ new_bw. refill_time_ms( )
1196+ ) ;
1197+ assert_eq ! (
1198+ x. bandwidth. as_ref( ) . unwrap( ) . initial_one_time_burst( ) ,
1199+ new_bw. initial_one_time_burst( )
1200+ ) ;
1201+ assert_eq ! ( x. ops. as_ref( ) . unwrap( ) . capacity( ) , new_ops. capacity( ) ) ;
1202+ assert_eq ! (
1203+ x. ops. as_ref( ) . unwrap( ) . refill_time_ms( ) ,
1204+ new_ops. refill_time_ms( )
1205+ ) ;
1206+ assert_eq ! (
1207+ x. ops. as_ref( ) . unwrap( ) . initial_one_time_burst( ) ,
1208+ new_ops. initial_one_time_burst( )
1209+ ) ;
11751210
1176- assert_eq ! ( x. bandwidth, Some ( new_bw) ) ;
1177- assert_eq ! ( x. ops, Some ( new_ops) ) ;
1211+ // Verify that state is preserved (budget is capped at new bucket size if smaller)
1212+ assert_eq ! (
1213+ x. bandwidth. as_ref( ) . unwrap( ) . budget( ) ,
1214+ std:: cmp:: min( initial_bw. as_ref( ) . unwrap( ) . budget( ) , new_bw. capacity( ) )
1215+ ) ;
1216+ assert_eq ! (
1217+ x. bandwidth. as_ref( ) . unwrap( ) . one_time_burst( ) ,
1218+ std:: cmp:: min(
1219+ initial_bw. as_ref( ) . unwrap( ) . one_time_burst( ) ,
1220+ new_bw. initial_one_time_burst( )
1221+ )
1222+ ) ;
1223+ assert_eq ! (
1224+ x. bandwidth. as_ref( ) . unwrap( ) . get_last_update( ) ,
1225+ initial_bw. as_ref( ) . unwrap( ) . get_last_update( )
1226+ ) ;
1227+ assert_eq ! (
1228+ x. ops. as_ref( ) . unwrap( ) . budget( ) ,
1229+ std:: cmp:: min( initial_ops. as_ref( ) . unwrap( ) . budget( ) , new_ops. capacity( ) )
1230+ ) ;
1231+ assert_eq ! (
1232+ x. ops. as_ref( ) . unwrap( ) . one_time_burst( ) ,
1233+ std:: cmp:: min(
1234+ initial_ops. as_ref( ) . unwrap( ) . one_time_burst( ) ,
1235+ new_ops. initial_one_time_burst( )
1236+ )
1237+ ) ;
1238+ assert_eq ! (
1239+ x. ops. as_ref( ) . unwrap( ) . get_last_update( ) ,
1240+ initial_ops. as_ref( ) . unwrap( ) . get_last_update( )
1241+ ) ;
11781242
11791243 x. update_buckets ( BucketUpdate :: Disabled , BucketUpdate :: Disabled ) ;
11801244 assert_eq ! ( x. bandwidth, None ) ;
11811245 assert_eq ! ( x. ops, None ) ;
11821246 }
11831247
1248+ #[ test]
1249+ fn test_update_buckets_preserves_budget_after_consumption ( ) {
1250+ let mut x = RateLimiter :: new ( 1000 , 0 , 1000 , 500 , 0 , 1000 ) . unwrap ( ) ;
1251+
1252+ // Consume some tokens to reduce budget
1253+ assert ! ( x. consume( 300 , TokenType :: Bytes ) ) ;
1254+ assert ! ( x. consume( 200 , TokenType :: Ops ) ) ;
1255+
1256+ // Verify budget was reduced
1257+ assert_eq ! ( x. bandwidth. as_ref( ) . unwrap( ) . budget( ) , 700 ) ;
1258+ assert_eq ! ( x. ops. as_ref( ) . unwrap( ) . budget( ) , 300 ) ;
1259+
1260+ // Save state before update
1261+ let initial_bw = x. bandwidth . as_ref ( ) . unwrap ( ) . clone ( ) ;
1262+ let initial_ops = x. ops . as_ref ( ) . unwrap ( ) . clone ( ) ;
1263+
1264+ // Update buckets with new parameters
1265+ let new_bw = TokenBucket :: new ( 2000 , 0 , 500 ) . unwrap ( ) ;
1266+ let new_ops = TokenBucket :: new ( 800 , 0 , 500 ) . unwrap ( ) ;
1267+ x. update_buckets (
1268+ BucketUpdate :: Update ( new_bw. clone ( ) ) ,
1269+ BucketUpdate :: Update ( new_ops. clone ( ) ) ,
1270+ ) ;
1271+
1272+ // Verify new parameters are applied
1273+ assert_eq ! ( x. bandwidth. as_ref( ) . unwrap( ) . capacity( ) , new_bw. capacity( ) ) ;
1274+ assert_eq ! ( x. ops. as_ref( ) . unwrap( ) . capacity( ) , new_ops. capacity( ) ) ;
1275+
1276+ // Verify budget is preserved (not reset to full)
1277+ assert_eq ! ( x. bandwidth. as_ref( ) . unwrap( ) . budget( ) , initial_bw. budget( ) ) ;
1278+ assert_eq ! ( x. ops. as_ref( ) . unwrap( ) . budget( ) , initial_ops. budget( ) ) ;
1279+
1280+ // Test edge case: update with smaller size - budget should be capped
1281+ let mut x2 = RateLimiter :: new ( 1000 , 0 , 1000 , 500 , 0 , 1000 ) . unwrap ( ) ;
1282+ assert ! ( x2. consume( 300 , TokenType :: Bytes ) ) ;
1283+ assert_eq ! ( x2. bandwidth. as_ref( ) . unwrap( ) . budget( ) , 700 ) ;
1284+
1285+ // Update with smaller bucket size
1286+ let smaller_bw = TokenBucket :: new ( 500 , 0 , 1000 ) . unwrap ( ) ;
1287+ x2. update_buckets ( BucketUpdate :: Update ( smaller_bw. clone ( ) ) , BucketUpdate :: None ) ;
1288+
1289+ // Budget should be capped at new size
1290+ assert_eq ! ( x2. bandwidth. as_ref( ) . unwrap( ) . budget( ) , smaller_bw. budget( ) ) ;
1291+ assert_eq ! (
1292+ x2. bandwidth. as_ref( ) . unwrap( ) . capacity( ) ,
1293+ smaller_bw. capacity( )
1294+ ) ;
1295+ }
1296+
11841297 #[ test]
11851298 fn test_rate_limiter_debug ( ) {
11861299 let l = RateLimiter :: new ( 1 , 2 , 3 , 4 , 5 , 6 ) . unwrap ( ) ;
0 commit comments