funcx.mock_broker package

Submodules

funcx.mock_broker.forwarder module

class funcx.mock_broker.forwarder.Forwarder(task_q, result_q, executor, endpoint_id, logdir='forwarder', logging_level=20)

Bases: multiprocessing.context.Process

Forwards tasks/results between the executor and the queues

    Tasks_Q  Results_Q
       |        ^
       |        |
       V        |
       Executors

Todo: We need to clarify what constitutes a task that comes down the task pipe. Does it already carry the code fragment, or does that need to be retrieved from some DB?

connection_info

Get the client ports to which the interchange must connect.

handle_app_update(task_id, future)

Triggered when the executor sees a task complete.

This can be further optimized at the executor level, where we trigger this or a similar function when we see a results item inbound from the interchange.
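The completion hook described above can be illustrated with a standard-library future. This is only a minimal sketch: it uses concurrent.futures.ThreadPoolExecutor as a stand-in for the funcx executor, and the result_q argument and the payload layout are assumptions made for the example, not the actual Forwarder internals.

```python
import queue
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def double(x):
    return x * 2


def handle_app_update(result_q, task_id, future):
    """Hypothetical callback: push the finished task's outcome onto result_q."""
    try:
        payload = {"task_id": task_id, "result": future.result()}
    except Exception as exc:
        # A failed task is reported instead of crashing the callback.
        payload = {"task_id": task_id, "exception": repr(exc)}
    result_q.put(payload)


result_q = queue.Queue()
with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(double, 21)
    # add_done_callback passes only the future, so bind the other arguments first.
    future.add_done_callback(partial(handle_app_update, result_q, "task-1"))

update = result_q.get(timeout=5)
```

Binding task_id with functools.partial keeps the callback signature close to handle_app_update(task_id, future) while still satisfying the single-argument callback contract of add_done_callback.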

run()

Process entry point.
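As a rough illustration of what such an entry point might do, the sketch below drains a task queue into an executor and writes results to a result queue. It is not the funcx implementation: it runs as a plain function in the current process rather than as a multiprocessing Process, and the STOP sentinel, the task tuple layout, and run_forwarder itself are hypothetical names introduced for the example.

```python
import queue
from concurrent.futures import ThreadPoolExecutor

STOP = None  # hypothetical sentinel that ends the loop


def double(x):
    return x * 2


def run_forwarder(task_q, result_q, executor):
    """Submit queued tasks to the executor, then forward their results."""
    pending = []
    while True:
        task = task_q.get()
        if task is STOP:
            break
        task_id, func, arg = task
        pending.append((task_id, executor.submit(func, arg)))
    # Forward results in submission order once the task stream has ended.
    for task_id, future in pending:
        result_q.put((task_id, future.result()))


task_q, result_q = queue.Queue(), queue.Queue()
for i in range(3):
    task_q.put((f"task-{i}", double, i))
task_q.put(STOP)

with ThreadPoolExecutor(max_workers=2) as executor:
    run_forwarder(task_q, result_q, executor)

results = [result_q.get(timeout=5) for _ in range(3)]
```

A long-running forwarder would more likely forward each result as soon as it completes, for example through a done-callback, instead of waiting for the task stream to end.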

funcx.mock_broker.forwarder.double(x)
funcx.mock_broker.forwarder.failer(x)
funcx.mock_broker.forwarder.spawn_forwarder(address, executor=None, task_q=None, result_q=None, endpoint_id=UUID('eae812e0-f199-48d5-b4ec-a9983f51d36d'), logging_level=20)

Spawns a forwarder and returns the forwarder process for tracking.

Parameters:
  • address (str) – IP Address to which the endpoint must connect
  • executor (Executor object. Optional) – Executor object to be instantiated.
  • task_q (Queue object) – Queue object matching funcx.queues.base.FuncxQueue interface
  • logging_level (int) – Logging level as defined in the logging module. Default: logging.INFO (20)
  • endpoint_id (uuid string) – Endpoint id for which the forwarder is being spawned.

Returns: A Forwarder object

funcx.mock_broker.mock_broker module

The broker service

This REST service fields incoming registration requests from endpoints and creates an appropriate forwarder to which each endpoint can connect.

funcx.mock_broker.mock_broker.list_mappings()
funcx.mock_broker.mock_broker.register()

Register an endpoint request

  1. Start an executor client object corresponding to the endpoint
  2. Pass connection info back as a JSON response.
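The two steps above can be sketched without a web framework. Everything in this example is an assumption made for illustration: the request fields (endpoint_id, address), the port numbers, and the forwarders registry are hypothetical, and the step that would start an executor client is replaced by a dictionary entry.

```python
import json
import uuid

forwarders = {}  # hypothetical registry: endpoint_id -> connection info


def register(request_body):
    """Hypothetical handler: record the endpoint and return JSON connection info."""
    request = json.loads(request_body)
    endpoint_id = request.get("endpoint_id") or str(uuid.uuid4())

    # Step 1 (stand-in): a real broker would start an executor client or
    # forwarder here; this sketch only records placeholder connection info.
    connection_info = {
        "endpoint_id": endpoint_id,
        "address": request["address"],
        "task_port": 55001,    # placeholder value
        "result_port": 55002,  # placeholder value
    }
    forwarders[endpoint_id] = connection_info

    # Step 2: pass the connection info back as a JSON document.
    return json.dumps(connection_info)


response = register(json.dumps({"endpoint_id": "ep-1", "address": "127.0.0.1"}))
info = json.loads(response)
```

In an actual REST service the same function body would sit behind a route handler, with the framework supplying the request body and sending the returned JSON string to the endpoint.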

funcx.mock_broker.mock_tester module

funcx.mock_broker.mock_tester.test(address)

funcx.mock_broker.test module

funcx.mock_broker.test.double(x)
funcx.mock_broker.test.fail(x)
funcx.mock_broker.test.test_1()
funcx.mock_broker.test.test_2()
funcx.mock_broker.test.test_3()

Module contents