
Commit d85b5c1

fix: ensure mp backend ends nicely and improve throughput
1 parent 5b1b602 commit d85b5c1

File tree

6 files changed: +981 -761 lines changed


changelog.md

Lines changed: 5 additions & 1 deletion
@@ -23,11 +23,15 @@
 
 - Sort files before iterating over a standoff or json folder to ensure reproducibility
 - Sentence detection now correctly match capitalized letters + apostrophe
+- We now ensure that the workers pool is properly closed whatever happens (exception, garbage collection, data ending) in the `multiprocessing` backend. This prevents some executions from hanging indefinitely at the end of the processing.
 
 ### Data API changes
 
 - `LazyCollection` objects are now called `Stream` objects
-- By default, `multiprocessing` backend now preserves the order of the input data
+- By default, `multiprocessing` backend now preserves the order of the input data. To disable this and improve performance, use `deterministic=False` in the `set_processing` method
+- :rocket: Parallelized GPU inference throughput improvements !
+    - For simple {pre-process → model → post-process} pipelines, GPU inference can be up to 30% faster in non-deterministic mode (results can be out of order) and up to 20% faster in deterministic mode (results are in order)
+    - For multitask pipelines, GPU inference can be up to twice as fast (measured in a two-tasks BERT+NER+Qualif pipeline on T4 and A100 GPUs)
 - The `.map_batches`, `.map_pipeline` and `.map_gpu` methods now support a specific `batch_size` and batching function, instead of having a single batch size for all pipes
 - Readers now have a `loop` parameter to cycle over the data indefinitely (useful for training)
 - Readers now have a `shuffle` parameter to shuffle the data before iterating over it
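
A minimal usage sketch of the new `deterministic` flag, assuming `nlp.pipe` returns a `Stream` as in the edsnlp docs; the pipe choice, texts and worker count below are made up for illustration:

    import edsnlp

    nlp = edsnlp.blank("eds")
    nlp.add_pipe("eds.sentences")  # illustrative pipe choice

    texts = ["Patient admis pour chute.", "Pas de fracture visible."]

    stream = nlp.pipe(texts)
    stream = stream.set_processing(
        backend="multiprocessing",
        num_cpu_workers=4,
        deterministic=False,  # trade input order for throughput
    )
    docs = list(stream)  # documents may come back out of order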

edsnlp/core/pipeline.py

Lines changed: 0 additions & 2 deletions
@@ -721,8 +721,6 @@ def preprocess_many(self, docs: Iterable[Doc], compress=True, supervision=True):
         """
         res = Stream.ensure_stream(docs)
         res = res.map(functools.partial(self.preprocess, supervision=supervision))
-        if compress:
-            res = res.map(batch_compress_dict())
         return res
 
     def collate(
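
In the lines shown, `preprocess_many` no longer maps `batch_compress_dict` over the stream even though `compress=True` remains in its signature. If downstream code still needs the compressed form, the removed step can presumably be re-applied on the returned stream. A sketch, assuming `batch_compress_dict` keeps its import path in `edsnlp.utils.collections`:

    from edsnlp.utils.collections import batch_compress_dict  # assumed import path

    # preprocess_many no longer applies compression in the lines shown;
    # re-apply the removed mapping downstream if the compressed form is needed:
    stream = nlp.preprocess_many(docs, supervision=True)
    stream = stream.map(batch_compress_dict())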

edsnlp/core/stream.py

Lines changed: 11 additions & 8 deletions
@@ -34,7 +34,7 @@
 
     from edsnlp import Pipeline
     from edsnlp.core.torch_component import TorchComponent
-    from edsnlp.data.base import BaseReader, BaseWriter
+    from edsnlp.data.base import BaseReader, BaseWriter, BatchWriter
 
 
 def deep_isgeneratorfunction(x):
@@ -234,6 +234,9 @@ def __init__(self, prepare_batch, forward, postprocess, elementwise=False):
         self.postprocess = postprocess
         self.elementwise = elementwise
 
+    def __call__(self, *args, **kwargs):
+        return self.forward(*args, **kwargs)
+
     def batch_process(self, batch):
         res = self.forward(self.prepare_batch(batch, None))
         return self.postprocess(batch, res) if self.postprocess is not None else res
@@ -275,7 +278,7 @@ class Stream(metaclass=MetaStream):
     def __init__(
         self,
         reader: Optional[BaseReader] = None,
-        writer: Optional[BaseWriter] = None,
+        writer: Optional[Union[BaseWriter, BatchWriter]] = None,
        ops: List[Any] = [],
        config: Dict = {},
    ):
@@ -446,10 +449,10 @@ def set_processing(
             List of GPU devices to use for the CPU workers. Used for debugging purposes.
         deterministic: bool
             Whether to try and preserve the order of the documents in "multiprocessing"
-            mode. If set to False, workers will process documents whenever they are
-            available in a dynamic fashion, which may result in out-of-order processing.
-            If set to true, tasks will be distributed in a static, round-robin fashion
-            to workers. Defaults to True.
+            mode. If set to `False`, workers will process documents whenever they are
+            available in a dynamic fashion, which may result in out-of-order but usually
+            faster processing. If set to true, tasks will be distributed in a
+            static, round-robin fashion to workers. Defaults to `True`.
 
         Returns
         -------
@@ -461,8 +464,8 @@ def set_processing(
             or kwargs.pop("sort_chunks", INFER) is not INFER
         ):
             warnings.warn(
-                """chunk_size and sort_chunks are deprecated, use \
-map_batched(sort_fn, batch_size=chunk_size) instead.""",
+                "chunk_size and sort_chunks are deprecated, use "
+                "map_batched(sort_fn, batch_size=chunk_size) instead.",
                 VisibleDeprecationWarning,
             )
         if kwargs.pop("split_into_batches_after", INFER) is not INFER:
