funcx.strategies package

Submodules

funcx.strategies.base module

class funcx.strategies.base.BaseStrategy(*args, threshold=20, interval=5)

Bases: object

Implements threshold-interval based flow control.

The overall goal is to trap the flow of apps from the workflow, measure it, and redirect it to the appropriate executors for processing.

This is based on the following logic:

BEGIN (INTERVAL, THRESHOLD, callback) :
    start = current_time()

    while (current_time()-start < INTERVAL) :
         count = get_events_since(start)
         if count >= THRESHOLD :
             break

    callback()

This logic ensures that callbacks fire after at most one interval, both for systems with infrequent events and for systems that generate large bursts of events.
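The pseudocode above can be sketched in Python roughly as follows. This is a minimal illustration, not the funcx implementation: the class name ThresholdIntervalTrigger, its private attributes, the callback signature, and the "threshold"/"timer" labels are all assumptions made for the example. It uses a threading.Event so the worker thread sleeps instead of polling.

```python
import threading


class ThresholdIntervalTrigger:
    """Hypothetical stand-in for threshold-interval flow control (not the funcx class)."""

    def __init__(self, callback, threshold=20, interval=5):
        self.callback = callback          # called as callback(events, kind)
        self.threshold = threshold
        self.interval = interval
        self._events = []
        self._lock = threading.Lock()
        self._wake = threading.Event()    # set when the threshold is reached or on close()
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        while not self._stop.is_set():
            # Sleep until the interval elapses, or until notify()/close() wakes us early.
            hit_threshold = self._wake.wait(timeout=self.interval)
            self._wake.clear()
            if self._stop.is_set():
                break
            self._fire("threshold" if hit_threshold else "timer")

    def _fire(self, kind):
        # Hand the buffered events to the callback and start a fresh buffer.
        with self._lock:
            events, self._events = self._events, []
        self.callback(events, kind)

    def notify(self, event_id):
        # Record an event; wake the worker once enough events have accumulated.
        with self._lock:
            self._events.append(event_id)
            reached = len(self._events) >= self.threshold
        if reached:
            self._wake.set()

    def close(self):
        # Stop the worker thread and wait for it to exit.
        self._stop.set()
        self._wake.set()
        self._thread.join()
```

In this sketch a burst of events triggers the callback as soon as the threshold is reached, while a quiet system still gets a callback once per interval.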

Once a callback is triggered, it generally runs a strategy method on the available sites as well as the queue.

TODO: When the debug logs are enabled this module emits duplicate messages. This issue needs more debugging. What I’ve learnt so far is that the duplicate messages are present only when the timer thread is started, so this could be from a duplicate logger being added by the thread.

close()

Merge the threads and terminate.

make_callback(kind=None)

Makes the callback and resets the timer.

KWargs:
  • kind (str): Default=None, used to pass information on what triggered the callback

notify(event_id)

Let the FlowControl system know that there is an event.

This method is called from the Interchange to notify the flow-control system.

start(interchange)

Start the strategy.

Parameters:
  • interchange (funcx.executors.high_throughput.interchange.Interchange): Interchange to bind the strategy to

strategize(*args, **kwargs)

Strategize is called every time the threshold or the interval is hit.
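As a rough illustration of how a subclass might hook into this call, the sketch below uses a stand-in base class rather than funcx itself. StubStrategy, CountingStrategy, the tasks/kind keyword arguments, and the "event" label are hypothetical, and the wiring from make_callback to strategize is an assumption about how the callback is dispatched.

```python
class StubStrategy:
    """Hypothetical stand-in for BaseStrategy; only the callback wiring is modeled."""

    def __init__(self, threshold=20, interval=5):
        self.threshold = threshold
        self.interval = interval  # kept for signature parity; no timer thread in this sketch
        self._event_buffer = []

    def notify(self, event_id):
        # Buffer the event and fire the callback once the threshold is reached.
        self._event_buffer.append(event_id)
        if len(self._event_buffer) >= self.threshold:
            self.make_callback(kind="event")

    def make_callback(self, kind=None):
        # Assumption: the callback hands the buffered events to strategize().
        tasks, self._event_buffer = self._event_buffer, []
        self.strategize(tasks=tasks, kind=kind)

    def strategize(self, *args, **kwargs):
        # Subclasses override this with their scaling policy.
        pass


class CountingStrategy(StubStrategy):
    """Illustrative subclass: records why it was called and with how many events."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.calls = []

    def strategize(self, *args, **kwargs):
        self.calls.append((kwargs.get("kind"), len(kwargs.get("tasks", []))))
```

A real strategy would replace the bookkeeping in CountingStrategy.strategize with its own scaling decisions.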

class funcx.strategies.base.Timer(callback, *args, interval=5)

Bases: object

This timer is a simplified version of the FlowControl timer. This timer does not employ notify events.

This is based on the following logic :

BEGIN (INTERVAL, callback) :
    start = current_time()

    while (current_time()-start < INTERVAL) :
         wait()
         break

    callback()

close()

Merge the threads and terminate.

make_callback(kind=None)

Makes the callback and resets the timer.
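A periodic timer of this kind can be sketched as shown below. This is an illustrative stand-in under stated assumptions, not the funcx Timer: the name IntervalTimer is hypothetical, and forwarding the extra positional arguments to the callback is assumed rather than documented here.

```python
import threading


class IntervalTimer:
    """Hypothetical stand-in for Timer; names and details are illustrative."""

    def __init__(self, callback, *args, interval=5):
        self.callback = callback
        self.args = args              # assumption: extra args are forwarded to the callback
        self.interval = interval
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        # Event.wait returns False on timeout and True once close() sets the flag,
        # so the callback runs once per interval until the timer is closed.
        while not self._stop.wait(timeout=self.interval):
            self.callback(*self.args)

    def close(self):
        # Stop the worker thread and wait for it to exit.
        self._stop.set()
        self._thread.join()
```

Because no notify events are involved, the only way to change the firing schedule in this sketch is the interval itself.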

funcx.strategies.kube_simple module

class funcx.strategies.kube_simple.KubeSimpleStrategy(*args, threshold=20, interval=1, max_idletime=60)

Bases: funcx.strategies.base.BaseStrategy

Implements the simple strategy for Kubernetes

strategize(*args, **kwargs)

Strategize is called every time the threshold or the interval is hit.

funcx.strategies.simple module

class funcx.strategies.simple.SimpleStrategy(*args, threshold=20, interval=1, max_idletime=60)

Bases: funcx.strategies.base.BaseStrategy

Implements the simple strategy

strategize(*args, **kwargs)

Strategize is called every time the threshold or the interval is hit.

funcx.strategies.test module

Module contents

class funcx.strategies.BaseStrategy(*args, threshold=20, interval=5)

Bases: object

Implements threshold-interval based flow control.

The overall goal is to trap the flow of apps from the workflow, measure it, and redirect it to the appropriate executors for processing.

This is based on the following logic:

BEGIN (INTERVAL, THRESHOLD, callback) :
    start = current_time()

    while (current_time()-start < INTERVAL) :
         count = get_events_since(start)
         if count >= THRESHOLD :
             break

    callback()

This logic ensures that callbacks fire after at most one interval, both for systems with infrequent events and for systems that generate large bursts of events.

Once a callback is triggered, it generally runs a strategy method on the available sites as well as the queue.

TODO: When the debug logs are enabled this module emits duplicate messages. This issue needs more debugging. What I’ve learnt so far is that the duplicate messages are present only when the timer thread is started, so this could be from a duplicate logger being added by the thread.

close()

Merge the threads and terminate.

make_callback(kind=None)

Makes the callback and resets the timer.

KWargs:
  • kind (str): Default=None, used to pass information on what triggered the callback

notify(event_id)

Let the FlowControl system know that there is an event.

This method is called from the Interchange to notify the flow-control system.

start(interchange)

Start the strategy.

Parameters:
  • interchange (funcx.executors.high_throughput.interchange.Interchange): Interchange to bind the strategy to

strategize(*args, **kwargs)

Strategize is called every time the threshold or the interval is hit.

class funcx.strategies.SimpleStrategy(*args, threshold=20, interval=1, max_idletime=60)

Bases: funcx.strategies.base.BaseStrategy

Implements the simple strategy

strategize(*args, **kwargs)

Strategize is called every time the threshold or the interval is hit.

class funcx.strategies.KubeSimpleStrategy(*args, threshold=20, interval=1, max_idletime=60)

Bases: funcx.strategies.base.BaseStrategy

Implements the simple strategy for Kubernetes

strategize(*args, **kwargs)

Strategize is called every time the threshold or the interval is hit.