1414#
1515
1616import collections
17+ import functools
1718import itertools
1819import os
1920import queue
@@ -395,32 +396,20 @@ def _guarded_task_generation(self, result_job, func, iterable):
395396 yield (result_job , i + 1 , _helper_reraises_exception , (e ,), {})
396397
397398 def _guarded_task_generation_lazy (self , result_job , func , iterable ,
398- lazy_task_gen_helper ):
399- ''' Provides a generator of tasks for imap and imap_unordered with
399+ backpressure_sema ):
400+ """ Provides a generator of tasks for imap and imap_unordered with
400401 appropriate handling for iterables which throw exceptions during
401- iteration.'''
402- if not lazy_task_gen_helper .feature_enabled :
403- yield from self ._guarded_task_generation (result_job , func , iterable )
404- return
405-
402+ iteration."""
406403 try :
407404 i = - 1
408405 enumerated_iter = iter (enumerate (iterable ))
409- thread = threading .current_thread ()
410- max_generated_tasks = self ._processes + lazy_task_gen_helper .buffersize
411-
412- while thread ._state == RUN :
413- with lazy_task_gen_helper .iterator_cond :
414- if lazy_task_gen_helper .not_finished_tasks >= max_generated_tasks :
415- continue # wait for some task to be (picked up and) finished
416-
406+ while True :
407+ backpressure_sema .acquire ()
417408 try :
418- i , x = enumerated_iter . __next__ ( )
409+ i , x = next ( enumerated_iter )
419410 except StopIteration :
420411 break
421-
422412 yield (result_job , i , func , (x ,), {})
423- lazy_task_gen_helper .tasks_generated += 1
424413
425414 except Exception as e :
426415 yield (result_job , i + 1 , _helper_reraises_exception , (e ,), {})
@@ -430,31 +419,32 @@ def imap(self, func, iterable, chunksize=1, buffersize=None):
430419 Equivalent of `map()` -- can be MUCH slower than `Pool.map()`.
431420 '''
432421 self ._check_running ()
422+ if chunksize < 1 :
423+ raise ValueError ("Chunksize must be 1+, not {0:n}" .format (chunksize ))
424+
425+ result = IMapIterator (self , buffersize )
426+
427+ if result ._backpressure_sema is None :
428+ task_generation = self ._guarded_task_generation
429+ else :
430+ task_generation = functools .partial (
431+ self ._guarded_task_generation_lazy ,
432+ backpressure_sema = result ._backpressure_sema ,
433+ )
434+
433435 if chunksize == 1 :
434- result = IMapIterator (self , buffersize )
435436 self ._taskqueue .put (
436437 (
437- self ._guarded_task_generation_lazy (result ._job ,
438- func ,
439- iterable ,
440- result ._lazy_task_gen_helper ),
438+ task_generation (result ._job , func , iterable ),
441439 result ._set_length ,
442440 )
443441 )
444442 return result
445443 else :
446- if chunksize < 1 :
447- raise ValueError (
448- "Chunksize must be 1+, not {0:n}" .format (
449- chunksize ))
450444 task_batches = Pool ._get_tasks (func , iterable , chunksize )
451- result = IMapIterator (self , buffersize )
452445 self ._taskqueue .put (
453446 (
454- self ._guarded_task_generation_lazy (result ._job ,
455- mapstar ,
456- task_batches ,
457- result ._lazy_task_gen_helper ),
447+ task_generation (result ._job , mapstar , task_batches ),
458448 result ._set_length ,
459449 )
460450 )
@@ -465,30 +455,34 @@ def imap_unordered(self, func, iterable, chunksize=1, buffersize=None):
465455 Like `imap()` method but ordering of results is arbitrary.
466456 '''
467457 self ._check_running ()
458+ if chunksize < 1 :
459+ raise ValueError (
460+ "Chunksize must be 1+, not {0!r}" .format (chunksize )
461+ )
462+
463+ result = IMapUnorderedIterator (self , buffersize )
464+
465+ if result ._backpressure_sema is None :
466+ task_generation = self ._guarded_task_generation
467+ else :
468+ task_generation = functools .partial (
469+ self ._guarded_task_generation_lazy ,
470+ backpressure_sema = result ._backpressure_sema ,
471+ )
472+
468473 if chunksize == 1 :
469- result = IMapUnorderedIterator (self , buffersize )
470474 self ._taskqueue .put (
471475 (
472- self ._guarded_task_generation_lazy (result ._job ,
473- func ,
474- iterable ,
475- result ._lazy_task_gen_helper ),
476+ task_generation (result ._job , func , iterable ),
476477 result ._set_length ,
477478 )
478479 )
479480 return result
480481 else :
481- if chunksize < 1 :
482- raise ValueError (
483- "Chunksize must be 1+, not {0!r}" .format (chunksize ))
484482 task_batches = Pool ._get_tasks (func , iterable , chunksize )
485- result = IMapUnorderedIterator (self , buffersize )
486483 self ._taskqueue .put (
487484 (
488- self ._guarded_task_generation_lazy (result ._job ,
489- mapstar ,
490- task_batches ,
491- result ._lazy_task_gen_helper ),
485+ task_generation (result ._job , mapstar , task_batches ),
492486 result ._set_length ,
493487 )
494488 )
@@ -889,7 +883,13 @@ def __init__(self, pool, buffersize):
889883 self ._length = None
890884 self ._unsorted = {}
891885 self ._cache [self ._job ] = self
892- self ._lazy_task_gen_helper = _LazyTaskGenHelper (buffersize , self ._cond )
886+
887+ if buffersize is None :
888+ self ._backpressure_sema = None
889+ else :
890+ self ._backpressure_sema = threading .Semaphore (
891+ value = self ._pool ._processes + buffersize
892+ )
893893
894894 def __iter__ (self ):
895895 return self
@@ -910,7 +910,9 @@ def next(self, timeout=None):
910910 self ._pool = None
911911 raise StopIteration from None
912912 raise TimeoutError from None
913- self ._lazy_task_gen_helper .tasks_finished += 1
913+
914+ if self ._backpressure_sema :
915+ self ._backpressure_sema .release ()
914916
915917 success , value = item
916918 if success :
@@ -959,22 +961,6 @@ def _set(self, i, obj):
959961 del self ._cache [self ._job ]
960962 self ._pool = None
961963
962- #
963- # Class to store stats for lazy task generation and share them
964- # between the main thread and `_guarded_task_generation()` thread.
965- #
966- class _LazyTaskGenHelper (object ):
967- def __init__ (self , buffersize , iterator_cond ):
968- self .feature_enabled = buffersize is not None
969- self .buffersize = buffersize
970- self .tasks_generated = 0
971- self .tasks_finished = 0
972- self .iterator_cond = iterator_cond
973-
974- @property
975- def not_finished_tasks (self ):
976- return self .tasks_generated - self .tasks_finished
977-
978964#
979965#
980966#
0 commit comments