The Globus Compute Executor

class globus_compute_sdk.Executor(endpoint_id: UUID | str | None = None, container_id: UUID | str | None = None, client: Client | None = None, task_group_id: UUID | str | None = None, user_endpoint_config: dict[str, Any] | None = None, label: str = '', batch_size: int = 128, funcx_client: Client | None = None, amqp_port: int | None = None, **kwargs)

Extend Python’s Executor base class for Globus Compute’s purposes.

Parameters:
  • endpoint_id – id of the endpoint to which to submit tasks

  • container_id – id of the container in which to execute tasks

  • client – instance of Client to be used by the executor. If not provided, the executor will instantiate one with default arguments.

  • task_group_id – The Task Group with which to associate tasks. If not set, one will be instantiated.

  • user_endpoint_config – User endpoint configuration values as described and allowed by endpoint administrators. Must be a JSON-serializable dict or None.

  • label – a label to name the executor; mainly utilized for logging and advanced needs with multiple executors.

  • batch_size – the maximum number of tasks to coalesce before sending upstream [min: 1, default: 128]

  • funcx_client – [DEPRECATED] alias for client.

  • batch_interval – [DEPRECATED; unused] number of seconds to coalesce tasks before submitting upstream

  • batch_enabled – [DEPRECATED; unused] whether to batch results

  • amqp_port – Port to use when connecting to results queue. Note that the Compute web services only support 5671, 5672, and 443.
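
To make the batch_size cap concrete, the sketch below groups queued items into batches of at most batch_size. It is a stand-alone illustration in plain Python, not the SDK's implementation: the helper name coalesce is hypothetical, and how the Executor actually drains its queue is not covered here.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def coalesce(tasks: Iterable[T], batch_size: int = 128) -> Iterator[List[T]]:
    """Yield lists of at most ``batch_size`` items (hypothetical helper)."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    it = iter(tasks)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


# 300 queued items with the default cap of 128 per batch.
batch_lengths = [len(b) for b in coalesce(range(300), batch_size=128)]
print(batch_lengths)  # [128, 128, 44]
```

A smaller batch_size means more, smaller submissions; the documented minimum of 1 sends each task on its own.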

property endpoint_id

The ID of the endpoint currently associated with this instance. This determines where tasks are sent for execution, and since tasks have to go somewhere, the Executor will not run unless it has an endpoint_id set.

Must be a UUID, valid uuid-like string, or None. Set by simple assignment:

>>> import uuid
>>> from globus_compute_sdk import Executor
>>> ep_id = uuid.uuid4()  # IRL: some *known* endpoint id
>>> gce = Executor(endpoint_id=ep_id)

# Alternatively, may use a stringified uuid:
>>> gce = Executor(endpoint_id=str(ep_id))

# May also alter after construction:
>>> gce.endpoint_id = ep_id
>>> gce.endpoint_id = str(ep_id)

# Internally, it is always stored as a UUID (or None):
>>> gce.endpoint_id
UUID('11111111-2222-4444-8888-000000000000')

# Executors only run if they have an endpoint_id set:
>>> gce = Executor(endpoint_id=None)
Traceback (most recent call last):
    ...
ValueError: No endpoint_id set.  Did you forget to set it at construction?
  Hint:

    gce = Executor(endpoint_id=<ep_id>)
    gce.endpoint_id = <ep_id>    # alternative

property task_group_id

The Task Group with which this instance is currently associated. New tasks will be sent to this Task Group upstream, and the result listener will only listen for results for this group.

Must be a UUID, valid uuid-like string, or None. Set by simple assignment:

>>> import uuid
>>> from globus_compute_sdk import Executor
>>> tg_id = uuid.uuid4()  # IRL: some *known* taskgroup id
>>> gce = Executor(task_group_id=tg_id)

# Alternatively, may use a stringified uuid:
>>> gce = Executor(task_group_id=str(tg_id))

# May also alter after construction:
>>> gce.task_group_id = tg_id
>>> gce.task_group_id = str(tg_id)

# Internally, it is always stored as a UUID (or None):
>>> gce.task_group_id
UUID('11111111-2222-4444-8888-000000000000')

This is typically used when reattaching to a previously initiated set of tasks. See reload_tasks() for more information.

If not set manually, this will be set automatically on submit(), to a Task Group ID supplied by the services.

[default: None]

property user_endpoint_config: dict[str, Any] | None

The endpoint configuration values, as described and allowed by endpoint administrators, that this instance is currently associated with.

Must be a JSON-serializable dict or None. Set by simple assignment:

>>> from globus_compute_sdk import Executor
>>> uep_config = {"foo": "bar"}
>>> gce = Executor(user_endpoint_config=uep_config)

# May also alter after construction:
>>> gce.user_endpoint_config = uep_config
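
One way to check the "JSON-serializable dict or None" requirement before assigning a configuration is to try serializing it locally. The helper below is a hypothetical pre-flight check written with the standard json module; it is not part of the SDK and says nothing about which keys an endpoint administrator allows.

```python
import json
from typing import Optional


def is_json_serializable(config: Optional[dict]) -> bool:
    """Return True if ``config`` is None or a dict that json.dumps accepts."""
    if config is None:
        return True
    if not isinstance(config, dict):
        return False
    try:
        json.dumps(config)
    except (TypeError, ValueError):
        # TypeError: unsupported value or key type; ValueError: circular reference
        return False
    return True


ok = is_json_serializable({"foo": "bar"})
bad = is_json_serializable({"when": object()})
print(ok, bad)  # True False
```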

property container_id: UUID | None

The container id with which this Executor instance is currently associated. Tasks submitted after this is set will use this container.

Must be a UUID, valid uuid-like string, or None. Set by simple assignment:

>>> import uuid
>>> from globus_compute_sdk import Executor
>>> c_id = "00000000-0000-0000-0000-000000000000"  # some known container id
>>> c_as_uuid = uuid.UUID(c_id)
>>> gce = Executor(container_id=c_id)

# May also alter after construction:
>>> gce.container_id = c_id
>>> gce.container_id = c_as_uuid  # also accepts a UUID object

# Internally, it is always stored as a UUID (or None):
>>> gce.container_id
UUID('00000000-0000-0000-0000-000000000000')

[default: None]
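
The "UUID, uuid-like string, or None" rule shared by endpoint_id, task_group_id, and container_id can be pictured as a small normalization step. The function below is a sketch using only the standard uuid module; the name as_uuid is hypothetical and this is not the SDK's own setter code.

```python
import uuid
from typing import Optional, Union


def as_uuid(value: Union[uuid.UUID, str, None]) -> Optional[uuid.UUID]:
    """Normalize a UUID, uuid-like string, or None to UUID-or-None."""
    if value is None or isinstance(value, uuid.UUID):
        return value
    return uuid.UUID(value)  # raises ValueError for a malformed string


c_id = "00000000-0000-0000-0000-000000000000"
normalized = as_uuid(c_id)
print(repr(normalized))  # UUID('00000000-0000-0000-0000-000000000000')
print(as_uuid(None))     # None
```

Normalizing early like this surfaces a malformed id as a ValueError at assignment time rather than later during submission.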

property amqp_port: int | None

The port to use when connecting to the result queue. Can be one of 443, 5671, 5672, or None. If None, the port is assigned by the Compute web services (typically 5671).
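
A caller that takes the port from user input or a configuration file may want to reject unsupported values up front. The following is a minimal client-side sketch based only on the ports listed above; check_amqp_port and ALLOWED_AMQP_PORTS are hypothetical names, and whether the SDK performs a similar check itself is not stated here.

```python
from typing import Optional

ALLOWED_AMQP_PORTS = (443, 5671, 5672)  # the ports named in the text above


def check_amqp_port(port: Optional[int]) -> Optional[int]:
    """Return ``port`` unchanged if it is None or one of the listed ports."""
    if port is not None and port not in ALLOWED_AMQP_PORTS:
        raise ValueError(
            f"amqp_port must be one of {ALLOWED_AMQP_PORTS} or None, got {port!r}"
        )
    return port


checked = [check_amqp_port(p) for p in (None, 443, 5671, 5672)]
print(checked)  # [None, 443, 5671, 5672]
```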

register_function(fn: Callable, function_id: str | None = None, **func_register_kwargs) → str

Register a task function with this Executor’s cache.

All function execution submissions (i.e., .submit()) communicate which pre-registered function to execute on the endpoint by the function’s identifier, the function_id. This method makes the appropriate API call to the Globus Compute web services to first register the task function, and then stores the returned function_id in the Executor’s cache.

In the standard workflow, .submit() will automatically handle invoking this method, so the common use-case will not need to use this method. However, some advanced use-cases may need to fine-tune the registration of a function and so may manually set the registration arguments via this method.

If a function has already been registered (perhaps in a previous iteration), the upstream API call may be avoided by specifying the known function_id.

If a function already exists in the Executor’s cache, this method will raise a ValueError to help track down the errant double registration attempt.

Parameters:
  • fn – function to be registered for remote execution

  • function_id – if specified, associate the function_id to the fn immediately, short-circuiting the upstream registration call.

  • func_register_kwargs – all other keyword arguments are passed to Client.register_function().

Returns:

the function’s function_id string, as returned by registration upstream
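
The caching rules described above (register upstream once, skip the upstream call when a function_id is supplied, and raise ValueError on double registration) can be modeled with a toy class. Everything in this sketch is a stand-in: FunctionCache and fake_upstream are hypothetical names, and no Globus Compute service is contacted.

```python
from typing import Callable, Dict, List, Optional


class FunctionCache:
    """Toy stand-in for an executor's function cache (hypothetical class)."""

    def __init__(self, register_upstream: Callable[[Callable], str]) -> None:
        self._register_upstream = register_upstream
        self._ids: Dict[Callable, str] = {}

    def register(self, fn: Callable, function_id: Optional[str] = None) -> str:
        if fn in self._ids:
            # Mirrors the documented behavior: double registration is an error.
            raise ValueError(f"{fn.__name__} is already registered")
        if function_id is None:
            # No known id, so the (stand-in) upstream call is required.
            function_id = self._register_upstream(fn)
        self._ids[fn] = function_id
        return function_id


upstream_calls: List[str] = []


def fake_upstream(fn: Callable) -> str:
    """Pretend web-service call that records which functions were sent."""
    upstream_calls.append(fn.__name__)
    return f"id-for-{fn.__name__}"


def add(a, b):
    return a + b


def mul(a, b):
    return a * b


cache = FunctionCache(fake_upstream)
add_id = cache.register(add)                       # goes upstream
mul_id = cache.register(mul, function_id="known")  # short-circuits
print(add_id, mul_id, upstream_calls)  # id-for-add known ['add']
```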

submit(fn, *args, **kwargs)

Submit a function to be executed on the Executor’s specified endpoint with the given arguments.

Schedules the callable to be executed as fn(*args, **kwargs) and returns a ComputeFuture instance representing the execution of the callable.

Example use:

>>> from globus_compute_sdk import Executor
>>> def add(a: int, b: int) -> int: return a + b
>>> gce = Executor(endpoint_id="some-ep-id")  # IRL: some *known* endpoint id
>>> fut = gce.submit(add, 1, 2)
>>> fut.result()    # wait (block) until result is received from remote
3

Parameters:
  • fn – Python function to execute on endpoint

  • args – positional arguments (if any) as required to execute the function

  • kwargs – keyword arguments (if any) as required to execute the function

Returns:

a future object that will receive a .task_id when the Globus Compute web services acknowledge receipt, and that will eventually have a .result() once the web services receive the result and stream it back.
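
Because the returned future extends concurrent.futures.Future (see ComputeFuture in this reference), the standard helpers for futures should apply to it. The sketch below collects results in completion order with as_completed. It uses ThreadPoolExecutor as a local stand-in so that it runs without an endpoint; swapping in a Globus Compute Executor is an assumption, not something verified here.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def add(a: int, b: int) -> int:
    return a + b


# ThreadPoolExecutor is a local stand-in for globus_compute_sdk.Executor.
with ThreadPoolExecutor(max_workers=2) as ex:
    futs = {ex.submit(add, i, i): i for i in range(4)}
    results = {}
    for fut in as_completed(futs):
        # Futures finish in arbitrary order; map each back to its input.
        results[futs[fut]] = fut.result()

print(sorted(results.items()))  # [(0, 0), (1, 2), (2, 4), (3, 6)]
```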

submit_to_registered_function(function_id: str, args: tuple | None = None, kwargs: dict | None = None)

Request an execution of an already registered function.

This method supports running public functions, or any function whose function_id is already known, through the Executor. An example use might be:

# pre_registration.py
from globus_compute_sdk import Executor

def some_processor(*args, **kwargs):
    # ... function logic ...
    return ["some", "result"]

gce = Executor()
fn_id = gce.register_function(some_processor)
print(f"Function registered successfully.\nFunction ID: {fn_id}")

# Example output:
#
# Function registered successfully.
# Function ID: c407ae80-b31f-447a-9fa6-124098492057

In this case, the function would be privately registered to you, but note that the function id is just a string, so one could substitute the id of a publicly available function. For instance, b0a5d1a0-2b22-4381-b899-ba73321e41e0 is a “well-known” uuid for the “Hello, World!” function (same as the example in the Globus Compute tutorial), which is publicly available:

from globus_compute_sdk import Executor

fn_id = "b0a5d1a0-2b22-4381-b899-ba73321e41e0"  # public; "Hello World"
with Executor(endpoint_id="your-endpoint-id") as fxe:
    futs = [
        fxe.submit_to_registered_function(function_id=fn_id)
        for i in range(5)
    ]

for f in futs:
    print(f.result())

Parameters:
  • function_id – identifier (str) of registered Python function

  • args – positional arguments (if any) as required to execute the function

  • kwargs – keyword arguments (if any) as required to execute the function

Returns:

a future object that will eventually have a .result() once the Globus Compute web services receive the result and stream it back.
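
Unlike submit(), this method takes the positional arguments as a single tuple and the keyword arguments as a single dict. A thin wrapper can restore the familiar star-args call style. In the sketch below, call_registered and FakeExecutor are hypothetical: the fake only records the call so the example runs offline, and any object exposing submit_to_registered_function with the signature above could take its place.

```python
from concurrent.futures import Future


def call_registered(executor, function_id: str, *args, **kwargs) -> Future:
    """Forward star-args to submit_to_registered_function (hypothetical helper)."""
    return executor.submit_to_registered_function(
        function_id=function_id, args=args, kwargs=kwargs
    )


class FakeExecutor:
    """Offline stand-in that echoes the call and resolves immediately."""

    def submit_to_registered_function(self, function_id, args=None, kwargs=None):
        fut = Future()
        fut.set_result((function_id, args, kwargs))
        return fut


fut = call_registered(FakeExecutor(), "some-function-id", 1, 2, scale=3)
print(fut.result())  # ('some-function-id', (1, 2), {'scale': 3})
```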

map(fn: Callable, *iterables, timeout=None, chunksize=1) → Iterator

Globus Compute does not currently implement the .map() method of the Executor interface. In a naive implementation, this method would merely be syntactic sugar for bulk use of the .submit() method. For example:

def map(fxexec, fn, *fn_args_kwargs):
    return [fxexec.submit(fn, *a, **kw) for a, kw in fn_args_kwargs]

This naive implementation ignores a number of potential optimizations, so we have decided to look at this at a future date if there is interest.
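
For readers who want the bulk-submit behavior today, the naive version above can be used as an ordinary helper. The sketch below shows the expected shape of its arguments, one (args, kwargs) pair per call. ThreadPoolExecutor stands in for the Globus Compute Executor so the example runs locally; naive_map and power are illustrative names.

```python
from concurrent.futures import ThreadPoolExecutor


def naive_map(executor, fn, *fn_args_kwargs):
    """Submit fn once per (args, kwargs) pair and return the futures."""
    return [executor.submit(fn, *a, **kw) for a, kw in fn_args_kwargs]


def power(base, exp=2):
    return base ** exp


# ThreadPoolExecutor stands in for the Globus Compute Executor in this sketch.
with ThreadPoolExecutor(max_workers=2) as ex:
    futs = naive_map(ex, power, ((2,), {}), ((3,), {"exp": 3}))
    results = [f.result() for f in futs]

print(results)  # [4, 27]
```

Note that this returns a list of futures rather than an iterator of results, so it is not a drop-in replacement for the standard Executor.map().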

Raises:

NotImplementedError – always

reload_tasks(task_group_id: UUID | str | None = None) → Iterable[ComputeFuture]

Load the set of tasks associated with this Executor’s Task Group from the web services and return a list of futures, one for each task. This is nominally intended to “reattach” to a previously initiated session, based on the Task Group ID.

Parameters:

task_group_id – Optionally specify a task_group_id to use. If present, it overwrites the Executor’s task_group_id.

Returns:

An iterable of futures.

Raises:
  • ValueError – if the server response is incorrect or invalid

  • KeyError – the server did not return an expected response

  • various – the usual (unhandled) request errors (e.g., no connection; invalid authorization)

Notes

Any previous futures received from this executor will be cancelled.
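
After reattaching, it can be useful to separate futures that already hold an outcome from those still waiting. The helper below is a sketch that works on any iterable of concurrent.futures.Future objects; split_done is a hypothetical name, and plain Future instances stand in for what reload_tasks() would return.

```python
from concurrent.futures import Future
from typing import Iterable, List, Tuple


def split_done(futures: Iterable[Future]) -> Tuple[List[Future], List[Future]]:
    """Partition futures into (finished, still pending)."""
    done: List[Future] = []
    pending: List[Future] = []
    for fut in futures:
        (done if fut.done() else pending).append(fut)
    return done, pending


# Plain Future objects stand in for the futures from reload_tasks().
finished, waiting = Future(), Future()
finished.set_result("ok")
done, pending = split_done([finished, waiting])
print(len(done), len(pending))  # 1 1
```

With a real executor, the call would presumably look like split_done(gce.reload_tasks()).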

shutdown(wait=True, *, cancel_futures=False)

Clean up the resources associated with the Executor.

It is safe to call this method several times, but no other methods can be called after it.

Parameters:
  • wait – If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.

  • cancel_futures – If True then shutdown will cancel all pending futures. Futures that are completed or running will not be cancelled.
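
The difference between running and pending futures under cancel_futures=True can be demonstrated with the standard library, whose shutdown() has the same signature. This sketch uses ThreadPoolExecutor (Python 3.9 or later) as a stand-in; it illustrates the documented semantics and is not a test of the Globus Compute Executor itself.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()


def blocker():
    """Occupy the only worker until the main thread releases it."""
    started.set()
    release.wait(timeout=5)
    return "finished"


ex = ThreadPoolExecutor(max_workers=1)  # stand-in executor with one worker
running = ex.submit(blocker)
queued = [ex.submit(int, "1") for _ in range(3)]  # stuck behind blocker

started.wait(timeout=5)                       # make sure blocker is running
ex.shutdown(wait=False, cancel_futures=True)  # cancels only pending futures
release.set()

running_result = running.result(timeout=5)
cancelled = [f.cancelled() for f in queued]
print(running_result, cancelled)  # finished [True, True, True]
```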

get_worker_hardware_details() → str

Retrieve hardware information about worker nodes for the endpoint associated with this Executor. For example:

from globus_compute_sdk import Executor

ep_uuid = "your-endpoint-id"  # placeholder: some *known* endpoint id
with Executor(endpoint_id=ep_uuid) as gcx:
    print(gcx.get_worker_hardware_details())

class globus_compute_sdk.sdk.executor.ComputeFuture(task_id: str | None = None)

Extend concurrent.futures.Future to include an optional task UUID.

Initializes the future. Should not be called by clients.

task_id: str | None

The UUID for the task behind this Future. In batch mode, this will not be populated immediately, but will appear later when the task is submitted to the Globus Compute services.
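
Since task_id may be None for a while, code that logs or stores it should tolerate the unset state. The class below is a minimal illustration of a Future carrying an optional id that is filled in later; TaskFuture is a hypothetical name and the id value is made up, so this is not the SDK's ComputeFuture.

```python
from concurrent.futures import Future
from typing import Optional


class TaskFuture(Future):
    """Illustrative Future with an optional task id (not the SDK class)."""

    def __init__(self, task_id: Optional[str] = None) -> None:
        super().__init__()
        self.task_id = task_id


fut = TaskFuture()
before = fut.task_id  # None: submission not yet acknowledged

# Later, a simulated acknowledgement supplies the id, then the result arrives.
fut.task_id = "00000000-0000-0000-0000-000000000001"  # made-up id
fut.set_result(3)
print(before, fut.task_id, fut.result())
```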