ttg 1.0.0
Template Task Graph (TTG): flowgraph-based programming model for high-performance distributed-memory algorithms
Loading...
Searching...
No Matches
ttg.h
Go to the documentation of this file.
1// SPDX-License-Identifier: BSD-3-Clause
2// clang-format off
3#ifndef PARSEC_TTG_H_INCLUDED
4#define PARSEC_TTG_H_INCLUDED
5
6/* set up env if this header was included directly */
7#if !defined(TTG_IMPL_NAME)
8#define TTG_USE_PARSEC 1
9#endif // !defined(TTG_IMPL_NAME)
10
11/* Whether to defer a potential writer if there are readers.
12 * This may avoid extra copies in exchange for concurrency.
13 * This may cause deadlocks, so use with caution. */
14#define TTG_PARSEC_DEFER_WRITER false
15
16#include "ttg/config.h"
17
18#include "ttg/impl_selector.h"
19
20/* include ttg header to make symbols available in case this header is included directly */
21#include "../../ttg.h"
22
23#include "ttg/base/keymap.h"
24#include "ttg/base/tt.h"
25#include "ttg/base/world.h"
26#include "ttg/constraint.h"
27#include "ttg/edge.h"
28#include "ttg/execution.h"
29#include "ttg/func.h"
30#include "ttg/runtimes.h"
31#include "ttg/terminal.h"
32#include "ttg/tt.h"
33#include "ttg/util/env.h"
34#include "ttg/util/hash.h"
35#include "ttg/util/meta.h"
37#include "ttg/util/print.h"
38#include "ttg/util/scope_exit.h"
39#include "ttg/util/trace.h"
40#include "ttg/util/typelist.h"
41
43
44#include "ttg/parsec/fwd.h"
45
46#include "ttg/parsec/buffer.h"
50#include "ttg/parsec/ttvalue.h"
51#include "ttg/device/task.h"
53
54#include <algorithm>
55#include <array>
56#include <cassert>
57#include <cstring>
58#include <experimental/type_traits>
59#include <functional>
60#include <future>
61#include <iostream>
62#include <list>
63#include <map>
64#include <memory>
65#include <mutex>
66#include <numeric>
67#include <sstream>
68#include <string>
69#include <tuple>
70#include <vector>
71
72// needed for MPIX_CUDA_AWARE_SUPPORT
73#if defined(TTG_HAVE_MPI)
74#include <mpi.h>
75#if defined(TTG_HAVE_MPIEXT)
76#include <mpi-ext.h>
77#endif // TTG_HAVE_MPIEXT
78#endif // TTG_HAVE_MPI
79
80
81#include <parsec.h>
82#include <parsec/class/parsec_hash_table.h>
83#include <parsec/data_internal.h>
84#include <parsec/execution_stream.h>
85#include <parsec/interfaces/interface.h>
86#include <parsec/mca/device/device.h>
87#include <parsec/parsec_comm_engine.h>
88#include <parsec/parsec_internal.h>
89#include <parsec/scheduling.h>
90#include <parsec/remote_dep.h>
91#include <parsec/utils/mca_param.h>
92
93#ifdef PARSEC_HAVE_DEV_CUDA_SUPPORT
94#include <parsec/mca/device/cuda/device_cuda.h>
95#endif // PARSEC_HAVE_DEV_CUDA_SUPPORT
96#ifdef PARSEC_HAVE_DEV_HIP_SUPPORT
97#include <parsec/mca/device/hip/device_hip.h>
98#endif // PARSEC_HAVE_DEV_HIP_SUPPORT
99#ifdef PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT
100#include <parsec/mca/device/level_zero/device_level_zero.h>
101#endif //PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT
102
103#include <parsec/mca/device/device_gpu.h>
104#if defined(PARSEC_PROF_TRACE)
105#include <parsec/profiling.h>
106#undef PARSEC_TTG_PROFILE_BACKEND
107#if defined(PARSEC_PROF_GRAPHER)
108#include <parsec/parsec_prof_grapher.h>
109#endif
110#endif
111#include <cstdlib>
112#include <cstring>
113
114#if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
115#include <unordered_set>
116#endif
117
120#include "ttg/parsec/ptr.h"
121#include "ttg/parsec/task.h"
123
124#include "ttg/device/device.h"
125
126#undef TTG_PARSEC_DEBUG_TRACK_DATA_COPIES
127
128/* PaRSEC function declarations */
129extern "C" {
130void parsec_taskpool_termination_detected(parsec_taskpool_t *tp);
131int parsec_add_fetch_runtime_task(parsec_taskpool_t *tp, int tasks);
132}
133
134namespace ttg_parsec {
135 typedef void (*static_set_arg_fct_type)(void *, size_t, ttg::TTBase *);
136 typedef std::pair<static_set_arg_fct_type, ttg::TTBase *> static_set_arg_fct_call_t;
137 inline std::map<uint64_t, static_set_arg_fct_call_t> static_id_to_op_map;
138 inline std::mutex static_map_mutex;
139 typedef std::tuple<int, std::unique_ptr<std::byte[]>, size_t> static_set_arg_fct_arg_t;
140 inline std::multimap<uint64_t, static_set_arg_fct_arg_t> delayed_unpack_actions;
141
149 uint32_t taskpool_id = std::numeric_limits<uint32_t>::max();
150 uint64_t op_id = std::numeric_limits<uint64_t>::max();
151 std::size_t key_offset = 0;
153 std::int8_t num_iovecs = 0;
154 bool inline_data = false;
155 int32_t param_id = -1;
156 int num_keys = 0;
157 int sender = -1;
158
159 msg_header_t() = default;
160
161 msg_header_t(fn_id_t fid, uint32_t tid, uint64_t oid, int32_t pid, int sender, int nk)
162 : fn_id(fid)
163 , taskpool_id(tid)
164 , op_id(oid)
165 , param_id(pid)
166 , num_keys(nk)
167 , sender(sender)
168 { }
169 };
170
171 static void unregister_parsec_tags(void *_);
172
173 namespace detail {
174
175 constexpr const int PARSEC_TTG_MAX_AM_SIZE = 1 * 1024*1024;
176
177 struct msg_t {
179 static constexpr std::size_t max_payload_size = PARSEC_TTG_MAX_AM_SIZE - sizeof(msg_header_t);
180 unsigned char bytes[max_payload_size];
181
182 msg_t() = default;
183 msg_t(uint64_t tt_id,
184 uint32_t taskpool_id,
186 int32_t param_id,
187 int sender,
188 int num_keys = 1)
189 : tt_id(fn_id, taskpool_id, tt_id, param_id, sender, num_keys)
190 {}
191 };
192
194
/// Active-message handler for the main TTG tag.
/// Returns 0 if the payload was dispatched to its target TT immediately;
/// returns 1 if the taskpool/TT was not ready yet, in which case a private
/// copy of the message is queued in delayed_unpack_actions for later delivery.
 195 static int static_unpack_msg(parsec_comm_engine_t *ce, uint64_t tag, void *data, long unsigned int size,
 196 int src_rank, void *obj) {
 197 static_set_arg_fct_type static_set_arg_fct;
 198 parsec_taskpool_t *tp = NULL;
 199 msg_header_t *msg = static_cast<msg_header_t *>(data);
 200 uint64_t op_id = msg->op_id;
 201 tp = parsec_taskpool_lookup(msg->taskpool_id);
 202 assert(NULL != tp);
 203 static_map_mutex.lock();
 204 if (PARSEC_TERM_TP_NOT_READY != tp->tdm.module->taskpool_state(tp)) {
 // fast path: look up the registered handler; at() throws std::out_of_range if
 // the TT has not been registered yet, and the catch below falls through to the
 // delayed-delivery path (note: the mutex is then still held until the final unlock)
 205 try {
 206 auto op_pair = static_id_to_op_map.at(op_id);
 207 static_map_mutex.unlock();
 208 tp->tdm.module->incoming_message_start(tp, src_rank, NULL, NULL, 0, NULL);
 209 static_set_arg_fct = op_pair.first;
 210 static_set_arg_fct(data, size, op_pair.second);
 211 tp->tdm.module->incoming_message_end(tp, NULL);
 212 return 0;
 213 } catch (const std::out_of_range &e)
 214 { /* fall-through */ }
 215 }
 // slow path: copy the payload (the comm engine may reuse the buffer) and
 // remember it so it can be delivered once the TT registers
 216 auto data_cpy = std::make_unique_for_overwrite<std::byte[]>(size);
 217 memcpy(data_cpy.get(), data, size);
 218 ttg::trace("ttg_parsec(", ttg_default_execution_context().rank(), ") Delaying delivery of message (", src_rank,
 219 ", ", op_id, ", ", data_cpy, ", ", size, ")");
 220 delayed_unpack_actions.insert(std::make_pair(op_id, std::make_tuple(src_rank, std::move(data_cpy), size)));
 221 static_map_mutex.unlock();
 222 return 1;
 223 }
224
225 static int get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag, void *msg, size_t msg_size,
226 int src, void *cb_data);
227
228 inline bool &initialized_mpi() {
229 static bool im = false;
230 return im;
231 }
232
234
235 } // namespace detail
236
238 ttg::Edge<> m_ctl_edge;
239 bool _dag_profiling;
240 bool _task_profiling;
241 std::array<bool, static_cast<std::size_t>(ttg::ExecutionSpace::Invalid)>
242 mpi_space_support = {true, false, false};
243
 /// Query the number of ranks in MPI_COMM_WORLD (used before PaRSEC is set up).
 244 int query_comm_size() {
 245 int comm_size;
 246 MPI_Comm_size(MPI_COMM_WORLD, &comm_size);
 247 return comm_size;
 248 }
249
 /// Query this process' rank in MPI_COMM_WORLD (used before PaRSEC is set up).
 250 int query_comm_rank() {
 251 int comm_rank;
 252 MPI_Comm_rank(MPI_COMM_WORLD, &comm_rank);
 253 return comm_rank;
 254 }
255
 /// Comm-engine "up" callback: (re-)registers the TTG active-message tag and the
 /// RMA-completion tag with their handlers. 128 is the max message size for the
 /// RMA tag; AM messages may be up to PARSEC_TTG_MAX_AM_SIZE.
 256 static void ttg_parsec_ce_up(parsec_comm_engine_t *comm_engine, void *user_data)
 257 {
 258 parsec_ce.tag_register(WorldImpl::parsec_ttg_tag(), &detail::static_unpack_msg, user_data, detail::PARSEC_TTG_MAX_AM_SIZE);
 259 parsec_ce.tag_register(WorldImpl::parsec_ttg_rma_tag(), &detail::get_remote_complete_cb, user_data, 128);
 260 }
261
 /// Comm-engine "down" callback: unregisters both TTG tags (mirror of ttg_parsec_ce_up).
 262 static void ttg_parsec_ce_down(parsec_comm_engine_t *comm_engine, void *user_data)
 263 {
 264 parsec_ce.tag_unregister(WorldImpl::parsec_ttg_tag());
 265 parsec_ce.tag_unregister(WorldImpl::parsec_ttg_rma_tag());
 266 }
267
268 public:
269#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
270 int parsec_ttg_profile_backend_set_arg_start, parsec_ttg_profile_backend_set_arg_end;
271 int parsec_ttg_profile_backend_bcast_arg_start, parsec_ttg_profile_backend_bcast_arg_end;
272 int parsec_ttg_profile_backend_allocate_datacopy, parsec_ttg_profile_backend_free_datacopy;
273#endif
274
275 WorldImpl(int *argc, char **argv[], int ncores, parsec_context_t *c = nullptr)
276 : WorldImplBase(query_comm_size(), query_comm_rank())
277 , ctx(c)
278 , own_ctx(c == nullptr)
279#if defined(PARSEC_PROF_TRACE)
280 , profiling_array(nullptr)
281 , profiling_array_size(0)
282#endif
283 , _dag_profiling(false)
284 , _task_profiling(false)
285 {
287 if (own_ctx) ctx = parsec_init(ncores, argc, argv);
288
289 /* query MPI device support */
291#if defined(MPIX_CUDA_AWARE_SUPPORT) && MPIX_CUDA_AWARE_SUPPORT
292 || MPIX_Query_cuda_support()
293#endif // MPIX_CUDA_AWARE_SUPPORT
294 ) {
295 mpi_space_support[static_cast<std::size_t>(ttg::ExecutionSpace::CUDA)] = true;
296 }
297
299#if defined(MPIX_HIP_AWARE_SUPPORT) && MPIX_HIP_AWARE_SUPPORT
300 || MPIX_Query_hip_support()
301#endif // MPIX_HIP_AWARE_SUPPORT
302 ) {
303 mpi_space_support[static_cast<std::size_t>(ttg::ExecutionSpace::HIP)] = true;
304 }
305
306#if defined(PARSEC_PROF_TRACE)
307 if(parsec_profile_enabled) {
308 profile_on();
309#if defined(PARSEC_TTG_PROFILE_BACKEND)
310 parsec_profiling_add_dictionary_keyword("PARSEC_TTG_SET_ARG_IMPL", "fill:000000", 0, NULL,
311 (int*)&parsec_ttg_profile_backend_set_arg_start,
312 (int*)&parsec_ttg_profile_backend_set_arg_end);
313 parsec_profiling_add_dictionary_keyword("PARSEC_TTG_BCAST_ARG_IMPL", "fill:000000", 0, NULL,
314 (int*)&parsec_ttg_profile_backend_bcast_arg_start,
315 (int*)&parsec_ttg_profile_backend_bcast_arg_end);
316 parsec_profiling_add_dictionary_keyword("PARSEC_TTG_DATACOPY", "fill:000000",
317 sizeof(size_t), "size{int64_t}",
318 (int*)&parsec_ttg_profile_backend_allocate_datacopy,
319 (int*)&parsec_ttg_profile_backend_free_datacopy);
320#endif
321 }
322#endif
323
324#ifdef PARSEC_PROF_GRAPHER
325 /* check if PaRSEC's dot tracing is enabled */
326 int dot_param_idx;
327 dot_param_idx = parsec_mca_param_find("profile", NULL, "dot");
328
329 if (dot_param_idx != PARSEC_ERROR) {
330 char *filename;
331 parsec_mca_param_lookup_string(dot_param_idx, &filename);
332 dag_on(filename);
333 }
334#endif // PARSEC_PROF_GRAPHER
335
336 if( NULL != parsec_ce.tag_register) {
337 parsec_ce.tag_register(WorldImpl::parsec_ttg_tag(), &detail::static_unpack_msg, this, detail::PARSEC_TTG_MAX_AM_SIZE);
338 parsec_ce.tag_register(WorldImpl::parsec_ttg_rma_tag(), &detail::get_remote_complete_cb, this, 128);
339 }
340
341 create_tpool();
342 }
343
344
345 auto *context() { return ctx; }
346 auto *execution_stream() { return parsec_my_execution_stream(); }
347 auto *taskpool() { return tpool; }
348
350 assert(nullptr == tpool);
351 tpool = PARSEC_OBJ_NEW(parsec_taskpool_t);
352 tpool->taskpool_id = std::numeric_limits<uint32_t>::max();
353 tpool->update_nb_runtime_task = parsec_add_fetch_runtime_task;
354 tpool->taskpool_type = PARSEC_TASKPOOL_TYPE_TTG;
355 tpool->taskpool_name = strdup("TTG Taskpool");
356 parsec_taskpool_reserve_id(tpool);
357
358 tpool->devices_index_mask = 0;
359 for(int i = 0; i < (int)parsec_nb_devices; i++) {
360 parsec_device_module_t *device = parsec_mca_device_get(i);
361 if( NULL == device ) continue;
362 tpool->devices_index_mask |= (1 << device->device_index);
363 }
364
365#ifdef TTG_USE_USER_TERMDET
366 parsec_termdet_open_module(tpool, "user_trigger");
367#else // TTG_USE_USER_TERMDET
368 parsec_termdet_open_dyn_module(tpool);
369#endif // TTG_USE_USER_TERMDET
370 tpool->tdm.module->monitor_taskpool(tpool, parsec_taskpool_termination_detected);
371 // In TTG, we use the pending actions to denote that the
372 // taskpool is not ready, i.e. some local tasks could still
373 // be added by the main thread. It should then be initialized
374 // to 0, execute will set it to 1 and mark the tpool as ready,
375 // and the fence() will decrease it back to 0.
376 tpool->tdm.module->taskpool_set_runtime_actions(tpool, 0);
377 parsec_taskpool_enable(tpool, NULL, NULL, execution_stream(), size() > 1);
378
379#if defined(PARSEC_PROF_TRACE)
380 tpool->profiling_array = profiling_array;
381#endif
382
383 // Termination detection in PaRSEC requires to synchronize the
384 // taskpool enabling, to avoid a race condition that would keep
385 // termination detection-related messages in a waiting queue
386 // forever
387 MPI_Barrier(comm());
388
389 parsec_taskpool_started = false;
390 }
391
392 /* Deleted copy ctor */
393 WorldImpl(const WorldImpl &other) = delete;
394
395 /* Deleted move ctor */
396 WorldImpl(WorldImpl &&other) = delete;
397
398 /* Deleted copy assignment */
399 WorldImpl &operator=(const WorldImpl &other) = delete;
400
401 /* Deleted move assignment */
402 WorldImpl &operator=(WorldImpl &&other) = delete;
403
405
406 static constexpr int parsec_ttg_tag() { return PARSEC_DSL_TTG_TAG; }
407 static constexpr int parsec_ttg_rma_tag() { return PARSEC_DSL_TTG_RMA_TAG; }
408
409 MPI_Comm comm() const { return MPI_COMM_WORLD; }
410
 /// Start executing the current taskpool: enqueue it into the PaRSEC context,
 /// add one runtime action (matched by the -1 in fence/destroy), mark it ready,
 /// and start the context. Idempotent: does nothing if already started.
 411 virtual void execute() override {
 412 if (!parsec_taskpool_started) {
 413 parsec_enqueue(ctx, tpool);
 414 tpool->tdm.module->taskpool_addto_runtime_actions(tpool, 1);
 415 tpool->tdm.module->taskpool_ready(tpool);
 416 [[maybe_unused]] auto ret = parsec_context_start(ctx);
 417 // ignore ret since all of its nonzero values are OK (e.g. -1 due to ctx already being active)
 418 parsec_taskpool_started = true;
 419 }
 420 }
421
423#if defined(PARSEC_PROF_TRACE)
424 // We don't want to release the profiling array, as it should be persistent
425 // between fences() to allow defining a TT/TTG before a fence() and schedule
426 // it / complete it after a fence()
427 tpool->profiling_array = nullptr;
428#endif
429 assert(NULL != tpool->tdm.monitor);
430 tpool->tdm.module->unmonitor_taskpool(tpool);
431 parsec_taskpool_free(tpool);
432 tpool = nullptr;
433 }
434
435 virtual void destroy() override {
436 if (is_valid()) {
437 if (parsec_taskpool_started) {
438 // We are locally ready (i.e. we won't add new tasks)
439 tpool->tdm.module->taskpool_addto_runtime_actions(tpool, -1);
440 ttg::trace("ttg_parsec(", this->rank(), "): final waiting for completion");
441 if (own_ctx)
442 parsec_context_wait(ctx);
443 else
444 parsec_taskpool_wait(tpool);
445 }
446 release_ops();
449 if (own_ctx) {
450 unregister_parsec_tags(nullptr);
451 } else {
452 parsec_context_at_fini(unregister_parsec_tags, nullptr);
453 }
454#if defined(PARSEC_PROF_TRACE)
455 if(nullptr != profiling_array) {
456 free(profiling_array);
457 profiling_array = nullptr;
458 profiling_array_size = 0;
459 }
460#endif
461 if (own_ctx) parsec_fini(&ctx);
462 mark_invalid();
463 }
464 }
465
466 ttg::Edge<> &ctl_edge() { return m_ctl_edge; }
467
468 const ttg::Edge<> &ctl_edge() const { return m_ctl_edge; }
469
470 void increment_created() { taskpool()->tdm.module->taskpool_addto_nb_tasks(taskpool(), 1); }
471
472 void increment_inflight_msg() { taskpool()->tdm.module->taskpool_addto_runtime_actions(taskpool(), 1); }
473 void decrement_inflight_msg() { taskpool()->tdm.module->taskpool_addto_runtime_actions(taskpool(), -1); }
474
475 bool dag_profiling() override { return _dag_profiling; }
476
477 virtual void dag_on(const std::string &filename) override {
478#if defined(PARSEC_PROF_GRAPHER)
479 if(!_dag_profiling) {
480 profile_on();
481 size_t len = strlen(filename.c_str())+32;
482 char ext_filename[len];
483 snprintf(ext_filename, len, "%s-%d.dot", filename.c_str(), rank());
484 parsec_prof_grapher_init(ctx, ext_filename);
485 _dag_profiling = true;
486 }
487#else
488 ttg::print("Error: requested to create '", filename, "' to create a DAG of tasks,\n"
489 "but PaRSEC does not support graphing options. Reconfigure with PARSEC_PROF_GRAPHER=ON\n");
490#endif
491 }
492
 /// Disable DAG tracing and finalize the dot grapher; no-op if tracing was not on
 /// or PaRSEC lacks PARSEC_PROF_GRAPHER support.
 493 virtual void dag_off() override {
 494#if defined(PARSEC_PROF_GRAPHER)
 495 if(_dag_profiling) {
 496 parsec_prof_grapher_fini();
 497 _dag_profiling = false;
 498 }
 499#endif
 500 }
501
 /// Disable task profiling (only has an effect when built with PARSEC_PROF_TRACE).
 502 virtual void profile_off() override {
 503#if defined(PARSEC_PROF_TRACE)
 504 _task_profiling = false;
 505#endif
 506 }
 507
 /// Enable task profiling (only has an effect when built with PARSEC_PROF_TRACE).
 508 virtual void profile_on() override {
 509#if defined(PARSEC_PROF_TRACE)
 510 _task_profiling = true;
 511#endif
 512 }
 513
 /// \return whether task profiling is currently enabled
 514 virtual bool profiling() override { return _task_profiling; }
515
517 return mpi_space_support[static_cast<std::size_t>(space)];
518 }
519
 /// Signal that the last task has been seen: with user-triggered termination
 /// detection, set the remaining task count to zero and mark the taskpool as
 /// no longer started. No-op without TTG_USE_USER_TERMDET.
 520 virtual void final_task() override {
 521#ifdef TTG_USE_USER_TERMDET
 522 if(parsec_taskpool_started) {
 523 taskpool()->tdm.module->taskpool_set_nb_tasks(taskpool(), 0);
 524 parsec_taskpool_started = false;
 525 }
 526#endif // TTG_USE_USER_TERMDET
 527 }
528
529 template <typename keyT, typename output_terminalsT, typename derivedT,
530 typename input_valueTs = ttg::typelist<>, ttg::ExecutionSpace Space>
532#if defined(PARSEC_PROF_TRACE)
533 std::stringstream ss;
534 build_composite_name_rec(t->ttg_ptr(), ss);
535 ss << t->get_name();
536 register_new_profiling_event(ss.str().c_str(), t->get_instance_id());
537#endif
538 }
539
540 protected:
541#if defined(PARSEC_PROF_TRACE)
542 void build_composite_name_rec(const ttg::TTBase *t, std::stringstream &ss) {
543 if(nullptr == t)
544 return;
545 build_composite_name_rec(t->ttg_ptr(), ss);
546 ss << t->get_name() << "::";
547 }
548
 /// Register a begin/end event pair for TT \p name at slot \p position in the
 /// taskpool's profiling array, growing the array (in 64-entry chunks, zeroing
 /// the new tail) if needed.
 /// NOTE(review): the realloc result is not checked for nullptr — on failure the
 /// old array pointer is lost; confirm whether this is acceptable here.
 549 void register_new_profiling_event(const char *name, int position) {
 550 if(2*position >= profiling_array_size) {
 551 size_t new_profiling_array_size = 64 * ((2*position + 63)/64 + 1);
 552 profiling_array = (int*)realloc((void*)profiling_array,
 553 new_profiling_array_size * sizeof(int));
 554 memset((void*)&profiling_array[profiling_array_size], 0, sizeof(int)*(new_profiling_array_size - profiling_array_size));
 555 profiling_array_size = new_profiling_array_size;
 556 tpool->profiling_array = profiling_array;
 557 }
 558
 // slots must not have been registered before
 559 assert(0 == tpool->profiling_array[2*position]);
 560 assert(0 == tpool->profiling_array[2*position+1]);
 561 // TODO PROFILING: 0 and NULL should be replaced with something that depends on the key human-readable serialization...
 562 // Typically, we would put something like 3*sizeof(int32_t), "m{int32_t};n{int32_t};k{int32_t}" to say
 563 // there are three fields, named m, n and k, stored in this order, and each of size int32_t
 564 parsec_profiling_add_dictionary_keyword(name, "fill:000000", 64, "key{char[64]}",
 565 (int*)&tpool->profiling_array[2*position],
 566 (int*)&tpool->profiling_array[2*position+1]);
 567 }
