ttg 1.0.0-alpha
Template Task Graph (TTG): flowgraph-based programming model for high-performance distributed-memory algorithms
Loading...
Searching...
No Matches
task.h
Go to the documentation of this file.
1#ifndef TTG_PARSEC_TASK_H
2#define TTG_PARSEC_TASK_H
3
5
6#include <parsec/parsec_internal.h>
7#include <parsec/mca/device/device_gpu.h>
8
9namespace ttg_parsec {
10
11 namespace detail {
12
13 struct device_ptr_t {
14 parsec_gpu_task_t* gpu_task = nullptr;
15 parsec_flow_t* flows = nullptr;
16 parsec_gpu_exec_stream_t* stream = nullptr;
17 parsec_device_gpu_module_t* device = nullptr;
18 parsec_task_class_t task_class; // copy of the taskclass
19 };
20
21 template<bool SupportDevice>
23 {
24 static constexpr bool support_device = false;
25 static constexpr size_t num_flows = 0;
28 static constexpr device_ptr_t* dev_ptr() {
29 return nullptr;
30 }
31 };
32
33 template<>
34 struct device_state_t<true> {
35 static constexpr bool support_device = false;
36 static constexpr size_t num_flows = MAX_PARAM_COUNT;
37 parsec_flow_t m_flows[num_flows];
38 device_ptr_t m_dev_ptr = {nullptr, &m_flows[0], nullptr, nullptr}; // gpu_task will be allocated in each task
40 return &m_dev_ptr;
41 }
42 };
43
/**
 * Bit flags describing the state of a piece of task data.
 * Each enumerator other than NONE occupies its own bit, so values can be
 * combined with the operators defined below.
 */
enum class ttg_parsec_data_flags : uint8_t {
  NONE            = 0,
  SINGLE_READER   = 1 << 0,
  MULTIPLE_READER = 1 << 1,
  SINGLE_WRITER   = 1 << 2,
  MULTIPLE_WRITER = 1 << 3,
  IS_MODIFIED     = 1 << 4,
  MARKED_PUSHOUT  = 1 << 5
};

/// Bitwise union of two flag sets.
inline constexpr
ttg_parsec_data_flags operator|(ttg_parsec_data_flags lhs, ttg_parsec_data_flags rhs) {
  using flags_type = std::underlying_type_t<ttg_parsec_data_flags>;
  return static_cast<ttg_parsec_data_flags>(static_cast<flags_type>(lhs) | static_cast<flags_type>(rhs));
}

/// Adds the bits of @p rhs to @p lhs in place; returns the updated value (by value).
inline constexpr
ttg_parsec_data_flags operator|=(ttg_parsec_data_flags& lhs, ttg_parsec_data_flags rhs) {
  lhs = lhs | rhs;
  return lhs;
}

/// Bitwise intersection, returned as the raw underlying integer so the
/// result can be tested directly in a condition.
inline constexpr
uint8_t operator&(ttg_parsec_data_flags lhs, ttg_parsec_data_flags rhs) {
  using flags_type = std::underlying_type_t<ttg_parsec_data_flags>;
  return static_cast<uint8_t>(static_cast<flags_type>(lhs) & static_cast<flags_type>(rhs));
}

/// Keeps only the bits of @p lhs that are also set in @p rhs; returns the updated value (by value).
inline constexpr
ttg_parsec_data_flags operator&=(ttg_parsec_data_flags& lhs, ttg_parsec_data_flags rhs) {
  using flags_type = std::underlying_type_t<ttg_parsec_data_flags>;
  lhs = static_cast<ttg_parsec_data_flags>(static_cast<flags_type>(lhs) & static_cast<flags_type>(rhs));
  return lhs;
}

/// True when no flag is set, i.e. the value equals NONE.
inline constexpr
bool operator!(ttg_parsec_data_flags lhs) {
  return lhs == ttg_parsec_data_flags::NONE;
}
85
86
87 typedef parsec_hook_return_t (*parsec_static_op_t)(void *); // static_op will be cast to this type
88
90 parsec_task_t parsec_task;
91 int32_t in_data_count = 0; //< number of satisfied inputs
92 int32_t data_count = 0; //< number of data elements in the copies array
93 ttg_data_copy_t **copies; //< pointer to the fixed copies array of the derived task
94 parsec_hash_table_item_t tt_ht_item = {};
95
97 std::size_t goal;
98 std::size_t size;
99 parsec_lifo_t reduce_copies;
100 std::atomic<std::size_t> reduce_count;
101 };
102
103 protected:
104 template<std::size_t i = 0, typename TT>
105 void init_stream_info_impl(TT *tt, std::array<stream_info_t, TT::numins>& streams) {
106 if constexpr (TT::numins > i) {
107 if (std::get<i>(tt->input_reducers)) {
108 streams[i].goal = tt->static_stream_goal[i];
109 streams[i].size = 0;
110 PARSEC_OBJ_CONSTRUCT(&streams[i].reduce_copies, parsec_lifo_t);
111 streams[i].reduce_count.store(0, std::memory_order_relaxed);
112 }
113 /* recursion */
114 if constexpr((i + 1) < TT::numins) {
115 init_stream_info_impl<i+1>(tt, streams);
116 }
117 }
118 }
119
120 template<typename TT>
121 void init_stream_info(TT *tt, std::array<stream_info_t, TT::numins>& streams) {
122 init_stream_info_impl<0>(tt, streams);
123 }
124
125 public:
127 /* Poor-mans virtual function
128 * We cannot use virtual inheritance or private visibility because we
129 * need offsetof for the mempool and scheduling.
130 */
133 bool remove_from_hash = true;
134 bool dummy = false;
135 bool defer_writer = TTG_PARSEC_DEFER_WRITER; // whether to defer writer instead of creating a new copy
136 ttg_parsec_data_flags data_flags; // HACKY: flags set by prepare_send and reset by the copy_handler
137
138 /*
139 virtual void release_task() = 0;
140 */
141 //public:
143 release_task_cb(this);
144 }
145
146 protected:
152 parsec_ttg_task_base_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class,
156 , copies(copies)
158 PARSEC_OBJ_CONSTRUCT(&parsec_task, parsec_task_t);
159 PARSEC_LIST_ITEM_SINGLETON(&parsec_task.super);
160 parsec_task.mempool_owner = mempool;
161 parsec_task.task_class = task_class;
162 parsec_task.priority = 0;
163
164 // TODO: can we avoid this?
165 for (int i = 0; i < MAX_PARAM_COUNT; ++i) {
166 this->parsec_task.data[i].data_in = nullptr;
167 this->parsec_task.data[i].data_out = nullptr;
168 }
169 }
170
171 parsec_ttg_task_base_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class,
172 parsec_taskpool_t *taskpool, int32_t priority,
174 release_task_fn *release_fn,
177 , copies(copies)
178 , release_task_cb(release_fn)
180 PARSEC_OBJ_CONSTRUCT(&parsec_task, parsec_task_t);
181 PARSEC_LIST_ITEM_SINGLETON(&parsec_task.super);
182 parsec_task.mempool_owner = mempool;
183 parsec_task.task_class = task_class;
184 parsec_task.status = PARSEC_TASK_STATUS_HOOK;
185 parsec_task.taskpool = taskpool;
186 parsec_task.priority = priority;
187 parsec_task.chore_mask = 1<<0;
188
189 // TODO: can we avoid this?
190 for (int i = 0; i < MAX_PARAM_COUNT; ++i) {
191 this->parsec_task.data[i].data_in = nullptr;
192 this->parsec_task.data[i].data_out = nullptr;
193 }
194 }
195
196 public:
197 void set_dummy(bool d) { dummy = d; }
198 bool is_dummy() { return dummy; }
199 };
200
201 template <typename TT, bool KeyIsVoid = ttg::meta::is_void_v<typename TT::key_type>>
203 using key_type = typename TT::key_type;
204 static constexpr size_t num_streams = TT::numins;
205 /* device tasks may have to store more copies than # of its inputs as their sends are aggregated */
206 static constexpr size_t num_copies = TT::derived_has_device_op() ? static_cast<size_t>(MAX_PARAM_COUNT)
207 : (num_streams+1);
208 TT* tt = nullptr;
210 std::array<stream_info_t, num_streams> streams;
211#ifdef TTG_HAVE_COROUTINE
212 void* suspended_task_address = nullptr; // if not null the function is suspended
214#endif
216 ttg_data_copy_t *copies[num_copies] = { nullptr }; // the data copies tracked by this task
217
218 parsec_ttg_task_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, TT *tt_ptr)
219 : parsec_ttg_task_base_t(mempool, task_class, num_streams, copies)
220 , tt(tt_ptr) {
221 tt_ht_item.key = pkey();
222 this->dev_ptr = this->dev_state.dev_ptr();
223 // We store the hash of the key and the address where it can be found in locals considered as a scratchpad
224 *(uintptr_t*)&(parsec_task.locals[0]) = 0; //there is no key
225 *(uintptr_t*)&(parsec_task.locals[2]) = 0; //there is no key
226 }
227
228 parsec_ttg_task_t(const key_type& key, parsec_thread_mempool_t *mempool,
229 parsec_task_class_t *task_class, parsec_taskpool_t *taskpool,
230 TT *tt_ptr, int32_t priority)
231 : parsec_ttg_task_base_t(mempool, task_class, taskpool, priority,
233 &release_task, tt_ptr->m_defer_writer)
234 , tt(tt_ptr), key(key) {
235 tt_ht_item.key = pkey();
236 this->dev_ptr = this->dev_state.dev_ptr();
237
238 // We store the hash of the key and the address where it can be found in locals considered as a scratchpad
239 uint64_t hv = ttg::hash<std::decay_t<decltype(key)>>{}(key);
240 *(uintptr_t*)&(parsec_task.locals[0]) = hv;
241 *(uintptr_t*)&(parsec_task.locals[2]) = reinterpret_cast<uintptr_t>(&this->key);
242
244 }
245
246 static void release_task(parsec_ttg_task_base_t* task_base) {
247 parsec_ttg_task_t *task = static_cast<parsec_ttg_task_t*>(task_base);
248 TT *tt = task->tt;
249 tt->release_task(task);
250 }
251
252 template<ttg::ExecutionSpace Space>
253 parsec_hook_return_t invoke_op() {
254 if constexpr (Space == ttg::ExecutionSpace::Host) {
255 return TT::static_op(&this->parsec_task);
256 } else {
257 return TT::device_static_op(&this->parsec_task);
258 }
259 }
260
261 template<ttg::ExecutionSpace Space>
262 parsec_hook_return_t invoke_evaluate() {
263 if constexpr (Space == ttg::ExecutionSpace::Host) {
264 return PARSEC_HOOK_RETURN_DONE;
265 } else {
267 }
268 }
269
270 parsec_key_t pkey() { return reinterpret_cast<parsec_key_t>(&key); }
271 };
272
273 template <typename TT>
275 static constexpr size_t num_streams = TT::numins;
276 TT* tt = nullptr;
277 std::array<stream_info_t, num_streams> streams;
278#ifdef TTG_HAVE_COROUTINE
279 void* suspended_task_address = nullptr; // if not null the function is suspended
281#endif
283 ttg_data_copy_t *copies[num_streams+1] = { nullptr }; // the data copies tracked by this task
284 // +1 for the copy needed during send/bcast
285
286 parsec_ttg_task_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, TT *tt_ptr)
287 : parsec_ttg_task_base_t(mempool, task_class, num_streams, copies)
288 , tt(tt_ptr) {
289 tt_ht_item.key = pkey();
290 this->dev_ptr = this->dev_state.dev_ptr();
291 }
292
293 parsec_ttg_task_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class,
294 parsec_taskpool_t *taskpool, TT *tt_ptr, int32_t priority)
295 : parsec_ttg_task_base_t(mempool, task_class, taskpool, priority,
297 &release_task, tt_ptr->m_defer_writer)
298 , tt(tt_ptr) {
299 tt_ht_item.key = pkey();
300 this->dev_ptr = this->dev_state.dev_ptr();
302 }
303
304 static void release_task(parsec_ttg_task_base_t* task_base) {
305 parsec_ttg_task_t *task = static_cast<parsec_ttg_task_t*>(task_base);
306 TT *tt = task->tt;
307 tt->release_task(task);
308 }
309
310 template<ttg::ExecutionSpace Space>
311 parsec_hook_return_t invoke_op() {
312 if constexpr (Space == ttg::ExecutionSpace::Host) {
313 return TT::static_op(&this->parsec_task);
314 } else {
315 return TT::device_static_op(&this->parsec_task);
316 }
317 }
318
319 template<ttg::ExecutionSpace Space>
320 parsec_hook_return_t invoke_evaluate() {
321 if constexpr (Space == ttg::ExecutionSpace::Host) {
322 return PARSEC_HOOK_RETURN_DONE;
323 } else {
325 }
326 }
327
328 parsec_key_t pkey() { return 0; }
329 };
330
331
342
343 reducer_task_t(parsec_ttg_task_base_t* task, parsec_thread_mempool_t *mempool,
344 parsec_task_class_t *task_class, parsec_taskpool_t *taskpool,
345 int32_t priority, bool is_first)
346 : parsec_ttg_task_base_t(mempool, task_class, taskpool, priority,
347 0, nullptr,
349 true /* deferred until other readers have completed */)
350 , parent_task(task)
352 {
353 /* store the first 4 integers from the parent task (needed for profiling) */
354 for (int i = 0; i < 4; ++i) {
355 parsec_task.locals[i] = task->parsec_task.locals[i];
356 }
357 }
358
359 static void release_task(parsec_ttg_task_base_t* task_base) {
360 /* reducer tasks have one mutable input so the task can be submitted on the first release */
361 parsec_task_t *vp_task_rings[1] = { &task_base->parsec_task };
362 parsec_execution_stream_t *es = parsec_my_execution_stream();
363 __parsec_schedule_vp(es, vp_task_rings, 0);
364 }
365 };
366
367 } // namespace detail
368
369} // namespace ttg_parsec
370
371#endif // TTG_PARSEC_TASK_H
void release_task(task_t *task, parsec_task_t **task_ring=nullptr)
Definition ttg.h:2724
static constexpr bool derived_has_device_op()
Definition ttg.h:1261
static resultT get(InTuple &&intuple)
Definition ttg.h:1292
keyT key_type
Definition ttg.h:1272
uint8_t operator&(ttg_parsec_data_flags lhs, ttg_parsec_data_flags rhs)
Definition task.h:68
parsec_hook_return_t(* parsec_static_op_t)(void *)
Definition task.h:87
ttg_parsec_data_flags operator|=(ttg_parsec_data_flags &lhs, ttg_parsec_data_flags rhs)
Definition task.h:61
ttg_parsec_data_flags operator&=(ttg_parsec_data_flags &lhs, ttg_parsec_data_flags rhs)
Definition task.h:74
bool operator!(ttg_parsec_data_flags lhs)
Definition task.h:81
ttg_parsec_data_flags operator|(ttg_parsec_data_flags lhs, ttg_parsec_data_flags rhs)
Definition task.h:55
this contains PaRSEC-based TTG functionality
Definition fwd.h:18
TaskCoroutineID
Definition coroutine.h:222
@ Invalid
not a coroutine, i.e. a standard task function, -> void
Computes hash values for objects of type T.
Definition hash.h:81
parsec_gpu_task_t * gpu_task
Definition task.h:14
parsec_task_class_t task_class
Definition task.h:18
parsec_gpu_exec_stream_t * stream
Definition task.h:16
parsec_flow_t * flows
Definition task.h:15
parsec_device_gpu_module_t * device
Definition task.h:17
static constexpr device_ptr_t * dev_ptr()
Definition task.h:28
static constexpr size_t num_flows
Definition task.h:25
static constexpr bool support_device
Definition task.h:24
parsec_ttg_task_base_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, int data_count, ttg_data_copy_t **copies, bool defer_writer=TTG_PARSEC_DEFER_WRITER)
Definition task.h:152
void() release_task_fn(parsec_ttg_task_base_t *)
Definition task.h:126
void init_stream_info(TT *tt, std::array< stream_info_t, TT::numins > &streams)
Definition task.h:121
void init_stream_info_impl(TT *tt, std::array< stream_info_t, TT::numins > &streams)
Definition task.h:105
ttg_parsec_data_flags data_flags
Definition task.h:136
parsec_hash_table_item_t tt_ht_item
Definition task.h:94
parsec_ttg_task_base_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, parsec_taskpool_t *taskpool, int32_t priority, int data_count, ttg_data_copy_t **copies, release_task_fn *release_fn, bool defer_writer=TTG_PARSEC_DEFER_WRITER)
Definition task.h:171
parsec_ttg_task_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, parsec_taskpool_t *taskpool, TT *tt_ptr, int32_t priority)
Definition task.h:293
parsec_ttg_task_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, TT *tt_ptr)
Definition task.h:286
device_state_t< TT::derived_has_device_op()> dev_state
Definition task.h:282
std::array< stream_info_t, num_streams > streams
Definition task.h:277
static void release_task(parsec_ttg_task_base_t *task_base)
Definition task.h:304
static constexpr size_t num_copies
Definition task.h:206
parsec_hook_return_t invoke_op()
Definition task.h:253
parsec_ttg_task_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, TT *tt_ptr)
Definition task.h:218
std::array< stream_info_t, num_streams > streams
Definition task.h:210
typename TT::key_type key_type
Definition task.h:203
parsec_ttg_task_t(const key_type &key, parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, parsec_taskpool_t *taskpool, TT *tt_ptr, int32_t priority)
Definition task.h:228
static constexpr size_t num_streams
Definition task.h:204
parsec_hook_return_t invoke_evaluate()
Definition task.h:262
device_state_t< TT::derived_has_device_op()> dev_state
Definition task.h:215
static void release_task(parsec_ttg_task_base_t *task_base)
Definition task.h:246
ttg_data_copy_t * copies[num_copies]
Definition task.h:216
parsec_ttg_task_base_t * parent_task
Definition task.h:340
reducer_task_t(parsec_ttg_task_base_t *task, parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, parsec_taskpool_t *taskpool, int32_t priority, bool is_first)
Definition task.h:343
static void release_task(parsec_ttg_task_base_t *task_base)
Definition task.h:359
#define TTG_PARSEC_DEFER_WRITER
Definition ttg.h:13