Globus Compute Executor

As a subclass of Python’s concurrent.futures.Executor, Compute SDK’s Executor is simultaneously easier to use than the Client, more performant, and more efficient on the network.

For usage examples and explanations, please see the Executor User Guide.
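Because the Executor follows the standard concurrent.futures.Executor interface, code written against that base class does not need to know which executor it receives. The sketch below uses ThreadPoolExecutor as a local stand-in; run_sum is a hypothetical helper, and it is an assumption that swapping in a Compute Executor changes only the construction line.

```python
from concurrent.futures import Executor, ThreadPoolExecutor


def add(a: int, b: int) -> int:
    return a + b


def run_sum(executor: Executor, a: int, b: int) -> int:
    """Submit `add` through any Executor implementation and wait for the result."""
    future = executor.submit(add, a, b)
    return future.result()


# ThreadPoolExecutor is a local stand-in; a Globus Compute Executor would be
# passed in the same way (assumption: only the construction line changes).
with ThreadPoolExecutor(max_workers=1) as pool:
    total = run_sum(pool, 1, 2)
```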


class globus_compute_sdk.Executor(endpoint_id: UUID | str | None = None, client: Client | None = None, task_group_id: UUID | str | None = None, resource_specification: dict[str, Any] | None = None, user_endpoint_config: dict[str, Any] | None = None, label: str = '', batch_size: int = 128, amqp_port: int | None = None, api_burst_limit: int = 4, api_burst_window_s: int = 16, serializer: ComputeSerializer | None = None, result_serializers: Iterable[SerializationStrategy | type[SerializationStrategy] | str] | None = None, **kwargs)

Extend Python’s Executor base class for Globus Compute’s purposes.

Parameters:
  • endpoint_id – id of the endpoint to which to submit tasks

  • client – instance of Client to be used by the executor. If not provided, the executor will instantiate one with default arguments.

  • task_group_id – The Task Group with which to associate tasks. If not set, one will be instantiated.

  • resource_specification – Specify resource requirements for individual task execution.

  • user_endpoint_config – User endpoint configuration values as described and allowed by endpoint administrators. Must be a JSON-serializable dict or None.

  • result_serializers – A list of Strategylike objects that will be sent to the endpoint for use in serializing task results. The endpoint will attempt to serialize results with each strategy in the list until one succeeds, returning the first successful serialization. N.B.: If this is falsy, the endpoint will use the default strategies.

  • label – a label to name the executor; mainly utilized for logging and advanced needs with multiple executors.

  • batch_size – the maximum number of tasks to coalesce before sending upstream [min: 1, default: 128]

  • amqp_port – Port to use when connecting to results queue. Note that the Compute web services only support 5671, 5672, and 443.

  • api_burst_limit – Number of “free” API calls to allow before engaging client-side (i.e., this executor) rate-limiting. See api_burst_window_s.

  • api_burst_window_s – Window of time (in seconds) in which to count API calls for rate-limiting.

  • serializer – Used to serialize task args and kwargs. If passed, and a Client is also passed, this takes precedence over the Client’s serializer.
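One plausible reading of api_burst_limit and api_burst_window_s is a sliding window: calls are counted over the last api_burst_window_s seconds, and throttling begins once the count exceeds api_burst_limit. The sketch below illustrates that idea only; BurstWindow and its clock parameter are hypothetical and are not taken from the SDK's implementation.

```python
import time
from collections import deque
from typing import Callable, Deque


class BurstWindow:
    """Hypothetical helper: tracks API call times within a sliding window."""

    def __init__(
        self,
        burst_limit: int = 4,
        window_s: float = 16.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.burst_limit = burst_limit
        self.window_s = window_s
        self._clock = clock
        self._calls: Deque[float] = deque()

    def record_call(self) -> bool:
        """Record one API call; return True if the caller should now throttle."""
        now = self._clock()
        # Forget calls that have aged out of the window.
        while self._calls and now - self._calls[0] >= self.window_s:
            self._calls.popleft()
        self._calls.append(now)
        return len(self._calls) > self.burst_limit


# Deterministic demonstration with a fake clock.
fake_now = [0.0]
window = BurstWindow(burst_limit=4, window_s=16.0, clock=lambda: fake_now[0])
first_five = [window.record_call() for _ in range(5)]  # five calls at t=0
fake_now[0] = 20.0  # all earlier calls now fall outside the 16 s window
after_quiet_period = window.record_call()
```

With the default values, the first four calls inside a window are “free” and the fifth would trigger throttling until older calls age out.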

property endpoint_id

The ID of the endpoint currently associated with this instance. This determines where tasks are sent for execution, and since tasks have to go somewhere, the Executor will not run unless it has an endpoint_id set.

Must be a UUID, valid uuid-like string, or None. Set by simple assignment:

>>> import uuid
>>> from globus_compute_sdk import Executor
>>> ep_id = uuid.uuid4()  # IRL: some *known* endpoint id
>>> gce = Executor(endpoint_id=ep_id)

# Alternatively, may use a stringified uuid:
>>> gce = Executor(endpoint_id=str(ep_id))

# May also alter after construction:
>>> gce.endpoint_id = ep_id
>>> gce.endpoint_id = str(ep_id)

# Internally, it is always stored as a UUID (or None):
>>> gce.endpoint_id
UUID('11111111-2222-4444-8888-000000000000')

# Executors only run if they have an endpoint_id set:
>>> gce = Executor(endpoint_id=None)
Traceback (most recent call last):
    ...
ValueError: No endpoint_id set.  Did you forget to set it at construction?
  Hint:

    gce = Executor(endpoint_id=<ep_id>)
    gce.endpoint_id = <ep_id>    # alternative

property task_group_id

The Task Group with which this instance is currently associated. New tasks will be sent to this Task Group upstream, and the result listener will only listen for results for this group.

Must be a UUID, valid uuid-like string, or None. Set by simple assignment:

>>> import uuid
>>> from globus_compute_sdk import Executor
>>> tg_id = uuid.uuid4()  # IRL: some *known* taskgroup id
>>> gce = Executor(task_group_id=tg_id)

# Alternatively, may use a stringified uuid:
>>> gce = Executor(task_group_id=str(tg_id))

# May also alter after construction:
>>> gce.task_group_id = tg_id
>>> gce.task_group_id = str(tg_id)

# Internally, it is always stored as a UUID (or None):
>>> gce.task_group_id
UUID('11111111-2222-4444-8888-000000000000')

This is typically used when reattaching to a previously initiated set of tasks. See reload_tasks() for more information.

If not set manually, this will be set automatically on submit(), to a Task Group ID supplied by the services. Subsequent Executor objects will reuse the same task group ID by default.

[default: None]
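Both endpoint_id and task_group_id accept a UUID, a uuid-like string, or None, and are described as being stored as a UUID (or None). A minimal sketch of that kind of normalization, using only the standard library, might look like the following; as_uuid is a hypothetical helper, not the SDK's own code.

```python
import uuid
from typing import Optional, Union


def as_uuid(value: Union[uuid.UUID, str, None]) -> Optional[uuid.UUID]:
    """Hypothetical normalizer: accept a UUID, a uuid-like string, or None."""
    if value is None:
        return None
    if isinstance(value, uuid.UUID):
        return value
    return uuid.UUID(value)  # raises ValueError for a malformed string


tg_id = uuid.UUID("11111111-2222-4444-8888-000000000000")
from_string = as_uuid(str(tg_id))

try:
    as_uuid("not-a-uuid")
    malformed_rejected = False
except ValueError:
    malformed_rejected = True
```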

property resource_specification: dict[str, Any] | None

Specify resource requirements for individual task execution.

Must be a JSON-serializable dict or None. Set by simple assignment:

>>> from globus_compute_sdk import Executor
>>> res_spec = {"foo": "bar"}
>>> gce = Executor(resource_specification=res_spec)

# May also alter after construction:
>>> gce.resource_specification = res_spec

property user_endpoint_config: dict[str, Any] | None

The endpoint configuration values, as described and allowed by endpoint administrators, that this instance is currently associated with.

Must be a JSON-serializable dict or None. Set by simple assignment:

>>> from globus_compute_sdk import Executor
>>> uep_config = {"foo": "bar"}
>>> gce = Executor(user_endpoint_config=uep_config)

# May also alter after construction:
>>> gce.user_endpoint_config = uep_config
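Since both resource_specification and user_endpoint_config must be JSON-serializable, it can help to check a dict locally before assigning it. The sketch below is one way to do that with the standard library; is_json_serializable and the example keys are hypothetical, and the SDK may perform its own validation differently.

```python
import json
from typing import Any


def is_json_serializable(value: Any) -> bool:
    """Return True if `value` survives json.dumps (a local pre-flight check)."""
    try:
        json.dumps(value)
    except (TypeError, ValueError):
        return False
    return True


good_config = {"foo": "bar", "nested": {"count": 2}}
bad_config = {"created": {1, 2, 3}}  # sets are not JSON-serializable
```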

property amqp_port: int | None

The port to use when connecting to the result queue. Can be one of 443, 5671, 5672, or None. If None, the port is assigned by the Compute web services (which default to 443).

property serializer: ComputeSerializer

Property access to the underlying Client instance’s owned serializer. This is used to serialize function code during function registration, serialize args/kwargs during task submission, and deserialize results from submitted tasks.

Must be a ComputeSerializer. Set by simple assignment:

>>> from globus_compute_sdk import Executor
>>> from globus_compute_sdk.serialize import CombinedCode, ComputeSerializer
>>> serde = ComputeSerializer(strategy_code=CombinedCode())
>>> gce = Executor(serializer=serde)

# May also alter after construction:
>>> gce.serializer = serde

property result_serializers: Iterable[SerializationStrategy | type[SerializationStrategy] | str] | None

A list of strategies that will be sent to the endpoint for use in serializing task results. The endpoint will attempt to serialize results with each strategy in the list until one succeeds, returning the first successful serialization.

If falsy, the endpoint will try the default strategies, i.e. DEFAULT_STRATEGY_CODE and DEFAULT_STRATEGY_DATA.
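The “try each strategy until one succeeds” behaviour can be pictured with a small local sketch. Here json and pickle stand in for serialization strategies; the Strategy alias and serialize_with_fallback are hypothetical names, and this is not the endpoint's actual implementation.

```python
import json
import pickle
from typing import Any, Callable, Sequence, Tuple

# Hypothetical stand-ins for serialization strategies: (name, encoder) pairs.
Strategy = Tuple[str, Callable[[Any], bytes]]


def serialize_with_fallback(
    result: Any, strategies: Sequence[Strategy]
) -> Tuple[str, bytes]:
    """Try each strategy in order; return the first successful serialization."""
    errors = []
    for name, encode in strategies:
        try:
            return name, encode(result)
        except Exception as exc:  # any failure means "try the next strategy"
            errors.append(f"{name}: {exc!r}")
    raise ValueError("no strategy could serialize the result: " + "; ".join(errors))


strategies = [
    ("json", lambda obj: json.dumps(obj).encode("utf-8")),
    ("pickle", pickle.dumps),
]

used_for_list, _ = serialize_with_fallback([1, 2, 3], strategies)
used_for_set, payload = serialize_with_fallback({1, 2, 3}, strategies)
```

A list serializes with the first strategy, while a set fails under json and falls through to the second, which is why the order of the list matters.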

Must be valid Strategylike values. Set by simple assignment:

>>> from globus_compute_sdk import Executor
>>> from globus_compute_sdk.serialize import CombinedCode
>>> result_serializers = [CombinedCode()]
>>> gce = Executor(result_serializers=result_serializers)

# May also alter after construction:
>>> gce.result_serializers = result_serializers

register_function(fn: Callable, function_id: str | None = None, **func_register_kwargs) → str

Register a task function with this Executor’s cache.

All function execution submissions (i.e., .submit()) communicate which pre-registered function to execute on the endpoint by the function’s identifier, the function_id. This method makes the appropriate API call to the Globus Compute web services to first register the task function, and then stores the returned function_id in the Executor’s cache.

In the standard workflow, .submit() will automatically handle invoking this method, so the common use-case will not need to use this method. However, some advanced use-cases may need to fine-tune the registration of a function and so may manually set the registration arguments via this method.

If a function has already been registered (perhaps in a previous iteration), the upstream API call may be avoided by specifying the known function_id.

If a function already exists in the Executor’s cache, this method will raise a ValueError to help track down the errant double registration attempt.

Parameters:
  • fn – function to be registered for remote execution

  • function_id – if specified, associate the function_id to the fn immediately, short-circuiting the upstream registration call.

  • func_register_kwargs – all other keyword arguments are passed to the Client.register_function().

Returns:

the function’s function_id string, as returned by registration upstream

Raises:

ValueError – raised if a function has already been registered with this Executor
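The caching rules described above (a known function_id skips the upstream call, and a second registration of the same function raises ValueError) can be sketched with a small stand-in class. FunctionCache is hypothetical and only mimics the described behaviour; the uuid4 call stands in for the upstream registration request.

```python
import uuid
from typing import Callable, Dict, Optional


class FunctionCache:
    """Hypothetical stand-in for an executor's function-to-ID cache."""

    def __init__(self) -> None:
        self._ids: Dict[Callable, str] = {}

    def register(self, fn: Callable, function_id: Optional[str] = None) -> str:
        if fn in self._ids:
            raise ValueError(f"{fn.__name__} is already registered")
        if function_id is None:
            # Stand-in for the upstream registration call.
            function_id = str(uuid.uuid4())
        self._ids[fn] = function_id
        return function_id


def double(x: int) -> int:
    return 2 * x


cache = FunctionCache()
known_id = "c407ae80-b31f-447a-9fa6-124098492057"  # example ID from this page
returned_id = cache.register(double, function_id=known_id)

try:
    cache.register(double)  # second attempt for the same function
    second_attempt_failed = False
except ValueError:
    second_attempt_failed = True
```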

register_source_code(source: str, function_name: str, **registration_kwargs) → str

Register arbitrary Python source code with entrypoint function

The standard .register_function() method expects a callable function object, which it then serializes using the specified code serialization strategy. In contrast, this method enables the user to directly provide an arbitrary source code string and entrypoint function name.

As with .register_function(), this method will store the registered function source code and function_id in the Executor’s cache. If a function is already in the cache, this method will raise a ValueError to help track down the errant double registration attempt.

Important

This method will ignore the current code serialization strategy and use PureSourceTextInspect instead.

Parameters:
  • source – The source code string

  • function_name – The name of the entrypoint function within the source code

  • registration_kwargs – Additional keyword arguments passed to Client.register_source_code

Returns:

UUID string of the registered function
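To make the pairing of source and function_name concrete, the sketch below builds a source string with a helper and an entrypoint, then checks locally that the named entrypoint exists and runs. The exec-based check is only an illustration; it is not how the SDK or an endpoint loads the code, and the function names are made up for the example.

```python
source = '''
def helper(x):
    return x * x

def entrypoint(values):
    return [helper(v) for v in values]
'''
function_name = "entrypoint"

# Local check only: execute the source in a fresh namespace and look up the
# entrypoint by name. This mirrors the (source, function_name) pairing, not
# how the SDK or the endpoint actually loads the code.
namespace: dict = {}
exec(source, namespace)
entry = namespace[function_name]
squares = entry([1, 2, 3])
```

Under these assumptions, the same source and function_name values are what would be passed to register_source_code().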

submit(fn, *args, **kwargs)

Submit a function to be executed on the Executor’s specified endpoint with the given arguments.

Schedules the callable to be executed as fn(*args, **kwargs) and returns a ComputeFuture instance representing the execution of the callable.

Example use:

>>> def add(a: int, b: int) -> int: return a + b
>>> gce = Executor(endpoint_id="some-ep-id")
>>> fut = gce.submit(add, 1, 2)
>>> fut.result()    # wait (block) until result is received from remote
3

Parameters:
  • fn – Python function to execute on endpoint

  • args – positional arguments (if any) as required to execute the function

  • kwargs – keyword arguments (if any) as required to execute the function

Returns:

a future object that will receive a .task_id when the Globus Compute web services acknowledge receipt of the task, and whose .result() becomes available once the web services receive the result and stream it back.
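When several tasks are submitted, the returned futures can be consumed as they finish rather than in submission order. The sketch below uses ThreadPoolExecutor as a local stand-in and the standard as_completed helper; it assumes the futures returned by the Compute Executor behave like ordinary concurrent.futures.Future objects here, and the exception type seen from a remote failure may differ from the local one.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def reciprocal(x: float) -> float:
    return 1 / x


results, failures = {}, {}
# ThreadPoolExecutor is a local stand-in for the executor.
with ThreadPoolExecutor(max_workers=2) as pool:
    futures = {pool.submit(reciprocal, x): x for x in (1, 2, 0)}
    for fut in as_completed(futures):
        x = futures[fut]
        try:
            results[x] = fut.result()
        except Exception as exc:  # .result() re-raises the task's exception
            failures[x] = exc
```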

submit_to_registered_function(function_id: str, args: tuple | None = None, kwargs: dict | None = None)

Request an execution of an already registered function.

This method supports using public functions with the Executor, as well as functions whose function_id is already known from an earlier registration. An example use might be:

# pre_registration.py
from globus_compute_sdk import Executor

def some_processor(*args, **kwargs):
    # ... function logic ...
    return ["some", "result"]

gce = Executor()
fn_id = gce.register_function(some_processor)
print(f"Function registered successfully.\nFunction ID: {fn_id}")

# Example output:
#
# Function registered successfully.
# Function ID: c407ae80-b31f-447a-9fa6-124098492057

In this case, the function would be privately registered to you, but note that the function id is just a string. One could substitute the id of a publicly available function. For instance, b0a5d1a0-2b22-4381-b899-ba73321e41e0 is a “well-known” uuid for the “Hello, World!” function (same as the example in the Globus Compute tutorial), which is publicly available:

from globus_compute_sdk import Executor

fn_id = "b0a5d1a0-2b22-4381-b899-ba73321e41e0"  # public; "Hello World"
with Executor(endpoint_id="your-endpoint-id") as fxe:
    futs = [
        fxe.submit_to_registered_function(function_id=fn_id)
        for i in range(5)
    ]

for f in futs:
    print(f.result())

Parameters:
  • function_id – identifier (str) of registered Python function

  • args – positional arguments (if any) as required to execute the function

  • kwargs – keyword arguments (if any) as required to execute the function

Returns:

a future object that (eventually) will have a .result() when the Globus Compute web services receive and stream it.
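Unlike .submit(), this method takes positional arguments as a single tuple and keyword arguments as a single dict. The sketch below uses a hypothetical local registry (REGISTRY, run_registered, and the "scale-id" key are all made up) to show how such a pair maps onto fn(*args, **kwargs); it does not contact any service.

```python
from typing import Any, Callable, Dict, Optional, Tuple

# Hypothetical local registry standing in for functions registered upstream.
REGISTRY: Dict[str, Callable[..., Any]] = {}


def run_registered(
    function_id: str,
    args: Optional[Tuple] = None,
    kwargs: Optional[dict] = None,
) -> Any:
    """Call the registered function as fn(*args, **kwargs)."""
    fn = REGISTRY[function_id]
    return fn(*(args or ()), **(kwargs or {}))


def scale(value: float, factor: float = 1.0) -> float:
    return value * factor


REGISTRY["scale-id"] = scale

one_positional = run_registered("scale-id", args=(3.0,))  # note the trailing comma
with_keyword = run_registered("scale-id", args=(3.0,), kwargs={"factor": 2.0})
```

The trailing comma in (3.0,) matters: without it, the value is a bare float rather than a one-element tuple.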

map(fn: Callable, *iterables, timeout=None, chunksize=1) → Iterator

Globus Compute does not currently implement the .map() method of the Executor interface. In a naive implementation, this method would merely be syntactic sugar for bulk use of the .submit() method. For example:

def map(fxexec, fn, *fn_args_kwargs):
    return [fxexec.submit(fn, *a, **kw) for a, kw in fn_args_kwargs]

This naive implementation ignores a number of potential optimizations, so we have decided to look at this at a future date if there is interest.
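Until .map() is available, a small helper built on .submit() can cover the common case. The sketch below follows the argument convention of the standard library's map (one iterable per positional parameter) rather than the (args, kwargs) pairs used above; submit_map is a hypothetical helper and ThreadPoolExecutor is a local stand-in.

```python
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Any, Callable, Iterable, List


def submit_map(
    executor: Executor, fn: Callable[..., Any], *iterables: Iterable
) -> List[Any]:
    """Submit fn once per zipped argument tuple; return results in input order."""
    futures = [executor.submit(fn, *args) for args in zip(*iterables)]
    return [f.result() for f in futures]


def add(a: int, b: int) -> int:
    return a + b


with ThreadPoolExecutor(max_workers=2) as pool:  # local stand-in executor
    sums = submit_map(pool, add, [1, 2, 3], [10, 20, 30])
```

Like the naive version, this ignores timeouts and chunking, and it blocks until every result has arrived.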

Raises:

NotImplementedError – always raised

reload_tasks(task_group_id: UUID | str | None = None) → Iterable[ComputeFuture]

Load the set of tasks associated with this Executor’s Task Group from the web services and return a list of futures, one for each task. This is nominally intended to “reattach” to a previously initiated session, based on the Task Group ID.

Parameters:

task_group_id – Optionally specify a task_group_id to use. If present, will overwrite the Executor’s task_group_id.

Returns:

An iterable of futures.

Raises:
  • ValueError – if the server response is incorrect or invalid

  • KeyError – the server did not return an expected response

  • various – the usual (unhandled) request errors (e.g., no connection; invalid authorization)

Notes

Any previous futures received from this executor will be cancelled.

shutdown(wait=True, *, cancel_futures=False)

Clean up the resources associated with the Executor.

It is safe to call this method several times. Otherwise, no other methods can be called after this one.

Parameters:
  • wait – If True, then this method will not return until all pending futures have received results.

  • cancel_futures – If True, then this method will cancel all futures that have not yet registered their tasks with the Compute web services. Tasks cannot be cancelled once they are registered.
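The shutdown signature matches the one on concurrent.futures.Executor, so its basic contract can be explored locally. The sketch below uses ThreadPoolExecutor as a stand-in to show that repeated shutdown calls are harmless and that submitting afterwards fails; the exact exception raised by the Compute Executor in that case is not stated here and may differ.

```python
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=1)  # local stand-in executor
fut = pool.submit(pow, 2, 10)

pool.shutdown(wait=True)  # blocks until pending futures have results
pool.shutdown(wait=True)  # calling shutdown again is harmless

try:
    pool.submit(pow, 2, 3)
    submit_after_shutdown_failed = False
except RuntimeError:  # the stdlib pool refuses new work after shutdown
    submit_after_shutdown_failed = True
```

Using the executor as a context manager (with ... as gce:) calls shutdown on exit, which is usually the simpler option.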

get_worker_hardware_details() → str

Retrieve hardware information about worker nodes for the endpoint associated with this Executor. For example:

from globus_compute_sdk import Executor

ep_uuid = "your-endpoint-id"  # replace with a known endpoint ID
with Executor(ep_uuid) as gcx:
    print(gcx.get_worker_hardware_details())

class globus_compute_sdk.sdk.executor.ComputeFuture(task_id: str | None = None)

Extend concurrent.futures.Future to include an optional task UUID.

Initializes the future. Should not be called by clients.

task_id: str | None

The UUID for the task behind this Future. In batch mode, this will not be populated immediately, but will appear later when the task is submitted to the Globus Compute services.
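The delayed population of task_id can be pictured with a stand-in Future subclass. TaskFuture below is hypothetical and exists only for illustration, since ComputeFuture itself should not be constructed by client code; the assignments at the end simulate what the submitting machinery would do.

```python
from concurrent.futures import Future
from typing import Optional


class TaskFuture(Future):
    """Hypothetical stand-in: a Future that also carries an optional task ID."""

    def __init__(self, task_id: Optional[str] = None) -> None:
        super().__init__()
        self.task_id = task_id


fut = TaskFuture()
id_before_submission = fut.task_id  # None until the batch is sent

# Later, the submitting machinery (simulated here) fills in both fields.
fut.task_id = "11111111-2222-4444-8888-000000000000"
fut.set_result(3)
```

Code that needs the task ID should therefore be prepared for task_id to be None shortly after .submit() returns.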