
class receive_arbiter

Declaration

class receive_arbiter { /* full declaration omitted */ };

Description

Matches receive instructions to inbound pilots and triggers in-place payload receives on the communicator. For scalability reasons, distributed command graph generation only yields exact destinations and buffer sub-ranges for push commands; await-pushes do not carry such information and only denote the full region to be received. Sender nodes communicate the exact ranges to the receiver at execution time via pilot messages, which are generated alongside the instruction graph. The receive_arbiter's job is to match these inbound pilots to the receive instructions generated from await-push commands and to issue in-place receives (e.g. MPI_Recv) of the data into an appropriate host allocation. Since pilots and receive instructions may arrive in arbitrary order, it maintains a separate state machine for each transfer_id that drives all operations eventually completing an async_event for each receive instruction.
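The matching logic described above can be illustrated with a heavily simplified, single-process sketch. It is not the receive_arbiter implementation: regions are modeled as hypothetical 1D intervals, `toy_arbiter`, `toy_transfer` and `on_pilot` are invented names, pilots for one transfer are assumed to be disjoint, and each payload receive is assumed to complete as soon as its pilot is matched. It only shows how per-transfer_id state lets pilots that arrive before or after the corresponding receive be handled uniformly.

```cpp
#include <cassert>
#include <cstddef>
#include <unordered_map>
#include <vector>

// Hypothetical simplification: regions are half-open 1D intervals [begin, end)
// and transfer ids are plain integers. The real class uses region<3>/box<3>.
struct interval {
    std::size_t begin, end;
    std::size_t size() const { return end - begin; }
};
using toy_transfer_id = int;

// State machine for one transfer id: the request (once a receive was issued),
// pilots that arrived before the receive, and how many elements are still missing.
struct toy_transfer {
    bool has_request = false;
    interval request{0, 0};
    std::size_t elements_missing = 0;
    std::vector<interval> early_pilots;
};

class toy_arbiter {
  public:
    // Models a receive instruction generated from an await-push command.
    void receive(toy_transfer_id trid, interval request) {
        toy_transfer& t = m_transfers[trid];
        t.has_request = true;
        t.request = request;
        t.elements_missing = request.size();
        // Pilots that arrived before the receive are matched now.
        for (const interval& p : t.early_pilots) match(t, p);
        t.early_pilots.clear();
    }

    // Models an inbound pilot announcing the exact sub-range a sender will push.
    void on_pilot(toy_transfer_id trid, interval pilot) {
        toy_transfer& t = m_transfers[trid];
        if (t.has_request) match(t, pilot);
        else t.early_pilots.push_back(pilot); // no receive yet: buffer the pilot
    }

    bool is_complete(toy_transfer_id trid) const {
        auto it = m_transfers.find(trid);
        return it != m_transfers.end() && it->second.has_request && it->second.elements_missing == 0;
    }

  private:
    // The real arbiter would issue an in-place payload receive here; this sketch
    // assumes the payload arrives instantly and only accounts for its size.
    static void match(toy_transfer& t, interval pilot) {
        assert(pilot.begin >= t.request.begin && pilot.end <= t.request.end);
        t.elements_missing -= pilot.size();
    }

    std::unordered_map<toy_transfer_id, toy_transfer> m_transfers;
};
```

Buffering early pilots in the per-transfer state is what makes the arrival order irrelevant: whichever side arrives second performs the match.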

Member Variables

communicator* m_comm
size_t m_num_nodes
std::unordered_map< transfer_id, receive_arbiter_detail::transfer> m_transfers
State machines for all `transfer_id`s that were mentioned in an inbound pilot or in a call to one of the receive functions. Once a transfer is complete, it is removed from `m_transfers`. A `multi_region_transfer` can be re-created if another pair of inbound pilots and `receive` calls for the same transfer id appears later, provided it does not temporally overlap with the original one.
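A minimal sketch of the `m_transfers` lifecycle, assuming a hypothetical `toy_transfer_table` keyed by a plain `int` instead of transfer_id and tracking only an element count: an entry is created on first mention, erased as soon as the transfer completes, and can then be created afresh for a later, non-overlapping round with the same id.

```cpp
#include <cassert>
#include <cstddef>
#include <unordered_map>

// Hypothetical per-transfer state; the real map stores
// receive_arbiter_detail::transfer objects keyed by transfer_id.
struct toy_state {
    std::size_t elements_missing = 0;
};

class toy_transfer_table {
  public:
    // Creates the state for `trid` on its first mention; an id that is still
    // in flight keeps its existing state.
    void start(int trid, std::size_t elements) {
        auto [it, inserted] = m_transfers.try_emplace(trid);
        if (inserted) it->second.elements_missing = elements;
    }

    // Accounts for received elements and erases the entry once the transfer is complete.
    void deliver(int trid, std::size_t elements) {
        auto it = m_transfers.find(trid);
        assert(it != m_transfers.end() && it->second.elements_missing >= elements);
        it->second.elements_missing -= elements;
        if (it->second.elements_missing == 0) m_transfers.erase(it);
    }

    bool in_flight(int trid) const { return m_transfers.count(trid) != 0; }
    std::size_t size() const { return m_transfers.size(); }

  private:
    std::unordered_map<int, toy_state> m_transfers;
};
```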

Member Function Overview

  • await_split_receive_subregion(const transfer_id & trid, const region<3> & subregion) → [[nodiscard]] async_event
  • begin_split_receive(const transfer_id & trid, const region<3> & request, void * allocation, const box<3> & allocated_box, size_t elem_size) → void
  • gather_receive(const transfer_id & trid, void * allocation, size_t node_chunk_size) → [[nodiscard]] async_event
  • operator=(const receive_arbiter &) → receive_arbiter &
  • operator=(receive_arbiter &&) → receive_arbiter &
  • poll_communicator() → void
  • receive(const transfer_id & trid, const region<3> & request, void * allocation, const box<3> & allocated_box, size_t elem_size) → [[nodiscard]] async_event
  • explicit receive_arbiter(communicator & comm)
  • receive_arbiter(const receive_arbiter &)
  • receive_arbiter(receive_arbiter &&)
  • ~receive_arbiter()
  • handle_gather_request_pilot(receive_arbiter_detail::gather_request & gr, const inbound_pilot & pilot) → void
  • handle_region_request_pilot(receive_arbiter_detail::region_request & rr, const inbound_pilot & pilot, size_t elem_size) → void
  • initiate_region_request(const transfer_id & trid, const region<3> & request, void * allocation, const box<3> & allocated_box, size_t elem_size) → receive_arbiter_detail::stable_region_request &

Member Functions

[[nodiscard]] async_event
await_split_receive_subregion(
    const transfer_id& trid,
    const region<3>& subregion)

Description

To be called after begin_split_receive to await the reception of a subregion of the original request. Subregions passed to different invocations of this function may overlap, but must not exceed the original request. If the entire split receive has already finished, this returns an event that is already complete.
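The completion semantics of await_split_receive_subregion can be sketched in 1D. The `toy_split_receive` class, its per-element bitmap and the `on_payload` hook are assumptions made for illustration; the real class operates on region<3> and hands out async_event objects rather than answering boolean queries.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical 1D stand-in for region<3>: half-open [begin, end).
struct interval { std::size_t begin, end; };

// Tracks which elements of a split receive have arrived so far.
class toy_split_receive {
  public:
    explicit toy_split_receive(interval request)
        : m_request(request), m_received(request.end - request.begin, false) {}

    // Called when the payload for a pilot's box has been fully received.
    void on_payload(interval box) {
        assert(contains(box));
        for (std::size_t i = box.begin; i < box.end; ++i) m_received[i - m_request.begin] = true;
    }

    // Models the event returned for `sub`: complete once every element of `sub`
    // has arrived. Different subregions may overlap.
    bool subregion_complete(interval sub) const {
        assert(contains(sub)); // must not exceed the original request
        for (std::size_t i = sub.begin; i < sub.end; ++i)
            if (!m_received[i - m_request.begin]) return false;
        return true;
    }

  private:
    bool contains(interval i) const { return i.begin >= m_request.begin && i.end <= m_request.end; }

    interval m_request;
    std::vector<bool> m_received;
};
```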

Parameters

const transfer_id& trid
const region<3>& subregion

void begin_split_receive(
    const transfer_id& trid,
    const region<3>& request,
    void* allocation,
    const box<3>& allocated_box,
    size_t elem_size)

Description

Begins the reception of a buffer region into an existing allocation, similar to receive, but without awaiting its completion through a single async_event. Instead, the caller must follow up with calls to await_split_receive_subregion for the same transfer_id. The subregions passed to these calls need not be disjoint, but their union must equal the original request.
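The contract on the follow-up calls (subregions may overlap, but their union must equal the original request) can be expressed as a 1D check. `subregions_cover_request` is a hypothetical helper for illustration, not part of the receive_arbiter API.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical 1D stand-in for region<3>: half-open [begin, end).
struct interval { std::size_t begin, end; };

// Returns true if the (possibly overlapping) subregions stay within `request`
// and their union is exactly `request`.
bool subregions_cover_request(interval request, std::vector<interval> subs) {
    assert(request.begin <= request.end);
    std::sort(subs.begin(), subs.end(),
              [](const interval& a, const interval& b) { return a.begin < b.begin; });
    std::size_t covered_until = request.begin; // [request.begin, covered_until) is covered
    for (const interval& s : subs) {
        if (s.begin < request.begin || s.end > request.end) return false; // exceeds request
        if (s.begin > covered_until) return false;                       // gap in the union
        covered_until = std::max(covered_until, s.end);
    }
    return covered_until == request.end;
}
```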

Parameters

const transfer_id& trid
const region<3>& request
void* allocation
const box<3>& allocated_box
size_t elem_size

[[nodiscard]] async_event gather_receive(
    const transfer_id& trid,
    void* allocation,
    size_t node_chunk_size)

Description

Receive a contiguous chunk of data from every peer node, placing the chunk from node node_id at allocation[node_chunk_size * node_id]. The slot reserved for the local node is not written to and may be accessed concurrently while this operation is in progress. If a peer node announces that it will not contribute to this transfer by sending an empty-box pilot, its slot also remains unmodified. This feature is a temporary solution until we implement inter-node reductions through inter-node collectives.
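The resulting memory layout of gather_receive can be modeled in a single process. In this sketch, the hypothetical `toy_gather` copies each peer's chunk to byte offset `node_chunk_size * node_id`, skips the local node's slot, and treats `std::nullopt` as an empty-box pilot; in the real class, data arrives through the communicator rather than through a vector.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <optional>
#include <vector>

// Hypothetical model: chunks[n] is what peer n sends, std::nullopt models an
// empty-box pilot (peer n does not contribute to this transfer).
void toy_gather(std::vector<unsigned char>& allocation, std::size_t node_chunk_size,
                std::size_t local_node_id,
                const std::vector<std::optional<std::vector<unsigned char>>>& chunks) {
    assert(allocation.size() >= node_chunk_size * chunks.size());
    for (std::size_t node_id = 0; node_id < chunks.size(); ++node_id) {
        if (node_id == local_node_id) continue; // local slot is never written
        if (!chunks[node_id]) continue;         // empty-box pilot: slot stays unmodified
        assert(chunks[node_id]->size() == node_chunk_size);
        // Peer node_id's chunk lands at offset node_chunk_size * node_id.
        std::memcpy(allocation.data() + node_chunk_size * node_id, chunks[node_id]->data(),
                    node_chunk_size);
    }
}
```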

Parameters

const transfer_id& trid
void* allocation
size_t node_chunk_size

receive_arbiter& operator=(const receive_arbiter&)

Parameters

const receive_arbiter&

receive_arbiter& operator=(receive_arbiter&&)

Parameters

receive_arbiter&&

void poll_communicator()

Description

Polls the communicator for inbound pilots and advances the state of all ongoing receive operations. It is expected to be called unconditionally in a loop.
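A sketch of the intended calling pattern, assuming a hypothetical event type with an `is_complete()` query and a toy arbiter in which each poll consumes one pending pilot. The generic `drive_until_complete` loop polls on every iteration until the event completes; a real executor would instead poll on each iteration of its main loop, whether or not receives are pending.

```cpp
#include <cstddef>

// Hypothetical event stand-in: complete once no pilots are outstanding.
struct toy_event {
    const std::size_t* remaining;
    bool is_complete() const { return *remaining == 0; }
};

// Hypothetical arbiter stand-in: each poll consumes at most one pending pilot,
// and polling with nothing pending is a harmless no-op.
struct toy_arbiter {
    std::size_t remaining_pilots;
    void poll_communicator() {
        if (remaining_pilots > 0) --remaining_pilots;
    }
    toy_event receive() const { return toy_event{&remaining_pilots}; }
};

// Generic driver loop: poll on every iteration until the event reports completion.
template <typename Arbiter, typename Event>
std::size_t drive_until_complete(Arbiter& arbiter, const Event& event) {
    std::size_t polls = 0;
    while (!event.is_complete()) {
        arbiter.poll_communicator();
        ++polls;
    }
    return polls;
}
```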


[[nodiscard]] async_event receive(
    const transfer_id& trid,
    const region<3>& request,
    void* allocation,
    const box<3>& allocated_box,
    size_t elem_size)

Description

Receive a buffer region associated with a single transfer id trid into an existing allocation of allocated_box.size() * elem_size bytes. The request region must be fully contained in allocated_box, and the caller must ensure that the communicator will not receive an inbound pilot that intersects request without being fully contained in it. The returned async_event completes once the receive is complete.
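The two preconditions of receive can be written down as checks on 3D boxes. `toy_box` (a half-open box with `lo`/`hi` corners), `required_allocation_bytes` and `pilot_is_admissible` are hypothetical names for illustration; the real code works with box<3> and region<3>.

```cpp
#include <array>
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for box<3>: half-open range [lo, hi) in each dimension.
struct toy_box {
    std::array<std::size_t, 3> lo, hi;
    std::size_t size() const { return (hi[0] - lo[0]) * (hi[1] - lo[1]) * (hi[2] - lo[2]); }
    bool contains(const toy_box& o) const {
        for (int d = 0; d < 3; ++d)
            if (o.lo[d] < lo[d] || o.hi[d] > hi[d]) return false;
        return true;
    }
    bool intersects(const toy_box& o) const {
        for (int d = 0; d < 3; ++d)
            if (o.hi[d] <= lo[d] || o.lo[d] >= hi[d]) return false;
        return true;
    }
};

// Size of the allocation receive() writes into; `request` must lie inside `allocated_box`.
std::size_t required_allocation_bytes(const toy_box& request, const toy_box& allocated_box,
                                      std::size_t elem_size) {
    assert(allocated_box.contains(request));
    return allocated_box.size() * elem_size;
}

// Caller-side guarantee: an inbound pilot for this transfer either misses
// `request` entirely or is fully contained in it, never partially overlapping.
bool pilot_is_admissible(const toy_box& request, const toy_box& pilot) {
    return !request.intersects(pilot) || request.contains(pilot);
}
```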

Parameters

const transfer_id& trid
const region<3>& request
void* allocation
const box<3>& allocated_box
size_t elem_size

explicit receive_arbiter(communicator& comm)

Description

receive_arbiter will use comm to poll for inbound pilots and to issue payload receives.

Parameters

communicator& comm

receive_arbiter(const receive_arbiter&)

Parameters

const receive_arbiter&

receive_arbiter(receive_arbiter&&)

Parameters

receive_arbiter&&

~receive_arbiter()


void handle_gather_request_pilot(
    receive_arbiter_detail::gather_request& gr,
    const inbound_pilot& pilot)

Description

Updates the state of an active gather_request in response to an inbound pilot.

Parameters

receive_arbiter_detail::gather_request& gr
const inbound_pilot& pilot

void handle_region_request_pilot(
    receive_arbiter_detail::region_request& rr,
    const inbound_pilot& pilot,
    size_t elem_size)

Description

Updates the state of an active region_request in response to an inbound pilot.

Parameters

receive_arbiter_detail::region_request& rr
const inbound_pilot& pilot
size_t elem_size

receive_arbiter_detail::stable_region_request&
initiate_region_request(
    const transfer_id& trid,
    const region<3>& request,
    void* allocation,
    const box<3>& allocated_box,
    size_t elem_size)

Description

Initiates a new region_request for which the caller can construct events to await either the entire region or sub-regions.

Parameters

const transfer_id& trid
const region<3>& request
void* allocation
const box<3>& allocated_box
size_t elem_size