Endpoints

An endpoint is a persistent service launched by the user on a compute system that serves as a conduit for routing functions to, and executing them on, that system. This could be a laptop, the login node of a campus cluster, a grid, or a supercomputing facility.

The endpoint can be configured to connect to the funcX API web service at funcx.org. Once the endpoint is registered, you can invoke functions to be executed on it.

To install the funcX endpoint agent software:

$ python3 -m pip install funcx_endpoint

Note

Please note that funcx-endpoint is supported on Linux and macOS, but not on Windows.

First time setup

The first time you run any of the funcx-endpoint commands, if no existing configuration is found at $HOME/.funcx, you will be prompted to authenticate. Since you will likely want to configure an endpoint anyway, a natural place to start is

$ funcx-endpoint configure

You will be asked to authenticate with Globus Auth. We require authentication in order to associate endpoints with users and enforce authentication and access control on the endpoint. As part of this step we request access to your identity information (to retrieve your email address) and Globus Groups management. We use Groups information to facilitate sharing of functions and endpoints by checking membership of the Group associated with a function.

Once you’ve run this command, a directory will be created at $HOME/.funcx and a set of configuration files will be generated. A default endpoint profile is also created; it will be used whenever you do not explicitly specify a profile for endpoint actions.

You can also set up auto-completion for the funcx-endpoint commands in your shell by using the command

$ funcx-endpoint --install-completion [zsh bash fish …]

which will allow commands and endpoint names to autocomplete.

Configuring funcX

A configuration file will be created at $HOME/.funcx/config.py. This contains base information regarding the endpoint, such as the username and email address. By default, it includes the address and port of a local broker, which is used if a local broker is deployed. You can also set the address of the endpoint for your workers to connect to. This is necessary if your workers are not deployed on the same resource as the endpoint (e.g., when using a batch submission system or cloud workers).

Note

If your funcX workers are not deployed on the same resource as the endpoint, you must set the endpoint address so that the workers can find the endpoint. This is done by setting endpoint_address.

For example:

import getpass
from parsl.addresses import address_by_hostname

global_options = {
  'username': getpass.getuser(),
  'email': 'USER@USERDOMAIN.COM',
  'broker_address': '127.0.0.1',
  'broker_port': 8088,
  'endpoint_address': address_by_hostname(),
}

Configuring an Endpoint

FuncX endpoints are designed to act as gateways to computational resources such as clusters, clouds, supercomputers, and even your laptop. To make the best use of your resources, the endpoint must be configured to match the resources’ capabilities and to reflect the needs of the workloads you plan to execute. For example, you may want to limit the number of cores available to your endpoint.

FuncX provides a rich class-based configuration model that allows you to specify the shape of the resources (number of nodes, number of cores per worker, walltime, etc.) and to place limits on how funcX may scale the resources in response to changing workload demands.
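
As a point of reference before the system-specific examples below, a minimal configuration that runs workers directly on the machine hosting the endpoint (e.g., a laptop) might look like the following sketch; the worker and block limits shown are illustrative assumptions.

from funcx_endpoint.endpoint.utils.config import Config
from funcx_endpoint.executors import HighThroughputExecutor
from parsl.providers import LocalProvider

config = Config(
    executors=[
        HighThroughputExecutor(
            label='local_htex',
            # Limit how many workers (and hence cores) this endpoint may use
            max_workers_per_node=2,
            provider=LocalProvider(
                init_blocks=1,
                min_blocks=0,
                max_blocks=1,
            ),
        )
    ],
)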

To generate the appropriate directories and default config template, run the following command:

$ funcx-endpoint configure <ENDPOINT_NAME>

This command will create a profile for your endpoint in $HOME/.funcx/<ENDPOINT_NAME>/ and will instantiate a config.py file. This file should be updated with the appropriate configurations for the computational system you are targeting before you start the endpoint. funcX builds on Parsl and is configured using a Config object. For more information, see the Config class documentation.

Note

If the ENDPOINT_NAME is not specified, a default endpoint named “default” is configured.

Note

All configuration examples below must be customized for the user’s allocation, Python environment, file system, etc.

Blue Waters (NCSA)


The following snippet shows an example configuration for executing remotely on Blue Waters, a supercomputer at the National Center for Supercomputing Applications. The configuration assumes the user is running on a login node, uses the TorqueProvider to interface with the scheduler, and uses the AprunLauncher to launch workers.

from funcx_endpoint.endpoint.utils.config import Config
from funcx_endpoint.executors import HighThroughputExecutor
from parsl.providers import TorqueProvider
from parsl.launchers import AprunLauncher
from parsl.addresses import address_by_hostname

# PLEASE UPDATE user_opts BEFORE USE
user_opts = {
    'bluewaters': {
        'worker_init': 'module load bwpy;source anaconda3/etc/profile.d/conda.sh;conda activate funcx_testing_py3.7',
        'scheduler_options': '',
    }
}

config = Config(
    executors=[
        HighThroughputExecutor(
            max_workers_per_node=1,
            worker_debug=False,
            address=address_by_hostname(),
            provider=TorqueProvider(
                queue='normal',
                launcher=AprunLauncher(overrides="-b -- bwpy-environ --"),
                # string to prepend to #PBS blocks in the submit
                # script to the scheduler
                scheduler_options=user_opts['bluewaters']['scheduler_options'],

                # Command to be run before starting a worker, such as:
                # 'module load bwpy; source activate parsl_env'.
                worker_init=user_opts['bluewaters']['worker_init'],
                init_blocks=1,
                max_blocks=1,
                min_blocks=1,
                nodes_per_block=2,
                walltime='00:30:00'
            ),
        )

    ],
)

UChicago AI Cluster


The following snippet shows an example configuration for the University of Chicago’s AI Cluster. The configuration assumes the user is running on a login node and uses the SlurmProvider to interface with the scheduler and launch onto the GPUs.


from funcx_endpoint.endpoint.utils.config import Config
from funcx_endpoint.executors import HighThroughputExecutor

from parsl.providers import LocalProvider
from parsl.channels import LocalChannel
from parsl.providers import SlurmProvider
from parsl.launchers import SrunLauncher
from parsl.addresses import address_by_hostname


# PLEASE CONFIGURE THESE OPTIONS BEFORE USE
NODES_PER_JOB = 2
GPUS_PER_NODE = 4
GPUS_PER_WORKER = 2

# Do not modify:
TOTAL_WORKERS = int((NODES_PER_JOB*GPUS_PER_NODE)/GPUS_PER_WORKER)
WORKERS_PER_NODE = int(GPUS_PER_NODE / GPUS_PER_WORKER)
GPU_MAP = ','.join([str(x) for x in range(1,TOTAL_WORKERS + 1)])

config = Config(
    executors=[HighThroughputExecutor(
        label="fe.cs.uchicago",
        address=address_by_hostname(),
        provider=SlurmProvider(
            channel=LocalChannel(),
            nodes_per_block=NODES_PER_JOB,
            init_blocks=1,
            partition='general',
            # Launch WORKERS_PER_NODE managers per node, each bound to GPUS_PER_WORKER GPUs
            # This is a hack: we use `hostname;` to terminate the default srun command and start our own
            # DO NOT MODIFY unless you know what you are doing.
            launcher=SrunLauncher(overrides=(f'hostname; srun --ntasks={TOTAL_WORKERS} '
                                             f'--ntasks-per-node={WORKERS_PER_NODE} '
                                             f'--gpus-per-task=rtx2080ti:{GPUS_PER_WORKER} '
                                             f'--gpu-bind=map_gpu:{GPU_MAP}')
            ),
            walltime='01:00:00',
        ),
    )],
)

Midway (RCC, UChicago)


The Midway cluster is a campus cluster hosted by the Research Computing Center at the University of Chicago. The snippet below shows an example configuration for executing remotely on Midway. The configuration assumes the user is running on a login node and uses the SlurmProvider to interface with the scheduler, and uses the SrunLauncher to launch workers.

from funcx_endpoint.endpoint.utils.config import Config
from funcx_endpoint.executors import HighThroughputExecutor
from parsl.providers import SlurmProvider
from parsl.launchers import SrunLauncher
from parsl.addresses import address_by_hostname

# PLEASE UPDATE user_opts BEFORE USE
user_opts = {
    'midway': {
        'worker_init': 'source ~/setup_funcx_test_env.sh',
        'scheduler_options': '',
    }
}

config = Config(
    executors=[
        HighThroughputExecutor(
            max_workers_per_node=10,
            address=address_by_hostname(),
            provider=SlurmProvider(
                'broadwl',
                launcher=SrunLauncher(),
                nodes_per_block=2,
                init_blocks=1,
                # string to prepend to #SBATCH blocks in the submit
                # script to the scheduler eg: '#SBATCH --constraint=knl,quad,cache'
                scheduler_options=user_opts['midway']['scheduler_options'],

                # Command to be run before starting a worker, such as:
                # 'module load Anaconda; source activate parsl_env'.
                worker_init=user_opts['midway']['worker_init'],

                min_blocks=1,
                max_blocks=1,
                walltime='00:20:00'
            ),
        )
    ],
    scaling_enabled=True
)

Kubernetes Clusters


Kubernetes is an open-source system for automating the deployment, scaling, and management of containers. The snippet below shows an example configuration for deploying pods as workers on a Kubernetes cluster. The KubernetesProvider uses the Python Kubernetes API, which assumes that you have a kube config file in ~/.kube/config.
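
The following is a minimal sketch of such a configuration; the namespace, container image, worker_init command, and block limits are placeholder assumptions that must be adapted to your cluster and Python environment.

from funcx_endpoint.endpoint.utils.config import Config
from funcx_endpoint.executors import HighThroughputExecutor
from parsl.providers import KubernetesProvider
from parsl.addresses import address_by_route

config = Config(
    executors=[
        HighThroughputExecutor(
            max_workers_per_node=1,
            address=address_by_route(),
            provider=KubernetesProvider(
                namespace='default',        # placeholder namespace
                image='python:3.8-buster',  # placeholder container image
                # Ensure the worker pods have funcx_endpoint installed
                worker_init='pip install funcx_endpoint',
                init_blocks=0,
                min_blocks=0,
                max_blocks=2,
            ),
        )
    ],
)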

Theta (ALCF)


The following snippet shows an example configuration for executing on Argonne Leadership Computing Facility’s Theta supercomputer. This example uses the HighThroughputExecutor and connects to Theta’s Cobalt scheduler using the CobaltProvider. This configuration assumes that the script is being executed on the login nodes of Theta.

from funcx_endpoint.endpoint.utils.config import Config
from funcx_endpoint.executors import HighThroughputExecutor
from parsl.providers import CobaltProvider
from parsl.launchers import AprunLauncher
from parsl.addresses import address_by_hostname

# PLEASE UPDATE user_opts BEFORE USE
user_opts = {
    'theta': {
        'worker_init': 'source ~/setup_funcx_test_env.sh',
        'scheduler_options': '',
        # Specify the account/allocation to which jobs should be charged
        'account': '<YOUR_THETA_ALLOCATION>'
    }
}

config = Config(
    executors=[
        HighThroughputExecutor(
            max_workers_per_node=1,
            address=address_by_hostname(),
            provider=CobaltProvider(
                queue='debug-flat-quad',
                account=user_opts['theta']['account'],
                launcher=AprunLauncher(overrides="-d 64"),
                # string to prepend to #COBALT blocks in the submit
                # script to the scheduler eg: '#COBALT -t 50'
                scheduler_options=user_opts['theta']['scheduler_options'],

                # Command to be run before starting a worker, such as:
                # 'module load Anaconda; source activate funcx_env'.
                worker_init=user_opts['theta']['worker_init'],

                walltime='00:30:00',
                nodes_per_block=2,
                init_blocks=1,
                min_blocks=1,
                max_blocks=1,
            ),
        )
    ],
    scaling_enabled=True
)

Cori (NERSC)


The following snippet shows an example configuration for accessing NERSC’s Cori supercomputer. This example uses the HighThroughputExecutor and connects to Cori’s Slurm scheduler. It is configured to request 2 nodes with 1 TaskBlock per node. Finally, it includes override information to request a particular node type (KNL) and to configure a specific Python environment on the worker nodes using Anaconda.

from funcx_endpoint.endpoint.utils.config import Config
from funcx_endpoint.executors import HighThroughputExecutor
from parsl.providers import SlurmProvider
from parsl.launchers import SrunLauncher
from parsl.addresses import address_by_interface

# PLEASE UPDATE user_opts BEFORE USE
user_opts = {
    'cori': {
        'worker_init': 'source ~/setup_funcx_test_env.sh',
        'scheduler_options': '#SBATCH --constraint=knl,quad,cache',
    }
}

config = Config(
    executors=[
        HighThroughputExecutor(
            label='Cori_HTEX_multinode',
            address=address_by_interface('bond0.144'),
            provider=SlurmProvider(
                'debug',  # Partition / QOS
                nodes_per_block=2,
                init_blocks=1,
                # string to prepend to #SBATCH blocks in the submit
                # script to the scheduler eg: '#SBATCH --constraint=knl,quad,cache'
                scheduler_options=user_opts['cori']['scheduler_options'],

                # Command to be run before starting a worker, such as:
                # 'module load Anaconda; source activate parsl_env'.
                worker_init=user_opts['cori']['worker_init'],

                # We request all hyperthreads on a node.
                launcher=SrunLauncher(overrides='-c 272'),
                walltime='00:10:00',
                # Slurm scheduler on Cori can be slow at times,
                # increase the command timeouts
                cmd_timeout=120,
            ),
        ),
    ],
)

Perlmutter (NERSC)


The following snippet shows an example configuration for accessing NERSC’s Perlmutter supercomputer. This example uses the HighThroughputExecutor and connects to Perlmutter’s Slurm scheduler. It is configured to request 2 nodes with 1 TaskBlock per node. Finally, it includes override information to request GPU nodes and to configure a specific Python environment on the worker nodes using Anaconda.

Note

Please run module load cgpu prior to executing funcx-endpoint start <endpoint_name> on the Cori login nodes to access the Perlmutter queues.

from funcx_endpoint.endpoint.utils.config import Config
from funcx_endpoint.executors import HighThroughputExecutor
from parsl.providers import SlurmProvider
from parsl.launchers import SrunLauncher
from parsl.addresses import address_by_interface

# PLEASE UPDATE user_opts BEFORE USE
user_opts = {
    'perlmutter': {
        'worker_init': 'source ~/setup_funcx_test_env.sh',
        'scheduler_options': '#SBATCH -C gpu'
    }
}

config = Config(
    executors=[
        HighThroughputExecutor(
            label='Perlmutter_HTEX_multinode',
            address=address_by_interface('bond0.144'),
            provider=SlurmProvider(
                'GPU',  # Partition / QOS
                nodes_per_block=2,
                init_blocks=1,
                # string to prepend to #SBATCH blocks in the submit
                # script to the scheduler eg: '#SBATCH --constraint=gpu'
                scheduler_options=user_opts['perlmutter']['scheduler_options'],

                # Command to be run before starting a worker, such as:
                # 'module load Anaconda; source activate parsl_env'.
                worker_init=user_opts['perlmutter']['worker_init'],

                # We request all hyperthreads on a node.
                launcher=SrunLauncher(overrides='-c 272'),
                walltime='00:10:00',
                # Slurm scheduler on Cori can be slow at times,
                # increase the command timeouts
                cmd_timeout=120,
            ),
        ),
    ],
)

Frontera (TACC)


The following snippet shows an example configuration for accessing the Frontera system at TACC. The configuration below assumes that the user is running on a login node, uses the SlurmProvider to interface with the scheduler, and uses the SrunLauncher to launch workers.

from funcx_endpoint.endpoint.utils.config import Config
from funcx_endpoint.executors import HighThroughputExecutor
from parsl.providers import SlurmProvider
from parsl.launchers import SrunLauncher
from parsl.addresses import address_by_hostname

# PLEASE UPDATE user_opts BEFORE USE
user_opts = {
    'frontera': {
        'worker_init': 'source ~/setup_funcx_test_env.sh',
        'scheduler_options': '#SBATCH -A MCB20024',
        'partition': 'development',

    }
}

config = Config(
    executors=[
        HighThroughputExecutor(
            label="frontera_htex",
            max_workers_per_node=2,
            address=address_by_hostname(),
            provider=SlurmProvider(
                cmd_timeout=60,     # Add extra time for slow scheduler responses
                nodes_per_block=2,
                init_blocks=1,
                min_blocks=0,
                max_blocks=1,
                partition=user_opts['frontera']['partition'],  # Replace with partition name

                # Enter scheduler_options if needed
                scheduler_options=user_opts['frontera']['scheduler_options'],

                # Command to be run before starting a worker, such as:
                # 'module load Anaconda; source activate parsl_env'.
                worker_init=user_opts['frontera']['worker_init'],

                # Ideally we set the walltime to the longest supported walltime.
                walltime='00:10:00',
                launcher=SrunLauncher(),
            ),
        )
    ],
)

Starting an Endpoint

To start a new endpoint run the following command:

$ funcx-endpoint start <ENDPOINT_NAME>

If the endpoint has not yet been configured, this command will create a profile for it in $HOME/.funcx/<ENDPOINT_NAME>/config.py. This file should be updated with the appropriate configurations for the computational system you are targeting before you start the endpoint. Once the configuration is in place, rerun the above command to launch the endpoint.

Note

If the ENDPOINT_NAME is not specified, a default endpoint named “default” is started.

Starting an endpoint will perform a registration process with the funcX Web Service. The registration process provides funcX with information regarding the endpoint. The Web Service then creates a Forwarder process for the endpoint and returns a UUID along with connection information for the Forwarder. Using this information, the endpoint establishes three outbound ZeroMQ channels to the Forwarder (on the three ports returned during registration) to retrieve tasks, send results, and communicate command information.

Once started, the endpoint runs as a daemon process in the background.
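
Once the endpoint is active, you can send functions to it using the funcX SDK. The following is a minimal sketch, assuming the funcx SDK package is installed; the endpoint UUID is a placeholder that should be replaced with the value shown by funcx-endpoint list.

from funcx.sdk.client import FuncXClient

def double(x):
    return x * 2

fxc = FuncXClient()

endpoint_id = '<ENDPOINT_UUID>'  # placeholder; use your endpoint's UUID

function_id = fxc.register_function(double)
task_id = fxc.run(21, endpoint_id=endpoint_id, function_id=function_id)

# get_result raises an exception while the task is still pending,
# so you may need to wait briefly and retry.
print(fxc.get_result(task_id))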

Warning

Only the owner of an endpoint is authorized to start it. Thus, if you register with a different Globus Auth identity and try to start an endpoint owned by another identity, the command will fail.

Stopping an Endpoint

To stop an endpoint, run the following command:

$ funcx-endpoint stop <ENDPOINT_NAME>

Note

If the ENDPOINT_NAME is not specified, the default endpoint is stopped.

Warning

Run the funcx-endpoint stop command twice to ensure that the endpoint is shut down.

Listing Endpoints

To list available endpoints on the current system, run:

$ funcx-endpoint list
+---------------+-------------+--------------------------------------+
| Endpoint Name |   Status    |             Endpoint ID              |
+===============+=============+======================================+
| default       | Active      | 1e999502-b434-49a2-a2e0-d925383d2dd4 |
+---------------+-------------+--------------------------------------+
| KNL_test      | Inactive    | 8c01d13c-cfc1-42d9-96d2-52c51784ea16 |
+---------------+-------------+--------------------------------------+
| gpu_cluster   | Initialized | None                                 |
+---------------+-------------+--------------------------------------+

Endpoints can be in the following states:

  • Initialized: The endpoint has been created, but has not been started following configuration and is not registered with the funcX service.
  • Active: The endpoint is active and available for executing functions.
  • Inactive: The endpoint is not currently running and therefore cannot service any functions.
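
Endpoint status can also be checked programmatically from the funcX SDK. The sketch below assumes that the get_endpoint_status call is available in your SDK version and uses a placeholder endpoint UUID.

from funcx.sdk.client import FuncXClient

fxc = FuncXClient()

endpoint_id = '<ENDPOINT_UUID>'  # placeholder; copy the Endpoint ID from `funcx-endpoint list`

# Assumed to return a dict describing the endpoint's current state
print(fxc.get_endpoint_status(endpoint_id))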