task.h
Go to the documentation of this file.
1 #ifndef TTG_PARSEC_TASK_H
2 #define TTG_PARSEC_TASK_H
3 
5 
6 #include <parsec/parsec_internal.h>
7 #include <parsec/mca/device/device_gpu.h>
8 
9 namespace ttg_parsec {
10 
11  namespace detail {
12 
13  struct device_ptr_t {
14  parsec_gpu_task_t* gpu_task = nullptr;
15  parsec_flow_t* flows = nullptr;
16  parsec_gpu_exec_stream_t* stream = nullptr;
17  parsec_device_gpu_module_t* device = nullptr;
18  parsec_task_class_t task_class; // copy of the taskclass
19  };
20 
21  template<bool SupportDevice>
23  {
24  static constexpr bool support_device = false;
25  static constexpr size_t num_flows = 0;
27  { }
28  static constexpr device_ptr_t* dev_ptr() {
29  return nullptr;
30  }
31  };
32 
33  template<>
34  struct device_state_t<true> {
35  static constexpr bool support_device = false;
36  static constexpr size_t num_flows = MAX_PARAM_COUNT;
37  parsec_flow_t m_flows[num_flows];
38  device_ptr_t m_dev_ptr = {nullptr, &m_flows[0], nullptr, nullptr}; // gpu_task will be allocated in each task
40  return &m_dev_ptr;
41  }
42  };
43 
/**
 * Bit flags stored in a uint8_t; every enumerator other than NONE occupies
 * its own bit so values can be combined and tested with the operators below.
 */
enum class ttg_parsec_data_flags : uint8_t {
  NONE            = 0,
  SINGLE_READER   = 1 << 0,
  MULTIPLE_READER = 1 << 1,
  SINGLE_WRITER   = 1 << 2,
  MULTIPLE_WRITER = 1 << 3,
  IS_MODIFIED     = 1 << 4,
  MARKED_PUSHOUT  = 1 << 5
};

/// Bitwise union of two flag sets.
inline
ttg_parsec_data_flags operator|(ttg_parsec_data_flags lhs, ttg_parsec_data_flags rhs) {
  using flags_type = std::underlying_type_t<ttg_parsec_data_flags>;
  return static_cast<ttg_parsec_data_flags>(static_cast<flags_type>(lhs) | static_cast<flags_type>(rhs));
}

/// Adds the bits of @p rhs to @p lhs and returns the updated value (by value).
inline
ttg_parsec_data_flags operator|=(ttg_parsec_data_flags& lhs, ttg_parsec_data_flags rhs) {
  lhs = lhs | rhs;
  return lhs;
}

/// Bitwise intersection; returns the raw bits (non-zero iff the operands share a flag).
inline
uint8_t operator&(ttg_parsec_data_flags lhs, ttg_parsec_data_flags rhs) {
  using flags_type = std::underlying_type_t<ttg_parsec_data_flags>;
  return static_cast<uint8_t>(static_cast<flags_type>(lhs) & static_cast<flags_type>(rhs));
}

/// Keeps only the bits of @p lhs that are also set in @p rhs and returns the updated value.
inline
ttg_parsec_data_flags operator&=(ttg_parsec_data_flags& lhs, ttg_parsec_data_flags rhs) {
  lhs = static_cast<ttg_parsec_data_flags>(lhs & rhs);
  return lhs;
}

/// True iff no flag is set.
inline
bool operator!(ttg_parsec_data_flags lhs) {
  return lhs == ttg_parsec_data_flags::NONE;
}
85 
86 
87  typedef parsec_hook_return_t (*parsec_static_op_t)(void *); // static_op will be cast to this type
88 
90  parsec_task_t parsec_task;
91  int32_t in_data_count = 0; //< number of satisfied inputs
92  int32_t data_count = 0; //< number of data elements in the copies array
93  ttg_data_copy_t **copies; //< pointer to the fixed copies array of the derived task
94  parsec_hash_table_item_t tt_ht_item = {};
95 
96  struct stream_info_t {
97  std::size_t goal;
98  std::size_t size;
99  parsec_lifo_t reduce_copies;
100  std::atomic<std::size_t> reduce_count;
101  };
102 
103  protected:
104  template<std::size_t i = 0, typename TT>
105  void init_stream_info_impl(TT *tt, std::array<stream_info_t, TT::numins>& streams) {
106  if constexpr (TT::numins > i) {
107  if (std::get<i>(tt->input_reducers)) {
108  streams[i].goal = tt->static_stream_goal[i];
109  streams[i].size = 0;
110  PARSEC_OBJ_CONSTRUCT(&streams[i].reduce_copies, parsec_lifo_t);
111  streams[i].reduce_count.store(0, std::memory_order_relaxed);
112  }
113  /* recursion */
114  if constexpr((i + 1) < TT::numins) {
115  init_stream_info_impl<i+1>(tt, streams);
116  }
117  }
118  }
119 
120  template<typename TT>
121  void init_stream_info(TT *tt, std::array<stream_info_t, TT::numins>& streams) {
122  init_stream_info_impl<0>(tt, streams);
123  }
124 
125  public:
127  /* Poor man's virtual function
128  * We cannot use virtual inheritance or private visibility because we
129  * need offsetof for the mempool and scheduling.
130  */
132  device_ptr_t* dev_ptr = nullptr;
133  bool remove_from_hash = true;
134  bool dummy = false;
135  bool defer_writer = TTG_PARSEC_DEFER_WRITER; // whether to defer writer instead of creating a new copy
136  ttg_parsec_data_flags data_flags; // HACKY: flags set by prepare_send and reset by the copy_handler
137 
138  /*
139  virtual void release_task() = 0;
140  */
141  //public:
142  void release_task() {
143  release_task_cb(this);
144  }
145 
146  protected:
152  parsec_ttg_task_base_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class,
156  , copies(copies)
158  PARSEC_OBJ_CONSTRUCT(&parsec_task, parsec_task_t);
159  PARSEC_LIST_ITEM_SINGLETON(&parsec_task.super);
160  parsec_task.mempool_owner = mempool;
161  parsec_task.task_class = task_class;
162  parsec_task.priority = 0;
163 
164  // TODO: can we avoid this?
165  for (int i = 0; i < MAX_PARAM_COUNT; ++i) { this->parsec_task.data[i].data_in = nullptr; }
166  }
167 
168  parsec_ttg_task_base_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class,
169  parsec_taskpool_t *taskpool, int32_t priority,
171  release_task_fn *release_fn,
174  , copies(copies)
175  , release_task_cb(release_fn)
177  PARSEC_OBJ_CONSTRUCT(&parsec_task, parsec_task_t);
178  PARSEC_LIST_ITEM_SINGLETON(&parsec_task.super);
179  parsec_task.mempool_owner = mempool;
180  parsec_task.task_class = task_class;
181  parsec_task.status = PARSEC_TASK_STATUS_HOOK;
182  parsec_task.taskpool = taskpool;
183  parsec_task.priority = priority;
184  parsec_task.chore_mask = 1<<0;
185 
186  // TODO: can we avoid this?
187  for (int i = 0; i < MAX_PARAM_COUNT; ++i) { this->parsec_task.data[i].data_in = nullptr; }
188  }
189 
190  public:
191  void set_dummy(bool d) { dummy = d; }
192  bool is_dummy() { return dummy; }
193  };
194 
195  template <typename TT, bool KeyIsVoid = ttg::meta::is_void_v<typename TT::key_type>>
197  using key_type = typename TT::key_type;
198  static constexpr size_t num_streams = TT::numins;
199  /* device tasks may have to store more copies than their inputs, as their sends are aggregated */
200  static constexpr size_t num_copies = TT::derived_has_device_op() ? static_cast<size_t>(MAX_PARAM_COUNT)
201  : (num_streams+1);
202  TT* tt = nullptr;
204  std::array<stream_info_t, num_streams> streams;
205 #ifdef TTG_HAVE_COROUTINE
206  void* suspended_task_address = nullptr; // if not null the function is suspended
208 #endif
210  ttg_data_copy_t *copies[num_copies] = { nullptr }; // the data copies tracked by this task
211 
212  parsec_ttg_task_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class)
213  : parsec_ttg_task_base_t(mempool, task_class, num_streams, copies) {
214  tt_ht_item.key = pkey();
215  this->dev_ptr = this->dev_state.dev_ptr();
216  // We store the hash of the key and the address where it can be found in locals considered as a scratchpad
217  *(uintptr_t*)&(parsec_task.locals[0]) = 0; //there is no key
218  *(uintptr_t*)&(parsec_task.locals[2]) = 0; //there is no key
219  }
220 
221  parsec_ttg_task_t(const key_type& key, parsec_thread_mempool_t *mempool,
222  parsec_task_class_t *task_class, parsec_taskpool_t *taskpool,
223  TT *tt_ptr, int32_t priority)
224  : parsec_ttg_task_base_t(mempool, task_class, taskpool, priority,
226  &release_task, tt_ptr->m_defer_writer)
227  , tt(tt_ptr), key(key) {
228  tt_ht_item.key = pkey();
229  this->dev_ptr = this->dev_state.dev_ptr();
230 
231  // We store the hash of the key and the address where it can be found in locals considered as a scratchpad
232  uint64_t hv = ttg::hash<std::decay_t<decltype(key)>>{}(key);
233  *(uintptr_t*)&(parsec_task.locals[0]) = hv;
234  *(uintptr_t*)&(parsec_task.locals[2]) = reinterpret_cast<uintptr_t>(&this->key);
235 
237  }
238 
239  static void release_task(parsec_ttg_task_base_t* task_base) {
240  parsec_ttg_task_t *task = static_cast<parsec_ttg_task_t*>(task_base);
241  TT *tt = task->tt;
242  tt->release_task(task);
243  }
244 
245  template<ttg::ExecutionSpace Space>
246  parsec_hook_return_t invoke_op() {
247  if constexpr (Space == ttg::ExecutionSpace::Host) {
248  return TT::template static_op<Space>(&this->parsec_task);
249  } else {
250  return TT::template device_static_op<Space>(&this->parsec_task);
251  }
252  }
253 
254  template<ttg::ExecutionSpace Space>
255  parsec_hook_return_t invoke_evaluate() {
256  if constexpr (Space == ttg::ExecutionSpace::Host) {
257  return PARSEC_HOOK_RETURN_DONE;
258  } else {
259  return TT::template device_static_evaluate<Space>(&this->parsec_task);
260  }
261  }
262 
263  parsec_key_t pkey() { return reinterpret_cast<parsec_key_t>(&key); }
264  };
265 
266  template <typename TT>
267  struct parsec_ttg_task_t<TT, true> : public parsec_ttg_task_base_t {
268  static constexpr size_t num_streams = TT::numins;
269  TT* tt = nullptr;
270  std::array<stream_info_t, num_streams> streams;
271 #ifdef TTG_HAVE_COROUTINE
272  void* suspended_task_address = nullptr; // if not null the function is suspended
274 #endif
276  ttg_data_copy_t *copies[num_streams+1] = { nullptr }; // the data copies tracked by this task
277  // +1 for the copy needed during send/bcast
278 
279  parsec_ttg_task_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class)
280  : parsec_ttg_task_base_t(mempool, task_class, num_streams, copies) {
281  tt_ht_item.key = pkey();
282  this->dev_ptr = this->dev_state.dev_ptr();
283  }
284 
285  parsec_ttg_task_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class,
286  parsec_taskpool_t *taskpool, TT *tt_ptr, int32_t priority)
287  : parsec_ttg_task_base_t(mempool, task_class, taskpool, priority,
289  &release_task, tt_ptr->m_defer_writer)
290  , tt(tt_ptr) {
291  tt_ht_item.key = pkey();
292  this->dev_ptr = this->dev_state.dev_ptr();
294  }
295 
296  static void release_task(parsec_ttg_task_base_t* task_base) {
297  parsec_ttg_task_t *task = static_cast<parsec_ttg_task_t*>(task_base);
298  TT *tt = task->tt;
299  tt->release_task(task);
300  }
301 
302  template<ttg::ExecutionSpace Space>
303  parsec_hook_return_t invoke_op() {
304  if constexpr (Space == ttg::ExecutionSpace::Host) {
305  return TT::template static_op<Space>(&this->parsec_task);
306  } else {
307  return TT::template device_static_op<Space>(&this->parsec_task);
308  }
309  }
310 
311  parsec_key_t pkey() { return 0; }
312  };
313 
314 
324  bool is_first;
325 
326  reducer_task_t(parsec_ttg_task_base_t* task, parsec_thread_mempool_t *mempool,
327  parsec_task_class_t *task_class, parsec_taskpool_t *taskpool,
328  int32_t priority, bool is_first)
329  : parsec_ttg_task_base_t(mempool, task_class, taskpool, priority,
330  0, nullptr,
331  &release_task,
332  true /* deferred until other readers have completed */)
333  , parent_task(task)
334  , is_first(is_first)
335  {
336  /* store the first 4 integers from the parent task (needed for profiling) */
337  for (int i = 0; i < 4; ++i) {
338  parsec_task.locals[i] = task->parsec_task.locals[i];
339  }
340  }
341 
342  static void release_task(parsec_ttg_task_base_t* task_base) {
343  /* reducer tasks have one mutable input so the task can be submitted on the first release */
344  parsec_task_t *vp_task_rings[1] = { &task_base->parsec_task };
345  parsec_execution_stream_t *es = parsec_my_execution_stream();
346  __parsec_schedule_vp(es, vp_task_rings, 0);
347  }
348  };
349 
350  } // namespace detail
351 
352 } // namespace ttg_parsec
353 
354 #endif // TTG_PARSEC_TASK_H
keyT key_type
Definition: ttg.h:1226
void release_task(task_t *task, parsec_task_t **task_ring=nullptr)
Definition: ttg.h:2573
static constexpr bool derived_has_device_op()
Definition: ttg.h:1221
uint8_t operator&(ttg_parsec_data_flags lhs, ttg_parsec_data_flags rhs)
Definition: task.h:68
parsec_hook_return_t(* parsec_static_op_t)(void *)
Definition: task.h:87
ttg_parsec_data_flags operator|=(ttg_parsec_data_flags &lhs, ttg_parsec_data_flags rhs)
Definition: task.h:61
ttg_parsec_data_flags operator&=(ttg_parsec_data_flags &lhs, ttg_parsec_data_flags rhs)
Definition: task.h:74
bool operator!(ttg_parsec_data_flags lhs)
Definition: task.h:81
ttg_parsec_data_flags operator|(ttg_parsec_data_flags lhs, ttg_parsec_data_flags rhs)
Definition: task.h:55
this contains PaRSEC-based TTG functionality
Definition: fwd.h:18
TaskCoroutineID
Definition: coroutine.h:222
@ Invalid
not a coroutine, i.e. a standard task function, -> void
Computes hash values for objects of type T.
Definition: hash.h:81
parsec_gpu_task_t * gpu_task
Definition: task.h:14
parsec_task_class_t task_class
Definition: task.h:18
parsec_gpu_exec_stream_t * stream
Definition: task.h:16
parsec_flow_t * flows
Definition: task.h:15
parsec_device_gpu_module_t * device
Definition: task.h:17
static constexpr size_t num_flows
Definition: task.h:25
static constexpr bool support_device
Definition: task.h:24
static constexpr device_ptr_t * dev_ptr()
Definition: task.h:28
parsec_ttg_task_base_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, int data_count, ttg_data_copy_t **copies, bool defer_writer=TTG_PARSEC_DEFER_WRITER)
Definition: task.h:152
void() release_task_fn(parsec_ttg_task_base_t *)
Definition: task.h:126
void init_stream_info(TT *tt, std::array< stream_info_t, TT::numins > &streams)
Definition: task.h:121
void init_stream_info_impl(TT *tt, std::array< stream_info_t, TT::numins > &streams)
Definition: task.h:105
ttg_parsec_data_flags data_flags
Definition: task.h:136
parsec_hash_table_item_t tt_ht_item
Definition: task.h:94
parsec_ttg_task_base_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, parsec_taskpool_t *taskpool, int32_t priority, int data_count, ttg_data_copy_t **copies, release_task_fn *release_fn, bool defer_writer=TTG_PARSEC_DEFER_WRITER)
Definition: task.h:168
parsec_ttg_task_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class)
Definition: task.h:279
parsec_ttg_task_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, parsec_taskpool_t *taskpool, TT *tt_ptr, int32_t priority)
Definition: task.h:285
device_state_t< TT::derived_has_device_op()> dev_state
Definition: task.h:275
std::array< stream_info_t, num_streams > streams
Definition: task.h:270
static void release_task(parsec_ttg_task_base_t *task_base)
Definition: task.h:296
static constexpr size_t num_copies
Definition: task.h:200
parsec_hook_return_t invoke_op()
Definition: task.h:246
std::array< stream_info_t, num_streams > streams
Definition: task.h:204
typename TT::key_type key_type
Definition: task.h:197
parsec_ttg_task_t(const key_type &key, parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, parsec_taskpool_t *taskpool, TT *tt_ptr, int32_t priority)
Definition: task.h:221
parsec_ttg_task_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class)
Definition: task.h:212
static constexpr size_t num_streams
Definition: task.h:198
parsec_hook_return_t invoke_evaluate()
Definition: task.h:255
device_state_t< TT::derived_has_device_op()> dev_state
Definition: task.h:209
static void release_task(parsec_ttg_task_base_t *task_base)
Definition: task.h:239
ttg_data_copy_t * copies[num_copies]
Definition: task.h:210
parsec_ttg_task_base_t * parent_task
Definition: task.h:323
reducer_task_t(parsec_ttg_task_base_t *task, parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, parsec_taskpool_t *taskpool, int32_t priority, bool is_first)
Definition: task.h:326
static void release_task(parsec_ttg_task_base_t *task_base)
Definition: task.h:342
#define TTG_PARSEC_DEFER_WRITER
Definition: ttg.h:13