The FuncX Executor

class funcx.FuncXExecutor(funcx_client: Optional[funcx.sdk.client.FuncXClient] = None, label: str = 'FuncXExecutor', batch_enabled: bool = True, batch_size: int = 100, **kwargs)

The FuncXExecutor class, a subclass of concurrent.futures.Executor, is the preferred approach to collecting results from the funcX Web Service. Over polling (the historical approach) where the web service must be repeatedly queried for the status of tasks and results eventually collected in bulk, the FuncXExecutor class instantiates a WebSocket connection that streams results directly – and immediately – as they arrive at the server. This is a far more efficient paradigm, simultaneously in terms of bytes over the wire, time spent waiting for results, and boilerplate code to check for results.

An interaction might look like:

from funcx import FuncXExecutor
from funcx.sdk.executor import FuncXFuture

fxexec = FuncXExecutor()
ep_id = "<YOUR_ENDPOINT_UUID>"

def example_funcx_kernel(num):
    import time
    time.sleep(num * random.random())  # simulate some processing
    return f"result, from task: {num}"

futs: list[FuncXFuture] = [
    fxexec.submit(example_funcx_kernel, task_i, endpoint_id=ep_id)
    for task_i in range(1, 21)
]
# FuncXFuture is a subclass of concurrent.futures.Future

results, exceptions = [], []
for f in concurrent.futures.as_completed(futs, timeout=30):
    # wait no more than 30s for all results
    try:
        results.append(f.result())
    except Exception as exc:
        exceptions.append((f.task_id, exc))

print("Results received (unordered):\n  ", "\n  ".join(results))
for task_id, exc in exceptions:
    print(f"  Exception received from task {task_id}: {exc}")

Each future returned by .submit() is a handle to that particular task’s result; that future will be completed by a background thread in the FuncXExecutor as soon as the server sends the result – no polling, just an event-based interaction.

submit(function, *args, endpoint_id=None, container_uuid=None, **kwargs)

Initiate an invocation

Parameters:
  • function (Function/Callable) – Function / Callable to execute

  • *args (Any) – Args as specified by the function signature

  • endpoint_id (uuid str) – Endpoint UUID string. Required

  • **kwargs (Any) – Arbitrary kwargs

Returns:

future – A future object, that will receive a .task_id when the funcX Web Service acknowledges receipt, and eventually will have a .result() when the Web Service streams it over the WebSocket.

Return type:

FuncXFuture

reload_tasks() Iterable[funcx.sdk.asynchronous.funcx_future.FuncXFuture]

Load the set of tasks associated with this Executor’s Task Group (FuncXClient) from the server and return a set of futures, one for each task. This is nominally intended to “reattach” to a previously initiated session, based on the Task Group ID. An example use might be:

import sys
import typing as T
from funcx import FuncXClient, FuncXExecutor
from funcx.sdk.executor import FuncXFuture

fxc_kwargs = {}
if len(sys.argv) > 1:
    fxc_kwargs["task_group_id"] = sys.argv[1]

def example_funcx_kernel(num):
    result = f"your funcx logic result, from task: {num}"
    return result

fxclient = FuncXClient(**fxc_kwargs)
fxexec = FuncXExecutor(fxclient)

# Save the task_group_id somewhere.  Perhaps in a file, or less
# robustly "as mere text" on your console:
print("If this script dies, rehydrate futures with this "
     f"Task Group ID: {fxexec.task_group_id}")

futures: T.Iterable[FuncXFuture] = []
results, exceptions = [], []
if "task_group_id" in fxc_kwargs:
    print(f"Reloading tasks from Task Group ID: {fxexec.task_group_id}")
    futures = fxexec.reload_tasks()

    # Ask server once up-front if there are any known results before
    # waiting for each result in turn (below):
    task_ids = [f.task_id for f in futures]
    finished_tasks = set()
    for task_id, state in fxclient.get_batch_result(task_ids).items():
        if not state["pending"]:
            finished_tasks.add(task_id)
            if state["status"] == "success":
                results.append(state["result"])
            else:
                exceptions.append(state["exception"])
    futures = [f for f in futures if f.task_id not in finished_tasks]

else:
    print("New session; creating FuncX tasks ...")
    ep_id = "<YOUR_ENDPOINT_UUID>"
    for i in range(1, 5):
        futures.append(
            fxexec.submit(example_funcx_kernel, endpoint_id=ep_id)
        )

    # ... Right here, your script dies for [SILLY REASON;
    #           DID YOU LOSE POWER?] ...

# Get results:
for f in futures:
    try:
        results.append(f.result(timeout=10))
    except Exception as exc:
        exceptions.append(exc)
Return type:

An iterable of futures.

Raises:
  • ValueError – if the server response is incorrect:

  • KeyError – if the server did not return an expected response:

  • various – the usual (unhandled) request errors (e.g., no connection; invalid authorization):

Notes

Any previous futures received from this executor will be cancelled.

shutdown()

Clean-up the resources associated with the Executor.

It is safe to call this method several times. Otherwise, no other methods can be called after this one.

Parameters:
  • wait – If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.

  • cancel_futures – If True then shutdown will cancel all pending futures. Futures that are completed or running will not be cancelled.

class funcx.sdk.executor.FuncXFuture(task_id: Optional[str] = None)

Extends concurrent.futures.Future to include an optional task UUID.

task_id: Optional[str]

The UUID for the task behind this Future. In batch mode, this will not be populated immediately, but will appear later when the task is submitted to the FuncX services.