ttg 1.0.0
Template Task Graph (TTG): flowgraph-based programming model for high-performance distributed-memory algorithms
Loading...
Searching...
No Matches
task.h
Go to the documentation of this file.
1// SPDX-License-Identifier: BSD-3-Clause
2#ifndef TTG_PARSEC_TASK_H
3#define TTG_PARSEC_TASK_H
4
6
7#include <parsec/parsec_internal.h>
8#include <parsec/mca/device/device_gpu.h>
9
10namespace ttg_parsec {
11
12 namespace detail {
13
14 struct device_ptr_t {
15 parsec_gpu_task_t* gpu_task = nullptr;
16 parsec_flow_t* flows = nullptr;
17 parsec_gpu_exec_stream_t* stream = nullptr;
18 parsec_device_gpu_module_t* device = nullptr;
19 parsec_task_class_t task_class; // copy of the taskclass
20 };
21
// Primary template device_state_t<SupportDevice>. parsec_ttg_task_t holds a
// device_state_t<TT::derived_has_device_op()>, so this version applies when the
// TT has no device op. It has no visible non-static data members: it reports no
// device support and zero flows, and dev_ptr() always yields nullptr.
// NOTE(review): source lines 23 (presumably the `struct device_state_t` head)
// and 27-28 are absent from this listing (numbering jumps 22->24 and 26->29).
22 template<bool SupportDevice>
24 {
25 static constexpr bool support_device = false;
26 static constexpr size_t num_flows = 0;
// static and constexpr: there is no per-object device state to point at.
29 static constexpr device_ptr_t* dev_ptr() {
30 return nullptr;
31 }
32 };
33
// Explicit specialization for TTs that have a device op. It owns num_flows
// (MAX_PARAM_COUNT) flow descriptors plus a device_ptr_t whose `flows` member
// points at the first of them. dev_ptr() hands out the address of that device_ptr_t.
34 template<>
35 struct device_state_t<true> {
// NOTE(review): support_device is false here too -- the same value as in the
// primary template -- even though this is the device-enabled specialization.
// It looks like it should be true; check every reader of support_device
// before changing it.
36 static constexpr bool support_device = false;
37 static constexpr size_t num_flows = MAX_PARAM_COUNT;
38 parsec_flow_t m_flows[num_flows];
// NOTE(review): m_dev_ptr.flows points into this object's own m_flows. A copy
// of a device_state_t would therefore keep pointing at the source's array.
// Presumably these objects are never copied or moved -- verify.
39 device_ptr_t m_dev_ptr = {nullptr, &m_flows[0], nullptr, nullptr}; // gpu_task will be allocated in each task
// NOTE(review): source line 40 (the header of the non-static dev_ptr() member
// that returns &m_dev_ptr) is missing from this listing.
41 return &m_dev_ptr;
42 }
43 };
44
/// Bit-flag set stored in parsec_ttg_task_base_t::data_flags. Every enumerator
/// other than NONE occupies its own bit, so values combine with the operators below.
enum class ttg_parsec_data_flags : uint8_t {
  NONE = 0,
  SINGLE_READER = 1 << 0,
  MULTIPLE_READER = 1 << 1,
  SINGLE_WRITER = 1 << 2,
  MULTIPLE_WRITER = 1 << 3,
  IS_MODIFIED = 1 << 4,
  MARKED_PUSHOUT = 1 << 5
};

/// @return the union of the bits set in @p lhs and @p rhs.
constexpr ttg_parsec_data_flags operator|(ttg_parsec_data_flags lhs, ttg_parsec_data_flags rhs) {
  using flags_type = std::underlying_type<ttg_parsec_data_flags>::type;
  return ttg_parsec_data_flags(static_cast<flags_type>(lhs) | static_cast<flags_type>(rhs));
}

/// Adds the bits of @p rhs to @p lhs.
/// @return the updated value of @p lhs (by value, as before).
constexpr ttg_parsec_data_flags operator|=(ttg_parsec_data_flags &lhs, ttg_parsec_data_flags rhs) {
  using flags_type = std::underlying_type<ttg_parsec_data_flags>::type;
  lhs = ttg_parsec_data_flags(static_cast<flags_type>(lhs) | static_cast<flags_type>(rhs));
  return lhs;
}

/// @return the raw intersection of the bits of @p lhs and @p rhs.
/// Returns an integer rather than the enum so the result can be tested directly in a condition.
constexpr uint8_t operator&(ttg_parsec_data_flags lhs, ttg_parsec_data_flags rhs) {
  using flags_type = std::underlying_type<ttg_parsec_data_flags>::type;
  return static_cast<flags_type>(lhs) & static_cast<flags_type>(rhs);
}

/// Keeps in @p lhs only the bits also set in @p rhs.
/// @return the updated value of @p lhs (by value, as before).
constexpr ttg_parsec_data_flags operator&=(ttg_parsec_data_flags &lhs, ttg_parsec_data_flags rhs) {
  using flags_type = std::underlying_type<ttg_parsec_data_flags>::type;
  lhs = ttg_parsec_data_flags(static_cast<flags_type>(lhs) & static_cast<flags_type>(rhs));
  return lhs;
}

/// @return true iff no flag is set in @p lhs.
constexpr bool operator!(ttg_parsec_data_flags lhs) {
  return lhs == ttg_parsec_data_flags::NONE;
}
86
87
88 typedef parsec_hook_return_t (*parsec_static_op_t)(void *); // static_op will be cast to this type
89
// parsec_ttg_task_base_t: common base of every TTG PaRSEC task (the keyed,
// void-key and reducer tasks below all construct it). The struct head (source
// line 90) and the stream_info_t head (source line 97) are missing from this listing.
91 parsec_task_t parsec_task;
92 int32_t in_data_count = 0; ///< number of satisfied inputs
93 int32_t data_count = 0; ///< number of data elements in the copies array
94 ttg_data_copy_t **copies; ///< pointer to the fixed copies array of the derived task (set by the constructors)
95 parsec_hash_table_item_t tt_ht_item = {};
96
// stream_info_t: per-input streaming state. init_stream_info() fills it in
// (goal from tt->static_stream_goal, size 0, constructed LIFO, count 0)
// only for inputs that have an input reducer.
98 std::size_t goal;
99 std::size_t size;
100 parsec_lifo_t reduce_copies;
101 std::atomic<std::size_t> reduce_count;
102 };
103
104 protected:
105 template<std::size_t i = 0, typename TT>
106 void init_stream_info_impl(TT *tt, std::array<stream_info_t, TT::numins>& streams) {
107 if constexpr (TT::numins > i) {
108 if (std::get<i>(tt->input_reducers)) {
109 streams[i].goal = tt->static_stream_goal[i];
110 streams[i].size = 0;
111 PARSEC_OBJ_CONSTRUCT(&streams[i].reduce_copies, parsec_lifo_t);
112 streams[i].reduce_count.store(0, std::memory_order_relaxed);
113 }
114 /* recursion */
115 if constexpr((i + 1) < TT::numins) {
116 init_stream_info_impl<i+1>(tt, streams);
117 }
118 }
119 }
120
121 template<typename TT>
122 void init_stream_info(TT *tt, std::array<stream_info_t, TT::numins>& streams) {
123 init_stream_info_impl<0>(tt, streams);
124 }
125
126 public:
// NOTE(review): source line 127 (release_task_fn, the callback signature
// void(parsec_ttg_task_base_t*)) and lines 132-133 (presumably including the
// release_task_cb member used below) are missing from this listing.
128 /* Poor-mans virtual function
129 * We cannot use virtual inheritance or private visibility because we
130 * need offsetof for the mempool and scheduling.
131 */
134 bool remove_from_hash = true;
135 bool dummy = false; // read/written through is_dummy()/set_dummy()
136 bool defer_writer = TTG_PARSEC_DEFER_WRITER; // whether to defer writer instead of creating a new copy
// NOTE(review): data_flags has no default member initializer, and neither
// visible constructor body assigns it. Unless a missing initializer-list line
// does, it is indeterminate until first written. Consider
// `= ttg_parsec_data_flags::NONE`.
137 ttg_parsec_data_flags data_flags; // HACKY: flags set by prepare_send and reset by the copy_handler
138
139 /*
140 virtual void release_task() = 0;
141 */
142 //public:
// Source line 143 (this member function's header) is missing here. The body
// forwards `this` to release_task_cb. The keyed and void-key tasks pass their
// static release_task as that callback.
144 release_task_cb(this);
145 }
146
147 protected:
// Constructor without taskpool, priority or release callback. It constructs the
// embedded PaRSEC task, records the mempool and task class, sets priority 0,
// and nulls every data_in/data_out slot. Unlike the constructor below, it does
// not set status, taskpool or chore_mask.
// NOTE(review): source lines 148-152, 154-156 and 158 are missing from this listing.
153 parsec_ttg_task_base_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class,
157 , copies(copies)
159 PARSEC_OBJ_CONSTRUCT(&parsec_task, parsec_task_t);
160 PARSEC_LIST_ITEM_SINGLETON(&parsec_task.super);
161 parsec_task.mempool_owner = mempool;
162 parsec_task.task_class = task_class;
163 parsec_task.priority = 0;
164
165 // TODO: can we avoid this?
166 for (int i = 0; i < MAX_PARAM_COUNT; ++i) {
167 this->parsec_task.data[i].data_in = nullptr;
168 this->parsec_task.data[i].data_out = nullptr;
169 }
170 }
171
// Full constructor. In addition to the above, it records the taskpool, the
// caller's priority and the release callback, marks the task
// PARSEC_TASK_STATUS_HOOK, and sets chore_mask to bit 0 only.
// NOTE(review): source lines 174, 176-177 and 180 are missing from this listing.
172 parsec_ttg_task_base_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class,
173 parsec_taskpool_t *taskpool, int32_t priority,
175 release_task_fn *release_fn,
178 , copies(copies)
179 , release_task_cb(release_fn)
181 PARSEC_OBJ_CONSTRUCT(&parsec_task, parsec_task_t);
182 PARSEC_LIST_ITEM_SINGLETON(&parsec_task.super);
183 parsec_task.mempool_owner = mempool;
184 parsec_task.task_class = task_class;
185 parsec_task.status = PARSEC_TASK_STATUS_HOOK;
186 parsec_task.taskpool = taskpool;
187 parsec_task.priority = priority;
188 parsec_task.chore_mask = 1<<0;
189
190 // TODO: can we avoid this?
191 for (int i = 0; i < MAX_PARAM_COUNT; ++i) {
192 this->parsec_task.data[i].data_in = nullptr;
193 this->parsec_task.data[i].data_out = nullptr;
194 }
195 }
196
197 public:
198 void set_dummy(bool d) { dummy = d; }
199 bool is_dummy() { return dummy; }
200 };
201
// parsec_ttg_task_t<TT, KeyIsVoid>: primary template, used when TT::key_type is
// not void. It stores the key, per-input streams, device state and a fixed
// copies array.
// NOTE(review): source lines 203 (struct head), 210, 214, 216, 233, 244 and 267
// are missing from this listing. Per the member index, line 216 declares dev_state.
202 template <typename TT, bool KeyIsVoid = ttg::meta::is_void_v<typename TT::key_type>>
204 using key_type = typename TT::key_type;
205 static constexpr size_t num_streams = TT::numins;
206 /* device tasks may have to store more copies than # of its inputs as their sends are aggregated */
207 static constexpr size_t num_copies = TT::derived_has_device_op() ? static_cast<size_t>(MAX_PARAM_COUNT)
208 : (num_streams+1);
209 TT* tt = nullptr;
211 std::array<stream_info_t, num_streams> streams;
212#ifdef TTG_HAVE_COROUTINE
213 void* suspended_task_address = nullptr; // if not null the function is suspended
215#endif
217 ttg_data_copy_t *copies[num_copies] = { nullptr }; // the data copies tracked by this task
218
// Constructor without a key. The key member is not in this initializer list,
// so pkey() registers the address of a default-initialized key. The
// hash/key-address scratch slots are zeroed.
219 parsec_ttg_task_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, TT *tt_ptr)
220 : parsec_ttg_task_base_t(mempool, task_class, num_streams, copies)
221 , tt(tt_ptr) {
222 tt_ht_item.key = pkey();
223 this->dev_ptr = this->dev_state.dev_ptr();
224 // We store the hash of the key and the address where it can be found in locals considered as a scratchpad
225 *(uintptr_t*)&(parsec_task.locals[0]) = 0; //there is no key
226 *(uintptr_t*)&(parsec_task.locals[2]) = 0; //there is no key
227 }
228
// Keyed constructor. It copies the key, registers its address as the hash-table
// key, stores ttg::hash(key) in locals[0] and the address of the stored key in
// locals[2].
// NOTE(review): indices 0 and 2 imply each uintptr_t spans two locals entries
// (i.e. 32-bit locals with 64-bit uintptr_t) -- confirm. On a target with a
// 32-bit uintptr_t the 64-bit hash would be truncated.
229 parsec_ttg_task_t(const key_type& key, parsec_thread_mempool_t *mempool,
230 parsec_task_class_t *task_class, parsec_taskpool_t *taskpool,
231 TT *tt_ptr, int32_t priority)
232 : parsec_ttg_task_base_t(mempool, task_class, taskpool, priority,
234 &release_task, tt_ptr->m_defer_writer)
235 , tt(tt_ptr), key(key) {
236 tt_ht_item.key = pkey();
237 this->dev_ptr = this->dev_state.dev_ptr();
238
239 // We store the hash of the key and the address where it can be found in locals considered as a scratchpad
240 uint64_t hv = ttg::hash<std::decay_t<decltype(key)>>{}(key);
241 *(uintptr_t*)&(parsec_task.locals[0]) = hv;
242 *(uintptr_t*)&(parsec_task.locals[2]) = reinterpret_cast<uintptr_t>(&this->key);
243
245 }
246
// Release callback passed to the base constructor. It recovers the derived task
// and forwards it to the owning TT's release_task().
247 static void release_task(parsec_ttg_task_base_t* task_base) {
248 parsec_ttg_task_t *task = static_cast<parsec_ttg_task_t*>(task_base);
249 TT *tt = task->tt;
250 tt->release_task(task);
251 }
252
// Runs the task body: TT::static_op for the host space,
// TT::device_static_op for any other space.
253 template<ttg::ExecutionSpace Space>
254 parsec_hook_return_t invoke_op() {
255 if constexpr (Space == ttg::ExecutionSpace::Host) {
256 return TT::static_op(&this->parsec_task);
257 } else {
258 return TT::device_static_op(&this->parsec_task);
259 }
260 }
261
// Host tasks need no evaluation step and report PARSEC_HOOK_RETURN_DONE.
// The non-host branch (source line 267) is missing from this listing.
262 template<ttg::ExecutionSpace Space>
263 parsec_hook_return_t invoke_evaluate() {
264 if constexpr (Space == ttg::ExecutionSpace::Host) {
265 return PARSEC_HOOK_RETURN_DONE;
266 } else {
268 }
269 }
270
// Hash-table key: the address of this task's key member.
271 parsec_key_t pkey() { return reinterpret_cast<parsec_key_t>(&key); }
272 };
273
// Partial specialization of parsec_ttg_task_t, presumably <TT, true>, for TTs
// whose key_type is void. The task carries no key and pkey() is always 0.
// NOTE(review): source lines 275 (struct head), 281, 283 (dev_state, per the
// member index), 297, 302 and 325 are missing from this listing.
274 template <typename TT>
276 static constexpr size_t num_streams = TT::numins;
277 TT* tt = nullptr;
278 std::array<stream_info_t, num_streams> streams;
279#ifdef TTG_HAVE_COROUTINE
280 void* suspended_task_address = nullptr; // if not null the function is suspended
282#endif
// NOTE(review): copies is always sized num_streams+1. The keyed primary template
// instead uses MAX_PARAM_COUNT for TTs with a device op ("device tasks may have
// to store more copies than # of its inputs as their sends are aggregated").
// This specialization also carries dev_state, so a void-key device TT may need
// the same num_copies sizing -- verify.
284 ttg_data_copy_t *copies[num_streams+1] = { nullptr }; // the data copies tracked by this task
285 // +1 for the copy needed during send/bcast
286
// NOTE(review): unlike the keyed template, neither constructor here writes
// parsec_task.locals[0]/[2] in the visible lines.
// Minimal constructor: no taskpool, priority or release callback.
287 parsec_ttg_task_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, TT *tt_ptr)
288 : parsec_ttg_task_base_t(mempool, task_class, num_streams, copies)
289 , tt(tt_ptr) {
290 tt_ht_item.key = pkey();
291 this->dev_ptr = this->dev_state.dev_ptr();
292 }
293
// Full constructor: registers release_task as the base's release callback.
294 parsec_ttg_task_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class,
295 parsec_taskpool_t *taskpool, TT *tt_ptr, int32_t priority)
296 : parsec_ttg_task_base_t(mempool, task_class, taskpool, priority,
298 &release_task, tt_ptr->m_defer_writer)
299 , tt(tt_ptr) {
300 tt_ht_item.key = pkey();
301 this->dev_ptr = this->dev_state.dev_ptr();
303 }
304
// Release callback: recovers the derived task and forwards it to the owning
// TT's release_task().
305 static void release_task(parsec_ttg_task_base_t* task_base) {
306 parsec_ttg_task_t *task = static_cast<parsec_ttg_task_t*>(task_base);
307 TT *tt = task->tt;
308 tt->release_task(task);
309 }
310
// Runs the task body: TT::static_op for the host space,
// TT::device_static_op for any other space.
311 template<ttg::ExecutionSpace Space>
312 parsec_hook_return_t invoke_op() {
313 if constexpr (Space == ttg::ExecutionSpace::Host) {
314 return TT::static_op(&this->parsec_task);
315 } else {
316 return TT::device_static_op(&this->parsec_task);
317 }
318 }
319
// Host tasks report PARSEC_HOOK_RETURN_DONE. The non-host branch (source
// line 325) is missing from this listing.
320 template<ttg::ExecutionSpace Space>
321 parsec_hook_return_t invoke_evaluate() {
322 if constexpr (Space == ttg::ExecutionSpace::Host) {
323 return PARSEC_HOOK_RETURN_DONE;
324 } else {
326 }
327 }
328
// No key: the hash-table key is always 0.
329 parsec_key_t pkey() { return 0; }
330 };
331
332
343
// reducer_task_t constructor. The struct head and members (source lines
// 333-342) are missing from this listing; per the member index, parent_task is
// declared at line 341.
// It builds a base task with data_count 0, no copies array and defer_writer =
// true, stores the parent task pointer, and copies the parent's first four
// locals entries (used for profiling).
// NOTE(review): source lines 349 and 352 are also missing; is_first is not used
// in the visible lines.
344 reducer_task_t(parsec_ttg_task_base_t* task, parsec_thread_mempool_t *mempool,
345 parsec_task_class_t *task_class, parsec_taskpool_t *taskpool,
346 int32_t priority, bool is_first)
347 : parsec_ttg_task_base_t(mempool, task_class, taskpool, priority,
348 0, nullptr,
350 true /* deferred until other readers have completed */)
351 , parent_task(task)
353 {
354 /* store the first 4 integers from the parent task (needed for profiling) */
355 for (int i = 0; i < 4; ++i) {
356 parsec_task.locals[i] = task->parsec_task.locals[i];
357 }
358 }
359
// Release callback: immediately hands this task, as a one-element ring, to
// __parsec_schedule_vp on the stream returned by parsec_my_execution_stream().
// NOTE(review): the return value of __parsec_schedule_vp is ignored.
360 static void release_task(parsec_ttg_task_base_t* task_base) {
361 /* reducer tasks have one mutable input so the task can be submitted on the first release */
362 parsec_task_t *vp_task_rings[1] = { &task_base->parsec_task };
363 parsec_execution_stream_t *es = parsec_my_execution_stream();
364 __parsec_schedule_vp(es, vp_task_rings, 0);
365 }
366 };
367
368 } // namespace detail
369
370} // namespace ttg_parsec
371
372#endif // TTG_PARSEC_TASK_H
void release_task(task_t *task, parsec_task_t **task_ring=nullptr)
Definition ttg.h:2725
static constexpr bool derived_has_device_op()
Definition ttg.h:1262
static resultT get(InTuple &&intuple)
Definition ttg.h:1293
keyT key_type
Definition ttg.h:1273
uint8_t operator&(ttg_parsec_data_flags lhs, ttg_parsec_data_flags rhs)
Definition task.h:69
parsec_hook_return_t(* parsec_static_op_t)(void *)
Definition task.h:88
ttg_parsec_data_flags operator|=(ttg_parsec_data_flags &lhs, ttg_parsec_data_flags rhs)
Definition task.h:62
ttg_parsec_data_flags operator&=(ttg_parsec_data_flags &lhs, ttg_parsec_data_flags rhs)
Definition task.h:75
bool operator!(ttg_parsec_data_flags lhs)
Definition task.h:82
ttg_parsec_data_flags operator|(ttg_parsec_data_flags lhs, ttg_parsec_data_flags rhs)
Definition task.h:56
Contains the PaRSEC-based TTG functionality.
Definition fwd.h:19
TaskCoroutineID
Definition coroutine.h:223
@ Invalid
not a coroutine, i.e. a standard task function, -> void
Computes hash values for objects of type T.
Definition hash.h:82
parsec_gpu_task_t * gpu_task
Definition task.h:15
parsec_task_class_t task_class
Definition task.h:19
parsec_gpu_exec_stream_t * stream
Definition task.h:17
parsec_flow_t * flows
Definition task.h:16
parsec_device_gpu_module_t * device
Definition task.h:18
static constexpr device_ptr_t * dev_ptr()
Definition task.h:29
static constexpr size_t num_flows
Definition task.h:26
static constexpr bool support_device
Definition task.h:25
parsec_ttg_task_base_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, int data_count, ttg_data_copy_t **copies, bool defer_writer=TTG_PARSEC_DEFER_WRITER)
Definition task.h:153
void() release_task_fn(parsec_ttg_task_base_t *)
Definition task.h:127
void init_stream_info(TT *tt, std::array< stream_info_t, TT::numins > &streams)
Definition task.h:122
void init_stream_info_impl(TT *tt, std::array< stream_info_t, TT::numins > &streams)
Definition task.h:106
ttg_parsec_data_flags data_flags
Definition task.h:137
parsec_hash_table_item_t tt_ht_item
Definition task.h:95
parsec_ttg_task_base_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, parsec_taskpool_t *taskpool, int32_t priority, int data_count, ttg_data_copy_t **copies, release_task_fn *release_fn, bool defer_writer=TTG_PARSEC_DEFER_WRITER)
Definition task.h:172
parsec_ttg_task_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, parsec_taskpool_t *taskpool, TT *tt_ptr, int32_t priority)
Definition task.h:294
parsec_ttg_task_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, TT *tt_ptr)
Definition task.h:287
device_state_t< TT::derived_has_device_op()> dev_state
Definition task.h:283
std::array< stream_info_t, num_streams > streams
Definition task.h:278
static void release_task(parsec_ttg_task_base_t *task_base)
Definition task.h:305
static constexpr size_t num_copies
Definition task.h:207
parsec_hook_return_t invoke_op()
Definition task.h:254
parsec_ttg_task_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, TT *tt_ptr)
Definition task.h:219
std::array< stream_info_t, num_streams > streams
Definition task.h:211
typename TT::key_type key_type
Definition task.h:204
parsec_ttg_task_t(const key_type &key, parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, parsec_taskpool_t *taskpool, TT *tt_ptr, int32_t priority)
Definition task.h:229
static constexpr size_t num_streams
Definition task.h:205
parsec_hook_return_t invoke_evaluate()
Definition task.h:263
device_state_t< TT::derived_has_device_op()> dev_state
Definition task.h:216
static void release_task(parsec_ttg_task_base_t *task_base)
Definition task.h:247
ttg_data_copy_t * copies[num_copies]
Definition task.h:217
parsec_ttg_task_base_t * parent_task
Definition task.h:341
reducer_task_t(parsec_ttg_task_base_t *task, parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, parsec_taskpool_t *taskpool, int32_t priority, bool is_first)
Definition task.h:344
static void release_task(parsec_ttg_task_base_t *task_base)
Definition task.h:360
#define TTG_PARSEC_DEFER_WRITER
Definition ttg.h:14