@@ -17,24 +17,28 @@ struct ElasticManager <: Distributed.ClusterManager
1717 terminated:: Set{Int} # terminated worker ids
1818 topology:: Symbol
1919 sockname
20+ manage_callback
2021 printing_kwargs
2122
22- function ElasticManager (;addr= Sockets. IPv4 (" 127.0.0.1" ), port= 9009 , cookie= nothing , topology= :all_to_all , printing_kwargs= ())
23+ function ElasticManager (;
24+ addr= IPv4 (" 127.0.0.1" ), port= 9009 , cookie= nothing ,
25+ topology= :all_to_all , manage_callback= elastic_no_op_callback, printing_kwargs= ()
26+ )
2327 Distributed. init_multi ()
2428 cookie != = nothing && Distributed. cluster_cookie (cookie)
2529
2630 # Automatically check for the IP address of the local machine
2731 if addr == :auto
2832 try
29- addr = Sockets. getipaddr (Distributed . IPv4)
33+ addr = Sockets. getipaddr (Sockets . IPv4)
3034 catch
3135 error (" Failed to automatically get host's IP address. Please specify `addr=` explicitly." )
3236 end
3337 end
3438
3539 l_sock = Distributed. listen (addr, port)
3640
37- lman = new (Dict {Int, Distributed.WorkerConfig} (), Channel {Sockets.TCPSocket} (typemax (Int)), Set {Int} (), topology, Sockets. getsockname (l_sock), printing_kwargs)
41+ lman = new (Dict {Int, Distributed.WorkerConfig} (), Channel {Sockets.TCPSocket} (typemax (Int)), Set {Int} (), topology, Sockets. getsockname (l_sock), manage_callback, printing_kwargs)
3842
3943 t1 = @async begin
4044 while true
@@ -57,8 +61,10 @@ ElasticManager(port) = ElasticManager(;port=port)
# Positional convenience constructors. Each one simply forwards its
# arguments, by name, to the keyword constructor `ElasticManager(; ...)`.

# Listen on `addr`:`port`, leaving every other option at its default.
function ElasticManager(addr, port)
    return ElasticManager(; addr=addr, port=port)
end

# Same as above, additionally passing `cookie` through to the keyword constructor.
function ElasticManager(addr, port, cookie)
    return ElasticManager(; addr=addr, port=port, cookie=cookie)
end
5963
# Default value of the `manage_callback` keyword of `ElasticManager`.
# Takes the manager, a worker id and the operation symbol, ignores all
# three, and returns `nothing`.
function elastic_no_op_callback(::ElasticManager, ::Integer, ::Symbol)
    return nothing
end
6065
6166function process_worker_conn (mgr:: ElasticManager , s:: Sockets.TCPSocket )
67+ @debug " ElasticManager got new worker connection"
6268 # Socket is the worker's STDOUT
6369 wc = Distributed. WorkerConfig ()
6470 wc. io = s
94100function Distributed. launch (mgr:: ElasticManager , params:: Dict , launched:: Array , c:: Condition )
95101 # The workers have already been started.
96102 while isready (mgr. pending)
103+ @debug " ElasticManager.launch new worker"
97104 wc= Distributed. WorkerConfig ()
98105 wc. io = take! (mgr. pending)
99106 push! (launched, wc)
104111
105112function Distributed. manage (mgr:: ElasticManager , id:: Integer , config:: Distributed.WorkerConfig , op:: Symbol )
106113 if op == :register
114+ @debug " ElasticManager registering process id $id "
107115 mgr. active[id] = config
116+ mgr. manage_callback (mgr, id, op)
108117 elseif op == :deregister
118+ @debug " ElasticManager deregistering process id $id "
119+ mgr. manage_callback (mgr, id, op)
109120 delete! (mgr. active, id)
110121 push! (mgr. terminated, id)
111122 end
@@ -138,9 +149,18 @@ function Base.show(io::IO, mgr::ElasticManager)
138149end
139150
140151# Does not return. If executing from a REPL try
141- # @async connect_to_cluster (.....)
152+ # @async elastic_worker (.....)
142153# addr, port that an ElasticManager on the master process is listening on.
143- function elastic_worker (cookie, addr= " 127.0.0.1" , port= 9009 ; stdout_to_master= true )
154+ function elastic_worker (
155+ cookie:: AbstractString , addr:: AbstractString = " 127.0.0.1" , port:: Integer = 9009 ;
156+ stdout_to_master:: Bool = true ,
157+ Base. @nospecialize (env:: AbstractVector = [],)
158+ )
159+ @debug " ElasticManager.elastic_worker(cookie, $addr , $port ; stdout_to_master=$stdout_to_master , env=$env )"
160+ for (k, v) in env
161+ ENV [k] = v
162+ end
163+
144164 c = connect (addr, port)
145165 write (c, rpad (cookie, HDR_COOKIE_LEN)[1 : HDR_COOKIE_LEN])
146166 stdout_to_master && redirect_stdout (c)
0 commit comments