568#endif
569
570 virtual void fence_impl(void) override {
571 int rank = this->rank();
572 if (!parsec_taskpool_started) {
573 ttg::trace("ttg_parsec::(", rank, "): parsec taskpool has not been started, fence is a simple MPI_Barrier");
574 MPI_Barrier(comm());
575 return;
576 }
577 ttg::trace("ttg_parsec::(", rank, "): parsec taskpool is ready for completion");
578 // We are locally ready (i.e. we won't add new tasks)
579 tpool->tdm.module->taskpool_addto_runtime_actions(tpool, -1);
580 ttg::trace("ttg_parsec(", rank, "): waiting for completion");
581 parsec_taskpool_wait(tpool);
582
583 // We need the synchronization between the end of the context and the restart of the taskpool
584 // until we use parsec_taskpool_wait and implement an epoch in the PaRSEC taskpool
585 // see Issue #118 (TTG)
586 MPI_Barrier(comm());
587
589 create_tpool();
590 execute();
591 }
592
593 private:
594 parsec_context_t *ctx = nullptr;
595 bool own_ctx = false; //< whether I own the context
596 parsec_taskpool_t *tpool = nullptr;
597 bool parsec_taskpool_started = false;
598#if defined(PARSEC_PROF_TRACE)
599 int *profiling_array;
600 std::size_t profiling_array_size;
601#endif
602 };
603
604 static void unregister_parsec_tags(void *_pidx)
605 {
606 if(NULL != parsec_ce.tag_unregister) {
607 parsec_ce.tag_unregister(WorldImpl::parsec_ttg_tag());
608 parsec_ce.tag_unregister(WorldImpl::parsec_ttg_rma_tag());
609 }
610 }
611
612 namespace detail {
613
 /// Symbol descriptors used when building TTG task classes: two hash words
 /// (HASH0/HASH1) and two key words (KEY0/KEY1), occupying assignment context
 /// indices 0..3. All are standalone globals with no bounds or increment.
 614 const parsec_symbol_t parsec_taskclass_param0 = {
 615 .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
 616 .name = "HASH0",
 617 .context_index = 0,
 618 .min = nullptr,
 619 .max = nullptr,
 620 .expr_inc = nullptr,
 621 .cst_inc = 0 };
 622 const parsec_symbol_t parsec_taskclass_param1 = {
 623 .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
 624 .name = "HASH1",
 625 .context_index = 1,
 626 .min = nullptr,
 627 .max = nullptr,
 628 .expr_inc = nullptr,
 629 .cst_inc = 0 };
 630 const parsec_symbol_t parsec_taskclass_param2 = {
 631 .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
 632 .name = "KEY0",
 633 .context_index = 2,
 634 .min = nullptr,
 635 .max = nullptr,
 636 .expr_inc = nullptr,
 637 .cst_inc = 0 };
 638 const parsec_symbol_t parsec_taskclass_param3 = {
 639 .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
 640 .name = "KEY1",
 641 .context_index = 3,
 642 .min = nullptr,
 643 .max = nullptr,
 644 .expr_inc = nullptr,
 645 .cst_inc = 0 };
646
647 inline ttg_data_copy_t *find_copy_in_task(parsec_ttg_task_base_t *task, const void *ptr) {
648 ttg_data_copy_t *res = nullptr;
649 if (task == nullptr || ptr == nullptr) {
650 return res;
651 }
652 for (int i = 0; i < task->data_count; ++i) {
653 auto copy = static_cast<ttg_data_copy_t *>(task->copies[i]);
654 if (NULL != copy && copy->get_ptr() == ptr) {
655 res = copy;
656 break;
657 }
658 }
659 return res;
660 }
661
662 inline int find_index_of_copy_in_task(parsec_ttg_task_base_t *task, const void *ptr) {
663 int i = -1;
664 if (task == nullptr || ptr == nullptr) {
665 return i;
666 }
667 for (i = 0; i < task->data_count; ++i) {
668 auto copy = static_cast<ttg_data_copy_t *>(task->copies[i]);
669 if (NULL != copy && copy->get_ptr() == ptr) {
670 return i;
671 }
672 }
673 return -1;
674 }
675
676 inline bool add_copy_to_task(ttg_data_copy_t *copy, parsec_ttg_task_base_t *task) {
677 if (task == nullptr || copy == nullptr) {
678 return false;
679 }
680
681 if (MAX_PARAM_COUNT < task->data_count) {
682 throw std::logic_error("Too many data copies, check MAX_PARAM_COUNT!");
683 }
684
685 task->copies[task->data_count] = copy;
686 task->data_count++;
687 return true;
688 }
689
690 inline void remove_data_copy(ttg_data_copy_t *copy, parsec_ttg_task_base_t *task) {
691 int i;
692 /* find and remove entry; copies are usually appended and removed, so start from back */
693 for (i = task->data_count-1; i >= 0; --i) {
694 if (copy == task->copies[i]) {
695 break;
696 }
697 }
698 if (i < 0) return;
699 /* move all following elements one up */
700 for (; i < task->data_count - 1; ++i) {
701 task->copies[i] = task->copies[i + 1];
702 }
703 /* null last element */
704 task->copies[i] = nullptr;
705 task->data_count--;
706 }
707
708#if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
709#warning "ttg::PaRSEC enables data copy tracking"
710 static std::unordered_set<ttg_data_copy_t *> pending_copies;
711 static std::mutex pending_copies_mutex;
712#endif
713#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
714 static int64_t parsec_ttg_data_copy_uid = 0;
715#endif
716
 /// Allocate a new data copy holding (a copy or move of) \p value.
 /// TTValue-derived types are themselves copies; other types are wrapped in a
 /// ttg_data_value_copy_t. \throws std::logic_error if neither construction is possible.
 717 template <typename Value>
 718 inline ttg_data_copy_t *create_new_datacopy(Value &&value) {
 719 using value_type = std::decay_t<Value>;
 720 ttg_data_copy_t *copy = nullptr;
 721 if constexpr (std::is_base_of_v<ttg::TTValue<value_type>, value_type> &&
 722 std::is_constructible_v<value_type, decltype(value)>) {
 723 copy = new value_type(std::forward<Value>(value));
 724 } else if constexpr (std::is_constructible_v<ttg_data_value_copy_t<value_type>, decltype(value)>) {
 725 copy = new ttg_data_value_copy_t<value_type>(std::forward<Value>(value));
 726 } else {
 727 /* we have no way to create a new copy from this value */
 728 throw std::logic_error("Trying to copy-construct data that is not copy-constructible!");
 729 }
 730#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
 731 // Keep track of additional memory usage
 732 if(ttg::default_execution_context().impl().profiling()) {
 733 copy->size = sizeof(Value);
 734 copy->uid = parsec_atomic_fetch_inc_int64(&parsec_ttg_data_copy_uid);
 735 parsec_profiling_ts_trace_flags(ttg::default_execution_context().impl().parsec_ttg_profile_backend_allocate_datacopy,
 736 static_cast<uint64_t>(copy->uid),
 737 PROFILE_OBJECT_ID_NULL, &copy->size,
 738 PARSEC_PROFILING_EVENT_COUNTER|PARSEC_PROFILING_EVENT_HAS_INFO);
 739 }
 740#endif
 741#if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
 // debug aid: every live copy must be tracked exactly once (erased in release_data_copy)
 742 {
 743 const std::lock_guard<std::mutex> lock(pending_copies_mutex);
 744 auto rc = pending_copies.insert(copy);
 745 assert(std::get<1>(rc));
 746 }
 747#endif
 748 return copy;
 749 }
750
751#if 0
752 template <std::size_t... IS, typename Key = keyT>
753 void invoke_pull_terminals(std::index_sequence<IS...>, const Key &key, detail::parsec_ttg_task_base_t *task) {
754 int junk[] = {0, (invoke_pull_terminal<IS>(
755 std::get<IS>(input_terminals), key, task),
756 0)...};
757 junk[0]++;
758 }
759#endif // 0
760
 /// For a single (non-const) argument, hand ownership of all of its PaRSEC data
 /// to the host copy for read-write access. Const arguments are left untouched.
 761 template<typename T>
 762 inline void transfer_ownership_impl(T&& arg, int device) {
 763 if constexpr(!std::is_const_v<std::remove_reference_t<T>>) {
 764 detail::foreach_parsec_data(arg, [&](parsec_data_t *data){
 765 parsec_data_transfer_ownership_to_copy(data, device, PARSEC_FLOW_ACCESS_RW);
 766 /* make sure we increment the version since we will modify the data */
 767 data->device_copies[0]->version++;
 768 });
 769 }
 770 }
771
 /// Apply transfer_ownership_impl to every input of the task, expanding over the
 /// input indices Is. The junk array forces evaluation of the pack expansion.
 772 template<typename TT, std::size_t... Is>
 773 inline void transfer_ownership(parsec_ttg_task_t<TT> *me, int device, std::index_sequence<Is...>) {
 774 /* transfer ownership of each data */
 775 int junk[] = {0,
 776 (transfer_ownership_impl(
 777 *reinterpret_cast<std::remove_reference_t<std::tuple_element_t<Is, typename TT::input_refs_tuple_type>> *>(
 778 me->copies[Is]->get_ptr()), device), 0)...};
 779 junk[0]++;
 780 }
781
 /// Host execution hook invoked by PaRSEC: transfers ownership of all inputs to
 /// the host copy (device 0) and runs the TT's host op.
 782 template<typename TT>
 783 inline parsec_hook_return_t hook(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
 784 parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
 785 if constexpr(std::tuple_size_v<typename TT::input_values_tuple_type> > 0) {
 786 transfer_ownership<TT>(me, 0, std::make_index_sequence<std::tuple_size_v<typename TT::input_values_tuple_type>>{});
 787 }
 788 return me->template invoke_op<ttg::ExecutionSpace::Host>();
 789 }
790
791 template<typename TT>
792 inline parsec_hook_return_t hook_cuda(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
793 if constexpr(TT::derived_has_cuda_op()) {
794 parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
795 return me->template invoke_op<ttg::ExecutionSpace::CUDA>();
796 } else {
797 std::cerr << "CUDA hook called without having a CUDA op!" << std::endl;
798 return PARSEC_HOOK_RETURN_ERROR;
799 }
800 }
801
802 template<typename TT>
803 inline parsec_hook_return_t hook_hip(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
804 if constexpr(TT::derived_has_hip_op()) {
805 parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
806 return me->template invoke_op<ttg::ExecutionSpace::HIP>();
807 } else {
808 std::cerr << "HIP hook called without having a HIP op!" << std::endl;
809 return PARSEC_HOOK_RETURN_ERROR;
810 }
811 }
812
813 template<typename TT>
814 inline parsec_hook_return_t hook_level_zero(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
815 if constexpr(TT::derived_has_level_zero_op()) {
816 parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
817 return me->template invoke_op<ttg::ExecutionSpace::L0>();
818 } else {
819 std::cerr << "L0 hook called without having a L0 op!" << std::endl;
820 return PARSEC_HOOK_RETURN_ERROR;
821 }
822 }
823
824
825 template<typename TT>
826 inline parsec_hook_return_t evaluate_cuda(const parsec_task_t *parsec_task) {
827 if constexpr(TT::derived_has_cuda_op()) {
828 parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
829 return me->template invoke_evaluate<ttg::ExecutionSpace::CUDA>();
830 } else {
831 return PARSEC_HOOK_RETURN_NEXT;
832 }
833 }
834
835 template<typename TT>
836 inline parsec_hook_return_t evaluate_hip(const parsec_task_t *parsec_task) {
837 if constexpr(TT::derived_has_hip_op()) {
838 parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
839 return me->template invoke_evaluate<ttg::ExecutionSpace::HIP>();
840 } else {
841 return PARSEC_HOOK_RETURN_NEXT;
842 }
843 }
844
845 template<typename TT>
846 inline parsec_hook_return_t evaluate_level_zero(const parsec_task_t *parsec_task) {
847 if constexpr(TT::derived_has_level_zero_op()) {
848 parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
849 return me->template invoke_evaluate<ttg::ExecutionSpace::L0>();
850 } else {
851 return PARSEC_HOOK_RETURN_NEXT;
852 }
853 }
854
855
856 template <typename KeyT, typename ActivationCallbackT>
857 class rma_delayed_activate {
858 std::vector<KeyT> _keylist;
859 std::atomic<int> _outstanding_transfers;
860 ActivationCallbackT _cb;
861 detail::ttg_data_copy_t *_copy;
862
863 public:
864 rma_delayed_activate(std::vector<KeyT> &&key, detail::ttg_data_copy_t *copy, int num_transfers, ActivationCallbackT cb)
865 : _keylist(std::move(key)), _outstanding_transfers(num_transfers), _cb(cb), _copy(copy) {}
866
867 bool complete_transfer(void) {
868 assert(_outstanding_transfers > 0);
869 int left = --_outstanding_transfers;
870 if (0 == left) {
871 _cb(std::move(_keylist), _copy);
872 return true;
873 }
874 return false;
875 }
876 };
877
 /// Completion callback for one RMA get: unregister the local memory handle,
 /// record the completed transfer on the activation object, and delete the
 /// activation once its last transfer has completed.
 878 template <typename ActivationT>
 879 static int get_complete_cb(parsec_comm_engine_t *comm_engine, parsec_ce_mem_reg_handle_t lreg, ptrdiff_t ldispl,
 880 parsec_ce_mem_reg_handle_t rreg, ptrdiff_t rdispl, size_t size, int remote,
 881 void *cb_data) {
 882 parsec_ce.mem_unregister(&lreg);
 883 ActivationT *activation = static_cast<ActivationT *>(cb_data);
 884 if (activation->complete_transfer()) {
 885 delete activation;
 886 }
 887 return PARSEC_SUCCESS;
 888 }
889
890 static int get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag, void *msg, size_t msg_size,
891 int src, void *cb_data) {
892 std::intptr_t *fn_ptr = static_cast<std::intptr_t *>(msg);
893 std::function<void(void)> *fn = reinterpret_cast<std::function<void(void)> *>(*fn_ptr);
894 (*fn)();
895 delete fn;
896 return PARSEC_SUCCESS;
897 }
898
899 template <typename FuncT>
900 static int invoke_get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag, void *msg, size_t msg_size,
901 int src, void *cb_data) {
902 std::intptr_t *iptr = static_cast<std::intptr_t *>(msg);
903 FuncT *fn_ptr = reinterpret_cast<FuncT *>(*iptr);
904 (*fn_ptr)();
905 delete fn_ptr;
906 return PARSEC_SUCCESS;
907 }
908
 /// Release one reference to a data copy after the current task is done with it.
 /// Decrements the reader count; when the count indicates the copy is free
 /// (last reader, or a mutable copy), either hands the copy to a deferred
 /// writer task or, if no successor exists and this was the last reference,
 /// deletes the copy.
 909 inline void release_data_copy(ttg_data_copy_t *copy) {
 910 if (copy->is_mutable() && nullptr == copy->get_next_task()) {
 911 /* current task mutated the data but there are no consumers so prepare
 912 * the copy to be freed below */
 913 copy->reset_readers();
 914 }
 915
 916 int32_t readers = copy->num_readers();
 917 if (readers > 1) {
 918 /* potentially more than one reader, decrement atomically */
 919 readers = copy->decrement_readers();
 920 } else if (readers == 1) {
 921 /* make sure readers drop to zero */
 922 readers = copy->decrement_readers<false>();
 923 }
 924 /* if there was only one reader (the current task) or
 925 * a mutable copy and a successor, we release the copy */
 926 if (1 == readers || readers == copy->mutable_tag) {
 // acquire fence pairs with the releasing decrement so we observe all writes
 // made by other readers before tearing down / handing off the copy
 927 std::atomic_thread_fence(std::memory_order_acquire);
 928 if (nullptr != copy->get_next_task()) {
 929 /* Release the deferred task.
 930 * The copy was mutable and will be mutated by the released task,
 931 * so simply transfer ownership.
 932 */
 933 parsec_task_t *next_task = copy->get_next_task();
 934 copy->set_next_task(nullptr);
 935 parsec_ttg_task_base_t *deferred_op = (parsec_ttg_task_base_t *)next_task;
 936 copy->mark_mutable();
 937 deferred_op->release_task();
 938 } else if ((1 == copy->num_ref()) || (1 == copy->drop_ref())) {
 939 /* we are the last reference, delete the copy */
 940#if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
 941 {
 942 const std::lock_guard<std::mutex> lock(pending_copies_mutex);
 943 size_t rc = pending_copies.erase(copy);
 944 assert(1 == rc);
 945 }
 946#endif
 947#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
 948 // Keep track of additional memory usage
 949 if(ttg::default_execution_context().impl().profiling()) {
 950 parsec_profiling_ts_trace_flags(ttg::default_execution_context().impl().parsec_ttg_profile_backend_free_datacopy,
 951 static_cast<uint64_t>(copy->uid),
 952 PROFILE_OBJECT_ID_NULL, &copy->size,
 953 PARSEC_PROFILING_EVENT_COUNTER|PARSEC_PROFILING_EVENT_HAS_INFO);
 954 }
 955#endif
 956 delete copy;
 957 }
 958 }
 959 }
