The FuncX Executor

class funcx.FuncXExecutor(endpoint_id: Optional[str] = None, container_id: Optional[str] = None, funcx_client: Optional[funcx.sdk.client.FuncXClient] = None, task_group_id: Optional[str] = None, label: str = '', batch_size: int = 128, **kwargs)

Extend Python’s concurrent.futures.Executor base class for funcX’s purposes.

Parameters:
  • endpoint_id – id of the endpoint to which to submit tasks

  • container_id – id of the container in which to execute tasks

  • funcx_client – instance of FuncXClient to be used by the executor. If not provided, the executor will instantiate one with default arguments.

  • task_group_id – The Task Group to which to associate tasks. If not set, one will be instantiated.

  • label – a label to name the executor; mainly utilized for logging and advanced needs with multiple executors.

  • batch_size – the maximum number of tasks to coalesce before sending upstream [min: 1, default: 128]

  • batch_interval – [DEPRECATED; unused] number of seconds to coalesce tasks before submitting upstream

  • batch_enabled – [DEPRECATED; unused] whether to batch results
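The batch_size cap can be pictured as a simple chunking step. The sketch below is hypothetical: the helper name coalesce and the idea of passing the pending tasks in as an iterable are assumptions, not the executor's internals. It only illustrates that each upstream submission carries at most batch_size tasks.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def coalesce(tasks: Iterable[T], batch_size: int = 128) -> Iterator[List[T]]:
    """Hypothetical helper: group pending tasks into batches of at most batch_size.

    Illustrates the documented cap only; this is not funcX's internal code.
    """
    if batch_size < 1:
        # Mirrors the documented minimum of 1 (checked when iteration starts).
        raise ValueError("batch_size must be at least 1")
    batch: List[T] = []
    for task in tasks:
        batch.append(task)
        if len(batch) == batch_size:
            yield batch  # a full batch would be sent upstream here
            batch = []
    if batch:
        yield batch  # flush the final, partially filled batch
```

For example, 300 pending tasks with the default cap would be sent as batches of 128, 128 and 44 tasks.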

property task_group_id: str

The Task Group with which this instance is currently associated. New tasks will be sent to this Task Group upstream, and the result listener will only listen for results for this group.

Must be a string. Set by simple assignment:

from funcx import FuncXExecutor
fxe = FuncXExecutor(endpoint_id="...")
fxe.task_group_id = "Some-stored-id"

This is typically used when reattaching to a previously initiated set of tasks. See reload_tasks() for more information.

[default: None, which translates to the FuncXClient task group id]
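Because task_group_id is a plain string, reattaching in a later session only requires storing it somewhere durable. A minimal sketch, assuming a local JSON file; the file location and the helper names save_task_group_id and load_task_group_id are hypothetical.

```python
import json
from pathlib import Path
from typing import Optional


def save_task_group_id(task_group_id: str, path: Path) -> None:
    """Hypothetical helper: persist the Task Group ID for a later session."""
    path.write_text(json.dumps({"task_group_id": task_group_id}))


def load_task_group_id(path: Path) -> Optional[str]:
    """Hypothetical helper: return a stored Task Group ID, or None if none was saved."""
    if not path.exists():
        return None
    return json.loads(path.read_text()).get("task_group_id")
```

In a later session, a non-None value returned by load_task_group_id() would be assigned with fxe.task_group_id = ... before calling reload_tasks().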

register_function(fn: Callable, function_id: Optional[str] = None, **func_register_kwargs) → str

Register a task function with this Executor’s cache.

All function execution submissions (i.e., .submit()) communicate which pre-registered function to execute on the endpoint by the function’s identifier, the function_id. This method makes the appropriate API call to the funcX web services to first register the task function, and then stores the returned function_id in the Executor’s cache.

In the standard workflow, .submit() invokes this method automatically, so most callers will not need to call it directly. However, some advanced use-cases may need to fine-tune a function’s registration, and can do so by passing the registration arguments to this method explicitly.

If a function has already been registered (perhaps in a previous iteration), the upstream API call may be avoided by specifying the known function_id.

If a function already exists in the Executor’s cache, this method will raise a ValueError to help track down the errant double registration attempt.

Parameters:
  • fn – function to be registered for remote execution

  • function_id – if specified, associate the function_id to the fn immediately, short-circuiting the upstream registration call.

  • func_register_kwargs – all other keyword arguments are passed to FuncXClient.register_function().

Returns:

the function’s function_id string, as returned by registration upstream
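The caching rules described above (skip the upstream call when function_id is given; raise ValueError when the same function is registered twice) can be modelled with a small dictionary. This is a sketch under assumptions: FunctionCache and the register_upstream callable are hypothetical stand-ins for the executor's cache and for the FuncXClient.register_function() call.

```python
import uuid
from typing import Callable, Dict, Optional


class FunctionCache:
    """Hypothetical model of the executor's function_id cache."""

    def __init__(self, register_upstream: Callable[[Callable], str]):
        # Stand-in for the web-service registration call.
        self._register_upstream = register_upstream
        self._ids: Dict[Callable, str] = {}

    def register(self, fn: Callable, function_id: Optional[str] = None) -> str:
        if fn in self._ids:
            # A second registration of the same function is treated as a caller error.
            raise ValueError(f"Function already registered: {fn.__name__}")
        if function_id is None:
            # No known id, so make the (simulated) upstream call.
            function_id = self._register_upstream(fn)
        self._ids[fn] = function_id
        return function_id

    def lookup(self, fn: Callable) -> Optional[str]:
        return self._ids.get(fn)


def fake_register(fn: Callable) -> str:
    # Simulates the service returning a fresh UUID string.
    return str(uuid.uuid4())
```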

submit(fn, *args, **kwargs)

Submit a function to be executed on the Executor’s specified endpoint with the given arguments.

Schedules the callable to be executed as fn(*args, **kwargs) and returns a FuncXFuture instance representing the execution of the callable.

Example use:

>>> from funcx import FuncXExecutor
>>> def add(a: int, b: int) -> int: return a + b
>>> fxe = FuncXExecutor(endpoint_id="some-ep-id")
>>> fut = fxe.submit(add, 1, 2)
>>> fut.result()    # wait (block) until the result is received from the remote endpoint
3

Parameters:
  • fn – Python function to execute on endpoint

  • args – positional arguments (if any) as required to execute the function

  • kwargs – keyword arguments (if any) as required to execute the function

Returns:

a future object that will receive a .task_id when the funcX web services acknowledge receipt, and will eventually have a .result() when the funcX web services receive and stream it.
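Because FuncXFuture extends concurrent.futures.Future, standard helpers such as concurrent.futures.as_completed() should apply to the returned futures as well. The sketch below is written against the generic Executor interface and is exercised with a local ThreadPoolExecutor standing in for an endpoint; run_all is a hypothetical helper name.

```python
from concurrent.futures import Executor, ThreadPoolExecutor, as_completed
from typing import Callable, Dict, Iterable, Tuple


def run_all(executor: Executor, fn: Callable, arg_tuples: Iterable[Tuple]) -> Dict[Tuple, object]:
    """Hypothetical helper: submit fn once per argument tuple and gather the results.

    Results arrive in completion order and are keyed by their arguments
    (duplicate argument tuples collapse into one entry).
    """
    futures = {executor.submit(fn, *args): args for args in arg_tuples}
    results = {}
    for fut in as_completed(futures):
        results[futures[fut]] = fut.result()  # re-raises the task's exception, if any
    return results


def add(a: int, b: int) -> int:
    return a + b
```

With funcX, the ThreadPoolExecutor would be replaced by FuncXExecutor(endpoint_id=...); the helper itself only relies on submit() and standard futures.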

submit_to_registered_function(function_id: str, args: Optional[tuple] = None, kwargs: Optional[dict] = None)

Request an execution of an already registered function.

This method supports using public functions with the FuncXExecutor, or any function whose function_id is already known. An example use might be:

# pre_registration.py
from funcx import FuncXExecutor

def some_processor(*args, **kwargs):
    # ... function logic ...
    return ["some", "result"]

fxe = FuncXExecutor()
fn_id = fxe.register_function(some_processor)
print(f"Function registered successfully.\nFunction ID: {fn_id}")

# Example output:
#
# Function registered successfully.
# Function ID: c407ae80-b31f-447a-9fa6-124098492057

In this case, the function would be privately registered to you, but note that the function id is just a string. One could substitute the id of a publicly available function. For instance, b0a5d1a0-2b22-4381-b899-ba73321e41e0 is a “well-known” uuid for the “Hello, World!” function (same as the example in the funcX tutorial), which is publicly available:

from funcx import FuncXExecutor

fn_id = "b0a5d1a0-2b22-4381-b899-ba73321e41e0"  # public; "Hello World"
with FuncXExecutor(endpoint_id="your-endpoint-id") as fxe:
    futs = [
        fxe.submit_to_registered_function(function_id=fn_id)
        for _ in range(5)
    ]

for f in futs:
    print(f.result())

Parameters:
  • function_id – identifier (str) of registered Python function

  • args – positional arguments (if any) as required to execute the function

  • kwargs – keyword arguments (if any) as required to execute the function

Returns:

a future object that (eventually) will have a .result() when the funcX web services receive and stream it.

map(fn: Callable, *iterables, timeout=None, chunksize=1) → Iterator

funcX does not currently implement the .map() method of the Executor interface. In a naive implementation, this method would merely be syntactic sugar for bulk use of the .submit() method. For example:

def map(fxexec, fn, *fn_args_kwargs):
    # each element of fn_args_kwargs is an (args, kwargs) pair
    return [fxexec.submit(fn, *a, **kw) for a, kw in fn_args_kwargs]

This naive implementation ignores a number of potential optimizations, so we will revisit .map() at a future date if there is interest.

Raises:

NotImplementedError – always raised
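For comparison, the standard Executor.map() contract zips the input iterables and yields results in input order. A hedged sketch of that contract built only on submit(); naive_map is a hypothetical name and, like the example above, it performs no batching or other optimization. It is exercised here with a local ThreadPoolExecutor.

```python
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Any, Callable, Iterable, Iterator


def naive_map(executor: Executor, fn: Callable, *iterables: Iterable) -> Iterator[Any]:
    """Hypothetical map() built on submit(): one task per zipped argument tuple."""
    # Submit everything eagerly, as Executor.map() does, so the tasks run concurrently.
    futures = [executor.submit(fn, *args) for args in zip(*iterables)]

    def results() -> Iterator[Any]:
        for fut in futures:
            yield fut.result()  # blocks per task; preserves input order

    return results()
```

The inner generator is deliberate: if naive_map were itself a generator, no task would be submitted until the caller requested the first result.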

reload_tasks() → Iterable[funcx.sdk.asynchronous.funcx_future.FuncXFuture]

Load the set of tasks associated with this Executor’s Task Group from the web services and return a list of futures, one for each task. This is nominally intended to “reattach” to a previously initiated session, based on the Task Group ID.

Returns:

An iterable of futures.

Raises:
  • ValueError – if the server response is incorrect or invalid

  • KeyError – if the server did not return an expected response

  • various – the usual (unhandled) request errors (e.g., no connection; invalid authorization)

Notes

Any previous futures received from this executor will be cancelled.
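Because reload_tasks() returns ordinary futures, the standard concurrent.futures.wait() can separate finished tasks from those still pending. A sketch assuming each future exposes the task_id attribute described under FuncXFuture; summarize_reloaded is a hypothetical helper, and the accompanying check uses locally created Future objects rather than the web services.

```python
from concurrent.futures import Future, wait
from typing import Dict, Iterable, List, Optional, Tuple


def summarize_reloaded(
    futures: Iterable[Future], timeout: Optional[float] = 0.0
) -> Tuple[Dict[Optional[str], object], List[Optional[str]]]:
    """Hypothetical helper: split reloaded futures into results and pending task ids.

    Failed tasks are recorded with their exception object as the value.
    """
    done, not_done = wait(list(futures), timeout=timeout)
    results: Dict[Optional[str], object] = {}
    for fut in done:
        if fut.cancelled():
            continue  # cancelled futures carry neither a result nor an exception
        task_id = getattr(fut, "task_id", None)
        exc = fut.exception()
        results[task_id] = exc if exc is not None else fut.result()
    pending = [getattr(fut, "task_id", None) for fut in not_done]
    return results, pending
```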

shutdown(wait=True, *, cancel_futures=False)

Clean up the resources associated with the Executor.

It is safe to call this method several times; however, no other methods can be called after it.

Parameters:
  • wait – If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.

  • cancel_futures – If True then shutdown will cancel all pending futures. Futures that are completed or running will not be cancelled.
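The shutdown() parameters mirror those of concurrent.futures.Executor.shutdown() (cancel_futures requires Python 3.9+). The sketch below shows the standard-library semantics with a single-worker ThreadPoolExecutor: the running task completes while queued tasks are cancelled. Whether FuncXExecutor distinguishes running from pending tasks in exactly the same way is an assumption; demo_cancel_on_shutdown is a hypothetical name.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def demo_cancel_on_shutdown():
    """Show which futures cancel_futures=True cancels (standard-library semantics)."""
    started = threading.Event()
    release = threading.Event()

    def blocker() -> str:
        started.set()   # signal that this task is now running
        release.wait()  # hold the only worker thread until released
        return "finished"

    ex = ThreadPoolExecutor(max_workers=1)
    running = ex.submit(blocker)
    started.wait()  # ensure the first task is running, not merely queued
    queued = [ex.submit(pow, 2, n) for n in range(3)]  # stuck behind blocker

    # wait=False so the blocker can be released afterwards; queued work is cancelled.
    ex.shutdown(wait=False, cancel_futures=True)
    release.set()
    return running.result(), [f.cancelled() for f in queued]
```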

class funcx.sdk.executor.FuncXFuture(task_id: Optional[str] = None)

Extend concurrent.futures.Future to include an optional task UUID.

Initializes the future. Should not be called by clients.

task_id: Optional[str]

The UUID for the task behind this Future. In batch mode, this will not be populated immediately, but will appear later when the task is submitted to the FuncX services.
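A minimal sketch of the idea behind FuncXFuture: a concurrent.futures.Future subclass carrying an optional task_id that may be filled in after construction (for example, once a batch is acknowledged upstream). TaskIdFuture is a hypothetical name; this is not the SDK's class.

```python
from concurrent.futures import Future
from typing import Optional


class TaskIdFuture(Future):
    """Hypothetical Future subclass with an optional task UUID."""

    def __init__(self, task_id: Optional[str] = None):
        super().__init__()
        # May stay None until the service assigns an id.
        self.task_id: Optional[str] = task_id

    def __repr__(self) -> str:
        return f"<TaskIdFuture task_id={self.task_id!r} done={self.done()}>"
```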