I/O and Verification using Host Tasks

While Celerity is focused on accelerator computations, real-world applications will contain host code as part of their data flow. This may include calls to specialized libraries, distributed I/O operations or verification.

To integrate such tasks into an asynchronous distributed program, Celerity offers host tasks with semantics specialized for these different applications. Similar to compute tasks, they are scheduled through the command group handler using the celerity::handler::host_task family of methods.

Host tasks are executed in a background thread pool on each participating node and may execute concurrently.

Master-Node Host Tasks

The simplest kind of host task is executed on the master node only. The relevant overload of host_task is selected with the on_master_node tag:

cgh.host_task(celerity::on_master_node, []{ ... });

Buffers can be accessed in the usual fashion, although there is no item structure passed into the kernel. Instead, when constructing an accessor, a fixed or all range mapper is used to specify the range of interest. Also, one of the *_host_task selectors must be used to specify the access mode.

celerity::distr_queue q;
celerity::buffer<float, 1> result;
q.submit([&](celerity::handler &cgh) {
    celerity::accessor acc{result, cgh, celerity::access::all{},
            celerity::read_only_host_task};
    cgh.host_task(celerity::on_master_node, [=] {
        printf("The result is %g\n", acc[0]);
    });
});
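
A fixed range mapper can be used analogously when only part of a buffer is needed on the master node. The following sketch reads just the first 16 elements; the 1024-element results buffer and the verified range are assumptions chosen for illustration:

celerity::distr_queue q;
celerity::buffer<float, 1> results{celerity::range<1>{1024}};
q.submit([&](celerity::handler &cgh) {
    // Only the first 16 elements are requested on the master node (hypothetical range).
    celerity::accessor acc{results, cgh,
            celerity::access::fixed<1>{{0, 16}},
            celerity::read_only_host_task};
    cgh.host_task(celerity::on_master_node, [=] {
        for (size_t i = 0; i < 16; ++i) {
            printf("results[%zu] = %g\n", i, acc[i]);
        }
    });
});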

Distributed Host Tasks

If a computation involving host code is to be distributed across a cluster, Celerity can split the iteration space accordingly. Such a distributed host task is created by passing a global size to host_task:

cgh.host_task(global_size, [](celerity::partition<Dims>) { ... });
cgh.host_task(global_size, global_offset, [](celerity::partition<Dims>) { ... });

Instead of the per-item kernel invocation of handler::parallel_for that is useful for accelerator computations, the host kernel receives partitions of the iteration space. These describe the iteration sub-space that this node receives:

celerity::distr_queue q;
q.submit([&](celerity::handler &cgh) {
    cgh.host_task(celerity::range<1>(100), [=](celerity::partition<1> part) {
        printf("This node received %zu items\n", part.get_subrange().range[0]);
    });
});

In distributed host tasks, buffers can be accessed using the same range mappers as in device computations with the expected semantics.
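
For example, a one_to_one range mapper gives each node write access to exactly its own sub-range, which can then be addressed via the partition. A minimal sketch (the buffer shape and the computed values are just for illustration; celerity::no_init hints that the previous buffer contents are not needed):

celerity::distr_queue q;
celerity::buffer<float, 1> buf{celerity::range<1>{100}};
q.submit([&](celerity::handler &cgh) {
    // Each node only writes the items of its own partition.
    celerity::accessor acc{buf, cgh, celerity::access::one_to_one{},
            celerity::write_only_host_task, celerity::no_init};
    cgh.host_task(celerity::range<1>(100), [=](celerity::partition<1> part) {
        const auto sr = part.get_subrange();
        for (size_t i = sr.offset[0]; i < sr.offset[0] + sr.range[0]; ++i) {
            acc[i] = static_cast<float>(i) * 2.0f;
        }
    });
});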

Celerity makes no guarantees about the granularity of the split. Also, some nodes may receive multiple concurrent invocations of the kernel while others might not participate in the host task at all.

Experimental: Collective Host Tasks

Note: This feature is subject to change.

Efficient distributed I/O routines need to perform collective operations across a cluster, i.e. operations in which all nodes participate simultaneously. A popular API that makes use of this feature is Parallel HDF5, which provides access to array data on the file system using MPI I/O as the underlying driver (see the Celerity distr_io example for a demonstration).

Invoking collective operations from a Celerity program requires additional support from the runtime to guarantee the proper ordering of MPI calls, the participation of each node in the operation, and the absence of race conditions between concurrent host tasks on the same queue. To request a host task satisfying these conditions, use the experimental::collective tag:

cgh.host_task(celerity::experimental::collective,
    [](celerity::experimental::collective_partition part) { ... });

collective_partition is a specialization of the one-dimensional partition. Note how no global size is passed to the host task. Instead, the runtime creates a one-dimensional iteration space whose size is the number of participating nodes, and each node receives a single-element subrange whose offset corresponds to its node index. Additionally, collective_partition provides access to the MPI communicator for this task:

celerity::distr_queue q;
q.submit([](celerity::handler &cgh) {
    cgh.host_task(celerity::experimental::collective,
        [](celerity::experimental::collective_partition part) {
            MPI_Comm comm = part.get_collective_mpi_comm();
            MPI_Barrier(comm);
        });
});

Third-party APIs using MPI collectives will have an MPI_Comm parameter where this communicator can be passed in. Celerity guarantees the communicator to be free of race conditions with other operations for the duration of the host task. If multiple collective tasks are scheduled, they receive the same MPI communicator.
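
For instance, the communicator can be passed directly to plain MPI collectives or to a library expecting an MPI_Comm. A minimal sketch that counts the participating nodes with MPI_Allreduce:

celerity::distr_queue q;
q.submit([](celerity::handler &cgh) {
    cgh.host_task(celerity::experimental::collective,
        [](celerity::experimental::collective_partition part) {
            MPI_Comm comm = part.get_collective_mpi_comm();
            // Every node contributes 1; the sum is the number of participants.
            int local = 1, total = 0;
            MPI_Allreduce(&local, &total, 1, MPI_INT, MPI_SUM, comm);
            printf("%d nodes participated in this collective host task\n", total);
        });
});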

Collective Groups

To guarantee proper ordering of MPI operations across the cluster, collective host tasks on the same communicator must neither run concurrently nor be reordered on any one node. If there are multiple independent collective operations that could run concurrently, Celerity can be notified of this through collective groups:

celerity::distr_queue q;
celerity::experimental::collective_group first_group;
celerity::experimental::collective_group second_group;
q.submit([&](celerity::handler &cgh) {
    cgh.host_task(celerity::experimental::collective(first_group),
        [](celerity::experimental::collective_partition part) { ... });
});
q.submit([&](celerity::handler &cgh) {
    cgh.host_task(celerity::experimental::collective(second_group),
        [](celerity::experimental::collective_partition part) { ... });
});

Since these two host tasks use different collective groups and are also independent with regard to their buffer accesses, they can now be executed concurrently. For this purpose, each kernel receives an MPI communicator unique to its collective group. The prior example, which did not mention a collective group explicitly, implicitly binds to celerity::experimental::default_collective_group.

Buffer Access from a Collective Host Task

Collective host tasks are special in that they receive an implicit one-dimensional iteration space that just identifies the participating nodes. To access buffers in a meaningful way, these node indices must be translated to buffer regions. In the typical Celerity fashion, this is handled via range mappers.

The celerity::experimental::access::even_split range mapper maps a one-dimensional range onto arbitrary-dimensional buffers by splitting them along the first (slowest) dimension into contiguous memory portions. celerity::accessor::get_allocation_window can then be used to retrieve the host-local chunk of the buffer:

celerity::distr_queue q;
celerity::buffer<float, 2> buf;
q.submit([&](celerity::handler& cgh) {
    celerity::accessor acc{buf, cgh,
            celerity::experimental::access::even_split<2>{},
            celerity::read_only_host_task};
    cgh.host_task(celerity::experimental::collective,
        [=](celerity::experimental::collective_partition part) {
            auto aw = acc.get_allocation_window(part);
            // ...
        });
});
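
The returned allocation window describes where this node's portion of the buffer lives within the row-major host allocation. The following fragment continues inside the host task above and sketches how the window could be traversed, assuming the buffer_allocation_window member functions named below:

// Inside the host task above: walk this node's rows in the host allocation,
// e.g. to hand them to an I/O library.
// (Assumes get_allocation(), get_window_range(),
//  get_window_offset_in_allocation() and get_allocation_range().)
const float *alloc = aw.get_allocation();
const auto win_range = aw.get_window_range();       // extent of this node's window
const auto win_offset = aw.get_window_offset_in_allocation();
const auto alloc_range = aw.get_allocation_range(); // full extent of the host allocation
for (size_t i = 0; i < win_range[0]; ++i) {
    const float *row = alloc + (win_offset[0] + i) * alloc_range[1] + win_offset[1];
    printf("first element of local row %zu is %g\n", i, row[0]);
}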