960
// Register a task as a user (reader or writer) of a data copy.
// Returns the copy the task should use: the input copy itself (reader
// registered or writer granted exclusive access), or a freshly created
// clone when the input copy is (about to be) mutated by someone else.
// May defer the writer task instead of cloning when Value is not
// copy-constructible or deferral was requested.
961 template <typename Value>
962 inline ttg_data_copy_t *register_data_copy(ttg_data_copy_t *copy_in, parsec_ttg_task_base_t *task, bool readonly) {
963 ttg_data_copy_t *copy_res = copy_in;
964 bool replace = false;
965 int32_t readers = copy_in->num_readers();
966 assert(readers != 0);
967
968 if (readonly && !copy_in->is_mutable()) {
969 /* simply increment the number of readers */
970 readers = copy_in->increment_readers();
971 }
972
// readers == mutable_tag means the copy is owned by a writer
973 if (readers == copy_in->mutable_tag) {
974 if (copy_res->get_next_task() != nullptr) {
975 if (readonly) {
976 parsec_ttg_task_base_t *next_task = reinterpret_cast<parsec_ttg_task_base_t *>(copy_res->get_next_task());
977 if (next_task->defer_writer) {
978 /* there is a writer but it signalled that it wants to wait for readers to complete */
979 return copy_res;
980 }
981 }
982 }
983 /* someone is going to write into this copy -> we need to make a copy */
984 copy_res = NULL;
985 if (readonly) {
986 /* we replace the copy in a deferred task if the copy will be mutated by
987 * the deferred task and we are readonly.
988 * That way, we can share the copy with other readonly tasks and release
989 * the deferred task. */
990 replace = true;
991 }
992 } else if (!readonly) {
993 /* this task will mutate the data
994 * check whether there are other readers already and potentially
995 * defer the release of this task to give following readers a
996 * chance to make a copy of the data before this task mutates it
997 *
998 * Try to replace the readers with a negative value that indicates
999 * the value is mutable. If that fails we know that there are other
1000 * readers or writers already.
1001 *
1002 * NOTE: this check is not atomic: either there is a single reader
1003 * (current task) or there are others, in which we case won't
1004 * touch it.
1005 */
1006 /* try hard to defer writers if we cannot make copies
1007 * if deferral fails we have to bail out */
1008 bool defer_writer = (!std::is_copy_constructible_v<std::decay_t<Value>>) ||
1009 ((nullptr != task) && task->defer_writer);
1010
1011 if (1 == copy_in->num_readers() && !defer_writer) {
// NOTE(review): original lines 1012-1015 are elided in this listing
// (presumably the comment/logic granting this sole reader exclusive
// write access) — consult the full source before modifying.
1016 assert(nullptr == copy_in->get_next_task());
1017 copy_in->set_next_task(&task->parsec_task);
// release fence pairs with the acquire fence in release_data_copy()
1018 std::atomic_thread_fence(std::memory_order_release);
1019 copy_in->mark_mutable();
1020 } else {
1021 if (defer_writer && nullptr == copy_in->get_next_task()) {
1022 /* we're the first writer and want to wait for all readers to complete */
1023 copy_res->set_next_task(&task->parsec_task);
1024 task->defer_writer = true;
1025 } else {
1026 /* there are writers and/or waiting already of this copy already, make a copy that we can mutate */
1027 copy_res = NULL;
1028 }
1029 }
1030 }
1031
1032 if (NULL == copy_res) {
1033 // can only make a copy if Value is copy-constructible ... so this codepath should never be hit
1034 if constexpr (std::is_copy_constructible_v<std::decay_t<Value>>) {
1035 ttg_data_copy_t *new_copy = detail::create_new_datacopy(*static_cast<Value *>(copy_in->get_ptr()));
1036 if (replace && nullptr != copy_in->get_next_task()) {
1037 /* replace the task that was deferred */
1038 parsec_ttg_task_base_t *deferred_op = (parsec_ttg_task_base_t *)copy_in->get_next_task();
// the deferred writer task gets the new clone to mutate ...
1039 new_copy->mark_mutable();
1040 /* replace the copy in the deferred task */
1041 for (int i = 0; i < deferred_op->data_count; ++i) {
1042 if (deferred_op->copies[i] == copy_in) {
1043 deferred_op->copies[i] = new_copy;
1044 break;
1045 }
1046 }
1047 copy_in->set_next_task(nullptr);
1048 deferred_op->release_task();
// ... while the original copy reverts to shared read-only use by this task
1049 copy_in->reset_readers(); // set the copy back to being read-only
1050 copy_in->increment_readers<false>(); // register as reader
1051 copy_res = copy_in; // return the copy we were passed
1052 } else {
1053 if (!readonly) {
1054 new_copy->mark_mutable();
1055 }
1056 copy_res = new_copy; // return the new copy
1057 }
1058 }
1059 else {
1060 throw std::logic_error(std::string("TTG::PaRSEC: need to copy a datum of type") + typeid(std::decay_t<Value>).name() + " but the type is not copyable");
1061 }
1062 }
1063 return copy_res;
1064 }
1065
1066 } // namespace detail
1067
// Initialize the TTG/PaRSEC backend: initialize MPI (if not already done),
// create and install the default World, discover GPU devices, read the
// TTG_MAX_INLINE environment override, and record whether all GPUs have
// mutual peer access. May only be called once per process.
1068 inline void ttg_initialize(int argc, char **argv, int num_threads, parsec_context_t *ctx) {
1069 if (detail::initialized_mpi()) throw std::runtime_error("ttg_parsec::ttg_initialize: can only be called once");
1070
1071 // make sure it's not already initialized
1072 int mpi_initialized;
1073 MPI_Initialized(&mpi_initialized);
1074 if (!mpi_initialized) { // MPI not initialized? do it, remember that we did it
1075 int provided;
1076 MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
// NOTE(review): `!provided` only catches MPI_THREAD_SINGLE where that enum is
// 0; a `provided < MPI_THREAD_MULTIPLE` comparison would be stricter — confirm
// against the MPI implementations supported.
1077 if (!provided)
1078 throw std::runtime_error("ttg_parsec::ttg_initialize: MPI_Init_thread did not provide MPI_THREAD_MULTIPLE");
1079 detail::initialized_mpi() = true;
1080 } else { // no way to test that MPI was initialized with MPI_THREAD_MULTIPLE, cross fingers and proceed
1081 }
1082
1083 if (num_threads < 1) num_threads = ttg::detail::num_threads();
// WorldImpl ownership is transferred to the shared_ptr and then to the
// World registry via set_default_world
1084 auto world_ptr = new ttg_parsec::WorldImpl{&argc, &argv, num_threads, ctx};
1085 std::shared_ptr<ttg::base::WorldImplBase> world_sptr{static_cast<ttg::base::WorldImplBase *>(world_ptr)};
1086 ttg::World world{std::move(world_sptr)};
1087 ttg::detail::set_default_world(std::move(world));
1088
1089 // query the first device ID
// NOTE(review): original line 1090 is elided in this listing; line 1094
// (inside the branch below, presumably `detail::first_device_id = i;`) is
// elided as well — consult the full source.
1091 for (int i = 0; i < parsec_nb_devices; ++i) {
1092 bool is_gpu = parsec_mca_device_is_gpu(i);
1093 if (detail::first_device_id == -1 && is_gpu) {
1095 } else if (detail::first_device_id > -1 && !is_gpu) {
// GPUs are expected to form a contiguous ID range after the first GPU
1096 throw std::runtime_error("PaRSEC: Found non-GPU device in GPU ID range!");
1097 }
1098 }
1099
1100 /* parse the maximum inline size */
1101 const char* ttg_max_inline_cstr = std::getenv("TTG_MAX_INLINE");
1102 if (nullptr != ttg_max_inline_cstr) {
1103 std::size_t inline_size = std::atol(ttg_max_inline_cstr);
// the environment can only shrink, never grow, the inline-message threshold
1104 if (inline_size < detail::max_inline_size) {
1105 detail::max_inline_size = inline_size;
1106 }
1107 }
1108
1109 bool all_peer_access = true;
1110 /* check whether all GPUs can access all peer GPUs */
1111 for (int i = 0; (i < parsec_nb_devices) && all_peer_access; ++i) {
1112 parsec_device_module_t *idevice = parsec_mca_device_get(i);
1113 if (PARSEC_DEV_IS_GPU(idevice->type)) {
1114 parsec_device_gpu_module_t *gpu_device = (parsec_device_gpu_module_t*)idevice;
1115 for (int j = 0; (j < parsec_nb_devices) && all_peer_access; ++j) {
1116 if (i != j) { // because no device can access itself, says PaRSEC
1117 parsec_device_module_t *jdevice = parsec_mca_device_get(j);
1118 if (PARSEC_DEV_IS_GPU(jdevice->type)) {
// bit j of the peer mask indicates whether device i can access device j
1119 all_peer_access &= (gpu_device->peer_access_mask & (1<<j)) ? true : false;
1120 }
1121 }
1122 }
1123 }
1124 }
1125 detail::all_devices_peer_access = all_peer_access;
1126 }
// Tear down the TTG/PaRSEC backend: signal termination on rank 0, reset the
// default world, destroy all worlds, and finalize MPI iff ttg_initialize
// was the one that initialized it.
1127 inline void ttg_finalize() {
1128 // We need to notify the current taskpool of termination if we are in user termination detection mode
1129 // or the parsec_context_wait() in destroy_worlds() will never complete
1130 if(0 == ttg::default_execution_context().rank())
1131 ttg::default_execution_context().impl().final_task();
1132 ttg::detail::set_default_world(ttg::World{}); // reset the default world
// NOTE(review): original line 1133 is elided in this listing — consult the
// full source before modifying this function.
1134 ttg::detail::destroy_worlds<ttg_parsec::WorldImpl>();
1135 if (detail::initialized_mpi()) MPI_Finalize();
1136 }
1138 [[noreturn]]
1139 inline void ttg_abort() { MPI_Abort(ttg_default_execution_context().impl().comm(), 1); std::abort(); }
1140 inline void ttg_execute(ttg::World world) { world.impl().execute(); }
1141 inline void ttg_fence(ttg::World world) { world.impl().fence(); }
1142
1143 template <typename T>
1144 inline void ttg_register_ptr(ttg::World world, const std::shared_ptr<T> &ptr) {
1145 world.impl().register_ptr(ptr);
1146 }
1147
1148 template <typename T>
1149 inline void ttg_register_ptr(ttg::World world, std::unique_ptr<T> &&ptr) {
1150 world.impl().register_ptr(std::move(ptr));
1151 }
1152
1153 inline void ttg_register_status(ttg::World world, const std::shared_ptr<std::promise<void>> &status_ptr) {
1154 world.impl().register_status(status_ptr);
1155 }
1156
1157 template <typename Callback>
1158 inline void ttg_register_callback(ttg::World world, Callback &&callback) {
1159 world.impl().register_callback(std::forward<Callback>(callback));
1160 }
1161
1162 inline ttg::Edge<> &ttg_ctl_edge(ttg::World world) { return world.impl().ctl_edge(); }
1163
1164 inline void ttg_sum(ttg::World world, double &value) {
1165 double result = 0.0;
1166 MPI_Allreduce(&value, &result, 1, MPI_DOUBLE, MPI_SUM, world.impl().comm());
1167 value = result;
1168 }
1169
1170 inline void make_executable_hook(ttg::World& world) {
1171 MPI_Barrier(world.impl().comm());
1172 }
1173
// Broadcast \p data from \p source_rank to every rank of \p world using the
// default data descriptor: first the packed payload size, then the packed
// payload itself; non-source ranks unpack into \p data.
1176 template <typename T>
1177 void ttg_broadcast(::ttg::World world, T &data, int source_rank) {
1178 int64_t BUFLEN;
1179 if (world.rank() == source_rank) {
// NOTE(review): original line 1180 is elided in this listing — presumably
// `BUFLEN = ttg::default_data_descriptor<T>::payload_size(&data);` — confirm
// against the full source.
1181 }
1182 MPI_Bcast(&BUFLEN, 1, MPI_INT64_T, source_rank, world.impl().comm());
1183
1184 unsigned char *buf = new unsigned char[BUFLEN];
1185 if (world.rank() == source_rank) {
1186 ttg::default_data_descriptor<T>::pack_payload(&data, BUFLEN, 0, buf);
1187 }
1188 MPI_Bcast(buf, BUFLEN, MPI_UNSIGNED_CHAR, source_rank, world.impl().comm());
1189 if (world.rank() != source_rank) {
// NOTE(review): original line 1190 (the unpack_payload call on receiving
// ranks) is elided in this listing — consult the full source.
1191 }
1192 delete[] buf;
1193 }
1194
1195 namespace detail {
1196
1198 protected:
1199 // static std::map<int, ParsecBaseTT*> function_id_to_instance;
1200 parsec_hash_table_t tasks_table;
1201 parsec_hash_table_t task_constraint_table;
1202 parsec_task_class_t self;
1203 };
1204
1205 } // namespace detail
1206
1207 template <typename keyT, typename output_terminalsT, typename derivedT, typename input_valueTs, ttg::ExecutionSpace Space>
1209 private:
1212 "The fourth template for ttg::TT must be a ttg::typelist containing the input types");
1213 // create a virtual control input if the input list is empty, to be used in invoke()
1214 using actual_input_tuple_type = std::conditional_t<!ttg::meta::typelist_is_empty_v<input_valueTs>,
1216 using input_tuple_type = ttg::meta::typelist_to_tuple_t<input_valueTs>;
1218 "Second template argument for ttg::TT must be std::tuple containing the output terminal types");
1219 static_assert((ttg::meta::none_has_reference_v<input_valueTs>), "Input typelist cannot contain reference types");
1220 static_assert(ttg::meta::is_none_Void_v<input_valueTs>, "ttg::Void is for internal use only, do not use it");
1221
1222 parsec_mempool_t mempools;
1223
1224 // check for a non-type member named have_cuda_op
1225 template <typename T>
1226 using have_cuda_op_non_type_t = decltype(T::have_cuda_op);
1227
1228 template <typename T>
1229 using have_hip_op_non_type_t = decltype(T::have_hip_op);
1230
1231 template <typename T>
1232 using have_level_zero_op_non_type_t = decltype(T::have_level_zero_op);
1233
1234 bool alive = true;
1235
1236 static constexpr int numinedges = std::tuple_size_v<input_tuple_type>; // number of input edges
1237 static constexpr int numins = std::tuple_size_v<actual_input_tuple_type>; // number of input arguments
1238 static constexpr int numouts = std::tuple_size_v<output_terminalsT>; // number of outputs
1239 static constexpr int numflows = std::max(numins, numouts); // max number of flows
1240
1241 public:
1243 template<typename DerivedT = derivedT>
1244 static constexpr bool derived_has_cuda_op() {
1246 }
1247
1249 template<typename DerivedT = derivedT>
1250 static constexpr bool derived_has_hip_op() {
1252 }
1253
1255 template<typename DerivedT = derivedT>
1256 static constexpr bool derived_has_level_zero_op() {
1258 }
1259
1261 template<typename DerivedT = derivedT>
1267
1270 "Data sent from a device-capable template task must be serializable.");
1271
1272 using ttT = TT;
1275 using input_args_type = actual_input_tuple_type;
1277 // if have data inputs and (always last) control input, convert last input to Void to make logic easier
1284
1285 static constexpr int numinvals =
1286 std::tuple_size_v<input_refs_tuple_type>; // number of input arguments with values (i.e. omitting the control
1287 // input, if any)
1288
1291
// Static accessors for the i-th element of an input tuple; the first overload
// additionally converts the element to resultT.
1292 template <std::size_t i, typename resultT, typename InTuple>
// NOTE(review): original line 1293 (this overload's signature, presumably
// `static resultT get(InTuple &&intuple) {`) is elided in this listing.
1294 return static_cast<resultT>(std::get<i>(std::forward<InTuple>(intuple)));
1295 };
// Reference-returning overload: no conversion, yields the element itself.
1296 template <std::size_t i, typename InTuple>
1297 static auto &get(InTuple &&intuple) {
1298 return std::get<i>(std::forward<InTuple>(intuple));
1299 };
1300
1301 private:
1302 using task_t = detail::parsec_ttg_task_t<ttT>;
1303
1305 friend task_t;
1306
1307 /* the offset of the key placed after the task structure in the memory from mempool */
1308 constexpr static const size_t task_key_offset = sizeof(task_t);
1309
1310 input_terminals_type input_terminals;
1311 output_terminalsT output_terminals;
1312
1313 protected:
1314 const auto &get_output_terminals() const { return output_terminals; }
1315
1316 private:
// Build, at compile time, the per-input-terminal dispatch table of
// set_arg_from_msg<i> member-function pointers (one entry per input).
1317 template <std::size_t... IS>
1318 static constexpr auto make_set_args_fcts(std::index_sequence<IS...>) {
// resultT is deduced from the member array below to keep the two in sync
1319 using resultT = decltype(set_arg_from_msg_fcts);
1320 return resultT{{&TT::set_arg_from_msg<IS>...}};
1321 }
// Dispatch table indexed by input-terminal number; used to route incoming
// argument messages to the right set_arg_from_msg instantiation.
1322 constexpr static std::array<void (TT::*)(void *, std::size_t), numins> set_arg_from_msg_fcts =
1323 make_set_args_fcts(std::make_index_sequence<numins>{});
1324
// Build the per-input dispatch table of set_argstream_size_from_msg<i>
// member-function pointers (parallel to make_set_args_fcts above).
1325 template <std::size_t... IS>
1326 static constexpr auto make_set_size_fcts(std::index_sequence<IS...>) {
1327 using resultT = decltype(set_argstream_size_from_msg_fcts);
// NOTE(review): original line 1328 (the return statement, presumably
// `return resultT{{&TT::set_argstream_size_from_msg<IS>...}};`) is elided
// in this listing.
1329 }
1330 constexpr static std::array<void (TT::*)(void *, std::size_t), numins> set_argstream_size_from_msg_fcts =
1331 make_set_size_fcts(std::make_index_sequence<numins>{});
1332
// Build the per-input dispatch table of finalize_argstream_from_msg<i>
// member-function pointers (parallel to make_set_args_fcts above).
1333 template <std::size_t... IS>
1334 static constexpr auto make_finalize_argstream_fcts(std::index_sequence<IS...>) {
1335 using resultT = decltype(finalize_argstream_from_msg_fcts);
// NOTE(review): original line 1336 (the return statement, presumably
// `return resultT{{&TT::finalize_argstream_from_msg<IS>...}};`) is elided
// in this listing.
1337 }
1338 constexpr static std::array<void (TT::*)(void *, std::size_t), numins> finalize_argstream_from_msg_fcts =
1339 make_finalize_argstream_fcts(std::make_index_sequence<numins>{});
1340
// Build the per-input-edge dispatch table of get_from_pull_msg<i>
// member-function pointers; note it is sized by numinedges, not numins.
1341 template <std::size_t... IS>
1342 static constexpr auto make_get_from_pull_fcts(std::index_sequence<IS...>) {
1343 using resultT = decltype(get_from_pull_msg_fcts);
1344 return resultT{{&TT::get_from_pull_msg<IS>...}};
1345 }
1346 constexpr static std::array<void (TT::*)(void *, std::size_t), numinedges> get_from_pull_msg_fcts =
1347 make_get_from_pull_fcts(std::make_index_sequence<numinedges>{});
1348
// Compile-time table recording, for each input argument, whether its declared
// type is const (i.e. the input is read-only).
1349 template<std::size_t... IS>
1350 constexpr static auto make_input_is_const(std::index_sequence<IS...>) {
1351 using resultT = decltype(input_is_const);
1352 return resultT{{std::is_const_v<std::tuple_element_t<IS, input_args_type>>...}};
1353 }
1354 constexpr static std::array<bool, numins> input_is_const = make_input_is_const(std::make_index_sequence<numins>{});
1355
1356 ttg::World world;
1357 ttg::meta::detail::keymap_t<keyT> keymap;
1358 ttg::meta::detail::keymap_t<keyT> priomap;
1359 ttg::meta::detail::keymap_t<keyT, ttg::device::Device> devicemap;
1360 // For now use same type for unary/streaming input terminals, and stream reducers assigned at runtime
1361 ttg::meta::detail::input_reducers_t<actual_input_tuple_type>
1362 input_reducers;
1363 std::array<parsec_task_class_t*, numins> inpute_reducers_taskclass = { nullptr };
1364 std::array<std::size_t, numins> static_stream_goal = { std::numeric_limits<std::size_t>::max() };
1365 int num_pullins = 0;
1366
1367 bool m_defer_writer = TTG_PARSEC_DEFER_WRITER;
1368
1369 std::vector<ttg::meta::detail::constraint_callback_t<keyT>> constraints_check;
1370 std::vector<ttg::meta::detail::constraint_callback_t<keyT>> constraints_complete;
1371
1372 public:
1374
1375 private:
1379 template <typename... Args>
1380 auto op(Args &&...args) {
1381 derivedT *derived = static_cast<derivedT *>(this);
1382 using return_type = decltype(derived->op(std::forward<Args>(args)...));
1383 if constexpr (std::is_same_v<return_type,void>) {
1384 derived->op(std::forward<Args>(args)...);
1385 return;
1386 }
1387 else {
1388 return derived->op(std::forward<Args>(args)...);
1389 }
1390 }
1391
// If input terminal i is a pull terminal, obtain the datum for \p key:
// locally via the terminal's container when this rank owns the key,
// otherwise by requesting it from the owning rank.
1392 template <std::size_t i, typename terminalT, typename Key>
1393 void invoke_pull_terminal(terminalT &in, const Key &key, detail::parsec_ttg_task_base_t *task) {
1394 if (in.is_pull_terminal) {
1395 auto owner = in.container.owner(key);
1396 if (owner != world.rank()) {
// NOTE(review): original line 1397 (the remote fetch, presumably a call to
// get_pull_terminal_data_from<i>(owner, key)) is elided in this listing.
1398 } else {
1399 // push the data to the task
1400 set_arg<i>(key, (in.container).get(key));
1401 }
1402 }
1403 }
1404
// Send an active message to rank \p owner requesting the pull-terminal datum
// for \p key on input i; the key is packed into the message payload.
1405 template <std::size_t i, typename Key>
1406 void get_pull_terminal_data_from(const int owner,
1407 const Key &key) {
1408 using msg_t = detail::msg_t;
1409 auto &world_impl = world.impl();
1410 parsec_taskpool_t *tp = world_impl.taskpool();
1411 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), tp->taskpool_id,
// NOTE(review): original line 1412 (additional msg_t constructor arguments,
// presumably the message type tag and the input index i) is elided here.
1413 world.rank(), 1);
1414 /* pack the key */
1415 size_t pos = 0;
1416 pos = pack(key, msg->bytes, pos);
// notify the termination-detection module of an outgoing message before send
1417 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
1418 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
1419 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
1420 sizeof(msg_header_t) + pos);
1421 }
1422
1423 template <std::size_t... IS, typename Key = keyT>
1424 void invoke_pull_terminals(std::index_sequence<IS...>, const Key &key, detail::parsec_ttg_task_base_t *task) {
1425 int junk[] = {0, (invoke_pull_terminal<IS>(
1426 std::get<IS>(input_terminals), key, task),
1427 0)...};
1428 junk[0]++;
1429 }
1430
// Reconstitute the tuple of input-argument references for a task from the
// raw pointers stored in task->copies: each copy's payload pointer is
// reinterpreted as the unqualified input type and then cast to the exact
// (possibly const/reference-qualified) tuple element type.
1431 template <std::size_t... IS>
1432 static input_refs_tuple_type make_tuple_of_ref_from_array(task_t *task, std::index_sequence<IS...>) {
1433 return input_refs_tuple_type{static_cast<std::tuple_element_t<IS, input_refs_tuple_type>>(
1434 *reinterpret_cast<std::remove_reference_t<std::tuple_element_t<IS, input_refs_tuple_type>> *>(
1435 task->copies[IS]->get_ptr()))...};
1436 }
1437
1438#ifdef TTG_HAVE_DEVICE
// Device-task submit callback invoked by PaRSEC's device scheduler.
// NOTE(review): this listing elides the function's signature (original lines
// 1439-1442, presumably `static int device_static_submit(parsec_device_gpu_-
// module_t *gpu_device, parsec_gpu_task_t *gpu_task, parsec_gpu_exec_stream_t
// *gpu_stream)`) and several return-code lines — consult the full source.
// Resumes the suspended device coroutine once transfers/kernels complete and
// reports back whether the task is done or must be scheduled again.
1443 parsec_gpu_task_t *gpu_task,
1445
1446 task_t *task = (task_t*)gpu_task->ec;
1447 // get the device task from the coroutine handle
1448 ttg::device::Task dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
1449
1450 task->dev_ptr->stream = gpu_stream;
1451
1452 //std::cout << "device_static_submit task " << task << std::endl;
1453
1454 // get the promise which contains the views
1455 auto dev_data = dev_task.promise();
1456
1457 if (dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT) {
1458 /* The device task is sending out already but we go through the device scheduler
1459 * for a pushout. We just report this task done, no callbacks to invoke. */
// NOTE(review): original line 1460 (the early return) is elided here.
1461 }
1462
1463 /* we're waiting for the initial transfers or for the kernel to complete */
1464 assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_TRANSFER ||
1465 dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_KERNEL);
1466
// make the backend-specific stream current so code in the coroutine body
// targets the right device/stream; one branch per supported backend
1467#if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) && defined(TTG_HAVE_CUDA)
1468 {
1470 int device = detail::parsec_device_to_ttg_device(gpu_device->super.device_index);
1471 ttg::device::detail::set_current(device, cuda_stream->cuda_stream);
1472 }
1473#elif defined(PARSEC_HAVE_DEV_HIP_SUPPORT) && defined(TTG_HAVE_HIP)
1474 {
1476 int device = detail::parsec_device_to_ttg_device(gpu_device->super.device_index);
1477 ttg::device::detail::set_current(device, hip_stream->hip_stream);
1478 }
1479#elif defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT) && defined(TTG_HAVE_LEVEL_ZERO)
1480 {
1483 int device = detail::parsec_device_to_ttg_device(gpu_device->super.device_index);
1484 ttg::device::detail::set_current(device, stream->swq->queue);
1485 }
1486#endif // defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT) && defined(TTG_HAVE_LEVEL_ZERO)
1487
1488 /* Here we call back into the coroutine again after the transfers have completed */
1489 static_op(&task->parsec_task);
1492
1493 auto discard_tmp_flows = [&](){
1494 for (int i = 0; i < MAX_PARAM_COUNT; ++i) {
1495 if (gpu_task->flow[i]->flow_flags & TTG_PARSEC_FLOW_ACCESS_TMP) {
1496 /* temporary flow, discard by setting it to read-only to avoid evictions */
1497 const_cast<parsec_flow_t*>(gpu_task->flow[i])->flow_flags = PARSEC_FLOW_ACCESS_READ;
1498 task->parsec_task.data[i].data_out->readers = 1;
1499 }
1500 }
1501 };
1502
1503 /* we will come back into this function once the kernel and transfers are done */
1505 if (nullptr != task->suspended_task_address) {
1506 /* Get a new handle for the promise*/
1507 dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
1508 dev_data = dev_task.promise();
1509
1510 assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_KERNEL ||
1511 dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT ||
1512 dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE);
1513
1514 if (ttg::device::detail::TTG_DEVICE_CORO_SENDOUT == dev_data.state() ||
1515 ttg::device::detail::TTG_DEVICE_CORO_COMPLETE == dev_data.state()) {
1516 /* the task started sending so we won't come back here */
1517 //std::cout << "device_static_submit task " << task << " complete" << std::endl;
// NOTE(review): the rc assignments on original lines 1518/1521/1526 are
// elided in this listing.
1519 } else {
1520 //std::cout << "device_static_submit task " << task << " return-again" << std::endl;
1522 }
1523 } else {
1524 /* the task is done so we won't come back here */
1525 //std::cout << "device_static_submit task " << task << " complete" << std::endl;
1527 }
1528 return rc;
1529 }
1530
// Evaluate hook for device-capable tasks.
// NOTE(review): this listing elides the function's signature (original line
// 1531) and several return lines — consult the full source. On the first
// call it allocates and populates the parsec_gpu_task_t, runs the coroutine
// once to discover the flows, clones the task class with those flows, and
// optionally sets a preferred-device hint derived from the devicemap.
1532
1533 task_t *task = (task_t*)parsec_task;
1534 if (task->dev_ptr->gpu_task == nullptr) {
1535
1536 /* set up a device task */
1537 parsec_gpu_task_t *gpu_task;
1538 /* PaRSEC wants to free the gpu_task, because F***K ownerships */
1539 gpu_task = static_cast<parsec_gpu_task_t*>(std::calloc(1, sizeof(*gpu_task)));
// NOTE(review): original line 1540 is elided in this listing.
1541 gpu_task->ec = parsec_task;
1542 gpu_task->task_type = 0; // user task
1543 gpu_task->last_data_check_epoch = std::numeric_limits<uint64_t>::max(); // used internally
1544 gpu_task->pushout = 0;
1545 gpu_task->submit = &TT::device_static_submit;
1546 parsec_flow_t *flows = task->dev_ptr->flows;
1547 /* set up flow information so it is available for pushout */
1548 for (uint8_t i = 0; i < MAX_PARAM_COUNT; ++i) {
1549 gpu_task->flow[i] = &flows[i]; // will be set in register_device_memory
1550 /* ignore the flow */
1551 flows[i] = parsec_flow_t{.name = nullptr,
1552 .sym_type = PARSEC_FLOW_ACCESS_NONE,
1553 .flow_flags = 0,
1554 .flow_index = i,
1555 .flow_datatype_mask = ~0 };
1556 }
1557
1558 // one way to force the task device
1559 // currently this will probably break all of PaRSEC if this hint
1560 // does not match where the data is located, not really useful for us
1561 // instead we set a hint on the data if there is no hint set yet
1562 //parsec_task->selected_device = ...;
1563
1564 /* set the gpu_task so it's available in register_device_memory */
1565 task->dev_ptr->gpu_task = gpu_task;
1566
1567 /* TODO: is this the right place to set the mask? */
1568 task->parsec_task.chore_mask = PARSEC_DEV_ALL;
1569
1570 /* copy over the task class, because that's what we need */
1571 task->dev_ptr->task_class = *task->parsec_task.task_class;
1572
1573 // first invocation of the coroutine to get the coroutine handle
1574 static_op(parsec_task);
1575
1576 /* when we come back here, the flows in gpu_task are set (see register_device_memory) */
1577
1578 parsec_task_class_t& tc = task->dev_ptr->task_class;
1579
1580 // input flows are set up during register_device_memory as part of the first invocation above
1581 for (int i = 0; i < MAX_PARAM_COUNT; ++i) {
1582 tc.in[i] = gpu_task->flow[i];
1583 tc.out[i] = gpu_task->flow[i];
1584 }
1585 tc.nb_flows = MAX_PARAM_COUNT;
1586
1587 /* set the device hint on the data */
1588 TT *tt = task->tt;
1589 if (tt->devicemap) {
1590 int parsec_dev;
1591 if constexpr (std::is_void_v<keyT>) {
// NOTE(review): original lines 1592 and 1594 (the devicemap lookups for the
// void- and non-void-key cases) are elided in this listing.
1593 } else {
1595 }
1596 for (int i = 0; i < MAX_PARAM_COUNT; ++i) {
1597 /* only set on mutable data since we have exclusive access */
1598 if (tc.in[i] != nullptr && (tc.in[i]->flow_flags & PARSEC_FLOW_ACCESS_WRITE)) {
1599 parsec_data_t *data = parsec_task->data[i].data_in->original;
1600 /* only set the preferred device if the host has the latest copy
1601 * as otherwise we may end up with the wrong data if there is a newer
1602 * version on a different device. Also, keep fingers crossed. */
1603 if (data->owner_device == 0) {
// NOTE(review): original line 1604 (setting the preferred device on the
// data) is elided in this listing.
1605 }
1606 }
1607 }
1608 }
1609
1610 /* set the new task class that contains the flows */
1611 task->parsec_task.task_class = &task->dev_ptr->task_class;
1612
1613 /* select this one */
// NOTE(review): original line 1614 (the return selecting this chore) is
// elided in this listing.
1615 }
1616
1617 std::cerr << "EVALUATE called on task with assigned GPU task!" << std::endl;
1618
1619 /* not sure if this might happen*/
// NOTE(review): original line 1620 (the fallback return) is elided here.
1621
1622 }
1623
// Execution hook for device-capable tasks with a device already selected.
// NOTE(review): this listing elides the function's signature (original line
// 1624) and several return statements — consult the full source. Dispatches
// the prepared gpu_task to PaRSEC's device kernel scheduler for the backend
// matching this TT's execution space.
1625 static_assert(derived_has_device_op());
1626
1627 /* when we come in here we have a device assigned and are ready to go */
1628
1629 task_t *task = (task_t*)parsec_task;
1630
1631 if (nullptr == task->suspended_task_address) {
1632 /* short-cut in case the task returned immediately */
// NOTE(review): original line 1633 (the early return) is elided here.
1634 }
1635
1636 // get the device task from the coroutine handle
1637 auto dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
1638
1639 // get the promise which contains the views
1640 ttg::device::detail::device_task_promise_type& dev_data = dev_task.promise();
1641
1642 parsec_gpu_task_t *gpu_task = task->dev_ptr->gpu_task;
1643
1644 if (dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT) {
1645 /* check if any of the copies needs to be pushed out
1646 * if so, we need to push the task into the scheduler where the pushout can happen */
1647 if (gpu_task->pushout == 0) {
// NOTE(review): original lines 1648 and 1654 (return statements) are elided
// in this listing.
1649 }
1650 }
1651
1652 if (dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE) {
1653 /* the task returned immediately so we're done */
1655 }
1656
1657 parsec_device_gpu_module_t *device = (parsec_device_gpu_module_t*)task->parsec_task.selected_device;
1658 assert(NULL != device);
1659
1660 task->dev_ptr->device = device;
1661 parsec_execution_stream_s *es = task->tt->world.impl().execution_stream();
1662
// route to the backend scheduler matching both the PaRSEC device type and
// this TT's compile-time execution space; mismatches fall through and abort
1663 switch(device->super.type) {
1664
1665#if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT)
1666 case PARSEC_DEV_CUDA:
1667 if constexpr (Space == ttg::ExecutionSpace::CUDA) {
1668 /* TODO: we need custom staging functions because PaRSEC looks at the
1669 * task-class to determine the number of flows. */
1670 gpu_task->stage_in = parsec_default_gpu_stage_in;
1671 gpu_task->stage_out = parsec_default_gpu_stage_out;
1672 return parsec_device_kernel_scheduler(&device->super, es, gpu_task);
1673 }
1674 break;
1675#endif
1676#if defined(PARSEC_HAVE_DEV_HIP_SUPPORT)
1677 case PARSEC_DEV_HIP:
1678 if constexpr (Space == ttg::ExecutionSpace::HIP) {
1679 gpu_task->stage_in = parsec_default_gpu_stage_in;
1680 gpu_task->stage_out = parsec_default_gpu_stage_out;
1681 return parsec_device_kernel_scheduler(&device->super, es, gpu_task);
1682 }
1683 break;
1684#endif // PARSEC_HAVE_DEV_HIP_SUPPORT
1685#if defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT)
// NOTE(review): original line 1686 (presumably `case PARSEC_DEV_LEVEL_ZERO:`)
// is elided in this listing.
1687 if constexpr (Space == ttg::ExecutionSpace::L0) {
1688 gpu_task->stage_in = parsec_default_gpu_stage_in;
1689 gpu_task->stage_out = parsec_default_gpu_stage_out;
1690 return parsec_device_kernel_scheduler(&device->super, es, gpu_task);
1691 }
1692 break;
1693#endif // PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT
1694 default:
1695 break;
1696 }
1697 ttg::print_error(task->tt->get_name(), " : received mismatching device type ", (int)device->super.type, " from PaRSEC");
1698 ttg::abort();
1699 return PARSEC_HOOK_RETURN_DONE; // will not be reached
1700 }
1701#endif // TTG_HAVE_DEVICE
1702
// Main execution hook: runs (or resumes) the user's op for a task.
// First invocation dispatches to derivedT::op with the appropriate
// key/argument combination; subsequent invocations resume the suspended
// coroutine (device task or resumable task). NOTE(review): this listing
// elides several original lines (the `if constexpr` dispatch conditions
// 1717/1726/1729/1731/1734, the ret construction on 1764, and the return on
// 1808) — consult the full source before modifying.
1703 static parsec_hook_return_t static_op(parsec_task_t *parsec_task) {
1704
1705 task_t *task = (task_t*)parsec_task;
1706 void* suspended_task_address =
1707#ifdef TTG_HAVE_COROUTINE
1708 task->suspended_task_address; // non-null = need to resume the task
1709#else // TTG_HAVE_COROUTINE
1710 nullptr;
1711#endif // TTG_HAVE_COROUTINE
1712 //std::cout << "static_op: suspended_task_address " << suspended_task_address << std::endl;
1713 if (suspended_task_address == nullptr) { // task is a coroutine that has not started or an ordinary function
1714
1715 ttT *baseobj = task->tt;
1716 derivedT *obj = static_cast<derivedT *>(baseobj);
// record the currently executing task so sends from within op() can find it
1718 detail::parsec_ttg_caller = static_cast<detail::parsec_ttg_task_base_t*>(task);
1719 if (obj->tracing()) {
1720 if constexpr (!ttg::meta::is_void_v<keyT>)
1721 ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", task->key, ": executing");
1722 else
1723 ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : executing");
1724 }
1725
// four dispatch cases: (key, values), (key only), (values only), (neither);
// the `if constexpr` guards selecting between them are elided in this listing
1727 auto input = make_tuple_of_ref_from_array(task, std::make_index_sequence<numinvals>{});
1728 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(task->key, std::move(input), obj->output_terminals));
1730 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(task->key, obj->output_terminals));
1732 auto input = make_tuple_of_ref_from_array(task, std::make_index_sequence<numinvals>{});
1733 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(std::move(input), obj->output_terminals));
1735 TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(obj->output_terminals));
1736 } else {
1737 ttg::abort();
1738 }
1739 detail::parsec_ttg_caller = nullptr;
1740 }
1741 else { // resume the suspended coroutine
1742
1743#ifdef TTG_HAVE_COROUTINE
1744 assert(task->coroutine_id != ttg::TaskCoroutineID::Invalid);
1745
1746#ifdef TTG_HAVE_DEVICE
1747 if (task->coroutine_id == ttg::TaskCoroutineID::DeviceTask) {
1748 ttg::device::Task coro = ttg::device::detail::device_task_handle_type::from_address(suspended_task_address);
1750 detail::parsec_ttg_caller = static_cast<detail::parsec_ttg_task_base_t*>(task);
1751 // TODO: unify the outputs tls handling
1752 auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor();
1753 task->tt->set_outputs_tls_ptr();
1754 coro.resume();
1755 if (coro.completed()) {
1756 coro.destroy();
1757 suspended_task_address = nullptr;
1758 }
1759 task->tt->set_outputs_tls_ptr(old_output_tls_ptr);
1760 detail::parsec_ttg_caller = nullptr;
1761 } else
1762#endif // TTG_HAVE_DEVICE
1763 if (task->coroutine_id == ttg::TaskCoroutineID::ResumableTask) {
1765 assert(ret.ready());
1766 auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor();
1767 task->tt->set_outputs_tls_ptr();
1768 ret.resume();
1769 if (ret.completed()) {
1770 ret.destroy();
1771 suspended_task_address = nullptr;
1772 }
1773 else { // not yet completed
1774 // leave suspended_task_address as is
1775
1776 // right now events are not properly implemented; we are only testing the workflow with dummy events,
1777 // so mark the events finished manually; parsec will rerun this task again and it should complete the second time
1778 auto events = static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address)).events();
1779 for (auto &event_ptr : events) {
1780 event_ptr->finish();
1781 }
1783 }
1784 task->tt->set_outputs_tls_ptr(old_output_tls_ptr);
1785 detail::parsec_ttg_caller = nullptr;
1786 task->suspended_task_address = suspended_task_address;
1787 }
1788 else
1789 ttg::abort(); // unrecognized task id
1790#else // TTG_HAVE_COROUTINE
1791 ttg::abort(); // should not happen
1792#endif // TTG_HAVE_COROUTINE
1793 }
1794#ifdef TTG_HAVE_COROUTINE
1795 task->suspended_task_address = suspended_task_address;
1796#endif // TTG_HAVE_COROUTINE
1797 if (suspended_task_address == nullptr) {
1798 ttT *baseobj = task->tt;
1799 derivedT *obj = static_cast<derivedT *>(baseobj);
1800 if (obj->tracing()) {
1801 if constexpr (!ttg::meta::is_void_v<keyT>)
1802 ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", task->key, ": done executing");
1803 else
1804 ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : done executing");
1805 }
1806 }
1807
1809 }
1810
/// PaRSEC execution hook for a TT task that takes no input arguments.
/// Mirrors static_op(): on first execution invokes the user-provided op(),
/// on re-execution resumes the suspended coroutine stored in the task.
/// NOTE(review): several statements are not visible in this excerpt (the
/// declaration of `ret` and the final return value) — confirm against the
/// full file before relying on the exact control flow.
static parsec_hook_return_t static_op_noarg(parsec_task_t *parsec_task) {
  task_t *task = static_cast<task_t*>(parsec_task);

  // non-null address => this task is a suspended coroutine to resume,
  // not a fresh invocation
  void* suspended_task_address =
#ifdef TTG_HAVE_COROUTINE
    task->suspended_task_address; // non-null = need to resume the task
#else  // TTG_HAVE_COROUTINE
    nullptr;
#endif // TTG_HAVE_COROUTINE
  if (suspended_task_address == nullptr) { // task is a coroutine that has not started or an ordinary function
    ttT *baseobj = (ttT *)task->object_ptr;
    derivedT *obj = (derivedT *)task->object_ptr;
    // dispatch on key-ness: keyed TTs receive the key, keyless ones do not
    if constexpr (!ttg::meta::is_void_v<keyT>) {
      TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(task->key, obj->output_terminals));
    } else if constexpr (ttg::meta::is_void_v<keyT>) {
      TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(obj->output_terminals));
    } else // unreachable
      ttg:: abort();
  }
  else {
#ifdef TTG_HAVE_COROUTINE
    // resume the suspended coroutine; `ret` is presumably reconstructed
    // from suspended_task_address on a line not visible here — TODO confirm
    assert(ret.ready());
    ret.resume();
    if (ret.completed()) {
      // coroutine finished: free its frame and forget the suspension point
      ret.destroy();
      suspended_task_address = nullptr;
    }
    else { // not yet completed
      // leave suspended_task_address as is
    }
#else  // TTG_HAVE_COROUTINE
    ttg::abort(); // should not happen
#endif // TTG_HAVE_COROUTINE
  }
  // persist the (possibly cleared) suspension point back into the task
  task->suspended_task_address = suspended_task_address;

  if (suspended_task_address) {
    ttg::abort(); // not yet implemented
    // see comments in static_op()
  }
  else
    // NOTE(review): the success return statement is not visible in this
    // excerpt — presumably PARSEC_HOOK_RETURN_DONE; confirm in the full file
}
1859
/// PaRSEC hook that drains queued stream inputs for input terminal @p i of
/// the parent task and folds them into the task's target copy via the
/// user-supplied reducer. Exactly one reducer task runs per stream at a
/// time; reduce_count coordinates handoff with producers that enqueue
/// further values concurrently.
/// NOTE(review): a few statements are not visible in this excerpt (the
/// early return after the "first reducer empty" branch, the declaration of
/// `item`, the reset of parsec_ttg_caller, and the final return) — confirm
/// against the full file.
template <std::size_t i>
static parsec_hook_return_t static_reducer_op(parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
  using rtask_t = detail::reducer_task_t;
  using value_t = std::tuple_element_t<i, actual_input_tuple_type>;
  constexpr const bool val_is_void = ttg::meta::is_void_v<value_t>;
  constexpr const bool input_is_const = std::is_const_v<value_t>;
  rtask_t *rtask = (rtask_t*)parsec_task;
  task_t *parent_task = static_cast<task_t*>(rtask->parent_task);
  ttT *baseobj = parent_task->tt;
  derivedT *obj = static_cast<derivedT *>(baseobj);

  auto& reducer = std::get<i>(baseobj->input_reducers);

  //std::cout << "static_reducer_op " << parent_task->key << std::endl;

  if (obj->tracing()) {
    if constexpr (!ttg::meta::is_void_v<keyT>)
      ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": reducer executing");
    else
      ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : reducer executing");
  }

  /* the copy to reduce into */
  detail::ttg_data_copy_t *target_copy;
  target_copy = parent_task->copies[i];
  assert(val_is_void || nullptr != target_copy);
  /* once we hit 0 we have to stop since another thread might enqueue a new reduction task */
  std::size_t c = 0;
  std::size_t size = 0;
  assert(parent_task->streams[i].reduce_count > 0);
  if (rtask->is_first) {
    // the "first" reducer task was created just to bind the initial value;
    // if no further values arrived in the meantime there is nothing to fold
    if (0 == (parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) {
      /* we were the first and there is nothing to be done */
      if (obj->tracing()) {
        if constexpr (!ttg::meta::is_void_v<keyT>)
          ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": first reducer empty");
        else
          ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : first reducer empty");
      }
      // NOTE(review): an early-return statement belongs here but is not
      // visible in this excerpt — confirm in the full file

    }
  }

  // make this task the "current" TTG task for copy-tracking while reducing
  detail::parsec_ttg_caller = rtask->parent_task;

  do {
    if constexpr(!val_is_void) {
      /* the copies to reduce out of */
      detail::ttg_data_copy_t *source_copy;
      // NOTE(review): the declaration of `item` (presumably a
      // parsec_list_item_t*) is not visible in this excerpt
      item = parsec_lifo_pop(&parent_task->streams[i].reduce_copies);
      if (nullptr == item) {
        // maybe someone is changing the goal right now
        break;
      }
      source_copy = ((detail::ttg_data_copy_self_t *)(item))->self;
      assert(target_copy->num_readers() == target_copy->mutable_tag);
      assert(source_copy->num_readers() > 0);
      // fold the popped value into the accumulation target
      reducer(*reinterpret_cast<std::decay_t<value_t> *>(target_copy->get_ptr()),
              *reinterpret_cast<std::decay_t<value_t> *>(source_copy->get_ptr()));
      detail::release_data_copy(source_copy);
    } else if constexpr(val_is_void) {
      reducer(); // invoke control reducer
    }
    // there is only one task working on this stream, so no need to be atomic here
    size = ++parent_task->streams[i].size;
    //std::cout << "static_reducer_op size " << size << " of " << parent_task->streams[i].goal << std::endl;
  } while ((c = (parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) > 0);
  //} while ((c = (--task->streams[i].reduce_count)) > 0);

  /* finalize_argstream sets goal to 1, so size may be larger than goal */
  bool complete = (size >= parent_task->streams[i].goal);

  //std::cout << "static_reducer_op size " << size
  //          << " of " << parent_task->streams[i].goal << " complete " << complete
  //          << " c " << c << std::endl;
  if (complete && c == 0) {
    if constexpr(input_is_const) {
      /* make the consumer task a reader if its input is const */
      target_copy->reset_readers();
    }
    /* task may not be runnable yet because other inputs are missing, have release_task decide */
    parent_task->remove_from_hash = true;
    parent_task->release_task(parent_task);
  }


  if (obj->tracing()) {
    if constexpr (!ttg::meta::is_void_v<keyT>)
      ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": done executing");
    else
      ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : done executing");
  }
  // NOTE(review): the final return (presumably PARSEC_HOOK_RETURN_DONE)
  // is not visible in this excerpt
}
1959
1960
1961 protected:
/// Deserializes @p obj from a byte buffer starting at offset @p pos and
/// returns the offset past the consumed bytes.
/// NOTE(review): the function signature and the `dd_t` descriptor alias
/// are not visible in this excerpt (presumably
/// `uint64_t unpack(T &obj, void *_bytes, uint64_t pos)` using
/// ttg::default_data_descriptor) — confirm against the full file.
template <typename T>
  uint64_t payload_size;
  if constexpr (!dd_t::serialize_size_is_const) {
    // NOTE(review): the line reading a variable-size payload length from
    // the stream is not visible in this excerpt
  } else {
    // fixed-size payloads: size is computable from the object itself
    payload_size = dd_t::payload_size(&obj);
  }
  pos = dd_t::unpack_payload(&obj, payload_size, pos, _bytes);
  return pos;
}
1974
/// Serializes @p obj into @p bytes starting at offset @p pos and returns
/// the offset past the written bytes. For types whose serialized size is
/// not a compile-time constant, the payload size is written first so that
/// unpack() can recover it.
/// NOTE(review): the `dd_t` descriptor alias declaration is not visible in
/// this excerpt — presumably ttg::default_data_descriptor of the decayed T.
template <typename T>
uint64_t pack(T &obj, void *bytes, uint64_t pos, detail::ttg_data_copy_t *copy = nullptr) {
  uint64_t payload_size = dd_t::payload_size(&obj);
  if constexpr (!dd_t::serialize_size_is_const) {
    // prefix variable-size payloads with their length
    pos = ttg::default_data_descriptor<uint64_t>::pack_payload(&payload_size, sizeof(uint64_t), pos, bytes);
  }
  pos = dd_t::pack_payload(&obj, payload_size, pos, bytes);
  return pos;
}
1985
/// Active-message entry point: decodes the message header and dispatches
/// to the per-terminal handler registered for the header's function id
/// (set_arg / set-argstream-size / finalize-argstream / pull-request).
/// NOTE(review): the `case` labels of the switch are not visible in this
/// excerpt — each `{...}` group below corresponds to one msg_header_t
/// function id; confirm the labels against the full file.
static void static_set_arg(void *data, std::size_t size, ttg::TTBase *bop) {
  assert(size >= sizeof(msg_header_t) &&
         "Trying to unpack as message that does not hold enough bytes to represent a single header");
  msg_header_t *hd = static_cast<msg_header_t *>(data);
  derivedT *obj = reinterpret_cast<derivedT *>(bop);
  switch (hd->fn_id) {
    // (case label not visible) — set-argument message
    if (0 <= hd->param_id) {
      assert(hd->param_id >= 0);
      assert(hd->param_id < obj->set_arg_from_msg_fcts.size());
      // dispatch through the member-function-pointer table for terminal param_id
      auto member = obj->set_arg_from_msg_fcts[hd->param_id];
      (obj->*member)(data, size);
    } else {
      // there is no good reason to have negative param ids
      ttg::abort();
    }
    break;
  }
  // (case label not visible) — set-argstream-size message
  assert(hd->param_id >= 0);
  assert(hd->param_id < obj->set_argstream_size_from_msg_fcts.size());
  auto member = obj->set_argstream_size_from_msg_fcts[hd->param_id];
  (obj->*member)(data, size);
  break;
  }
  // (case label not visible) — finalize-argstream message
  assert(hd->param_id >= 0);
  assert(hd->param_id < obj->finalize_argstream_from_msg_fcts.size());
  auto member = obj->finalize_argstream_from_msg_fcts[hd->param_id];
  (obj->*member)(data, size);
  break;
  }
  // (case label not visible) — pull-request message
  assert(hd->param_id >= 0);
  assert(hd->param_id < obj->get_from_pull_msg_fcts.size());
  auto member = obj->get_from_pull_msg_fcts[hd->param_id];
  (obj->*member)(data, size);
  break;
  }
  default:
    ttg::abort();
  }
}
2029
  // Body of the per-thread task mempool accessor (signature not visible in
  // this excerpt): maps the calling execution stream's (virtual process,
  // thread) pair to a flat index into the thread mempool array.
  auto &world_impl = world.impl();
  parsec_execution_stream_s *es = world_impl.execution_stream();
  int index = (es->virtual_process->vp_id * es->virtual_process->nb_cores + es->th_id);
  return &mempools.thread_mempools[index];
}
2037
/// Delivers a received data copy to every key in @p keylist by wrapping it
/// in a short-lived "dummy" task (so the copy has an owner while being
/// distributed), then schedules any locally-released tasks as a ring.
/// NOTE(review): the function signature, the mempool lookup, and the
/// save/restore of detail::parsec_ttg_caller are not visible in this
/// excerpt — confirm against the full file.
template <size_t i, typename valueT>
  /* create a dummy task that holds the copy, which can be reused by others */
  task_t *dummy;
  parsec_execution_stream_s *es = world.impl().execution_stream();
  dummy = new (parsec_thread_mempool_allocate(mempool)) task_t(mempool, &this->self, this);
  dummy->set_dummy(true);
  // TODO: do we need to copy static_stream_goal in dummy?

  /* set the received value as the dummy's only data */
  dummy->copies[0] = copy;

  /* We received the task on this world, so it's using the same taskpool */
  dummy->parsec_task.taskpool = world.impl().taskpool();

  /* save the current task and set the dummy task */

  /* iterate over the keys and have them use the copy we made */
  parsec_task_t *task_ring = nullptr;
  for (auto &&key : keylist) {
    // copy-constructible? can broadcast to any number of keys
    if constexpr (std::is_copy_constructible_v<valueT>) {
      set_arg_local_impl<i>(key, *reinterpret_cast<valueT *>(copy->get_ptr()), copy, &task_ring);
    }
    else {
      // not copy-constructible? can move, but only to single key
      static_assert(!std::is_reference_v<valueT>);
      if (std::size(keylist) == 1)
        set_arg_local_impl<i>(key, std::move(*reinterpret_cast<valueT *>(copy->get_ptr())), copy, &task_ring);
      else {
        throw std::logic_error(std::string("TTG::PaRSEC: need to copy a datum of type") + typeid(std::decay_t<valueT>).name() + " but the type is not copyable");
      }
    }
  }

  if (nullptr != task_ring) {
    auto &world_impl = world.impl();
    // schedule all tasks released above in one batch
    __parsec_schedule_vp(world_impl.execution_stream(), vp_task_ring, 0);
  }

  /* restore the previous task */

  /* release the dummy task */
  complete_task_and_release(es, &dummy->parsec_task);
}
2089
2090 // there are 6 types of set_arg:
2091 // - case 1: nonvoid Key, complete Value type
2092 // - case 2: nonvoid Key, void Value, mixed (data+control) inputs
2093 // - case 3: nonvoid Key, void Value, no inputs
2094 // - case 4: void Key, complete Value type
2095 // - case 5: void Key, void Value, mixed (data+control) inputs
2096 // - case 6: void Key, void Value, no inputs
2097 // implementation of these will be further split into "local-only" and global+local
2098
/// Handles an incoming set-argument active message for input terminal @p i:
/// unpacks the destination key(s), then the value — either directly from
/// the message (inline data), via split-metadata reconstruction, or by
/// starting one-sided RMA transfers for the value's iovecs — and finally
/// feeds the resulting data copy to set_arg_from_msg_keylist().
/// NOTE(review): this excerpt is missing several declarations and branch
/// headers (e.g. `pos`, `copy`, `descr`, `metadata`, `cbtag`, `lreg`,
/// `rreg`, the has_split_metadata branch opening, and the per-iovec
/// dispatch calls), so brace nesting below does not balance as shown —
/// consult the full file for the exact structure.
template <std::size_t i>
void set_arg_from_msg(void *data, std::size_t size) {
  using valueT = std::tuple_element_t<i, actual_input_tuple_type>;
  using msg_t = detail::msg_t;
  msg_t *msg = static_cast<msg_t *>(data);
  if constexpr (!ttg::meta::is_void_v<keyT>) {
    /* unpack the keys */
    /* TODO: can we avoid copying all the keys?! */
    std::vector<keyT> keylist;
    int num_keys = msg->tt_id.num_keys;
    keylist.reserve(num_keys);
    auto rank = world.rank();
    for (int k = 0; k < num_keys; ++k) {
      keyT key;
      pos = unpack(key, msg->bytes, pos);
      assert(keymap(key) == rank);
      keylist.push_back(std::move(key));
    }
    key_end_pos = pos;
    /* jump back to the beginning of the message to get the value */
    pos = 0;
    // case 1
    if constexpr (!ttg::meta::is_void_v<valueT>) {
      using decvalueT = std::decay_t<valueT>;
      int32_t num_iovecs = msg->tt_id.num_iovecs;
      //bool inline_data = msg->inline_data;
      // split-metadata types: rebuild the object from its metadata only,
      // the bulk payload arrives via RMA below
      using metadata_t = decltype(descr.get_metadata(std::declval<decvalueT>()));

      /* unpack the metadata */
      pos = unpack(metadata, msg->bytes, pos);

      //std::cout << "set_arg_from_msg splitmd num_iovecs " << num_iovecs << std::endl;

      copy = detail::create_new_datacopy(descr.create_from_metadata(metadata));
    } else if constexpr (!ttg::has_split_metadata<decvalueT>::value) {
      /* unpack the object, potentially discovering iovecs */
      pos = unpack(*static_cast<decvalueT *>(copy->get_ptr()), msg->bytes, pos);
    }

    if (num_iovecs == 0) {
      // everything was in the message — deliver immediately
      set_arg_from_msg_keylist<i, decvalueT>(ttg::span<keyT>(&keylist[0], num_keys), copy);
    } else {
      /* unpack the header and start the RMA transfers */

      /* get the remote rank */
      int remote = msg->tt_id.sender;
      assert(remote < world.size());

      auto &val = *static_cast<decvalueT *>(copy->get_ptr());

      bool inline_data = msg->tt_id.inline_data;

      int nv = 0;
      /* start the RMA transfers */
      // builds the delayed-activation object that fires once all iovec
      // transfers complete and then delivers the value to the keylist
      auto create_activation_fn = [&]() {
        /* extract the callback tag */
        std::memcpy(&cbtag, msg->bytes + pos, sizeof(cbtag));
        pos += sizeof(cbtag);

        copy->add_ref(); // so we can safely decrement the readers in the activation
        /* create the value from the metadata */
        auto activation = new detail::rma_delayed_activate(
            std::move(keylist), copy, num_iovecs, [this, &val](std::vector<keyT> &&keylist, detail::ttg_data_copy_t *copy) {
              this->world.impl().decrement_inflight_msg();
              /* decrement readers we incremented before the transfer */
              parsec_atomic_fetch_dec_int32(&data->device_copies[data->owner_device]->readers);
            });
            copy->drop_ref();
          });
        return activation;
      };
      // copies one iovec's bytes straight out of the message payload
      auto read_inline_data = [&](auto&& iovec){
        /* unpack the data from the message */
        ++nv;
        std::memcpy(iovec.data, msg->bytes + pos, iovec.num_bytes);
        pos += iovec.num_bytes;
      };
      // registers local memory for one iovec and issues the one-sided get
      auto handle_iovec_fn = [&](auto&& iovec, auto activation) {
        using ActivationT = std::decay_t<decltype(*activation)>;

        ++nv;
        std::memcpy(&rreg_size_i, msg->bytes + pos, sizeof(rreg_size_i));
        pos += sizeof(rreg_size_i);
        rreg = static_cast<parsec_ce_mem_reg_handle_t>(msg->bytes + pos);
        pos += rreg_size_i;
        // std::intptr_t *fn_ptr = reinterpret_cast<std::intptr_t *>(msg->bytes + pos);
        // pos += sizeof(*fn_ptr);
        std::intptr_t fn_ptr;
        std::memcpy(&fn_ptr, msg->bytes + pos, sizeof(fn_ptr));
        pos += sizeof(fn_ptr);

        /* register the local memory */
        size_t lreg_size;
        parsec_ce.mem_register(iovec.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iovec.num_bytes, parsec_datatype_int8_t,
                               iovec.num_bytes, &lreg, &lreg_size);
        world.impl().increment_inflight_msg();
        /* TODO: PaRSEC should treat the remote callback as a tag, not a function pointer! */
        //std::cout << "set_arg_from_msg: get rreg " << rreg << " remote " << remote << std::endl;
        parsec_ce.get(&parsec_ce, lreg, 0, rreg, 0, iovec.num_bytes, remote,
                      &detail::get_complete_cb<ActivationT>, activation,
                      /*world.impl().parsec_ttg_rma_tag()*/
                      cbtag, &fn_ptr, sizeof(std::intptr_t));
      };
      /* make sure all buffers are properly allocated */
      ttg::detail::buffer_apply(val, [&]<typename T, typename A>(const ttg::Buffer<T, A>& b){
        /* cast away const */
        auto& buffer = const_cast<ttg::Buffer<T, A>&>(b);
        /* remember which device we used last time */
        static auto last_device = ttg::device::Device{0, Space};
        ttg::device::Device device;
        if (inline_data || !world.impl().mpi_support(Space)) {
          device = ttg::device::Device::host(); // have to allocate on host
        } else if (!keylist.empty() && devicemap) {
          device = devicemap(keylist[0]); // pick a device we know will use the data
        } else {
          device = last_device; // use the previously used device
        }
        // remember where we started so we can cycle through all devices once
        auto start_device = device;
        do {
          /* try to allocate on any device */
          try {
            buffer.allocate_on(device);
            buffer.set_owner_device(device);
            break;
          } catch (const std::bad_alloc&) {
            device = device.cycle();
            if (device == start_device) {
              /* make sure we have memory on the host */
              buffer.allocate_on(ttg::device::Device::host());
              break; // failed to find a device that works
            }
            last_device = device;
          }
        } while(true);
      });

      /* kick off transfers */
      // NOTE(review): the bodies of the loops below (the calls to
      // read_inline_data / handle_iovec_fn per iovec) are not visible
      // in this excerpt
      if (inline_data) {
        for (auto&& iov : descr.get_data(val)) {
        }
      } else {
        for (auto&& iov : descr.get_data(val)) {
        }
      }
    } else if constexpr (!ttg::has_split_metadata<decvalueT>::value) {
      // non-split-metadata types: walk the value's buffers directly
      if (inline_data) {
          read_inline_data(ttg::iovec{data->nb_elts, data->device_copies[data->owner_device]->device_private});
        });
      } else {
          parsec_atomic_fetch_inc_int32(&data->device_copies[data->owner_device]->readers);
          handle_iovec_fn(ttg::iovec{data->nb_elts, data->device_copies[data->owner_device]->device_private}, activation);
        });
      }
    }

    assert(num_iovecs == nv);
    assert(size == (key_end_pos + sizeof(msg_header_t)));

    if (inline_data) {
      // inline transfers complete synchronously — deliver right away
      set_arg_from_msg_keylist<i, decvalueT>(ttg::span<keyT>(&keylist[0], num_keys), copy);
    }
  }
  // case 2 and 3
  } else if constexpr (!ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
    // control-only input: trigger set_arg for each key (call not visible)
    for (auto &&key : keylist) {
    }
  }
  // case 4
  } else if constexpr (ttg::meta::is_void_v<keyT> && !std::is_void_v<valueT>) {
    using decvalueT = std::decay_t<valueT>;
    decvalueT val;
    /* TODO: handle split-metadata case as with non-void keys */
    unpack(val, msg->bytes, 0);
    set_arg<i, keyT, valueT>(std::move(val));
  // case 5 and 6
  } else if constexpr (ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
    // pure control message for a keyless TT (call not visible)
  } else { // unreachable
    ttg::abort();
  }
}
2303
/// Handles an incoming finalize-argstream active message for input
/// terminal @p i: unpacks the key (if keyed) and finalizes the stream on
/// this rank.
/// NOTE(review): the actual finalize_argstream calls in both branches are
/// not visible in this excerpt — confirm against the full file.
template <std::size_t i>
void finalize_argstream_from_msg(void *data, std::size_t size) {
  using msg_t = detail::msg_t;
  msg_t *msg = static_cast<msg_t *>(data);
  if constexpr (!ttg::meta::is_void_v<keyT>) {
    /* unpack the key */
    uint64_t pos = 0;
    auto rank = world.rank();
    keyT key;
    pos = unpack(key, msg->bytes, pos);
    // the sender must have mapped this key to us
    assert(keymap(key) == rank);
  } else {
    auto rank = world.rank();
    assert(keymap() == rank);
  }
}
2322
/// Handles an incoming set-argstream-size active message for input
/// terminal @p i: unpacks the key (if keyed) and the new stream size.
/// NOTE(review): the calls applying the unpacked size to the stream are
/// not visible in this excerpt — confirm against the full file.
template <std::size_t i>
void argstream_set_size_from_msg(void *data, std::size_t size) {
  using msg_t = detail::msg_t;
  auto msg = static_cast<msg_t *>(data);
  uint64_t pos = 0;
  if constexpr (!ttg::meta::is_void_v<keyT>) {
    /* unpack the key */
    auto rank = world.rank();
    keyT key;
    pos = unpack(key, msg->bytes, pos);
    // the sender must have mapped this key to us
    assert(keymap(key) == rank);
    std::size_t argstream_size;
    pos = unpack(argstream_size, msg->bytes, pos);
  } else {
    auto rank = world.rank();
    assert(keymap() == rank);
    std::size_t argstream_size;
    pos = unpack(argstream_size, msg->bytes, pos);
  }
}
2345
2346 template <std::size_t i>
2347 void get_from_pull_msg(void *data, std::size_t size) {
2348 using msg_t = detail::msg_t;
2349 msg_t *msg = static_cast<msg_t *>(data);
2350 auto &in = std::get<i>(input_terminals);
2351 if constexpr (!ttg::meta::is_void_v<keyT>) {
2352 /* unpack the key */
2353 uint64_t pos = 0;
2354 keyT key;
2355 pos = unpack(key, msg->bytes, pos);
2356 set_arg<i>(key, (in.container).get(key));
2357 }
2358 }
2359
2360 template <std::size_t i, typename Key, typename Value>
2361 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg_local(
2362 const Key &key, Value &&value) {
2363 set_arg_local_impl<i>(key, std::forward<Value>(value));
2364 }
2365
2366 template <std::size_t i, typename Key = keyT, typename Value>
2367 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg_local(
2368 Value &&value) {
2369 set_arg_local_impl<i>(ttg::Void{}, std::forward<Value>(value));
2370 }
2371
2372 template <std::size_t i, typename Key, typename Value>
2373 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg_local(
2374 const Key &key, const Value &value) {
2375 set_arg_local_impl<i>(key, value);
2376 }
2377
/// Sets the i-th input argument locally from a const lvalue for a keyless
/// TT (void key, non-void value).
/// NOTE(review): the body is not visible in this excerpt — presumably
/// forwards to set_arg_local_impl<i>(ttg::Void{}, value); confirm.
template <std::size_t i, typename Key = keyT, typename Value>
std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg_local(
    const Value &value) {
}
2383
/// Sets the i-th input argument locally from a shared_ptr for a keyless
/// TT (void key, non-void value).
/// NOTE(review): the body is not visible in this excerpt — presumably
/// dereferences @p valueptr and forwards to set_arg_local_impl; confirm.
template <std::size_t i, typename Key = keyT, typename Value>
std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg_local(
    std::shared_ptr<const Value> &valueptr) {
}
2389
/// Allocates and placement-constructs a new task_t for @p key (or a
/// keyless task), applying the priority map and the static stream goals.
/// NOTE(review): the function signature and the mempool/taskobj
/// declarations are not visible in this excerpt — confirm against the
/// full file.
template <typename Key>
  constexpr const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
  auto &world_impl = world.impl();
  task_t *newtask;
  int32_t priority = 0;
  if constexpr (!keyT_is_Void) {
    // keyed tasks: priority may depend on the key
    priority = priomap(key);
    /* placement-new the task */
    newtask = new (taskobj) task_t(key, mempool, &this->self, world_impl.taskpool(), this, priority);
  } else {
    priority = priomap();
    /* placement-new the task */
    newtask = new (taskobj) task_t(mempool, &this->self, world_impl.taskpool(), this, priority);
  }

  // seed each input stream with its statically-configured goal
  for (int i = 0; i < static_stream_goal.size(); ++i) {
    newtask->streams[i].goal = static_stream_goal[i];
  }

  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": creating task");
  return newtask;
}
2415
2416
/// Allocates and placement-constructs a reducer task that will fold
/// queued stream inputs for terminal @p i into @p task; reuses the
/// regular task mempool (reducer tasks are no larger than task_t).
/// NOTE(review): the function signature and the mempool/taskobj/newtask
/// declarations are not visible in this excerpt — confirm against the
/// full file.
template <std::size_t i>
  /* make sure we can reuse the existing memory pool and don't have to create a new one */
  static_assert(sizeof(task_t) >= sizeof(detail::reducer_task_t));
  constexpr const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
  auto &world_impl = world.impl();
  // use the priority of the task we stream into
  int32_t priority = 0;
  if constexpr (!keyT_is_Void) {
    priority = priomap(task->key);
    ttg::trace(world.rank(), ":", get_name(), " : ", task->key, ": creating reducer task");
  } else {
    priority = priomap();
    ttg::trace(world.rank(), ":", get_name(), ": creating reducer task");
  }
  /* placement-new the task */
  newtask = new (taskobj) detail::reducer_task_t(task, mempool, inpute_reducers_taskclass[i],
                                                 world_impl.taskpool(), priority, is_first);

  return newtask;
}
2441
2442
// Used to set the i'th argument
/// Core local argument-delivery path: finds or creates the task for
/// @p key, attaches a tracked data copy for input terminal @p i (reducing
/// into a stream if a reducer is configured), and arranges for the task's
/// release once all inputs are present.
/// @param copy_in  pre-existing data copy to reuse (nullptr to discover or
///                 create one)
/// @param task_ring optional ring to collect released tasks for batched
///                 scheduling by the caller
/// NOTE(review): several statements are not visible in this excerpt
/// (bucket lock/unlock calls, the `copy`/`reduce_task` declarations, the
/// graph-edge emission, and the release_task call) — confirm against the
/// full file.
template <std::size_t i, typename Key, typename Value>
void set_arg_local_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in = nullptr,
                        parsec_task_t **task_ring = nullptr) {
  using valueT = std::tuple_element_t<i, input_values_full_tuple_type>;
  constexpr const bool input_is_const = std::is_const_v<std::tuple_element_t<i, input_args_type>>;
  constexpr const bool valueT_is_Void = ttg::meta::is_void_v<valueT>;
  constexpr const bool keyT_is_Void = ttg::meta::is_void_v<Key>;


  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": received value for argument : ", i);

  parsec_key_t hk = 0;
  if constexpr (!keyT_is_Void) {
    // the key's address doubles as the hash-table key for the bucket ops
    hk = reinterpret_cast<parsec_key_t>(&key);
    assert(keymap(key) == world.rank());
  }

  task_t *task;
  auto &world_impl = world.impl();
  auto &reducer = std::get<i>(input_reducers);
  bool release = false;
  bool remove_from_hash = true;
