The Globus Compute Executor
- class globus_compute_sdk.Executor(endpoint_id: Union[None, uuid.UUID, str] = None, container_id: Union[None, uuid.UUID, str] = None, funcx_client: Optional[globus_compute_sdk.sdk.client.Client] = None, task_group_id: Union[None, uuid.UUID, str] = None, user_endpoint_config: Optional[dict[str, Any]] = None, label: str = '', batch_size: int = 128, **kwargs)
Extend Python’s Executor base class for Globus Compute’s purposes.
- Parameters
endpoint_id – id of the endpoint to which to submit tasks
container_id – id of the container in which to execute tasks
funcx_client – instance of Client to be used by the executor. If not provided, the executor will instantiate one with default arguments.
task_group_id – The Task Group to which to associate tasks. If not set, one will be instantiated.
user_endpoint_config – User endpoint configuration values as described and allowed by endpoint administrators. Must be a JSON-serializable dict or None.
label – a label to name the executor; mainly utilized for logging and advanced needs with multiple executors.
batch_size – the maximum number of tasks to coalesce before sending upstream [min: 1, default: 128]
batch_interval – [DEPRECATED; unused] number of seconds to coalesce tasks before submitting upstream
batch_enabled – [DEPRECATED; unused] whether to batch results
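To illustrate what the batch_size limit bounds, here is a minimal sketch of coalescing queued tasks into batches of at most batch_size items. The helper name coalesce is hypothetical and this is not the SDK's implementation; it only shows the arithmetic of the limit.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def coalesce(tasks: Iterable[T], batch_size: int = 128) -> Iterator[List[T]]:
    """Yield lists of at most `batch_size` tasks (hypothetical helper)."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    it = iter(tasks)
    while True:
        # Pull up to batch_size items; an empty list means the input is drained.
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


# 300 queued tasks with the default limit of 128 per batch.
batch_lengths = [len(b) for b in coalesce(range(300), batch_size=128)]
```

With 300 queued items and the default limit, the sketch produces two full batches and one partial batch.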
- property task_group_id
The Task Group with which this instance is currently associated. New tasks will be sent to this Task Group upstream, and the result listener will only listen for results for this group.
Must be a UUID, valid uuid-like string, or None. Set by simple assignment:
    >>> import uuid
    >>> from globus_compute_sdk import Executor
    >>> tg_id = uuid.uuid4()  # IRL: some *known* taskgroup id
    >>> gce = Executor(task_group_id=tg_id)

    # Alternatively, may use a stringified uuid:
    >>> gce = Executor(task_group_id=str(tg_id))

    # May also alter after construction:
    >>> gce.task_group_id = tg_id
    >>> gce.task_group_id = str(tg_id)

    # Internally, it is always stored as a UUID (or None):
    >>> gce.task_group_id
    UUID('11111111-2222-4444-8888-000000000000')
This is typically used when reattaching to a previously initiated set of tasks. See reload_tasks() for more information.
If not set manually, this will be set automatically on submit(), to a Task Group ID supplied by the services.
[default: None]
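The "UUID, valid uuid-like string, or None" rule can be sketched with the standard library alone. The helper as_uuid_or_none below is a hypothetical name used for illustration; it is not part of the SDK, and the SDK's own validation may differ in detail.

```python
import uuid
from typing import Optional, Union


def as_uuid_or_none(value: Union[None, uuid.UUID, str]) -> Optional[uuid.UUID]:
    """Normalize a uuid-like value (hypothetical helper, not an SDK function)."""
    if value is None:
        return None
    if isinstance(value, uuid.UUID):
        return value
    return uuid.UUID(value)  # raises ValueError for a malformed string


tg_id = uuid.UUID("11111111-2222-4444-8888-000000000000")
from_str = as_uuid_or_none(str(tg_id))  # a string in, a UUID out
```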
- property user_endpoint_config: dict[str, Any] | None
The endpoint configuration values, as described and allowed by endpoint administrators, that this instance is currently associated with.
Must be a JSON-serializable dict or None. Set by simple assignment:
    >>> from globus_compute_sdk import Executor
    >>> uep_config = {"foo": "bar"}
    >>> gce = Executor(user_endpoint_config=uep_config)

    # May also alter after construction:
    >>> gce.user_endpoint_config = uep_config
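One way to check up front that a configuration value meets the "JSON-serializable dict or None" requirement is to attempt serialization with the standard json module. The helper is_json_serializable is a hypothetical name, and this is a sketch of a pre-flight check rather than the validation the SDK itself performs.

```python
import json
from typing import Any


def is_json_serializable(value: Any) -> bool:
    """Return True if json.dumps accepts the value (hypothetical helper)."""
    try:
        json.dumps(value)
    except (TypeError, ValueError):
        return False
    return True


ok = is_json_serializable({"foo": "bar"})
bad = is_json_serializable({"tags": {1, 2, 3}})  # a set is not JSON-serializable
```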
- property container_id: uuid.UUID | None
The container id with which this Executor instance is currently associated. Tasks submitted after this is set will use this container.
Must be a UUID, valid uuid-like string, or None. Set by simple assignment:
    >>> import uuid
    >>> from globus_compute_sdk import Executor
    >>> c_id = "00000000-0000-0000-0000-000000000000"  # some known container id
    >>> c_as_uuid = uuid.UUID(c_id)
    >>> gce = Executor(container_id=c_id)

    # May also alter after construction:
    >>> gce.container_id = c_id
    >>> gce.container_id = c_as_uuid  # also accepts a UUID object

    # Internally, it is always stored as a UUID (or None):
    >>> gce.container_id
    UUID('00000000-0000-0000-0000-000000000000')
[default: None]
- register_function(fn: Callable, function_id: Optional[str] = None, **func_register_kwargs) -> str
Register a task function with this Executor’s cache.
All function execution submissions (i.e., .submit()) communicate which pre-registered function to execute on the endpoint by the function’s identifier, the function_id. This method makes the appropriate API call to the Globus Compute web services to first register the task function, and then stores the returned function_id in the Executor’s cache.
In the standard workflow, .submit() will automatically handle invoking this method, so the common use-case will not need to use this method. However, some advanced use-cases may need to fine-tune the registration of a function and so may manually set the registration arguments via this method.
If a function has already been registered (perhaps in a previous iteration), the upstream API call may be avoided by specifying the known function_id.
If a function already exists in the Executor’s cache, this method will raise a ValueError to help track down the errant double registration attempt.
- Parameters
fn – function to be registered for remote execution
function_id – if specified, associate the function_id to the fn immediately, short-circuiting the upstream registration call.
func_register_kwargs – all other keyword arguments are passed to Client.register_function().
- Returns
the function’s function_id string, as returned by registration upstream
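The caching rules described above (register upstream unless a function_id is supplied, and reject a second registration of the same function) can be modeled with a small toy class. FunctionCache and its _register_upstream placeholder are hypothetical names; the sketch makes no network call and does not reproduce the SDK's internals.

```python
import uuid
from typing import Callable, Dict, Optional


class FunctionCache:
    """Toy stand-in for the Executor's function cache (hypothetical)."""

    def __init__(self) -> None:
        self._ids: Dict[Callable, str] = {}
        self.upstream_calls = 0

    def _register_upstream(self, fn: Callable) -> str:
        # Placeholder for the web-service call; returns a made-up id.
        self.upstream_calls += 1
        return str(uuid.uuid4())

    def register_function(self, fn: Callable, function_id: Optional[str] = None) -> str:
        if fn in self._ids:
            raise ValueError(f"{fn.__name__} is already registered")
        if function_id is None:
            function_id = self._register_upstream(fn)
        self._ids[fn] = function_id
        return function_id


def add(a, b):
    return a + b


def mul(a, b):
    return a * b


cache = FunctionCache()
add_id = cache.register_function(add)  # goes "upstream"
known_id = "c407ae80-b31f-447a-9fa6-124098492057"
mul_id = cache.register_function(mul, function_id=known_id)  # skips upstream
```

Only the first registration reaches the placeholder upstream call; the second reuses the supplied id, and a repeated registration of add would raise ValueError.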
- submit(fn, *args, **kwargs)
Submit a function to be executed on the Executor’s specified endpoint with the given arguments.
Schedules the callable to be executed as fn(*args, **kwargs) and returns a ComputeFuture instance representing the execution of the callable.
Example use:
    >>> def add(a: int, b: int) -> int: return a + b
    >>> gce = Executor(endpoint_id="some-ep-id")
    >>> fut = gce.submit(add, 1, 2)
    >>> fut.result()    # wait (block) until result is received from remote
    3
- Parameters
fn – Python function to execute on endpoint
args – positional arguments (if any) as required to execute the function
kwargs – keyword arguments (if any) as required to execute the function
- Returns
a future object that will receive a .task_id when the Globus Compute Web Service acknowledges receipt, and eventually will have a .result() when the Globus Compute web services receive and stream it.
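Because the Executor follows Python's Executor interface, the submit-then-collect pattern can be tried locally with the standard library's ThreadPoolExecutor as a stand-in. This is an assumption-laden sketch: it runs in local threads rather than on an endpoint, and the futures are plain Future objects without a .task_id.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def add(a: int, b: int) -> int:
    return a + b


# Local stand-in for a Globus Compute Executor (no endpoint involved).
with ThreadPoolExecutor(max_workers=2) as local_exec:
    futs = [local_exec.submit(add, i, i) for i in range(4)]
    # as_completed yields futures in completion order, so sort for stable output.
    results = sorted(f.result() for f in as_completed(futs))
```

Collecting with as_completed avoids blocking on the slowest task first, which tends to matter more when results arrive from a remote service.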
- submit_to_registered_function(function_id: str, args: Optional[tuple] = None, kwargs: Optional[dict] = None)
Request an execution of an already registered function.
This method supports use of public functions with the Executor, or knowledge of an already registered function. An example use might be:
    # pre_registration.py
    from globus_compute_sdk import Executor

    def some_processor(*args, **kwargs):
        # ... function logic ...
        return ["some", "result"]

    gce = Executor()
    fn_id = gce.register_function(some_processor)
    print(f"Function registered successfully.\nFunction ID: {fn_id}")

    # Example output:
    #
    # Function registered successfully.
    # Function ID: c407ae80-b31f-447a-9fa6-124098492057
In this case, the function would be privately registered to you, but note that the function id is just a string. One could substitute the id of a publicly available function. For instance, b0a5d1a0-2b22-4381-b899-ba73321e41e0 is a “well-known” uuid for the “Hello, World!” function (same as the example in the Globus Compute tutorial), which is publicly available:
    from globus_compute_sdk import Executor

    fn_id = "b0a5d1a0-2b22-4381-b899-ba73321e41e0"  # public; "Hello World"
    with Executor(endpoint_id="your-endpoint-id") as fxe:
        futs = [
            fxe.submit_to_registered_function(function_id=fn_id)
            for i in range(5)
        ]

    for f in futs:
        print(f.result())
- Parameters
function_id – identifier (str) of registered Python function
args – positional arguments (if any) as required to execute the function
kwargs – keyword arguments (if any) as required to execute the function
- Returns
a future object that (eventually) will have a .result() when the Globus Compute web services receive and stream it.
- map(fn: Callable, *iterables, timeout=None, chunksize=1) -> Iterator
Globus Compute does not currently implement the .map() method of the Executor interface. In a naive implementation, this method would merely be syntactic sugar for bulk use of the .submit() method. For example:
    def map(fxexec, fn, *fn_args_kwargs):
        return [fxexec.submit(fn, *a, **kw) for a, kw in fn_args_kwargs]
This naive implementation ignores a number of potential optimizations, so a proper implementation has been deferred to a future date, pending interest.
- Raises
NotImplementedError – always raised
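Since .map() always raises NotImplementedError, callers who want bulk submission can use the naive pattern directly. The sketch below exercises it against the standard library's ThreadPoolExecutor as a local stand-in; naive_map and scale are hypothetical names, and each work item is an (args, kwargs) pair as in the naive example above.

```python
from concurrent.futures import ThreadPoolExecutor


def naive_map(executor, fn, *fn_args_kwargs):
    """Submit fn once per (args, kwargs) pair and return the futures."""
    return [executor.submit(fn, *a, **kw) for a, kw in fn_args_kwargs]


def scale(x, factor=1):
    return x * factor


# Local stand-in executor; a Globus Compute Executor exposes the same submit().
with ThreadPoolExecutor(max_workers=2) as local_exec:
    futs = naive_map(local_exec, scale, ((1,), {}), ((2,), {"factor": 10}))
    mapped = [f.result() for f in futs]
```

The futures come back in submission order, so iterating over them preserves the order of the inputs.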
- reload_tasks(task_group_id: Union[None, uuid.UUID, str] = None) -> Iterable[globus_compute_sdk.sdk.asynchronous.compute_future.ComputeFuture]
Load the set of tasks associated with this Executor’s Task Group from the web services and return a list of futures, one for each task. This is nominally intended to “reattach” to a previously initiated session, based on the Task Group ID.
- Parameters
task_group_id – Optionally specify a task_group_id to use. If present, it will overwrite the Executor’s task_group_id.
- Returns
An iterable of futures.
- Raises
ValueError – if the server response is incorrect or invalid
KeyError – the server did not return an expected response
various – the usual (unhandled) request errors (e.g., no connection; invalid authorization)
Notes
Any previous futures received from this executor will be cancelled.
- shutdown(wait=True, *, cancel_futures=False)
Clean up the resources associated with the Executor.
It is safe to call this method several times. However, no other methods can be called after this one.
- Parameters
wait – If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.
cancel_futures – If True then shutdown will cancel all pending futures. Futures that are completed or running will not be cancelled.
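The shutdown contract mirrors the one in Python's concurrent.futures, so it can be observed with a ThreadPoolExecutor as a local stand-in. The RuntimeError shown is the standard library's behavior; the source does not state which exception the Globus Compute Executor raises after shutdown, so treat that part as an assumption.

```python
from concurrent.futures import ThreadPoolExecutor

local_exec = ThreadPoolExecutor(max_workers=1)
fut = local_exec.submit(pow, 2, 10)

local_exec.shutdown(wait=True)  # returns once running work has finished
local_exec.shutdown(wait=True)  # repeated calls are harmless

try:
    local_exec.submit(pow, 2, 3)  # submitting after shutdown is rejected
    rejected = False
except RuntimeError:
    rejected = True
```

Using the executor as a context manager (a with block) calls shutdown automatically on exit, which avoids forgetting it.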
- class globus_compute_sdk.sdk.executor.ComputeFuture(task_id: Optional[str] = None)
Extend concurrent.futures.Future to include an optional task UUID.
Initializes the future. Should not be called by clients.
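The idea of a future that carries an optional task identifier can be sketched as a small subclass of concurrent.futures.Future. TaskFuture is a hypothetical name and the id is a placeholder; the real ComputeFuture is created and populated by the Executor, not by client code.

```python
from concurrent.futures import Future
from typing import Optional


class TaskFuture(Future):
    """Illustrative Future with an optional task id (hypothetical class)."""

    def __init__(self, task_id: Optional[str] = None) -> None:
        super().__init__()
        self.task_id = task_id


fut = TaskFuture()
initial_id = fut.task_id          # None until the service acknowledges the task
fut.task_id = "example-task-id"   # placeholder value, set here by hand
fut.set_result(3)                 # normally done by the executor machinery
value = fut.result()
```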