@@ -3,7 +3,6 @@ use futures::{
33 stream:: BoxStream ,
44 Future , FutureExt , Stream , StreamExt ,
55} ;
6- use once_cell:: sync:: Lazy ;
76use serde:: Deserialize ;
87use snafu:: prelude:: * ;
98use std:: {
@@ -821,29 +820,60 @@ enum DemultiplexCommand {
821820 ListenOnce ( JobId , oneshot:: Sender < WorkerMessage > ) ,
822821}
823822
823+ #[ derive( Debug , Copy , Clone ) ]
824+ pub struct CoordinatorId {
825+ start : u64 ,
826+ id : u64 ,
827+ }
828+
824829/// Enforces a limited number of concurrent `Coordinator`s.
825830#[ derive( Debug ) ]
826831pub struct CoordinatorFactory {
827832 semaphore : Arc < Semaphore > ,
833+
834+ start : u64 ,
835+ id : AtomicU64 ,
828836}
829837
830838impl CoordinatorFactory {
831839 pub fn new ( maximum : usize ) -> Self {
840+ let semaphore = Arc :: new ( Semaphore :: new ( maximum) ) ;
841+
842+ let now = std:: time:: SystemTime :: now ( ) ;
843+ let start = now
844+ . duration_since ( std:: time:: UNIX_EPOCH )
845+ . unwrap_or_default ( )
846+ . as_secs ( ) ;
847+
848+ let id = AtomicU64 :: new ( 0 ) ;
849+
832850 Self {
833- semaphore : Arc :: new ( Semaphore :: new ( maximum) ) ,
851+ semaphore,
852+ start,
853+ id,
834854 }
835855 }
836856
837- pub async fn build < B > ( & self , backend : B ) -> LimitedCoordinator < B >
857+ fn next_id ( & self ) -> CoordinatorId {
858+ let start = self . start ;
859+ let id = self . id . fetch_add ( 1 , Ordering :: SeqCst ) ;
860+
861+ CoordinatorId { start, id }
862+ }
863+
864+ pub async fn build < B > ( & self ) -> LimitedCoordinator < B >
838865 where
839- B : Backend ,
866+ B : Backend + From < CoordinatorId > ,
840867 {
841868 let semaphore = self . semaphore . clone ( ) ;
842869 let permit = semaphore
843870 . acquire_owned ( )
844871 . await
845872 . expect ( "Unable to acquire permit" ) ;
846873
874+ let id = self . next_id ( ) ;
875+ let backend = B :: from ( id) ;
876+
847877 let coordinator = Coordinator :: new ( backend) ;
848878
849879 LimitedCoordinator {
@@ -1149,12 +1179,6 @@ where
11491179 }
11501180}
11511181
1152- impl Coordinator < DockerBackend > {
1153- pub fn new_docker ( ) -> Self {
1154- Self :: new ( DockerBackend ( ( ) ) )
1155- }
1156- }
1157-
11581182#[ derive( Debug ) ]
11591183struct Container {
11601184 task : JoinHandle < Result < ( ) > > ,
@@ -2581,24 +2605,26 @@ fn basic_secure_docker_command() -> Command {
25812605 )
25822606}
25832607
2584- static DOCKER_BACKEND_START : Lazy < u64 > = Lazy :: new ( || {
2585- use std:: time;
2586-
2587- let now = time:: SystemTime :: now ( ) ;
2588- now. duration_since ( time:: UNIX_EPOCH )
2589- . unwrap_or_default ( )
2590- . as_secs ( )
2591- } ) ;
2592-
2593- static DOCKER_BACKEND_ID : AtomicU64 = AtomicU64 :: new ( 0 ) ;
2608+ pub struct DockerBackend {
2609+ id : CoordinatorId ,
2610+ instance : AtomicU64 ,
2611+ }
25942612
2595- pub struct DockerBackend ( ( ) ) ;
2613+ impl From < CoordinatorId > for DockerBackend {
2614+ fn from ( id : CoordinatorId ) -> Self {
2615+ Self {
2616+ id,
2617+ instance : Default :: default ( ) ,
2618+ }
2619+ }
2620+ }
25962621
25972622impl DockerBackend {
25982623 fn next_name ( & self ) -> String {
2599- let start = * DOCKER_BACKEND_START ;
2600- let id = DOCKER_BACKEND_ID . fetch_add ( 1 , Ordering :: SeqCst ) ;
2601- format ! ( "playground-{start}-{id}" )
2624+ let CoordinatorId { start, id } = self . id ;
2625+ let instance = self . instance . fetch_add ( 1 , Ordering :: SeqCst ) ;
2626+
2627+ format ! ( "playground-{start}-{id}-{instance}" )
26022628 }
26032629}
26042630
@@ -2758,6 +2784,7 @@ fn spawn_io_queue(stdin: ChildStdin, stdout: ChildStdout, token: CancellationTok
27582784mod tests {
27592785 use assertables:: * ;
27602786 use futures:: future:: { join, try_join_all} ;
2787+ use once_cell:: sync:: Lazy ;
27612788 use std:: { env, sync:: Once } ;
27622789 use tempdir:: TempDir ;
27632790
@@ -2781,8 +2808,8 @@ mod tests {
27812808 project_dir : TempDir ,
27822809 }
27832810
2784- impl TestBackend {
2785- fn new ( ) -> Self {
2811+ impl From < CoordinatorId > for TestBackend {
2812+ fn from ( _id : CoordinatorId ) -> Self {
27862813 static COMPILE_WORKER_ONCE : Once = Once :: new ( ) ;
27872814
27882815 COMPILE_WORKER_ONCE . call_once ( || {
@@ -2839,12 +2866,12 @@ mod tests {
28392866 static TEST_COORDINATOR_FACTORY : Lazy < CoordinatorFactory > =
28402867 Lazy :: new ( || CoordinatorFactory :: new ( * MAX_CONCURRENT_TESTS ) ) ;
28412868
2842- async fn new_coordinator_test ( ) -> LimitedCoordinator < impl Backend > {
2843- TEST_COORDINATOR_FACTORY . build ( TestBackend :: new ( ) ) . await
2869+ async fn new_coordinator_test ( ) -> LimitedCoordinator < TestBackend > {
2870+ TEST_COORDINATOR_FACTORY . build ( ) . await
28442871 }
28452872
2846- async fn new_coordinator_docker ( ) -> LimitedCoordinator < impl Backend > {
2847- TEST_COORDINATOR_FACTORY . build ( DockerBackend ( ( ) ) ) . await
2873+ async fn new_coordinator_docker ( ) -> LimitedCoordinator < DockerBackend > {
2874+ TEST_COORDINATOR_FACTORY . build ( ) . await
28482875 }
28492876
28502877 async fn new_coordinator ( ) -> LimitedCoordinator < impl Backend > {
0 commit comments