@@ -851,16 +851,14 @@ def __init__(self, stream):
851851 # one single shared queue but N bounded queues, one for each producer.
852852 # This is to avoid the situation where a single producer occupies all
853853 # slots, leading to congestions affecting the whole workers pool.
854- # In practice, using a single shared queue, in particular for the
855- # worker -> main
856854
857855 # Input queues for each CPU worker
858856 if not self .stream .reader .read_in_worker :
859857 queue = mp .Queue (2 * num_cpu_workers ) if share_queues else None
860858 for cpu in self .cpu_worker_names :
861859 name = f"from-main_to-stage-0_of-{ cpu } "
862860 if not share_queues :
863- queue = mp .Queue ( 2 )
861+ queue = mp .SimpleQueue ( )
864862 self .data_queues [name ] = queue
865863 self .input_queue_names .append (name )
866864
@@ -872,11 +870,11 @@ def __init__(self, stream):
872870 for stage in range (0 , len (self .stages ) - 1 ):
873871 # Queue to send data from CPU to GPU
874872 name = f"from-{ cpu } _to-stage-{ stage } _of-{ gpu } "
875- self .data_queues [name ] = mp .Queue (2 )
873+ self .data_queues [name ] = mp .Queue ()
876874
877875 # Answer queue from GPU to CPU
878876 name = f"from-{ gpu } _to-stage-{ stage + 1 } _of-{ cpu } "
879- self .data_queues [name ] = mp .Queue (2 )
877+ self .data_queues [name ] = mp .Queue ()
880878
881879 # Final output queue for each CPU worker
882880 name = f"from-{ cpu } _to-main"
0 commit comments