SDK

The funcX SDK provides a programmatic interface to funcX from Python. With the SDK you can:

  1. Register functions
  2. Register containers and execution environments
  3. Launch registered functions on accessible endpoints
  4. Check the status of launched functions
  5. Retrieve outputs from functions

The SDK provides a client class for interacting with funcX. The client abstracts authentication and provides an interface to make funcX API calls without needing to know the funcX REST endpoints for those operations. You can instantiate a funcX client as follows:

from funcx.sdk.client import FuncXClient
fxc = FuncXClient()

Instantiating a client will start an authentication process where you will be asked to authenticate via Globus Auth. We require every interaction with funcX to be authenticated, as this enables enforced access control on both functions and endpoints. Globus Auth is an identity and access management platform that provides authentication brokering capabilities, enabling users to log in using one of several hundred supported identities. It also provides group and profile management for user accounts. As part of the authentication process, funcX will request access to your identity (to retrieve your email address) and Globus Groups. funcX uses Groups to facilitate sharing and to make authorization decisions. funcX allows endpoints and functions to be shared by associating them with a Globus Group.

Note

funcX internally caches function, endpoint, and authorization lookups. Caches are based on user authentication tokens. To force refresh cached entries, you can re-authenticate your client with force_login=True.

Registering Functions

You can register a Python function with funcX via register_function(). Function registration serializes the function body and transmits it to funcX. Once the function is registered with funcX, it is assigned a UUID that can be used to manage and invoke the function.

Note

You must import any dependencies required by the function inside the function body.

The following example shows how to register a function. In this case, the function simply returns the platform information of the system on which it is executed. The function is defined in the same way as any Python function before being registered with funcX.

def platform_func():
  import platform
  return platform.platform()

func_uuid = fxc.register_function(platform_func)
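The note about importing dependencies inside the function body can be illustrated without contacting funcX. The sketch below is only a local simulation: it assumes that the function body arrives on the worker without the caller's module-level imports, and it mimics that by running source strings in an empty namespace. The helper run_in_fresh_namespace is hypothetical and is not part of the funcX SDK, and funcX's real serialization mechanism may differ.

```python
# Local simulation only: exec() in an empty namespace stands in for a
# remote worker that does not share the caller's module-level imports.
GOOD_SRC = '''
def platform_func():
    import platform  # dependency imported inside the function body
    return platform.platform()
'''

BAD_SRC = '''
def platform_func():
    # relies on a module-level "import platform" that was never shipped
    return platform.platform()
'''


def run_in_fresh_namespace(src, name):
    """Define the function from source in an empty namespace, then call it."""
    namespace = {}
    exec(src, namespace)
    return namespace[name]()


good_result = run_in_fresh_namespace(GOOD_SRC, "platform_func")

try:
    run_in_fresh_namespace(BAD_SRC, "platform_func")
    bad_error = None
except NameError as exc:
    # 'platform' is not defined where the function actually runs
    bad_error = exc

print(good_result)
print(repr(bad_error))
```

The second variant fails with a NameError because the name platform exists only in the module that defined the function, not in the namespace where the function executes.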

Running Functions

You can invoke a function using the UUID returned when registering the function. The run() function requires that you specify the function (function_id) and endpoint (endpoint_id) on which to execute the function. funcX will return a UUID for the executing function (called a task) via which you can monitor status and retrieve results.

tutorial_endpoint = '4b116d3c-1703-4f8f-9f6f-39921e5864df'
task_id = fxc.run(endpoint_id=tutorial_endpoint, function_id=func_uuid)

Note

funcX places limits on the size of the functions and the rate at which functions can be submitted. Please refer to the limits section for details.

Retrieving Results

The result of your function’s invocation can be retrieved using the get_result() function. This will either return the deserialized result of your invocation or raise an exception indicating that the task is still pending.

Note

If your function raises an exception, get_result() will reraise it.

try:
  print(fxc.get_result(task_id))
except Exception as e:
  print("Exception: {}".format(e))

Note

funcX caches results in the cloud until they have been retrieved. The SDK also caches results during a session. However, once a result has been retrieved, calling get_result() from a new session will not be able to access it.
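Because get_result() raises an exception while a task is still pending, callers usually poll until a result arrives. The following is a minimal sketch of such a loop, not part of the SDK. The helper wait_for_result, the default is_pending test (which looks for the word "pending" in the exception message), and the _StubClient used for the demonstration are all assumptions made for illustration; check how your SDK version signals a pending task and pass a matching is_pending predicate.

```python
import time


def wait_for_result(client, task_id, timeout=60.0, interval=1.0, is_pending=None):
    """Poll client.get_result(task_id) until it returns or the timeout expires.

    is_pending(exc) decides whether an exception means "not finished yet".
    The default is a guess based on the exception message (an assumption).
    """
    if is_pending is None:
        is_pending = lambda exc: "pending" in str(exc).lower()

    deadline = time.monotonic() + timeout
    while True:
        try:
            return client.get_result(task_id)
        except Exception as exc:
            if not is_pending(exc):
                raise  # the function itself failed; surface its exception
            if time.monotonic() >= deadline:
                raise TimeoutError("task {} still pending".format(task_id)) from exc
        time.sleep(interval)


class _StubClient:
    """Hypothetical stand-in for FuncXClient so the sketch runs offline."""

    def __init__(self, pending_calls, value):
        self.remaining = pending_calls
        self.value = value

    def get_result(self, task_id):
        if self.remaining > 0:
            self.remaining -= 1
            raise Exception("Task pending")
        return self.value


stub = _StubClient(pending_calls=2, value="Linux-x86_64")
polled = wait_for_result(stub, "hypothetical-task-id", timeout=5.0, interval=0.01)
print(polled)
```

With a real client, the same helper would be called as wait_for_result(fxc, task_id). Exceptions raised by the remote function are re-raised unchanged as long as is_pending does not classify them as pending.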

Arguments and data

funcX functions operate the same as any other Python function. You can pass arguments (*args and **kwargs) and return values from functions. The only constraint is that data passed to/from a funcX function must be serializable (e.g., via Pickle) and fall within Limits. Input arguments can be passed to the function using the run() function. The following example shows how strings can be passed to and from a function.

def funcx_hello(firstname, lastname):
  return 'Hello {} {}'.format(firstname, lastname)

func_id = fxc.register_function(funcx_hello)

task_id = fxc.run("Bob", "Smith", endpoint_id=tutorial_endpoint, function_id=func_id)

try:
  print(fxc.get_result(task_id))
except Exception as e:
  print("Exception: {}".format(e))
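Since arguments and return values must be serializable, it can help to check them locally before submitting a task. The sketch below uses Python's standard pickle module as a rough proxy. The helpers is_picklable and payload_size are hypothetical, and funcX's own serializer may accept or reject objects differently, so treat a passing check as a hint rather than a guarantee.

```python
import pickle


def is_picklable(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


def payload_size(obj):
    """Size in bytes of the pickled object, for comparison against a size limit."""
    return len(pickle.dumps(obj))


args_ok = ("Bob", "Smith")                # plain strings serialize fine
args_bad = (lambda name: name.upper(),)   # lambdas cannot be pickled by reference

ok_check = is_picklable(args_ok)
bad_check = is_picklable(args_bad)

print(ok_check, bad_check, payload_size(args_ok))
```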

Sharing Functions

You may share functions publicly (with anyone) or with a set of users via a Globus Group. You can also add a function description so that it can be discovered by others.

To share with a group, set group=<globus_group_id> when registering a function.

fxc.register_function(funcx, description="My function", group=<globus_group_id>)

Upon execution, funcX will check group membership to ensure that the user is authorized to execute the function.

You can also set a function to be publicly accessible by setting public=True when registering the function.

fxc.register_function(funcx, description="My function", public=True)

Discovering Functions

funcX maintains an access controlled search index of registered functions. You can look up your own functions, functions that have been shared with you, or publicly accessible functions via the search_function() function.

search_results = fxc.search_function("my function", offset=0, limit=5)
print(search_results)

Batching

The SDK includes a batch interface to reduce the overheads of launching a function many times. To use this interface, you must first create a batch object and then pass that object to the batch_run function. batch_run is non-blocking and returns a list of task ids corresponding to the functions in the batch with the ordering preserved.

batch = fxc.create_batch()

for x in range(0,5):
  batch.add(x, endpoint_id=tutorial_endpoint, function_id=func_id)

# batch_run returns a list of task ids
batch_res = fxc.batch_run(batch)

The batch result interface is useful for fetching the results of a collection of task_ids. get_batch_result is called with a list of task_ids. It is non-blocking and returns a dict keyed by task_id, where each value is a dict that contains status information and, if available, a result.

>>> results = fxc.get_batch_result(batch_res)
>>> print(results)

{'10c9678c-b404-4e40-bfd4-81581f52f9db': {'pending': False,
                                          'status': 'success',
                                          'result': 0,
                                          'completion_t': '1632876695.6450012'},
 '587afd2e-59e0-4d2d-82ab-cee409784c4c': {'pending': False,
                                          'status': 'success',
                                          'result': 0,
                                          'completion_t': '1632876695.7048604'},
 '11f34d69-913a-4442-ae79-ede046585d8f': {'pending': True,
                                          'status': 'waiting-for-ep'},
 'a2d86014-28a8-486d-b86e-5f38c80d0333': {'pending': True,
                                          'status': 'waiting-for-ep'},
 'e453a993-73e6-4149-8078-86e7b8370c35': {'pending': True,
                                          'status': 'waiting-for-ep'}
}
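Because get_batch_result is non-blocking, its output typically mixes finished and unfinished tasks. The sketch below separates the two, assuming the dict layout shown in the output above (a 'pending' flag on every entry and a 'result' key on completed ones). The helper split_batch_results is hypothetical, and the sample data is abbreviated from the example output.

```python
def split_batch_results(results):
    """Split a get_batch_result-style dict into completed results and pending ids."""
    completed = {}
    pending = []
    for task_id, info in results.items():
        # Treat an entry without a 'pending' flag as still pending (an assumption).
        if info.get("pending", True):
            pending.append(task_id)
        else:
            completed[task_id] = info.get("result")
    return completed, pending


# Abbreviated sample in the same shape as the output shown above.
sample = {
    "10c9678c-b404-4e40-bfd4-81581f52f9db": {
        "pending": False,
        "status": "success",
        "result": 0,
        "completion_t": "1632876695.6450012",
    },
    "11f34d69-913a-4442-ae79-ede046585d8f": {
        "pending": True,
        "status": "waiting-for-ep",
    },
}

completed, pending = split_batch_results(sample)
print(completed)
print(pending)
```

The list of pending ids can be passed back to get_batch_result on a later poll until it is empty.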

FuncXClient Reference:

class funcx.FuncXClient(http_timeout=None, funcx_home='~/.funcx', force_login=False, fx_authorizer=None, search_authorizer=None, openid_authorizer=None, funcx_service_address='https://api2.funcx.org/v2', check_endpoint_version=False, asynchronous=False, loop=None, results_ws_uri='wss://api2.funcx.org/ws/v2/', use_offprocess_checker=True, **kwargs)

Main class for interacting with the funcX service

Holds helper operations for performing common tasks with the funcX service.

add_to_whitelist(endpoint_id, function_ids)

Adds the function to the endpoint’s whitelist

Parameters:
  • endpoint_id (str) – The uuid of the endpoint
  • function_ids (list) – A list of function IDs to be whitelisted
Returns:

The response of the request

Return type:

json

batch_run(batch)

Initiate a batch of tasks to funcX

Parameters: batch (a Batch object)
Returns: task_ids
Return type: a list of UUID strings that identify the tasks

create_batch(task_group_id=None)

Create a Batch instance to handle batch submission in funcX

Parameters: task_group_id (str) – Override the session-wide session_task_group_id with a different task_group_id for this batch. If task_group_id is not specified, the client’s session_task_group_id is used.
Returns: A Batch instance to which tasks can be added
Return type: Batch instance

delete_from_whitelist(endpoint_id, function_ids)

Remove functions from the endpoint’s whitelist

Parameters:
  • endpoint_id (str) – The uuid of the endpoint
  • function_ids (list) – A list of function IDs to be removed from the whitelist
Returns:

The response of the request

Return type:

json

get_batch_result(task_id_list)

Request status for a batch of task_ids

get_container(container_uuid, container_type)

Get the details of a container for staging it locally.

Parameters:
  • container_uuid (str) – UUID of the container in question
  • container_type (str) – The type of containers that will be used (Singularity, Shifter, Docker)
Returns:

The details of the containers to deploy

Return type:

dict

get_containers(name, description=None)

Register a DLHub endpoint with the funcX service and get the containers to launch.

Parameters:
  • name (str) – Name of the endpoint
  • description (str) – Description of the endpoint
Returns:

The port to connect to and a list of containers

Return type:

int

get_endpoint_status(endpoint_uuid)

Get the status reports for an endpoint.

Parameters: endpoint_uuid (str) – UUID of the endpoint in question
Returns: The details of the endpoint’s stats
Return type: dict

get_result(task_id)

Get the result of a funcX task

Parameters: task_id (str) – UUID of the task
Returns: The result object, if the task has completed
Raises: Exception – The exception that caused the task to fail

get_task(task_id)

Get a funcX task.

Parameters: task_id (str) – UUID of the task
Returns: Task block containing “status” key.
Return type: dict

get_whitelist(endpoint_id)

List the endpoint’s whitelist

Parameters: endpoint_id (str) – The uuid of the endpoint
Returns: The response of the request
Return type: json

logout()

Remove credentials from your local system

map_run(*args, endpoint_id=None, function_id=None, asynchronous=False, **kwargs)

Initiate an invocation

Parameters:
  • *args (Any) – Args as specified by the function signature
  • endpoint_id (uuid str) – Endpoint UUID string. Required
  • function_id (uuid str) – Function UUID string. Required
  • asynchronous (bool) – Whether or not to run the function asynchronously
Returns:

  • task_id (str) – UUID string that identifies the task

register_container(location, container_type, name='', description='')

Register a container with the funcX service.

Parameters:
  • location (str) – The location of the container (e.g., its docker url). Required
  • container_type (str) – The type of containers that will be used (Singularity, Shifter, Docker). Required
  • name (str) – A name for the container. Default = ‘’
  • description (str) – A description to associate with the container. Default = ‘’
Returns:

The id of the container

Return type:

str

register_endpoint(name, endpoint_uuid, metadata=None, endpoint_version=None)

Register an endpoint with the funcX service.

Parameters:
  • name (str) – Name of the endpoint
  • endpoint_uuid (str) – The uuid of the endpoint
  • metadata (dict) – endpoint metadata, see default_config example
  • endpoint_version (str) – Version string to be passed to the webService as a compatibility check
Returns:

{‘endpoint_id’: <>, ‘address’: <>, ‘client_ports’: <>}

Return type:

A dict

register_function(function, function_name=None, container_uuid=None, description=None, public=False, group=None, searchable=True)

Register a function’s code with the funcX service.

Parameters:
  • function (Python Function) – The function to be registered for remote execution
  • function_name (str) – The entry point (function name) of the function. Default: None
  • container_uuid (str) – Container UUID from registration with funcX
  • description (str) – Description of the function
  • public (bool) – Whether or not the function is publicly accessible. Default = False
  • group (str) – A globus group uuid to share this function with
  • searchable (bool) – If true, the function will be indexed into globus search with the appropriate permissions
Returns:

function uuid – UUID identifier for the registered function

Return type:

str

run(*args, endpoint_id=None, function_id=None, **kwargs)

Initiate an invocation

Parameters:
  • *args (Any) – Args as specified by the function signature
  • endpoint_id (uuid str) – Endpoint UUID string. Required
  • function_id (uuid str) – Function UUID string. Required
  • asynchronous (bool) – Whether or not to run the function asynchronously
Returns:

  • task_id (str) – UUID string that identifies the task, if asynchronous is False
  • funcX Task (asyncio.Task) – A future that will eventually resolve into the function’s result, if asynchronous is True

search_endpoint(q, scope='all', owner_id=None)
Parameters:
  • q
  • scope (str) – Can be one of {‘all’, ‘my-endpoints’, ‘shared-with-me’}
  • owner_id – should be a URN like f"urn:globus:auth:identity:{owner_uuid}"

search_function(q, offset=0, limit=10, advanced=False)

Search for functions via the funcX service

Parameters:
  • q (str) – free-form query string
  • offset (int) – offset into total results
  • limit (int) – max number of results to return
  • advanced (bool) – allows Elasticsearch-like syntax in the query string
Returns:

Return type:

FunctionSearchResults

update_table(return_msg, task_id)

Parses the return message from the service and updates the internal func_table

Parameters:
  • return_msg (str) – Return message received from the funcx service
  • task_id (str) – task id string

version_check()

Check that this client version meets the service’s minimum supported version.