#if defined(PARSEC_PROF_GRAPHER)
  bool discover_task = true;
#endif
  bool get_pull_data = false;
  bool has_lock = false;
  /* If we have only one input and no reducer on that input we can skip the hash table */
  if (numins > 1 || reducer) {
    has_lock = true;
    // look up an existing partially-filled task, or create one
    if (nullptr == (task = (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk))) {
      task = create_new_task(key);
      world_impl.increment_created();
      if( world_impl.dag_profiling() ) {
#if defined(PARSEC_PROF_GRAPHER)
        parsec_prof_grapher_task(&task->parsec_task, world_impl.execution_stream()->th_id, 0,
                                 key_hash(make_key(task->parsec_task.taskpool, task->parsec_task.locals), nullptr));
#endif
      }
    } else if (!reducer && numins == (task->in_data_count + 1)) {
      /* remove while we have the lock */
      remove_from_hash = false;
    }
    /* if we have a reducer, we need to hold on to the lock for just a little longer */
    if (!reducer) {
      has_lock = false;
    }
  } else {
    // single-input, no reducer: the task cannot be shared, skip the table
    task = create_new_task(key);
    world_impl.increment_created();
    remove_from_hash = false;
    if( world_impl.dag_profiling() ) {
#if defined(PARSEC_PROF_GRAPHER)
      parsec_prof_grapher_task(&task->parsec_task, world_impl.execution_stream()->th_id, 0,
                               key_hash(make_key(task->parsec_task.taskpool, task->parsec_task.locals), nullptr));
#endif
    }
  }

  // emit a dataflow edge into the DAG profile, naming the flow after the
  // producing copy's index (or "_" when unknown)
  if( world_impl.dag_profiling() ) {
#if defined(PARSEC_PROF_GRAPHER)
    int orig_index = detail::find_index_of_copy_in_task(detail::parsec_ttg_caller, &value);
    char orig_str[32];
    char dest_str[32];
    if(orig_index >= 0) {
      snprintf(orig_str, 32, "%d", orig_index);
    } else {
      strncpy(orig_str, "_", 32);
    }
    snprintf(dest_str, 32, "%lu", i);
    parsec_flow_t orig{ .name = orig_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
                        .flow_index = 0, .flow_datatype_mask = ~0 };
    parsec_flow_t dest{ .name = dest_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
                        .flow_index = 0, .flow_datatype_mask = ~0 };
    }
