The FuncX Executor¶
- class funcx.FuncXExecutor(funcx_client: funcx.sdk.client.FuncXClient, label: str = 'FuncXExecutor', batch_enabled: bool = True, batch_interval: float = 1.0, batch_size: int = 100)¶
Extends the concurrent.futures.Executor class to layer this interface over funcX. The executor returns future objects that are asynchronously updated with results by the WebSocketPollingTask using a websockets connection to the hosted funcx-websocket-service.
- submit(function, *args, endpoint_id=None, container_uuid=None, **kwargs)¶
Initiate an invocation
- Parameters:
function (Function/Callable) – Function / Callable to execute
*args (Any) – Args as specified by the function signature
endpoint_id (uuid str) – Endpoint UUID string. Required
**kwargs (Any) – Arbitrary kwargs
- Returns:
Future – A future object
- Return type:
- reload_tasks() Iterable[funcx.sdk.asynchronous.funcx_future.FuncXFuture] ¶
Load the set of tasks associated with this Executor’s Task Group (FuncXClient) from the server and return a set of futures, one for each task. This is nominally intended to “reattach” to a previously initiated session, based on the Task Group ID. An example use might be:
import sys import typing as T from funcx import FuncXClient, FuncXExecutor from funcx.sdk.executor import FuncXFuture fxc_kwargs = {} if len(sys.argv) > 1: fxc_kwargs["task_group_id"] = sys.argv[1] def example_funcx_kernel(num): result = f"your funcx logic result, from task: {num}" return result fxclient = FuncXClient(**fxc_kwargs) fxexec = FuncXExecutor(fxclient) # Save the task_group_id somewhere. Perhaps in a file, or less # robustly "as mere text" on your console: print("If this script dies, rehydrate futures with this " f"Task Group ID: {fxexec.task_group_id}") futures: T.Iterable[FuncXFuture] = [] results, exceptions = [], [] if "task_group_id" in fxc_kwargs: print(f"Reloading tasks from Task Group ID: {fxexec.task_group_id}") futures = fxexec.reload_tasks() # Ask server once up-front if there are any known results before # waiting for each result in turn (below): task_ids = [f.task_id for f in futures] finished_tasks = set() for task_id, state in fxclient.get_batch_result(task_ids).items(): if not state["pending"]: finished_tasks.add(task_id) if state["status"] == "success": results.append(state["result"]) else: exceptions.append(state["exception"]) futures = [f for f in futures if f.task_id not in finished_tasks] else: print("New session; creating FuncX tasks ...") ep_id = "<YOUR_ENDPOINT_UUID>" for i in range(1, 5): futures.append( fxexec.submit(example_funcx_kernel, endpoint_id=ep_id) ) # ... Right here, your script dies for [SILLY REASON; # DID YOU LOSE POWER?] ... # Get results: for f in futures: try: results.append(f.result(timeout=10)) except Exception as exc: exceptions.append(exc)
- Returns:
An iterable of futures.
Known throws
——
- The usual (unhandled) request errors (e.g., no connection; invalid – authorization)
- ValueError if the server response is incorrect
- KeyError if the server did not return an expected response
Notes
Any previous futures received from this executor will be cancelled.
- shutdown()¶
Clean-up the resources associated with the Executor.
It is safe to call this method several times. Otherwise, no other methods can be called after this one.
- Parameters:
wait – If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.
cancel_futures – If True then shutdown will cancel all pending futures. Futures that are completed or running will not be cancelled.