@@ -20,7 +20,7 @@ use tokio::{
2020 join,
2121 process:: { Child , ChildStdin , ChildStdout , Command } ,
2222 select,
23- sync:: { mpsc, oneshot, OnceCell } ,
23+ sync:: { mpsc, oneshot, OnceCell , OwnedSemaphorePermit , Semaphore } ,
2424 task:: { JoinHandle , JoinSet } ,
2525 time:: { self , MissedTickBehavior } ,
2626} ;
@@ -821,6 +821,66 @@ enum DemultiplexCommand {
821821 ListenOnce ( JobId , oneshot:: Sender < WorkerMessage > ) ,
822822}
823823
824+ /// Enforces a limited number of concurrent `Coordinator`s.
825+ #[ derive( Debug ) ]
826+ pub struct CoordinatorFactory {
827+ semaphore : Arc < Semaphore > ,
828+ }
829+
830+ impl CoordinatorFactory {
831+ pub fn new ( maximum : usize ) -> Self {
832+ Self {
833+ semaphore : Arc :: new ( Semaphore :: new ( maximum) ) ,
834+ }
835+ }
836+
837+ pub async fn build < B > ( & self , backend : B ) -> LimitedCoordinator < B >
838+ where
839+ B : Backend ,
840+ {
841+ let semaphore = self . semaphore . clone ( ) ;
842+ let permit = semaphore
843+ . acquire_owned ( )
844+ . await
845+ . expect ( "Unable to acquire permit" ) ;
846+
847+ let coordinator = Coordinator :: new ( backend) ;
848+
849+ LimitedCoordinator {
850+ coordinator,
851+ _permit : permit,
852+ }
853+ }
854+ }
855+
856+ pub struct LimitedCoordinator < T > {
857+ coordinator : Coordinator < T > ,
858+ _permit : OwnedSemaphorePermit ,
859+ }
860+
861+ impl < T > LimitedCoordinator < T >
862+ where
863+ T : Backend ,
864+ {
865+ pub async fn shutdown ( self ) -> Result < T > {
866+ self . coordinator . shutdown ( ) . await
867+ }
868+ }
869+
870+ impl < T > ops:: Deref for LimitedCoordinator < T > {
871+ type Target = Coordinator < T > ;
872+
873+ fn deref ( & self ) -> & Self :: Target {
874+ & self . coordinator
875+ }
876+ }
877+
878+ impl < T > ops:: DerefMut for LimitedCoordinator < T > {
879+ fn deref_mut ( & mut self ) -> & mut Self :: Target {
880+ & mut self . coordinator
881+ }
882+ }
883+
824884#[ derive( Debug ) ]
825885pub struct Coordinator < B > {
826886 backend : B ,
@@ -2700,7 +2760,6 @@ mod tests {
27002760 use futures:: future:: { join, try_join_all} ;
27012761 use std:: { env, sync:: Once } ;
27022762 use tempdir:: TempDir ;
2703- use tokio:: sync:: { OwnedSemaphorePermit , Semaphore } ;
27042763
27052764 use super :: * ;
27062765
@@ -2777,67 +2836,9 @@ mod tests {
27772836 . unwrap_or ( 5 )
27782837 } ) ;
27792838
2780- struct CoordinatorFactory {
2781- semaphore : Arc < Semaphore > ,
2782- }
2783-
2784- impl CoordinatorFactory {
2785- pub fn new ( maximum : usize ) -> Self {
2786- Self {
2787- semaphore : Arc :: new ( Semaphore :: new ( maximum) ) ,
2788- }
2789- }
2790-
2791- pub async fn build < B > ( & self , backend : B ) -> LimitedCoordinator < B >
2792- where
2793- B : Backend ,
2794- {
2795- let semaphore = self . semaphore . clone ( ) ;
2796- let permit = semaphore
2797- . acquire_owned ( )
2798- . await
2799- . expect ( "Unable to acquire permit" ) ;
2800-
2801- let coordinator = Coordinator :: new ( backend) ;
2802-
2803- LimitedCoordinator {
2804- _permit : permit,
2805- coordinator,
2806- }
2807- }
2808- }
2809-
28102839 static TEST_COORDINATOR_FACTORY : Lazy < CoordinatorFactory > =
28112840 Lazy :: new ( || CoordinatorFactory :: new ( * MAX_CONCURRENT_TESTS ) ) ;
28122841
2813- struct LimitedCoordinator < T > {
2814- _permit : OwnedSemaphorePermit ,
2815- coordinator : Coordinator < T > ,
2816- }
2817-
2818- impl < T > LimitedCoordinator < T >
2819- where
2820- T : Backend ,
2821- {
2822- async fn shutdown ( self ) -> super :: Result < T , super :: Error > {
2823- self . coordinator . shutdown ( ) . await
2824- }
2825- }
2826-
2827- impl < T > ops:: Deref for LimitedCoordinator < T > {
2828- type Target = Coordinator < T > ;
2829-
2830- fn deref ( & self ) -> & Self :: Target {
2831- & self . coordinator
2832- }
2833- }
2834-
2835- impl < T > ops:: DerefMut for LimitedCoordinator < T > {
2836- fn deref_mut ( & mut self ) -> & mut Self :: Target {
2837- & mut self . coordinator
2838- }
2839- }
2840-
28412842 async fn new_coordinator_test ( ) -> LimitedCoordinator < impl Backend > {
28422843 TEST_COORDINATOR_FACTORY . build ( TestBackend :: new ( ) ) . await
28432844 }
0 commit comments