#endif
  }

  // resolves the data copy to attach: reuse copy_in, find the caller's
  // tracked copy, or create a fresh one (mutable unless is_const)
  auto get_copy_fn = [&](detail::parsec_ttg_task_base_t *task, auto&& value, bool is_const){
    if (nullptr == copy && nullptr != detail::parsec_ttg_caller) {
      copy = detail::find_copy_in_task(detail::parsec_ttg_caller, &value);
    }
    if (nullptr != copy) {
      /* retain the data copy */
      copy = detail::register_data_copy<valueT>(copy, task, is_const);
    } else {
      /* create a new copy */
      copy = detail::create_new_datacopy(std::forward<Value>(value));
      if (!is_const) {
        copy->mark_mutable();
      }
    }
    return copy;
  };

  if (reducer && 1 != task->streams[i].goal) { // is this a streaming input? reduce the received value
    auto submit_reducer_task = [&](auto *parent_task){
      /* check if we need to create a task */
      std::size_t c = parent_task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
      //std::cout << "submit_reducer_task " << key << " c " << c << std::endl;
      if (0 == c) {
        /* we are responsible for creating the reduction task */
        reduce_task = create_new_reducer_task<i>(parent_task, false);
        reduce_task->release_task(reduce_task); // release immediately
      }
    };

    if constexpr (!ttg::meta::is_void_v<valueT>) { // for data values
      // have a value already? if not, set, otherwise reduce
      detail::ttg_data_copy_t *copy = nullptr;
      if (nullptr == (copy = task->copies[i])) {
        using decay_valueT = std::decay_t<valueT>;

        /* first input value, create a task and bind it to the copy */
        //std::cout << "Creating new reducer task for " << key << std::endl;

        /* protected by the bucket lock */
        task->streams[i].size = 1;
        task->streams[i].reduce_count.store(1, std::memory_order_relaxed);

        /* get the copy to use as input for this task */
        detail::ttg_data_copy_t *copy = get_copy_fn(reduce_task, std::forward<Value>(value), false);

        /* put the copy into the task */
        task->copies[i] = copy;

        /* release the task if we're not deferred
         * TODO: can we delay that until we get the second value?
         */
        if (copy->get_next_task() != &reduce_task->parsec_task) {
          reduce_task->release_task(reduce_task);
        }

        /* now we can unlock the bucket */
      } else {
        /* unlock the bucket, the lock is not needed anymore */

        /* get the copy to use as input for this task */
        detail::ttg_data_copy_t *copy = get_copy_fn(task, std::forward<Value>(value), true);

        /* enqueue the data copy to be reduced */
        parsec_lifo_push(&task->streams[i].reduce_copies, &copy->super);
      }
    } else {
      /* unlock the bucket, the lock is not needed anymore */
      /* submit reducer for void values to handle side effects */
    }
    //if (release) {
    //  parsec_hash_table_nolock_remove(&tasks_table, hk);
    //  remove_from_hash = false;
    //}
    //parsec_hash_table_unlock_bucket(&tasks_table, hk);
  } else {
    /* unlock the bucket, the lock is not needed anymore */
    if (has_lock) {
    }
    /* whether the task needs to be deferred or not */
    if constexpr (!valueT_is_Void) {
      if (nullptr != task->copies[i]) {
        // double delivery to the same terminal is a programming error
        ttg::print_error(get_name(), " : ", key, ": error argument is already set : ", i);
        throw std::logic_error("bad set arg");
      }

      /* get the copy to use as input for this task */
      detail::ttg_data_copy_t *copy = get_copy_fn(task, std::forward<Value>(value), input_is_const);

      /* if we registered as a writer and were the first to register with this copy
       * we need to defer the release of this task to give other tasks a chance to
       * make a copy of the original data */
      release = (copy->get_next_task() != &task->parsec_task);
      task->copies[i] = copy;
    } else {
      release = true;
    }
  }
  task->remove_from_hash = remove_from_hash;
  if (release) {
    // NOTE(review): the release_task invocation is not visible in this
    // excerpt
  }
  /* if not pulling lazily, pull the data here */
  if constexpr (!ttg::meta::is_void_v<keyT>) {
    if (get_pull_data) {
      invoke_pull_terminals(std::make_index_sequence<std::tuple_size_v<input_values_tuple_type>>{}, task->key, task);
    }
  }
}
  // Body of the constraint check (signature not visible in this excerpt;
  // presumably `bool check_constraints(task_t *task)`): returns true when
  // the first registered constraint (if any) admits the task, false when
  // the task must be parked until the constraint releases it.
  bool constrained = false;
  if (constraints_check.size() > 0) {
    if constexpr (ttg::meta::is_void_v<keyT>) {
      constrained = !constraints_check[0]();
    } else {
      constrained = !constraints_check[0](task->key);
    }
  }
  if (constrained) {
    // store the task so we can later access it once it is released
    // NOTE(review): the statement stashing the task is not visible in
    // this excerpt
  }
  return !constrained;
}
2663
/// Called when constraint @p cid releases the (single, keyless) task:
/// re-checks all later constraints and, if none blocks, schedules the
/// parked task.
/// NOTE(review): the hash-table lookup initializing `task` and the final
/// scheduling call are not visible in this excerpt — confirm against the
/// full file.
template<typename Key = keyT>
std::enable_if_t<ttg::meta::is_void_v<Key>, void> release_constraint(std::size_t cid) {
  // check the next constraint, if any
  assert(cid < constraints_check.size());
  bool release = true;
  for (std::size_t i = cid+1; i < constraints_check.size(); i++) {
    if (!constraints_check[i]()) {
      release = false;
      break;
    }
  }
  if (release) {
    // no constraint blocked us
    task_t *task;
    parsec_key_t hk = 0;
    assert(task != nullptr);
    auto &world_impl = world.impl();
    parsec_execution_stream_t *es = world_impl.execution_stream();
    parsec_task_t *vp_task_rings[1] = { &task->parsec_task };
  }
}
2687
/// Called when constraint @p cid releases a batch of keyed tasks:
/// re-checks all later constraints per key, collects the releasable
/// tasks into a priority-sorted ring, and schedules the ring in one shot.
/// NOTE(review): the hash-table lookup initializing `task`, the ring-push
/// sort argument, and the final scheduling call are not visible in this
/// excerpt — confirm against the full file.
template<typename Key = keyT>
std::enable_if_t<!ttg::meta::is_void_v<Key>, void> release_constraint(std::size_t cid, const std::span<Key>& keys) {
  assert(cid < constraints_check.size());
  parsec_task_t *task_ring = nullptr;
  for (auto& key : keys) {
    task_t *task;
    bool release = true;
    for (std::size_t i = cid+1; i < constraints_check.size(); i++) {
      if (!constraints_check[i](key)) {
        release = false;
        break;
      }
    }

    if (release) {
      // no constraint blocked this task, so go ahead and release
      auto hk = reinterpret_cast<parsec_key_t>(&key);
      assert(task != nullptr);
      if (task_ring == nullptr) {
        /* the first task is set directly */
        task_ring = &task->parsec_task;
      } else {
        /* push into the ring */
        parsec_list_item_ring_push_sorted(&task_ring->super, &task->parsec_task.super,
      }
    }
  }
  if (nullptr != task_ring) {
    auto &world_impl = world.impl();
    parsec_execution_stream_t *es = world_impl.execution_stream();
  }
}
2724
// NOTE(review): the head of this signature (function name, presumably
// release_task, and the task_t* parameter) is on the preceding elided line.
// Counts an arrived input for `task`; once all inputs have arrived the task is
// removed from the hash table, checked against constraints, and either
// scheduled immediately or appended to the caller-provided `task_ring` so the
// caller can submit a whole batch at once.
 2726 parsec_task_t **task_ring = nullptr) {
 2727 constexpr const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
 2728
 2729 /* if remove_from_hash == false, someone has already removed the task from the hash table
 2730 * so we know that the task is ready, no need to do atomic increments here */
 2731 bool is_ready = !task->remove_from_hash;
 2732 int32_t count;
 2733 if (is_ready) {
 2734 count = numins;
 2735 } else {
// atomically count this input; +1 converts fetched pre-increment value to the new count
 2736 count = parsec_atomic_fetch_inc_int32(&task->in_data_count) + 1;
 2737 assert(count <= self.dependencies_goal);
 2738 }
 2739
 2740 auto &world_impl = world.impl();
 2741 ttT *baseobj = task->tt;
 2742
// all inputs have arrived: the task is runnable
 2743 if (count == numins) {
 2744 parsec_execution_stream_t *es = world_impl.execution_stream();
 2745 parsec_key_t hk = task->pkey();
 2746 if (tracing()) {
 2747 if constexpr (!keyT_is_Void) {
 2748 ttg::trace(world.rank(), ":", get_name(), " : ", task->key, ": submitting task for op ");
 2749 } else {
 2750 ttg::trace(world.rank(), ":", get_name(), ": submitting task for op ");
 2751 }
 2752 }
 2753 if (task->remove_from_hash) parsec_hash_table_remove(&tasks_table, hk);
 2754
// only submit if no constraint holds the task back (blocked tasks are parked
// inside check_constraints and resubmitted via release_constraint)
 2755 if (check_constraints(task)) {
 2756 if (nullptr == task_ring) {
 2757 parsec_task_t *vp_task_rings[1] = { &task->parsec_task };
// NOTE(review): the immediate scheduling call consuming vp_task_rings is on an elided line.
 2759 } else if (*task_ring == nullptr) {
 2760 /* the first task is set directly */
 2761 *task_ring = &task->parsec_task;
 2762 } else {
 2763 /* push into the ring */
 2764 parsec_list_item_ring_push_sorted(&(*task_ring)->super, &task->parsec_task.super,
 2766 }
 2767 }
 2768 } else if constexpr (!ttg::meta::is_void_v<keyT>) {
// not yet runnable: once all push inputs are in, trigger lazy pull terminals
 2769 if ((baseobj->num_pullins + count == numins) && baseobj->is_lazy_pull()) {
 2770 /* lazily pull the pull terminal data */
 2771 baseobj->invoke_pull_terminals(std::make_index_sequence<std::tuple_size_v<input_values_tuple_type>>{}, task->key, task);
 2772 }
 2773 }
 2774 }
2775
 2776 // cases 1+2
// Non-void key, non-void value: forward to set_arg_impl with both.
 2777 template <std::size_t i, typename Key, typename Value>
 2778 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg(const Key &key,
 2779 Value &&value) {
 2780 set_arg_impl<i>(key, std::forward<Value>(value));
 2781 }
2782
 2783 // cases 4+5+6
// Void key, non-void value: forward to set_arg_impl with a Void placeholder key.
 2784 template <std::size_t i, typename Key, typename Value>
 2785 std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg(Value &&value) {
 2786 set_arg_impl<i>(ttg::Void{}, std::forward<Value>(value));
 2787 }
2788
// Void key, void value: control-only trigger for input terminal i.
 2789 template <std::size_t i, typename Key = keyT>
 2790 std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_arg() {
// NOTE(review): the forwarding call (presumably set_arg_impl with Void
// placeholders) is on the elided line here — confirm against the full source.
 2792 }
2793
 2794 // case 3
// Non-void key, void value: forward to set_arg_impl with a Void placeholder value.
 2795 template <std::size_t i, typename Key>
 2796 std::enable_if_t<!ttg::meta::is_void_v<Key>, void> set_arg(const Key &key) {
 2797 set_arg_impl<i>(key, ttg::Void{});
 2798 }
2799
// Decides whether a value's payload is small enough to be inlined into the
// active-message instead of being transferred via RMA. Device-capable TTs
// never inline (the data may live on a device). The estimate sums the iovec
// bytes (plus metadata size for split-metadata types); `num_keys` accounts for
// the keys packed at the end of the same message.
 2801 template<typename Value, typename Key>
 2802 bool can_inline_data(Value* value_ptr, detail::ttg_data_copy_t *copy, const Key& key, std::size_t num_keys) {
 2803 if constexpr (derived_has_device_op()) {
 2804 /* don't inline if data is possibly on the device */
 2805 return false;
 2806 }
 2807 /* non-device data */
 2808 using decvalueT = std::decay_t<Value>;
 2809 bool inline_data = false;
 2810 /* check whether to send data in inline */
 2811 std::size_t iov_size = 0;
 2812 std::size_t metadata_size = 0;
 2813 if constexpr (ttg::has_split_metadata<std::decay_t<Value>>::value) {
// NOTE(review): `descr` (the split-metadata descriptor) is declared on the elided line above.
 2814 auto iovs = descr.get_data(*const_cast<decvalueT *>(value_ptr));
 2815 iov_size = std::accumulate(iovs.begin(), iovs.end(), 0,
 2816 [](std::size_t s, auto& iov){ return s + iov.num_bytes; });
 2817 auto metadata = descr.get_metadata(*const_cast<decvalueT *>(value_ptr));
 2818 metadata_size = ttg::default_data_descriptor<decltype(metadata)>::payload_size(&metadata);
 2819 } else {
 2820 /* TODO: how can we query the iovecs of the buffers here without actually packing the data? */
 2822 detail::foreach_parsec_data(*value_ptr, [&](parsec_data_t* data){ iov_size += data->nb_elts; });
 2823 }
 2824 /* key is packed at the end */
// NOTE(review): the size-threshold condition guarding this assignment
// (comparing iov_size/metadata_size/key size against the message capacity)
// is on the elided lines above — confirm against the full source.
 2828 inline_data = true;
 2829 }
 2830 return inline_data;
 2831 }
2832
 2833 // Used to set the i'th argument
// Delivers `value` to input terminal i of the task identified by `key`.
// Local owner: hands off directly to set_arg_local_impl. Remote owner: packs
// the value (inline, or as RMA registrations the receiver will get() from),
// then the key, into an active message sent to the owner rank.
// `copy_in` optionally supplies an existing data copy to reuse.
 2834 template <std::size_t i, typename Key, typename Value>
 2835 void set_arg_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in = nullptr) {
 2836 int owner;
 2837 using decvalueT = std::decay_t<Value>;
 2838 using norefvalueT = std::remove_reference_t<Value>;
 2839 norefvalueT *value_ptr = &value;
 2840
 2841#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
 2842 if(world.impl().profiling()) {
 2843 parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_start, 0, 0, NULL);
 2844 }
 2845#endif
 2846
// determine the rank owning the destination task
 2847 if constexpr (!ttg::meta::is_void_v<Key>)
 2848 owner = keymap(key);
 2849 else
 2850 owner = keymap();
 2851 if (owner == world.rank()) {
// local delivery: no message needed
 2852 if constexpr (!ttg::meta::is_void_v<keyT>)
 2853 set_arg_local_impl<i>(key, std::forward<Value>(value), copy_in);
 2854 else
 2855 set_arg_local_impl<i>(ttg::Void{}, std::forward<Value>(value), copy_in);
 2856#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
 2857 if(world.impl().profiling()) {
 2858 parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_end, 0, 0, NULL);
 2859 }
 2860#endif
 2861 return;
 2862 }
 2863 // the target task is remote. Pack the information and send it to
 2864 // the corresponding peer.
 2865 // TODO do we need to copy value?
 2866 using msg_t = detail::msg_t;
 2867 auto &world_impl = world.impl();
 2868 uint64_t pos = 0;
 2869 int num_iovecs = 0;
 2870 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
 2872
 2873 if constexpr (!ttg::meta::is_void_v<decvalueT>) {
 2874
// NOTE(review): `copy` is declared/initialized (presumably from copy_in) on the elided line above.
 2876 /* make sure we have a data copy to register with */
 2877 if (nullptr == copy) {
 2878 copy = detail::find_copy_in_task(detail::parsec_ttg_caller, value_ptr);
 2879 if (nullptr == copy) {
 2880 // We need to create a copy for this data, as it does not exist yet.
 2881 copy = detail::create_new_datacopy(std::forward<Value>(value));
 2882 // use the new value from here on out
 2883 value_ptr = static_cast<norefvalueT*>(copy->get_ptr());
 2884 }
 2885 }
 2886
 2887 bool inline_data = can_inline_data(value_ptr, copy, key, 1);
 2888 msg->tt_id.inline_data = inline_data;
 2889
// non-inline transfers start with the completion-callback tag the receiver
// passes back to parsec_ce.get()
 2890 auto write_header_fn = [&]() {
 2891 if (!inline_data) {
 2892 /* TODO: at the moment, the tag argument to parsec_ce.get() is treated as a
 2893 * raw function pointer instead of a preregistered AM tag, so play that game.
 2894 * Once this is fixed in PaRSEC we need to use parsec_ttg_rma_tag instead! */
 2895 parsec_ce_tag_t cbtag = reinterpret_cast<parsec_ce_tag_t>(&detail::get_remote_complete_cb);
 2896 std::memcpy(msg->bytes + pos, &cbtag, sizeof(cbtag));
 2897 pos += sizeof(cbtag);
 2898 }
 2899 };
// per-iovec packing: either copy the bytes inline, or register the memory
// with the communication engine and pack the registration handle plus a
// completion-function pointer the target invokes when its RMA get finishes
 2900 auto handle_iovec_fn = [&](auto&& iovec, parsec_data_copy_t *device_copy = nullptr) {
 2901
 2902 if (inline_data) {
 2903 /* inline data is packed right after the tt_id in the message */
 2904 std::memcpy(msg->bytes + pos, iovec.data, iovec.num_bytes);
 2905 pos += iovec.num_bytes;
 2906 } else {
 2907
// keep the copy alive until the remote side has completed its transfer
 2912 copy = detail::register_data_copy<decvalueT>(copy, nullptr, true);
 2914 size_t lreg_size;
 2915 /* TODO: only register once when we can broadcast the data! */
 2916 parsec_ce.mem_register(iovec.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iovec.num_bytes, parsec_datatype_int8_t,
 2917 iovec.num_bytes, &lreg, &lreg_size);
// shared_ptr deleter unregisters the memory once the transfer completed
 2918 auto lreg_ptr = std::shared_ptr<void>{lreg, [device_copy](void *ptr) {
 2920 parsec_ce.mem_unregister(&memreg);
 2921 if (device_copy != nullptr) {
 2922 /* remove a reader */
 2924 }
 2925 }};
 2927 std::memcpy(msg->bytes + pos, &lreg_size_i, sizeof(lreg_size_i));
 2928 pos += sizeof(lreg_size_i);
 2929 std::memcpy(msg->bytes + pos, lreg, lreg_size);
 2930 pos += lreg_size;
 2931 //std::cout << "set_arg_impl lreg " << lreg << std::endl;
 2932 /* TODO: can we avoid the extra indirection of going through std::function? */
// heap-allocated completion closure; its address travels in the message and
// it is invoked (and freed) on RMA completion to release registration + copy
 2933 std::function<void(void)> *fn = new std::function<void(void)>([=]() mutable {
 2934 /* shared_ptr of value and registration captured by value so resetting
 2935 * them here will eventually release the memory/registration */
 2936 lreg_ptr.reset();
 2937 detail::release_data_copy(copy);
 2938 });
 2939 std::intptr_t fn_ptr{reinterpret_cast<std::intptr_t>(fn)};
 2940 std::memcpy(msg->bytes + pos, &fn_ptr, sizeof(fn_ptr));
 2941 pos += sizeof(fn_ptr);
 2942 }
 2943 };
 2944
 2945 if constexpr (ttg::has_split_metadata<std::decay_t<Value>>::value) {
// split-metadata type: pack metadata, then handle each iovec
 2947 auto iovs = descr.get_data(*const_cast<decvalueT *>(value_ptr));
 2948 num_iovecs = std::distance(std::begin(iovs), std::end(iovs));
 2949 /* pack the metadata */
 2950 auto metadata = descr.get_metadata(*const_cast<decvalueT *>(value_ptr));
 2951 pos = pack(metadata, msg->bytes, pos);
 2952 //std::cout << "set_arg_impl splitmd num_iovecs " << num_iovecs << std::endl;
 2954 for (auto&& iov : iovs) {
 2956 }
 2957 } else if constexpr (!ttg::has_split_metadata<std::decay_t<Value>>::value) {
 2958 /* serialize the object */
 2959 pos = pack(*value_ptr, msg->bytes, pos, copy);
 2960 detail::foreach_parsec_data(value, [&](parsec_data_t *data){ ++num_iovecs; });
 2961 //std::cout << "POST pack num_iovecs " << num_iovecs << std::endl;
 2962 /* handle any iovecs contained in it */
// prefer a device-resident copy when the transport supports it
 2965 int device = 0;
 2966 parsec_data_copy_t* device_copy = nullptr;
 2967 if (world.impl().mpi_support(Space) && Space != ttg::ExecutionSpace::Host) {
 2968 /* Try to find a device that is not the host and has the latest version. */
 2969 std::tie(device, device_copy) = detail::find_device_copy(data);
 2970 }
 2971 handle_iovec_fn(ttg::iovec{data->nb_elts, data->device_copies[device]->device_private},
 2972 device_copy);
 2973 });
 2974 }
 2975
 2976 msg->tt_id.num_iovecs = num_iovecs;
 2977 }
 2978
 2979 /* pack the key */
 2980 msg->tt_id.num_keys = 0;
 2981 msg->tt_id.key_offset = pos;
 2982 if constexpr (!ttg::meta::is_void_v<Key>) {
 2983 size_t tmppos = pack(key, msg->bytes, pos);
 2984 pos = tmppos;
 2985 msg->tt_id.num_keys = 1;
 2986 }
 2987
// notify the termination-detection module and ship the active message
 2988 parsec_taskpool_t *tp = world_impl.taskpool();
 2989 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
 2990 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
 2991 //std::cout << "set_arg_impl send_am owner " << owner << " sender " << msg->tt_id.sender << std::endl;
 2992 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
 2993 sizeof(msg_header_t) + pos);
 2994#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
 2995 if(world.impl().profiling()) {
 2996 parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_end, 0, 0, NULL);
 2997 }
 2998#endif
 2999#if defined(PARSEC_PROF_GRAPHER)
// record the edge in the DAG grapher; a throwaway task is created just to
// name the destination and deleted right after
 3001 int orig_index = detail::find_index_of_copy_in_task(detail::parsec_ttg_caller, value_ptr);
 3002 char orig_str[32];
 3003 char dest_str[32];
 3004 if(orig_index >= 0) {
 3005 snprintf(orig_str, 32, "%d", orig_index);
 3006 } else {
 3007 strncpy(orig_str, "_", 32);
 3008 }
 3009 snprintf(dest_str, 32, "%lu", i);
 3010 parsec_flow_t orig{ .name = orig_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
 3011 .flow_index = 0, .flow_datatype_mask = ~0 };
 3012 parsec_flow_t dest{ .name = dest_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
 3013 .flow_index = 0, .flow_datatype_mask = ~0 };
 3014 task_t *task = create_new_task(key);
 3016 delete task;
 3017 }
 3018#endif
 3019 }
3020
// Delivers `value` to input terminal i for every local key in [begin, end).
// Ready tasks are accumulated into a ring and submitted to the scheduler in
// one batch instead of one at a time.
 3021 template <int i, typename Iterator, typename Value>
 3022 void broadcast_arg_local(Iterator &&begin, Iterator &&end, const Value &value) {
 3023#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
 3024 if(world.impl().profiling()) {
 3025 parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_bcast_arg_start, 0, 0, NULL);
 3026 }
 3027#endif
 3028 parsec_task_t *task_ring = nullptr;
 3029 detail::ttg_data_copy_t *copy = nullptr;
// reuse the caller's existing data copy if one tracks this value
 3030 if (nullptr != detail::parsec_ttg_caller) {
 3031 copy = detail::find_copy_in_task(detail::parsec_ttg_caller, &value);
 3032 }
 3033
 3034 for (auto it = begin; it != end; ++it) {
// NOTE(review): the per-key set_arg_local_impl call feeding task_ring is on the elided line here.
 3036 }
 3037 /* submit all ready tasks at once */
 3038 if (nullptr != task_ring) {
 3040 __parsec_schedule_vp(world.impl().execution_stream(), vp_task_ring, 0);
 3041 }
 3042#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
 3043 if(world.impl().profiling()) {
 3044 parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_end, 0, 0, NULL);
 3045 }
 3046#endif
 3047 }
3048
// Broadcasts `value` to input terminal i for all tasks in `keylist`, which may
// live on multiple ranks. The value is serialized once; keys are sorted by
// owner so each remote rank receives a single message carrying its keys plus
// either inline data or RMA registrations. Local keys are handled via
// broadcast_arg_local.
 3049 template <std::size_t i, typename Key, typename Value>
 3050 std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
 3051 void>
 3052 broadcast_arg(const ttg::span<const Key> &keylist, const Value &value) {
 3053 using valueT = std::tuple_element_t<i, input_values_full_tuple_type>;
 3054 auto world = ttg_default_execution_context();
 3055 auto np = world.size();
 3056 int rank = world.rank();
 3057 uint64_t pos = 0;
 3058 bool have_remote = keylist.end() != std::find_if(keylist.begin(), keylist.end(),
 3059 [&](const Key &key) { return keymap(key) != rank; });
 3060
 3061 if (have_remote) {
 3062 using decvalueT = std::decay_t<Value>;
 3063
 3064 /* sort the input key list by owner and check whether there are remote keys */
 3065 std::vector<Key> keylist_sorted(keylist.begin(), keylist.end());
 3066 std::sort(keylist_sorted.begin(), keylist_sorted.end(), [&](const Key &a, const Key &b) mutable {
 3067 int rank_a = keymap(a);
 3068 int rank_b = keymap(b);
 3069 // sort so that the keys for my rank are first, rank+1 next, ..., wrapping around to 0
 3070 int pos_a = (rank_a + np - rank) % np;
 3071 int pos_b = (rank_b + np - rank) % np;
 3072 return pos_a < pos_b;
 3073 });
 3074
 3075 /* Assuming there are no local keys, will be updated while iterating over the keys */
 3076 auto local_begin = keylist_sorted.end();
 3077 auto local_end = keylist_sorted.end();
 3078
 3079 int32_t num_iovs = 0;
 3080
// NOTE(review): `copy` is declared on the elided line above this lookup.
 3082 copy = detail::find_copy_in_task(detail::parsec_ttg_caller, &value);
 3083 assert(nullptr != copy);
 3084
 3085 using msg_t = detail::msg_t;
 3086 auto &world_impl = world.impl();
 3087 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
 3089
 3090 /* check if we inline the data */
 3091 /* TODO: this assumes the worst case: that all keys are packed at once (i.e., go to the same remote). Can we do better?*/
 3092 bool inline_data = can_inline_data(&value, copy, keylist_sorted[0], keylist_sorted.size());
 3093 msg->tt_id.inline_data = inline_data;
 3094
// registrations are collected once and re-packed per destination rank below
 3095 std::vector<std::pair<int32_t, std::shared_ptr<void>>> memregs;
 3096 auto write_iov_header = [&](){
 3097 if (!inline_data) {
 3098 /* TODO: at the moment, the tag argument to parsec_ce.get() is treated as a
 3099 * raw function pointer instead of a preregistered AM tag, so play that game.
 3100 * Once this is fixed in PaRSEC we need to use parsec_ttg_rma_tag instead! */
 3101 parsec_ce_tag_t cbtag = reinterpret_cast<parsec_ce_tag_t>(&detail::get_remote_complete_cb);
 3102 std::memcpy(msg->bytes + pos, &cbtag, sizeof(cbtag));
 3103 pos += sizeof(cbtag);
 3104 }
 3105 };
// per-iovec handling: inline-copy the bytes, or register the memory and stash
// the (size, handle) pair; the shared_ptr deleter unregisters on release
 3106 auto handle_iov_fn = [&](auto&& iovec, parsec_data_copy_t *device_copy = nullptr){
 3107 if (inline_data) {
 3108 /* inline data is packed right after the tt_id in the message */
 3109 std::memcpy(msg->bytes + pos, iovec.data, iovec.num_bytes);
 3110 pos += iovec.num_bytes;
 3111 } else {
 3113 size_t lreg_size;
 3114 parsec_ce.mem_register(iovec.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iovec.num_bytes, parsec_datatype_int8_t,
 3115 iovec.num_bytes, &lreg, &lreg_size);
 3116 /* TODO: use a static function for deregistration here? */
 3117 memregs.push_back(
 3118 std::make_pair(static_cast<int32_t>(lreg_size),
 3119 /* TODO: this assumes that parsec_ce_mem_reg_handle_t is void* */
 3120 std::shared_ptr<void>{lreg,
 3121 [device_copy](void *ptr) {
 3124 //std::cout << "broadcast_arg memunreg lreg " << memreg << std::endl;
 3125 parsec_ce.mem_unregister(&memreg);
 3126 if (device_copy != nullptr) {
 3127 /* remove a reader */
 3129 }
 3130 }})
 3131 );
 3132 //std::cout << "broadcast_arg memreg lreg " << lreg << std::endl;
 3133 }
 3134 };
 3135
 3136 if constexpr (ttg::has_split_metadata<std::decay_t<Value>>::value) {
// split-metadata type: pack metadata once, collect all iovec registrations
 3138 /* pack the metadata */
 3139 auto metadata = descr.get_metadata(value);
 3140 pos = pack(metadata, msg->bytes, pos);
 3141 auto iovs = descr.get_data(*const_cast<decvalueT *>(&value));
 3142 num_iovs = std::distance(std::begin(iovs), std::end(iovs));
 3143 memregs.reserve(num_iovs);
 3145 for (auto &&iov : iovs) {
 3147 }
 3148 //std::cout << "broadcast_arg splitmd num_iovecs " << num_iovs << std::endl;
 3149 } else if constexpr (!ttg::has_split_metadata<std::decay_t<Value>>::value) {
 3150 /* serialize the object once */
 3151 pos = pack(value, msg->bytes, pos, copy);
 3152 detail::foreach_parsec_data(value, [&](parsec_data_t *data){ ++num_iovs; });
 3153 memregs.reserve(num_iovs);
// prefer a device-resident copy when the transport supports it
 3156 int device = 0;
 3157 parsec_data_copy_t* device_copy = nullptr;
 3158 if (world.impl().mpi_support(Space) && Space != ttg::ExecutionSpace::Host) {
 3159 /* Try to find a device that is not the host and has the latest version. */
 3160 std::tie(device, device_copy) = detail::find_device_copy(data);
 3161 }
 3162 handle_iov_fn(ttg::iovec{data->nb_elts,
 3163 data->device_copies[device]->device_private},
 3164 device_copy);
 3165 });
 3166 }
 3167
 3168 msg->tt_id.num_iovecs = num_iovs;
 3169
// remember where the shared prefix ends so the buffer can be rewound per rank
 3170 std::size_t save_pos = pos;
 3171
 3172 parsec_taskpool_t *tp = world_impl.taskpool();
 3173 for (auto it = keylist_sorted.begin(); it < keylist_sorted.end(); /* increment done inline */) {
 3174
 3175 auto owner = keymap(*it);
 3176 if (owner == rank) {
// remember the contiguous run of local keys; they are handled after the loop
 3177 local_begin = it;
 3178 /* find first non-local key */
 3179 local_end =
 3180 std::find_if_not(++it, keylist_sorted.end(), [&](const Key &key) { return keymap(key) == rank; });
 3181 it = local_end;
 3182 continue;
 3183 }
 3184
 3185 /* rewind the buffer and start packing a new set of memregs and keys */
 3186 pos = save_pos;
 3192 if (!inline_data) {
 3193 for (int idx = 0; idx < num_iovs; ++idx) {
 3194 // auto [lreg_size, lreg_ptr] = memregs[idx];
 3196 std::shared_ptr<void> lreg_ptr;
 3197 std::tie(lreg_size, lreg_ptr) = memregs[idx];
 3198 std::memcpy(msg->bytes + pos, &lreg_size, sizeof(lreg_size));
 3199 pos += sizeof(lreg_size);
 3200 std::memcpy(msg->bytes + pos, lreg_ptr.get(), lreg_size);
 3201 pos += lreg_size;
 3202 //std::cout << "broadcast_arg lreg_ptr " << lreg_ptr.get() << std::endl;
 3203 /* mark another reader on the copy */
 3204 copy = detail::register_data_copy<valueT>(copy, nullptr, true);
 3205 /* create a function that will be invoked upon RMA completion at the target */
 3206 std::function<void(void)> *fn = new std::function<void(void)>([=]() mutable {
 3207 /* shared_ptr of value and registration captured by value so resetting
 3208 * them here will eventually release the memory/registration */
 3209 lreg_ptr.reset();
 3210 detail::release_data_copy(copy);
 3211 });
 3212 std::intptr_t fn_ptr{reinterpret_cast<std::intptr_t>(fn)};
 3213 std::memcpy(msg->bytes + pos, &fn_ptr, sizeof(fn_ptr));
 3214 pos += sizeof(fn_ptr);
 3215 }
 3216 }
 3217
 3218 /* mark the beginning of the keys */
 3219 msg->tt_id.key_offset = pos;
 3220
 3221 /* pack all keys for this owner */
 3222 int num_keys = 0;
 3223 do {
 3224 ++num_keys;
 3225 pos = pack(*it, msg->bytes, pos);
 3226 ++it;
 3227 } while (it < keylist_sorted.end() && keymap(*it) == owner);
 3228 msg->tt_id.num_keys = num_keys;
 3229
// notify termination detection and ship this rank's message
 3230 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
 3231 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
 3232 //std::cout << "broadcast_arg send_am owner " << owner << std::endl;
 3233 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
 3234 sizeof(msg_header_t) + pos);
 3235 }
 3236 /* handle local keys */
// NOTE(review): the broadcast_arg_local call over [local_begin, local_end) is on the elided line here.
 3238 } else {
 3239 /* handle local keys */
 3240 broadcast_arg_local<i>(keylist.begin(), keylist.end(), value);
 3241 }
 3242 }
3243
 3244 // Used by invoke to set all arguments associated with a task
 3245 // Is: index sequence of elements in args
 3246 // Js: index sequence of input terminals to set
