Source code for hynet.utilities.worker

"""
Management of worker processes in *hynet*.

Parameters
----------
workers : WorkerManager
    Worker manager created at import time for parallel processing within the
    *hynet* package.
"""

import logging
from os import cpu_count
import multiprocessing
import numbers
import functools
from itertools import starmap

import tqdm

from hynet.utilities.base import Timer
import hynet.config as config

_log = logging.getLogger(__name__)


[docs]def pool_operation(method): """ Decorates a worker operation method with a pool state verification. The worker manager maintains its pool of workers lazily, i.e., it is only created on demand. Due to this, every potential pool operation in the worker manager class has to verify and, if necessary, update the pool state prior to the operation itself. This decorator manages this task. In the worker operation method, the code only needs to adapt to the availability of the pool. """ @functools.wraps(method) def wrapper(self, *args, **kwargs): self._check_pool() return method(self, *args, **kwargs) return wrapper
[docs]class WorkerManager: """ Manage operations on a pool of workers. This class provides operations on a pool of worker processes in case that parallel processing in *hynet* is enabled and, otherwise, it performs the operations in the current process. The pool of workers is maintained lazily, i.e., it is created on demand. See Also -------- hynet.config """ def __init__(self): self._pool = None self._num_workers = cpu_count() if self._num_workers is None: self._num_workers = 1 @property def num_workers(self): """ Return the number of worker processes. **These workers are only active if parallel processing is enabled.** See Also -------- hynet.config """ return self._num_workers @num_workers.setter def num_workers(self, value): """ Set the number of worker processes. **These workers are only active if parallel processing is enabled.** If a pool with a different number of worker processes is already open, it is closed immediately and reopened on demand with the updated number of workers. See Also -------- hynet.config """ if not isinstance(value, numbers.Integral) or value < 1: raise ValueError("The number of workers must be a positive integer.") if value == self._num_workers: return self._num_workers = value self.close_pool() def _open_pool(self): """Open a pool of worker processes.""" if self._pool is not None: raise RuntimeError("There is already an open pool of workers.") if self._num_workers == 1: _log.debug("Skipping pool creation due to use of a single worker.") return timer = Timer() self._pool = multiprocessing.Pool(processes=self._num_workers) _log.debug("Opened pool with {:d} workers ({:.3f} sec.)" .format(self._num_workers, timer.total()))
[docs] def close_pool(self): """Close the pool of worker processes.""" if self._pool is None: return timer = Timer() self._pool.close() self._pool.join() self._pool = None _log.debug("Closed worker pool ({:.3f} sec.)".format(timer.total()))
def _check_pool(self): """ Verify and, if necessary, update the state of the worker pool. See Also -------- hynet.utilities.worker.pool_operation: Decorator for a worker operation method. """ if config.GENERAL['parallelize']: if self._pool is None: self._open_pool() else: if self._pool is not None: _log.debug("Parallel processing was disabled. Closing the pool.") self.close_pool() @property def is_multiprocessing(self): """Return True if the operations utilize multiprocessing.""" self._check_pool() return self._pool is not None
[docs] @pool_operation def map(self, func, iterable, show_progress=False, unit="it", **kwds): """ Apply the function to every item (single argument) in the iterable. Parameters ---------- func : function Function that shall be applied to the items in the iterable. The function object must support pickling. iterable : iterable Iterable containing the function arguments. An item in this iterable is the single argument passed to the function. The iterable must support ``len(iterable)``. show_progress : bool, optional If ``True`` (default ``False``), the progress is reported to the standard output. unit : str, optional Name of one unit of iteration for the progress visualization (default ``it``). kwds : dict, optional If parallel processing is enabled, the keyword arguments are passed to the ``map``-method of ``multiprocessing.Pool``. Returns ------- result : list List of the function result for every item in the iterable. See Also -------- multiprocessing.Pool.map """ if self._pool is None: return list(map(func, tqdm.tqdm(iterable, unit=unit, disable=not show_progress))) return list(tqdm.tqdm(self._pool.imap(func, iterable, **kwds), total=len(iterable), unit=unit, disable=not show_progress))
[docs] @pool_operation def starmap(self, func, iterable, **kwds): """ Apply the function to every item (argument tuple) in the iterable. Parameters ---------- func : function Function that shall be applied to the items in the iterable. The function object must support pickling. iterable : iterable Iterable containing the function arguments. An item in this iterable is a tuple of the arguments passed to the function. kwds : dict, optional If parallel processing is enabled, the keyword arguments are passed to the ``starmap``-method of ``multiprocessing.Pool``. Returns ------- result : list List of the function result for every item in the iterable. See Also -------- multiprocessing.Pool.starmap """ if self._pool is None: return list(starmap(func, iterable)) return self._pool.starmap(func, iterable, **kwds)
workers = WorkerManager() # One instance for use within the *hynet* package.