The FuncX Executor

class funcx.FuncXExecutor(funcx_client: funcx.sdk.client.FuncXClient, label: str = 'FuncXExecutor', batch_enabled: bool = True, batch_interval: float = 1.0, batch_size: int = 100)

Extends the concurrent.futures.Executor class to layer this interface over funcX. The executor returns future objects that are asynchronously updated with results by the WebSocketPollingTask using a websockets connection to the hosted funcx-websocket-service.

submit(function, *args, endpoint_id=None, container_uuid=None, **kwargs)

Initiate an invocation

Parameters:
  • function (Function/Callable) – Function / Callable to execute

  • *args (Any) – Args as specified by the function signature

  • endpoint_id (uuid str) – Endpoint UUID string. Required

  • **kwargs (Any) – Arbitrary kwargs

Returns:

Future – A future object

Return type:

funcx.sdk.asynchronous.funcx_future.FuncXFuture

reload_tasks() Iterable[funcx.sdk.asynchronous.funcx_future.FuncXFuture]

Load the set of tasks associated with this Executor’s Task Group (FuncXClient) from the server and return a set of futures, one for each task. This is nominally intended to “reattach” to a previously initiated session, based on the Task Group ID. An example use might be:

import sys
import typing as T
from funcx import FuncXClient, FuncXExecutor
from funcx.sdk.executor import FuncXFuture

fxc_kwargs = {}
if len(sys.argv) > 1:
    fxc_kwargs["task_group_id"] = sys.argv[1]

def example_funcx_kernel(num):
    result = f"your funcx logic result, from task: {num}"
    return result

fxclient = FuncXClient(**fxc_kwargs)
fxexec = FuncXExecutor(fxclient)

# Save the task_group_id somewhere.  Perhaps in a file, or less
# robustly "as mere text" on your console:
print("If this script dies, rehydrate futures with this "
     f"Task Group ID: {fxexec.task_group_id}")

futures: T.Iterable[FuncXFuture] = []
results, exceptions = [], []
if "task_group_id" in fxc_kwargs:
    print(f"Reloading tasks from Task Group ID: {fxexec.task_group_id}")
    futures = fxexec.reload_tasks()

    # Ask server once up-front if there are any known results before
    # waiting for each result in turn (below):
    task_ids = [f.task_id for f in futures]
    finished_tasks = set()
    for task_id, state in fxclient.get_batch_result(task_ids).items():
        if not state["pending"]:
            finished_tasks.add(task_id)
            if state["status"] == "success":
                results.append(state["result"])
            else:
                exceptions.append(state["exception"])
    futures = [f for f in futures if f.task_id not in finished_tasks]

else:
    print("New session; creating FuncX tasks ...")
    ep_id = "<YOUR_ENDPOINT_UUID>"
    for i in range(1, 5):
        futures.append(
            fxexec.submit(example_funcx_kernel, endpoint_id=ep_id)
        )

    # ... Right here, your script dies for [SILLY REASON;
    #           DID YOU LOSE POWER?] ...

# Get results:
for f in futures:
    try:
        results.append(f.result(timeout=10))
    except Exception as exc:
        exceptions.append(exc)
Returns:

  • An iterable of futures.

  • Known throws

  • ——

  • - The usual (unhandled) request errors (e.g., no connection; invalid – authorization)

  • - ValueError if the server response is incorrect

  • - KeyError if the server did not return an expected response

Notes

Any previous futures received from this executor will be cancelled.

shutdown()

Clean-up the resources associated with the Executor.

It is safe to call this method several times. Otherwise, no other methods can be called after this one.

Parameters:
  • wait – If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.

  • cancel_futures – If True then shutdown will cancel all pending futures. Futures that are completed or running will not be cancelled.

class funcx.sdk.executor.FuncXFuture(task_id: Optional[str] = None)

Extends concurrent.futures.Future to include an optional task UUID.

task_id: Optional[str]

The UUID for the task behind this Future. In batch mode, this will not be populated immediately, but will appear later when the task is submitted to the FuncX services.