// Fans each element of `args` out to its matching input terminal for `key`.
// The junk-array trick forces evaluation of the set_arg pack expansion in order.
 3247 template <typename Key, typename... Ts, size_t... Is, size_t... Js>
 3248 std::enable_if_t<ttg::meta::is_none_void_v<Key>, void> set_args(std::index_sequence<Is...>,
 3249 std::index_sequence<Js...>, const Key &key,
 3250 const std::tuple<Ts...> &args) {
 3251 static_assert(sizeof...(Js) == sizeof...(Is));
 3252 constexpr size_t js[] = {Js...};
 3253 int junk[] = {0, (set_arg<js[Is]>(key, TT::get<Is>(args)), 0)...};
 3254 junk[0]++;
 3255 }
3256
 3257 // Used by invoke to set all arguments associated with a task
 3258 // Is: index sequence of input terminals to set
// Convenience overload: element indices and terminal indices coincide.
 3259 template <typename Key, typename... Ts, size_t... Is>
 3260 std::enable_if_t<ttg::meta::is_none_void_v<Key>, void> set_args(std::index_sequence<Is...> is, const Key &key,
 3261 const std::tuple<Ts...> &args) {
 3262 set_args(std::index_sequence_for<Ts...>{}, is, key, args);
 3263 }
3264
 3265 // Used by invoke to set all arguments associated with a task
 3266 // Is: index sequence of elements in args
 3267 // Js: index sequence of input terminals to set
// Keyless variant: same fan-out as the keyed overload but without a key.
 3268 template <typename Key = keyT, typename... Ts, size_t... Is, size_t... Js>
 3269 std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...>, std::index_sequence<Js...>,
 3270 const std::tuple<Ts...> &args) {
 3271 static_assert(sizeof...(Js) == sizeof...(Is));
 3272 constexpr size_t js[] = {Js...};
 3273 int junk[] = {0, (set_arg<js[Is], void>(TT::get<Is>(args)), 0)...};
 3274 junk[0]++;
 3275 }
3276
 3277 // Used by invoke to set all arguments associated with a task
 3278 // Is: index sequence of input terminals to set
// Keyless convenience overload: element indices and terminal indices coincide.
 3279 template <typename Key = keyT, typename... Ts, size_t... Is>
 3280 std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...> is,
 3281 const std::tuple<Ts...> &args) {
 3282 set_args(std::index_sequence_for<Ts...>{}, is, args);
 3283 }
3284
3285 public:
// Sets the global (per-TT, not per-key) reduction goal for streaming input
// terminal i. May be called only once per terminal; throws if the stream was
// already bounded.
 3288 template <std::size_t i>
 3289 void set_static_argstream_size(std::size_t size) {
 3290 assert(std::get<i>(input_reducers) && "TT::set_static_argstream_size called on nonstreaming input terminal");
 3291 assert(size > 0 && "TT::set_static_argstream_size(key,size) called with size=0");
 3292
 3293 this->trace(world.rank(), ":", get_name(), ": setting global stream size for terminal ", i);
 3294
 3295 // Check if stream is already bounded
// max() is the sentinel for "unbounded"
 3296 if (static_stream_goal[i] < std::numeric_limits<std::size_t>::max()) {
 3297 ttg::print_error(world.rank(), ":", get_name(), " : error stream is already bounded : ", i);
 3298 throw std::runtime_error("TT::set_static_argstream_size called for a bounded stream");
 3299 }
 3300
 3301 static_stream_goal[i] = size;
 3302 }
3303
// Sets the reduction goal of streaming input terminal i for the task `key`.
// If the key is owned remotely, the request is forwarded as an active message;
// locally, the task is found (or created) and its stream goal is committed
// under the reduce_count "lock".
 3307 template <std::size_t i, typename Key>
 3308 std::enable_if_t<!ttg::meta::is_void_v<Key>, void> set_argstream_size(const Key &key, std::size_t size) {
 3309 // preconditions
 3310 assert(std::get<i>(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal");
 3311 assert(size > 0 && "TT::set_argstream_size(key,size) called with size=0");
 3312
 3313 // body
 3314 const auto owner = keymap(key);
 3315 if (owner != world.rank()) {
// remote owner: serialize key + size and forward
 3316 ttg::trace(world.rank(), ":", get_name(), ":", key, " : forwarding stream size for terminal ", i);
 3317 using msg_t = detail::msg_t;
 3318 auto &world_impl = world.impl();
 3319 uint64_t pos = 0;
 3320 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
 3322 world_impl.rank(), 1);
 3323 /* pack the key */
 3324 pos = pack(key, msg->bytes, pos);
 3325 pos = pack(size, msg->bytes, pos);
 3326 parsec_taskpool_t *tp = world_impl.taskpool();
 3327 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
 3328 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
 3329 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
 3330 sizeof(msg_header_t) + pos);
 3331 } else {
 3332 ttg::trace(world.rank(), ":", get_name(), ":", key, " : setting stream size to ", size, " for terminal ", i);
 3333
 3334 auto hk = reinterpret_cast<parsec_key_t>(&key);
 3335 task_t *task;
// NOTE(review): bucket locking around this find/create appears on elided lines — confirm.
 3337 if (nullptr == (task = (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk))) {
 3338 task = create_new_task(key);
 3339 world.impl().increment_created();
 3341 if( world.impl().dag_profiling() ) {
 3342#if defined(PARSEC_PROF_GRAPHER)
 3343 parsec_prof_grapher_task(&task->parsec_task, world.impl().execution_stream()->th_id, 0, *(uintptr_t*)&(task->parsec_task.locals[0]));
 3344#endif
 3345 }
 3346 }
 3348
 3349 // TODO: Unfriendly implementation, cannot check if stream is already bounded
 3350 // TODO: Unfriendly implementation, cannot check if stream has been finalized already
 3351
 3352 // commit changes
 3353 // 1) "lock" the stream by incrementing the reduce_count
 3354 // 2) set the goal
 3355 // 3) "unlock" the stream
 3356 // only one thread will see the reduce_count be zero and the goal match the size
 3357 task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
 3358 task->streams[i].goal = size;
 3359 auto c = task->streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
// if we were the last holder and the stream already reached the goal, release the task
// (the release call itself is on the elided line below)
 3360 if (1 == c && (task->streams[i].size >= size)) {
 3362 }
 3363 }
 3364 }
3365
// Keyless variant of set_argstream_size: sets the reduction goal of streaming
// input terminal i for the single task of this TT, forwarding to the owner
// rank when necessary.
 3368 template <std::size_t i, typename Key = keyT>
 3369 std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_argstream_size(std::size_t size) {
 3370 // preconditions
 3371 assert(std::get<i>(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal");
 3372 assert(size > 0 && "TT::set_argstream_size(key,size) called with size=0");
 3373
 3374 // body
 3375 const auto owner = keymap();
 3376 if (owner != world.rank()) {
// remote owner: serialize the size and forward (no key to pack)
 3377 ttg::trace(world.rank(), ":", get_name(), " : forwarding stream size for terminal ", i);
 3378 using msg_t = detail::msg_t;
 3379 auto &world_impl = world.impl();
 3380 uint64_t pos = 0;
 3381 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
 3383 world_impl.rank(), 0);
 3384 pos = pack(size, msg->bytes, pos);
 3385 parsec_taskpool_t *tp = world_impl.taskpool();
 3386 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
 3387 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
 3388 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
 3389 sizeof(msg_header_t) + pos);
 3390 } else {
 3391 ttg::trace(world.rank(), ":", get_name(), " : setting stream size to ", size, " for terminal ", i);
 3392
// keyless tasks hash to 0
 3393 parsec_key_t hk = 0;
 3394 task_t *task;
// NOTE(review): bucket locking and the keyless create_new_task call appear on elided lines — confirm.
 3396 if (nullptr == (task = (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk))) {
 3398 world.impl().increment_created();
 3400 if( world.impl().dag_profiling() ) {
 3401#if defined(PARSEC_PROF_GRAPHER)
 3402 parsec_prof_grapher_task(&task->parsec_task, world.impl().execution_stream()->th_id, 0, *(uintptr_t*)&(task->parsec_task.locals[0]));
 3403#endif
 3404 }
 3405 }
 3407
 3408 // TODO: Unfriendly implementation, cannot check if stream is already bounded
 3409 // TODO: Unfriendly implementation, cannot check if stream has been finalized already
 3410
 3411 // commit changes
 3412 // 1) "lock" the stream by incrementing the reduce_count
 3413 // 2) set the goal
 3414 // 3) "unlock" the stream
 3415 // only one thread will see the reduce_count be zero and the goal match the size
 3416 task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
 3417 task->streams[i].goal = size;
 3418 auto c = task->streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
// if we were the last holder and the stream already reached the goal, release the task
// (the release call itself is on the elided line below)
 3419 if (1 == c && (task->streams[i].size >= size)) {
 3421 }
 3422 }
 3423 }
3424
// Finalizes streaming input terminal i for task `key`: marks the stream
// complete by setting its goal to 1, releasing the task if at least one input
// already arrived. Forwards to the owner rank when the key is remote; throws
// if the local stream never received any input.
 3426 template <std::size_t i, typename Key>
 3427 std::enable_if_t<!ttg::meta::is_void_v<Key>, void> finalize_argstream(const Key &key) {
 3428 // preconditions
 3429 assert(std::get<i>(input_reducers) && "TT::finalize_argstream called on nonstreaming input terminal");
 3430
 3431 // body
 3432 const auto owner = keymap(key);
 3433 if (owner != world.rank()) {
// remote owner: serialize the key and forward the finalize request
 3434 ttg::trace(world.rank(), ":", get_name(), " : ", key, ": forwarding stream finalize for terminal ", i);
 3435 using msg_t = detail::msg_t;
 3436 auto &world_impl = world.impl();
 3437 uint64_t pos = 0;
 3438 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
 3440 world_impl.rank(), 1);
 3441 /* pack the key */
 3442 pos = pack(key, msg->bytes, pos);
 3443 parsec_taskpool_t *tp = world_impl.taskpool();
 3444 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
 3445 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
 3446 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
 3447 sizeof(msg_header_t) + pos);
 3448 } else {
 3449 ttg::trace(world.rank(), ":", get_name(), " : ", key, ": finalizing stream for terminal ", i);
 3450
 3451 auto hk = reinterpret_cast<parsec_key_t>(&key);
 3452 task_t *task = nullptr;
 3453 //parsec_hash_table_lock_bucket(&tasks_table, hk);
 3454 if (nullptr == (task = (task_t *)parsec_hash_table_find(&tasks_table, hk))) {
 3455 ttg::print_error(world.rank(), ":", get_name(), ":", key,
 3456 " : error finalize called on stream that never received an input data: ", i);
 3457 throw std::runtime_error("TT::finalize called on stream that never received an input data");
 3458 }
 3459
 3460 // TODO: Unfriendly implementation, cannot check if stream is already bounded
 3461 // TODO: Unfriendly implementation, cannot check if stream has been finalized already
 3462
 3463 // commit changes
 3464 // 1) "lock" the stream by incrementing the reduce_count
 3465 // 2) set the goal
 3466 // 3) "unlock" the stream
 3467 // only one thread will see the reduce_count be zero and the goal match the size
 3468 task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
// goal = 1: any already-received input satisfies the (now finalized) stream
 3469 task->streams[i].goal = 1;
 3470 auto c = task->streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
// if we were the last holder and data already arrived, release the task
// (the release call itself is on the elided line below)
 3471 if (1 == c && (task->streams[i].size >= 1)) {
 3473 }
 3474 }
 3475 }
3476
// Keyless variant of finalize_argstream: finalizes streaming input terminal i
// for the single task of this TT, forwarding to the owner rank when necessary.
 3478 template <std::size_t i, bool key_is_void = ttg::meta::is_void_v<keyT>>
 3479 std::enable_if_t<key_is_void, void> finalize_argstream() {
 3480 // preconditions
 3481 assert(std::get<i>(input_reducers) && "TT::finalize_argstream called on nonstreaming input terminal");
 3482
 3483 // body
 3484 const auto owner = keymap();
 3485 if (owner != world.rank()) {
// remote owner: the finalize message carries no payload beyond the header
 3486 ttg::trace(world.rank(), ":", get_name(), ": forwarding stream finalize for terminal ", i);
 3487 using msg_t = detail::msg_t;
 3488 auto &world_impl = world.impl();
 3489 uint64_t pos = 0;
 3490 std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
 3492 world_impl.rank(), 0);
 3493 parsec_taskpool_t *tp = world_impl.taskpool();
 3494 tp->tdm.module->outgoing_message_start(tp, owner, NULL);
 3495 tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
 3496 parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
 3497 sizeof(msg_header_t) + pos);
 3498 } else {
 3499 ttg::trace(world.rank(), ":", get_name(), ": finalizing stream for terminal ", i);
 3500
// keyless tasks hash to 0
 3501 auto hk = static_cast<parsec_key_t>(0);
 3502 task_t *task = nullptr;
 3503 if (nullptr == (task = (task_t *)parsec_hash_table_find(&tasks_table, hk))) {
 3504 ttg::print_error(world.rank(), ":", get_name(),
 3505 " : error finalize called on stream that never received an input data: ", i);
 3506 throw std::runtime_error("TT::finalize called on stream that never received an input data");
 3507 }
 3508
 3509 // TODO: Unfriendly implementation, cannot check if stream is already bounded
 3510 // TODO: Unfriendly implementation, cannot check if stream has been finalized already
 3511
 3512 // commit changes
 3513 // 1) "lock" the stream by incrementing the reduce_count
 3514 // 2) set the goal
 3515 // 3) "unlock" the stream
 3516 // only one thread will see the reduce_count be zero and the goal match the size
 3517 task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
// goal = 1: any already-received input satisfies the (now finalized) stream
 3518 task->streams[i].goal = 1;
 3519 auto c = task->streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
// if we were the last holder and data already arrived, release the task
// (the release call itself is on the elided line below)
 3520 if (1 == c && (task->streams[i].size >= 1)) {
 3522 }
 3523 }
 3524 }
3525
  /// Mark the device copies backing \c value for push-out to the host.
  /// For every parsec_data_t currently owned by a device (owner_device != 0)
  /// the matching flow of the current device task is forced to
  /// PARSEC_FLOW_ACCESS_WRITE and recorded in gpu_task->pushout so that PaRSEC
  /// transfers the data back to the host copy (device_copies[0]).
  /// Must run inside a task: relies on detail::parsec_ttg_caller.
  template<typename Value>
  void copy_mark_pushout(const Value& value) {

    assert(detail::parsec_ttg_caller->dev_ptr && detail::parsec_ttg_caller->dev_ptr->gpu_task);
    // NOTE(review): the declaration of gpu_task (presumably taken from
    // detail::parsec_ttg_caller->dev_ptr->gpu_task, per the assert above) is
    // not visible here — confirm.
    auto check_parsec_data = [&](parsec_data_t* data) {
      if (data->owner_device != 0) {
        /* find the flow */
        int flowidx = 0;
        while (flowidx < MAX_PARAM_COUNT &&
               gpu_task->flow[flowidx] != nullptr &&
               gpu_task->flow[flowidx]->flow_flags != PARSEC_FLOW_ACCESS_NONE) {
          if (detail::parsec_ttg_caller->parsec_task.data[flowidx].data_in->original == data) {
            /* found the right data, set the corresponding flow as pushout */
            break;
          }
          ++flowidx;
        }
        if (flowidx == MAX_PARAM_COUNT) {
          throw std::runtime_error("Cannot add more than MAX_PARAM_COUNT flows to a task!");
        }
        if (gpu_task->flow[flowidx]->flow_flags == PARSEC_FLOW_ACCESS_NONE) {
          /* no flow found, add one and mark it pushout */
          detail::parsec_ttg_caller->parsec_task.data[flowidx].data_in = data->device_copies[0];
          detail::parsec_ttg_caller->parsec_task.data[flowidx].data_out = data->device_copies[data->owner_device];
          gpu_task->flow_nb_elts[flowidx] = data->nb_elts;
        }
        /* need to mark the flow WRITE, otherwise PaRSEC will not do the pushout */
        ((parsec_flow_t *)gpu_task->flow[flowidx])->flow_flags |= PARSEC_FLOW_ACCESS_WRITE;
        gpu_task->pushout |= 1<<flowidx;
      }
    };
    // apply the check to every parsec_data_t backing the value
    // (the enclosing call expression that visits the data is not visible here)
    [&](parsec_data_t* data){
      check_parsec_data(data);
    });
  }
3563
3564
3565 /* check whether a data needs to be pushed out */
  /* check whether a data needs to be pushed out */
  /* Decide whether the datum bound to input terminal i must be pushed back to
   * the host ("pushout"). Visible triggers: host-only reductions, non-device
   * TTs, an existing writer or extra readers that force a copy, and remote
   * successors when MPI cannot send directly from device memory.
   * NOTE(review): the function name line and several condition lines are not
   * visible here; comments annotate only what is shown. */
  template <std::size_t i, typename Value, typename RemoteCheckFn>
  std::enable_if_t<!std::is_void_v<std::decay_t<Value>>,
                   void>
    constexpr const bool value_is_const = std::is_const_v<std::tuple_element_t<i, input_args_type>>;

    /* get the copy */
    copy = detail::find_copy_in_task(detail::parsec_ttg_caller, &value);

    /* if there is no copy we don't need to prepare anything */
    if (nullptr == copy) {
      return;
    }

    bool need_pushout = false;

      /* already marked pushout, skip the rest */
      return;
    }

    /* TODO: remove this once we support reductions on the GPU */
    auto &reducer = std::get<i>(input_reducers);
    if (reducer) {
      /* reductions are currently done only on the host so push out */
      copy_mark_pushout(value);
      return;
    }

    if constexpr (value_is_const) {
        /* The data has been modified previously. If not all devices can access
         * their peers then we need to push out to the host so that all devices
         * have the data available for reading.
         * NOTE: we currently don't allow users to force the next writer to be
         * on a different device. In that case PaRSEC would take the host-side
         * copy. If we change our restriction we need to revisit this.
         * Ideally, PaRSEC would take the device copy if the owner moves... */
      }

      /* check for multiple readers */
      }

        /* there is a writer already, we will need to create a copy */
        need_pushout = true;
      }

    } else {
        need_pushout = true;
      } else {
        /* there are readers, we will need to create a copy */
        need_pushout = true;
      }
    }

    /* host-only (non-device) TTs always consume on the host */
    if constexpr (!derived_has_device_op()) {
      need_pushout = true;
    }

    /* check if there are non-local successors if it's a device task */
    if (!need_pushout) {
      bool device_supported = world.impl().mpi_support(Space);
      /* if MPI supports the device we don't care whether we have remote peers
       * because we can send from the device directly */
      if (!device_supported) {
      }
    }

    if (need_pushout) {
      copy_mark_pushout(value);
    }
  }
3653
  /* check whether a data needs to be pushed out */
  /// Keyed prepare_send: builds a predicate that reports whether any key in
  /// \p keylist maps to a rank other than this one. The call that forwards
  /// \c remote_check into the pushout decision is not visible here.
  template <std::size_t i, typename Key, typename Value>
  std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
                   void>
  prepare_send(const ttg::span<const Key> &keylist, const Value &value) {
    auto remote_check = [&](){
      auto world = ttg_default_execution_context();
      int rank = world.rank();
      // remote iff at least one destination key is mapped to another rank
      bool remote = keylist.end() != std::find_if(keylist.begin(), keylist.end(),
                                                  [&](const Key &key) { return keymap(key) != rank; });
      return remote;
    };
  }
3668
  /// Void-key prepare_send: remote iff the (keyless) keymap selects a
  /// different rank than ours. The call that forwards \c remote_check into
  /// the pushout decision is not visible here.
  template <std::size_t i, typename Key, typename Value>
  std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
                   void>
  prepare_send(const Value &value) {
    auto remote_check = [&](){
      auto world = ttg_default_execution_context();
      int rank = world.rank();
      return (keymap() != rank);
    };
  }
3680
   private:
    // Copy/assign/move forbidden ... we could make it work using
    // PIMPL for this base class. However, this instance of the base
    // class is tied to a specific instance of a derived class a
    // pointer to which is captured for invoking derived class
    // functions. Thus, not only does the derived class have to be
    // involved but we would have to do it in a thread safe way
    // including for possibly already running tasks and remote
    // references. This is not worth the effort ... wherever you are
    // wanting to move/assign a TT you should be using a pointer.
    TT(const TT &other) = delete;
    TT &operator=(const TT &other) = delete;
    TT(TT &&other) = delete;
    TT &operator=(TT &&other) = delete;
3695
    // Registers the callback for the i'th input terminal
    /// Installs send/move/broadcast/setsize/finalize/prepare-send callbacks on
    /// input terminal \p input, dispatching on the void-ness of key and value.
    /// NOTE(review): several callback bodies and the separator/doc lines
    /// between the cases are not visible here.
    template <typename terminalT, std::size_t i>
    void register_input_callback(terminalT &input) {
      using valueT = std::decay_t<typename terminalT::value_type>;
      if (input.is_pull_terminal) {
        num_pullins++;
      }
      // case 1: nonvoid key, nonvoid value
      if constexpr (!ttg::meta::is_void_v<keyT> && !std::is_void_v<valueT>) {
        auto move_callback = [this](const keyT &key, valueT &&value) {
          set_arg<i, keyT, valueT>(key, std::forward<valueT>(value));
        };
        auto send_callback = [this](const keyT &key, const valueT &value) {
        };
        auto broadcast_callback = [this](const ttg::span<const keyT> &keylist, const valueT &value) {
        };
        auto prepare_send_callback = [this](const ttg::span<const keyT> &keylist, const valueT &value) {
        };
        auto setsize_callback = [this](const keyT &key, std::size_t size) { set_argstream_size<i>(key, size); };
        auto finalize_callback = [this](const keyT &key) { finalize_argstream<i>(key); };
        input.set_callback(send_callback, move_callback, broadcast_callback,
                           setsize_callback, finalize_callback, prepare_send_callback);
      }
      // case 2: nonvoid key, void value, mixed inputs
      else if constexpr (!ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
        auto send_callback = [this](const keyT &key) { set_arg<i, keyT, ttg::Void>(key, ttg::Void{}); };
        auto setsize_callback = [this](const keyT &key, std::size_t size) { set_argstream_size<i>(key, size); };
        auto finalize_callback = [this](const keyT &key) { finalize_argstream<i>(key); };
        input.set_callback(send_callback, send_callback, {}, setsize_callback, finalize_callback);
      }
      // case 3: nonvoid key, void value, no inputs
      // NOTE: subsumed in case 2 above, kept for historical reasons
      // case 4: void key, nonvoid value
      else if constexpr (ttg::meta::is_void_v<keyT> && !std::is_void_v<valueT>) {
        auto move_callback = [this](valueT &&value) { set_arg<i, keyT, valueT>(std::forward<valueT>(value)); };
        auto send_callback = [this](const valueT &value) {
          if constexpr (std::is_copy_constructible_v<valueT>) {
          }
          else {
            throw std::logic_error(std::string("TTG::PaRSEC: send_callback is invoked on datum of type ") + typeid(std::decay_t<valueT>).name() + " which is not copy constructible, std::move datum into send/broadcast statement");
          }
        };
        auto setsize_callback = [this](std::size_t size) { set_argstream_size<i>(size); };
        auto finalize_callback = [this]() { finalize_argstream<i>(); };
        auto prepare_send_callback = [this](const valueT &value) {
          prepare_send<i, void>(value);
        };
        input.set_callback(send_callback, move_callback, {}, setsize_callback, finalize_callback, prepare_send_callback);
      }
      // case 5: void key, void value, mixed inputs
      else if constexpr (ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
        auto send_callback = [this]() { set_arg<i, keyT, ttg::Void>(ttg::Void{}); };
        auto setsize_callback = [this](std::size_t size) { set_argstream_size<i>(size); };
        auto finalize_callback = [this]() { finalize_argstream<i>(); };
        input.set_callback(send_callback, send_callback, {}, setsize_callback, finalize_callback);
      }
      // case 6: void key, void value, no inputs
      // NOTE: subsumed in case 5 above, kept for historical reasons
      else
        ttg::abort();
    }
3773
    /// Register callbacks on every input terminal indexed by IS...
    /// (the per-terminal expansion expression inside the array is not visible here)
    template <std::size_t... IS>
    void register_input_callbacks(std::index_sequence<IS...>) {
      int junk[] = {
          0,
           0)...};
      junk[0]++;  // suppress unused-variable warnings on the expansion array
    }
3782
3783 template <std::size_t... IS, typename inedgesT>
3784 void connect_my_inputs_to_incoming_edge_outputs(std::index_sequence<IS...>, inedgesT &inedges) {
3785 int junk[] = {0, (std::get<IS>(inedges).set_out(&std::get<IS>(input_terminals)), 0)...};
3786 junk[0]++;
3787 }
3788
3789 template <std::size_t... IS, typename outedgesT>
3790 void connect_my_outputs_to_outgoing_edge_inputs(std::index_sequence<IS...>, outedgesT &outedges) {
3791 int junk[] = {0, (std::get<IS>(outedges).set_in(&std::get<IS>(output_terminals)), 0)...};
3792 junk[0]++;
3793 }
3794
#if 0
    // Currently disabled: would initialize self.in[]'s flow_flags from the
    // const-ness of each input terminal type (READ for const inputs).
    // NOTE(review): parts of this block are not visible here.
    template <typename input_terminals_tupleT, std::size_t... IS, typename flowsT>
    void _initialize_flows(std::index_sequence<IS...>, flowsT &&flows) {
      int junk[] = {0,
                    (*(const_cast<std::remove_const_t<decltype(flows[IS]->flow_flags)> *>(&(flows[IS]->flow_flags))) =
                         (std::is_const_v<std::tuple_element_t<IS, input_terminals_tupleT>> ? PARSEC_FLOW_ACCESS_READ
                     0)...};
      junk[0]++;
    }

    template <typename input_terminals_tupleT, typename flowsT>
    void initialize_flows(flowsT &&flows) {
          std::make_index_sequence<std::tuple_size<input_terminals_tupleT>::value>{}, flows);
    }
#endif  // 0
3812
3813 void fence() override { ttg::default_execution_context().impl().fence(); }
3814
3815 static int key_equal(parsec_key_t a, parsec_key_t b, void *user_data) {
3816 if constexpr (std::is_same_v<keyT, void>) {
3817 return 1;
3818 } else {
3819 keyT &ka = *(reinterpret_cast<keyT *>(a));
3820 keyT &kb = *(reinterpret_cast<keyT *>(b));
3821 return ka == kb;
3822 }
3823 }
3824
3825 static uint64_t key_hash(parsec_key_t k, void *user_data) {
3826 constexpr const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
3827 if constexpr (keyT_is_Void || std::is_same_v<keyT, void>) {
3828 return 0;
3829 } else {
3830 keyT &kk = *(reinterpret_cast<keyT *>(k));
3831 using ttg::hash;
3832 uint64_t hv = hash<std::decay_t<decltype(kk)>>{}(kk);
3833 return hv;
3834 }
3835 }
3836
    /// PaRSEC callback: render the key into \p buffer using operator<<;
    /// produces the empty string for a void key type.
    static char *key_print(char *buffer, size_t buffer_size, parsec_key_t k, void *user_data) {
      if constexpr (std::is_same_v<keyT, void>) {
        buffer[0] = '\0';
        return buffer;
      } else {
        keyT kk = *(reinterpret_cast<keyT *>(k));
        std::stringstream iss;
        iss << kk;
        // NOTE(review): one statement between the insertion and the get() is
        // not visible here — confirm buffer preparation before relying on it.
        iss.get(buffer, buffer_size);
        return buffer;
      }
    }
3850
3851 static parsec_key_t make_key(const parsec_taskpool_t *tp, const parsec_assignment_t *as) {
3852 // we use the parsec_assignment_t array as a scratchpad to store the hash and address of the key
3853 keyT *key = *(keyT**)&(as[2]);
3854 return reinterpret_cast<parsec_key_t>(key);
3855 }
3856
3857 static char *parsec_ttg_task_snprintf(char *buffer, size_t buffer_size, const parsec_task_t *parsec_task) {
3858 if(buffer_size == 0)
3859 return buffer;
3860
3861 if constexpr (ttg::meta::is_void_v<keyT>) {
3862 snprintf(buffer, buffer_size, "%s()[]<%d>", parsec_task->task_class->name, parsec_task->priority);
3863 } else {
3864 const task_t *task = reinterpret_cast<const task_t*>(parsec_task);
3865 std::stringstream ss;
3866 ss << task->key;
3867
3868 std::string keystr = ss.str();
3869 std::replace(keystr.begin(), keystr.end(), '(', ':');
3870 std::replace(keystr.begin(), keystr.end(), ')', ':');
3871
3872 snprintf(buffer, buffer_size, "%s(%s)[]<%d>", parsec_task->task_class->name, keystr.c_str(), parsec_task->priority);
3873 }
3874 return buffer;
3875 }
3876
#if defined(PARSEC_PROF_TRACE)
    /// Profiling callback: write the task's key (or "()" for a void key type)
    /// into the profiling buffer \p dst.
    static void *parsec_ttg_task_info(void *dst, const void *data, size_t size)
    {
      char *out = reinterpret_cast<char *>(dst);
      if constexpr (ttg::meta::is_void_v<keyT>) {
        snprintf(out, size, "()");
      } else {
        const task_t *task = reinterpret_cast<const task_t *>(data);
        std::stringstream sstr;
        sstr << task->key;
        snprintf(out, size, "%s", sstr.str().c_str());
      }
      return dst;
    }
#endif
3892
    /// Key callbacks handed to PaRSEC's task hash tables (equality, printer, hash).
    parsec_key_fn_t tasks_hash_fcts = {key_equal, key_print, key_hash};
