funcx.executors.high_throughput package

Submodules

funcx.executors.high_throughput.container_sched module

funcx.executors.high_throughput.container_sched.naive_scheduler(task_qs, outstanding_task_count, max_workers, old_worker_map, to_die_list, logger)
Returns two items as one tuple:
  • kill_list (dict) – KILL [(worker_type, num_kill), …]
  • create_list (dict) – CREATE [(worker_type, num_create), …]

In this scheduler model, a minimum of 1 worker instance is kept for each nonempty task queue.
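One plausible reading of the return shape, sketched below; the worker types and counts are hypothetical:

    # Hypothetical illustration of the (kill_list, create_list) pair described above.
    kill_list = [("RAW", 2), ("my-container", 1)]   # (worker_type, num_kill) pairs
    create_list = [("RAW", 4)]                      # (worker_type, num_create) pairs
    scheduler_decision = (kill_list, create_list)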

funcx.executors.high_throughput.default_config module

funcx.executors.high_throughput.executor module

HighThroughputExecutor builds on the Swift/T EMEWS architecture to use MPI for fast task distribution.

There’s a slow but sure deviation from Parsl’s Executor interface here that needs to be addressed.

class funcx.executors.high_throughput.executor.HighThroughputExecutor(label='HighThroughputExecutor', provider=LocalProvider( channel=LocalChannel( envs={}, script_dir=None, userhome='/home/docs/checkouts/readthedocs.org/user_builds/funcx/checkouts/latest/docs' ), cmd_timeout=30, init_blocks=4, launcher=SingleNodeLauncher(), max_blocks=10, min_blocks=0, move_files=None, nodes_per_block=1, parallelism=1, walltime='00:15:00', worker_init='' ), launch_cmd=None, address='127.0.0.1', worker_ports=None, worker_port_range=(54000, 55000), interchange_port_range=(55000, 56000), storage_access=None, working_dir=None, worker_debug=False, cores_per_worker=1.0, max_workers=inf, heartbeat_threshold=120, heartbeat_period=30, poll_period=10, container_image=None, worker_mode='singularity_reuse', suppress_failure=False, endpoint_id=None, endpoint_db=None, managed=True, task_status_queue=None)

Bases: parsl.executors.base.ParslExecutor, parsl.utils.RepresentationMixin

Executor designed for cluster-scale deployments.

The HighThroughputExecutor system has the following components:
  1. The HighThroughputExecutor instance which is run as part of the Parsl script.
  2. The Interchange, which acts as a load-balancing proxy between workers and Parsl.
  3. The multiprocessing-based worker pool, which coordinates task execution over several cores on a node.
  4. ZeroMQ pipes that connect the HighThroughputExecutor, the Interchange, and the process_worker_pool.

Here is a diagram:

             |  Data   |  Executor   |  Interchange  | External Process(es)
             |  Flow   |             |               |
        Task | Kernel  |             |               |
      +----->|-------->|------------>|->outgoing_q---|-> process_worker_pool
      |      |         |             | batching      |    |         |
Parsl<---Fut-|         |             | load-balancing|  result   exception
          ^  |         |             | watchdogs     |    |         |
          |  |         |   Q_mngmnt  |               |    V         V
          |  |         |    Thread<--|-incoming_q<---|--- +---------+
          |  |         |      |      |               |
          |  |         |      |      |               |
          +----update_fut-----+
Parameters:
  • provider (ExecutionProvider) –
Provider to access computation resources. Can be one of EC2Provider,
Cobalt, Condor, GoogleCloud, GridEngine, Jetstream, Local, Slurm, or Torque.
  • label (str) – Label for this executor instance.
  • launch_cmd (str) – Command line string to launch the process_worker_pool from the provider. The string will be formatted with appropriate values for the following variables: debug, task_url, result_url, cores_per_worker, nodes_per_block, heartbeat_period, heartbeat_threshold, logdir. For example: launch_cmd="process_worker_pool.py {debug} -c {cores_per_worker} --task_url={task_url} --result_url={result_url}"
  • address (string) – An address to connect to the main Parsl process which is reachable from the network in which workers will be running. This can be either a hostname as returned by the hostname command or an IP address. Most login nodes on clusters have several network interfaces available, only some of which can be reached from the compute nodes. Some trial and error might be necessary to identify which addresses are reachable from compute nodes.
  • worker_ports ((int, int)) – Specify the ports to be used by workers to connect to Parsl. If this option is specified, worker_port_range will not be honored.
  • worker_port_range ((int, int)) – Worker ports will be chosen between the two integers provided.
  • interchange_port_range ((int, int)) – Port range used by Parsl to communicate with the Interchange.
  • working_dir (str) – Working dir to be used by the executor.
  • worker_debug (Bool) – Enables worker debug logging.
  • managed (Bool) – If this executor is managed by the DFK or externally handled.
  • cores_per_worker (float) – Cores to be assigned to each worker. Oversubscription is possible by setting cores_per_worker < 1.0. Default: 1
  • max_workers (int) – Caps the number of workers launched by the manager. Default: infinity
  • suppress_failure (Bool) – If set, the interchange will suppress failures rather than terminate early. Default: False
  • heartbeat_threshold (int) – Seconds since the last message from the counterpart in the communication pair (interchange, manager) after which the counterpart is assumed to be unavailable. Default: 120s
  • heartbeat_period (int) – Number of seconds after which a heartbeat message indicating liveness is sent to the counterpart (interchange, manager). Default: 30s
  • poll_period (int) – Timeout period used by the executor components, in milliseconds. Increasing the poll period trades performance for CPU efficiency. Default: 10ms
  • container_image (str) – Path or identifier of the container image to be used by the workers
  • endpoint_db – Endpoint DB object. Default: None
  • worker_mode (str) – Select the mode of operation from no_container, singularity_reuse, singularity_single_use. Default: singularity_reuse
  • task_status_queue (queue.Queue) – Queue to pass updates to task statuses back to the forwarder.
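As a minimal configuration sketch, assuming a local Parsl LocalProvider; the parameter values below are illustrative, not recommended defaults:

    from parsl.providers import LocalProvider
    from funcx.executors.high_throughput.executor import HighThroughputExecutor

    # Illustrative local setup: a single block, up to four workers, no containers.
    htex = HighThroughputExecutor(
        label="htex_local",
        address="127.0.0.1",
        cores_per_worker=1.0,
        max_workers=4,
        worker_mode="no_container",
        provider=LocalProvider(init_blocks=1, min_blocks=0, max_blocks=1),
    )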
connected_workers
connection_info

All connection info necessary for the endpoint to connect back

Returns:Dict with connection info
hold_worker(worker_id)

Puts a worker on hold, preventing scheduling of additional tasks to it.

This is called “hold” mostly because this only stops scheduling of tasks, and does not actually kill the worker.

Parameters:worker_id (str) – Worker id to be put on hold
initialize_scaling()

Compose the launch command and call scale_out.

This should be implemented in the child classes to take care of executor specific oddities.

outstanding
scale_in(blocks)

Scale in the number of active blocks by specified amount.

The scale in method here is very rude. It doesn’t give the workers the opportunity to finish current tasks or to clean up. This is tracked in issue #530.

Raises:NotImplementedError
scale_out(blocks=1)

Scales out the number of blocks by “blocks”

Raises:NotImplementedError
scaling_enabled

Specify if scaling is enabled.

Callers of ParslExecutors need to differentiate between Executors and Executors wrapped in a resource provider.

send_heartbeat()
shutdown(hub=True, targets='all', block=False)

Shutdown the executor, including all workers and controllers.

This is not implemented.

Kwargs:
  • hub (Bool): Whether the hub should be shut down. Default: True
  • targets (list of ints | 'all'): List of block ids to kill. Default: 'all'
  • block (Bool): Whether to block for confirmations
Raises:NotImplementedError
start()

Create the Interchange process and connect to it.

status()

Return status of all blocks.

submit(bufs, task_id=None)

Submits work to the outgoing_q.

An external process listens on the outgoing_q for new work. This method behaves like a submit call as described in the Python concurrent.futures docs.

Parameters:
  • bufs – Pickled buffer with (b'<Function>', b'<args>', b'<kwargs>')
Returns: Future
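A rough usage sketch, assuming the executor has been started and the payload has already been serialized by the caller (pickle here is purely illustrative; the funcX stack normally builds this buffer with its own serializers):

    import pickle

    # Hypothetical payload standing in for a real serialized task.
    bufs = pickle.dumps((b'<Function>', b'<args>', b'<kwargs>'))

    future = htex.submit(bufs, task_id="task-0")
    result = future.result()  # blocks until a worker returns the result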
wait_for_endpoint()
weakref_cb(q=None)

We do not use this yet.

funcx.executors.high_throughput.executor.executor_starter(htex, logdir, endpoint_id, logging_level=10)

funcx.executors.high_throughput.funcx_manager module

class funcx.executors.high_throughput.funcx_manager.Manager(task_q_url='tcp://127.0.0.1:50097', result_q_url='tcp://127.0.0.1:50098', max_queue_size=10, cores_per_worker=1, max_workers=inf, uid=None, heartbeat_threshold=120, heartbeat_period=30, logdir=None, debug=False, block_id=None, internal_worker_port_range=(50000, 60000), worker_mode='singularity_reuse', scheduler_mode='hard', worker_type=None, worker_max_idletime=60, poll_period=100)

Bases: object

Manager manages task execution by the workers

            |         0mq              |    Manager         |   Worker Processes
            |                          |                    |
            | <-----Request N task-----+--Count task reqs   |      Request task<--+
Interchange | -------------------------+->Receive task batch|          |          |
            |                          |  Distribute tasks--+----> Get(block) &   |
            |                          |                    |      Execute task   |
            |                          |                    |          |          |
            | <------------------------+--Return results----+----  Post result   |
            |                          |                    |          |          |
            |                          |                    |          +----------+
            |                          |                IPC-Queues
create_reg_message()

Creates a registration message to identify the worker to the interchange

pull_tasks(kill_event)

Pull tasks from the incoming tasks 0mq pipe onto the internal pending task queue

While running:
  • receive results and task requests from the workers
  • receive tasks/heartbeats from the Interchange
  • match tasks to workers; if a task doesn’t have an appropriate worker type, launch a worker of that type (with LRU or some similar caching strategy)
  • if workers >> tasks, advertise available capacity

Parameters: kill_event (threading.Event) – Event to let the thread know when it is time to die.
push_results(kill_event, max_result_batch_size=1)

Listens on the pending_result_queue and sends out results via 0mq

Parameters: kill_event (threading.Event) – Event to let the thread know when it is time to die.
remove_worker_init(worker_type)

Kill/Remove a worker of a given worker_type.

Add a kill message to the task_type queue.

Assumption: all workers of the same type are uniform, so we don’t discriminate when killing.

start()
  • While True: receive tasks and start appropriate workers; push tasks to available workers; forward results.
funcx.executors.high_throughput.funcx_manager.cli_run()

funcx.executors.high_throughput.funcx_worker module

class funcx.executors.high_throughput.funcx_worker.FuncXWorker(worker_id, address, port, logdir, debug=False, worker_type='RAW')

Bases: object

The FuncX worker.

Parameters:
  • worker_id (str) – Worker id string
  • address (str) – Address at which the manager might be reached. This is usually 127.0.0.1
  • port (int) – Port at which the manager can be reached
  • logdir (str) – Logging directory
  • debug (Bool) – Enables debug logging

The FuncX worker uses a REP socket in a loop:

    task = recv()
    result = execute(task)
    send(result)
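A minimal pyzmq sketch of that loop; the address, message framing, and error handling here are illustrative assumptions, not the actual funcX wire protocol:

    import pickle
    import zmq

    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.connect("tcp://127.0.0.1:50005")  # hypothetical manager endpoint

    while True:
        func, args, kwargs = pickle.loads(socket.recv())  # block until a task arrives
        try:
            result = func(*args, **kwargs)
        except Exception as e:
            result = e  # ship the exception back in place of a result
        socket.send(pickle.dumps(result))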
execute_task(message)

Deserialize the buffer and execute the task.

Returns the result or raises an exception.

registration_message()
start()
funcx.executors.high_throughput.funcx_worker.cli_run()

funcx.executors.high_throughput.global_config module

funcx.executors.high_throughput.interchange module

exception funcx.executors.high_throughput.interchange.BadRegistration(worker_id, critical=False)

Bases: Exception

A new Manager tried to join the executor with a BadRegistration message

class funcx.executors.high_throughput.interchange.Interchange(config, client_address='127.0.0.1', interchange_address='127.0.0.1', client_ports: Tuple[int, int, int] = (50055, 50056, 50057), worker_ports=None, worker_port_range=(54000, 55000), cores_per_worker=1.0, worker_debug=False, launch_cmd=None, heartbeat_threshold=30, logdir='.', logging_level=20, poll_period=10, endpoint_id=None, suppress_failure=False, max_heartbeats_missed=2)

Bases: object

Interchange is a task orchestrator for distributed systems.

  1. Asynchronously queue large volume of tasks (>100K)
  2. Allow for workers to join and leave the union
  3. Detect workers that have failed using heartbeats
  4. Service single and batch requests from workers
  5. Be aware of workers’ resource capacity, e.g. schedule only jobs that fit into the available walltime.

TODO: We most likely need a PUB channel to send out global commands, like shutdown

get_container(container_uuid)

Get the container image location if it is not known to the interchange

get_outstanding_breakdown()

Get outstanding breakdown per manager and in the interchange queues

Returns:
  • List of status for online elements
  • [ (element, tasks_pending, status) … ]
get_status_report()

Get utilization numbers

get_tasks(count)

Obtains a batch of tasks from the internal pending_task_queue

Parameters:count (int) – Count of tasks to get from the queue
Returns: List of up to count tasks, e.g. [{‘task_id’:<x>, ‘buffer’:<buf>} … ]. May return fewer than count, down to an empty list.
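A sketch of the batching behavior, assuming the internal pending task queue is a standard queue.Queue (the helper name is hypothetical):

    import queue

    def get_batch(pending_task_queue, count):
        # Drain up to `count` tasks; return fewer (possibly none) if the queue empties.
        tasks = []
        for _ in range(count):
            try:
                tasks.append(pending_task_queue.get(block=False))
            except queue.Empty:
                break
        return tasks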
get_total_live_workers()

Get the total active workers

get_total_tasks_outstanding()

Get the outstanding tasks in total

hold_manager(manager)

Put a manager on hold.

Parameters: manager (str) – Manager id to be put on hold while being killed

load_config()

Load the config

migrate_tasks_to_internal(kill_event, status_request)

Pull tasks from the incoming tasks 0mq pipe onto the internal pending task queue

Parameters: kill_event (threading.Event) – Event to let the thread know when it is time to die.
provider_status()

Get status of all blocks from the provider

scale_in(blocks=None, block_ids=[], task_type=None)

Scale in the number of active blocks by specified amount.

Parameters:
  • blocks (int) – # of blocks to terminate
  • block_ids ([str.. ]) – List of external block ids to terminate
scale_out(blocks=1, task_type=None)

Scales out the number of blocks by “blocks”

Raises:NotImplementedError
start(poll_period=None)

Start the Interchange

Parameters: poll_period (int) – Poll period in milliseconds
stop()

Prepare the interchange for shutdown

exception funcx.executors.high_throughput.interchange.ManagerLost(worker_id)

Bases: Exception

Task lost due to worker loss. Worker is considered lost when multiple heartbeats have been missed.

exception funcx.executors.high_throughput.interchange.ShutdownRequest

Bases: Exception

Exception raised when any async component receives a ShutdownRequest

funcx.executors.high_throughput.interchange.cli_run()
funcx.executors.high_throughput.interchange.start_file_logger(filename, name='interchange', level=10, format_string=None)

Add a file log handler.

Parameters:
  • filename (string) – Name of the file to write logs to. Required.
  • name (string) – Logger name. Default=”interchange”
  • level (logging.LEVEL) – Set the logging level. Default=logging.DEBUG
  • format_string (string) – Format string to use.
Returns: None

funcx.executors.high_throughput.interchange.starter(comm_q, *args, **kwargs)

Start the interchange process

The executor is expected to call this function. The args and kwargs match those of Interchange.__init__.

funcx.executors.high_throughput.interchange_task_dispatch module

funcx.executors.high_throughput.interchange_task_dispatch.dispatch(interesting_managers, pending_task_queue, ready_manager_queue, scheduler_mode='hard', loop='first', task_dispatch=None, dispatched_tasks=0)

This is the core task-dispatching algorithm for the interchange. Its behavior depends on the scheduler mode and on which dispatch loop invokes it.

funcx.executors.high_throughput.interchange_task_dispatch.get_tasks_hard(pending_task_queue, manager_ads, real_capacity)
funcx.executors.high_throughput.interchange_task_dispatch.get_tasks_soft(pending_task_queue, manager_ads, real_capacity, loop='first')
funcx.executors.high_throughput.interchange_task_dispatch.naive_interchange_task_dispatch(interesting_managers, pending_task_queue, ready_manager_queue, scheduler_mode='hard')

This is an initial task-dispatching algorithm for the interchange. It returns a dictionary mapping each manager to the list of tasks to be sent to that manager, along with the total number of dispatched tasks.
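A sketch of that return shape; the manager ids, task ids, and buffers below are hypothetical (the per-task dict shape follows get_tasks above):

    # Hypothetical illustration of the dispatch result described above.
    task_dispatch = {
        "manager-0": [{"task_id": "t1", "buffer": b"..."},
                      {"task_id": "t2", "buffer": b"..."}],
        "manager-1": [{"task_id": "t3", "buffer": b"..."}],
    }
    dispatched_tasks = 3  # total number of tasks handed out in this round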

funcx.executors.high_throughput.worker_map module

class funcx.executors.high_throughput.worker_map.WorkerMap(max_worker_count)

Bases: object

WorkerMap keeps track of workers

add_worker(worker_id='0.6328570906899961', mode='no_container', worker_type='RAW', container_uri=None, walltime=1, address=None, debug=None, worker_port=None, logdir=None, uid=None)

Launch the appropriate worker

Parameters:
  • worker_id (str) – Worker identifier string
  • mode (str) – Valid options are no_container, singularity
  • walltime (int) – Walltime in seconds before we check status
get_next_worker_q(new_worker_map)
Helper function to generate a queue of the next workers to spin up, from a mapping generated by the scheduler.

Parameters: new_worker_map (dict) – {worker_type: total_number_of_containers,…}
Returns: Queue containing the next workers the system should spin up.
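One way such a queue could be built from the mapping, assuming it simply enqueues one entry per requested container (the helper name is hypothetical):

    import queue

    def build_next_worker_q(new_worker_map):
        # One queue entry per container of each worker type.
        q = queue.Queue()
        for worker_type, total in new_worker_map.items():
            for _ in range(total):
                q.put(worker_type)
        return q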
get_worker(worker_type)

Get a worker of the given type and reduce the count of workers of that type by 1. Raises queue.Empty if none are available.

get_worker_counts()

Returns just the dict of worker_type and counts

put_worker(worker)

Adds worker to the list of waiting workers

ready_worker_count()
register_worker(worker_id, worker_type)

Add a new worker

remove_worker(worker_id)

Remove the worker from the WorkerMap

Should already be KILLed by this point.

spin_down_workers(new_worker_map, worker_max_idletime=60, need_more=False, scheduler_mode='hard')

Helper function to call ‘remove’ for appropriate workers in ‘new_worker_map’.

Parameters: new_worker_map (dict) – {worker_type: total_number_of_containers,…}
Returns: List of removed worker types.
spin_up_workers(next_worker_q, address=None, debug=None, uid=None, logdir=None, worker_port=None)

Helper function to call add_worker for each worker type in ‘next_worker_q’.

Parameters:
  • next_worker_q (queue.Queue) – Queue of worker types to be spun up next.
  • address (str) – Address at which to connect to the workers.
  • debug (bool) – Whether debug logging is activated.
  • uid (str) – Worker ID to be assigned to worker.
  • logdir (str) – Directory in which to write logs
  • worker_port (int) – Port at which to connect to the workers.
Returns: Total number of spun-up workers.

update_worker_idle(worker_type)

Update the workers’ last idle time by worker type

funcx.executors.high_throughput.zmq_pipes module

class funcx.executors.high_throughput.zmq_pipes.CommandClient(ip_address, port_range)

Bases: object

close()
run(message)

This function needs to be fast while remaining aware of the possibility of ZMQ pipes overflowing.

The timeout increases slowly if contention is detected on the ZMQ pipes. We could set copy=False and get slightly better latency, but this results in ZMQ sockets reaching a broken state once there are ~10k tasks in flight. The issue can be magnified if the serialized buffer itself is large.

class funcx.executors.high_throughput.zmq_pipes.ResultsIncoming(ip_address, port_range)

Bases: object

Incoming results queue from the Interchange to the executor

close()
get(block=True, timeout=None)
request_close()
class funcx.executors.high_throughput.zmq_pipes.TasksOutgoing(ip_address, port_range)

Bases: object

Outgoing task queue from the executor to the Interchange

close()
put(message, max_timeout=1000)

This function needs to be fast while remaining aware of the possibility of ZMQ pipes overflowing.

The timeout increases slowly if contention is detected on the ZMQ pipes. We could set copy=False and get slightly better latency, but this results in ZMQ sockets reaching a broken state once there are ~10k tasks in flight. The issue can be magnified if the serialized buffer itself is large.

Parameters:
  • message (py object) – Python object to send
  • max_timeout (int) – Max timeout in milliseconds that we will wait for before raising an exception
Raises: zmq.EAGAIN if the send failed.
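A standalone sketch of the backoff pattern described above; the doubling schedule and send_pyobj framing are illustrative assumptions, not the actual implementation:

    import time
    import zmq

    def put_with_backoff(socket, message, max_timeout=1000):
        # Try a non-blocking send, backing off while the pipe stays congested.
        timeout_ms = 1
        waited = 0
        while True:
            try:
                socket.send_pyobj(message, flags=zmq.NOBLOCK)
                return
            except zmq.Again:
                if waited >= max_timeout:
                    raise  # surfaces to the caller as zmq.EAGAIN
                time.sleep(timeout_ms / 1000.0)
                waited += timeout_ms
                timeout_ms = min(timeout_ms * 2, max_timeout - waited) or 1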

Module contents