Source code for hynet.distributed.server

#pylint: disable=no-member,logging-format-interpolation
"""
*hynet* optimization server for distributed computation.
"""

import logging
import subprocess
import getpass
from multiprocessing import Queue, Value, Process
from multiprocessing.managers import SyncManager
from time import sleep

import numpy as np
import tqdm

import hynet.config as config
from hynet.types_ import SolverType
from hynet.scenario.representation import Scenario
from hynet.system.model import SystemModel
from hynet.system.calc import calc, select_solver
from hynet.opf.calc import calc_opf
from hynet.qcqp.problem import QCQP

_log = logging.getLogger(__name__)


def start_optimization_server(port=None, authkey=None, local=False):
    """
    Create, start, and return a *hynet* optimization server.

    Parameters
    ----------
    port : int, optional
        TCP port on which the *hynet* optimization server shall be running.
        If ``None``, ``config.DISTRIBUTED['default_port']`` is used.
    authkey : str, optional
        Authentication key that must be presented by *hynet* optimization
        clients to connect to the server. If ``None``,
        ``config.DISTRIBUTED['default_authkey']`` is used.
    local : bool, optional
        If ``True`` (default is ``False``), the optimization server processes
        all jobs on the local machine and *connections of clients are not
        accepted*. In case that some code is designed to utilize distributed
        computation, but the server cluster is not available, this local
        mode supports the computation on the local machine without the client
        management overhead.

    Returns
    -------
    server : OptimizationServer
        The *hynet* optimization server.
    """
    # Fall back to the configured defaults only for the omitted settings.
    server_port = config.DISTRIBUTED['default_port'] if port is None else port
    server_key = (config.DISTRIBUTED['default_authkey']
                  if authkey is None else authkey)
    return OptimizationServer(server_port, server_key, local)


class OptimizationJob:
    """
    Represents a *hynet* optimization job.

    *Customization:* To customize the job processing, derive from this class
    and override ``process``.

    Parameters
    ----------
    data : Scenario or SystemModel or QCQP
        Scenario (to solve its OPF), a problem builder (an object of a derived
        class of ``SystemModel`` like ``OPFModel``), or a QCQP specification.
    solver : SolverInterface, optional
        Solver for the provided problem. The default automatically selects an
        appropriate solver of the specified solver type. Please make sure that
        the selected solver is installed on all client machines.
    solver_type : SolverType, optional
        Solver type for the automatic solver selection (default
        ``SolverType.QCQP``). It is ignored if ``solver`` is not ``None``.

    Raises
    ------
    ValueError
        If ``data`` is not a ``Scenario``, ``SystemModel``, or ``QCQP``.
    """

    def __init__(self, data, solver=None, solver_type=SolverType.QCQP):
        supported_types = (Scenario, SystemModel, QCQP)
        if not isinstance(data, supported_types):
            raise ValueError("The provided problem specification is invalid.")
        self.data = data
        self.solver = solver
        self.solver_type = solver_type
        self.id = None  # Used internally to sort the job results

    def process(self):
        """
        Process the optimization job and return the result (or exception).

        A QCQP is passed directly to the solver (which is selected
        automatically from ``solver_type`` and stored on the job if none was
        given), a ``SystemModel`` is passed to ``calc``, and anything else
        (i.e., a ``Scenario``) is passed to ``calc_opf``.

        Returns
        -------
        result : object
            The value returned by the solver or calculation function. If the
            processing raises an exception, a warning is logged and the
            exception object is returned instead of being raised.
        """
        try:
            if isinstance(self.data, QCQP):
                if self.solver is None:
                    self.solver = select_solver(self.solver_type)()
                return self.solver.solve(self.data)
            if isinstance(self.data, SystemModel):
                return calc(self.data,
                            solver=self.solver,
                            solver_type=self.solver_type)
            return calc_opf(self.data,
                            solver=self.solver,
                            solver_type=self.solver_type)
        except Exception as failure:  # Deliberate: failure becomes the result
            _log.warning("Job {0} failed: {1}".format(self.id, str(failure)))
            return failure


