From 1b1d8f0b02e5e6103f2cd3cbf250b8ec4d034ebc Mon Sep 17 00:00:00 2001 From: Xingyao Wang Date: Tue, 24 Sep 2024 15:47:27 -0500 Subject: [PATCH] [eval] Use `imap_unordered` for parallelizing evaluation (#4040) --- evaluation/utils/shared.py | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/evaluation/utils/shared.py b/evaluation/utils/shared.py index d385088288..2981e55799 100644 --- a/evaluation/utils/shared.py +++ b/evaluation/utils/shared.py @@ -301,6 +301,11 @@ def _process_instance_wrapper( time.sleep(5) +def _process_instance_wrapper_mp(args): + """Wrapper for multiprocessing, especially for imap_unordered.""" + return _process_instance_wrapper(*args) + + def run_evaluation( dataset: pd.DataFrame, metadata: EvalMetadata | None, @@ -328,21 +333,13 @@ def run_evaluation( try: if use_multiprocessing: with mp.Pool(num_workers) as pool: - results = [ - pool.apply_async( - _process_instance_wrapper, - args=( - process_instance_func, - instance, - metadata, - True, - max_retries, - ), - ) + args_iter = ( + (process_instance_func, instance, metadata, True, max_retries) for _, instance in dataset.iterrows() - ] + ) + results = pool.imap_unordered(_process_instance_wrapper_mp, args_iter) for result in results: - update_progress(result.get(), pbar, output_fp) + update_progress(result, pbar, output_fp) else: for _, instance in dataset.iterrows(): result = _process_instance_wrapper(