class task_manager
Declaration
class task_manager { /* full declaration omitted */ };
Member Variables
- static const task_id initial_epoch_task = 0
- const size_t m_num_collective_nodes
- policy_set m_policy
- task_ring_buffer m_task_buffer
- task_id m_epoch_for_new_tasks = {initial_epoch_task}
- std::unordered_map<buffer_id, buffer_state> m_buffers
- std::unordered_map<collective_group_id, task_id> m_last_collective_tasks
- std::unordered_map<host_object_id, host_object_state> m_host_objects
- std::vector<task_callback> m_task_callbacks
- int m_task_horizon_step_size = 4
- int m_task_horizon_max_parallelism = 64
- int m_max_pseudo_critical_path_length = 0
- int m_current_horizon_critical_path_length = 0
- std::optional<task_id> m_current_horizon
- std::optional<task_id> m_latest_horizon_reached
- epoch_monitor m_latest_epoch_reached = {initial_epoch_task}
- std::unordered_set<task*> m_execution_front
- detail::task_recorder* m_task_recorder
Member Function Overview
- await_epoch(task_id epoch) → void
- find_task(task_id tid) const → const task *
- generate_epoch_task(epoch_action action) → task_id
- generate_fence_task(buffer_access_map access_map, side_effect_map side_effects, std::unique_ptr<fence_promise> fence_promise) → task_id
- get_current_task_count() const → size_t
- get_task(task_id tid) const → const task *
- get_total_task_count() const → size_t
- has_task(task_id tid) const → bool
- notify_buffer_created(buffer_id bid, const range<3> & range, bool host_initialized) → void
- notify_buffer_debug_name_changed(buffer_id bid, const std::string & name) → void
- notify_buffer_destroyed(buffer_id bid) → void
- notify_epoch_reached(task_id epoch_tid) → void
- notify_horizon_reached(task_id horizon_tid) → void
- notify_host_object_created(host_object_id hoid) → void
- notify_host_object_destroyed(host_object_id hoid) → void
- register_task_callback(task_callback cb) → void
- set_horizon_max_parallelism(int para) → void
- set_horizon_step(int step) → void
- template <typename CGF> submit_command_group(CGF cgf) → task_id
- task_manager(size_t num_collective_nodes, detail::task_recorder * recorder, const policy_set & policy = default_policy_set())
- virtual ~task_manager()
- add_dependency(task & depender, task & dependee, dependency_kind kind, dependency_origin origin) → void
- await_free_task_slot_callback() → task_ring_buffer::wait_callback
- compute_dependencies(task & tsk) → void
- default_policy_set() → static constexpr policy_set
- generate_horizon_task() → task_id
- get_execution_front() → const std::unordered_set<task *> &
- get_first_in_flight_epoch() const → task_id
- get_max_pseudo_critical_path_length() const → int
- invoke_callbacks(const task * tsk) const → void
- need_new_horizon() const → bool
- print_buffer_debug_label(buffer_id bid) const → std::string
- reduce_execution_front(task_ring_buffer::reservation && reserve, std::unique_ptr<task> new_front) → task &
- register_task_internal(task_ring_buffer::reservation && reserve, std::unique_ptr<task> task) → task &
- set_epoch_for_new_tasks(task_id epoch) → void
Member Functions
¶void await_epoch(task_id epoch)
void await_epoch(task_id epoch)
Description
Blocks until an epoch task has executed on this node (or all nodes, if the epoch_for_new_tasks was created with epoch_action::barrier).
Parameters
- task_id epoch
¶const task* find_task(task_id tid) const
const task* find_task(task_id tid) const
Description
Returns the specified task if it still exists, nullptr otherwise.
Parameters
- task_id tid
¶task_id generate_epoch_task(epoch_action action)
task_id generate_epoch_task(epoch_action action)
Description
Inserts an epoch task that depends on the entire execution front and that immediately becomes the current epoch_for_new_tasks and the last writer for all buffers.
Parameters
- epoch_action action
¶task_id generate_fence_task(
buffer_access_map access_map,
side_effect_map side_effects,
std::unique_ptr<fence_promise> fence_promise)
task_id generate_fence_task(
buffer_access_map access_map,
side_effect_map side_effects,
std::unique_ptr<fence_promise> fence_promise)
Parameters
- buffer_access_map access_map
- side_effect_map side_effects
- std::unique_ptr<fence_promise> fence_promise
¶size_t get_current_task_count() const
size_t get_current_task_count() const
Description
Returns the number of tasks currently being managed by the task_manager.
¶const task* get_task(task_id tid) const
const task* get_task(task_id tid) const
Description
Asserts that the specified task exists and returns a non-null pointer to the task object.
Parameters
- task_id tid
¶size_t get_total_task_count() const
size_t get_total_task_count() const
Description
Returns the number of tasks created during the lifetime of the task_manager, including tasks that have already been deleted.
¶bool has_task(task_id tid) const
bool has_task(task_id tid) const
Description
Checks whether a task has already been registered with the queue.
This is useful for scenarios where the master node sends out commands concerning tasks that have not yet been registered through the local execution of the user program.
Parameters
- task_id tid
¶void notify_buffer_created(buffer_id bid,
const range<3>& range,
bool host_initialized)
void notify_buffer_created(buffer_id bid,
const range<3>& range,
bool host_initialized)
Description
Adds a new buffer for dependency tracking.
Parameters
- buffer_id bid
- const range<3>& range
- bool host_initialized
¶void notify_buffer_debug_name_changed(
buffer_id bid,
const std::string& name)
void notify_buffer_debug_name_changed(
buffer_id bid,
const std::string& name)
Parameters
- buffer_id bid
- const std::string& name
¶void notify_buffer_destroyed(buffer_id bid)
void notify_buffer_destroyed(buffer_id bid)
Parameters
- buffer_id bid
¶void notify_epoch_reached(task_id epoch_tid)
void notify_epoch_reached(task_id epoch_tid)
Description
Notifies the task manager that the given epoch has been executed on this node.
notify_horizon_reached and notify_epoch_reached must only ever be called from a single thread, but that thread does not have to be the main thread.
Parameters
- task_id epoch_tid
¶void notify_horizon_reached(task_id horizon_tid)
void notify_horizon_reached(task_id horizon_tid)
Description
Notifies the task manager that the given horizon has been executed (used for task deletion).
notify_horizon_reached and notify_epoch_reached must only ever be called from a single thread, but that thread does not have to be the main thread.
Parameters
- task_id horizon_tid
¶void notify_host_object_created(
host_object_id hoid)
void notify_host_object_created(
host_object_id hoid)
Parameters
- host_object_id hoid
¶void notify_host_object_destroyed(
host_object_id hoid)
void notify_host_object_destroyed(
host_object_id hoid)
Parameters
- host_object_id hoid
¶void register_task_callback(task_callback cb)
void register_task_callback(task_callback cb)
Description
Registers a new callback that will be called whenever a new task is created.
Parameters
- task_callback cb
¶void set_horizon_max_parallelism(int para)
void set_horizon_max_parallelism(int para)
Parameters
- int para
¶void set_horizon_step(int step)
void set_horizon_step(int step)
Parameters
- int step
¶template <typename CGF>
task_id submit_command_group(CGF cgf)
template <typename CGF>
task_id submit_command_group(CGF cgf)
Template Parameters
- CGF
Parameters
- CGF cgf
¶task_manager(size_t num_collective_nodes,
detail::task_recorder* recorder,
const policy_set& policy =
default_policy_set())
task_manager(size_t num_collective_nodes,
detail::task_recorder* recorder,
const policy_set& policy =
default_policy_set())
Parameters
- size_t num_collective_nodes
- detail::task_recorder* recorder
- const policy_set& policy = default_policy_set()
¶virtual ~task_manager()
virtual ~task_manager()
¶void add_dependency(task& depender,
task& dependee,
dependency_kind kind,
dependency_origin origin)
void add_dependency(task& depender,
task& dependee,
dependency_kind kind,
dependency_origin origin)
Parameters
- task& depender
- task& dependee
- dependency_kind kind
- dependency_origin origin
¶task_ring_buffer::wait_callback
await_free_task_slot_callback()
task_ring_buffer::wait_callback
await_free_task_slot_callback()
¶void compute_dependencies(task& tsk)
void compute_dependencies(task& tsk)
Parameters
- task& tsk
¶static constexpr policy_set default_policy_set()
static constexpr policy_set default_policy_set()
¶task_id generate_horizon_task()
task_id generate_horizon_task()
¶const std::unordered_set<task*>&
get_execution_front()
const std::unordered_set<task*>&
get_execution_front()
¶task_id get_first_in_flight_epoch() const
task_id get_first_in_flight_epoch() const
¶int get_max_pseudo_critical_path_length() const
int get_max_pseudo_critical_path_length() const
¶void invoke_callbacks(const task* tsk) const
void invoke_callbacks(const task* tsk) const
Parameters
- const task* tsk
¶bool need_new_horizon() const
bool need_new_horizon() const
¶std::string print_buffer_debug_label(
buffer_id bid) const
std::string print_buffer_debug_label(
buffer_id bid) const
Parameters
- buffer_id bid
¶task& reduce_execution_front(
task_ring_buffer::reservation&& reserve,
std::unique_ptr<task> new_front)
task& reduce_execution_front(
task_ring_buffer::reservation&& reserve,
std::unique_ptr<task> new_front)
Parameters
- task_ring_buffer::reservation&& reserve
- std::unique_ptr<task> new_front
¶task& register_task_internal(
task_ring_buffer::reservation&& reserve,
std::unique_ptr<task> task)
task& register_task_internal(
task_ring_buffer::reservation&& reserve,
std::unique_ptr<task> task)
Parameters
- task_ring_buffer::reservation&& reserve
- std::unique_ptr<task> task
¶void set_epoch_for_new_tasks(task_id epoch)
void set_epoch_for_new_tasks(task_id epoch)
Parameters
- task_id epoch