3894
    /// PaRSEC completion hook, runs after the task body has finished:
    /// resumes a suspended device coroutine once more to flush its deferred
    /// sends, releases the task's data copies, and fires completion constraints.
    static parsec_hook_return_t complete_task_and_release(parsec_execution_stream_t *es, parsec_task_t *parsec_task) {

      //std::cout << "complete_task_and_release: task " << parsec_task << std::endl;

      task_t *task = (task_t*)parsec_task;

#ifdef TTG_HAVE_COROUTINE
      /* if we still have a coroutine handle we invoke it one more time to get the sends/broadcasts */
      if (task->suspended_task_address) {
        assert(task->coroutine_id != ttg::TaskCoroutineID::Invalid);
#ifdef TTG_HAVE_DEVICE
        if (task->coroutine_id == ttg::TaskCoroutineID::DeviceTask) {
          // get the device task from the coroutine handle
          auto dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);

          // get the promise which contains the views
          auto dev_data = dev_task.promise();

          assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT ||
                 dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE);

          /* execute the sends we stored */
          if (dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT) {
            /* set the current task, needed inside the sends */
            // NOTE(review): the statement that sets detail::parsec_ttg_caller
            // is not visible here; it is reset to nullptr below.
            auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor();
            task->tt->set_outputs_tls_ptr();
            dev_data.do_sends(); // all sends happen here
            task->tt->set_outputs_tls_ptr(old_output_tls_ptr);
            detail::parsec_ttg_caller = nullptr;
          }
          dev_task.destroy(); // safe to destroy the coroutine now
        }
#endif // TTG_HAVE_DEVICE
        /* the coroutine should have completed and we cannot access the promise anymore */
        task->suspended_task_address = nullptr;
      }
#endif // TTG_HAVE_COROUTINE

      /* release our data copies */
      for (int i = 0; i < task->data_count; i++) {
        detail::ttg_data_copy_t *copy = task->copies[i];
        if (nullptr == copy) continue;
        detail::release_data_copy(copy);
        task->copies[i] = nullptr;
      }

      /* notify completion constraints, passing the key when there is one */
      for (auto& c : task->tt->constraints_complete) {
        if constexpr(std::is_void_v<keyT>) {
          c();
        } else {
          c(task->key);
        }
      }
    }
3951
3952 public:
    /// Primary constructor: registers this TT with the world, sets up input
    /// and output terminals plus their callbacks, and fills in the PaRSEC
    /// task class (\c self): key functions, profiling locals, incarnations
    /// (device/CPU hooks) and in/out flows.
    /// NOTE(review): part of the signature (world / keymap_ / priomap_
    /// parameters) and a few statements are not visible here.
    template <typename keymapT = ttg::detail::default_keymap<keyT>,
              typename priomapT = ttg::detail::default_priomap<keyT>>
    TT(const std::string &name, const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
        : ttg::TTBase(name, numinedges, numouts)
        , world(world)
        // if using default keymap, rebind to the given world
        , keymap(std::is_same<keymapT, ttg::detail::default_keymap<keyT>>::value
                     ? decltype(keymap)(ttg::detail::default_keymap<keyT>(world))
                     : decltype(keymap)(std::forward<keymapT>(keymap_)))
        // NOTE(review): priomap is constructed via decltype(keymap) — looks
        // like a copy-paste of the keymap initializer; confirm decltype(priomap)
        // was not intended.
        , priomap(decltype(keymap)(std::forward<priomapT>(priomap_))) {
      // Cannot call these in base constructor since terminals not yet constructed
      if (innames.size() != numinedges) throw std::logic_error("ttg_parsec::TT: #input names != #input terminals");
      if (outnames.size() != numouts) throw std::logic_error("ttg_parsec::TT: #output names != #output terminals");

      auto &world_impl = world.impl();
      world_impl.register_op(this);

      if constexpr (numinedges == numins) {
        register_input_terminals(input_terminals, innames);
      } else {
        // create a name for the virtual control input
        register_input_terminals(input_terminals, std::array<std::string, 1>{std::string("Virtual Control")});
      }
      register_output_terminals(output_terminals, outnames);

      register_input_callbacks(std::make_index_sequence<numinedges>{});
      int i;

      memset(&self, 0, sizeof(parsec_task_class_t));

      self.name = strdup(get_name().c_str());
      self.task_class_id = get_instance_id();
      self.nb_parameters = 0;
      self.nb_locals = 0;
      //self.nb_flows = numflows;
      self.nb_flows = MAX_PARAM_COUNT; // we're not using all flows but have to
                                       // trick the device handler into looking at all of them

      if( world_impl.profiling() ) {
        // first two ints are used to store the hash of the key.
        self.nb_parameters = (sizeof(void*)+sizeof(int)-1)/sizeof(int);
        // seconds two ints are used to store a pointer to the key of the task.
        self.nb_locals = self.nb_parameters + (sizeof(void*)+sizeof(int)-1)/sizeof(int);

        // If we have parameters and locals, we need to define the corresponding dereference arrays
        self.params[0] = &detail::parsec_taskclass_param0;
        self.params[1] = &detail::parsec_taskclass_param1;

        self.locals[0] = &detail::parsec_taskclass_param0;
        self.locals[1] = &detail::parsec_taskclass_param1;
        self.locals[2] = &detail::parsec_taskclass_param2;
        self.locals[3] = &detail::parsec_taskclass_param3;
      }
      self.make_key = make_key;
      self.key_functions = &tasks_hash_fcts;
      self.task_snprintf = parsec_ttg_task_snprintf;

#if defined(PARSEC_PROF_TRACE)
      self.profile_info = &parsec_ttg_task_info;
#endif

      // grow the taskpool's task-class count so it covers this class id
      world_impl.taskpool()->nb_task_classes = std::max(world_impl.taskpool()->nb_task_classes, static_cast<decltype(world_impl.taskpool()->nb_task_classes)>(self.task_class_id+1));
      // function_id_to_instance[self.task_class_id] = this;
      //self.incarnations = incarnations_array.data();
//#if 0
      // pick the incarnation (chore) list that matches the derived TT's
      // device capabilities; each list is terminated by a PARSEC_DEV_NONE entry
      if constexpr (derived_has_cuda_op()) {
        self.incarnations = (__parsec_chore_t *)malloc(3 * sizeof(__parsec_chore_t));
        ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_CUDA;
        ((__parsec_chore_t *)self.incarnations)[0].evaluate = &detail::evaluate_cuda<TT>;
        ((__parsec_chore_t *)self.incarnations)[0].hook = &detail::hook_cuda<TT>;
        ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_NONE;
        ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL;
        ((__parsec_chore_t *)self.incarnations)[1].hook = NULL;
      } else if constexpr (derived_has_hip_op()) {
        self.incarnations = (__parsec_chore_t *)malloc(3 * sizeof(__parsec_chore_t));
        ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_HIP;
        ((__parsec_chore_t *)self.incarnations)[0].evaluate = &detail::evaluate_hip<TT>;
        ((__parsec_chore_t *)self.incarnations)[0].hook = &detail::hook_hip<TT>;

        ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_NONE;
        ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL;
        ((__parsec_chore_t *)self.incarnations)[1].hook = NULL;
#if defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT)
      } else if constexpr (derived_has_level_zero_op()) {
        self.incarnations = (__parsec_chore_t *)malloc(3 * sizeof(__parsec_chore_t));
        ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_LEVEL_ZERO;
        ((__parsec_chore_t *)self.incarnations)[0].evaluate = &detail::evaluate_level_zero<TT>;
        ((__parsec_chore_t *)self.incarnations)[0].hook = &detail::hook_level_zero<TT>;

        ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_NONE;
        ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL;
        ((__parsec_chore_t *)self.incarnations)[1].hook = NULL;
#endif // PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT
      } else {
        self.incarnations = (__parsec_chore_t *)malloc(2 * sizeof(__parsec_chore_t));
        ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_CPU;
        ((__parsec_chore_t *)self.incarnations)[0].evaluate = NULL;
        ((__parsec_chore_t *)self.incarnations)[0].hook = &detail::hook<TT>;
        ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_NONE;
        ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL;
        ((__parsec_chore_t *)self.incarnations)[1].hook = NULL;
      }
//#endif // 0

      self.complete_execution = complete_task_and_release;

      // populate the input flows; NOTE(review): the per-iteration flow
      // allocation statement is not visible here
      for (i = 0; i < MAX_PARAM_COUNT; i++) {
        flow->name = strdup((std::string("flow in") + std::to_string(i)).c_str());
        flow->sym_type = PARSEC_SYM_INOUT;
        // see initialize_flows below
        // flow->flow_flags = PARSEC_FLOW_ACCESS_RW;
        flow->dep_in[0] = NULL;
        flow->dep_out[0] = NULL;
        flow->flow_index = i;
        flow->flow_datatype_mask = ~0;
        *((parsec_flow_t **)&(self.in[i])) = flow;
      }
      //*((parsec_flow_t **)&(self.in[i])) = NULL;
      //initialize_flows<input_terminals_type>(self.in);

      // populate the output flows; NOTE(review): flow allocation not visible here
      for (i = 0; i < MAX_PARAM_COUNT; i++) {
        flow->name = strdup((std::string("flow out") + std::to_string(i)).c_str());
        flow->sym_type = PARSEC_SYM_INOUT;
        flow->flow_flags = PARSEC_FLOW_ACCESS_READ; // does PaRSEC use this???
        flow->dep_in[0] = NULL;
        flow->dep_out[0] = NULL;
        flow->flow_index = i;
        flow->flow_datatype_mask = (1 << i);
        *((parsec_flow_t **)&(self.out[i])) = flow;
      }
      //*((parsec_flow_t **)&(self.out[i])) = NULL;

      self.flags = 0;
      self.dependencies_goal = numins; /* (~(uint32_t)0) >> (32 - numins); */

      // total number of compute threads across all virtual processes
      int nbthreads = 0;
      auto *context = world_impl.context();
      for (int i = 0; i < context->nb_vp; i++) {
        nbthreads += context->virtual_processes[i]->nb_cores;
      }

      // NOTE(review): the mempool / task hash-table construction calls are not
      // visible here; only their trailing NULL argument lines remain.

                                NULL);

                                NULL);
    }
4107
    /// Convenience constructor: delegates to the primary constructor using the
    /// default execution context as the world.
    template <typename keymapT = ttg::detail::default_keymap<keyT>,
              typename priomapT = ttg::detail::default_priomap<keyT>>
    TT(const std::string &name, const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
       keymapT &&keymap = keymapT(ttg::default_execution_context()), priomapT &&priomap = priomapT())
        : TT(name, innames, outnames, ttg::default_execution_context(), std::forward<keymapT>(keymap),
             std::forward<priomapT>(priomap)) {}
4114
    /// Constructor that additionally connects the given input/output edges to
    /// this TT's terminals, then (re)registers the input callbacks because
    /// pull-terminal information only becomes available after connecting.
    template <typename keymapT = ttg::detail::default_keymap<keyT>,
              typename priomapT = ttg::detail::default_priomap<keyT>>
    TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name,
       const std::vector<std::string> &innames, const std::vector<std::string> &outnames, ttg::World world,
       keymapT &&keymap_ = keymapT(), priomapT &&priomap = priomapT())
        : TT(name, innames, outnames, world, std::forward<keymapT>(keymap_), std::forward<priomapT>(priomap)) {
      connect_my_inputs_to_incoming_edge_outputs(std::make_index_sequence<numinedges>{}, inedges);
      connect_my_outputs_to_outgoing_edge_inputs(std::make_index_sequence<numouts>{}, outedges);
      //DO NOT MOVE THIS - information about the number of pull terminals is only available after connecting the edges.
      if constexpr (numinedges > 0) {
        register_input_callbacks(std::make_index_sequence<numinedges>{});
      }
    }
    /// Edge-connecting constructor with the default execution context as world.
    template <typename keymapT = ttg::detail::default_keymap<keyT>,
              typename priomapT = ttg::detail::default_priomap<keyT>>
    TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name,
       const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
       keymapT &&keymap = keymapT(ttg::default_execution_context()), priomapT &&priomap = priomapT())
        : TT(inedges, outedges, name, innames, outnames, ttg::default_execution_context(),
             std::forward<keymapT>(keymap), std::forward<priomapT>(priomap)) {}
4135
4136 // Destructor checks for unexecuted tasks
4137 virtual ~TT() {
4138 if(nullptr != self.name ) {
4139 free((void*)self.name);
4140 self.name = nullptr;
4141 }
4142
4143 for (std::size_t i = 0; i < numins; ++i) {
4144 if (inpute_reducers_taskclass[i] != nullptr) {
4145 std::free(inpute_reducers_taskclass[i]);
4146 inpute_reducers_taskclass[i] = nullptr;
4147 }
4148 }
4149 release();
4150 }
4151
4152 static void ht_iter_cb(void *item, void *cb_data) {
4153 task_t *task = (task_t *)item;
4154 ttT *op = (ttT *)cb_data;
4155 if constexpr (!ttg::meta::is_void_v<keyT>) {
4156 std::cout << "Left over task " << op->get_name() << " " << task->key << std::endl;
4157 } else {
4158 std::cout << "Left over task " << op->get_name() << std::endl;
4159 }
4160 }
4161
4165
    /// Release PaRSEC resources held by this TT (safe to call repeatedly;
    /// do_release() is guarded by the \c alive flag).
    virtual void release() override { do_release(); }
4167
    /// Idempotent teardown: destroys the mempools, frees the incarnation
    /// chores and the in/out flow descriptors, and deregisters this op from
    /// the world. Guarded by the \c alive flag.
    void do_release() {
      if (!alive) {
        return;
      }
      alive = false;
      /* print all outstanding tasks */
      // NOTE(review): the statements that iterate/clean the tasks table are
      // not visible here.
      parsec_mempool_destruct(&mempools);
      // uintptr_t addr = (uintptr_t)self.incarnations;
      // free((void *)addr);
      free((__parsec_chore_t *)self.incarnations);
      for (int i = 0; i < MAX_PARAM_COUNT; i++) {
        if (NULL != self.in[i]) {
          free(self.in[i]->name);
          delete self.in[i];
          self.in[i] = nullptr;
        }
        if (NULL != self.out[i]) {
          free(self.out[i]->name);
          delete self.out[i];
          self.out[i] = nullptr;
        }
      }
      world.impl().deregister_op(this);
    }
4194
4196
    /// Install \p reducer as the stream reducer for input terminal i and
    /// lazily create the PaRSEC task class used to run the (host-only)
    /// reduction tasks.
    /// NOTE(review): the function signature line is not visible here.
    template <std::size_t i, typename Reducer>
      ttg::trace(world.rank(), ":", get_name(), " : setting reducer for terminal ", i);
      std::get<i>(input_reducers) = reducer;

      parsec_task_class_t *tc = inpute_reducers_taskclass[i];
      if (nullptr == tc) {
        tc = (parsec_task_class_t *)std::calloc(1, sizeof(*tc));
        inpute_reducers_taskclass[i] = tc;

        tc->name = strdup((get_name() + std::string(" reducer ") + std::to_string(i)).c_str());
        tc->task_class_id = get_instance_id();
        tc->nb_parameters = 0;
        tc->nb_locals = 0;
        tc->nb_flows = numflows;

        auto &world_impl = world.impl();

        if( world_impl.profiling() ) {
          // first two ints are used to store the hash of the key.
          tc->nb_parameters = (sizeof(void*)+sizeof(int)-1)/sizeof(int);
          // seconds two ints are used to store a pointer to the key of the task.
          // NOTE(review): uses self.nb_parameters rather than tc->nb_parameters —
          // the values coincide today, but this looks like a copy-paste from the
          // TT constructor; confirm.
          tc->nb_locals = self.nb_parameters + (sizeof(void*)+sizeof(int)-1)/sizeof(int);

          // If we have parameters and locals, we need to define the corresponding dereference arrays
          tc->params[0] = &detail::parsec_taskclass_param0;
          tc->params[1] = &detail::parsec_taskclass_param1;

          tc->locals[0] = &detail::parsec_taskclass_param0;
          tc->locals[1] = &detail::parsec_taskclass_param1;
          tc->locals[2] = &detail::parsec_taskclass_param2;
          tc->locals[3] = &detail::parsec_taskclass_param3;
        }
        tc->make_key = make_key;
        tc->key_functions = &tasks_hash_fcts;
        tc->task_snprintf = parsec_ttg_task_snprintf;

#if defined(PARSEC_PROF_TRACE)
        tc->profile_info = &parsec_ttg_task_info;
#endif

        // NOTE(review): uses self.task_class_id rather than tc->task_class_id;
        // both are get_instance_id() here, but confirm intent.
        world_impl.taskpool()->nb_task_classes = std::max(world_impl.taskpool()->nb_task_classes, static_cast<decltype(world_impl.taskpool()->nb_task_classes)>(self.task_class_id+1));

#if 0
        // FIXME: currently only support reduction on the host
        if constexpr (derived_has_cuda_op()) {
          self.incarnations = (__parsec_chore_t *)malloc(3 * sizeof(__parsec_chore_t));
          ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_CUDA;
          ((__parsec_chore_t *)self.incarnations)[0].evaluate = NULL;
          ((__parsec_chore_t *)self.incarnations)[0].hook = detail::hook_cuda;
          ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_CPU;
          ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL;
          ((__parsec_chore_t *)self.incarnations)[1].hook = detail::hook;
          ((__parsec_chore_t *)self.incarnations)[2].type = PARSEC_DEV_NONE;
          ((__parsec_chore_t *)self.incarnations)[2].evaluate = NULL;
          ((__parsec_chore_t *)self.incarnations)[2].hook = NULL;
        } else
#endif // 0
        {
          tc->incarnations = (__parsec_chore_t *)malloc(2 * sizeof(__parsec_chore_t));
          ((__parsec_chore_t *)tc->incarnations)[0].type = PARSEC_DEV_CPU;
          ((__parsec_chore_t *)tc->incarnations)[0].evaluate = NULL;
          ((__parsec_chore_t *)tc->incarnations)[0].hook = &static_reducer_op<i>;
          ((__parsec_chore_t *)tc->incarnations)[1].type = PARSEC_DEV_NONE;
          ((__parsec_chore_t *)tc->incarnations)[1].evaluate = NULL;
          ((__parsec_chore_t *)tc->incarnations)[1].hook = NULL;
        }

        /* the reduction task does not alter the termination detection because the target task will execute */
        tc->release_task = &parsec_release_task_to_mempool;
        tc->complete_execution = NULL;
      }
    }
4275
    /// Install the reducer for input terminal i and set the stream goal to
    /// \p size inputs. NOTE(review): the statement applying \p size is not
    /// visible here.
    template <std::size_t i, typename Reducer>
    void set_input_reducer(Reducer &&reducer, std::size_t size) {
      set_input_reducer<i>(std::forward<Reducer>(reducer));
    }
4288
4289 // Returns reference to input terminal i to facilitate connection --- terminal
4290 // cannot be copied, moved or assigned
4291 template <std::size_t i>
4292 std::tuple_element_t<i, input_terminals_type> *in() {
4293 return &std::get<i>(input_terminals);
4294 }
4295
4296 // Returns reference to output terminal for purpose of connection --- terminal
4297 // cannot be copied, moved or assigned
4298 template <std::size_t i>
4299 std::tuple_element_t<i, output_terminalsT> *out() {
4300 return &std::get<i>(output_terminals);
4301 }
4302
    // Manual injection of a task with all input arguments specified as a tuple
    /// If \p key is not exactly key_type it is first converted, then the call
    /// recurses with the converted key.
    template <typename Key = keyT>
    std::enable_if_t<!ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
        const Key &key, const input_values_tuple_type &args) {
      if constexpr(!std::is_same_v<Key, key_type>) {
        key_type k = key; /* cast that type into the key type we know */
        invoke(k, args);
      } else {
        /* trigger non-void inputs */
        /* trigger void inputs */
        set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
      }
    }
4319
    // Manual injection of a key-free task and all input arguments specified as a tuple
    template <typename Key = keyT>
    std::enable_if_t<ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
        const input_values_tuple_type &args) {
      /* trigger non-void inputs */
      /* trigger void inputs */
      set_args(void_index_seq{}, ttg::detail::make_void_tuple<void_index_seq::size()>());
    }
4331
    // Manual injection of a task that has no arguments
    /// If \p key is not exactly key_type it is first converted, then the call
    /// recurses with the converted key.
    template <typename Key = keyT>
    std::enable_if_t<!ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
        const Key &key) {

      if constexpr(!std::is_same_v<Key, key_type>) {
        key_type k = key; /* cast that type into the key type we know */
        invoke(k);
      } else {
        /* trigger void inputs */
        set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
      }
    }
4347
    // Manual injection of a task that has no key or arguments
    template <typename Key = keyT>
    std::enable_if_t<ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke() {
      /* trigger void inputs */
      set_args(void_index_seq{}, ttg::detail::make_void_tuple<void_index_seq::size()>());
    }
4356
    // overrides TTBase::invoke()
    /// Invoke the keyless/argument-less overload when applicable, otherwise
    /// fall back to TTBase::invoke().
    /// NOTE(review): the constexpr condition line is not visible here.
    void invoke() override {
        invoke<keyT>();
      else
        TTBase::invoke();
    }