class OptimizationServer:
    """
    *hynet* optimization server for distributed computation.

    This server manages the distributed computation of a set of *hynet*
    optimization problems (OPF or QCQPs) on *hynet* optimization clients.
    In local mode, no server manager is started and all jobs are processed
    in the calling process.
    """

    def __init__(self, port, authkey, local):
        """
        Create a *hynet* optimization server.

        Parameters
        ----------
        port : int
            TCP port on which the server shall be running.
        authkey : str
            Authentication key for the clients. It is UTF-8 encoded before
            it is handed to the server manager.
        local : bool
            If ``True``, no server manager and no queues are created.
        """
        self._port = port
        self._authkey = authkey
        self._local = local

        if local:
            self._manager = None
            self._job_queue = None
            self._result_queue = None
        else:
            self._manager = _create_server_manager(port,
                                                   authkey.encode('utf-8'))
            self._job_queue = self._manager.get_job_queue()
            self._result_queue = self._manager.get_result_queue()

    @staticmethod
    def _enumerate_jobs(job_list, solver):
        """
        Create and return an enumerated job list.

        Items that are not an ``OptimizationJob`` are wrapped into one using
        ``solver``. Every job is assigned a one-based ``id`` according to its
        position; this id is later used to place its result in the result
        array. Note that the ``id`` of provided job objects is overwritten.
        """
        job_enumeration = []
        for job_id, job in enumerate(job_list, start=1):
            if not isinstance(job, OptimizationJob):
                job = OptimizationJob(job, solver=solver)
            job.id = job_id
            job_enumeration.append(job)
        return job_enumeration

    def calc_jobs(self, job_list, solver=None, show_progress=True):
        """
        Calculate the list of *hynet* optimization jobs and return the results.

        In local mode, the jobs are processed one after another in the
        calling process. Otherwise, the provided list of jobs is processed by
        distributing them to the connected *hynet* optimization clients,
        collecting the results, and returning an array of results that
        corresponds with the provided array of jobs. Note that if there are
        no clients connected, this method will wait until a client is
        connected to process the jobs.

        Parameters
        ----------
        job_list : array-like
            List of *hynet* optimization jobs (``OptimizationJob``) or problem
            specifications (``Scenario`` [issues an OPF computation],
            ``SystemModel``, or ``QCQP``).
        solver : SolverInterface, optional
            If provided, this solver is used for problem specifications
            (``Scenario``, ``SystemModel``, or ``QCQP``). It is ignored for
            job specifications (``OptimizationJob``).
        show_progress : bool, optional
            If True (default), the progress is reported to the standard output.

        Returns
        -------
        results : numpy.ndarray
            Array containing the optimization results, where entry ``i``
            belongs to item ``i`` of ``job_list``.
        """
        job_list = self._enumerate_jobs(job_list, solver)
        num_jobs = len(job_list)
        results = np.empty(num_jobs, dtype=object)

        # Are we in local mode? If so, process here...
        if self._local:
            for job in tqdm.tqdm(job_list, unit="job",
                                 disable=not show_progress):
                results[job.id - 1] = job.process()
            return results

        # ...otherwise, put them in the queue and let the clients process them.
        for job in job_list:
            self._job_queue.put(job)

        # Unsigned int shared memory variable. It is only written here and
        # only read by the progress bar process, hence no lock.
        num_results = Value('I', 0, lock=False)

        progress_bar = None
        if show_progress:
            progress_bar = Process(target=_progress_bar,
                                   args=(num_results, num_jobs))
            progress_bar.start()

        try:
            while num_results.value < num_jobs:
                # Every item of the result queue maps job ids to results.
                result_dict = self._result_queue.get()
                for job_id, result in result_dict.items():
                    results[job_id - 1] = result
                num_results.value += len(result_dict)
        except BaseException:
            # The counter will never reach the total anymore, so the progress
            # bar process would poll forever (and, being non-daemonic, block
            # the interpreter exit). Stop it before propagating the error.
            if progress_bar is not None:
                progress_bar.terminate()
                progress_bar.join()
            raise

        if progress_bar is not None:
            # Wait for the progress bar to finish
            progress_bar.join()
        return results

    def shutdown(self):
        """
        Stop the *hynet* optimization server and all connected clients.

        In local mode there is no server manager and this is a no-op.
        """
        if self._manager is not None:
            self._manager.shutdown()

    def start_clients(self, client_list, server_ip, ssh_user=None,
                      ssh_port=None, num_workers=None, log_file=None,
                      suppress_output=True):
        """
        Automated start of *hynet* optimization clients.

        This method provides an automatic start of *hynet* optimization
        clients via SSH if the server can connect to the clients via
        ``ssh [client]`` (e.g. by configuring SSH keys; please be aware of the
        related aspects of system security). *hynet* must be properly
        installed on all client machines.

        This function uses SSH to run the *hynet* package with the sub-command
        ``client`` and corresponding command line arguments
        (``python -m hynet client ...``) on every client machine. To customize
        the SSH and Python command, see ``hynet.config``. In local mode, a
        warning is logged and no clients are started.

        Parameters
        ----------
        client_list : array-like
            List of strings containing the host names or IP addresses of the
            client machines.
        server_ip : str
            IP address of the *hynet* optimization server.
        ssh_user : str, optional
            The user name for the SSH login on the client machines. By
            default, this is set to the current user name
            (``getpass.getuser()``).
        ssh_port : int, optional
            Port on which SSH is running on the client machines.
        num_workers : int, optional
            Number of worker processes that should run in parallel on every
            client machine.
        log_file : str, optional
            Log file on the client machines to capture the output.
        suppress_output : bool, optional
            If ``True`` (default), the activity output of the optimization
            clients is suppressed.
        """
        if self._local:
            _log.warning("The server is in local mode. "
                         "Skipping the start of clients.")
            return

        if ssh_user is None:
            ssh_user = getpass.getuser()

        # Part of the command in front of the client's host name.
        command_pre = config.DISTRIBUTED['ssh_command'] + " -f "
        if ssh_port is not None:
            command_pre += "-p {0:d} ".format(ssh_port)
        command_pre += ssh_user + "@"

        # Part after the host name: the double-quoted remote command.
        command_post = ' "' + config.DISTRIBUTED['python_command']
        command_post += " -m hynet client "
        command_post += server_ip
        command_post += " -p {0:d}".format(self._port)
        command_post += " -a " + self._authkey
        if num_workers is not None:
            command_post += " -n {0:d}".format(num_workers)
        if log_file is not None:
            command_post += " &> " + log_file
        command_post += '"'

        # ``None`` stays with the default, i.e., the output is not redirected.
        output_stream = subprocess.DEVNULL if suppress_output else None

        for client in client_list:
            try:
                # NOTE(security): The command is a plain string interpreted by
                # the local shell and none of its parts is escaped. All
                # arguments (and the authentication key) must thus originate
                # from a trusted source.
                subprocess.run(command_pre + client + command_post,
                               shell=True, check=True, stdout=output_stream)
            except subprocess.CalledProcessError as exception:
                # A failure on one client must not prevent the others.
                _log.error("Failed to start client on '{0}': {1}"
                           .format(client, str(exception)))


