task.h
Go to the documentation of this file.
1 #ifndef TTG_PARSEC_TASK_H
2 #define TTG_PARSEC_TASK_H
3 
5 
6 #include <parsec/parsec_internal.h>
7 #include <parsec/mca/device/device_gpu.h>
8 
9 namespace ttg_parsec {
10 
11  namespace detail {
12 
13  struct device_ptr_t {
14  parsec_gpu_task_t* gpu_task = nullptr;
15  parsec_flow_t* flows = nullptr;
16  parsec_gpu_exec_stream_t* stream = nullptr;
17  parsec_device_gpu_module_t* device = nullptr;
18  parsec_task_class_t task_class; // copy of the taskclass
19  };
20 
21  template<bool SupportDevice>
23  {
24  static constexpr bool support_device = false;
25  static constexpr size_t num_flows = 0;
27  { }
28  static constexpr device_ptr_t* dev_ptr() {
29  return nullptr;
30  }
31  };
32 
33  template<>
34  struct device_state_t<true> {
35  static constexpr bool support_device = false;
36  static constexpr size_t num_flows = MAX_PARAM_COUNT;
37  parsec_flow_t m_flows[num_flows];
38  device_ptr_t m_dev_ptr = {nullptr, &m_flows[0], nullptr, nullptr}; // gpu_task will be allocated in each task
40  return &m_dev_ptr;
41  }
42  };
43 
/// Bit flags describing the access/state of a data copy
/// (stored e.g. in parsec_ttg_task_base_t::data_flags).
enum class ttg_parsec_data_flags : uint8_t {
  NONE            = 0,
  SINGLE_READER   = 1 << 0,
  MULTIPLE_READER = 1 << 1,
  SINGLE_WRITER   = 1 << 2,
  MULTIPLE_WRITER = 1 << 3,
  IS_MODIFIED     = 1 << 4,
  MARKED_PUSHOUT  = 1 << 5
};

/// @return the union of the flag sets @p lhs and @p rhs.
constexpr ttg_parsec_data_flags operator|(ttg_parsec_data_flags lhs, ttg_parsec_data_flags rhs) noexcept {
  using flags_type = std::underlying_type_t<ttg_parsec_data_flags>;
  return static_cast<ttg_parsec_data_flags>(static_cast<flags_type>(lhs) | static_cast<flags_type>(rhs));
}

/// Adds the flags of @p rhs to @p lhs.
/// @return the updated value of @p lhs (returned by value).
constexpr ttg_parsec_data_flags operator|=(ttg_parsec_data_flags &lhs, ttg_parsec_data_flags rhs) noexcept {
  lhs = lhs | rhs;
  return lhs;
}

/// @return the raw bits common to @p lhs and @p rhs. The result is non-zero iff
///         the sets intersect, so it can be used directly as a condition.
constexpr uint8_t operator&(ttg_parsec_data_flags lhs, ttg_parsec_data_flags rhs) noexcept {
  using flags_type = std::underlying_type_t<ttg_parsec_data_flags>;
  return static_cast<uint8_t>(static_cast<flags_type>(lhs) & static_cast<flags_type>(rhs));
}

/// Keeps only the flags of @p lhs that are also set in @p rhs.
/// @return the updated value of @p lhs (returned by value).
constexpr ttg_parsec_data_flags operator&=(ttg_parsec_data_flags &lhs, ttg_parsec_data_flags rhs) noexcept {
  lhs = static_cast<ttg_parsec_data_flags>(lhs & rhs);
  return lhs;
}

/// @return true iff no flag is set in @p lhs.
constexpr bool operator!(ttg_parsec_data_flags lhs) noexcept {
  return lhs == ttg_parsec_data_flags::NONE;
}
85 
86 
87  typedef parsec_hook_return_t (*parsec_static_op_t)(void *); // static_op will be cast to this type
88 
90  parsec_task_t parsec_task;
91  int32_t in_data_count = 0; //< number of satisfied inputs
92  int32_t data_count = 0; //< number of data elements in the copies array
93  ttg_data_copy_t **copies; //< pointer to the fixed copies array of the derived task
94  parsec_hash_table_item_t tt_ht_item = {};
95 
96  struct stream_info_t {
97  std::size_t goal;
98  std::size_t size;
99  parsec_lifo_t reduce_copies;
100  std::atomic<std::size_t> reduce_count;
101  };
102 
103  protected:
104  template<std::size_t i = 0, typename TT>
105  void init_stream_info_impl(TT *tt, std::array<stream_info_t, TT::numins>& streams) {
106  if constexpr (TT::numins > i) {
107  if (std::get<i>(tt->input_reducers)) {
108  streams[i].goal = tt->static_stream_goal[i];
109  streams[i].size = 0;
110  PARSEC_OBJ_CONSTRUCT(&streams[i].reduce_copies, parsec_lifo_t);
111  streams[i].reduce_count.store(0, std::memory_order_relaxed);
112  }
113  /* recursion */
114  if constexpr((i + 1) < TT::numins) {
115  init_stream_info_impl<i+1>(tt, streams);
116  }
117  }
118  }
119 
120  template<typename TT>
121  void init_stream_info(TT *tt, std::array<stream_info_t, TT::numins>& streams) {
122  init_stream_info_impl<0>(tt, streams);
123  }
124 
125  public:
127  /* Poor-mans virtual function
128  * We cannot use virtual inheritance or private visibility because we
129  * need offsetof for the mempool and scheduling.
130  */
132  device_ptr_t* dev_ptr = nullptr;
133  bool remove_from_hash = true;
134  bool dummy = false;
135  bool defer_writer = TTG_PARSEC_DEFER_WRITER; // whether to defer writer instead of creating a new copy
136  ttg_parsec_data_flags data_flags; // HACKY: flags set by prepare_send and reset by the copy_handler
137 
138  /*
139  virtual void release_task() = 0;
140  */
141  //public:
142  void release_task() {
143  release_task_cb(this);
144  }
145 
146  protected:
152  parsec_ttg_task_base_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class,
156  , copies(copies)
158  PARSEC_OBJ_CONSTRUCT(&parsec_task, parsec_task_t);
159  PARSEC_LIST_ITEM_SINGLETON(&parsec_task.super);
160  parsec_task.mempool_owner = mempool;
161  parsec_task.task_class = task_class;
162  parsec_task.priority = 0;
163 
164  // TODO: can we avoid this?
165  for (int i = 0; i < MAX_PARAM_COUNT; ++i) {
166  this->parsec_task.data[i].data_in = nullptr;
167  this->parsec_task.data[i].data_out = nullptr;
168  }
169  }
170 
171  parsec_ttg_task_base_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class,
172  parsec_taskpool_t *taskpool, int32_t priority,
174  release_task_fn *release_fn,
177  , copies(copies)
178  , release_task_cb(release_fn)
180  PARSEC_OBJ_CONSTRUCT(&parsec_task, parsec_task_t);
181  PARSEC_LIST_ITEM_SINGLETON(&parsec_task.super);
182  parsec_task.mempool_owner = mempool;
183  parsec_task.task_class = task_class;
184  parsec_task.status = PARSEC_TASK_STATUS_HOOK;
185  parsec_task.taskpool = taskpool;
186  parsec_task.priority = priority;
187  parsec_task.chore_mask = 1<<0;
188 
189  // TODO: can we avoid this?
190  for (int i = 0; i < MAX_PARAM_COUNT; ++i) {
191  this->parsec_task.data[i].data_in = nullptr;
192  this->parsec_task.data[i].data_out = nullptr;
193  }
194  }
195 
196  public:
197  void set_dummy(bool d) { dummy = d; }
198  bool is_dummy() { return dummy; }
199  };
200 
201  template <typename TT, bool KeyIsVoid = ttg::meta::is_void_v<typename TT::key_type>>
203  using key_type = typename TT::key_type;
204  static constexpr size_t num_streams = TT::numins;
205  /* device tasks may have to store more copies than # of its inputs as their sends are aggregated */
206  static constexpr size_t num_copies = TT::derived_has_device_op() ? static_cast<size_t>(MAX_PARAM_COUNT)
207  : (num_streams+1);
208  TT* tt = nullptr;
210  std::array<stream_info_t, num_streams> streams;
211 #ifdef TTG_HAVE_COROUTINE
212  void* suspended_task_address = nullptr; // if not null the function is suspended
214 #endif
216  ttg_data_copy_t *copies[num_copies] = { nullptr }; // the data copies tracked by this task
217 
218  parsec_ttg_task_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, TT *tt_ptr)
219  : parsec_ttg_task_base_t(mempool, task_class, num_streams, copies)
220  , tt(tt_ptr) {
221  tt_ht_item.key = pkey();
222  this->dev_ptr = this->dev_state.dev_ptr();
223  // We store the hash of the key and the address where it can be found in locals considered as a scratchpad
224  *(uintptr_t*)&(parsec_task.locals[0]) = 0; //there is no key
225  *(uintptr_t*)&(parsec_task.locals[2]) = 0; //there is no key
226  }
227 
228  parsec_ttg_task_t(const key_type& key, parsec_thread_mempool_t *mempool,
229  parsec_task_class_t *task_class, parsec_taskpool_t *taskpool,
230  TT *tt_ptr, int32_t priority)
231  : parsec_ttg_task_base_t(mempool, task_class, taskpool, priority,
233  &release_task, tt_ptr->m_defer_writer)
234  , tt(tt_ptr), key(key) {
235  tt_ht_item.key = pkey();
236  this->dev_ptr = this->dev_state.dev_ptr();
237 
238  // We store the hash of the key and the address where it can be found in locals considered as a scratchpad
239  uint64_t hv = ttg::hash<std::decay_t<decltype(key)>>{}(key);
240  *(uintptr_t*)&(parsec_task.locals[0]) = hv;
241  *(uintptr_t*)&(parsec_task.locals[2]) = reinterpret_cast<uintptr_t>(&this->key);
242 
244  }
245 
246  static void release_task(parsec_ttg_task_base_t* task_base) {
247  parsec_ttg_task_t *task = static_cast<parsec_ttg_task_t*>(task_base);
248  TT *tt = task->tt;
249  tt->release_task(task);
250  }
251 
252  template<ttg::ExecutionSpace Space>
253  parsec_hook_return_t invoke_op() {
254  if constexpr (Space == ttg::ExecutionSpace::Host) {
255  return TT::static_op(&this->parsec_task);
256  } else {
257  return TT::device_static_op(&this->parsec_task);
258  }
259  }
260 
261  template<ttg::ExecutionSpace Space>
262  parsec_hook_return_t invoke_evaluate() {
263  if constexpr (Space == ttg::ExecutionSpace::Host) {
264  return PARSEC_HOOK_RETURN_DONE;
265  } else {
266  return TT::device_static_evaluate(&this->parsec_task);
267  }
268  }
269 
270  parsec_key_t pkey() { return reinterpret_cast<parsec_key_t>(&key); }
271  };
272 
273  template <typename TT>
274  struct parsec_ttg_task_t<TT, true> : public parsec_ttg_task_base_t {
275  static constexpr size_t num_streams = TT::numins;
276  TT* tt = nullptr;
277  std::array<stream_info_t, num_streams> streams;
278 #ifdef TTG_HAVE_COROUTINE
279  void* suspended_task_address = nullptr; // if not null the function is suspended
281 #endif
283  ttg_data_copy_t *copies[num_streams+1] = { nullptr }; // the data copies tracked by this task
284  // +1 for the copy needed during send/bcast
285 
286  parsec_ttg_task_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, TT *tt_ptr)
287  : parsec_ttg_task_base_t(mempool, task_class, num_streams, copies)
288  , tt(tt_ptr) {
289  tt_ht_item.key = pkey();
290  this->dev_ptr = this->dev_state.dev_ptr();
291  }
292 
293  parsec_ttg_task_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class,
294  parsec_taskpool_t *taskpool, TT *tt_ptr, int32_t priority)
295  : parsec_ttg_task_base_t(mempool, task_class, taskpool, priority,
297  &release_task, tt_ptr->m_defer_writer)
298  , tt(tt_ptr) {
299  tt_ht_item.key = pkey();
300  this->dev_ptr = this->dev_state.dev_ptr();
302  }
303 
304  static void release_task(parsec_ttg_task_base_t* task_base) {
305  parsec_ttg_task_t *task = static_cast<parsec_ttg_task_t*>(task_base);
306  TT *tt = task->tt;
307  tt->release_task(task);
308  }
309 
310  template<ttg::ExecutionSpace Space>
311  parsec_hook_return_t invoke_op() {
312  if constexpr (Space == ttg::ExecutionSpace::Host) {
313  return TT::static_op(&this->parsec_task);
314  } else {
315  return TT::device_static_op(&this->parsec_task);
316  }
317  }
318 
319  template<ttg::ExecutionSpace Space>
320  parsec_hook_return_t invoke_evaluate() {
321  if constexpr (Space == ttg::ExecutionSpace::Host) {
322  return PARSEC_HOOK_RETURN_DONE;
323  } else {
324  return TT::device_static_evaluate(&this->parsec_task);
325  }
326  }
327 
328  parsec_key_t pkey() { return 0; }
329  };
330 
331 
341  bool is_first;
342 
343  reducer_task_t(parsec_ttg_task_base_t* task, parsec_thread_mempool_t *mempool,
344  parsec_task_class_t *task_class, parsec_taskpool_t *taskpool,
345  int32_t priority, bool is_first)
346  : parsec_ttg_task_base_t(mempool, task_class, taskpool, priority,
347  0, nullptr,
348  &release_task,
349  true /* deferred until other readers have completed */)
350  , parent_task(task)
351  , is_first(is_first)
352  {
353  /* store the first 4 integers from the parent task (needed for profiling) */
354  for (int i = 0; i < 4; ++i) {
355  parsec_task.locals[i] = task->parsec_task.locals[i];
356  }
357  }
358 
359  static void release_task(parsec_ttg_task_base_t* task_base) {
360  /* reducer tasks have one mutable input so the task can be submitted on the first release */
361  parsec_task_t *vp_task_rings[1] = { &task_base->parsec_task };
362  parsec_execution_stream_t *es = parsec_my_execution_stream();
363  __parsec_schedule_vp(es, vp_task_rings, 0);
364  }
365  };
366 
367  } // namespace detail
368 
369 } // namespace ttg_parsec
370 
371 #endif // TTG_PARSEC_TASK_H
void release_task(task_t *task, parsec_task_t **task_ring=nullptr)
Definition: ttg.h:2645
keyT key_type
Definition: ttg.h:1241
static constexpr bool derived_has_device_op()
Definition: ttg.h:1236
uint8_t operator&(ttg_parsec_data_flags lhs, ttg_parsec_data_flags rhs)
Definition: task.h:68
parsec_hook_return_t(* parsec_static_op_t)(void *)
Definition: task.h:87
ttg_parsec_data_flags operator|=(ttg_parsec_data_flags &lhs, ttg_parsec_data_flags rhs)
Definition: task.h:61
ttg_parsec_data_flags operator&=(ttg_parsec_data_flags &lhs, ttg_parsec_data_flags rhs)
Definition: task.h:74
bool operator!(ttg_parsec_data_flags lhs)
Definition: task.h:81
ttg_parsec_data_flags operator|(ttg_parsec_data_flags lhs, ttg_parsec_data_flags rhs)
Definition: task.h:55
this contains PaRSEC-based TTG functionality
Definition: fwd.h:18
TaskCoroutineID
Definition: coroutine.h:222
@ Invalid
not a coroutine, i.e. a standard task function, -> void
Computes hash values for objects of type T.
Definition: hash.h:81
parsec_gpu_task_t * gpu_task
Definition: task.h:14
parsec_task_class_t task_class
Definition: task.h:18
parsec_gpu_exec_stream_t * stream
Definition: task.h:16
parsec_flow_t * flows
Definition: task.h:15
parsec_device_gpu_module_t * device
Definition: task.h:17
static constexpr size_t num_flows
Definition: task.h:25
static constexpr bool support_device
Definition: task.h:24
static constexpr device_ptr_t * dev_ptr()
Definition: task.h:28
parsec_ttg_task_base_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, int data_count, ttg_data_copy_t **copies, bool defer_writer=TTG_PARSEC_DEFER_WRITER)
Definition: task.h:152
void() release_task_fn(parsec_ttg_task_base_t *)
Definition: task.h:126
void init_stream_info(TT *tt, std::array< stream_info_t, TT::numins > &streams)
Definition: task.h:121
void init_stream_info_impl(TT *tt, std::array< stream_info_t, TT::numins > &streams)
Definition: task.h:105
ttg_parsec_data_flags data_flags
Definition: task.h:136
parsec_hash_table_item_t tt_ht_item
Definition: task.h:94
parsec_ttg_task_base_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, parsec_taskpool_t *taskpool, int32_t priority, int data_count, ttg_data_copy_t **copies, release_task_fn *release_fn, bool defer_writer=TTG_PARSEC_DEFER_WRITER)
Definition: task.h:171
parsec_ttg_task_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, parsec_taskpool_t *taskpool, TT *tt_ptr, int32_t priority)
Definition: task.h:293
parsec_ttg_task_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, TT *tt_ptr)
Definition: task.h:286
device_state_t< TT::derived_has_device_op()> dev_state
Definition: task.h:282
std::array< stream_info_t, num_streams > streams
Definition: task.h:277
static void release_task(parsec_ttg_task_base_t *task_base)
Definition: task.h:304
static constexpr size_t num_copies
Definition: task.h:206
parsec_hook_return_t invoke_op()
Definition: task.h:253
parsec_ttg_task_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, TT *tt_ptr)
Definition: task.h:218
std::array< stream_info_t, num_streams > streams
Definition: task.h:210
typename TT::key_type key_type
Definition: task.h:203
parsec_ttg_task_t(const key_type &key, parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, parsec_taskpool_t *taskpool, TT *tt_ptr, int32_t priority)
Definition: task.h:228
static constexpr size_t num_streams
Definition: task.h:204
parsec_hook_return_t invoke_evaluate()
Definition: task.h:262
device_state_t< TT::derived_has_device_op()> dev_state
Definition: task.h:215
static void release_task(parsec_ttg_task_base_t *task_base)
Definition: task.h:246
ttg_data_copy_t * copies[num_copies]
Definition: task.h:216
parsec_ttg_task_base_t * parent_task
Definition: task.h:340
reducer_task_t(parsec_ttg_task_base_t *task, parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, parsec_taskpool_t *taskpool, int32_t priority, bool is_first)
Definition: task.h:343
static void release_task(parsec_ttg_task_base_t *task_base)
Definition: task.h:359
#define TTG_PARSEC_DEFER_WRITER
Definition: ttg.h:13