Globus Compute Executor¶
- class globus_compute_sdk.Executor(endpoint_id: UUID | str | None = None, container_id: UUID | str | None = None, client: Client | None = None, task_group_id: UUID | str | None = None, resource_specification: dict[str, Any] | None = None, user_endpoint_config: dict[str, Any] | None = None, label: str = '', batch_size: int = 128, amqp_port: int | None = None, api_burst_limit: int = 4, api_burst_window_s: int = 16, **kwargs)¶
Extend Python’s concurrent.futures.Executor base class for Globus Compute’s purposes.
- Parameters:
endpoint_id – id of the endpoint to which to submit tasks
container_id – id of the container in which to execute tasks
client – instance of Client to be used by the executor. If not provided, the executor will instantiate one with default arguments.
task_group_id – The Task Group to which to associate tasks. If not set, one will be instantiated.
resource_specification – Specify resource requirements for individual task execution.
user_endpoint_config – User endpoint configuration values as described and allowed by endpoint administrators. Must be a JSON-serializable dict or None.
label – a label to name the executor; mainly utilized for logging and advanced needs with multiple executors.
batch_size – the maximum number of tasks to coalesce before sending upstream [min: 1, default: 128]
amqp_port – Port to use when connecting to results queue. Note that the Compute web services only support 5671, 5672, and 443.
api_burst_limit – Number of “free” API calls to allow before engaging client-side (i.e., this executor) rate-limiting. See api_burst_window_s.
api_burst_window_s – Window of time (in seconds) in which to count API calls for rate-limiting.
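The batch_size parameter caps how many tasks the Executor coalesces before sending them upstream in a single API call. The grouping behavior can be sketched locally; note this is a simplified illustration of the documented semantics, and the coalesce helper below is hypothetical, not part of the SDK:

```python
from typing import Any, Iterable, Iterator

def coalesce(tasks: Iterable[Any], batch_size: int = 128) -> Iterator[list]:
    # Group tasks into batches of at most batch_size items, mimicking
    # how the Executor bundles submissions before one upstream call.
    batch: list = []
    for task in tasks:
        batch.append(task)
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:  # flush any remainder smaller than batch_size
        yield batch

# Five tasks with batch_size=2 yield batches of sizes 2, 2, and 1:
batches = list(coalesce(range(5), batch_size=2))
```

A larger batch_size means fewer round-trips to the web services at the cost of slightly delayed submission of the first tasks in a batch.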
- property endpoint_id¶
The ID of the endpoint currently associated with this instance. This determines where tasks are sent for execution, and since tasks have to go somewhere, the Executor will not run unless it has an endpoint_id set.
Must be a UUID, valid uuid-like string, or None. Set by simple assignment:
>>> import uuid
>>> from globus_compute_sdk import Executor
>>> ep_id = uuid.uuid4()  # IRL: some *known* endpoint id
>>> gce = Executor(endpoint_id=ep_id)

# Alternatively, may use a stringified uuid:
>>> gce = Executor(endpoint_id=str(ep_id))

# May also alter after construction:
>>> gce.endpoint_id = ep_id
>>> gce.endpoint_id = str(ep_id)

# Internally, it is always stored as a UUID (or None):
>>> gce.endpoint_id
UUID('11111111-2222-4444-8888-000000000000')

# Executors only run if they have an endpoint_id set:
>>> gce = Executor(endpoint_id=None)
Traceback (most recent call last):
  ...
ValueError: No endpoint_id set. Did you forget to set it at construction?
  Hint:

  gce = Executor(endpoint_id=<ep_id>)
  gce.endpoint_id = <ep_id>  # alternative
- property task_group_id¶
The Task Group with which this instance is currently associated. New tasks will be sent to this Task Group upstream, and the result listener will only listen for results for this group.
Must be a UUID, valid uuid-like string, or None. Set by simple assignment:
>>> import uuid
>>> from globus_compute_sdk import Executor
>>> tg_id = uuid.uuid4()  # IRL: some *known* taskgroup id
>>> gce = Executor(task_group_id=tg_id)

# Alternatively, may use a stringified uuid:
>>> gce = Executor(task_group_id=str(tg_id))

# May also alter after construction:
>>> gce.task_group_id = tg_id
>>> gce.task_group_id = str(tg_id)

# Internally, it is always stored as a UUID (or None):
>>> gce.task_group_id
UUID('11111111-2222-4444-8888-000000000000')
This is typically used when reattaching to a previously initiated set of tasks. See reload_tasks() for more information.
If not set manually, this will be set automatically on submit(), to a Task Group ID supplied by the services. Subsequent Executor objects will reuse the same task group ID by default.
[default: None]
- property resource_specification: dict[str, Any] | None¶
Specify resource requirements for individual task execution.
Must be a JSON-serializable dict or None. Set by simple assignment:
>>> from globus_compute_sdk import Executor
>>> res_spec = {"foo": "bar"}
>>> gce = Executor(resource_specification=res_spec)

# May also alter after construction:
>>> gce.resource_specification = res_spec
- property user_endpoint_config: dict[str, Any] | None¶
The endpoint configuration values, as described and allowed by endpoint administrators, that this instance is currently associated with.
Must be a JSON-serializable dict or None. Set by simple assignment:
>>> from globus_compute_sdk import Executor
>>> uep_config = {"foo": "bar"}
>>> gce = Executor(user_endpoint_config=uep_config)

# May also alter after construction:
>>> gce.user_endpoint_config = uep_config
- property container_id: UUID | None¶
The container id with which this Executor instance is currently associated. Tasks submitted after this is set will use this container.
Must be a UUID, valid uuid-like string, or None. Set by simple assignment:
>>> import uuid
>>> from globus_compute_sdk import Executor
>>> c_id = "00000000-0000-0000-0000-000000000000"  # some known container id
>>> c_as_uuid = uuid.UUID(c_id)
>>> gce = Executor(container_id=c_id)

# May also alter after construction:
>>> gce.container_id = c_id
>>> gce.container_id = c_as_uuid  # also accepts a UUID object

# Internally, it is always stored as a UUID (or None):
>>> gce.container_id
UUID('00000000-0000-0000-0000-000000000000')
[default: None]
- property amqp_port: int | None¶
The port to use when connecting to the result queue. Can be one of 443, 5671, 5672, or None. If None, the port is assigned by the Compute web services (which default to 443).
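The documented constraint can be expressed as a small sketch; the check_amqp_port helper below is hypothetical, not part of the SDK:

```python
# Ports the Compute web services support for the results queue.
VALID_AMQP_PORTS = {443, 5671, 5672}

def check_amqp_port(port):
    # None means "let the web services choose" (they default to 443).
    if port is not None and port not in VALID_AMQP_PORTS:
        raise ValueError(
            f"amqp_port must be one of {sorted(VALID_AMQP_PORTS)} or None"
        )
    return port
```

Non-default ports are mainly useful when 443 is blocked or intercepted by a site proxy while the AMQP-specific ports are open.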
- register_function(fn: Callable, function_id: str | None = None, **func_register_kwargs) str ¶
Register a task function with this Executor’s cache.
All function execution submissions (i.e., .submit()) communicate which pre-registered function to execute on the endpoint by the function’s identifier, the function_id. This method makes the appropriate API call to the Globus Compute web services to first register the task function, and then stores the returned function_id in the Executor’s cache.
In the standard workflow, .submit() will automatically handle invoking this method, so the common use-case will not need to use this method. However, some advanced use-cases may need to fine-tune the registration of a function and so may manually set the registration arguments via this method.
If a function has already been registered (perhaps in a previous iteration), the upstream API call may be avoided by specifying the known function_id.
If a function already exists in the Executor’s cache, this method will raise a ValueError to help track down the errant double registration attempt.
- Parameters:
fn – function to be registered for remote execution
function_id – if specified, associate the function_id to the fn immediately, short-circuiting the upstream registration call.
func_register_kwargs – all other keyword arguments are passed to Client.register_function().
- Returns:
the function’s function_id string, as returned by registration upstream
- Raises:
ValueError – raised if a function has already been registered with this Executor
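The cache-and-raise behavior described above can be illustrated with a local stand-in; FunctionCacheSketch below is a hypothetical sketch of the documented semantics, not the SDK’s actual implementation:

```python
import uuid

class FunctionCacheSketch:
    # Hypothetical stand-in for the Executor's function cache.
    def __init__(self):
        self._cache = {}

    def register_function(self, fn, function_id=None):
        if fn in self._cache:
            # Mirrors the documented ValueError on double registration.
            raise ValueError(f"{fn.__name__} is already registered")
        # A supplied function_id short-circuits the upstream call;
        # otherwise, pretend the web services returned a fresh id.
        self._cache[fn] = function_id or str(uuid.uuid4())
        return self._cache[fn]

def task():
    return "done"

cache = FunctionCacheSketch()
fn_id = cache.register_function(task, function_id="known-id")
```

A second register_function(task) call on the same instance raises ValueError, which is how the real Executor flags an errant double registration.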
- submit(fn, *args, **kwargs)¶
Submit a function to be executed on the Executor’s specified endpoint with the given arguments.
Schedules the callable to be executed as fn(*args, **kwargs) and returns a ComputeFuture instance representing the execution of the callable.
Example use:
>>> def add(a: int, b: int) -> int:
...     return a + b
>>> gce = Executor(endpoint_id="some-ep-id")
>>> fut = gce.submit(add, 1, 2)
>>> fut.result()  # wait (block) until result is received from remote
3
- Parameters:
fn – Python function to execute on endpoint
args – positional arguments (if any) as required to execute the function
kwargs – keyword arguments (if any) as required to execute the function
- Returns:
a future object that will receive a .task_id when the Globus Compute Web Service acknowledges receipt, and eventually will have a .result() when the Globus Compute web services receive and stream it.
- submit_to_registered_function(function_id: str, args: tuple | None = None, kwargs: dict | None = None)¶
Request an execution of an already registered function.
This method supports use of public functions with the Executor, or knowledge of an already registered function. An example use might be:
# pre_registration.py
from globus_compute_sdk import Executor

def some_processor(*args, **kwargs):
    # ... function logic ...
    return ["some", "result"]

gce = Executor()
fn_id = gce.register_function(some_processor)
print(f"Function registered successfully.\nFunction ID: {fn_id}")

# Example output:
#
# Function registered successfully.
# Function ID: c407ae80-b31f-447a-9fa6-124098492057
In this case, the function would be privately registered to you, but note that the function id is just a string. One could substitute the id of a publicly available function. For instance, b0a5d1a0-2b22-4381-b899-ba73321e41e0 is a “well-known” uuid for the “Hello, World!” function (same as the example in the Globus Compute tutorial), which is publicly available:

from globus_compute_sdk import Executor

fn_id = "b0a5d1a0-2b22-4381-b899-ba73321e41e0"  # public; "Hello World"
with Executor(endpoint_id="your-endpoint-id") as fxe:
    futs = [
        fxe.submit_to_registered_function(function_id=fn_id)
        for i in range(5)
    ]

for f in futs:
    print(f.result())
- Parameters:
function_id – identifier (str) of registered Python function
args – positional arguments (if any) as required to execute the function
kwargs – keyword arguments (if any) as required to execute the function
- Returns:
a future object that (eventually) will have a .result() when the Globus Compute web services receive and stream it.
- map(fn: Callable, *iterables, timeout=None, chunksize=1) Iterator ¶
Globus Compute does not currently implement the .map() method of the Executor interface. In a naive implementation, this method would merely be syntactic sugar for bulk use of the .submit() method. For example:

def map(fxexec, fn, *fn_args_kwargs):
    return [fxexec.submit(fn, *a, **kw) for a, kw in fn_args_kwargs]
This naive implementation ignores a number of potential optimizations, so we have decided to look at this at a future date if there is interest.
- Raises:
RuntimeError – if called after shutdown, otherwise, …
NotImplementedError – … always raised
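Pending a native implementation, the naive pattern works today via .submit(). The sketch below demonstrates it against Python’s stdlib ThreadPoolExecutor, which shares the submit() interface (purely illustrative; naive_map is a hypothetical helper, not an SDK method):

```python
from concurrent.futures import ThreadPoolExecutor

def naive_map(executor, fn, *fn_args_kwargs):
    # One submit() call per (args, kwargs) pair.
    return [executor.submit(fn, *a, **kw) for a, kw in fn_args_kwargs]

def add(a, b=0):
    return a + b

with ThreadPoolExecutor(max_workers=2) as pool:
    futs = naive_map(pool, add, ((1,), {"b": 2}), ((3,), {}))
    results = [f.result() for f in futs]
```

The same helper would work with a Globus Compute Executor in place of the thread pool, since both implement submit().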
- reload_tasks(task_group_id: UUID | str | None = None) Iterable[ComputeFuture] ¶
Load the set of tasks associated with this Executor’s Task Group from the web services and return a list of futures, one for each task. This is nominally intended to “reattach” to a previously initiated session, based on the Task Group ID.
- Parameters:
task_group_id – Optionally specify a task_group_id to use. If present, will overwrite the Executor’s task_group_id.
- Returns:
An iterable of futures.
- Raises:
ValueError – if the server response is incorrect or invalid
KeyError – the server did not return an expected response
various – the usual (unhandled) request errors (e.g., no connection; invalid authorization)
Notes
Any previous futures received from this executor will be cancelled.
- shutdown(wait=True, *, cancel_futures=False)¶
Clean-up the resources associated with the Executor.
It is safe to call this method several times, but no other methods may be called after it has been invoked.
- Parameters:
wait – If True, then this method will not return until all pending futures have received results.
cancel_futures – If True, then this method will cancel all futures that have not yet registered their tasks with the Compute web services. Tasks cannot be cancelled once they are registered.
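The wait and cancel_futures semantics follow the concurrent.futures base class. The stdlib analogue below demonstrates cancel_futures with a ThreadPoolExecutor (illustrative only: local threads stand in for a Compute endpoint, and pending queue entries stand in for tasks not yet registered with the web services):

```python
import threading
from concurrent.futures import ThreadPoolExecutor

release = threading.Event()
pool = ThreadPoolExecutor(max_workers=1)

blocker = pool.submit(release.wait)                      # occupies the only worker
queued = [pool.submit(release.wait) for _ in range(3)]   # still pending

# cancel_futures=True cancels everything still waiting in the queue;
# the already-running task is allowed to finish.
pool.shutdown(wait=False, cancel_futures=True)
release.set()  # let the running task complete

cancelled = [f.cancelled() for f in queued]
```

With a Globus Compute Executor the analogous boundary is registration with the web services: futures whose tasks are already registered cannot be cancelled.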
- class globus_compute_sdk.sdk.executor.ComputeFuture(task_id: str | None = None)¶
Extend concurrent.futures.Future to include an optional task UUID.
Initializes the future. Should not be called by clients.