def _create_server_manager(port, authkey):
    """
    Return a (started) manager for a *hynet* optimization server.

    The manager exposes one job queue and one result queue via the registered
    methods ``get_job_queue`` and ``get_result_queue``.

    Parameters
    ----------
    port : int
        TCP port on which the *hynet* optimization server shall be running.
    authkey : bytes
        Authentication key that must be presented by *hynet* optimization
        clients to connect to the server (``OptimizationServer`` passes the
        UTF-8 encoded key string).

    Returns
    -------
    manager : ServerManager
        Manager object for a *hynet* optimization server.
    """
    job_queue = Queue()
    result_queue = Queue()

    class ServerManager(SyncManager):
        """This class manages the synchronization of the queues."""

    ServerManager.register('get_job_queue', callable=lambda: job_queue)
    ServerManager.register('get_result_queue', callable=lambda: result_queue)

    # The empty host name leaves the choice of the interface to the manager.
    manager = ServerManager(address=('', port), authkey=authkey)
    manager.start()
    _log.info('Started server on port {0}'.format(port))
    return manager


def _progress_bar(counter, num_total):
    """
    Show a progress bar on the standard output.

    The counter is polled every quarter of a second until it has reached (or
    exceeded) the total number of jobs.

    Parameters
    ----------
    counter : multiprocessing.Value
        Counter for the number of processed jobs as a shared memory object.
    num_total : int
        Total number of jobs.
    """
    num_progress = 0
    with tqdm.tqdm(total=num_total, unit="job") as progress_bar:
        while True:
            num_done = counter.value
            if num_done != num_progress:
                progress_bar.update(num_done - num_progress)
                num_progress = num_done
            else:
                # Nothing new: only redraw to update the elapsed time.
                progress_bar.refresh()
            # ``>=`` instead of ``==``: if the counter ever overshoots the
            # total, an equality test would keep this loop (and the process
            # that joins it) alive forever.
            if num_done >= num_total:
                return
            sleep(0.25)