funcx.executors package

Module contents

class funcx.executors.HighThroughputExecutor(label='HighThroughputExecutor', provider=LocalProvider( channel=LocalChannel( envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/funcx/checkouts/latest/docs' ), cmd_timeout=30, init_blocks=4, launcher=SingleNodeLauncher(), max_blocks=10, min_blocks=0, move_files=None, nodes_per_block=1, parallelism=1, walltime='00:15:00', worker_init='' ), launch_cmd=None, address='127.0.0.1', worker_ports=None, worker_port_range=(54000, 55000), interchange_port_range=(55000, 56000), storage_access=None, working_dir=None, worker_debug=False, cores_per_worker=1.0, max_workers=inf, heartbeat_threshold=120, heartbeat_period=30, poll_period=10, container_image=None, worker_mode='singularity_reuse', suppress_failure=False, endpoint_id=None, endpoint_db=None, managed=True, task_status_queue=None)

Bases: parsl.executors.base.ParslExecutor, parsl.utils.RepresentationMixin

Executor designed for cluster-scale, high-throughput task execution.

The HighThroughputExecutor system has the following components:
  1. The HighThroughputExecutor instance which is run as part of the Parsl script.
  2. The Interchange, which acts as a load-balancing proxy between workers and Parsl.
  3. The multiprocessing based worker pool which coordinates task execution over several cores on a node.
  4. ZeroMQ pipes which connect the HighThroughputExecutor, the Interchange, and the process_worker_pool.

Here is a diagram:

             |  Data   |  Executor   |  Interchange  | External Process(es)
             |  Flow   |             |               |
        Task | Kernel  |             |               |
      +----->|-------->|------------>|->outgoing_q---|-> process_worker_pool
      |      |         |             | batching      |    |         |
Parsl<---Fut-|         |             | load-balancing|  result   exception
          ^  |         |             | watchdogs     |    |         |
          |  |         |   Q_mngmnt  |               |    V         V
          |  |         |    Thread<--|-incoming_q<---|--- +---------+
          |  |         |      |      |               |
          |  |         |      |      |               |
          +----update_fut-----+
Parameters:
  • provider (ExecutionProvider) –
    Provider to access computation resources (a configuration sketch follows this parameter list). Can be one of EC2Provider,
    Cobalt, Condor, GoogleCloud, GridEngine, Jetstream, Local, Slurm, or Torque.
  • label (str) – Label for this executor instance.
  • launch_cmd (str) – Command line string used to launch the process_worker_pool from the provider. The string is formatted with appropriate values for the following fields: debug, task_url, result_url, cores_per_worker, nodes_per_block, heartbeat_period, heartbeat_threshold, logdir. For example: launch_cmd="process_worker_pool.py {debug} -c {cores_per_worker} --task_url={task_url} --result_url={result_url}"
  • address (string) – An address, reachable from the network in which workers will be running, used to connect back to the main Parsl process. This can be either a hostname as returned by hostname or an IP address. Most login nodes on clusters have several network interfaces available, only some of which can be reached from the compute nodes. Some trial and error might be necessary to identify which addresses are reachable from compute nodes.
  • worker_ports ((int, int)) – Specify the ports to be used by workers to connect to Parsl. If this option is specified, worker_port_range will not be honored.
  • worker_port_range ((int, int)) – Worker ports will be chosen between the two integers provided.
  • interchange_port_range ((int, int)) – Port range used by Parsl to communicate with the Interchange.
  • working_dir (str) – Working dir to be used by the executor.
  • worker_debug (Bool) – Enables worker debug logging.
  • managed (Bool) – If this executor is managed by the DFK or externally handled.
  • cores_per_worker (float) – Cores to be assigned to each worker. Oversubscription is possible by setting cores_per_worker < 1.0. Default: 1.0
  • max_workers (int) – Caps the number of workers launched by the manager. Default: infinity
  • suppress_failure (Bool) – If set, the interchange will suppress failures rather than terminate early. Default: False
  • heartbeat_threshold (int) – Seconds since the last message from the counterpart in the (interchange, manager) communication pair after which the counterpart is assumed to be unavailable. Default: 120s
  • heartbeat_period (int) – Number of seconds after which a heartbeat message indicating liveness is sent to the counterpart (interchange, manager). Default: 30s
  • poll_period (int) – Timeout period, in milliseconds, used by the executor components. Increasing the poll_period trades performance for CPU efficiency. Default: 10ms
  • container_image (str) – Path or identifier of the container image to be used by the workers.
  • endpoint_db – Endpoint DB object. Default: None
  • worker_mode (str) – Select the mode of operation from no_container, singularity_reuse, or singularity_single_use. Default: singularity_reuse
  • task_status_queue (queue.Queue) – Queue to pass updates to task statuses back to the forwarder.
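
A minimal configuration sketch, assuming the Parsl LocalProvider/LocalChannel shown in the default signature above are available; the values are illustrative only, and a real cluster deployment would swap in the appropriate provider:

    from funcx.executors import HighThroughputExecutor
    from parsl.providers import LocalProvider
    from parsl.channels import LocalChannel

    htex = HighThroughputExecutor(
        label="htex_local",
        address="127.0.0.1",          # must be reachable from the compute nodes
        cores_per_worker=1.0,
        max_workers=4,
        heartbeat_period=30,          # seconds between liveness messages
        heartbeat_threshold=120,      # seconds before the counterpart is assumed lost
        worker_mode="no_container",   # or "singularity_reuse" / "singularity_single_use"
        provider=LocalProvider(
            channel=LocalChannel(),
            init_blocks=1,
            min_blocks=0,
            max_blocks=1,
        ),
    )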
connected_workers
connection_info

All connection info necessary for the endpoint to connect back

Returns: Dict with connection info
hold_worker(worker_id)

Puts a worker on hold, preventing scheduling of additional tasks to it.

This is called “hold” mostly because this only stops scheduling of tasks, and does not actually kill the worker.

Parameters: worker_id (str) – Worker id to be put on hold
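
A hypothetical usage sketch; "manager-0" is a placeholder id, not a value produced by this API, and would in practice come from the executor's own bookkeeping:

    # Stop scheduling new tasks to one worker; running tasks are not interrupted.
    htex.hold_worker("manager-0")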
initialize_scaling()

Compose the launch command and call scale_out.

This should be implemented in the child classes to take care of executor specific oddities.

outstanding
scale_in(blocks)

Scale in the number of active blocks by specified amount.

The scale-in method here is abrupt: it does not give workers the opportunity to finish current tasks or clean up. This is tracked in issue #530

Raises: NotImplementedError
scale_out(blocks=1)

Scales out the number of blocks by “blocks”

Raises: NotImplementedError
scaling_enabled

Specify if scaling is enabled.

The callers of ParslExecutors need to differentiate between Executors and Executors wrapped in a resource provider.

send_heartbeat()
shutdown(hub=True, targets='all', block=False)

Shutdown the executor, including all workers and controllers.

This is not implemented.

Kwargs:
  • hub (Bool): Whether the hub should be shut down. Default: True
  • targets (list of ints | 'all'): List of block IDs to kill. Default: 'all'
  • block (Bool): Whether to block for confirmations
Raises: NotImplementedError
start()

Create the Interchange process and connect to it.
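
A hedged lifecycle sketch; in a real funcX endpoint the forwarder/endpoint machinery drives these calls, so this is for illustration only:

    htex.start()                   # spawn the Interchange process and connect to it
    print(htex.connection_info)    # dict of connection info the endpoint uses to connect back
    print(htex.status())           # provider-level status of all blocks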

status()

Return status of all blocks.

submit(bufs, task_id=None)

Submits work to the outgoing_q.

An external process listens on the outgoing_q for new work. This method behaves like a submit call as described in the Python concurrent.futures documentation.

Parameters: bufs – Pickled buffer with (b'<Function>', b'<args>', b'<kwargs>')
Returns: Future
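
A hedged sketch of direct submission. In normal operation the funcX forwarder performs the serialization; pack_apply_message from parsl.serialize is used here only as an assumption of what a compatible (function, args, kwargs) buffer looks like, so adapt it to the serializer your deployment actually uses:

    from parsl.serialize import pack_apply_message

    def double(x):
        return 2 * x

    # Serialize the function, its args, and its kwargs into a buffer.
    bufs = pack_apply_message(double, (21,), {})

    fut = htex.submit(bufs, task_id="task-0")   # "task-0" is a placeholder id
    result = fut.result()   # blocks; depending on the fork this may still be a serialized buffer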
wait_for_endpoint()
weakref_cb(q=None)

We do not use this yet.