4364
4365 private:
// Helper for the variadic invoke(): dispatches one argument per recursion step,
// pairing argument I with input terminal I.
// NOTE(review): original line 4371 was lost in extraction; it presumably obtains
// `copy` from the Ptr argument (cf. ttg_parsec::detail::get_copy in the member
// index of this page) -- confirm against the original ttg.h.
4366 template<typename Key, typename Arg, typename... Args, std::size_t I, std::size_t... Is>
4367 void invoke_arglist(std::index_sequence<I, Is...>, const Key& key, Arg&& arg, Args&&... args) {
4368 using arg_type = std::decay_t<Arg>;
// Ptr-wrapped arguments reuse their existing data copy instead of copying the value.
4369 if constexpr (ttg::meta::is_ptr_v<arg_type>) {
4370 /* add a reference to the object */
4372 copy->add_ref();
4373 /* reset readers so that the value can flow without copying */
4374 copy->reset_readers();
4375 auto& val = *arg;
4376 set_arg_impl<I>(key, val, copy);
4377 ttg_parsec::detail::release_data_copy(copy);
// NOTE(review): with a forwarding reference, Arg deduces to a NON-reference type
// for rvalue arguments, so std::is_rvalue_reference_v<Arg> is false for both
// lvalues (Arg = T&) and rvalues (Arg = T) -- this branch looks unreachable.
// The intent was presumably !std::is_lvalue_reference_v<Arg> (compare the
// decltype(value)-based check in value_copy_handler::operator() near the end of
// this file). Confirm and fix in the original ttg.h if so.
4378 if constexpr (std::is_rvalue_reference_v<Arg>) {
4379 /* if the ptr was moved in we reset it */
4380 arg.reset();
4381 }
// This condition is the exact negation of the branch above, i.e. a plain `else`.
4382 } else if constexpr (!ttg::meta::is_ptr_v<arg_type>) {
4383 set_arg<I>(key, std::forward<Arg>(arg));
4384 }
4385 if constexpr (sizeof...(Is) > 0) {
4386 /* recursive next argument */
4387 invoke_arglist(std::index_sequence<Is...>{}, key, std::forward<Args>(args)...);
4388 }
4389 }
4390
4391 public:
// NOTE(review): original lines 4398, 4400 and 4404 were lost in extraction.
// Line 4400 is presumably the head of the invoke_arglist(...) call whose trailing
// arguments appear on line 4401, and 4404 presumably declares the void_index_seq
// alias used on line 4405 -- confirm against the original ttg.h.
4392 // Manual injection of a task with all input arguments specified as variadic arguments
4393 template <typename Key = keyT, typename Arg, typename... Args>
4394 std::enable_if_t<!ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
4395 const Key &key, Arg&& arg, Args&&... args) {
// One variadic argument per entry of actual_input_tuple_type (checked at compile time).
4396 static_assert(sizeof...(Args)+1 == std::tuple_size_v<actual_input_tuple_type>,
4397 "Number of arguments to invoke must match the number of task inputs.");
4399 /* trigger non-void inputs */
4401 std::forward<Arg>(arg), std::forward<Args>(args)...);
4402 //set_args(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, key, args);
4403 /* trigger void inputs */
4405 set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
4406 }
4407
// Set whether a potential writer task should be deferred while readers are
// still active. Per the TTG_PARSEC_DEFER_WRITER note at the top of this file,
// deferring may avoid extra copies but may cause deadlocks -- use with caution.
4408 void set_defer_writer(bool value) {
4409 m_defer_writer = value;
4410 }
4411
4412 bool get_defer_writer(bool value) {
4413 return m_defer_writer;
4414 }
4415
4416 public:
// Marks this TT executable; registers it with the backend's profiling machinery.
// NOTE(review): original lines 4419-4420 were lost in extraction; they presumably
// call register_static_op_function() and/or the base-class make_executable()
// (both appear in the member index of this page) -- confirm against ttg.h.
4417 void make_executable() override {
4418 world.impl().register_tt_profiling(this);
4421 }
4422
// Returns the key-to-process map used to place tasks.
4425 const decltype(keymap) &get_keymap() const { return keymap; }
4426
// keymap setter.
// NOTE(review): the signature line (original 4429) was lost in extraction; per
// the member index of this page it is `void set_keymap(Keymap &&km)`.
4428 template <typename Keymap>
// NOTE(review): km is copied here while set_priomap() below std::forward's its
// argument -- inconsistent; forwarding would avoid a copy for rvalue keymaps.
4430 keymap = km;
4431 }
4432
// Returns the key-to-priority map.
4435 const decltype(priomap) &get_priomap() const { return priomap; }
4436
// priomap setter.
// NOTE(review): the signature line (original 4440) was lost in extraction; per
// the member index of this page it is `void set_priomap(Priomap &&pm)`.
4439 template <typename Priomap>
4441 priomap = std::forward<Priomap>(pm);
4442 }
4443
// Install the key-to-device map used to select an execution device per task.
// NOTE(review): the signature line (original 4450) was lost in extraction; per
// the member index of this page it is `void set_devicemap(Devicemap &&dm)`.
// Original lines 4459, 4461 and 4463 (the return statements of the three device
// branches) were also lost; they presumably build a ttg::device::Device for the
// matching execution space from dm(key) -- confirm against the original ttg.h.
4449 template<typename Devicemap>
4451 //static_assert(derived_has_device_op(), "Device map only allowed on device-enabled TT!");
// If dm already returns a ttg::device::Device, install it directly...
4452 if constexpr (std::is_same_v<ttg::device::Device, decltype(dm(std::declval<keyT>()))>) {
4453 // dm returns a Device
4454 devicemap = std::forward<Devicemap>(dm);
4455 } else {
// ...otherwise wrap it in a lambda that converts its return value.
// NOTE(review): [=] copies dm into the closure even though it may have been
// passed as an rvalue -- an init-capture would avoid the copy.
4456 // convert dm return into a Device
4457 devicemap = [=](const keyT& key) {
4458 if constexpr (derived_has_cuda_op()) {
4460 } else if constexpr (derived_has_hip_op()) {
4462 } else if constexpr (derived_has_level_zero_op()) {
4464 } else {
4465 throw std::runtime_error("Unknown device type!");
// Unreachable after the throw; presumably present to satisfy compilers that
// warn about a missing return value.
4466 return ttg::device::Device{};
4467 }
4468 };
4469 }
4470 }
4471
// Returns (a copy of) the installed devicemap.
4474 auto get_devicemap() { return devicemap; }
4475
// Register a shared constraint: its check/complete callbacks gate task release.
4478 template<typename Constraint>
4479 void add_constraint(std::shared_ptr<Constraint> c) {
// The constraint's slot in the callback vectors doubles as its id.
4480 std::size_t cid = constraints_check.size();
4481 if constexpr(ttg::meta::is_void_v<keyT>) {
4482 c->add_listener([this, cid](){ this->release_constraint(cid); }, this);
4483 constraints_check.push_back([c, this](){ return c->check(this); });
// NOTE(review): in this void-key branch the completion lambda still takes
// `const keyT& key`, while the Mapper overload below uses a no-arg lambda in
// its void-key branch -- verify which form constraints_complete expects here.
4484 constraints_complete.push_back([c, this](const keyT& key){ c->complete(this); return true; });
4485 } else {
4486 c->add_listener([this, cid](const std::span<keyT>& keys){ this->release_constraint(cid, keys); }, this);
4487 constraints_check.push_back([c, this](const keyT& key){ return c->check(key, this); });
4488 constraints_complete.push_back([c, this](const keyT& key){ c->complete(key, this); return true; });
4489 }
4490 }
4491
// Convenience overload taking the constraint by value/forwarding reference.
// NOTE(review): the signature line (original 4495) was lost in extraction; per
// the member index of this page it is `void add_constraint(Constraint &&c)`.
4494 template<typename Constraint>
4496 // need to make this a shared_ptr since it's shared between different callbacks
4497 this->add_constraint(std::make_shared<Constraint>(std::forward<Constraint>(c)));
4498 }
4499
// Register a shared constraint whose key is derived from the task key through
// a Mapper (e.g. to constrain on a reduced/derived ordering key).
4503 template<typename Constraint, typename Mapper>
4504 void add_constraint(std::shared_ptr<Constraint> c, Mapper&& map) {
// NOTE(review): this requires Constraint::key_type == keyT even though the
// mapper produces the value actually passed to check()/complete() -- verify
// that this assertion matches Constraint's intended contract.
4505 static_assert(std::is_same_v<typename Constraint::key_type, keyT>);
4506 std::size_t cid = constraints_check.size();
4507 if constexpr(ttg::meta::is_void_v<keyT>) {
4508 c->add_listener([this, cid](){ this->release_constraint(cid); }, this);
4509 constraints_check.push_back([map, c, this](){ return c->check(map(), this); });
4510 constraints_complete.push_back([map, c, this](){ c->complete(map(), this); return true; });
4511 } else {
4512 c->add_listener([this, cid](const std::span<keyT>& keys){ this->release_constraint(cid, keys); }, this);
4513 constraints_check.push_back([map, c, this](const keyT& key){ return c->check(key, map(key), this); });
4514 constraints_complete.push_back([map, c, this](const keyT& key){ c->complete(key, map(key), this); return true; });
4515 }
4516 }
4517
// Convenience overload constructing the shared constraint in place.
// NOTE(review): the signature line (original 4522) was lost in extraction; per
// the member index of this page it is `void add_constraint(Constraint c, Mapper &&map)`.
4521 template<typename Constraint, typename Mapper>
4523 // need to make this a shared_ptr since it's shared between different callbacks
4524 this->add_constraint(std::make_shared<Constraint>(std::forward<Constraint>(c)), std::forward<Mapper>(map));
4525 }
4526
// NOTE(review): the signature line (original 4528) was lost in extraction; per
// the member index of this page it is `void register_static_op_function(void)`.
// Original line 4530 was also lost; it presumably initializes `rank` (e.g. from
// world.rank()) before the trace below -- as shown, `rank` would be read
// uninitialized. Confirm against the original ttg.h.
4527 // Register the static_op function to associate it to instance_id
4529 int rank;
4531 ttg::trace("ttg_parsec(", rank, ") Inserting into static_id_to_op_map at ", get_instance_id());
4532 static_set_arg_fct_call_t call = std::make_pair(&TT::static_set_arg, this);
4533 auto &world_impl = world.impl();
// Publish this TT's unpack callback under its instance id, then drain any
// messages that arrived before the TT existed (delayed_unpack_actions).
4534 static_map_mutex.lock();
4535 static_id_to_op_map.insert(std::make_pair(get_instance_id(), call));
4536 if (delayed_unpack_actions.count(get_instance_id()) > 0) {
// NOTE(review): `tp` is never used in the code visible below -- possibly a
// leftover; confirm against the original before removing.
4537 auto tp = world_impl.taskpool();
4538
4539 ttg::trace("ttg_parsec(", rank, ") There are ", delayed_unpack_actions.count(get_instance_id()),
4540 " messages delayed with op_id ", get_instance_id());
// Move the pending messages out under the lock, erase them from the multimap...
4542 auto se = delayed_unpack_actions.equal_range(get_instance_id());
4543 std::vector<static_set_arg_fct_arg_t> tmp;
4544 for (auto it = se.first; it != se.second;) {
4545 assert(it->first == get_instance_id());
4546 tmp.push_back(std::move(it->second));
4547 it = delayed_unpack_actions.erase(it);
4548 }
// ...then unpack outside the lock so message processing cannot deadlock on it.
4549 static_map_mutex.unlock();
4550
4551 for (auto& it : tmp) {
4552 if(ttg::tracing())
// NOTE(review): the consecutive literals `"(", ", "` suggest a value was
// dropped between them (extraction or editing artifact) -- confirm.
4553 ttg::print("ttg_parsec(", rank, ") Unpacking delayed message (", ", ", get_instance_id(), ", ",
4554 std::get<1>(it).get(), ", ", std::get<2>(it), ")");
4555 int rc = detail::static_unpack_msg(&parsec_ce, world_impl.parsec_ttg_tag(), std::get<1>(it).get(), std::get<2>(it),
4556 std::get<0>(it), NULL);
// NOTE(review): `rc` is only consumed by the assert, so NDEBUG builds will warn
// about an unused variable; consider (void)rc or explicit error handling.
4557 assert(rc == 0);
4558 }
4559
4560 tmp.clear();
4561 } else {
4562 static_map_mutex.unlock();
4563 }
4564 }
4565 };
4566
4567#include "ttg/make_tt.h"
4568
4569} // namespace ttg_parsec
4570
4576template <>
4578 private:
4579 ttg_parsec::detail::ttg_data_copy_t *copy_to_remove = nullptr;
4580 bool do_release = true;
4581
4582 public:
// Move constructor: steals ownership of the tracked data copy and nulls the
// source so its destructor becomes a no-op.
// NOTE(review): original lines 4583-4585 were lost in extraction; per the member
// index of this page they include the defaulted/deleted constructors and the
// move-ctor signature `value_copy_handler(value_copy_handler &&h)`. Also note
// that `do_release` is not transferred from h; the moved-to object keeps its
// default (true) -- confirm that this is intended in the original ttg.h.
4586 : copy_to_remove(h.copy_to_remove)
4587 {
4588 h.copy_to_remove = nullptr;
4589 }
4590
// Move assignment: exchanges tracked copies, so the incoming handler's previous
// copy (if any) is cleaned up when h is destroyed.
// NOTE(review): the signature line (original 4592) was lost in extraction; per
// the member index of this page it is
// `value_copy_handler &operator=(value_copy_handler &&h)`. Also note that
// `do_release` is NOT swapped along with copy_to_remove, so release behavior
// follows the assignee rather than the copy's origin -- confirm intent.
4593 {
4594 std::swap(copy_to_remove, h.copy_to_remove);
4595 return *this;
4596 }
4597
// Destructor: detaches the tracked data copy from the calling task and, unless
// do_release was cleared, drops the handler's reference to it.
// NOTE(review): the signature line (original 4598, `~value_copy_handler()`) was
// lost in extraction.
4599 if (nullptr != copy_to_remove) {
4600 ttg_parsec::detail::remove_data_copy(copy_to_remove, ttg_parsec::detail::parsec_ttg_caller);
4601 if (do_release) {
4602 ttg_parsec::detail::release_data_copy(copy_to_remove);
4603 }
4604 }
4605 }
4606
// Funnels a value being sent from inside a task through the task's data-copy
// tracking: reuses an existing copy if the task already owns one, otherwise
// creates and registers a new copy. Returns a reference (moved if the caller
// passed an rvalue) to the tracked storage.
// NOTE(review): extraction lost original lines 4615 and 4620 (presumably the
// declarations of `caller` -- from detail::parsec_ttg_caller -- and `copy`),
// 4624-4627 (comments), and 4641 (presumably the statement that marks the copy
// as modified, described by the comment on lines 4639-4640). Confirm against
// the original ttg.h.
4607 template <typename Value>
4608 inline std::conditional_t<std::is_reference_v<Value>,Value,Value&&> operator()(Value &&value) {
// decltype(value) preserves the deduced reference category: T&& for rvalues,
// T& for lvalues (contrast with the Arg-based check in invoke_arglist above).
4609 constexpr auto value_is_rvref = std::is_rvalue_reference_v<decltype(value)>;
4610 using value_type = std::remove_reference_t<Value>;
4611 static_assert(value_is_rvref ||
4612 std::is_copy_constructible_v<std::decay_t<Value>>,
4613 "Data sent without being moved must be copy-constructible!");
4614
// Sends are only legal from within a task body.
4616 if (nullptr == caller) {
4617 throw std::runtime_error("ERROR: ttg::send or ttg::broadcast called outside of a task!");
4618 }
4619
4621 copy = ttg_parsec::detail::find_copy_in_task(caller, &value);
4622 value_type *value_ptr = &value;
4623 if (nullptr == copy) {
// No tracked copy yet: create one (moving if possible), attach it to the task,
// and remember it for removal in the destructor.
4628 copy = ttg_parsec::detail::create_new_datacopy(std::forward<Value>(value));
4629 bool inserted = ttg_parsec::detail::add_copy_to_task(copy, caller);
// NOTE(review): `inserted` is only consumed by the assert (unused under NDEBUG).
4630 assert(inserted);
4631 value_ptr = reinterpret_cast<value_type *>(copy->get_ptr());
4632 copy_to_remove = copy;
4633 } else {
4634 if constexpr (value_is_rvref) {
4635 /* this copy won't be modified anymore so mark it as read-only */
4636 copy->reset_readers();
4637 }
4638 }
4639 /* We're coming from a writer so mark the data as modified.
4640 * That way we can force a pushout in prepare_send if we move to read-only tasks (needed by PaRSEC). */
4642 if constexpr (value_is_rvref)
4643 return std::move(*value_ptr);
4644 else
4645 return *value_ptr;
4646 }
4647
// Overload for persistent values: the referenced object already derives from
// ttg_data_copy_t, so the existing object is registered with the task instead
// of allocating a new copy.
// NOTE(review): extraction lost original lines 4650 and 4654 (presumably the
// declarations of `caller` and `copy`). Confirm against the original ttg.h.
4648 template<typename Value>
4649 inline std::add_lvalue_reference_t<Value> operator()(ttg_parsec::detail::persistent_value_ref<Value> vref) {
4651 if (nullptr == caller) {
4652 throw std::runtime_error("ERROR: ttg::send or ttg::broadcast called outside of a task!");
4653 }
4655 copy = ttg_parsec::detail::find_copy_in_task(caller, &vref.value_ref);
4656 if (nullptr == copy) {
4657 // no need to create a new copy since it's derived from the copy already
4658 copy = const_cast<ttg_parsec::detail::ttg_data_copy_t *>(static_cast<const ttg_parsec::detail::ttg_data_copy_t *>(&vref.value_ref));
4659 bool inserted = ttg_parsec::detail::add_copy_to_task(copy, caller);
4660 assert(inserted);
4661 copy_to_remove = copy; // we want to remove the copy from the task once done sending
// NOTE(review): the comment contradicts the code -- `do_release = true` means the
// destructor WILL call release_data_copy on this copy. Either the flag should be
// false or the comment is stale (the extra add_ref calls below may be what
// actually keeps the object alive). Confirm against the original ttg.h.
4662 do_release = true; // we don't release the copy since we didn't allocate it
// NOTE(review): two consecutive add_ref calls with near-identical rationale --
// verify both are required (one may balance the destructor's release, the other
// pin the object for its whole lifetime).
4663 copy->add_ref(); // add a reference so that TTG does not attempt to delete this object
4664 copy->add_ref(); // add another reference so that TTG never attempts to free this copy
4665 if (copy->num_readers() == 0) {
4666 /* add at least one reader (the current task) */
4667 copy->increment_readers<false>();
4668 }
4669 }
4670 return vref.value_ref;
4671 }
4672
// Const-lvalue overload: like the rvalue overload above but the value can only
// be copied, never moved, and the returned reference stays const.
// NOTE(review): extraction lost original lines 4675 and 4679 (presumably the
// declarations of `caller` and `copy`), 4683-4687 (presumably the creation of a
// new data copy from `value`, cf. create_new_datacopy in the rvalue overload),
// and 4693. Confirm against the original ttg.h.
4673 template <typename Value>
4674 inline const Value &operator()(const Value &value) {
4676 if (nullptr == caller) {
4677 throw std::runtime_error("ERROR: ttg::send or ttg::broadcast called outside of a task!");
4678 }
4680 copy = ttg_parsec::detail::find_copy_in_task(caller, &value);
4681 const Value *value_ptr = &value;
4682 if (nullptr == copy) {
4688 bool inserted = ttg_parsec::detail::add_copy_to_task(copy, caller);
4689 assert(inserted);
4690 value_ptr = reinterpret_cast<Value *>(copy->get_ptr());
4691 copy_to_remove = copy;
4692 }
4694 return *value_ptr;
4695 }
4696
4697};
4698
4699#endif // PARSEC_TTG_H_INCLUDED
4700// clang-format on
#define TTG_OP_ASSERT_EXECUTABLE()
Definition tt.h:278
Edge is used to connect In and Out terminals.
Definition edge.h:26
A base class for all template tasks.
Definition tt.h:32
void trace(const T &t, const Ts &...ts)
Like ttg::trace(), but only produces tracing output if this->tracing()==true
Definition tt.h:188
auto get_instance_id() const
Definition tt.h:260
virtual void make_executable()=0
Marks this executable.
Definition tt.h:288
bool tracing() const
Definition tt.h:179
TTBase(TTBase &&other)
Definition tt.h:117
void register_input_terminals(terminalsT &terms, const namesT &names)
Definition tt.h:86
const TTBase * ttg_ptr() const
Definition tt.h:207
const std::string & get_name() const
Gets the name of this operation.
Definition tt.h:219
bool is_lazy_pull()
Definition tt.h:201
void register_output_terminals(terminalsT &terms, const namesT &names)
Definition tt.h:93
A complete version of void.
Definition void.h:12
int size() const
Definition world.h:199
int rank() const
Definition world.h:205
WorldImplT & impl(void)
Definition world.h:217
Base class for implementation-specific Worlds.
Definition world.h:34
void release_ops(void)
Definition world.h:55
WorldImplBase(int size, int rank)
Definition world.h:62
bool is_valid(void) const
Definition world.h:155
Represents a device in a specific execution space.
Definition device.h:19
Device cycle() const
Definition device.h:65
static Device host()
Definition device.h:72
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > release_constraint(std::size_t cid, const std::span< Key > &keys)
Definition ttg.h:2689
void print_incomplete_tasks()
Definition ttg.h:4162
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > prepare_send(const ttg::span< const Key > &keylist, const Value &value)
Definition ttg.h:3658
void add_constraint(Constraint c, Mapper &&map)
Definition ttg.h:4522
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
Definition ttg.h:4130
ttg::World get_world() const override final
Definition ttg.h:1373
static constexpr int numinvals
Definition ttg.h:1285
uint64_t pack(T &obj, void *bytes, uint64_t pos, detail::ttg_data_copy_t *copy=nullptr)
Definition ttg.h:1976
std::tuple_element_t< i, output_terminalsT > * out()
Definition ttg.h:4299
void set_priomap(Priomap &&pm)
Definition ttg.h:4440
void set_input_reducer(Reducer &&reducer)
Definition ttg.h:4203
void set_devicemap(Devicemap &&dm)
Definition ttg.h:4450
void set_keymap(Keymap &&km)
keymap setter
Definition ttg.h:4429
std::enable_if_t< key_is_void, void > finalize_argstream()
finalizes stream for input i
Definition ttg.h:3479
void add_constraint(std::shared_ptr< Constraint > c)
Definition ttg.h:4479
void finalize_argstream_from_msg(void *data, std::size_t size)
Definition ttg.h:2305
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_arg()
Definition ttg.h:2790
std::tuple_element_t< i, input_terminals_type > * in()
Definition ttg.h:4292
std::enable_if_t<!std::is_void_v< std::decay_t< Value > >, void > do_prepare_send(const Value &value, RemoteCheckFn &&remote_check)
Definition ttg.h:3569
void set_input_reducer(Reducer &&reducer, std::size_t size)
Definition ttg.h:4284
void broadcast_arg_local(Iterator &&begin, Iterator &&end, const Value &value)
Definition ttg.h:3022
bool check_constraints(task_t *task)
Definition ttg.h:2648
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(Value &&value)
Definition ttg.h:2367
ttg::detail::edges_tuple_t< keyT, ttg::meta::decayed_typelist_t< input_tuple_type > > input_edges_type
Definition ttg.h:1276
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > prepare_send(const Value &value)
Definition ttg.h:3672
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg(Value &&value)
Definition ttg.h:2785
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(std::shared_ptr< const Value > &valueptr)
Definition ttg.h:2385
virtual ~TT()
Definition ttg.h:4137
output_terminalsT output_terminals_type
Definition ttg.h:1289
ttg::detail::input_terminals_tuple_t< keyT, input_tuple_type > input_terminals_type
Definition ttg.h:1274
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key, const input_values_tuple_type &args)
Definition ttg.h:4305
void add_constraint(std::shared_ptr< Constraint > c, Mapper &&map)
Definition ttg.h:4504
std::enable_if_t< ttg::meta::is_none_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const Key &key, const std::tuple< Ts... > &args)
Definition ttg.h:3248
std::enable_if_t< ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke()
Definition ttg.h:4350
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_argstream_size(std::size_t size)
Definition ttg.h:3369
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
Definition ttg.h:4110
void set_defer_writer(bool value)
Definition ttg.h:4408
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const std::tuple< Ts... > &args)
Definition ttg.h:3269
bool can_inline_data(Value *value_ptr, detail::ttg_data_copy_t *copy, const Key &key, std::size_t num_keys)
Definition ttg.h:2801
virtual void release() override
Definition ttg.h:4166
bool get_defer_writer(bool value)
Definition ttg.h:4412
static void ht_iter_cb(void *item, void *cb_data)
Definition ttg.h:4152
std::enable_if_t< ttg::meta::is_void_v< Key >, void > release_constraint(std::size_t cid)
Definition ttg.h:2665
std::enable_if_t< ttg::meta::is_none_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const Key &key, const std::tuple< Ts... > &args)
Definition ttg.h:3260
const auto & get_output_terminals() const
Definition ttg.h:1314
void do_release()
Definition ttg.h:4168
void release_task(task_t *task, parsec_task_t **task_ring=nullptr)
Definition ttg.h:2725
detail::reducer_task_t * create_new_reducer_task(task_t *task, bool is_first)
Definition ttg.h:2418
static constexpr bool derived_has_device_op()
Definition ttg.h:1262
auto get_devicemap()
Definition ttg.h:4474
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap_=priomapT())
Definition ttg.h:3955
ttg::meta::drop_void_t< ttg::meta::decayed_typelist_t< input_tuple_type > > input_values_tuple_type
Definition ttg.h:1282
const decltype(priomap) & get_priomap() const
Definition ttg.h:4435
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap=priomapT())
Definition ttg.h:4117
task_t * create_new_task(const Key &key)
Definition ttg.h:2391
uint64_t unpack(T &obj, void *_bytes, uint64_t pos)
Definition ttg.h:1963
void set_static_argstream_size(std::size_t size)
Definition ttg.h:3289
static constexpr const ttg::Runtime runtime
Definition ttg.h:4195
static void static_set_arg(void *data, std::size_t size, ttg::TTBase *bop)
Definition ttg.h:1986
void set_arg_from_msg_keylist(ttg::span< keyT > &&keylist, detail::ttg_data_copy_t *copy)
Definition ttg.h:2039
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const std::tuple< Ts... > &args)
Definition ttg.h:3280
static resultT get(InTuple &&intuple)
Definition ttg.h:1293
static constexpr bool derived_has_cuda_op()
Definition ttg.h:1244
void register_static_op_function(void)
Definition ttg.h:4528
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > broadcast_arg(const ttg::span< const Key > &keylist, const Value &value)
Definition ttg.h:3052
void add_constraint(Constraint &&c)
Definition ttg.h:4495
typename ttg::terminals_to_edges< output_terminalsT >::type output_edges_type
Definition ttg.h:1290
void argstream_set_size_from_msg(void *data, std::size_t size)
Definition ttg.h:2324
void set_arg_from_msg(void *data, std::size_t size)
Definition ttg.h:2100
keyT key_type
Definition ttg.h:1273
void invoke() override
Definition ttg.h:4358
void make_executable() override
Marks this executable.
Definition ttg.h:4417
ttg::meta::add_glvalue_reference_tuple_t< ttg::meta::void_to_Void_tuple_t< actual_input_tuple_type > > input_refs_full_tuple_type
Definition ttg.h:1281
ttg::meta::drop_void_t< ttg::meta::add_glvalue_reference_tuple_t< input_tuple_type > > input_refs_tuple_type
Definition ttg.h:1283
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key, Arg &&arg, Args &&... args)
Definition ttg.h:4394
ttg::meta::void_to_Void_tuple_t< ttg::meta::decayed_typelist_t< actual_input_tuple_type > > input_values_full_tuple_type
Definition ttg.h:1279
std::enable_if_t< ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const input_values_tuple_type &args)
Definition ttg.h:4322
void set_arg_local_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in=nullptr, parsec_task_t **task_ring=nullptr)
Definition ttg.h:2445
std::enable_if_t<!ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key)
Definition ttg.h:4334
const decltype(keymap) & get_keymap() const
Definition ttg.h:4425
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Value &value)
Definition ttg.h:2379
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Key &key, const Value &value)
Definition ttg.h:2373
static constexpr bool derived_has_hip_op()
Definition ttg.h:1250
void copy_mark_pushout(const Value &value)
Definition ttg.h:3527
void get_from_pull_msg(void *data, std::size_t size)
Definition ttg.h:2347
parsec_thread_mempool_t * get_task_mempool(void)
Definition ttg.h:2031
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_argstream_size(const Key &key, std::size_t size)
Definition ttg.h:3308
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Key &key, Value &&value)
Definition ttg.h:2361
void set_arg_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in=nullptr)
Definition ttg.h:2835
static auto & get(InTuple &&intuple)
Definition ttg.h:1297
static constexpr bool derived_has_level_zero_op()
Definition ttg.h:1256
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg(const Key &key, Value &&value)
Definition ttg.h:2778
actual_input_tuple_type input_args_type
Definition ttg.h:1275
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_arg(const Key &key)
Definition ttg.h:2796
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > finalize_argstream(const Key &key)
finalizes stream for input i
Definition ttg.h:3427
void increment_created()
Definition ttg.h:470
auto * execution_stream()
Definition ttg.h:346
virtual void execute() override
Definition ttg.h:411
static constexpr int parsec_ttg_rma_tag()
Definition ttg.h:407
void decrement_inflight_msg()
Definition ttg.h:473
void destroy_tpool()
Definition ttg.h:422
void increment_inflight_msg()
Definition ttg.h:472
WorldImpl(const WorldImpl &other)=delete
virtual void profile_off() override
Definition ttg.h:502
WorldImpl & operator=(const WorldImpl &other)=delete
WorldImpl & operator=(WorldImpl &&other)=delete
void create_tpool()
Definition ttg.h:349
WorldImpl(int *argc, char **argv[], int ncores, parsec_context_t *c=nullptr)
Definition ttg.h:275
MPI_Comm comm() const
Definition ttg.h:409
bool mpi_support(ttg::ExecutionSpace space)
Definition ttg.h:516
void register_tt_profiling(const TT< keyT, output_terminalsT, derivedT, input_valueTs, Space > *t)
Definition ttg.h:531
virtual bool profiling() override
Definition ttg.h:514
ttg::Edge & ctl_edge()
Definition ttg.h:466
virtual void dag_off() override
Definition ttg.h:493
virtual void fence_impl(void) override
Definition ttg.h:570
virtual void dag_on(const std::string &filename) override
Definition ttg.h:477
static constexpr int parsec_ttg_tag()
Definition ttg.h:406
virtual void final_task() override
Definition ttg.h:520
virtual void destroy() override
Definition ttg.h:435
const ttg::Edge & ctl_edge() const
Definition ttg.h:468
virtual void profile_on() override
Definition ttg.h:508
WorldImpl(WorldImpl &&other)=delete
bool dag_profiling() override
Definition ttg.h:475
auto * context()
Definition ttg.h:345
auto * taskpool()
Definition ttg.h:347
constexpr auto data(C &c) -> decltype(c.data())
Definition span.h:191
typename make_index_sequence_t< I... >::type make_index_sequence
STL namespace.
void deregister_world(ttg::base::WorldImplBase &world)
bool force_device_comm()
Definition env.cpp:34
typename input_terminals_tuple< keyT, valuesT... >::type input_terminals_tuple_t
Definition terminal.h:355
void register_world(ttg::base::WorldImplBase &world)
int num_threads()
Determine the number of compute threads to use by TTG when not given to ttg::initialize
Definition env.cpp:16
typename edges_tuple< keyT, valuesT >::type edges_tuple_t
Definition edge.h:200
void buffer_apply(T &&t, Fn &&fn)
Definition buffer.h:152
void set_current(int device, Stream stream)
Definition device.h:149
void reset_current()
Definition device.h:144
constexpr bool probe_all_v
Definition meta.h:192
typename typelist_to_tuple< T >::type typelist_to_tuple_t
Definition typelist.h:53
constexpr auto get(typelist< T, RestOfTs... >)
Definition typelist.h:102
bool all_devices_peer_access
Definition ttg.h:233
ttg::device::Device parsec_device_to_ttg_device(int parsec_id)
Definition device.h:31
std::size_t max_inline_size
Definition ttg.h:193
bool & initialized_mpi()
Definition ttg.h:228
void foreach_parsec_data(Value &&value, Fn &&fn)
Definition parsec_data.h:10
int ttg_device_to_parsec_device(const ttg::device::Device &device)
Definition device.h:19
constexpr const int PARSEC_TTG_MAX_AM_SIZE
Definition ttg.h:175
ttg_parsec::detail::ttg_data_copy_t * get_copy(ttg_parsec::Ptr< T > &p)
Definition ptr.h:278
ttg_data_copy_t * create_new_datacopy(Value &&value)
Definition ttg.h:718
thread_local parsec_ttg_task_base_t * parsec_ttg_caller
this contains PaRSEC-based TTG functionality
Definition fwd.h:19
void ttg_fence(ttg::World world)
Definition ttg.h:1141
std::map< uint64_t, static_set_arg_fct_call_t > static_id_to_op_map
Definition ttg.h:137
std::multimap< uint64_t, static_set_arg_fct_arg_t > delayed_unpack_actions
Definition ttg.h:140
void ttg_finalize()
Definition ttg.h:1127
void ttg_register_ptr(ttg::World world, const std::shared_ptr< T > &ptr)
Definition ttg.h:1144
std::mutex static_map_mutex
Definition ttg.h:138
void ttg_register_callback(ttg::World world, Callback &&callback)
Definition ttg.h:1158
ttg::Edge & ttg_ctl_edge(ttg::World world)
Definition ttg.h:1162
void make_executable_hook(ttg::World &)
Definition ttg.h:1170
void(* static_set_arg_fct_type)(void *, size_t, ttg::TTBase *)
Definition ttg.h:135
std::tuple< int, std::unique_ptr< std::byte[]>, size_t > static_set_arg_fct_arg_t
Definition ttg.h:139
void ttg_initialize(int argc, char **argv, int num_threads=-1, parsec_context_s *=nullptr)
ttg::World ttg_default_execution_context()
Definition ttg.h:1137
std::pair< static_set_arg_fct_type, ttg::TTBase * > static_set_arg_fct_call_t
Definition ttg.h:136
void ttg_execute(ttg::World world)
Definition ttg.h:1140
void ttg_sum(ttg::World world, double &value)
Definition ttg.h:1164
void ttg_register_status(ttg::World world, const std::shared_ptr< std::promise< void > > &status_ptr)
Definition ttg.h:1153
top-level TTG namespace contains runtime-neutral functionality
Definition keymap.h:9
ExecutionSpace
denotes task execution space
Definition execution.h:18
int size(World world=default_execution_context())
Definition run.h:131
void abort()
Aborts the TTG program using the default backend's ttg_abort method.
Definition run.h:104
Runtime
Definition runtimes.h:16
ttg::World & get_default_world()
Definition world.h:81
World default_execution_context()
Accesses the default backend's default execution context.
Definition run.h:110
TTG_CXX_COROUTINE_NAMESPACE::coroutine_handle< Promise > coroutine_handle
Definition coroutine.h:25
void print(const T &t, const Ts &... ts)
atomically prints to std::cout a sequence of items (separated by ttg::print_separator) followed by st...
Definition print.h:131
void print_error(const T &t, const Ts &... ts)
atomically prints to std::cerr a sequence of items (separated by ttg::print_separator) followed by st...
Definition print.h:139
bool tracing()
returns whether tracing is enabled
Definition trace.h:29
void trace(const T &t, const Ts &... ts)
Definition trace.h:44
@ ResumableTask
-> ttg::resumable_task
@ Invalid
not a coroutine, i.e. a standard task function, -> void
@ DeviceTask
-> ttg::device::Task
#define TTG_PARSEC_FLOW_ACCESS_TMP
Definition parsec-ext.h:9
Provides (de)serialization of C++ data that can be invoked from C via ttg_data_descriptor.
const Value & operator()(const Value &value)
Definition ttg.h:4674
value_copy_handler & operator=(const value_copy_handler &h)=delete
std::add_lvalue_reference_t< Value > operator()(ttg_parsec::detail::persistent_value_ref< Value > vref)
Definition ttg.h:4649
std::conditional_t< std::is_reference_v< Value >, Value, Value && > operator()(Value &&value)
Definition ttg.h:4608
value_copy_handler(const value_copy_handler &h)=delete
value_copy_handler & operator=(value_copy_handler &&h)
Definition ttg.h:4592
A container for types.
Definition typelist.h:25
Computes hash values for objects of type T.
Definition hash.h:82
task that can be resumed after some events occur
Definition coroutine.h:54
parsec_task_class_t self
Definition ttg.h:1202
parsec_hash_table_t task_constraint_table
Definition ttg.h:1201
parsec_hash_table_t tasks_table
Definition ttg.h:1200
parsec_gpu_task_t * gpu_task
Definition task.h:15
msg_header_t tt_id
Definition ttg.h:178
static constexpr std::size_t max_payload_size
Definition ttg.h:179
msg_t(uint64_t tt_id, uint32_t taskpool_id, msg_header_t::fn_id_t fn_id, int32_t param_id, int sender, int num_keys=1)
Definition ttg.h:183
unsigned char bytes[max_payload_size]
Definition ttg.h:180
std::array< stream_info_t, num_streams > streams
Definition task.h:211
ttg_data_copy_t * copies[num_copies]
Definition task.h:217
lvalue_reference_type value_ref
Definition ttvalue.h:89
static void drop_all_ptr()
Definition ptr.h:118
enum ttg_parsec::msg_header_t::fn_id fn_id_t
uint32_t taskpool_id
Definition ttg.h:149
std::int8_t num_iovecs
Definition ttg.h:153
std::size_t key_offset
Definition ttg.h:151
msg_header_t(fn_id_t fid, uint32_t tid, uint64_t oid, int32_t pid, int sender, int nk)
Definition ttg.h:161
#define TTG_PROCESS_TT_OP_RETURN(result, id, invoke)
Definition tt.h:182
int parsec_add_fetch_runtime_task(parsec_taskpool_t *tp, int tasks)
void parsec_taskpool_termination_detected(parsec_taskpool_t *tp)
#define TTG_PARSEC_DEFER_WRITER
Definition ttg.h:14