ttg.h
Go to the documentation of this file.
1 // clang-format off
2 #ifndef PARSEC_TTG_H_INCLUDED
3 #define PARSEC_TTG_H_INCLUDED
4 
5 /* set up env if this header was included directly */
6 #if !defined(TTG_IMPL_NAME)
7 #define TTG_USE_PARSEC 1
8 #endif // !defined(TTG_IMPL_NAME)
9 
10 /* Whether to defer a potential writer if there are readers.
11  * This may avoid extra copies in exchange for concurrency.
12  * This may cause deadlocks, so use with caution. */
13 #define TTG_PARSEC_DEFER_WRITER false
14 
15 #include "ttg/config.h"
16 
17 #include "ttg/impl_selector.h"
18 
19 /* include ttg header to make symbols available in case this header is included directly */
20 #include "../../ttg.h"
21 
22 #include "ttg/base/keymap.h"
23 #include "ttg/base/tt.h"
24 #include "ttg/base/world.h"
25 #include "ttg/edge.h"
26 #include "ttg/execution.h"
27 #include "ttg/func.h"
28 #include "ttg/runtimes.h"
29 #include "ttg/terminal.h"
30 #include "ttg/tt.h"
31 #include "ttg/util/env.h"
32 #include "ttg/util/hash.h"
33 #include "ttg/util/meta.h"
34 #include "ttg/util/meta/callable.h"
35 #include "ttg/util/print.h"
36 #include "ttg/util/trace.h"
37 #include "ttg/util/typelist.h"
38 #ifdef TTG_HAVE_DEVICE
39 #include "ttg/device/task.h"
40 #endif // TTG_HAVE_DEVICE
41 
43 
44 #include "ttg/parsec/fwd.h"
45 
46 #include "ttg/parsec/buffer.h"
49 #include "ttg/parsec/devicefunc.h"
50 #include "ttg/parsec/ttvalue.h"
51 
52 #include <algorithm>
53 #include <array>
54 #include <cassert>
55 #include <cstring>
56 #include <experimental/type_traits>
57 #include <functional>
58 #include <future>
59 #include <iostream>
60 #include <list>
61 #include <map>
62 #include <memory>
63 #include <mutex>
64 #include <numeric>
65 #include <sstream>
66 #include <string>
67 #include <tuple>
68 #include <vector>
69 
70 // needed for MPIX_CUDA_AWARE_SUPPORT
71 #if defined(TTG_HAVE_MPI)
72 #include <mpi.h>
73 #if defined(TTG_HAVE_MPIEXT)
74 #include <mpi-ext.h>
75 #endif // TTG_HAVE_MPIEXT
76 #endif // TTG_HAVE_MPI
77 
78 
79 #include <parsec.h>
80 #include <parsec/class/parsec_hash_table.h>
81 #include <parsec/data_internal.h>
82 #include <parsec/execution_stream.h>
83 #include <parsec/interfaces/interface.h>
84 #include <parsec/mca/device/device.h>
85 #include <parsec/parsec_comm_engine.h>
86 #include <parsec/parsec_internal.h>
87 #include <parsec/scheduling.h>
88 #include <parsec/remote_dep.h>
89 
90 #ifdef PARSEC_HAVE_DEV_CUDA_SUPPORT
91 #include <parsec/mca/device/cuda/device_cuda.h>
92 #endif // PARSEC_HAVE_DEV_CUDA_SUPPORT
93 #ifdef PARSEC_HAVE_DEV_HIP_SUPPORT
94 #include <parsec/mca/device/hip/device_hip.h>
95 #endif // PARSEC_HAVE_DEV_HIP_SUPPORT
96 #ifdef PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT
97 #include <parsec/mca/device/level_zero/device_level_zero.h>
98 #endif //PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT
99 
100 #include <parsec/mca/device/device_gpu.h>
101 #if defined(PARSEC_PROF_TRACE)
102 #include <parsec/profiling.h>
103 #undef PARSEC_TTG_PROFILE_BACKEND
104 #if defined(PARSEC_PROF_GRAPHER)
105 #include <parsec/parsec_prof_grapher.h>
106 #endif
107 #endif
108 #include <cstdlib>
109 #include <cstring>
110 
111 #if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
112 #include <unordered_set>
113 #endif
114 
116 #include "ttg/parsec/thread_local.h"
117 #include "ttg/parsec/ptr.h"
118 #include "ttg/parsec/task.h"
119 #include "ttg/parsec/parsec-ext.h"
120 
121 #include "ttg/device/device.h"
122 
123 #include <boost/type_index.hpp>
124 
125 #undef TTG_PARSEC_DEBUG_TRACK_DATA_COPIES
126 
127 /* PaRSEC function declarations */
128 extern "C" {
129 void parsec_taskpool_termination_detected(parsec_taskpool_t *tp);
130 int parsec_add_fetch_runtime_task(parsec_taskpool_t *tp, int tasks);
131 }
132 
133 namespace ttg_parsec {
 /// Signature of the callback that delivers an incoming active message to a TT:
 /// (message bytes, message size, target TT).
 typedef void (*static_set_arg_fct_type)(void *, size_t, ttg::TTBase *);
 typedef std::pair<static_set_arg_fct_type, ttg::TTBase *> static_set_arg_fct_call_t;
 /// Registry mapping a TT instance id (op_id) to its delivery callback + TT pointer.
 inline std::map<uint64_t, static_set_arg_fct_call_t> static_id_to_op_map;
 /// Guards static_id_to_op_map and delayed_unpack_actions (see static_unpack_msg).
 inline std::mutex static_map_mutex;
 typedef std::tuple<int, void *, size_t> static_set_arg_fct_arg_t;
 /// Messages that arrived before their target TT registered itself:
 /// op_id -> (sender rank, heap copy of the message, size).
 inline std::multimap<uint64_t, static_set_arg_fct_arg_t> delayed_unpack_actions;
140 
 /// Fixed-size header prepended to every TTG active message exchanged between ranks.
 struct msg_header_t {
 typedef enum fn_id : std::int8_t {
 uint32_t taskpool_id = -1;   ///< id of the destination PaRSEC taskpool (used by parsec_taskpool_lookup)
 uint64_t op_id = -1;         ///< instance id of the destination TT (key into static_id_to_op_map)
 std::size_t key_offset = 0;  ///< presumably offset of the key(s) within the payload — confirm at senders
 std::int8_t num_iovecs = 0;  ///< presumably number of iovec descriptors attached — confirm at senders
 bool inline_data = false;    ///< presumably true when data is shipped inline with the message — confirm
 int32_t param_id = -1;       ///< presumably the target input terminal id — confirm
 int num_keys = 0;            ///< number of keys carried by this message
 int sender = -1;             ///< rank of the sending process

 msg_header_t() = default;

 msg_header_t(fn_id_t fid, uint32_t tid, uint64_t oid, int32_t pid, int sender, int nk)
 : fn_id(fid)
 , taskpool_id(tid)
 , op_id(oid)
 , param_id(pid)
 , num_keys(nk)
 , sender(sender)
 { }
 };
169 
170  static void unregister_parsec_tags(void *_);
171 
172  namespace detail {
173 
 /// Maximum size (bytes) of a single TTG active message, header included.
 constexpr const int PARSEC_TTG_MAX_AM_SIZE = 1 * 1024*1024;

 /// An active message: a msg_header_t (member named tt_id, see the
 /// mem-initializer below) followed by an inline payload buffer.
 struct msg_t {
 static constexpr std::size_t max_payload_size = PARSEC_TTG_MAX_AM_SIZE - sizeof(msg_header_t);
 unsigned char bytes[max_payload_size];  ///< payload area filled by the sender

 msg_t() = default;
 /// @param tt_id instance id of the destination TT (stored as op_id in the header)
 msg_t(uint64_t tt_id,
 uint32_t taskpool_id,
 msg_header_t::fn_id_t fn_id,
 int32_t param_id,
 int sender,
 int num_keys = 1)
 : tt_id(fn_id, taskpool_id, tt_id, param_id, sender, num_keys)
 {}
 };
191 
193 
194  static int static_unpack_msg(parsec_comm_engine_t *ce, uint64_t tag, void *data, long unsigned int size,
195  int src_rank, void *obj) {
196  static_set_arg_fct_type static_set_arg_fct;
197  parsec_taskpool_t *tp = NULL;
198  msg_header_t *msg = static_cast<msg_header_t *>(data);
199  uint64_t op_id = msg->op_id;
200  tp = parsec_taskpool_lookup(msg->taskpool_id);
201  assert(NULL != tp);
202  static_map_mutex.lock();
203  try {
204  auto op_pair = static_id_to_op_map.at(op_id);
205  static_map_mutex.unlock();
206  tp->tdm.module->incoming_message_start(tp, src_rank, NULL, NULL, 0, NULL);
207  static_set_arg_fct = op_pair.first;
208  static_set_arg_fct(data, size, op_pair.second);
209  tp->tdm.module->incoming_message_end(tp, NULL);
210  return 0;
211  } catch (const std::out_of_range &e) {
212  void *data_cpy = malloc(size);
213  assert(data_cpy != 0);
214  memcpy(data_cpy, data, size);
215  ttg::trace("ttg_parsec(", ttg_default_execution_context().rank(), ") Delaying delivery of message (", src_rank,
216  ", ", op_id, ", ", data_cpy, ", ", size, ")");
217  delayed_unpack_actions.insert(std::make_pair(op_id, std::make_tuple(src_rank, data_cpy, size)));
218  static_map_mutex.unlock();
219  return 1;
220  }
221  }
222 
223  static int get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag, void *msg, size_t msg_size,
224  int src, void *cb_data);
225 
/// Singleton flag — presumably records whether this backend initialized MPI
/// itself (confirm at call sites). Returned by reference so callers can set it.
inline bool &initialized_mpi() {
  static bool flag = false;
  return flag;
}
230 
231  } // namespace detail
232 
 ttg::Edge<> m_ctl_edge;  ///< world-wide control edge (exposed via ctl_edge())
 bool _dag_profiling;     ///< true while DAG/grapher output is enabled (dag_on/dag_off)
 bool _task_profiling;    ///< true while PaRSEC task profiling is enabled (profile_on/profile_off)
 /// Which execution spaces the MPI library can handle buffers for directly;
 /// Host is assumed true, CUDA/HIP entries are probed in the constructor.
 std::array<bool, static_cast<std::size_t>(ttg::ExecutionSpace::Invalid)>
 mpi_space_support = {true, false, false};
239 
240  int query_comm_size() {
241  int comm_size;
242  MPI_Comm_size(MPI_COMM_WORLD, &comm_size);
243  return comm_size;
244  }
245 
246  int query_comm_rank() {
247  int comm_rank;
248  MPI_Comm_rank(MPI_COMM_WORLD, &comm_rank);
249  return comm_rank;
250  }
251 
 /// Comm-engine "up" callback: (re)registers the TTG active-message handlers
 /// (task messages and RMA completion notifications).
 static void ttg_parsec_ce_up(parsec_comm_engine_t *comm_engine, void *user_data)
 {
 parsec_ce.tag_register(WorldImpl::parsec_ttg_tag(), &detail::static_unpack_msg, user_data, detail::PARSEC_TTG_MAX_AM_SIZE);
 parsec_ce.tag_register(WorldImpl::parsec_ttg_rma_tag(), &detail::get_remote_complete_cb, user_data, 128);
 }
257 
 /// Comm-engine "down" callback: unregisters both TTG active-message tags.
 static void ttg_parsec_ce_down(parsec_comm_engine_t *comm_engine, void *user_data)
 {
 parsec_ce.tag_unregister(WorldImpl::parsec_ttg_tag());
 parsec_ce.tag_unregister(WorldImpl::parsec_ttg_rma_tag());
 }
263 
264  public:
265 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
266  int parsec_ttg_profile_backend_set_arg_start, parsec_ttg_profile_backend_set_arg_end;
267  int parsec_ttg_profile_backend_bcast_arg_start, parsec_ttg_profile_backend_bcast_arg_end;
268  int parsec_ttg_profile_backend_allocate_datacopy, parsec_ttg_profile_backend_free_datacopy;
269 #endif
270 
 /// Create the PaRSEC-backed TTG world.
 /// @param argc,argv command line forwarded to parsec_init when we create the context
 /// @param ncores number of cores PaRSEC may use
 /// @param c existing PaRSEC context to attach to; nullptr means create (and own) one
 WorldImpl(int *argc, char **argv[], int ncores, parsec_context_t *c = nullptr)
 : WorldImplBase(query_comm_size(), query_comm_rank())
 , ctx(c)
 , own_ctx(c == nullptr)
#if defined(PARSEC_PROF_TRACE)
 , profiling_array(nullptr)
 , profiling_array_size(0)
#endif
 , _dag_profiling(false)
 , _task_profiling(false)
 {
 if (own_ctx) ctx = parsec_init(ncores, argc, argv);

 /* query MPI device support */
#if defined(MPIX_CUDA_AWARE_SUPPORT) && MPIX_CUDA_AWARE_SUPPORT
 || MPIX_Query_cuda_support()
#endif // MPIX_CUDA_AWARE_SUPPORT
 ) {
 mpi_space_support[static_cast<std::size_t>(ttg::ExecutionSpace::CUDA)] = true;
 }

#if defined(MPIX_HIP_AWARE_SUPPORT) && MPIX_HIP_AWARE_SUPPORT
 || MPIX_Query_hip_support()
#endif // MPIX_HIP_AWARE_SUPPORT
 ) {
 mpi_space_support[static_cast<std::size_t>(ttg::ExecutionSpace::HIP)] = true;
 }

#if defined(PARSEC_PROF_TRACE)
 /* if PaRSEC profiling is globally enabled, turn on task profiling and
  * register the backend profiling dictionary entries */
 if(parsec_profile_enabled) {
 profile_on();
#if defined(PARSEC_TTG_PROFILE_BACKEND)
 parsec_profiling_add_dictionary_keyword("PARSEC_TTG_SET_ARG_IMPL", "fill:000000", 0, NULL,
 (int*)&parsec_ttg_profile_backend_set_arg_start,
 (int*)&parsec_ttg_profile_backend_set_arg_end);
 parsec_profiling_add_dictionary_keyword("PARSEC_TTG_BCAST_ARG_IMPL", "fill:000000", 0, NULL,
 (int*)&parsec_ttg_profile_backend_bcast_arg_start,
 (int*)&parsec_ttg_profile_backend_bcast_arg_end);
 parsec_profiling_add_dictionary_keyword("PARSEC_TTG_DATACOPY", "fill:000000",
 sizeof(size_t), "size{int64_t}",
 (int*)&parsec_ttg_profile_backend_allocate_datacopy,
 (int*)&parsec_ttg_profile_backend_free_datacopy);
#endif
 }
#endif

 /* register the active-message handlers now if the comm engine is already up */
 if( NULL != parsec_ce.tag_register) {
 parsec_ce.tag_register(WorldImpl::parsec_ttg_tag(), &detail::static_unpack_msg, this, detail::PARSEC_TTG_MAX_AM_SIZE);
 parsec_ce.tag_register(WorldImpl::parsec_ttg_rma_tag(), &detail::get_remote_complete_cb, this, 128);
 }

 create_tpool();
 }
327 
328 
 /// @return the PaRSEC context this world runs on
 auto *context() { return ctx; }
 /// @return the calling thread's PaRSEC execution stream
 auto *execution_stream() { return parsec_my_execution_stream(); }
 /// @return the PaRSEC taskpool backing this world
 auto *taskpool() { return tpool; }
332 
 /// Create and configure the world's PaRSEC taskpool: reserve an id, enable
 /// all available devices, attach a termination detector, and synchronize the
 /// taskpool enabling across ranks.
 void create_tpool() {
 assert(nullptr == tpool);
 tpool = PARSEC_OBJ_NEW(parsec_taskpool_t);
 tpool->taskpool_id = -1;
 tpool->update_nb_runtime_task = parsec_add_fetch_runtime_task;
 tpool->taskpool_type = PARSEC_TASKPOOL_TYPE_TTG;
 tpool->taskpool_name = strdup("TTG Taskpool");
 parsec_taskpool_reserve_id(tpool);

 /* allow the taskpool to use every device PaRSEC discovered */
 tpool->devices_index_mask = 0;
 for(int i = 0; i < (int)parsec_nb_devices; i++) {
 parsec_device_module_t *device = parsec_mca_device_get(i);
 if( NULL == device ) continue;
 tpool->devices_index_mask |= (1 << device->device_index);
 }

#ifdef TTG_USE_USER_TERMDET
 parsec_termdet_open_module(tpool, "user_trigger");
#else // TTG_USE_USER_TERMDET
 parsec_termdet_open_dyn_module(tpool);
#endif // TTG_USE_USER_TERMDET
 tpool->tdm.module->monitor_taskpool(tpool, parsec_taskpool_termination_detected);
 // In TTG, we use the pending actions to denote that the
 // taskpool is not ready, i.e. some local tasks could still
 // be added by the main thread. It should then be initialized
 // to 0, execute will set it to 1 and mark the tpool as ready,
 // and the fence() will decrease it back to 0.
 tpool->tdm.module->taskpool_set_runtime_actions(tpool, 0);
 parsec_taskpool_enable(tpool, NULL, NULL, execution_stream(), size() > 1);

#if defined(PARSEC_PROF_TRACE)
 /* re-attach the persistent profiling-id array to the new taskpool */
 tpool->profiling_array = profiling_array;
#endif

 // Termination detection in PaRSEC requires to synchronize the
 // taskpool enabling, to avoid a race condition that would keep
 // termination detection-related messages in a waiting queue
 // forever
 MPI_Barrier(comm());

 parsec_taskpool_started = false;
 }
375 
 /* WorldImpl is a unique runtime resource: neither copyable nor movable. */

 /* Deleted copy ctor */
 WorldImpl(const WorldImpl &other) = delete;

 /* Deleted move ctor */
 WorldImpl(WorldImpl &&other) = delete;

 /* Deleted copy assignment */
 WorldImpl &operator=(const WorldImpl &other) = delete;

 /* Deleted move assignment */
 WorldImpl &operator=(WorldImpl &&other) = delete;
387 
389 
 /// Active-message tag used for TTG task messages (handled by static_unpack_msg).
 static constexpr int parsec_ttg_tag() { return PARSEC_DSL_TTG_TAG; }
 /// Active-message tag used for RMA completion notifications (get_remote_complete_cb).
 static constexpr int parsec_ttg_rma_tag() { return PARSEC_DSL_TTG_RMA_TAG; }

 /// @return the MPI communicator this world spans (always MPI_COMM_WORLD)
 MPI_Comm comm() const { return MPI_COMM_WORLD; }
394 
 /// Start executing the taskpool: enqueue it into the context, mark it ready
 /// (one pending runtime action, see create_tpool), and start the PaRSEC
 /// context. Idempotent once started.
 virtual void execute() override {
 if (!parsec_taskpool_started) {
 parsec_enqueue(ctx, tpool);
 tpool->tdm.module->taskpool_addto_runtime_actions(tpool, 1);
 tpool->tdm.module->taskpool_ready(tpool);
 [[maybe_unused]] auto ret = parsec_context_start(ctx);
 // ignore ret since all of its nonzero values are OK (e.g. -1 due to ctx already being active)
 parsec_taskpool_started = true;
 }
 }
405 
 /// Free the current taskpool (detaching, but not freeing, the profiling array).
 void destroy_tpool() {
#if defined(PARSEC_PROF_TRACE)
 // We don't want to release the profiling array, as it should be persistent
 // between fences() to allow defining a TT/TTG before a fence() and schedule
 // it / complete it after a fence()
 tpool->profiling_array = nullptr;
#endif
 assert(NULL != tpool->tdm.monitor);
 tpool->tdm.module->unmonitor_taskpool(tpool);
 parsec_taskpool_free(tpool);
 tpool = nullptr;
 }
418 
 /// Tear down the world: drain outstanding work, free the taskpool, unregister
 /// the communication tags, release profiling state, and finalize the PaRSEC
 /// context if we own it.
 virtual void destroy() override {
 if (is_valid()) {
 if (parsec_taskpool_started) {
 // We are locally ready (i.e. we won't add new tasks)
 tpool->tdm.module->taskpool_addto_runtime_actions(tpool, -1);
 ttg::trace("ttg_parsec(", this->rank(), "): final waiting for completion");
 if (own_ctx)
 parsec_context_wait(ctx);
 else
 parsec_taskpool_wait(tpool);
 }
 release_ops();
 destroy_tpool();
 if (own_ctx) {
 unregister_parsec_tags(nullptr);
 } else {
 /* not our context: defer tag unregistration to context finalization */
 parsec_context_at_fini(unregister_parsec_tags, nullptr);
 }
#if defined(PARSEC_PROF_TRACE)
 if(nullptr != profiling_array) {
 free(profiling_array);
 profiling_array = nullptr;
 profiling_array_size = 0;
 }
#endif
 if (own_ctx) parsec_fini(&ctx);
 mark_invalid();
 }
 }
449 
 /// Mutable access to the world's control edge.
 ttg::Edge<> &ctl_edge() { return m_ctl_edge; }

 const ttg::Edge<> &ctl_edge() const { return m_ctl_edge; }

 /// Notify the termination detector that a new local task was created.
 void increment_created() { taskpool()->tdm.module->taskpool_addto_nb_tasks(taskpool(), 1); }

 /// Account an in-flight message as a pending runtime action (keeps the taskpool alive).
 void increment_inflight_msg() { taskpool()->tdm.module->taskpool_addto_runtime_actions(taskpool(), 1); }
 void decrement_inflight_msg() { taskpool()->tdm.module->taskpool_addto_runtime_actions(taskpool(), -1); }

 /// @return true if DAG (task-graph) profiling is currently enabled
 bool dag_profiling() override { return _dag_profiling; }
460 
461  virtual void dag_on(const std::string &filename) override {
462 #if defined(PARSEC_PROF_GRAPHER)
463  if(!_dag_profiling) {
464  profile_on();
465  size_t len = strlen(filename.c_str())+32;
466  char ext_filename[len];
467  snprintf(ext_filename, len, "%s-%d.dot", filename.c_str(), rank());
468  parsec_prof_grapher_init(ctx, ext_filename);
469  _dag_profiling = true;
470  }
471 #else
472  ttg::print("Error: requested to create '", filename, "' to create a DAG of tasks,\n"
473  "but PaRSEC does not support graphing options. Reconfigure with PARSEC_PROF_GRAPHER=ON\n");
474 #endif
475  }
476 
 /// Stop DAG tracing and close the grapher output (no-op without PARSEC_PROF_GRAPHER
 /// or if tracing is not active).
 virtual void dag_off() override {
#if defined(PARSEC_PROF_GRAPHER)
 if(_dag_profiling) {
 parsec_prof_grapher_fini();
 _dag_profiling = false;
 }
#endif
 }
485 
 /// Disable task profiling (no-op unless built with PARSEC_PROF_TRACE).
 virtual void profile_off() override {
#if defined(PARSEC_PROF_TRACE)
 _task_profiling = false;
#endif
 }

 /// Enable task profiling (no-op unless built with PARSEC_PROF_TRACE).
 virtual void profile_on() override {
#if defined(PARSEC_PROF_TRACE)
 _task_profiling = true;
#endif
 }

 /// @return true if task profiling is currently enabled
 virtual bool profiling() override { return _task_profiling; }
499 
501  return mpi_space_support[static_cast<std::size_t>(space)];
502  }
503 
 /// NOTE(review): presumably invoked when the last task of the DAG has run —
 /// with user-triggered termination detection, declares the task count final
 /// (0 remaining) so termination can be detected; confirm at call sites.
 virtual void final_task() override {
#ifdef TTG_USE_USER_TERMDET
 if(parsec_taskpool_started) {
 taskpool()->tdm.module->taskpool_set_nb_tasks(taskpool(), 0);
 parsec_taskpool_started = false;
 }
#endif // TTG_USE_USER_TERMDET
 }
512 
513  template <typename keyT, typename output_terminalsT, typename derivedT, typename input_valueTs = ttg::typelist<>>
515 #if defined(PARSEC_PROF_TRACE)
516  std::stringstream ss;
517  build_composite_name_rec(t->ttg_ptr(), ss);
518  ss << t->get_name();
519  register_new_profiling_event(ss.str().c_str(), t->get_instance_id());
520 #endif
521  }
522 
523  protected:
524 #if defined(PARSEC_PROF_TRACE)
 /// Recursively prepend the names of all enclosing TTGs, outermost first,
 /// producing "outer::...::" in @p ss (used to build profiling event names).
 void build_composite_name_rec(const ttg::TTBase *t, std::stringstream &ss) {
 if(nullptr == t)
 return;
 build_composite_name_rec(t->ttg_ptr(), ss);
 ss << t->get_name() << "::";
 }
531 
 /// Register a begin/end profiling keyword pair for the event at @p position.
 /// The ids live at profiling_array[2*position] / [2*position+1]; the array is
 /// grown (in multiples of 64 ints, zero-filled) on demand and re-shared with
 /// the taskpool.
 void register_new_profiling_event(const char *name, int position) {
 if(2*position >= profiling_array_size) {
 size_t new_profiling_array_size = 64 * ((2*position + 63)/64 + 1);
 profiling_array = (int*)realloc((void*)profiling_array,
 new_profiling_array_size * sizeof(int));
 memset((void*)&profiling_array[profiling_array_size], 0, sizeof(int)*(new_profiling_array_size - profiling_array_size));
 profiling_array_size = new_profiling_array_size;
 tpool->profiling_array = profiling_array;
 }

 /* both slots must still be unregistered */
 assert(0 == tpool->profiling_array[2*position]);
 assert(0 == tpool->profiling_array[2*position+1]);
 // TODO PROFILING: 0 and NULL should be replaced with something that depends on the key human-readable serialization...
 // Typically, we would put something like 3*sizeof(int32_t), "m{int32_t};n{int32_t};k{int32_t}" to say
 // there are three fields, named m, n and k, stored in this order, and each of size int32_t
 parsec_profiling_add_dictionary_keyword(name, "fill:000000", 64, "key{char[64]}",
 (int*)&tpool->profiling_array[2*position],
 (int*)&tpool->profiling_array[2*position+1]);
 }
551 #endif
552 
 /// Quiesce the world: wait for all outstanding tasks to complete, then
 /// recreate the taskpool and restart it so more work can be submitted after
 /// the fence. If the taskpool was never started, the fence degenerates to a
 /// barrier.
 virtual void fence_impl(void) override {
 int rank = this->rank();
 if (!parsec_taskpool_started) {
 ttg::trace("ttg_parsec::(", rank, "): parsec taskpool has not been started, fence is a simple MPI_Barrier");
 MPI_Barrier(comm());
 return;
 }
 ttg::trace("ttg_parsec::(", rank, "): parsec taskpool is ready for completion");
 // We are locally ready (i.e. we won't add new tasks)
 tpool->tdm.module->taskpool_addto_runtime_actions(tpool, -1);
 ttg::trace("ttg_parsec(", rank, "): waiting for completion");
 parsec_taskpool_wait(tpool);

 // We need the synchronization between the end of the context and the restart of the taskpool
 // until we use parsec_taskpool_wait and implement an epoch in the PaRSEC taskpool
 // see Issue #118 (TTG)
 MPI_Barrier(comm());

 destroy_tpool();
 create_tpool();
 execute();
 }
575 
 private:
 parsec_context_t *ctx = nullptr;       ///< PaRSEC context this world runs on
 bool own_ctx = false;                  ///< whether I own the context (created it myself)
 parsec_taskpool_t *tpool = nullptr;    ///< current taskpool; recreated at every fence
 bool parsec_taskpool_started = false;  ///< set by execute(), cleared in final_task()
#if defined(PARSEC_PROF_TRACE)
 int *profiling_array;                  ///< begin/end profiling event ids, shared with tpool
 std::size_t profiling_array_size;      ///< capacity (in ints) of profiling_array
#endif
585  };
586 
587  static void unregister_parsec_tags(void *_pidx)
588  {
589  if(NULL != parsec_ce.tag_unregister) {
590  parsec_ce.tag_unregister(WorldImpl::parsec_ttg_tag());
591  parsec_ce.tag_unregister(WorldImpl::parsec_ttg_rma_tag());
592  }
593  }
594 
595  namespace detail {
596 
 /* Pre-defined assignment symbols shared by all TTG task classes; named
  * HASH0/HASH1 and KEY0/KEY1 — presumably carrying the task key hash and the
  * key itself (confirm where task classes are built). */
 const parsec_symbol_t parsec_taskclass_param0 = {
 .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
 .name = "HASH0",
 .context_index = 0,
 .min = nullptr,
 .max = nullptr,
 .expr_inc = nullptr,
 .cst_inc = 0 };
 const parsec_symbol_t parsec_taskclass_param1 = {
 .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
 .name = "HASH1",
 .context_index = 1,
 .min = nullptr,
 .max = nullptr,
 .expr_inc = nullptr,
 .cst_inc = 0 };
 const parsec_symbol_t parsec_taskclass_param2 = {
 .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
 .name = "KEY0",
 .context_index = 2,
 .min = nullptr,
 .max = nullptr,
 .expr_inc = nullptr,
 .cst_inc = 0 };
 const parsec_symbol_t parsec_taskclass_param3 = {
 .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
 .name = "KEY1",
 .context_index = 3,
 .min = nullptr,
 .max = nullptr,
 .expr_inc = nullptr,
 .cst_inc = 0 };
629 
631  ttg_data_copy_t *res = nullptr;
632  if (task == nullptr || ptr == nullptr) {
633  return res;
634  }
635  for (int i = 0; i < task->data_count; ++i) {
636  auto copy = static_cast<ttg_data_copy_t *>(task->copies[i]);
637  if (NULL != copy && copy->get_ptr() == ptr) {
638  res = copy;
639  break;
640  }
641  }
642  return res;
643  }
644 
645  inline int find_index_of_copy_in_task(parsec_ttg_task_base_t *task, const void *ptr) {
646  int i = -1;
647  if (task == nullptr || ptr == nullptr) {
648  return i;
649  }
650  for (i = 0; i < task->data_count; ++i) {
651  auto copy = static_cast<ttg_data_copy_t *>(task->copies[i]);
652  if (NULL != copy && copy->get_ptr() == ptr) {
653  return i;
654  }
655  }
656  return -1;
657  }
658 
660  if (task == nullptr || copy == nullptr) {
661  return false;
662  }
663 
664  if (MAX_PARAM_COUNT < task->data_count) {
665  throw std::logic_error("Too many data copies, check MAX_PARAM_COUNT!");
666  }
667 
668  task->copies[task->data_count] = copy;
669  task->data_count++;
670  return true;
671  }
672 
674  int i;
675  /* find and remove entry; copies are usually appended and removed, so start from back */
676  for (i = task->data_count-1; i >= 0; --i) {
677  if (copy == task->copies[i]) {
678  break;
679  }
680  }
681  if (i < 0) return;
682  /* move all following elements one up */
683  for (; i < task->data_count - 1; ++i) {
684  task->copies[i] = task->copies[i + 1];
685  }
686  /* null last element */
687  task->copies[i] = nullptr;
688  task->data_count--;
689  }
690 
691 #if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
692 #warning "ttg::PaRSEC enables data copy tracking"
693  static std::unordered_set<ttg_data_copy_t *> pending_copies;
694  static std::mutex pending_copies_mutex;
695 #endif
696 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
697  static int64_t parsec_ttg_data_copy_uid = 0;
698 #endif
699 
 /// Create a new data copy wrapping @p value.
 /// If the value type derives from ttg::TTValue it serves as its own copy
 /// object; otherwise it is wrapped in a ttg_data_value_copy_t (moved when
 /// passed as an rvalue, copied when copy-constructible).
 /// @throws std::logic_error if the value can neither be moved nor copied
 template <typename Value>
 inline ttg_data_copy_t *create_new_datacopy(Value &&value) {
 using value_type = std::decay_t<Value>;
 ttg_data_copy_t *copy;
 if constexpr (std::is_base_of_v<ttg::TTValue<value_type>, value_type>) {
 copy = new value_type(std::forward<Value>(value));
 } else if constexpr (std::is_rvalue_reference_v<decltype(value)> ||
 std::is_copy_constructible_v<std::decay_t<Value>>) {
 copy = new ttg_data_value_copy_t<value_type>(std::forward<Value>(value));
 } else {
 throw std::logic_error("Trying to copy-construct data that is not copy-constructible!");
 }
#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
 // Keep track of additional memory usage
 if(ttg::default_execution_context().impl().profiling()) {
 copy->size = sizeof(Value);
 copy->uid = parsec_atomic_fetch_inc_int64(&parsec_ttg_data_copy_uid);
 parsec_profiling_ts_trace_flags(ttg::default_execution_context().impl().parsec_ttg_profile_backend_allocate_datacopy,
 static_cast<uint64_t>(copy->uid),
 PROFILE_OBJECT_ID_NULL, &copy->size,
 PARSEC_PROFILING_EVENT_COUNTER|PARSEC_PROFILING_EVENT_HAS_INFO);
 }
#endif
#if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
 {
 /* record the live copy so release_data_copy can verify it on deletion */
 const std::lock_guard<std::mutex> lock(pending_copies_mutex);
 auto rc = pending_copies.insert(copy);
 assert(std::get<1>(rc));
 }
#endif
 return copy;
 }
732 
733 #if 0
734  template <std::size_t... IS, typename Key = keyT>
735  void invoke_pull_terminals(std::index_sequence<IS...>, const Key &key, detail::parsec_ttg_task_base_t *task) {
736  int junk[] = {0, (invoke_pull_terminal<IS>(
737  std::get<IS>(input_terminals), key, task),
738  0)...};
739  junk[0]++;
740  }
741 #endif // 0
742 
 /// Transfer ownership of one data copy to @p device if input I of TT is
 /// non-const (i.e. the task may mutate it), and bump the copy's version.
 /// Const inputs are left untouched.
 template<typename TT, std::size_t I>
 inline void transfer_ownership_impl(ttg_data_copy_t *copy, int device) {
 if constexpr(!std::is_const_v<std::tuple_element_t<I, typename TT::input_values_tuple_type>>) {
 copy->transfer_ownership(PARSEC_FLOW_ACCESS_RW, device);
 copy->inc_current_version();
 }
 }
750 
751  template<typename TT, std::size_t... Is>
752  inline void transfer_ownership(parsec_ttg_task_t<TT> *me, int device, std::index_sequence<Is...>) {
753  /* transfer ownership of each data */
754  int junk[] = {0, (transfer_ownership_impl<TT, Is>(me->copies[Is], device), 0)...};
755  junk[0]++;
756  }
757 
 /// Host execution hook installed into the task class: transfers ownership of
 /// the mutable inputs to device 0 and invokes the TT's host operator.
 template<typename TT>
 inline parsec_hook_return_t hook(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
 parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
 if constexpr(std::tuple_size_v<typename TT::input_values_tuple_type> > 0) {
 transfer_ownership<TT>(me, 0, std::make_index_sequence<std::tuple_size_v<typename TT::input_values_tuple_type>>{});
 }
 return me->template invoke_op<ttg::ExecutionSpace::Host>();
 }
766 
767  template<typename TT>
768  inline parsec_hook_return_t hook_cuda(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
769  if constexpr(TT::derived_has_cuda_op()) {
770  parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
771  return me->template invoke_op<ttg::ExecutionSpace::CUDA>();
772  } else {
773  std::cerr << "CUDA hook called without having a CUDA op!" << std::endl;
774  return PARSEC_HOOK_RETURN_ERROR;
775  }
776  }
777 
778  template<typename TT>
779  inline parsec_hook_return_t hook_hip(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
780  if constexpr(TT::derived_has_hip_op()) {
781  parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
782  return me->template invoke_op<ttg::ExecutionSpace::HIP>();
783  } else {
784  std::cerr << "HIP hook called without having a HIP op!" << std::endl;
785  return PARSEC_HOOK_RETURN_ERROR;
786  }
787  }
788 
789  template<typename TT>
790  inline parsec_hook_return_t hook_level_zero(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
791  if constexpr(TT::derived_has_level_zero_op()) {
792  parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
793  return me->template invoke_op<ttg::ExecutionSpace::L0>();
794  } else {
795  std::cerr << "L0 hook called without having a L0 op!" << std::endl;
796  return PARSEC_HOOK_RETURN_ERROR;
797  }
798  }
799 
800 
 /// Evaluate hooks — presumably queried by PaRSEC to pick a task incarnation;
 /// PARSEC_HOOK_RETURN_NEXT falls through to the next candidate (confirm in
 /// PaRSEC's incarnation selection).
 template<typename TT>
 inline parsec_hook_return_t evaluate_cuda(const parsec_task_t *parsec_task) {
 if constexpr(TT::derived_has_cuda_op()) {
 parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
 return me->template invoke_evaluate<ttg::ExecutionSpace::CUDA>();
 } else {
 return PARSEC_HOOK_RETURN_NEXT;
 }
 }

 template<typename TT>
 inline parsec_hook_return_t evaluate_hip(const parsec_task_t *parsec_task) {
 if constexpr(TT::derived_has_hip_op()) {
 parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
 return me->template invoke_evaluate<ttg::ExecutionSpace::HIP>();
 } else {
 return PARSEC_HOOK_RETURN_NEXT;
 }
 }

 template<typename TT>
 inline parsec_hook_return_t evaluate_level_zero(const parsec_task_t *parsec_task) {
 if constexpr(TT::derived_has_level_zero_op()) {
 parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
 return me->template invoke_evaluate<ttg::ExecutionSpace::L0>();
 } else {
 return PARSEC_HOOK_RETURN_NEXT;
 }
 }
830 
831 
 /// Countdown latch for split RMA transfers: once all expected transfers have
 /// completed, invokes the callback once with the key list and the data copy.
 template <typename KeyT, typename ActivationCallbackT>
 std::vector<KeyT> _keylist;               ///< keys handed to the callback on completion
 std::atomic<int> _outstanding_transfers;  ///< transfers still in flight
 ActivationCallbackT _cb;                  ///< invoked once when the count reaches zero

 public:
 rma_delayed_activate(std::vector<KeyT> &&key, detail::ttg_data_copy_t *copy, int num_transfers, ActivationCallbackT cb)
 : _keylist(std::move(key)), _outstanding_transfers(num_transfers), _cb(cb), _copy(copy) {}

 /// Atomically count down one finished transfer.
 /// @return true if this was the last outstanding transfer (callback has fired)
 bool complete_transfer(void) {
 int left = --_outstanding_transfers;
 if (0 == left) {
 _cb(std::move(_keylist), _copy);
 return true;
 }
 return false;
 }
 };
852 
 /// Completion callback for a single RMA get: unregisters the local memory
 /// handle and counts down the shared activation object; the final transfer
 /// deletes it.
 template <typename ActivationT>
 static int get_complete_cb(parsec_comm_engine_t *comm_engine, parsec_ce_mem_reg_handle_t lreg, ptrdiff_t ldispl,
 parsec_ce_mem_reg_handle_t rreg, ptrdiff_t rdispl, size_t size, int remote,
 void *cb_data) {
 parsec_ce.mem_unregister(&lreg);
 ActivationT *activation = static_cast<ActivationT *>(cb_data);
 if (activation->complete_transfer()) {
 delete activation;
 }
 return PARSEC_SUCCESS;
 }
864 
 /// Active-message handler for the RMA tag: the message body carries the
 /// address (as std::intptr_t) of a heap-allocated std::function<void()> on
 /// this rank; invoke it and free it.
 static int get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag, void *msg, size_t msg_size,
 int src, void *cb_data) {
 std::intptr_t *fn_ptr = static_cast<std::intptr_t *>(msg);
 std::function<void(void)> *fn = reinterpret_cast<std::function<void(void)> *>(*fn_ptr);
 (*fn)();
 delete fn;
 return PARSEC_SUCCESS;
 }
873 
 /// Generic variant of get_remote_complete_cb: the message carries the address
 /// of a heap-allocated FuncT; invoke it and free it.
 template <typename FuncT>
 static int invoke_get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag, void *msg, size_t msg_size,
 int src, void *cb_data) {
 std::intptr_t *iptr = static_cast<std::intptr_t *>(msg);
 FuncT *fn_ptr = reinterpret_cast<FuncT *>(*iptr);
 (*fn_ptr)();
 delete fn_ptr;
 return PARSEC_SUCCESS;
 }
883 
 /// Drop the calling task's reference to @p copy.
 /// Decrements the reader count; when the last reference goes away, either the
 /// deferred writer task (if any) is released with ownership of the copy
 /// transferred to it, or — if the reference count also drops to zero — the
 /// copy is deleted.
 inline void release_data_copy(ttg_data_copy_t *copy) {
 if (copy->is_mutable() && nullptr == copy->get_next_task()) {
 /* current task mutated the data but there are no consumers so prepare
 * the copy to be freed below */
 copy->reset_readers();
 }

 int32_t readers = copy->num_readers();
 if (readers > 1) {
 /* potentially more than one reader, decrement atomically */
 readers = copy->decrement_readers();
 } else if (readers == 1) {
 /* make sure readers drop to zero */
 readers = copy->decrement_readers<false>();
 }
 /* if there was only one reader (the current task) or
 * a mutable copy and a successor, we release the copy */
 if (1 == readers || readers == copy->mutable_tag) {
 /* pair with the writers' releases before the count reached us */
 std::atomic_thread_fence(std::memory_order_acquire);
 if (nullptr != copy->get_next_task()) {
 /* Release the deferred task.
 * The copy was mutable and will be mutated by the released task,
 * so simply transfer ownership.
 */
 parsec_task_t *next_task = copy->get_next_task();
 copy->set_next_task(nullptr);
 parsec_ttg_task_base_t *deferred_op = (parsec_ttg_task_base_t *)next_task;
 copy->mark_mutable();
 deferred_op->release_task();
 } else if ((1 == copy->num_ref()) || (1 == copy->drop_ref())) {
 /* we are the last reference, delete the copy */
#if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
 {
 const std::lock_guard<std::mutex> lock(pending_copies_mutex);
 size_t rc = pending_copies.erase(copy);
 assert(1 == rc);
 }
#endif
#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
 // Keep track of additional memory usage
 if(ttg::default_execution_context().impl().profiling()) {
 parsec_profiling_ts_trace_flags(ttg::default_execution_context().impl().parsec_ttg_profile_backend_free_datacopy,
 static_cast<uint64_t>(copy->uid),
 PROFILE_OBJECT_ID_NULL, &copy->size,
 PARSEC_PROFILING_EVENT_COUNTER|PARSEC_PROFILING_EVENT_HAS_INFO);
 }
#endif
 delete copy;
 }
 }
 }
935 
  /// Registers \p task as a user of \p copy_in and returns the copy the task
  /// must actually use: \p copy_in itself when sharing is possible, or a newly
  /// created copy when a writer already claimed (or will claim) the original.
  template <typename Value>
  // NOTE(review): the declaration line is missing from this extract; upstream it is
  // presumably
  //   ttg_data_copy_t *register_data_copy(ttg_data_copy_t *copy_in,
  //                                       parsec_ttg_task_base_t *task, bool readonly)
  // (the body reads `task` and `readonly`) -- confirm against the repository.
    ttg_data_copy_t *copy_res = copy_in;
    bool replace = false;
    int32_t readers = copy_in->num_readers();

    assert(readers != 0);

    if (readonly && !copy_in->is_mutable()) {
      /* simply increment the number of readers */
      readers = copy_in->increment_readers();
    }

    if (readers == copy_in->mutable_tag) {
      /* a writer owns the copy */
      if (copy_res->get_next_task() != nullptr) {
        if (readonly) {
          parsec_ttg_task_base_t *next_task = reinterpret_cast<parsec_ttg_task_base_t *>(copy_res->get_next_task());
          if (next_task->defer_writer) {
            /* there is a writer but it signalled that it wants to wait for readers to complete */
            return copy_res;
          }
        }
      }
      /* someone is going to write into this copy -> we need to make a copy */
      copy_res = NULL;
      if (readonly) {
        /* we replace the copy in a deferred task if the copy will be mutated by
         * the deferred task and we are readonly.
         * That way, we can share the copy with other readonly tasks and release
         * the deferred task. */
        replace = true;
      }
    } else if (!readonly) {
      /* this task will mutate the data
       * check whether there are other readers already and potentially
       * defer the release of this task to give following readers a
       * chance to make a copy of the data before this task mutates it
       *
       * Try to replace the readers with a negative value that indicates
       * the value is mutable. If that fails we know that there are other
       * readers or writers already.
       *
       * NOTE: this check is not atomic: either there is a single reader
       * (current task) or there are others, in which we case won't
       * touch it.
       */
      if (1 == copy_in->num_readers() && !task->defer_writer) {
        // NOTE(review): lines are missing from this extract here (likely a comment
        // and/or the operation claiming exclusive ownership) -- verify upstream.
        assert(nullptr == copy_in->get_next_task());
        copy_in->set_next_task(&task->parsec_task);
        // release fence pairs with the acquire fence in release_data_copy
        std::atomic_thread_fence(std::memory_order_release);
        copy_in->mark_mutable();
      } else {
        if (task->defer_writer && nullptr == copy_in->get_next_task()) {
          /* we're the first writer and want to wait for all readers to complete */
          copy_res->set_next_task(&task->parsec_task);
        } else {
          /* there are writers and/or waiting already of this copy already, make a copy that we can mutate */
          copy_res = NULL;
        }
      }
    }

    if (NULL == copy_res) {
      // can only make a copy if Value is copy-constructible ... so this codepath should never be hit
      if constexpr (std::is_copy_constructible_v<std::decay_t<Value>>) {
        ttg_data_copy_t *new_copy = detail::create_new_datacopy(*static_cast<Value *>(copy_in->get_ptr()));
        if (replace && nullptr != copy_in->get_next_task()) {
          /* replace the task that was deferred */
          parsec_ttg_task_base_t *deferred_op = (parsec_ttg_task_base_t *)copy_in->get_next_task();
          new_copy->mark_mutable();
          /* replace the copy in the deferred task */
          for (int i = 0; i < deferred_op->data_count; ++i) {
            if (deferred_op->copies[i] == copy_in) {
              deferred_op->copies[i] = new_copy;
              break;
            }
          }
          copy_in->set_next_task(nullptr);
          deferred_op->release_task();
          copy_in->reset_readers();             // set the copy back to being read-only
          copy_in->increment_readers<false>();  // register as reader
          copy_res = copy_in;                   // return the copy we were passed
        } else {
          if (!readonly) {
            new_copy->mark_mutable();
          }
          copy_res = new_copy;  // return the new copy
        }
      }
      else {
        throw std::logic_error(std::string("TTG::PaRSEC: need to copy a datum of type") + boost::typeindex::type_id<std::decay_t<Value>>().pretty_name() + " but the type is not copyable");
      }
    }
    return copy_res;
  }
1035 
1036  } // namespace detail
1037 
  /// Initializes the TTG/PaRSEC runtime: initializes MPI if needed, creates the
  /// default World, scans PaRSEC's device list, and applies the TTG_MAX_INLINE
  /// environment override. May be called at most once.
  inline void ttg_initialize(int argc, char **argv, int num_threads, parsec_context_t *ctx) {
    if (detail::initialized_mpi()) throw std::runtime_error("ttg_parsec::ttg_initialize: can only be called once");

    // make sure it's not already initialized
    int mpi_initialized;
    MPI_Initialized(&mpi_initialized);
    if (!mpi_initialized) {  // MPI not initialized? do it, remember that we did it
      int provided;
      MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
      // NOTE(review): `!provided` only rejects provided == 0 (MPI_THREAD_SINGLE on
      // common MPIs); `provided < MPI_THREAD_MULTIPLE` would also reject
      // FUNNELED/SERIALIZED -- confirm intent.
      if (!provided)
        throw std::runtime_error("ttg_parsec::ttg_initialize: MPI_Init_thread did not provide MPI_THREAD_MULTIPLE");
      detail::initialized_mpi() = true;
    } else {  // no way to test that MPI was initialized with MPI_THREAD_MULTIPLE, cross fingers and proceed
    }

    // NOTE(review): a line is missing from this extract here -- verify upstream.
    auto world_ptr = new ttg_parsec::WorldImpl{&argc, &argv, num_threads, ctx};
    std::shared_ptr<ttg::base::WorldImplBase> world_sptr{static_cast<ttg::base::WorldImplBase *>(world_ptr)};
    ttg::World world{std::move(world_sptr)};
    ttg::detail::set_default_world(std::move(world));

    // query the first device ID
    // NOTE(review): a line is missing from this extract here -- verify upstream.
    for (int i = 0; i < parsec_nb_devices; ++i) {
      bool is_gpu = parsec_mca_device_is_gpu(i);
      if (detail::first_device_id == -1 && is_gpu) {
        // NOTE(review): body missing from this extract; presumably
        // `detail::first_device_id = i;` -- verify upstream.
      } else if (detail::first_device_id > -1 && !is_gpu) {
        throw std::runtime_error("PaRSEC: Found non-GPU device in GPU ID range!");
      }
    }

    /* parse the maximum inline size */
    const char* ttg_max_inline_cstr = std::getenv("TTG_MAX_INLINE");
    if (nullptr != ttg_max_inline_cstr) {
      // only allow shrinking the inline threshold below the built-in default
      std::size_t inline_size = std::atol(ttg_max_inline_cstr);
      if (inline_size < detail::max_inline_size) {
        detail::max_inline_size = inline_size;
      }
    }
  }
  /// Finalizes the TTG runtime: signals taskpool termination, destroys all
  /// worlds, and finalizes MPI iff ttg_initialize() initialized it.
  inline void ttg_finalize() {
    // We need to notify the current taskpool of termination if we are in user termination detection mode
    // or the parsec_context_wait() in destroy_worlds() will never complete
    // NOTE(review): a line is missing from this extract here -- verify upstream.
    ttg::default_execution_context().impl().final_task();
    ttg::detail::set_default_world(ttg::World{});  // reset the default world
    // NOTE(review): a line is missing from this extract here -- verify upstream.
    ttg::detail::destroy_worlds<ttg_parsec::WorldImpl>();
    if (detail::initialized_mpi()) MPI_Finalize();
  }
1090  [[noreturn]]
1091  inline void ttg_abort() { MPI_Abort(ttg_default_execution_context().impl().comm(), 1); std::abort(); }
1092  inline void ttg_execute(ttg::World world) { world.impl().execute(); }
1093  inline void ttg_fence(ttg::World world) { world.impl().fence(); }
1094 
1095  template <typename T>
1096  inline void ttg_register_ptr(ttg::World world, const std::shared_ptr<T> &ptr) {
1097  world.impl().register_ptr(ptr);
1098  }
1099 
1100  template <typename T>
1101  inline void ttg_register_ptr(ttg::World world, std::unique_ptr<T> &&ptr) {
1102  world.impl().register_ptr(std::move(ptr));
1103  }
1104 
1105  inline void ttg_register_status(ttg::World world, const std::shared_ptr<std::promise<void>> &status_ptr) {
1106  world.impl().register_status(status_ptr);
1107  }
1108 
1109  template <typename Callback>
1110  inline void ttg_register_callback(ttg::World world, Callback &&callback) {
1111  world.impl().register_callback(std::forward<Callback>(callback));
1112  }
1113 
1114  inline ttg::Edge<> &ttg_ctl_edge(ttg::World world) { return world.impl().ctl_edge(); }
1115 
1116  inline void ttg_sum(ttg::World world, double &value) {
1117  double result = 0.0;
1118  MPI_Allreduce(&value, &result, 1, MPI_DOUBLE, MPI_SUM, world.impl().comm());
1119  value = result;
1120  }
1121 
1122  inline void make_executable_hook(ttg::World& world) {
1123  MPI_Barrier(world.impl().comm());
1124  }
1125 
  /// Broadcasts \p data from \p source_rank to all other ranks of \p world in
  /// two phases: first the serialized size, then the serialized bytes.
  /// NOTE(review): the serialize/deserialize statements inside the rank-guarded
  /// blocks are missing from this extract -- verify against upstream.
  template <typename T>
  void ttg_broadcast(::ttg::World world, T &data, int source_rank) {
    int64_t BUFLEN;
    if (world.rank() == source_rank) {
      // (missing in extract: compute BUFLEN = serialized size of data)
    }
    MPI_Bcast(&BUFLEN, 1, MPI_INT64_T, source_rank, world.impl().comm());

    // NOTE(review): raw new[]/delete[] is not exception-safe; std::vector or
    // unique_ptr<unsigned char[]> would be safer -- consider for upstream.
    unsigned char *buf = new unsigned char[BUFLEN];
    if (world.rank() == source_rank) {
      // (missing in extract: serialize data into buf)
    }
    MPI_Bcast(buf, BUFLEN, MPI_UNSIGNED_CHAR, source_rank, world.impl().comm());
    if (world.rank() != source_rank) {
      // (missing in extract: deserialize data from buf)
    }
    delete[] buf;
  }
1146 
1147  namespace detail {
1148 
    /// Type-independent state shared by all parsec TT instantiations.
    struct ParsecTTBase {
     protected:
      //  static std::map<int, ParsecBaseTT*> function_id_to_instance;
      // hash table of this TT's in-flight tasks (presumably keyed by task key -- confirm)
      parsec_hash_table_t tasks_table;
      // the PaRSEC task-class descriptor backing this TT
      parsec_task_class_t self;
    };
1155 
1156  } // namespace detail
1157 
1158  template <typename keyT, typename output_terminalsT, typename derivedT, typename input_valueTs>
1160  private:
1162  static_assert(ttg::meta::is_typelist_v<input_valueTs>,
1163  "The fourth template for ttg::TT must be a ttg::typelist containing the input types");
1164  // create a virtual control input if the input list is empty, to be used in invoke()
1165  using actual_input_tuple_type = std::conditional_t<!ttg::meta::typelist_is_empty_v<input_valueTs>,
1167  using input_tuple_type = ttg::meta::typelist_to_tuple_t<input_valueTs>;
1168  static_assert(ttg::meta::is_tuple_v<output_terminalsT>,
1169  "Second template argument for ttg::TT must be std::tuple containing the output terminal types");
1170  static_assert((ttg::meta::none_has_reference_v<input_valueTs>), "Input typelist cannot contain reference types");
1171  static_assert(ttg::meta::is_none_Void_v<input_valueTs>, "ttg::Void is for internal use only, do not use it");
1172 
1173  parsec_mempool_t mempools;
1174 
1175  // check for a non-type member named have_cuda_op
1176  template <typename T>
1177  using have_cuda_op_non_type_t = decltype(T::have_cuda_op);
1178 
1179  template <typename T>
1180  using have_hip_op_non_type_t = decltype(T::have_hip_op);
1181 
1182  template <typename T>
1183  using have_level_zero_op_non_type_t = decltype(T::have_level_zero_op);
1184 
1185  bool alive = true;
1186 
1187  static constexpr int numinedges = std::tuple_size_v<input_tuple_type>; // number of input edges
1188  static constexpr int numins = std::tuple_size_v<actual_input_tuple_type>; // number of input arguments
1189  static constexpr int numouts = std::tuple_size_v<output_terminalsT>; // number of outputs
1190  static constexpr int numflows = std::max(numins, numouts); // max number of flows
1191 
1192  public:
1194  static constexpr bool derived_has_cuda_op() {
1195  if constexpr (ttg::meta::is_detected_v<have_cuda_op_non_type_t, derivedT>) {
1196  return derivedT::have_cuda_op;
1197  } else {
1198  return false;
1199  }
1200  }
1201 
1203  static constexpr bool derived_has_hip_op() {
1204  if constexpr (ttg::meta::is_detected_v<have_hip_op_non_type_t, derivedT>) {
1205  return derivedT::have_hip_op;
1206  } else {
1207  return false;
1208  }
1209  }
1210 
1212  static constexpr bool derived_has_level_zero_op() {
1213  if constexpr (ttg::meta::is_detected_v<have_level_zero_op_non_type_t, derivedT>) {
1214  return derivedT::have_level_zero_op;
1215  } else {
1216  return false;
1217  }
1218  }
1219 
    /// True iff derivedT provides any device op (CUDA, HIP, or Level Zero).
    static constexpr bool derived_has_device_op() {
      // NOTE(review): the body line is missing from this extract; upstream it
      // presumably returns
      //   derived_has_cuda_op() || derived_has_hip_op() || derived_has_level_zero_op()
      // -- confirm against the repository.
    }
1224 
1225  using ttT = TT;
1226  using key_type = keyT;
1228  using input_args_type = actual_input_tuple_type;
1230  // if have data inputs and (always last) control input, convert last input to Void to make logic easier
1232  ttg::meta::void_to_Void_tuple_t<ttg::meta::decayed_typelist_t<actual_input_tuple_type>>;
1234  ttg::meta::add_glvalue_reference_tuple_t<ttg::meta::void_to_Void_tuple_t<actual_input_tuple_type>>;
1235  using input_values_tuple_type = ttg::meta::drop_void_t<ttg::meta::decayed_typelist_t<input_tuple_type>>;
1236  using input_refs_tuple_type = ttg::meta::drop_void_t<ttg::meta::add_glvalue_reference_tuple_t<input_tuple_type>>;
1237 
1238  static constexpr int numinvals =
1239  std::tuple_size_v<input_refs_tuple_type>; // number of input arguments with values (i.e. omitting the control
1240  // input, if any)
1241 
1242  using output_terminals_type = output_terminalsT;
1244 
    /// Extracts element \c i of \p intuple cast to \c resultT (which may be a
    /// reference type), preserving the tuple's value category via perfect forwarding.
    template <std::size_t i, typename resultT, typename InTuple>
    static resultT get(InTuple &&intuple) {
      return static_cast<resultT>(std::get<i>(std::forward<InTuple>(intuple)));
    };
    /// Returns a reference to element \c i of \p intuple.
    template <std::size_t i, typename InTuple>
    static auto &get(InTuple &&intuple) {
      return std::get<i>(std::forward<InTuple>(intuple));
    };
1253 
1254  private:
1255  using task_t = detail::parsec_ttg_task_t<ttT>;
1256 
1258  friend task_t;
1259 
1260  /* the offset of the key placed after the task structure in the memory from mempool */
1261  constexpr static const size_t task_key_offset = sizeof(task_t);
1262 
1263  input_terminals_type input_terminals;
1264  output_terminalsT output_terminals;
1265 
   protected:
    /// Read-only access to this TT's tuple of output terminals.
    const auto &get_output_terminals() const { return output_terminals; }
1268 
1269  private:
    /// Builds the per-input dispatch table for incoming argument messages
    /// (one member-function pointer per input index).
    template <std::size_t... IS>
    static constexpr auto make_set_args_fcts(std::index_sequence<IS...>) {
      using resultT = decltype(set_arg_from_msg_fcts);
      return resultT{{&TT::set_arg_from_msg<IS>...}};
    }
    // dispatch table: input index -> handler for a remote set_arg message
    constexpr static std::array<void (TT::*)(void *, std::size_t), numins> set_arg_from_msg_fcts =
        make_set_args_fcts(std::make_index_sequence<numins>{});

    /// Builds the per-input dispatch table for argstream-size messages.
    template <std::size_t... IS>
    static constexpr auto make_set_size_fcts(std::index_sequence<IS...>) {
      using resultT = decltype(set_argstream_size_from_msg_fcts);
      return resultT{{&TT::argstream_set_size_from_msg<IS>...}};
    }
    // dispatch table: input index -> handler for an argstream-size message
    constexpr static std::array<void (TT::*)(void *, std::size_t), numins> set_argstream_size_from_msg_fcts =
        make_set_size_fcts(std::make_index_sequence<numins>{});

    /// Builds the per-input dispatch table for argstream-finalize messages.
    template <std::size_t... IS>
    static constexpr auto make_finalize_argstream_fcts(std::index_sequence<IS...>) {
      using resultT = decltype(finalize_argstream_from_msg_fcts);
      return resultT{{&TT::finalize_argstream_from_msg<IS>...}};
    }
    // dispatch table: input index -> handler finalizing an argument stream
    constexpr static std::array<void (TT::*)(void *, std::size_t), numins> finalize_argstream_from_msg_fcts =
        make_finalize_argstream_fcts(std::make_index_sequence<numins>{});

    /// Builds the per-input-edge dispatch table for pull-data request messages.
    template <std::size_t... IS>
    static constexpr auto make_get_from_pull_fcts(std::index_sequence<IS...>) {
      using resultT = decltype(get_from_pull_msg_fcts);
      return resultT{{&TT::get_from_pull_msg<IS>...}};
    }
    // dispatch table: input edge index -> handler serving a pull request
    constexpr static std::array<void (TT::*)(void *, std::size_t), numinedges> get_from_pull_msg_fcts =
        make_get_from_pull_fcts(std::make_index_sequence<numinedges>{});

    /// Records, for each input, whether its declared type is const-qualified.
    template<std::size_t... IS>
    constexpr static auto make_input_is_const(std::index_sequence<IS...>) {
      using resultT = decltype(input_is_const);
      return resultT{{std::is_const_v<std::tuple_element_t<IS, input_args_type>>...}};
    }
    // input index -> true if the input value type is const-qualified
    constexpr static std::array<bool, numins> input_is_const = make_input_is_const(std::make_index_sequence<numins>{});
1308 
    ttg::World world;                           // the world this TT executes in
    ttg::meta::detail::keymap_t<keyT> keymap;   // presumably key -> owning process rank -- confirm
    ttg::meta::detail::keymap_t<keyT> priomap;  // presumably key -> task priority -- confirm
    // key -> preferred device; consulted in device_static_evaluate to advise data placement
    ttg::meta::detail::keymap_t<keyT, ttg::device::Device> devicemap;
    // For now use same type for unary/streaming input terminals, and stream reducers assigned at runtime
    ttg::meta::detail::input_reducers_t<actual_input_tuple_type>
        input_reducers;
    // per-input PaRSEC task class used for stream reductions (note: member name typo "inpute" kept for compatibility)
    std::array<parsec_task_class_t*, numins> inpute_reducers_taskclass = { nullptr };
    // per-input static stream-size goal; size_t max means "not set"
    std::array<std::size_t, numins> static_stream_goal = { std::numeric_limits<std::size_t>::max() };
    int num_pullins = 0;  // number of pull-type input edges

    bool m_defer_writer = TTG_PARSEC_DEFER_WRITER;  // defer a writer while readers exist (see TTG_PARSEC_DEFER_WRITER)

   public:
    /// The world this TT belongs to.
    ttg::World get_world() const override final { return world; }
1324 
1325  private:
1329  template <ttg::ExecutionSpace Space, typename... Args>
1330  auto op(Args &&...args) {
1331  derivedT *derived = static_cast<derivedT *>(this);
1332  //if constexpr (Space == ttg::ExecutionSpace::Host) {
1333  using return_type = decltype(derived->op(std::forward<Args>(args)...));
1334  if constexpr (std::is_same_v<return_type,void>) {
1335  derived->op(std::forward<Args>(args)...);
1336  return;
1337  }
1338  else {
1339  return derived->op(std::forward<Args>(args)...);
1340  }
1341  }
1342 
1343  template <std::size_t i, typename terminalT, typename Key>
1344  void invoke_pull_terminal(terminalT &in, const Key &key, detail::parsec_ttg_task_base_t *task) {
1345  if (in.is_pull_terminal) {
1346  auto owner = in.container.owner(key);
1347  if (owner != world.rank()) {
1348  get_pull_terminal_data_from<i>(owner, key);
1349  } else {
1350  // push the data to the task
1351  set_arg<i>(key, (in.container).get(key));
1352  }
1353  }
1354  }
1355 
1356  template <std::size_t i, typename Key>
1357  void get_pull_terminal_data_from(const int owner,
1358  const Key &key) {
1359  using msg_t = detail::msg_t;
1360  auto &world_impl = world.impl();
1361  parsec_taskpool_t *tp = world_impl.taskpool();
1362  std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), tp->taskpool_id,
1364  world.rank(), 1);
1365  /* pack the key */
1366  size_t pos = 0;
1367  pos = pack(key, msg->bytes, pos);
1368  tp->tdm.module->outgoing_message_start(tp, owner, NULL);
1369  tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
1370  parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
1371  sizeof(msg_header_t) + pos);
1372  }
1373 
1374  template <std::size_t... IS, typename Key = keyT>
1375  void invoke_pull_terminals(std::index_sequence<IS...>, const Key &key, detail::parsec_ttg_task_base_t *task) {
1376  int junk[] = {0, (invoke_pull_terminal<IS>(
1377  std::get<IS>(input_terminals), key, task),
1378  0)...};
1379  junk[0]++;
1380  }
1381 
1382  template <std::size_t... IS>
1383  static input_refs_tuple_type make_tuple_of_ref_from_array(task_t *task, std::index_sequence<IS...>) {
1384  return input_refs_tuple_type{static_cast<std::tuple_element_t<IS, input_refs_tuple_type>>(
1385  *reinterpret_cast<std::remove_reference_t<std::tuple_element_t<IS, input_refs_tuple_type>> *>(
1386  task->copies[IS]->get_ptr()))...};
1387  }
1388 
1389 #ifdef TTG_HAVE_DEVICE
    /// Submit hook called by PaRSEC's GPU scheduler after input transfers for
    /// \p gpu_task were queued on \p gpu_stream: publishes the device/stream to
    /// TTG's thread-local device context, resumes the device coroutine (which
    /// launches the kernel), and reports whether PaRSEC must call back again.
    /// \return PARSEC_HOOK_RETURN_DONE when the task completed or moved on to
    ///         sending outputs, PARSEC_HOOK_RETURN_AGAIN while it still waits
    ///         on the kernel.
    template <ttg::ExecutionSpace Space>
    static int device_static_submit(parsec_device_gpu_module_t *gpu_device,
                                    parsec_gpu_task_t *gpu_task,
                                    parsec_gpu_exec_stream_t *gpu_stream) {

      task_t *task = (task_t*)gpu_task->ec;
      // get the device task from the coroutine handle
      ttg::device::Task dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);

      task->dev_ptr->stream = gpu_stream;

      //std::cout << "device_static_submit task " << task << std::endl;

      // get the promise which contains the views
      auto dev_data = dev_task.promise();

      /* we should still be waiting for the transfer to complete */
      assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_TRANSFER ||
             dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_KERNEL);

#if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) && defined(TTG_HAVE_CUDA)
      {
        // publish device + CUDA stream so the user's kernel launch can pick them up
        parsec_cuda_exec_stream_t *cuda_stream = (parsec_cuda_exec_stream_t *)gpu_stream;
        int device = detail::parsec_device_to_ttg_device(gpu_device->super.device_index);
        ttg::device::detail::set_current(device, cuda_stream->cuda_stream);
      }
#endif // defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) && defined(TTG_HAVE_CUDA)

#if defined(PARSEC_HAVE_DEV_HIP_SUPPORT) && defined(TTG_HAVE_HIP)
      {
        parsec_hip_exec_stream_t *hip_stream = (parsec_hip_exec_stream_t *)gpu_stream;
        int device = detail::parsec_device_to_ttg_device(gpu_device->super.device_index);
        ttg::device::detail::set_current(device, hip_stream->hip_stream);
      }
#endif // defined(PARSEC_HAVE_DEV_HIP_SUPPORT) && defined(TTG_HAVE_HIP)

#if defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT) && defined(TTG_HAVE_LEVEL_ZERO)
      {
        parsec_level_zero_exec_stream_t *stream;
        stream = (parsec_level_zero_exec_stream_t *)gpu_stream;
        int device = detail::parsec_device_to_ttg_device(gpu_device->super.device_index);
        ttg::device::detail::set_current(device, stream->swq->queue);
      }
#endif // defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT) && defined(TTG_HAVE_LEVEL_ZERO)

      /* Here we call back into the coroutine again after the transfers have completed */
      static_op<Space>(&task->parsec_task);

      ttg::device::detail::reset_current();

      /* we will come back into this function once the kernel and transfers are done */
      int rc = PARSEC_HOOK_RETURN_DONE;
      if (nullptr != task->suspended_task_address) {
        /* Get a new handle for the promise*/
        dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
        dev_data = dev_task.promise();

        assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_KERNEL ||
               dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT ||
               dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE);

        if (ttg::device::detail::TTG_DEVICE_CORO_SENDOUT == dev_data.state() ||
            ttg::device::detail::TTG_DEVICE_CORO_COMPLETE == dev_data.state()) {
          /* the task started sending so we won't come back here */
          //std::cout << "device_static_submit task " << task << " complete" << std::endl;
        } else {
          //std::cout << "device_static_submit task " << task << " return-again" << std::endl;
          rc = PARSEC_HOOK_RETURN_AGAIN;
        }
      } else {
        /* the task is done so we won't come back here */
        //std::cout << "device_static_submit task " << task << " complete" << std::endl;
      }
      return rc;
    }
1468 
1469  static void
1470  static_device_stage_in(parsec_gpu_task_t *gtask,
1471  uint32_t flow_mask,
1472  parsec_gpu_exec_stream_t *gpu_stream) {
1473  /* register any memory that hasn't been registered yet */
1474  for (int i = 0; i < MAX_PARAM_COUNT; ++i) {
1475  if (flow_mask & (1<<i)) {
1476  task_t *task = (task_t*)gtask->ec;
1477  parsec_data_copy_t *copy = task->parsec_task.data[i].data_in;
1478  if (0 == (copy->flags & TTG_PARSEC_DATA_FLAG_REGISTERED)) {
1479 #if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT)
1480  // register host memory for faster device access
1481  cudaError_t status;
1482  //status = cudaHostRegister(copy->device_private, gtask->flow_nb_elts[i], cudaHostRegisterPortable);
1483  //assert(cudaSuccess == status);
1484 #endif // PARSEC_HAVE_DEV_CUDA_SUPPORT
1485  //copy->flags |= TTG_PARSEC_DATA_FLAG_REGISTERED;
1486  }
1487  }
1488  }
1489  }
1490 
    /// PaRSEC stage-in hook: runs TTG's (currently no-op) host-memory
    /// registration pass, then delegates to PaRSEC's default stage-in.
    static int
    static_device_stage_in_hook(parsec_gpu_task_t *gtask,
                                uint32_t flow_mask,
                                parsec_gpu_exec_stream_t *gpu_stream) {
      static_device_stage_in(gtask, flow_mask, gpu_stream);
      return parsec_default_gpu_stage_in(gtask, flow_mask, gpu_stream);
    }
1498 
    /// "Evaluate" hook for device tasks: on the first invocation it allocates
    /// and fills the parsec_gpu_task_t, runs the coroutine once so the device
    /// flows get collected (via register_device_memory), installs a per-task
    /// task class describing those flows, and optionally sets a preferred-device
    /// hint from the TT's devicemap.
    /// \return PARSEC_HOOK_RETURN_DONE on success; PARSEC_HOOK_RETURN_ERROR if
    ///         a gpu_task was already assigned (unexpected).
    template <ttg::ExecutionSpace Space>
    static parsec_hook_return_t device_static_evaluate(parsec_task_t* parsec_task) {

      task_t *task = (task_t*)parsec_task;
      if (task->dev_ptr->gpu_task == nullptr) {

        /* set up a device task */
        parsec_gpu_task_t *gpu_task;
        /* PaRSEC wants to free the gpu_task, because F***K ownerships */
        gpu_task = static_cast<parsec_gpu_task_t*>(std::calloc(1, sizeof(*gpu_task)));
        PARSEC_OBJ_CONSTRUCT(gpu_task, parsec_list_item_t);
        gpu_task->ec = parsec_task;
        gpu_task->task_type = 0;  // user task
        gpu_task->last_data_check_epoch = -1;  // used internally
        gpu_task->pushout = 0;
        gpu_task->submit = &TT::device_static_submit<Space>;

        // one way to force the task device
        // currently this will probably break all of PaRSEC if this hint
        // does not match where the data is located, not really useful for us
        // instead we set a hint on the data if there is no hint set yet
        //parsec_task->selected_device = ...;

        /* set the gpu_task so it's available in register_device_memory */
        task->dev_ptr->gpu_task = gpu_task;

        /* TODO: is this the right place to set the mask? */
        task->parsec_task.chore_mask = PARSEC_DEV_ALL;

        /* copy over the task class, because that's what we need */
        task->dev_ptr->task_class = *task->parsec_task.task_class;

        // first invocation of the coroutine to get the coroutine handle
        static_op<Space>(parsec_task);

        /* when we come back here, the flows in gpu_task are set (see register_device_memory) */

        parsec_task_class_t& tc = task->dev_ptr->task_class;

        // input flows are set up during register_device_memory as part of the first invocation above
        for (int i = 0; i < MAX_PARAM_COUNT; ++i) {
          tc.in[i] = gpu_task->flow[i];
          tc.out[i] = gpu_task->flow[i];
        }
        tc.nb_flows = MAX_PARAM_COUNT;

        /* set the device hint on the data */
        TT *tt = task->tt;
        if (tt->devicemap) {
          int parsec_dev;
          if constexpr (std::is_void_v<keyT>) {
            parsec_dev = detail::ttg_device_to_parsec_device(tt->devicemap());
          } else {
            parsec_dev = detail::ttg_device_to_parsec_device(tt->devicemap(task->key));
          }
          for (int i = 0; i < MAX_PARAM_COUNT; ++i) {
            /* only set on mutable data since we have exclusive access */
            if (tc.in[i]->flow_flags & PARSEC_FLOW_ACCESS_WRITE) {
              parsec_data_t *data = parsec_task->data[i].data_in->original;
              /* only set the preferred device if the host has the latest copy
               * as otherwise we may end up with the wrong data if there is a newer
               * version on a different device. Also, keep fingers crossed. */
              if (data->owner_device == 0) {
                parsec_advise_data_on_device(data, parsec_dev, PARSEC_DEV_DATA_ADVICE_PREFERRED_DEVICE);
              }
            }
          }
        }

        /* set the new task class that contains the flows */
        task->parsec_task.task_class = &task->dev_ptr->task_class;

        /* select this one */
        return PARSEC_HOOK_RETURN_DONE;
      }

      std::cerr << "EVALUATE called on task with assigned GPU task!" << std::endl;

      /* not sure if this might happen*/
      return PARSEC_HOOK_RETURN_ERROR;

    }
1581 
    /// Device execution hook invoked once PaRSEC has selected a device for the
    /// task: installs the stage-in/out hooks matching the device type and hands
    /// the task to PaRSEC's kernel scheduler. Aborts on a device type that does
    /// not match the compiled-in execution space.
    template <ttg::ExecutionSpace Space>
    static parsec_hook_return_t device_static_op(parsec_task_t* parsec_task) {
      static_assert(derived_has_device_op());

      /* when we come in here we have a device assigned and are ready to go */

      task_t *task = (task_t*)parsec_task;

      if (nullptr == task->suspended_task_address) {
        /* short-cut in case the task returned immediately */
        return PARSEC_HOOK_RETURN_DONE;
      }

      // get the device task from the coroutine handle
      auto dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);

      // get the promise which contains the views
      ttg::device::detail::device_task_promise_type& dev_data = dev_task.promise();

      /* for now make sure we're waiting for transfers and the coro hasn't skipped this step */
      assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_TRANSFER);

      parsec_device_gpu_module_t *device = (parsec_device_gpu_module_t*)task->parsec_task.selected_device;
      assert(NULL != device);

      task->dev_ptr->device = device;
      parsec_gpu_task_t *gpu_task = task->dev_ptr->gpu_task;
      parsec_execution_stream_s *es = task->tt->world.impl().execution_stream();

      switch(device->super.type) {

#if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT)
        case PARSEC_DEV_CUDA:
          if constexpr (Space == ttg::ExecutionSpace::CUDA) {
            /* TODO: we need custom staging functions because PaRSEC looks at the
             * task-class to determine the number of flows. */
            gpu_task->stage_in = static_device_stage_in_hook;
            gpu_task->stage_out = parsec_default_gpu_stage_out;
            return parsec_device_kernel_scheduler(&device->super, es, gpu_task);
          }
          break;
#endif // PARSEC_HAVE_DEV_CUDA_SUPPORT
#if defined(PARSEC_HAVE_DEV_HIP_SUPPORT)
        case PARSEC_DEV_HIP:
          if constexpr (Space == ttg::ExecutionSpace::HIP) {
            gpu_task->stage_in = static_device_stage_in_hook;
            gpu_task->stage_out = parsec_default_gpu_stage_out;
            return parsec_device_kernel_scheduler(&device->super, es, gpu_task);
          }
          break;
#endif // PARSEC_HAVE_DEV_HIP_SUPPORT
#if defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT)
        case PARSEC_DEV_LEVEL_ZERO:
          if constexpr (Space == ttg::ExecutionSpace::L0) {
            gpu_task->stage_in = static_device_stage_in_hook;
            gpu_task->stage_out = parsec_default_gpu_stage_out;
            return parsec_device_kernel_scheduler(&device->super, es, gpu_task);
          }
          break;
#endif // PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT
        default:
          break;
      }
      ttg::print_error(task->tt->get_name(), " : received mismatching device type ", (int)device->super.type, " from PaRSEC");
      ttg::abort();
      return PARSEC_HOOK_RETURN_DONE;  // will not be reached
    }
1649 #endif // TTG_HAVE_DEVICE
1650 
1651  template <ttg::ExecutionSpace Space>
1652  static parsec_hook_return_t static_op(parsec_task_t *parsec_task) {
1653 
1654  task_t *task = (task_t*)parsec_task;
1655  void* suspended_task_address =
1656 #ifdef TTG_HAVE_COROUTINE
1657  task->suspended_task_address; // non-null = need to resume the task
1658 #else // TTG_HAVE_COROUTINE
1659  nullptr;
1660 #endif // TTG_HAVE_COROUTINE
1661  //std::cout << "static_op: suspended_task_address " << suspended_task_address << std::endl;
1662  if (suspended_task_address == nullptr) { // task is a coroutine that has not started or an ordinary function
1663 
1664  ttT *baseobj = task->tt;
1665  derivedT *obj = static_cast<derivedT *>(baseobj);
1666  assert(detail::parsec_ttg_caller == nullptr);
1667  detail::parsec_ttg_caller = static_cast<detail::parsec_ttg_task_base_t*>(task);
1668  if (obj->tracing()) {
1669  if constexpr (!ttg::meta::is_void_v<keyT>)
1670  ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", task->key, ": executing");
1671  else
1672  ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : executing");
1673  }
1674 
1675  if constexpr (!ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
1676  auto input = make_tuple_of_ref_from_array(task, std::make_index_sequence<numinvals>{});
1677  TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->template op<Space>(task->key, std::move(input), obj->output_terminals));
1678  } else if constexpr (!ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
1679  TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->template op<Space>(task->key, obj->output_terminals));
1680  } else if constexpr (ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
1681  auto input = make_tuple_of_ref_from_array(task, std::make_index_sequence<numinvals>{});
1682  TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->template op<Space>(std::move(input), obj->output_terminals));
1683  } else if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
1684  TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->template op<Space>(obj->output_terminals));
1685  } else {
1686  ttg::abort();
1687  }
1688  detail::parsec_ttg_caller = nullptr;
1689  }
1690  else { // resume the suspended coroutine
1691 
1692 #ifdef TTG_HAVE_COROUTINE
1693  assert(task->coroutine_id != ttg::TaskCoroutineID::Invalid);
1694 
1695 #ifdef TTG_HAVE_DEVICE
1696  if (task->coroutine_id == ttg::TaskCoroutineID::DeviceTask) {
1697  ttg::device::Task coro = ttg::device::detail::device_task_handle_type::from_address(suspended_task_address);
1698  assert(detail::parsec_ttg_caller == nullptr);
1699  detail::parsec_ttg_caller = static_cast<detail::parsec_ttg_task_base_t*>(task);
1700  // TODO: unify the outputs tls handling
1701  auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor();
1702  task->tt->set_outputs_tls_ptr();
1703  coro.resume();
1704  if (coro.completed()) {
1705  coro.destroy();
1706  suspended_task_address = nullptr;
1707  }
1708  task->tt->set_outputs_tls_ptr(old_output_tls_ptr);
1709  detail::parsec_ttg_caller = nullptr;
1710  } else
1711 #endif // TTG_HAVE_DEVICE
1712  if (task->coroutine_id == ttg::TaskCoroutineID::ResumableTask) {
1713  auto ret = static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address));
1714  assert(ret.ready());
1715  auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor();
1716  task->tt->set_outputs_tls_ptr();
1717  ret.resume();
1718  if (ret.completed()) {
1719  ret.destroy();
1720  suspended_task_address = nullptr;
1721  }
1722  else { // not yet completed
1723  // leave suspended_task_address as is
1724 
1725  // right now can events are not properly implemented, we are only testing the workflow with dummy events
1726  // so mark the events finished manually, parsec will rerun this task again and it should complete the second time
1727  auto events = static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address)).events();
1728  for (auto &event_ptr : events) {
1729  event_ptr->finish();
1730  }
1731  assert(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address).promise().ready());
1732  }
1733  task->tt->set_outputs_tls_ptr(old_output_tls_ptr);
1734  detail::parsec_ttg_caller = nullptr;
1735  task->suspended_task_address = suspended_task_address;
1736  }
1737  else
1738  ttg::abort(); // unrecognized task id
1739 #else // TTG_HAVE_COROUTINE
1740  ttg::abort(); // should not happen
1741 #endif // TTG_HAVE_COROUTINE
1742  }
1743 #ifdef TTG_HAVE_COROUTINE
1744  task->suspended_task_address = suspended_task_address;
1745 #endif // TTG_HAVE_COROUTINE
1746  if (suspended_task_address == nullptr) {
1747  ttT *baseobj = task->tt;
1748  derivedT *obj = static_cast<derivedT *>(baseobj);
1749  if (obj->tracing()) {
1750  if constexpr (!ttg::meta::is_void_v<keyT>)
1751  ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", task->key, ": done executing");
1752  else
1753  ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : done executing");
1754  }
1755  }
1756 
1757  return PARSEC_HOOK_RETURN_DONE;
1758  }
1759 
1760  template <ttg::ExecutionSpace Space>
 // PaRSEC execution hook for tasks without input data ("noarg"): runs the
 // TT's op() for the given execution Space, or resumes the task's suspended
 // coroutine, and reports completion status to the PaRSEC scheduler.
 // @param parsec_task the PaRSEC task (actually a task_t)
 // @return PARSEC_HOOK_RETURN_DONE on completion (suspension aborts for now)
1761  static parsec_hook_return_t static_op_noarg(parsec_task_t *parsec_task) {
1762  task_t *task = static_cast<task_t*>(parsec_task);
1763 
 // non-null address = this task is a coroutine suspended earlier that must
 // be resumed rather than started from scratch
1764  void* suspended_task_address =
1765 #ifdef TTG_HAVE_COROUTINE
1766  task->suspended_task_address; // non-null = need to resume the task
1767 #else // TTG_HAVE_COROUTINE
1768  nullptr;
1769 #endif // TTG_HAVE_COROUTINE
1770  if (suspended_task_address == nullptr) { // task is a coroutine that has not started or an ordinary function
1771  ttT *baseobj = (ttT *)task->object_ptr;
1772  derivedT *obj = (derivedT *)task->object_ptr;
1773  assert(detail::parsec_ttg_caller == NULL);
 // NOTE(review): a line (orig. 1774, presumably setting
 // detail::parsec_ttg_caller) is elided in this rendering -- confirm
 // against the full source.
 // Dispatch to op() with or without a key; the macro stores a possible
 // coroutine handle into suspended_task_address.
1775  if constexpr (!ttg::meta::is_void_v<keyT>) {
1776  TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->template op<Space>(task->key, obj->output_terminals));
1777  } else if constexpr (ttg::meta::is_void_v<keyT>) {
1778  TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->template op<Space>(obj->output_terminals));
1779  } else // unreachable
1780  ttg:: abort();
1782  }
1783  else {
1784 #ifdef TTG_HAVE_COROUTINE
 // resume the previously suspended resumable_task coroutine
1785  auto ret = static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address));
1786  assert(ret.ready());
1787  ret.resume();
1788  if (ret.completed()) {
1789  ret.destroy();
1790  suspended_task_address = nullptr;
1791  }
1792  else { // not yet completed
1793  // leave suspended_task_address as is
1794  }
1795 #else // TTG_HAVE_COROUTINE
1796  ttg::abort(); // should not happen
1797 #endif // TTG_HAVE_COROUTINE
1798  }
 // remember whether (and where) the task is suspended for the next invocation
1799  task->suspended_task_address = suspended_task_address;
1800 
1801  if (suspended_task_address) {
1802  ttg::abort(); // not yet implemented
1803  // see comments in static_op()
1804  return PARSEC_HOOK_RETURN_AGAIN;
1805  }
1806  else
1807  return PARSEC_HOOK_RETURN_DONE;
1808  }
1809 
1810  template <std::size_t i>
 // PaRSEC hook that drains queued values for streaming input i of the parent
 // task and folds them into the task's data copy using the registered
 // reducer. Coordinates with concurrent producers via the stream's atomic
 // reduce_count; releases the parent task once the stream goal is reached.
 // @param es the executing stream (unused here beyond the signature)
 // @param parsec_task the reducer task (a detail::reducer_task_t)
1811  static parsec_hook_return_t static_reducer_op(parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
1812  using rtask_t = detail::reducer_task_t;
1813  using value_t = std::tuple_element_t<i, actual_input_tuple_type>;
1814  constexpr const bool val_is_void = ttg::meta::is_void_v<value_t>;
1815  constexpr const bool input_is_const = std::is_const_v<value_t>;
1816  rtask_t *rtask = (rtask_t*)parsec_task;
1817  task_t *parent_task = static_cast<task_t*>(rtask->parent_task);
1818  ttT *baseobj = parent_task->tt;
1819  derivedT *obj = static_cast<derivedT *>(baseobj);
1820 
1821  auto& reducer = std::get<i>(baseobj->input_reducers);
1822 
1823  //std::cout << "static_reducer_op " << parent_task->key << std::endl;
1824 
1825  if (obj->tracing()) {
1826  if constexpr (!ttg::meta::is_void_v<keyT>)
1827  ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": reducer executing");
1828  else
1829  ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : reducer executing");
1830  }
1831 
1832  /* the copy to reduce into */
1833  detail::ttg_data_copy_t *target_copy;
1834  target_copy = parent_task->copies[i];
1835  assert(val_is_void || nullptr != target_copy);
1836  /* once we hit 0 we have to stop since another thread might enqueue a new reduction task */
1837  std::size_t c = 0;
1838  std::size_t size = 0;
1839  assert(parent_task->streams[i].reduce_count > 0);
 // The "first" reducer holds the initial value only; if nothing else was
 // queued in the meantime it decrements the count and exits immediately.
1840  if (rtask->is_first) {
1841  if (0 == (parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) {
1842  /* we were the first and there is nothing to be done */
1843  if (obj->tracing()) {
1844  if constexpr (!ttg::meta::is_void_v<keyT>)
1845  ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": first reducer empty");
1846  else
1847  ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : first reducer empty");
1848  }
1849 
1850  return PARSEC_HOOK_RETURN_DONE;
1851  }
1852  }
1853 
1854  assert(detail::parsec_ttg_caller == NULL);
1855  detail::parsec_ttg_caller = rtask->parent_task;
1856 
 // Drain loop: pop queued copies off the stream's LIFO, fold each into the
 // target copy, release it, and keep going while reduce_count says more
 // values have been enqueued.
1857  do {
1858  if constexpr(!val_is_void) {
1859  /* the copies to reduce out of */
1860  detail::ttg_data_copy_t *source_copy;
1861  parsec_list_item_t *item;
1862  item = parsec_lifo_pop(&parent_task->streams[i].reduce_copies);
1863  if (nullptr == item) {
1864  // maybe someone is changing the goal right now
1865  break;
1866  }
1867  source_copy = ((detail::ttg_data_copy_self_t *)(item))->self;
1868  assert(target_copy->num_readers() == target_copy->mutable_tag);
1869  assert(source_copy->num_readers() > 0);
1870  reducer(*reinterpret_cast<std::decay_t<value_t> *>(target_copy->get_ptr()),
1871  *reinterpret_cast<std::decay_t<value_t> *>(source_copy->get_ptr()));
1872  detail::release_data_copy(source_copy);
1873  } else if constexpr(val_is_void) {
1874  reducer(); // invoke control reducer
1875  }
1876  // there is only one task working on this stream, so no need to be atomic here
1877  size = ++parent_task->streams[i].size;
1878  //std::cout << "static_reducer_op size " << size << " of " << parent_task->streams[i].goal << std::endl;
1879  } while ((c = (parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) > 0);
1880  //} while ((c = (--task->streams[i].reduce_count)) > 0);
1881 
1882  /* finalize_argstream sets goal to 1, so size may be larger than goal */
1883  bool complete = (size >= parent_task->streams[i].goal);
1884 
1885  //std::cout << "static_reducer_op size " << size
1886  // << " of " << parent_task->streams[i].goal << " complete " << complete
1887  // << " c " << c << std::endl;
 // Only the invocation that both reached the goal and observed the count at
 // zero (no concurrent producer will spawn another reducer) releases the task.
1888  if (complete && c == 0) {
1889  if constexpr(input_is_const) {
1890  /* make the consumer task a reader if its input is const */
1891  target_copy->reset_readers();
1892  }
1893  /* task may not be runnable yet because other inputs are missing, have release_task decide */
1894  parent_task->remove_from_hash = true;
1895  parent_task->release_task(parent_task);
1896  }
1897 
 // NOTE(review): a line (orig. 1898, presumably resetting
 // detail::parsec_ttg_caller to nullptr) is elided in this rendering --
 // confirm against the full source.
1899 
1900  if (obj->tracing()) {
1901  if constexpr (!ttg::meta::is_void_v<keyT>)
1902  ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": done executing");
1903  else
1904  ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : done executing");
1905  }
1906 
1907  return PARSEC_HOOK_RETURN_DONE;
1908  }
1909 
1910 
1911  protected:
1912  template <typename T>
 // Deserializes @p obj from the buffer @p _bytes starting at offset @p pos
 // using the type's data descriptor (dd_t). A variable-size payload is
 // preceded by its length; a constant-size payload's length is computed.
 // Returns the offset just past the consumed bytes.
 // NOTE(review): the alias `dd_t` is declared on a line (orig. 1914) elided
 // in this rendering -- presumably the default data descriptor for T.
1913  uint64_t unpack(T &obj, void *_bytes, uint64_t pos) {
1915  uint64_t payload_size;
1916  if constexpr (!dd_t::serialize_size_is_const) {
 // variable-size payload: read the on-wire length first
1917  pos = ttg::default_data_descriptor<uint64_t>::unpack_payload(&payload_size, sizeof(uint64_t), pos, _bytes);
1918  } else {
 // constant-size payload: no length on the wire, compute it locally
1919  payload_size = dd_t::payload_size(&obj);
1920  }
1921  pos = dd_t::unpack_payload(&obj, payload_size, pos, _bytes);
1922  return pos;
1923  }
1924 
1925  template <typename T>
 // Serializes @p obj into @p bytes starting at offset @p pos using the
 // type's data descriptor (dd_t); the mirror of unpack(). When the payload
 // size is not a compile-time constant, the size is written ahead of the
 // payload. Returns the offset just past the written bytes.
 // @param copy optional data copy whose tracked iovecs are reset first
 // NOTE(review): the alias `dd_t` is declared on a line (orig. 1927) elided
 // in this rendering -- presumably the default data descriptor for T.
1926  uint64_t pack(T &obj, void *bytes, uint64_t pos, detail::ttg_data_copy_t *copy = nullptr) {
1928  uint64_t payload_size = dd_t::payload_size(&obj);
1929  if (copy) {
1930  /* reset any tracked data, we don't care about the packing from the payload size */
1931  copy->iovec_reset();
1932  }
1933 
1934  if constexpr (!dd_t::serialize_size_is_const) {
 // variable-size payload: emit the length ahead of the payload
1935  pos = ttg::default_data_descriptor<uint64_t>::pack_payload(&payload_size, sizeof(uint64_t), pos, bytes);
1936  }
1937  pos = dd_t::pack_payload(&obj, payload_size, pos, bytes);
1938  return pos;
1939  }
1940 
 // Entry point for incoming active messages addressed to this TT: reads the
 // message header and dispatches to the member-function table selected by
 // the header's function id, indexed by the parameter id.
 // NOTE(review): each `case` label of the switch (orig. lines 1947, 1959,
 // 1966, 1973) is elided in this rendering; from the dispatched tables these
 // are presumably the set-arg, set-argstream-size, finalize-argstream, and
 // get-from-pull message kinds -- confirm against the full source.
 // @param data the raw message, beginning with a msg_header_t
 // @param size message size in bytes (must cover at least the header)
 // @param bop the receiving TT (actually a derivedT)
1941  static void static_set_arg(void *data, std::size_t size, ttg::TTBase *bop) {
1942  assert(size >= sizeof(msg_header_t) &&
1943  "Trying to unpack as message that does not hold enough bytes to represent a single header");
1944  msg_header_t *hd = static_cast<msg_header_t *>(data);
1945  derivedT *obj = reinterpret_cast<derivedT *>(bop);
1946  switch (hd->fn_id) {
 // (case label elided) -- dispatch through set_arg_from_msg_fcts
1948  if (0 <= hd->param_id) {
1949  assert(hd->param_id >= 0);
1950  assert(hd->param_id < obj->set_arg_from_msg_fcts.size());
1951  auto member = obj->set_arg_from_msg_fcts[hd->param_id];
1952  (obj->*member)(data, size);
1953  } else {
1954  // there is no good reason to have negative param ids
1955  ttg::abort();
1956  }
1957  break;
1958  }
 // (case label elided) -- dispatch through set_argstream_size_from_msg_fcts
1960  assert(hd->param_id >= 0);
1961  assert(hd->param_id < obj->set_argstream_size_from_msg_fcts.size());
1962  auto member = obj->set_argstream_size_from_msg_fcts[hd->param_id];
1963  (obj->*member)(data, size);
1964  break;
1965  }
 // (case label elided) -- dispatch through finalize_argstream_from_msg_fcts
1967  assert(hd->param_id >= 0);
1968  assert(hd->param_id < obj->finalize_argstream_from_msg_fcts.size());
1969  auto member = obj->finalize_argstream_from_msg_fcts[hd->param_id];
1970  (obj->*member)(data, size);
1971  break;
1972  }
 // (case label elided) -- dispatch through get_from_pull_msg_fcts
1974  assert(hd->param_id >= 0);
1975  assert(hd->param_id < obj->get_from_pull_msg_fcts.size());
1976  auto member = obj->get_from_pull_msg_fcts[hd->param_id];
1977  (obj->*member)(data, size);
1978  break;
1979  }
1980  default:
1981  ttg::abort();
1982  }
1983  }
1984 
1986  inline parsec_thread_mempool_t *get_task_mempool(void) {
1987  auto &world_impl = world.impl();
1988  parsec_execution_stream_s *es = world_impl.execution_stream();
1989  int index = (es->virtual_process->vp_id * es->virtual_process->nb_cores + es->th_id);
1990  return &mempools.thread_mempools[index];
1991  }
1992 
1993  template <size_t i, typename valueT>
1994  void set_arg_from_msg_keylist(ttg::span<keyT> &&keylist, detail::ttg_data_copy_t *copy) {
1995  /* create a dummy task that holds the copy, which can be reused by others */
1996  task_t *dummy;
1997  parsec_execution_stream_s *es = world.impl().execution_stream();
1998  parsec_thread_mempool_t *mempool = get_task_mempool();
1999  dummy = new (parsec_thread_mempool_allocate(mempool)) task_t(mempool, &this->self);
2000  dummy->set_dummy(true);
2001  // TODO: do we need to copy static_stream_goal in dummy?
2002 
2003  /* set the received value as the dummy's only data */
2004  dummy->copies[0] = copy;
2005 
2006  /* We received the task on this world, so it's using the same taskpool */
2007  dummy->parsec_task.taskpool = world.impl().taskpool();
2008 
2009  /* save the current task and set the dummy task */
2010  auto parsec_ttg_caller_save = detail::parsec_ttg_caller;
2011  detail::parsec_ttg_caller = dummy;
2012 
2013  /* iterate over the keys and have them use the copy we made */
2014  parsec_task_t *task_ring = nullptr;
2015  for (auto &&key : keylist) {
2016  // copy-constructible? can broadcast to any number of keys
2017  if constexpr (std::is_copy_constructible_v<valueT>) {
2018  set_arg_local_impl<i>(key, *reinterpret_cast<valueT *>(copy->get_ptr()), copy, &task_ring);
2019  }
2020  else {
2021  // not copy-constructible? can move, but only to single key
2022  static_assert(!std::is_reference_v<valueT>);
2023  if (std::size(keylist) == 1)
2024  set_arg_local_impl<i>(key, std::move(*reinterpret_cast<valueT *>(copy->get_ptr())), copy, &task_ring);
2025  else {
2026  throw std::logic_error(std::string("TTG::PaRSEC: need to copy a datum of type") + boost::typeindex::type_id<std::decay_t<valueT>>().pretty_name() + " but the type is not copyable");
2027  }
2028  }
2029  }
2030 
2031  if (nullptr != task_ring) {
2032  auto &world_impl = world.impl();
2033  parsec_task_t *vp_task_ring[1] = { task_ring };
2034  __parsec_schedule_vp(world_impl.execution_stream(), vp_task_ring, 0);
2035  }
2036 
2037  /* restore the previous task */
2038  detail::parsec_ttg_caller = parsec_ttg_caller_save;
2039 
2040  /* release the dummy task */
2041  complete_task_and_release(es, &dummy->parsec_task);
2042  parsec_thread_mempool_free(mempool, &dummy->parsec_task);
2043  }
2044 
2045  // there are 6 types of set_arg:
2046  // - case 1: nonvoid Key, complete Value type
2047  // - case 2: nonvoid Key, void Value, mixed (data+control) inputs
2048  // - case 3: nonvoid Key, void Value, no inputs
2049  // - case 4: void Key, complete Value type
2050  // - case 5: void Key, void Value, mixed (data+control) inputs
2051  // - case 6: void Key, void Value, no inputs
2052  // implementation of these will be further split into "local-only" and global+local
2053 
2054  template <std::size_t i>
 // Handles an incoming set-arg message for input i. Unpacks the key list
 // (if keyT is non-void) and then the value: either created from split
 // metadata or unpacked directly; any iovecs are fetched inline from the
 // message or via one-sided RMA gets before the value is delivered to all
 // local keys via set_arg_from_msg_keylist().
 // NOTE(review): a few original lines are elided in this rendering (orig.
 // 2082-2084: presumably the split-metadata `if constexpr` branch opening
 // and the declarations of `copy` and `descr`; orig. 2195-2196: the
 // matching split-metadata branch around handle_iovecs_fn). Confirm against
 // the full source.
2055  void set_arg_from_msg(void *data, std::size_t size) {
2056  using valueT = std::tuple_element_t<i, actual_input_tuple_type>;
2057  using msg_t = detail::msg_t;
2058  msg_t *msg = static_cast<msg_t *>(data);
2059  if constexpr (!ttg::meta::is_void_v<keyT>) {
2060  /* unpack the keys */
2061  /* TODO: can we avoid copying all the keys?! */
2062  uint64_t pos = msg->tt_id.key_offset;
2063  uint64_t key_end_pos;
2064  std::vector<keyT> keylist;
2065  int num_keys = msg->tt_id.num_keys;
2066  keylist.reserve(num_keys);
2067  auto rank = world.rank();
2068  for (int k = 0; k < num_keys; ++k) {
2069  keyT key;
2070  pos = unpack(key, msg->bytes, pos);
2071  assert(keymap(key) == rank);
2072  keylist.push_back(std::move(key));
2073  }
2074  key_end_pos = pos;
2075  /* jump back to the beginning of the message to get the value */
2076  pos = 0;
2077  // case 1
2078  if constexpr (!ttg::meta::is_void_v<valueT>) {
2079  using decvalueT = std::decay_t<valueT>;
2080  int32_t num_iovecs = msg->tt_id.num_iovecs;
2081  //bool inline_data = msg->inline_data;
 // (orig. 2082-2084 elided: split-metadata branch opening; `descr` and
 // `copy` are declared there)
2085  using metadata_t = decltype(descr.get_metadata(std::declval<decvalueT>()));
2086 
2087  /* unpack the metadata */
2088  metadata_t metadata;
2089  pos = unpack(metadata, msg->bytes, pos);
2090 
2091  //std::cout << "set_arg_from_msg splitmd num_iovecs " << num_iovecs << std::endl;
2092 
 // split-metadata path: reconstruct the value from metadata only
2093  copy = detail::create_new_datacopy(descr.create_from_metadata(metadata));
2094  } else if constexpr (!ttg::has_split_metadata<decvalueT>::value) {
 // non-split path: default-construct, then unpack the full object
2095  copy = detail::create_new_datacopy(decvalueT{});
2096 #if 0
2097  // TODO: first attempt at sending directly to the device
2098  parsec_gpu_data_copy_t* gpu_elem;
2099  gpu_elem = PARSEC_DATA_GET_COPY(master, gpu_device->super.device_index);
2100  int i = detail::first_device_id;
2101  int devid = detail::first_device_id;
2102  while (i < parsec_nb_devices) {
2103  if (nullptr == gpu_elem) {
2104  gpu_elem = PARSEC_OBJ_NEW(parsec_data_copy_t);
2105  gpu_elem->flags = PARSEC_DATA_FLAG_PARSEC_OWNED | PARSEC_DATA_FLAG_PARSEC_MANAGED;
2106  gpu_elem->coherency_state = PARSEC_DATA_COHERENCY_INVALID;
2107  gpu_elem->version = 0;
2108  gpu_elem->coherency_state = PARSEC_DATA_COHERENCY_OWNED;
2109  }
2110  if (nullptr == gpu_elem->device_private) {
2111  gpu_elem->device_private = zone_malloc(gpu_device->memory, gpu_task->flow_nb_elts[i]);
2112  if (nullptr == gpu_elem->device_private) {
2113  devid++;
2114  continue;
2115  }
2116  break;
2117  }
2118  }
2119 #endif // 0
2120  /* unpack the object, potentially discovering iovecs */
2121  pos = unpack(*static_cast<decvalueT *>(copy->get_ptr()), msg->bytes, pos);
2122  //std::cout << "set_arg_from_msg iovec_begin num_iovecs " << num_iovecs << " distance " << std::distance(copy->iovec_begin(), copy->iovec_end()) << std::endl;
2123  assert(std::distance(copy->iovec_begin(), copy->iovec_end()) == num_iovecs);
2124  }
2125 
 // fast path: no auxiliary buffers to transfer, deliver immediately
2126  if (num_iovecs == 0) {
2127  set_arg_from_msg_keylist<i, decvalueT>(ttg::span<keyT>(&keylist[0], num_keys), copy);
2128  } else {
2129  /* unpack the header and start the RMA transfers */
2130 
2131  /* get the remote rank */
2132  int remote = msg->tt_id.sender;
2133  assert(remote < world.size());
2134 
2135  auto &val = *static_cast<decvalueT *>(copy->get_ptr());
2136 
2137  bool inline_data = msg->tt_id.inline_data;
2138 
2139  int nv = 0;
2140  /* start the RMA transfers */
2141  auto handle_iovecs_fn =
2142  [&](auto&& iovecs) {
2143 
2144  if (inline_data) {
2145  /* unpack the data from the message */
2146  for (auto &&iov : iovecs) {
2147  ++nv;
2148  std::memcpy(iov.data, msg->bytes + pos, iov.num_bytes);
2149  pos += iov.num_bytes;
2150  }
2151  } else {
2152  /* extract the callback tag */
2153  parsec_ce_tag_t cbtag;
2154  std::memcpy(&cbtag, msg->bytes + pos, sizeof(cbtag));
2155  pos += sizeof(cbtag);
2156 
 // Delivery is deferred: the activation fires the keylist delivery
 // once all RMA gets have completed.
2157  /* create the value from the metadata */
2158  auto activation = new detail::rma_delayed_activate(
2159  std::move(keylist), copy, num_iovecs, [this](std::vector<keyT> &&keylist, detail::ttg_data_copy_t *copy) {
2160  set_arg_from_msg_keylist<i, decvalueT>(keylist, copy);
2161  this->world.impl().decrement_inflight_msg();
2162  });
2163 
2164  using ActivationT = std::decay_t<decltype(*activation)>;
2165 
 // For each iovec: read the remote registration handle and callback
 // pointer from the message, register the local buffer, and issue a
 // one-sided get.
2166  for (auto &&iov : iovecs) {
2167  ++nv;
2168  parsec_ce_mem_reg_handle_t rreg;
2169  int32_t rreg_size_i;
2170  std::memcpy(&rreg_size_i, msg->bytes + pos, sizeof(rreg_size_i));
2171  pos += sizeof(rreg_size_i);
2172  rreg = static_cast<parsec_ce_mem_reg_handle_t>(msg->bytes + pos);
2173  pos += rreg_size_i;
2174  // std::intptr_t *fn_ptr = reinterpret_cast<std::intptr_t *>(msg->bytes + pos);
2175  // pos += sizeof(*fn_ptr);
2176  std::intptr_t fn_ptr;
2177  std::memcpy(&fn_ptr, msg->bytes + pos, sizeof(fn_ptr));
2178  pos += sizeof(fn_ptr);
2179 
2180  /* register the local memory */
2181  parsec_ce_mem_reg_handle_t lreg;
2182  size_t lreg_size;
2183  parsec_ce.mem_register(iov.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iov.num_bytes, parsec_datatype_int8_t,
2184  iov.num_bytes, &lreg, &lreg_size);
2185  world.impl().increment_inflight_msg();
2186  /* TODO: PaRSEC should treat the remote callback as a tag, not a function pointer! */
2187  //std::cout << "set_arg_from_msg: get rreg " << rreg << " remote " << remote << std::endl;
2188  parsec_ce.get(&parsec_ce, lreg, 0, rreg, 0, iov.num_bytes, remote,
2189  &detail::get_complete_cb<ActivationT>, activation,
2190  /*world.impl().parsec_ttg_rma_tag()*/
2191  cbtag, &fn_ptr, sizeof(std::intptr_t));
2192  }
2193  }
2194  };
 // (orig. 2195-2196 elided: presumably the split-metadata `if constexpr`
 // dispatch around the two handle_iovecs_fn calls)
2197  handle_iovecs_fn(descr.get_data(val));
2198  } else if constexpr (!ttg::has_split_metadata<decvalueT>::value) {
2199  handle_iovecs_fn(copy->iovec_span());
2200  copy->iovec_reset();
2201  }
2202 
2203  assert(num_iovecs == nv);
2204  assert(size == (key_end_pos + sizeof(msg_header_t)));
2205 
 // inline data was consumed synchronously above, so deliver right away
2206  if (inline_data) {
2207  set_arg_from_msg_keylist<i, decvalueT>(ttg::span<keyT>(&keylist[0], num_keys), copy);
2208  }
2209  }
2210  // case 2 and 3
2211  } else if constexpr (!ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
2212  for (auto &&key : keylist) {
2213  set_arg<i, keyT, ttg::Void>(key, ttg::Void{});
2214  }
2215  }
2216  // case 4
2217  } else if constexpr (ttg::meta::is_void_v<keyT> && !std::is_void_v<valueT>) {
2218  using decvalueT = std::decay_t<valueT>;
2219  decvalueT val;
2220  /* TODO: handle split-metadata case as with non-void keys */
2221  unpack(val, msg->bytes, 0);
2222  set_arg<i, keyT, valueT>(std::move(val));
2223  // case 5 and 6
2224  } else if constexpr (ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
2225  set_arg<i, keyT, ttg::Void>(ttg::Void{});
2226  } else { // unreachable
2227  ttg::abort();
2228  }
2229  }
2230 
2231  template <std::size_t i>
2232  void finalize_argstream_from_msg(void *data, std::size_t size) {
2233  using msg_t = detail::msg_t;
2234  msg_t *msg = static_cast<msg_t *>(data);
2235  if constexpr (!ttg::meta::is_void_v<keyT>) {
2236  /* unpack the key */
2237  uint64_t pos = 0;
2238  auto rank = world.rank();
2239  keyT key;
2240  pos = unpack(key, msg->bytes, pos);
2241  assert(keymap(key) == rank);
2242  finalize_argstream<i>(key);
2243  } else {
2244  auto rank = world.rank();
2245  assert(keymap() == rank);
2246  finalize_argstream<i>();
2247  }
2248  }
2249 
2250  template <std::size_t i>
2251  void argstream_set_size_from_msg(void *data, std::size_t size) {
2252  using msg_t = detail::msg_t;
2253  auto msg = static_cast<msg_t *>(data);
2254  uint64_t pos = 0;
2255  if constexpr (!ttg::meta::is_void_v<keyT>) {
2256  /* unpack the key */
2257  auto rank = world.rank();
2258  keyT key;
2259  pos = unpack(key, msg->bytes, pos);
2260  assert(keymap(key) == rank);
2261  std::size_t argstream_size;
2262  pos = unpack(argstream_size, msg->bytes, pos);
2263  set_argstream_size<i>(key, argstream_size);
2264  } else {
2265  auto rank = world.rank();
2266  assert(keymap() == rank);
2267  std::size_t argstream_size;
2268  pos = unpack(argstream_size, msg->bytes, pos);
2269  set_argstream_size<i>(argstream_size);
2270  }
2271  }
2272 
2273  template <std::size_t i>
2274  void get_from_pull_msg(void *data, std::size_t size) {
2275  using msg_t = detail::msg_t;
2276  msg_t *msg = static_cast<msg_t *>(data);
2277  auto &in = std::get<i>(input_terminals);
2278  if constexpr (!ttg::meta::is_void_v<keyT>) {
2279  /* unpack the key */
2280  uint64_t pos = 0;
2281  keyT key;
2282  pos = unpack(key, msg->bytes, pos);
2283  set_arg<i>(key, (in.container).get(key));
2284  }
2285  }
2286 
2287  template <std::size_t i, typename Key, typename Value>
 // Sets input i for the task identified by @p key (non-void key, non-void
 // value); perfect-forwards the value to set_arg_local_impl.
2288  std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg_local(
2289  const Key &key, Value &&value) {
2290  set_arg_local_impl<i>(key, std::forward<Value>(value));
2291  }
2292 
2293  template <std::size_t i, typename Key = keyT, typename Value>
 // Sets input i for the (single) keyless task (void key, non-void value).
2294  std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg_local(
2295  Value &&value) {
2296  set_arg_local_impl<i>(ttg::Void{}, std::forward<Value>(value));
2297  }
2298 
2299  template <std::size_t i, typename Key, typename Value>
 // Const-lvalue overload: the value will be copied by the implementation.
2300  std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg_local(
2301  const Key &key, const Value &value) {
2302  set_arg_local_impl<i>(key, value);
2303  }
2304 
2305  template <std::size_t i, typename Key = keyT, typename Value>
 // Const-lvalue overload for the keyless case.
2306  std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg_local(
2307  const Value &value) {
2308  set_arg_local_impl<i>(ttg::Void{}, value);
2309  }
2310 
2311  template <std::size_t i, typename Key = keyT, typename Value>
 // shared_ptr overload for the keyless case: dereferences and passes the
 // pointee as a const lvalue (the pointer itself is not retained).
2312  std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg_local(
2313  std::shared_ptr<const Value> &valueptr) {
2314  set_arg_local_impl<i>(ttg::Void{}, *valueptr);
2315  }
2316 
2317  template <typename Key>
2318  task_t *create_new_task(const Key &key) {
2319  constexpr const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
2320  auto &world_impl = world.impl();
2321  task_t *newtask;
2322  parsec_thread_mempool_t *mempool = get_task_mempool();
2323  char *taskobj = (char *)parsec_thread_mempool_allocate(mempool);
2324  int32_t priority = 0;
2325  if constexpr (!keyT_is_Void) {
2326  priority = priomap(key);
2327  /* placement-new the task */
2328  newtask = new (taskobj) task_t(key, mempool, &this->self, world_impl.taskpool(), this, priority);
2329  } else {
2330  priority = priomap();
2331  /* placement-new the task */
2332  newtask = new (taskobj) task_t(mempool, &this->self, world_impl.taskpool(), this, priority);
2333  }
2334 
2335  for (int i = 0; i < static_stream_goal.size(); ++i) {
2336  newtask->streams[i].goal = static_stream_goal[i];
2337  }
2338 
2339  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": creating task");
2340  return newtask;
2341  }
2342 
2343 
2344  template <std::size_t i>
 // Creates a reducer task for streaming input i, bound to @p task (the task
 // being streamed into) and carrying that task's priority. The reducer task
 // is allocated from the regular task mempool (the static_assert below
 // guarantees the storage is large enough).
 // NOTE(review): the function signature line (orig. 2345) is elided in this
 // rendering; from the body it presumably reads
 //   detail::reducer_task_t *create_new_reducer_task(task_t *task, bool is_first)
 // -- confirm against the full source.
2346  /* make sure we can reuse the existing memory pool and don't have to create a new one */
2347  static_assert(sizeof(task_t) >= sizeof(detail::reducer_task_t));
2348  constexpr const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
2349  auto &world_impl = world.impl();
2350  detail::reducer_task_t *newtask;
2351  parsec_thread_mempool_t *mempool = get_task_mempool();
2352  char *taskobj = (char *)parsec_thread_mempool_allocate(mempool);
2353  // use the priority of the task we stream into
2354  int32_t priority = 0;
2355  if constexpr (!keyT_is_Void) {
2356  priority = priomap(task->key);
2357  ttg::trace(world.rank(), ":", get_name(), " : ", task->key, ": creating reducer task");
2358  } else {
2359  priority = priomap();
2360  ttg::trace(world.rank(), ":", get_name(), ": creating reducer task");
2361  }
2362  /* placement-new the task */
2363  newtask = new (taskobj) detail::reducer_task_t(task, mempool, inpute_reducers_taskclass[i],
2364  world_impl.taskpool(), priority, is_first);
2365 
2366  return newtask;
2367  }
2368 
2369 
2370  // Used to set the i'th argument
2371  template <std::size_t i, typename Key, typename Value>
2372  void set_arg_local_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in = nullptr,
2373  parsec_task_t **task_ring = nullptr) {
2374  using valueT = std::tuple_element_t<i, input_values_full_tuple_type>;
2375  constexpr const bool input_is_const = std::is_const_v<std::tuple_element_t<i, input_args_type>>;
2376  constexpr const bool valueT_is_Void = ttg::meta::is_void_v<valueT>;
2377  constexpr const bool keyT_is_Void = ttg::meta::is_void_v<Key>;
2378 
2379 
2380  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": received value for argument : ", i);
2381 
2382  parsec_key_t hk = 0;
2383  if constexpr (!keyT_is_Void) {
2384  hk = reinterpret_cast<parsec_key_t>(&key);
2385  assert(keymap(key) == world.rank());
2386  }
2387 
2388  task_t *task;
2389  auto &world_impl = world.impl();
2390  auto &reducer = std::get<i>(input_reducers);
2391  bool release = false;
2392  bool remove_from_hash = true;
2393  bool discover_task = true;
2394  bool get_pull_data = false;
2395  bool has_lock = false;
2396  /* If we have only one input and no reducer on that input we can skip the hash table */
2397  if (numins > 1 || reducer) {
2398  has_lock = true;
2399  parsec_hash_table_lock_bucket(&tasks_table, hk);
2400  if (nullptr == (task = (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk))) {
2401  task = create_new_task(key);
2402  world_impl.increment_created();
2403  parsec_hash_table_nolock_insert(&tasks_table, &task->tt_ht_item);
2404  get_pull_data = !is_lazy_pull();
2405  if( world_impl.dag_profiling() ) {
2406 #if defined(PARSEC_PROF_GRAPHER)
2407  parsec_prof_grapher_task(&task->parsec_task, world_impl.execution_stream()->th_id, 0,
2408  key_hash(make_key(task->parsec_task.taskpool, task->parsec_task.locals), nullptr));
2409 #endif
2410  }
2411  } else if (!reducer && numins == (task->in_data_count + 1)) {
2412  /* remove while we have the lock */
2413  parsec_hash_table_nolock_remove(&tasks_table, hk);
2414  remove_from_hash = false;
2415  }
2416  /* if we have a reducer, we need to hold on to the lock for just a little longer */
2417  if (!reducer) {
2418  parsec_hash_table_unlock_bucket(&tasks_table, hk);
2419  has_lock = false;
2420  }
2421  } else {
2422  task = create_new_task(key);
2423  world_impl.increment_created();
2424  remove_from_hash = false;
2425  if( world_impl.dag_profiling() ) {
2426 #if defined(PARSEC_PROF_GRAPHER)
2427  parsec_prof_grapher_task(&task->parsec_task, world_impl.execution_stream()->th_id, 0,
2428  key_hash(make_key(task->parsec_task.taskpool, task->parsec_task.locals), nullptr));
2429 #endif
2430  }
2431  }
2432 
2433  if( world_impl.dag_profiling() ) {
2434 #if defined(PARSEC_PROF_GRAPHER)
2435  if(NULL != detail::parsec_ttg_caller && !detail::parsec_ttg_caller->is_dummy()) {
2437  char orig_str[32];
2438  char dest_str[32];
2439  if(orig_index >= 0) {
2440  snprintf(orig_str, 32, "%d", orig_index);
2441  } else {
2442  strncpy(orig_str, "_", 32);
2443  }
2444  snprintf(dest_str, 32, "%lu", i);
2445  parsec_flow_t orig{ .name = orig_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2446  .flow_index = 0, .flow_datatype_mask = ~0 };
2447  parsec_flow_t dest{ .name = dest_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2448  .flow_index = 0, .flow_datatype_mask = ~0 };
2449  parsec_prof_grapher_dep(&detail::parsec_ttg_caller->parsec_task, &task->parsec_task, discover_task ? 1 : 0, &orig, &dest);
2450  }
2451 #endif
2452  }
2453 
2454  auto get_copy_fn = [&](detail::parsec_ttg_task_base_t *task, auto&& value, bool is_const){
2455  detail::ttg_data_copy_t *copy = copy_in;
2456  if (nullptr == copy && nullptr != detail::parsec_ttg_caller) {
2458  }
2459  if (nullptr != copy) {
2460  /* retain the data copy */
2461  copy = detail::register_data_copy<valueT>(copy, task, is_const);
2462  } else {
2463  /* create a new copy */
2464  copy = detail::create_new_datacopy(std::forward<Value>(value));
2465  if (!is_const) {
2466  copy->mark_mutable();
2467  }
2468  }
2469  return copy;
2470  };
2471 
2472  if (reducer && 1 != task->streams[i].goal) { // is this a streaming input? reduce the received value
2473  auto submit_reducer_task = [&](auto *parent_task){
2474  /* check if we need to create a task */
2475  std::size_t c = parent_task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
2476  //std::cout << "submit_reducer_task " << key << " c " << c << std::endl;
2477  if (0 == c) {
2478  /* we are responsible for creating the reduction task */
2479  detail::reducer_task_t *reduce_task;
2480  reduce_task = create_new_reducer_task<i>(parent_task, false);
2481  reduce_task->release_task(reduce_task); // release immediately
2482  }
2483  };
2484 
2485  if constexpr (!ttg::meta::is_void_v<valueT>) { // for data values
2486  // have a value already? if not, set, otherwise reduce
2487  detail::ttg_data_copy_t *copy = nullptr;
2488  if (nullptr == (copy = task->copies[i])) {
2489  using decay_valueT = std::decay_t<valueT>;
2490 
2491  /* first input value, create a task and bind it to the copy */
2492  //std::cout << "Creating new reducer task for " << key << std::endl;
2493  detail::reducer_task_t *reduce_task;
2494  reduce_task = create_new_reducer_task<i>(task, true);
2495 
2496  /* protected by the bucket lock */
2497  task->streams[i].size = 1;
2498  task->streams[i].reduce_count.store(1, std::memory_order_relaxed);
2499 
2500  /* get the copy to use as input for this task */
2501  detail::ttg_data_copy_t *copy = get_copy_fn(reduce_task, std::forward<Value>(value), false);
2502 
2503  /* put the copy into the task */
2504  task->copies[i] = copy;
2505 
2506  /* release the task if we're not deferred
2507  * TODO: can we delay that until we get the second value?
2508  */
2509  if (copy->get_next_task() != &reduce_task->parsec_task) {
2510  reduce_task->release_task(reduce_task);
2511  }
2512 
2513  /* now we can unlock the bucket */
2514  parsec_hash_table_unlock_bucket(&tasks_table, hk);
2515  } else {
2516  /* unlock the bucket, the lock is not needed anymore */
2517  parsec_hash_table_unlock_bucket(&tasks_table, hk);
2518 
2519  /* get the copy to use as input for this task */
2520  detail::ttg_data_copy_t *copy = get_copy_fn(task, std::forward<Value>(value), true);
2521 
2522  /* enqueue the data copy to be reduced */
2523  parsec_lifo_push(&task->streams[i].reduce_copies, &copy->super);
2524  submit_reducer_task(task);
2525  }
2526  } else {
2527  /* unlock the bucket, the lock is not needed anymore */
2528  parsec_hash_table_unlock_bucket(&tasks_table, hk);
2529  /* submit reducer for void values to handle side effects */
2530  submit_reducer_task(task);
2531  }
2532  //if (release) {
2533  // parsec_hash_table_nolock_remove(&tasks_table, hk);
2534  // remove_from_hash = false;
2535  //}
2536  //parsec_hash_table_unlock_bucket(&tasks_table, hk);
2537  } else {
2538  /* unlock the bucket, the lock is not needed anymore */
2539  if (has_lock) {
2540  parsec_hash_table_unlock_bucket(&tasks_table, hk);
2541  }
2542  /* whether the task needs to be deferred or not */
2543  if constexpr (!valueT_is_Void) {
2544  if (nullptr != task->copies[i]) {
2545  ttg::print_error(get_name(), " : ", key, ": error argument is already set : ", i);
2546  throw std::logic_error("bad set arg");
2547  }
2548 
2549  /* get the copy to use as input for this task */
2550  detail::ttg_data_copy_t *copy = get_copy_fn(task, std::forward<Value>(value), input_is_const);
2551 
2552  /* if we registered as a writer and were the first to register with this copy
2553  * we need to defer the release of this task to give other tasks a chance to
2554  * make a copy of the original data */
2555  release = (copy->get_next_task() != &task->parsec_task);
2556  task->copies[i] = copy;
2557  } else {
2558  release = true;
2559  }
2560  }
2561  task->remove_from_hash = remove_from_hash;
2562  if (release) {
2563  release_task(task, task_ring);
2564  }
2565  /* if not pulling lazily, pull the data here */
2566  if constexpr (!ttg::meta::is_void_v<keyT>) {
2567  if (get_pull_data) {
2568  invoke_pull_terminals(std::make_index_sequence<std::tuple_size_v<input_values_tuple_type>>{}, task->key, task);
2569  }
2570  }
2571  }
2572 
  /// Account for a newly arrived input and release the task once all inputs are present.
  ///
  /// If the task was already removed from the hash table (remove_from_hash == false)
  /// it is known to be ready and no atomic counting is needed. Otherwise the input
  /// counter is incremented atomically. When the count reaches \c numins the task is
  /// removed from \c tasks_table and handed to the PaRSEC scheduler -- either
  /// immediately, or collected into \p task_ring for a batched submission by the caller.
  ///
  /// \param task the task whose input arrived
  /// \param task_ring if non-null, ready tasks are chained into this ring (sorted by
  ///        priority) instead of being scheduled one by one; the caller submits the ring
  void release_task(task_t *task,
                    parsec_task_t **task_ring = nullptr) {
    constexpr const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;

    /* if remove_from_hash == false, someone has already removed the task from the hash table
     * so we know that the task is ready, no need to do atomic increments here */
    bool is_ready = !task->remove_from_hash;
    int32_t count;
    if (is_ready) {
      count = numins;
    } else {
      /* fetch_inc returns the previous value, hence the +1 to get the new count */
      count = parsec_atomic_fetch_inc_int32(&task->in_data_count) + 1;
      assert(count <= self.dependencies_goal);
    }

    auto &world_impl = world.impl();
    ttT *baseobj = task->tt;

    if (count == numins) {
      /* all inputs have arrived: the task is ready for execution */
      parsec_execution_stream_t *es = world_impl.execution_stream();
      parsec_key_t hk = task->pkey();
      if (tracing()) {
        if constexpr (!keyT_is_Void) {
          ttg::trace(world.rank(), ":", get_name(), " : ", task->key, ": submitting task for op ");
        } else {
          ttg::trace(world.rank(), ":", get_name(), ": submitting task for op ");
        }
      }
      if (task->remove_from_hash) parsec_hash_table_remove(&tasks_table, hk);
      if (nullptr == task_ring) {
        /* no ring provided by the caller: schedule immediately */
        parsec_task_t *vp_task_rings[1] = { &task->parsec_task };
        __parsec_schedule_vp(es, vp_task_rings, 0);
      } else if (*task_ring == nullptr) {
        /* the first task is set directly */
        *task_ring = &task->parsec_task;
      } else {
        /* push into the ring */
        parsec_list_item_ring_push_sorted(&(*task_ring)->super, &task->parsec_task.super,
                                          offsetof(parsec_task_t, priority));
      }
    } else if constexpr (!ttg::meta::is_void_v<keyT>) {
      /* not ready yet: if only the pull inputs are missing and pulls are lazy, trigger them now */
      if ((baseobj->num_pullins + count == numins) && baseobj->is_lazy_pull()) {
        /* lazily pull the pull terminal data */
        baseobj->invoke_pull_terminals(std::make_index_sequence<std::tuple_size_v<input_values_tuple_type>>{}, task->key, task);
      }
    }
  }
2620 
2621  // cases 1+2
2622  template <std::size_t i, typename Key, typename Value>
2623  std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg(const Key &key,
2624  Value &&value) {
2625  set_arg_impl<i>(key, std::forward<Value>(value));
2626  }
2627 
2628  // cases 4+5+6
2629  template <std::size_t i, typename Key, typename Value>
2630  std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg(Value &&value) {
2631  set_arg_impl<i>(ttg::Void{}, std::forward<Value>(value));
2632  }
2633 
2634  template <std::size_t i, typename Key = keyT>
2635  std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_arg() {
2636  set_arg_impl<i>(ttg::Void{}, ttg::Void{});
2637  }
2638 
2639  // case 3
2640  template <std::size_t i, typename Key>
2641  std::enable_if_t<!ttg::meta::is_void_v<Key>, void> set_arg(const Key &key) {
2642  set_arg_impl<i>(key, ttg::Void{});
2643  }
2644 
  /// Estimate whether a value (plus its key) is small enough to be sent inline
  /// inside an active message instead of via RMA transfers.
  ///
  /// \param value_ptr pointer to the value to be sent
  /// \param copy data copy holding the value's iovecs (used in the non-split-metadata case)
  /// \param key a representative key that will be packed into the message
  /// \param num_keys number of keys to be sent
  ///        (NOTE(review): not used in the estimate below, only one key's size is counted -- confirm)
  /// \return true if key + metadata + data sizes fall below detail::max_inline_size
  template<typename Value, typename Key>
  bool can_inline_data(Value* value_ptr, detail::ttg_data_copy_t *copy, const Key& key, std::size_t num_keys) {
    using decvalueT = std::decay_t<Value>;
    bool inline_data = false;
    /* check whether to send data in inline */
    std::size_t iov_size = 0;
    std::size_t metadata_size = 0;
    if constexpr (ttg::has_split_metadata<std::decay_t<Value>>::value) {
      /* NOTE(review): `descr` (the type's split-metadata descriptor) is declared out of this view -- confirm */
      auto iovs = descr.get_data(*const_cast<decvalueT *>(value_ptr));
      /* sum up the raw data bytes across all iovecs */
      iov_size = std::accumulate(iovs.begin(), iovs.end(), 0,
                                 [](std::size_t s, auto& iov){ return s + iov.num_bytes; });
      auto metadata = descr.get_metadata(*const_cast<decvalueT *>(value_ptr));
      metadata_size = ttg::default_data_descriptor<decltype(metadata)>::payload_size(&metadata);
    } else {
      /* TODO: how can we query the iovecs of the buffers here without actually packing the data? */
      metadata_size = ttg::default_data_descriptor<ttg::meta::remove_cvr_t<Value>>::payload_size(value_ptr);
      iov_size = std::accumulate(copy->iovec_begin(), copy->iovec_end(), 0,
                                 [](std::size_t s, auto& iov){ return s + iov.num_bytes; });
    }
    /* key is packed at the end */
    std::size_t key_pack_size = ttg::default_data_descriptor<Key>::payload_size(&key);
    std::size_t pack_size = key_pack_size + metadata_size + iov_size;
    if (pack_size < detail::max_inline_size) {
      inline_data = true;
    }
    return inline_data;
  }
2673 
  // Used to set the i'th argument
  /// Deliver \p value to input terminal \p i of the task identified by \p key.
  ///
  /// If this rank owns the key, the argument is set locally. Otherwise the key and
  /// value are serialized into an active message and sent to the owner: small
  /// payloads are inlined into the message, larger ones are exposed through RMA
  /// registrations that the receiver will get() from.
  ///
  /// \param key task identifier (ttg::Void for keyless TTs)
  /// \param value the argument value
  /// \param copy_in optional pre-existing data copy to use instead of looking up / creating one
  template <std::size_t i, typename Key, typename Value>
  void set_arg_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in = nullptr) {
    int owner;
    using decvalueT = std::decay_t<Value>;
    using norefvalueT = std::remove_reference_t<Value>;
    norefvalueT *value_ptr = &value;

#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
    if(world.impl().profiling()) {
      parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_start, 0, 0, NULL);
    }
#endif

    /* determine which rank owns the task this argument belongs to */
    if constexpr (!ttg::meta::is_void_v<Key>)
      owner = keymap(key);
    else
      owner = keymap();
    if (owner == world.rank()) {
      /* local fast path: hand the value to the local implementation and return */
      if constexpr (!ttg::meta::is_void_v<keyT>)
        set_arg_local_impl<i>(key, std::forward<Value>(value), copy_in);
      else
        set_arg_local_impl<i>(ttg::Void{}, std::forward<Value>(value), copy_in);
#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
      if(world.impl().profiling()) {
        parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_end, 0, 0, NULL);
      }
#endif
      return;
    }
    // the target task is remote. Pack the information and send it to
    // the corresponding peer.
    // TODO do we need to copy value?
    using msg_t = detail::msg_t;
    auto &world_impl = world.impl();
    uint64_t pos = 0;
    int num_iovecs = 0;
    std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
                                                         msg_header_t::MSG_SET_ARG, i, world_impl.rank(), 1);

    if constexpr (!ttg::meta::is_void_v<decvalueT>) {

      detail::ttg_data_copy_t *copy = copy_in;
      /* make sure we have a data copy to register with */
      if (nullptr == copy) {
        /* NOTE(review): a lookup of an existing copy in the calling task is elided from this view -- confirm */
        if (nullptr == copy) {
          // We need to create a copy for this data, as it does not exist yet.
          copy = detail::create_new_datacopy(std::forward<Value>(value));
          // use the new value from here on out
          value_ptr = static_cast<norefvalueT*>(copy->get_ptr());
        }
      }

      /* decide whether the payload is small enough to ride inside the AM itself */
      bool inline_data = can_inline_data(value_ptr, copy, key, 1);
      msg->tt_id.inline_data = inline_data;

      /* packs the given iovecs into the message: either the raw bytes (inline)
       * or one RMA registration handle + completion callback per iovec */
      auto handle_iovec_fn = [&](auto&& iovecs){

        if (inline_data) {
          /* inline data is packed right after the tt_id in the message */
          for (auto &&iov : iovecs) {
            std::memcpy(msg->bytes + pos, iov.data, iov.num_bytes);
            pos += iov.num_bytes;
          }
        } else {

          /* TODO: at the moment, the tag argument to parsec_ce.get() is treated as a
           * raw function pointer instead of a preregistered AM tag, so play that game.
           * Once this is fixed in PaRSEC we need to use parsec_ttg_rma_tag instead! */
          parsec_ce_tag_t cbtag = reinterpret_cast<parsec_ce_tag_t>(&detail::get_remote_complete_cb);
          std::memcpy(msg->bytes + pos, &cbtag, sizeof(cbtag));
          pos += sizeof(cbtag);

          /* register each iovec and pack handle size, handle bytes, and callback pointer */
          for (auto &&iov : iovecs) {
            copy = detail::register_data_copy<decvalueT>(copy, nullptr, true);
            parsec_ce_mem_reg_handle_t lreg;
            size_t lreg_size;
            /* TODO: only register once when we can broadcast the data! */
            parsec_ce.mem_register(iov.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iov.num_bytes, parsec_datatype_int8_t,
                                   iov.num_bytes, &lreg, &lreg_size);
            /* shared_ptr keeps the registration alive until the completion callback fires */
            auto lreg_ptr = std::shared_ptr<void>{lreg, [](void *ptr) {
              parsec_ce_mem_reg_handle_t memreg = (parsec_ce_mem_reg_handle_t)ptr;
              parsec_ce.mem_unregister(&memreg);
            }};
            int32_t lreg_size_i = lreg_size;
            std::memcpy(msg->bytes + pos, &lreg_size_i, sizeof(lreg_size_i));
            pos += sizeof(lreg_size_i);
            std::memcpy(msg->bytes + pos, lreg, lreg_size);
            pos += lreg_size;
            //std::cout << "set_arg_impl lreg " << lreg << std::endl;
            /* TODO: can we avoid the extra indirection of going through std::function? */
            std::function<void(void)> *fn = new std::function<void(void)>([=]() mutable {
              /* shared_ptr of value and registration captured by value so resetting
               * them here will eventually release the memory/registration */
              lreg_ptr.reset();
            });
            std::intptr_t fn_ptr{reinterpret_cast<std::intptr_t>(fn)};
            std::memcpy(msg->bytes + pos, &fn_ptr, sizeof(fn_ptr));
            pos += sizeof(fn_ptr);
          }
        }
      };

      if constexpr (ttg::has_split_metadata<std::decay_t<Value>>::value) {
        /* NOTE(review): `descr` (split-metadata descriptor) is declared out of this view -- confirm */
        auto iovs = descr.get_data(*const_cast<decvalueT *>(value_ptr));
        num_iovecs = std::distance(std::begin(iovs), std::end(iovs));
        /* pack the metadata */
        auto metadata = descr.get_metadata(*const_cast<decvalueT *>(value_ptr));
        size_t metadata_size = sizeof(metadata);
        pos = pack(metadata, msg->bytes, pos);
        //std::cout << "set_arg_impl splitmd num_iovecs " << num_iovecs << std::endl;
        handle_iovec_fn(iovs);
      } else if constexpr (!ttg::has_split_metadata<std::decay_t<Value>>::value) {
        /* serialize the object */
        //std::cout << "PRE pack num_iovecs " << std::distance(copy->iovec_begin(), copy->iovec_end()) << std::endl;
        pos = pack(*value_ptr, msg->bytes, pos, copy);
        num_iovecs = std::distance(copy->iovec_begin(), copy->iovec_end());
        //std::cout << "POST pack num_iovecs " << num_iovecs << std::endl;
        /* handle any iovecs contained in it */
        handle_iovec_fn(copy->iovec_span());
        copy->iovec_reset();
      }

      msg->tt_id.num_iovecs = num_iovecs;
    }

    /* pack the key */
    msg->tt_id.num_keys = 0;
    msg->tt_id.key_offset = pos;
    if constexpr (!ttg::meta::is_void_v<Key>) {
      size_t tmppos = pack(key, msg->bytes, pos);
      pos = tmppos;
      msg->tt_id.num_keys = 1;
    }

    /* hand the assembled message to the communication engine */
    parsec_taskpool_t *tp = world_impl.taskpool();
    tp->tdm.module->outgoing_message_start(tp, owner, NULL);
    tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
    //std::cout << "set_arg_impl send_am owner " << owner << " sender " << msg->tt_id.sender << std::endl;
    parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
                      sizeof(msg_header_t) + pos);
#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
    if(world.impl().profiling()) {
      parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_end, 0, 0, NULL);
    }
#endif
#if defined(PARSEC_PROF_GRAPHER)
    /* record the caller -> callee dependency edge in the DAG grapher */
    if(NULL != detail::parsec_ttg_caller && !detail::parsec_ttg_caller->is_dummy()) {
      /* NOTE(review): `orig_index` (flow index of the copy in the caller) is computed out of this view -- confirm */
      char orig_str[32];
      char dest_str[32];
      if(orig_index >= 0) {
        snprintf(orig_str, 32, "%d", orig_index);
      } else {
        strncpy(orig_str, "_", 32);
      }
      snprintf(dest_str, 32, "%lu", i);
      parsec_flow_t orig{ .name = orig_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
                          .flow_index = 0, .flow_datatype_mask = ~0 };
      parsec_flow_t dest{ .name = dest_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
                          .flow_index = 0, .flow_datatype_mask = ~0 };
      /* a transient task object is created only to have a node for the edge */
      task_t *task = create_new_task(key);
      parsec_prof_grapher_dep(&detail::parsec_ttg_caller->parsec_task, &task->parsec_task, 0, &orig, &dest);
      delete task;
    }
#endif
  }
2848 
  /// Set the argument for input terminal \p i of every locally-owned key in
  /// [\p begin, \p end), batching the resulting ready tasks into a single
  /// scheduler submission via a task ring.
  /// \param begin,end iterator range over keys owned by this rank
  /// \param value the value delivered to each task
  template <int i, typename Iterator, typename Value>
  void broadcast_arg_local(Iterator &&begin, Iterator &&end, const Value &value) {
#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
    if(world.impl().profiling()) {
      parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_bcast_arg_start, 0, 0, NULL);
    }
#endif
    parsec_task_t *task_ring = nullptr;
    detail::ttg_data_copy_t *copy = nullptr;
    if (nullptr != detail::parsec_ttg_caller) {
      /* NOTE(review): lookup of the caller's data copy for `value` is elided from this view -- confirm */
    }

    /* all keys are local: collect ready tasks into the ring instead of scheduling one by one */
    for (auto it = begin; it != end; ++it) {
      set_arg_local_impl<i>(*it, value, copy, &task_ring);
    }
    /* submit all ready tasks at once */
    if (nullptr != task_ring) {
      parsec_task_t *vp_task_ring[1] = { task_ring };
      __parsec_schedule_vp(world.impl().execution_stream(), vp_task_ring, 0);
    }
#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
    if(world.impl().profiling()) {
      /* NOTE(review): start event above is bcast_arg_start but this records set_arg_end -- confirm intended */
      parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_end, 0, 0, NULL);
    }
#endif
  }
2876 
  /// Broadcast \p value to input terminal \p i of every task named in \p keylist.
  ///
  /// Keys owned by this rank are handled via broadcast_arg_local(). If any key is
  /// remote, the value is serialized (or its iovecs registered for RMA) once, and
  /// one active message per remote owner is assembled by rewinding the shared
  /// message buffer and re-packing the per-owner registration handles and keys.
  template <std::size_t i, typename Key, typename Value>
  std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
                   void>
  broadcast_arg(const ttg::span<const Key> &keylist, const Value &value) {
    using valueT = std::tuple_element_t<i, input_values_full_tuple_type>;
    auto world = ttg_default_execution_context();
    auto np = world.size();
    int rank = world.rank();
    uint64_t pos = 0;
    /* any key mapping to another rank requires remote messaging */
    bool have_remote = keylist.end() != std::find_if(keylist.begin(), keylist.end(),
                                                     [&](const Key &key) { return keymap(key) != rank; });

    if (have_remote) {
      using decvalueT = std::decay_t<Value>;

      /* sort the input key list by owner and check whether there are remote keys */
      std::vector<Key> keylist_sorted(keylist.begin(), keylist.end());
      std::sort(keylist_sorted.begin(), keylist_sorted.end(), [&](const Key &a, const Key &b) mutable {
        int rank_a = keymap(a);
        int rank_b = keymap(b);
        // sort so that the keys for my rank are first, rank+1 next, ..., wrapping around to 0
        int pos_a = (rank_a + np - rank) % np;
        int pos_b = (rank_b + np - rank) % np;
        return pos_a < pos_b;
      });

      /* Assuming there are no local keys, will be updated while iterating over the keys */
      auto local_begin = keylist_sorted.end();
      auto local_end = keylist_sorted.end();

      int32_t num_iovs = 0;

      /* NOTE(review): `copy` (the data copy for `value`) is obtained on lines elided from this view -- confirm */
      assert(nullptr != copy);

      using msg_t = detail::msg_t;
      auto &world_impl = world.impl();
      std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
                                                           msg_header_t::MSG_SET_ARG, i, world_impl.rank());

      /* check if we inline the data */
      /* TODO: this assumes the worst case: that all keys are packed at once (i.e., go to the same remote). Can we do better?*/
      bool inline_data = can_inline_data(&value, copy, keylist_sorted[0], keylist_sorted.size());
      msg->tt_id.inline_data = inline_data;

      /* registration handles (size + shared_ptr owner) collected once, re-packed per owner below */
      std::vector<std::pair<int32_t, std::shared_ptr<void>>> memregs;
      auto handle_iovs_fn = [&](auto&& iovs){

        if (inline_data) {
          /* inline data is packed right after the tt_id in the message */
          for (auto &&iov : iovs) {
            std::memcpy(msg->bytes + pos, iov.data, iov.num_bytes);
            pos += iov.num_bytes;
          }
        } else {

          /* TODO: at the moment, the tag argument to parsec_ce.get() is treated as a
           * raw function pointer instead of a preregistered AM tag, so play that game.
           * Once this is fixed in PaRSEC we need to use parsec_ttg_rma_tag instead! */
          parsec_ce_tag_t cbtag = reinterpret_cast<parsec_ce_tag_t>(&detail::get_remote_complete_cb);
          std::memcpy(msg->bytes + pos, &cbtag, sizeof(cbtag));
          pos += sizeof(cbtag);

          /* register each iovec once and remember the handle for later per-owner packing */
          for (auto &&iov : iovs) {
            parsec_ce_mem_reg_handle_t lreg;
            size_t lreg_size;
            parsec_ce.mem_register(iov.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iov.num_bytes, parsec_datatype_int8_t,
                                   iov.num_bytes, &lreg, &lreg_size);
            /* TODO: use a static function for deregistration here? */
            memregs.push_back(std::make_pair(static_cast<int32_t>(lreg_size),
                                             /* TODO: this assumes that parsec_ce_mem_reg_handle_t is void* */
                                             std::shared_ptr<void>{lreg, [](void *ptr) {
                                               parsec_ce_mem_reg_handle_t memreg =
                                                   (parsec_ce_mem_reg_handle_t)ptr;
                                               //std::cout << "broadcast_arg memunreg lreg " << memreg << std::endl;
                                               parsec_ce.mem_unregister(&memreg);
                                             }}));
            //std::cout << "broadcast_arg memreg lreg " << lreg << std::endl;
          }
        }
      };

      if constexpr (ttg::has_split_metadata<std::decay_t<Value>>::value) {
        /* NOTE(review): `descr` (split-metadata descriptor) is declared out of this view -- confirm */
        /* pack the metadata */
        auto metadata = descr.get_metadata(value);
        size_t metadata_size = sizeof(metadata);
        pos = pack(metadata, msg->bytes, pos);
        auto iovs = descr.get_data(*const_cast<decvalueT *>(&value));
        num_iovs = std::distance(std::begin(iovs), std::end(iovs));
        memregs.reserve(num_iovs);
        handle_iovs_fn(iovs);
        //std::cout << "broadcast_arg splitmd num_iovecs " << num_iovs << std::endl;
      } else if constexpr (!ttg::has_split_metadata<std::decay_t<Value>>::value) {
        /* serialize the object once */
        pos = pack(value, msg->bytes, pos, copy);
        num_iovs = std::distance(copy->iovec_begin(), copy->iovec_end());
        handle_iovs_fn(copy->iovec_span());
        copy->iovec_reset();
      }

      msg->tt_id.num_iovecs = num_iovs;

      /* remember where the shared prefix ends so the buffer can be rewound per owner */
      std::size_t save_pos = pos;

      parsec_taskpool_t *tp = world_impl.taskpool();
      for (auto it = keylist_sorted.begin(); it < keylist_sorted.end(); /* increment done inline */) {

        auto owner = keymap(*it);
        if (owner == rank) {
          /* remember the contiguous run of local keys; they are handled after the loop */
          local_begin = it;
          /* find first non-local key */
          local_end =
              std::find_if_not(++it, keylist_sorted.end(), [&](const Key &key) { return keymap(key) == rank; });
          it = local_end;
          continue;
        }

        /* rewind the buffer and start packing a new set of memregs and keys */
        pos = save_pos;
        if (!inline_data) {
          /* pack the previously collected registration handles for this owner */
          for (int idx = 0; idx < num_iovs; ++idx) {
            // auto [lreg_size, lreg_ptr] = memregs[idx];
            int32_t lreg_size;
            std::shared_ptr<void> lreg_ptr;
            std::tie(lreg_size, lreg_ptr) = memregs[idx];
            std::memcpy(msg->bytes + pos, &lreg_size, sizeof(lreg_size));
            pos += sizeof(lreg_size);
            std::memcpy(msg->bytes + pos, lreg_ptr.get(), lreg_size);
            pos += lreg_size;
            //std::cout << "broadcast_arg lreg_ptr " << lreg_ptr.get() << std::endl;
            /* mark another reader on the copy */
            copy = detail::register_data_copy<valueT>(copy, nullptr, true);
            /* create a function that will be invoked upon RMA completion at the target */
            std::function<void(void)> *fn = new std::function<void(void)>([=]() mutable {
              /* shared_ptr of value and registration captured by value so resetting
               * them here will eventually release the memory/registration */
              lreg_ptr.reset();
            });
            std::intptr_t fn_ptr{reinterpret_cast<std::intptr_t>(fn)};
            std::memcpy(msg->bytes + pos, &fn_ptr, sizeof(fn_ptr));
            pos += sizeof(fn_ptr);
          }
        }

        /* mark the beginning of the keys */
        msg->tt_id.key_offset = pos;

        /* pack all keys for this owner */
        int num_keys = 0;
        do {
          ++num_keys;
          pos = pack(*it, msg->bytes, pos);
          ++it;
        } while (it < keylist_sorted.end() && keymap(*it) == owner);
        msg->tt_id.num_keys = num_keys;

        tp->tdm.module->outgoing_message_start(tp, owner, NULL);
        tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
        //std::cout << "broadcast_arg send_am owner " << owner << std::endl;
        parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
                          sizeof(msg_header_t) + pos);
      }
      /* handle local keys */
      broadcast_arg_local<i>(local_begin, local_end, value);
    } else {
      /* handle local keys */
      broadcast_arg_local<i>(keylist.begin(), keylist.end(), value);
    }
  }
3054 
3055  // Used by invoke to set all arguments associated with a task
3056  // Is: index sequence of elements in args
3057  // Js: index sequence of input terminals to set
3058  template <typename Key, typename... Ts, size_t... Is, size_t... Js>
3059  std::enable_if_t<ttg::meta::is_none_void_v<Key>, void> set_args(std::index_sequence<Is...>,
3060  std::index_sequence<Js...>, const Key &key,
3061  const std::tuple<Ts...> &args) {
3062  static_assert(sizeof...(Js) == sizeof...(Is));
3063  constexpr size_t js[] = {Js...};
3064  int junk[] = {0, (set_arg<js[Is]>(key, TT::get<Is>(args)), 0)...};
3065  junk[0]++;
3066  }
3067 
3068  // Used by invoke to set all arguments associated with a task
3069  // Is: index sequence of input terminals to set
3070  template <typename Key, typename... Ts, size_t... Is>
3071  std::enable_if_t<ttg::meta::is_none_void_v<Key>, void> set_args(std::index_sequence<Is...> is, const Key &key,
3072  const std::tuple<Ts...> &args) {
3073  set_args(std::index_sequence_for<Ts...>{}, is, key, args);
3074  }
3075 
3076  // Used by invoke to set all arguments associated with a task
3077  // Is: index sequence of elements in args
3078  // Js: index sequence of input terminals to set
3079  template <typename Key = keyT, typename... Ts, size_t... Is, size_t... Js>
3080  std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...>, std::index_sequence<Js...>,
3081  const std::tuple<Ts...> &args) {
3082  static_assert(sizeof...(Js) == sizeof...(Is));
3083  constexpr size_t js[] = {Js...};
3084  int junk[] = {0, (set_arg<js[Is], void>(TT::get<Is>(args)), 0)...};
3085  junk[0]++;
3086  }
3087 
3088  // Used by invoke to set all arguments associated with a task
3089  // Is: index sequence of input terminals to set
3090  template <typename Key = keyT, typename... Ts, size_t... Is>
3091  std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...> is,
3092  const std::tuple<Ts...> &args) {
3093  set_args(std::index_sequence_for<Ts...>{}, is, args);
3094  }
3095 
3096  public:
3099  template <std::size_t i>
3100  void set_static_argstream_size(std::size_t size) {
3101  assert(std::get<i>(input_reducers) && "TT::set_static_argstream_size called on nonstreaming input terminal");
3102  assert(size > 0 && "TT::set_static_argstream_size(key,size) called with size=0");
3103 
3104  this->trace(world.rank(), ":", get_name(), ": setting global stream size for terminal ", i);
3105 
3106  // Check if stream is already bounded
3107  if (static_stream_goal[i] < std::numeric_limits<std::size_t>::max()) {
3108  ttg::print_error(world.rank(), ":", get_name(), " : error stream is already bounded : ", i);
3109  throw std::runtime_error("TT::set_static_argstream_size called for a bounded stream");
3110  }
3111 
3112  static_stream_goal[i] = size;
3113  }
3114 
  /// Set the stream size (number of expected contributions) for streaming input
  /// terminal \p i of the task identified by \p key.
  ///
  /// If another rank owns the key the request is forwarded as an active message;
  /// otherwise the task (created here if it does not exist yet) has its stream
  /// goal updated, and the task is released if the stream already met the goal.
  /// \param key task identifier
  /// \param size number of stream contributions to expect (> 0)
  template <std::size_t i, typename Key>
  std::enable_if_t<!ttg::meta::is_void_v<Key>, void> set_argstream_size(const Key &key, std::size_t size) {
    // preconditions
    assert(std::get<i>(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal");
    assert(size > 0 && "TT::set_argstream_size(key,size) called with size=0");

    // body
    const auto owner = keymap(key);
    if (owner != world.rank()) {
      /* remote key: pack key + size into an active message for the owner */
      ttg::trace(world.rank(), ":", get_name(), ":", key, " : forwarding stream size for terminal ", i);
      using msg_t = detail::msg_t;
      auto &world_impl = world.impl();
      uint64_t pos = 0;
      std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
                                                           world_impl.rank(), 1);
      /* pack the key */
      pos = pack(key, msg->bytes, pos);
      pos = pack(size, msg->bytes, pos);
      parsec_taskpool_t *tp = world_impl.taskpool();
      tp->tdm.module->outgoing_message_start(tp, owner, NULL);
      tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
      parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
                        sizeof(msg_header_t) + pos);
    } else {
      ttg::trace(world.rank(), ":", get_name(), ":", key, " : setting stream size to ", size, " for terminal ", i);

      /* find the task in the tasks table, creating it under the bucket lock if needed */
      auto hk = reinterpret_cast<parsec_key_t>(&key);
      task_t *task;
      parsec_hash_table_lock_bucket(&tasks_table, hk);
      if (nullptr == (task = (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk))) {
        task = create_new_task(key);
        world.impl().increment_created();
        parsec_hash_table_nolock_insert(&tasks_table, &task->tt_ht_item);
        if( world.impl().dag_profiling() ) {
#if defined(PARSEC_PROF_GRAPHER)
          parsec_prof_grapher_task(&task->parsec_task, world.impl().execution_stream()->th_id, 0, *(uintptr_t*)&(task->parsec_task.locals[0]));
#endif
        }
      }
      parsec_hash_table_unlock_bucket(&tasks_table, hk);

      // TODO: Unfriendly implementation, cannot check if stream is already bounded
      // TODO: Unfriendly implementation, cannot check if stream has been finalized already

      // commit changes
      // 1) "lock" the stream by incrementing the reduce_count
      // 2) set the goal
      // 3) "unlock" the stream
      // only one thread will see the reduce_count be zero and the goal match the size
      task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
      task->streams[i].goal = size;
      auto c = task->streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
      if (1 == c && (task->streams[i].size >= size)) {
        /* the stream already received everything it will get: release the task */
        release_task(task);
      }
    }
  }
3176 
  /// Set the stream size (number of expected contributions) for streaming input
  /// terminal \p i of the single task of a void-keyed TT.
  ///
  /// If another rank owns the (keyless) task the request is forwarded as an
  /// active message; otherwise the task (created here if needed, keyed by 0)
  /// has its stream goal updated, and is released if the goal is already met.
  /// \param size number of stream contributions to expect (> 0)
  template <std::size_t i, typename Key = keyT>
  std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_argstream_size(std::size_t size) {
    // preconditions
    assert(std::get<i>(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal");
    assert(size > 0 && "TT::set_argstream_size(key,size) called with size=0");

    // body
    const auto owner = keymap();
    if (owner != world.rank()) {
      /* remote owner: pack only the size (no key) into an active message */
      ttg::trace(world.rank(), ":", get_name(), " : forwarding stream size for terminal ", i);
      using msg_t = detail::msg_t;
      auto &world_impl = world.impl();
      uint64_t pos = 0;
      std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
                                                           world_impl.rank(), 0);
      pos = pack(size, msg->bytes, pos);
      parsec_taskpool_t *tp = world_impl.taskpool();
      tp->tdm.module->outgoing_message_start(tp, owner, NULL);
      tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
      parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
                        sizeof(msg_header_t) + pos);
    } else {
      ttg::trace(world.rank(), ":", get_name(), " : setting stream size to ", size, " for terminal ", i);

      /* the keyless task is stored under hash key 0; create it under the bucket lock if needed */
      parsec_key_t hk = 0;
      task_t *task;
      parsec_hash_table_lock_bucket(&tasks_table, hk);
      if (nullptr == (task = (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk))) {
        task = create_new_task(ttg::Void{});
        world.impl().increment_created();
        parsec_hash_table_nolock_insert(&tasks_table, &task->tt_ht_item);
        if( world.impl().dag_profiling() ) {
#if defined(PARSEC_PROF_GRAPHER)
          parsec_prof_grapher_task(&task->parsec_task, world.impl().execution_stream()->th_id, 0, *(uintptr_t*)&(task->parsec_task.locals[0]));
#endif
        }
      }
      parsec_hash_table_unlock_bucket(&tasks_table, hk);

      // TODO: Unfriendly implementation, cannot check if stream is already bounded
      // TODO: Unfriendly implementation, cannot check if stream has been finalized already

      // commit changes
      // 1) "lock" the stream by incrementing the reduce_count
      // 2) set the goal
      // 3) "unlock" the stream
      // only one thread will see the reduce_count be zero and the goal match the size
      task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
      task->streams[i].goal = size;
      auto c = task->streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
      if (1 == c && (task->streams[i].size >= size)) {
        /* the stream already received everything it will get: release the task */
        release_task(task);
      }
    }
  }
3235 
  /// Finalize the stream on input terminal \p i of the task identified by \p key:
  /// no further contributions are expected, so the goal is set to 1 and the task
  /// released if at least one input already arrived. Forwards to the owner rank
  /// via an active message if the key is remote.
  /// \throws std::runtime_error if no input was ever received on the stream
  template <std::size_t i, typename Key>
  std::enable_if_t<!ttg::meta::is_void_v<Key>, void> finalize_argstream(const Key &key) {
    // preconditions
    assert(std::get<i>(input_reducers) && "TT::finalize_argstream called on nonstreaming input terminal");

    // body
    const auto owner = keymap(key);
    if (owner != world.rank()) {
      /* remote key: pack the key into an active message for the owner */
      ttg::trace(world.rank(), ":", get_name(), " : ", key, ": forwarding stream finalize for terminal ", i);
      using msg_t = detail::msg_t;
      auto &world_impl = world.impl();
      uint64_t pos = 0;
      std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
                                                           world_impl.rank(), 1);
      /* pack the key */
      pos = pack(key, msg->bytes, pos);
      parsec_taskpool_t *tp = world_impl.taskpool();
      tp->tdm.module->outgoing_message_start(tp, owner, NULL);
      tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
      parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
                        sizeof(msg_header_t) + pos);
    } else {
      ttg::trace(world.rank(), ":", get_name(), " : ", key, ": finalizing stream for terminal ", i);

      /* the task must already exist: finalizing a stream that never saw input is an error */
      auto hk = reinterpret_cast<parsec_key_t>(&key);
      task_t *task = nullptr;
      //parsec_hash_table_lock_bucket(&tasks_table, hk);
      if (nullptr == (task = (task_t *)parsec_hash_table_find(&tasks_table, hk))) {
        ttg::print_error(world.rank(), ":", get_name(), ":", key,
                         " : error finalize called on stream that never received an input data: ", i);
        throw std::runtime_error("TT::finalize called on stream that never received an input data");
      }

      // TODO: Unfriendly implementation, cannot check if stream is already bounded
      // TODO: Unfriendly implementation, cannot check if stream has been finalized already

      // commit changes
      // 1) "lock" the stream by incrementing the reduce_count
      // 2) set the goal
      // 3) "unlock" the stream
      // only one thread will see the reduce_count be zero and the goal match the size
      task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
      task->streams[i].goal = 1;
      auto c = task->streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
      if (1 == c && (task->streams[i].size >= 1)) {
        /* at least one input already arrived: the finalized stream is complete */
        release_task(task);
      }
    }
  }
3287 
  /// Finalize the input stream for terminal \p i of the single (void-keyed) task.
  /// Forwards to the owning rank if that is not us; otherwise sets the stream
  /// goal to 1 and releases the task if it has already received an input.
  /// \throws std::runtime_error if the stream never received any input
  template <std::size_t i, bool key_is_void = ttg::meta::is_void_v<keyT>>
  std::enable_if_t<key_is_void, void> finalize_argstream() {
    // preconditions
    assert(std::get<i>(input_reducers) && "TT::finalize_argstream called on nonstreaming input terminal");

    // body
    const auto owner = keymap();
    if (owner != world.rank()) {
      ttg::trace(world.rank(), ":", get_name(), ": forwarding stream finalize for terminal ", i);
      using msg_t = detail::msg_t;
      auto &world_impl = world.impl();
      uint64_t pos = 0;  // no key to pack for a void-keyed task
      std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
                                                           world_impl.rank(), 0);
      parsec_taskpool_t *tp = world_impl.taskpool();
      // notify the taskpool's termination-detection module (tdm) of the outgoing message
      tp->tdm.module->outgoing_message_start(tp, owner, NULL);
      tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
      parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
                        sizeof(msg_header_t) + pos);
    } else {
      ttg::trace(world.rank(), ":", get_name(), ": finalizing stream for terminal ", i);

      // void-keyed tasks are stored under the fixed hash key 0
      auto hk = static_cast<parsec_key_t>(0);
      task_t *task = nullptr;
      if (nullptr == (task = (task_t *)parsec_hash_table_find(&tasks_table, hk))) {
        ttg::print_error(world.rank(), ":", get_name(),
                         " : error finalize called on stream that never received an input data: ", i);
        throw std::runtime_error("TT::finalize called on stream that never received an input data");
      }

      // TODO: Unfriendly implementation, cannot check if stream is already bounded
      // TODO: Unfriendly implementation, cannot check if stream has been finalized already

      // commit changes
      // 1) "lock" the stream by incrementing the reduce_count
      // 2) set the goal
      // 3) "unlock" the stream
      // only one thread will see the reduce_count be zero and the goal match the size
      task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
      task->streams[i].goal = 1;
      auto c = task->streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
      if (1 == c && (task->streams[i].size >= 1)) {
        release_task(task);
      }
    }
  }
3336 
3338 
    // NOTE(review): the enclosing function's signature lies outside this excerpt;
    // the body below walks the current device task's flows and marks the flow
    // that carries each device-resident parsec_data_t as "pushout".
    assert(detail::parsec_ttg_caller->dev_ptr && detail::parsec_ttg_caller->dev_ptr->gpu_task);
    parsec_gpu_task_t *gpu_task = detail::parsec_ttg_caller->dev_ptr->gpu_task;
    auto check_parsec_data = [&](parsec_data_t* data) {
      if (data->owner_device != 0) {
        // data currently lives on a device (owner_device 0 would be the host)
        /* find the flow */
        int flowidx = 0;
        while (flowidx < MAX_PARAM_COUNT &&
               gpu_task->flow[flowidx]->flow_flags != PARSEC_FLOW_ACCESS_NONE) {
          if (detail::parsec_ttg_caller->parsec_task.data[flowidx].data_in->original == data) {
            /* found the right data, set the corresponding flow as pushout */
            break;
          }
          ++flowidx;
        }
        if (flowidx == MAX_PARAM_COUNT) {
          throw std::runtime_error("Cannot add more than MAX_PARAM_COUNT flows to a task!");
        }
        if (gpu_task->flow[flowidx]->flow_flags == PARSEC_FLOW_ACCESS_NONE) {
          /* no flow found, add one and mark it pushout */
          detail::parsec_ttg_caller->parsec_task.data[flowidx].data_in = data->device_copies[0];
          gpu_task->flow_nb_elts[flowidx] = data->nb_elts;
        }
        /* need to mark the flow RW to make PaRSEC happy */
        ((parsec_flow_t *)gpu_task->flow[flowidx])->flow_flags |= PARSEC_FLOW_ACCESS_RW;
        gpu_task->pushout |= 1<<flowidx;
      }
    };
    copy->foreach_parsec_data(check_parsec_data);
  }
3368 
3369 
  /* check whether a data needs to be pushed out */
  /// Decide whether the data copy behind \p value must be pushed back to the
  /// host before it is sent: reductions (host-only), const values after a
  /// writer, existing writers/readers, lack of a device op, or remote
  /// successors without device-aware MPI all force a pushout.
  /// NOTE(review): several lines of this body (the copy lookup and some guard
  /// conditionals) are missing from this excerpt — consult the full source
  /// before editing the logic.
  template <std::size_t i, typename Value, typename RemoteCheckFn>
  std::enable_if_t<!std::is_void_v<std::decay_t<Value>>,
                   void>
  do_prepare_send(const Value &value, RemoteCheckFn&& remote_check) {
    using valueT = std::tuple_element_t<i, input_values_full_tuple_type>;
    static constexpr const bool value_is_const = std::is_const_v<valueT>;

    /* get the copy */

    /* if there is no copy we don't need to prepare anything */
    if (nullptr == copy) {
      return;
    }

    bool need_pushout = false;

      /* already marked pushout, skip the rest */
      return;
    }

    /* TODO: remove this once we support reductions on the GPU */
    auto &reducer = std::get<i>(input_reducers);
    if (reducer) {
      /* reductions are currently done only on the host so push out */
      copy_mark_pushout(copy);
      return;
    }

    if constexpr (value_is_const) {
      /* The data has been modified previously. PaRSEC requires us to pushout
       * data if we transition from a writer to one or more readers. */
      need_pushout = true;
    }

    /* check for multiple readers */
    }

      /* there is a writer already, we will need to create a copy */
      need_pushout = true;
    }

    } else {
      need_pushout = true;
      } else {
      /* there are readers, we will need to create a copy */
      need_pushout = true;
      }
    }
    }

    if constexpr (!derived_has_device_op()) {
      // host-only task class: data must always be available on the host
      need_pushout = true;
    }

    /* check if there are non-local successors if it's a device task */
    if (!need_pushout) {
      bool device_supported = false;
      if constexpr (derived_has_cuda_op()) {
        device_supported = world.impl().mpi_support(ttg::ExecutionSpace::CUDA);
      } else if constexpr (derived_has_hip_op()) {
        device_supported = world.impl().mpi_support(ttg::ExecutionSpace::HIP);
      } else if constexpr (derived_has_level_zero_op()) {
        device_supported = world.impl().mpi_support(ttg::ExecutionSpace::L0);
      }
      /* if MPI supports the device we don't care whether we have remote peers
       * because we can send from the device directly */
      if (!device_supported) {
        need_pushout = remote_check();
      }
    }

    if (need_pushout) {
      copy_mark_pushout(copy);
    }
  }
3461 
3462  /* check whether a data needs to be pushed out */
3463  template <std::size_t i, typename Key, typename Value>
3464  std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
3465  void>
3466  prepare_send(const ttg::span<const Key> &keylist, const Value &value) {
3467  auto remote_check = [&](){
3468  auto world = ttg_default_execution_context();
3469  int rank = world.rank();
3470  bool remote = keylist.end() != std::find_if(keylist.begin(), keylist.end(),
3471  [&](const Key &key) { return keymap(key) != rank; });
3472  return remote;
3473  };
3474  do_prepare_send<i>(value, remote_check);
3475  }
3476 
3477  template <std::size_t i, typename Key, typename Value>
3478  std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
3479  void>
3480  prepare_send(const Value &value) {
3481  auto remote_check = [&](){
3482  auto world = ttg_default_execution_context();
3483  int rank = world.rank();
3484  return (keymap() != rank);
3485  };
3486  do_prepare_send<i>(value, remote_check);
3487  }
3488 
 private:
  // Copy/assign/move forbidden ... we could make it work using
  // PIMPL for this base class. However, this instance of the base
  // class is tied to a specific instance of a derived class a
  // pointer to which is captured for invoking derived class
  // functions. Thus, not only does the derived class has to be
  // involved but we would have to do it in a thread safe way
  // including for possibly already running tasks and remote
  // references. This is not worth the effort ... wherever you are
  // wanting to move/assign an TT you should be using a pointer.
  TT(const TT &other) = delete;             // non-copyable
  TT &operator=(const TT &other) = delete;  // non-copy-assignable
  TT(TT &&other) = delete;                  // non-movable
  TT &operator=(TT &&other) = delete;       // non-move-assignable
3503 
  // Registers the callback for the i'th input terminal
  /// Installs the send/move/broadcast/setsize/finalize/prepare-send callbacks
  /// on input terminal \p input, dispatching at compile time on whether the
  /// key type and the terminal's value type are void.
  template <typename terminalT, std::size_t i>
  void register_input_callback(terminalT &input) {
    using valueT = typename terminalT::value_type;
    if (input.is_pull_terminal) {
      num_pullins++;
    }
    // case 1: nonvoid key, nonvoid value
    if constexpr (!ttg::meta::is_void_v<keyT> && !std::is_void_v<valueT>) {
      auto move_callback = [this](const keyT &key, valueT &&value) {
        set_arg<i, keyT, valueT>(key, std::forward<valueT>(value));
      };
      auto send_callback = [this](const keyT &key, const valueT &value) {
        if constexpr (std::is_copy_constructible_v<valueT>) {
          set_arg<i, keyT, const valueT &>(key, value);
        }
        else {
          // sending (as opposed to moving) requires a copy
          throw std::logic_error(std::string("TTG::PaRSEC: send_callback is invoked on datum of type ") + boost::typeindex::type_id<valueT>().pretty_name() + " which is not copy constructible, std::move datum into send statement");
        }
      };
      auto broadcast_callback = [this](const ttg::span<const keyT> &keylist, const valueT &value) {
        if constexpr (std::is_copy_constructible_v<valueT>) {
          broadcast_arg<i, keyT, valueT>(keylist, value);
        }
        else {
          throw std::logic_error(std::string("TTG::PaRSEC: broadcast_callback is invoked on datum of type ") + boost::typeindex::type_id<valueT>().pretty_name() + " which is not copy constructible, broadcast is not possible with move-only type");
        }
      };
      auto prepare_send_callback = [this](const ttg::span<const keyT> &keylist, const valueT &value) {
        prepare_send<i, keyT, valueT>(keylist, value);
      };
      auto setsize_callback = [this](const keyT &key, std::size_t size) { set_argstream_size<i>(key, size); };
      auto finalize_callback = [this](const keyT &key) { finalize_argstream<i>(key); };
      input.set_callback(send_callback, move_callback, broadcast_callback,
                         setsize_callback, finalize_callback, prepare_send_callback);
    }
    // case 2: nonvoid key, void value, mixed inputs
    else if constexpr (!ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
      // for a void datum move == send; no broadcast or prepare-send needed
      auto send_callback = [this](const keyT &key) { set_arg<i, keyT, ttg::Void>(key, ttg::Void{}); };
      auto setsize_callback = [this](const keyT &key, std::size_t size) { set_argstream_size<i>(key, size); };
      auto finalize_callback = [this](const keyT &key) { finalize_argstream<i>(key); };
      input.set_callback(send_callback, send_callback, {}, setsize_callback, finalize_callback);
    }
    // case 3: nonvoid key, void value, no inputs
    // NOTE: subsumed in case 2 above, kept for historical reasons
    // case 4: void key, nonvoid value
    else if constexpr (ttg::meta::is_void_v<keyT> && !std::is_void_v<valueT>) {
      auto move_callback = [this](valueT &&value) { set_arg<i, keyT, valueT>(std::forward<valueT>(value)); };
      auto send_callback = [this](const valueT &value) {
        if constexpr (std::is_copy_constructible_v<valueT>) {
          set_arg<i, keyT, const valueT &>(value);
        }
        else {
          throw std::logic_error(std::string("TTG::PaRSEC: send_callback is invoked on datum of type ") + boost::typeindex::type_id<valueT>().pretty_name() + " which is not copy constructible, std::move datum into send/broadcast statement");
        }
      };
      auto setsize_callback = [this](std::size_t size) { set_argstream_size<i>(size); };
      auto finalize_callback = [this]() { finalize_argstream<i>(); };
      auto prepare_send_callback = [this](const valueT &value) {
        prepare_send<i, void>(value);
      };
      input.set_callback(send_callback, move_callback, {}, setsize_callback, finalize_callback, prepare_send_callback);
    }
    // case 5: void key, void value, mixed inputs
    else if constexpr (ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
      auto send_callback = [this]() { set_arg<i, keyT, ttg::Void>(ttg::Void{}); };
      auto setsize_callback = [this](std::size_t size) { set_argstream_size<i>(size); };
      auto finalize_callback = [this]() { finalize_argstream<i>(); };
      input.set_callback(send_callback, send_callback, {}, setsize_callback, finalize_callback);
    }
    // case 6: void key, void value, no inputs
    // NOTE: subsumed in case 5 above, kept for historical reasons
    else
      ttg::abort();  // unreachable: the cases above cover all combinations
  }
3591 
3592  template <std::size_t... IS>
3593  void register_input_callbacks(std::index_sequence<IS...>) {
3594  int junk[] = {
3595  0,
3596  (register_input_callback<std::tuple_element_t<IS, input_terminals_type>, IS>(std::get<IS>(input_terminals)),
3597  0)...};
3598  junk[0]++;
3599  }
3600 
3601  template <std::size_t... IS, typename inedgesT>
3602  void connect_my_inputs_to_incoming_edge_outputs(std::index_sequence<IS...>, inedgesT &inedges) {
3603  int junk[] = {0, (std::get<IS>(inedges).set_out(&std::get<IS>(input_terminals)), 0)...};
3604  junk[0]++;
3605  }
3606 
3607  template <std::size_t... IS, typename outedgesT>
3608  void connect_my_outputs_to_outgoing_edge_inputs(std::index_sequence<IS...>, outedgesT &outedges) {
3609  int junk[] = {0, (std::get<IS>(outedges).set_in(&std::get<IS>(output_terminals)), 0)...};
3610  junk[0]++;
3611  }
3612 
#if 0
  // Dead code kept for reference: would set each input flow's access flags
  // from the constness of the corresponding input terminal type
  // (const terminal -> READ, mutable terminal -> RW).
  template <typename input_terminals_tupleT, std::size_t... IS, typename flowsT>
  void _initialize_flows(std::index_sequence<IS...>, flowsT &&flows) {
    int junk[] = {0,
                  (*(const_cast<std::remove_const_t<decltype(flows[IS]->flow_flags)> *>(&(flows[IS]->flow_flags))) =
                       (std::is_const_v<std::tuple_element_t<IS, input_terminals_tupleT>> ? PARSEC_FLOW_ACCESS_READ
                                                                                         : PARSEC_FLOW_ACCESS_RW),
                   0)...};
    junk[0]++;
  }

  template <typename input_terminals_tupleT, typename flowsT>
  void initialize_flows(flowsT &&flows) {
    _initialize_flows<input_terminals_tupleT>(
        std::make_index_sequence<std::tuple_size<input_terminals_tupleT>::value>{}, flows);
  }
#endif // 0
3630 
  /// Blocks until the default execution context has quiesced.
  void fence() override { ttg::default_execution_context().impl().fence(); }
3632 
3633  static int key_equal(parsec_key_t a, parsec_key_t b, void *user_data) {
3634  if constexpr (std::is_same_v<keyT, void>) {
3635  return 1;
3636  } else {
3637  keyT &ka = *(reinterpret_cast<keyT *>(a));
3638  keyT &kb = *(reinterpret_cast<keyT *>(b));
3639  return ka == kb;
3640  }
3641  }
3642 
3643  static uint64_t key_hash(parsec_key_t k, void *user_data) {
3644  constexpr const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
3645  if constexpr (keyT_is_Void || std::is_same_v<keyT, void>) {
3646  return 0;
3647  } else {
3648  keyT &kk = *(reinterpret_cast<keyT *>(k));
3649  using ttg::hash;
3650  uint64_t hv = hash<std::decay_t<decltype(kk)>>{}(kk);
3651  return hv;
3652  }
3653  }
3654 
3655  static char *key_print(char *buffer, size_t buffer_size, parsec_key_t k, void *user_data) {
3656  if constexpr (std::is_same_v<keyT, void>) {
3657  buffer[0] = '\0';
3658  return buffer;
3659  } else {
3660  keyT kk = *(reinterpret_cast<keyT *>(k));
3661  std::stringstream iss;
3662  iss << kk;
3663  memset(buffer, 0, buffer_size);
3664  iss.get(buffer, buffer_size);
3665  return buffer;
3666  }
3667  }
3668 
3669  static parsec_key_t make_key(const parsec_taskpool_t *tp, const parsec_assignment_t *as) {
3670  // we use the parsec_assignment_t array as a scratchpad to store the hash and address of the key
3671  keyT *key = *(keyT**)&(as[2]);
3672  return reinterpret_cast<parsec_key_t>(key);
3673  }
3674 
3675  static char *parsec_ttg_task_snprintf(char *buffer, size_t buffer_size, const parsec_task_t *parsec_task) {
3676  if(buffer_size == 0)
3677  return buffer;
3678 
3679  if constexpr (ttg::meta::is_void_v<keyT>) {
3680  snprintf(buffer, buffer_size, "%s()[]<%d>", parsec_task->task_class->name, parsec_task->priority);
3681  } else {
3682  const task_t *task = reinterpret_cast<const task_t*>(parsec_task);
3683  std::stringstream ss;
3684  ss << task->key;
3685 
3686  std::string keystr = ss.str();
3687  std::replace(keystr.begin(), keystr.end(), '(', ':');
3688  std::replace(keystr.begin(), keystr.end(), ')', ':');
3689 
3690  snprintf(buffer, buffer_size, "%s(%s)[]<%d>", parsec_task->task_class->name, keystr.c_str(), parsec_task->priority);
3691  }
3692  return buffer;
3693  }
3694 
#if defined(PARSEC_PROF_TRACE)
  /// Profiling callback: renders the task's key (or "()" for void keys) into
  /// \p dst for the PaRSEC trace, truncated to \p size.
  static void *parsec_ttg_task_info(void *dst, const void *data, size_t size)
  {
    const task_t *task = reinterpret_cast<const task_t *>(data);

    if constexpr (ttg::meta::is_void_v<keyT>) {
      snprintf(reinterpret_cast<char*>(dst), size, "()");
    } else {
      std::stringstream ss;
      ss << task->key;
      snprintf(reinterpret_cast<char*>(dst), size, "%s", ss.str().c_str());
    }
    return dst;
  }
#endif
3710 
  // key_equal/key_print/key_hash bundled for parsec_hash_table_init below
  parsec_key_fn_t tasks_hash_fcts = {key_equal, key_print, key_hash};
3712 
3713  template<std::size_t I>
3714  inline static void increment_data_version_impl(task_t *task) {
3715  if constexpr (!std::is_const_v<std::tuple_element_t<I, typename TT::input_values_tuple_type>>) {
3716  if (task->copies[I] != nullptr){
3717  task->copies[I]->inc_current_version();
3718  }
3719  }
3720  }
3721 
3722  template<std::size_t... Is>
3723  inline static void increment_data_versions(task_t *task, std::index_sequence<Is...>) {
3724  /* increment version of each mutable data */
3725  int junk[] = {0, (increment_data_version_impl<Is>(task), 0)...};
3726  junk[0]++;
3727  }
3728 
  /// PaRSEC completion hook: if the task is a suspended device coroutine it is
  /// resumed one last time so its deferred sends/broadcasts execute, then all
  /// data copies held by the task are dropped.
  /// NOTE(review): a couple of lines of this body are missing from this
  /// excerpt (around do_sends() and the copy release) — consult the full
  /// source before editing the logic.
  static parsec_hook_return_t complete_task_and_release(parsec_execution_stream_t *es, parsec_task_t *parsec_task) {

    //std::cout << "complete_task_and_release: task " << parsec_task << std::endl;

    task_t *task = (task_t*)parsec_task;

#ifdef TTG_HAVE_COROUTINE
    /* if we still have a coroutine handle we invoke it one more time to get the sends/broadcasts */
    if (task->suspended_task_address) {
      assert(task->coroutine_id != ttg::TaskCoroutineID::Invalid);
#ifdef TTG_HAVE_DEVICE
      if (task->coroutine_id == ttg::TaskCoroutineID::DeviceTask) {
        /* increment versions of all data we might have modified
         * this must happen before we issue the sends */
        //increment_data_versions(task, std::make_index_sequence<std::tuple_size_v<typename TT::input_values_tuple_type>>{});

        // get the device task from the coroutine handle
        auto dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);

        // get the promise which contains the views
        auto dev_data = dev_task.promise();

        /* for now make sure we're waiting for the kernel to complete and the coro hasn't skipped this step */
        assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT);

        /* execute the sends we stored */
        if (dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT) {
          /* set the current task, needed inside the sends */
          dev_data.do_sends();
          detail::parsec_ttg_caller = nullptr;
        }
      }
#endif // TTG_HAVE_DEVICE
      /* the coroutine should have completed and we cannot access the promise anymore */
      task->suspended_task_address = nullptr;
    }
#endif // TTG_HAVE_COROUTINE

    /* release our data copies */
    for (int i = 0; i < task->data_count; i++) {
      detail::ttg_data_copy_t *copy = task->copies[i];
      if (nullptr == copy) continue;
      task->copies[i] = nullptr;
    }
    return PARSEC_HOOK_RETURN_DONE;
  }
3777 
3778  public:
3779  template <typename keymapT = ttg::detail::default_keymap<keyT>,
3780  typename priomapT = ttg::detail::default_priomap<keyT>>
3781  TT(const std::string &name, const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
3782  ttg::World world, keymapT &&keymap_ = keymapT(), priomapT &&priomap_ = priomapT())
3783  : ttg::TTBase(name, numinedges, numouts)
3784  , world(world)
3785  // if using default keymap, rebind to the given world
3786  , keymap(std::is_same<keymapT, ttg::detail::default_keymap<keyT>>::value
3787  ? decltype(keymap)(ttg::detail::default_keymap<keyT>(world))
3788  : decltype(keymap)(std::forward<keymapT>(keymap_)))
3789  , priomap(decltype(keymap)(std::forward<priomapT>(priomap_))) {
3790  // Cannot call these in base constructor since terminals not yet constructed
3791  if (innames.size() != numinedges) throw std::logic_error("ttg_parsec::TT: #input names != #input terminals");
3792  if (outnames.size() != numouts) throw std::logic_error("ttg_parsec::TT: #output names != #output terminals");
3793 
3794  auto &world_impl = world.impl();
3795  world_impl.register_op(this);
3796 
3797  if constexpr (numinedges == numins) {
3798  register_input_terminals(input_terminals, innames);
3799  } else {
3800  // create a name for the virtual control input
3801  register_input_terminals(input_terminals, std::array<std::string, 1>{std::string("Virtual Control")});
3802  }
3803  register_output_terminals(output_terminals, outnames);
3804 
3805  register_input_callbacks(std::make_index_sequence<numinedges>{});
3806  int i;
3807 
3808  memset(&self, 0, sizeof(parsec_task_class_t));
3809 
3810  self.name = strdup(get_name().c_str());
3811  self.task_class_id = get_instance_id();
3812  self.nb_parameters = 0;
3813  self.nb_locals = 0;
3814  //self.nb_flows = numflows;
3815  self.nb_flows = MAX_PARAM_COUNT; // we're not using all flows but have to
3816  // trick the device handler into looking at all of them
3817 
3818  if( world_impl.profiling() ) {
3819  // first two ints are used to store the hash of the key.
3820  self.nb_parameters = (sizeof(void*)+sizeof(int)-1)/sizeof(int);
3821  // seconds two ints are used to store a pointer to the key of the task.
3822  self.nb_locals = self.nb_parameters + (sizeof(void*)+sizeof(int)-1)/sizeof(int);
3823 
3824  // If we have parameters and locals, we need to define the corresponding dereference arrays
3825  self.params[0] = &detail::parsec_taskclass_param0;
3826  self.params[1] = &detail::parsec_taskclass_param1;
3827 
3828  self.locals[0] = &detail::parsec_taskclass_param0;
3829  self.locals[1] = &detail::parsec_taskclass_param1;
3830  self.locals[2] = &detail::parsec_taskclass_param2;
3831  self.locals[3] = &detail::parsec_taskclass_param3;
3832  }
3833  self.make_key = make_key;
3834  self.key_functions = &tasks_hash_fcts;
3835  self.task_snprintf = parsec_ttg_task_snprintf;
3836 
3837 #if defined(PARSEC_PROF_TRACE)
3838  self.profile_info = &parsec_ttg_task_info;
3839 #endif
3840 
3841  world_impl.taskpool()->nb_task_classes = std::max(world_impl.taskpool()->nb_task_classes, static_cast<decltype(world_impl.taskpool()->nb_task_classes)>(self.task_class_id+1));
3842  // function_id_to_instance[self.task_class_id] = this;
3843  //self.incarnations = incarnations_array.data();
3844 //#if 0
3845  if constexpr (derived_has_cuda_op()) {
3846  self.incarnations = (__parsec_chore_t *)malloc(3 * sizeof(__parsec_chore_t));
3847  ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_CUDA;
3848  ((__parsec_chore_t *)self.incarnations)[0].evaluate = &detail::evaluate_cuda<TT>;
3849  ((__parsec_chore_t *)self.incarnations)[0].hook = &detail::hook_cuda<TT>;
3850  ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_NONE;
3851  ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL;
3852  ((__parsec_chore_t *)self.incarnations)[1].hook = NULL;
3853  } else if constexpr (derived_has_hip_op()) {
3854  self.incarnations = (__parsec_chore_t *)malloc(3 * sizeof(__parsec_chore_t));
3855  ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_HIP;
3856  ((__parsec_chore_t *)self.incarnations)[0].evaluate = &detail::evaluate_hip<TT>;
3857  ((__parsec_chore_t *)self.incarnations)[0].hook = &detail::hook_hip<TT>;
3858 
3859  ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_NONE;
3860  ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL;
3861  ((__parsec_chore_t *)self.incarnations)[1].hook = NULL;
3862 #if defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT)
3863  } else if constexpr (derived_has_level_zero_op()) {
3864  self.incarnations = (__parsec_chore_t *)malloc(3 * sizeof(__parsec_chore_t));
3865  ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_LEVEL_ZERO;
3866  ((__parsec_chore_t *)self.incarnations)[0].evaluate = &detail::evaluate_level_zero<TT>;
3867  ((__parsec_chore_t *)self.incarnations)[0].hook = &detail::hook_level_zero<TT>;
3868 
3869  ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_NONE;
3870  ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL;
3871  ((__parsec_chore_t *)self.incarnations)[1].hook = NULL;
3872 #endif // PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT
3873  } else {
3874  self.incarnations = (__parsec_chore_t *)malloc(2 * sizeof(__parsec_chore_t));
3875  ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_CPU;
3876  ((__parsec_chore_t *)self.incarnations)[0].evaluate = NULL;
3877  ((__parsec_chore_t *)self.incarnations)[0].hook = &detail::hook<TT>;
3878  ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_NONE;
3879  ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL;
3880  ((__parsec_chore_t *)self.incarnations)[1].hook = NULL;
3881  }
3882 //#endif // 0
3883 
3884  self.release_task = &parsec_release_task_to_mempool_update_nbtasks;
3885  self.complete_execution = complete_task_and_release;
3886 
3887  for (i = 0; i < MAX_PARAM_COUNT; i++) {
3888  parsec_flow_t *flow = new parsec_flow_t;
3889  flow->name = strdup((std::string("flow in") + std::to_string(i)).c_str());
3890  flow->sym_type = PARSEC_SYM_INOUT;
3891  // see initialize_flows below
3892  // flow->flow_flags = PARSEC_FLOW_ACCESS_RW;
3893  flow->dep_in[0] = NULL;
3894  flow->dep_out[0] = NULL;
3895  flow->flow_index = i;
3896  flow->flow_datatype_mask = ~0;
3897  *((parsec_flow_t **)&(self.in[i])) = flow;
3898  }
3899  //*((parsec_flow_t **)&(self.in[i])) = NULL;
3900  //initialize_flows<input_terminals_type>(self.in);
3901 
3902  for (i = 0; i < MAX_PARAM_COUNT; i++) {
3903  parsec_flow_t *flow = new parsec_flow_t;
3904  flow->name = strdup((std::string("flow out") + std::to_string(i)).c_str());
3905  flow->sym_type = PARSEC_SYM_INOUT;
3906  flow->flow_flags = PARSEC_FLOW_ACCESS_READ; // does PaRSEC use this???
3907  flow->dep_in[0] = NULL;
3908  flow->dep_out[0] = NULL;
3909  flow->flow_index = i;
3910  flow->flow_datatype_mask = (1 << i);
3911  *((parsec_flow_t **)&(self.out[i])) = flow;
3912  }
3913  //*((parsec_flow_t **)&(self.out[i])) = NULL;
3914 
3915  self.flags = 0;
3916  self.dependencies_goal = numins; /* (~(uint32_t)0) >> (32 - numins); */
3917 
3918  int nbthreads = 0;
3919  auto *context = world_impl.context();
3920  for (int i = 0; i < context->nb_vp; i++) {
3921  nbthreads += context->virtual_processes[i]->nb_cores;
3922  }
3923 
3924  parsec_mempool_construct(&mempools, PARSEC_OBJ_CLASS(parsec_task_t), sizeof(task_t),
3925  offsetof(parsec_task_t, mempool_owner), nbthreads);
3926 
3927  parsec_hash_table_init(&tasks_table, offsetof(detail::parsec_ttg_task_base_t, tt_ht_item), 8, tasks_hash_fcts,
3928  NULL);
3929  }
3930 
  /// Convenience constructor: same as the main constructor but executes in the
  /// default execution context.
  template <typename keymapT = ttg::detail::default_keymap<keyT>,
            typename priomapT = ttg::detail::default_priomap<keyT>>
  TT(const std::string &name, const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
     keymapT &&keymap = keymapT(ttg::default_execution_context()), priomapT &&priomap = priomapT())
      : TT(name, innames, outnames, ttg::default_execution_context(), std::forward<keymapT>(keymap),
           std::forward<priomapT>(priomap)) {}
3937 
  /// Constructor that additionally connects this TT to its incoming and
  /// outgoing edges and then (re)registers the input callbacks, since pull
  /// terminal information only becomes available after the edges are attached.
  template <typename keymapT = ttg::detail::default_keymap<keyT>,
            typename priomapT = ttg::detail::default_priomap<keyT>>
  TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name,
     const std::vector<std::string> &innames, const std::vector<std::string> &outnames, ttg::World world,
     keymapT &&keymap_ = keymapT(), priomapT &&priomap = priomapT())
      : TT(name, innames, outnames, world, std::forward<keymapT>(keymap_), std::forward<priomapT>(priomap)) {
    connect_my_inputs_to_incoming_edge_outputs(std::make_index_sequence<numinedges>{}, inedges);
    connect_my_outputs_to_outgoing_edge_inputs(std::make_index_sequence<numouts>{}, outedges);
    //DO NOT MOVE THIS - information about the number of pull terminals is only available after connecting the edges.
    if constexpr (numinedges > 0) {
      register_input_callbacks(std::make_index_sequence<numinedges>{});
    }
  }
  /// Convenience edge-connecting constructor executing in the default
  /// execution context.
  template <typename keymapT = ttg::detail::default_keymap<keyT>,
            typename priomapT = ttg::detail::default_priomap<keyT>>
  TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name,
     const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
     keymapT &&keymap = keymapT(ttg::default_execution_context()), priomapT &&priomap = priomapT())
      : TT(inedges, outedges, name, innames, outnames, ttg::default_execution_context(),
           std::forward<keymapT>(keymap), std::forward<priomapT>(priomap)) {}
3958 
3959  // Destructor checks for unexecuted tasks
3960  virtual ~TT() {
3961  if(nullptr != self.name ) {
3962  free((void*)self.name);
3963  self.name = nullptr;
3964  }
3965 
3966  for (std::size_t i = 0; i < numins; ++i) {
3967  if (inpute_reducers_taskclass[i] != nullptr) {
3968  std::free(inpute_reducers_taskclass[i]);
3969  inpute_reducers_taskclass[i] = nullptr;
3970  }
3971  }
3972  release();
3973  }
3974 
3975  static void ht_iter_cb(void *item, void *cb_data) {
3976  task_t *task = (task_t *)item;
3977  ttT *op = (ttT *)cb_data;
3978  if constexpr (!ttg::meta::is_void_v<keyT>) {
3979  std::cout << "Left over task " << op->get_name() << " " << task->key << std::endl;
3980  } else {
3981  std::cout << "Left over task " << op->get_name() << std::endl;
3982  }
3983  }
3984 
3986  parsec_hash_table_for_all(&tasks_table, ht_iter_cb, this);
3987  }
3988 
  /// Releases all PaRSEC resources owned by this TT (idempotent; see do_release()).
  virtual void release() override { do_release(); }
3990 
// Tears down all PaRSEC state owned by this TT: task hash table, mempools,
// incarnation list, and the in/out flow descriptors. Idempotent via `alive`.
3991  void do_release() {
3992  if (!alive) {
3993  return;
3994  }
3995  alive = false;
3996  /* print all outstanding tasks */
3998  parsec_hash_table_fini(&tasks_table);
3999  parsec_mempool_destruct(&mempools);
4000  // uintptr_t addr = (uintptr_t)self.incarnations;
4001  // free((void *)addr);
// incarnations array was malloc'ed; cast away const-ness of the pointer to free it
4002  free((__parsec_chore_t *)self.incarnations);
// release every registered input/output flow descriptor
4003  for (int i = 0; i < MAX_PARAM_COUNT; i++) {
4004  if (NULL != self.in[i]) {
// flow name is a C string (free), the flow object itself was new'ed (delete)
4005  free(self.in[i]->name);
4006  delete self.in[i];
4007  self.in[i] = nullptr;
4008  }
4009  if (NULL != self.out[i]) {
4010  free(self.out[i]->name);
4011  delete self.out[i];
4012  self.out[i] = nullptr;
4013  }
4014  }
// let the world forget about this op so it is not released again at shutdown
4015  world.impl().deregister_op(this);
4016  }
4017 
4018  static constexpr const ttg::Runtime runtime = ttg::Runtime::PaRSEC;
4019 
// Registers `reducer` as the stream reducer for input terminal `i` and,
// on first use, lazily builds the PaRSEC task class that runs the
// reduction on the CPU (reduction tasks are host-only for now, see FIXME).
4025  template <std::size_t i, typename Reducer>
4026  void set_input_reducer(Reducer &&reducer) {
4027  ttg::trace(world.rank(), ":", get_name(), " : setting reducer for terminal ", i);
4028  std::get<i>(input_reducers) = reducer;
4029 
// task class is created once per terminal and cached; freed in ~TT()
4030  parsec_task_class_t *tc = inpute_reducers_taskclass[i];
4031  if (nullptr == tc) {
4032  tc = (parsec_task_class_t *)std::calloc(1, sizeof(*tc));
4033  inpute_reducers_taskclass[i] = tc;
4034 
4035  tc->name = strdup((get_name() + std::string(" reducer ") + std::to_string(i)).c_str());
4036  tc->task_class_id = get_instance_id();
4037  tc->nb_parameters = 0;
4038  tc->nb_locals = 0;
4039  tc->nb_flows = numflows;
4040 
4041  auto &world_impl = world.impl();
4042 
// parameters/locals are only needed so the profiler can decode task identity
4043  if( world_impl.profiling() ) {
4044  // first two ints are used to store the hash of the key.
4045  tc->nb_parameters = (sizeof(void*)+sizeof(int)-1)/sizeof(int);
4046  // second two ints are used to store a pointer to the key of the task.
4047  tc->nb_locals = self.nb_parameters + (sizeof(void*)+sizeof(int)-1)/sizeof(int);
4048 
4049  // If we have parameters and locals, we need to define the corresponding dereference arrays
4050  tc->params[0] = &detail::parsec_taskclass_param0;
4051  tc->params[1] = &detail::parsec_taskclass_param1;
4052 
4053  tc->locals[0] = &detail::parsec_taskclass_param0;
4054  tc->locals[1] = &detail::parsec_taskclass_param1;
4055  tc->locals[2] = &detail::parsec_taskclass_param2;
4056  tc->locals[3] = &detail::parsec_taskclass_param3;
4057  }
4058  tc->make_key = make_key;
4059  tc->key_functions = &tasks_hash_fcts;
4060  tc->task_snprintf = parsec_ttg_task_snprintf;
4061 
4062 #if defined(PARSEC_PROF_TRACE)
4063  tc->profile_info = &parsec_ttg_task_info;
4064 #endif
4065 
// make sure the taskpool accounts for this task class id
4066  world_impl.taskpool()->nb_task_classes = std::max(world_impl.taskpool()->nb_task_classes, static_cast<decltype(world_impl.taskpool()->nb_task_classes)>(self.task_class_id+1));
4067 
4068 #if 0
4069  // FIXME: currently only support reduction on the host
4070  if constexpr (derived_has_cuda_op()) {
4071  self.incarnations = (__parsec_chore_t *)malloc(3 * sizeof(__parsec_chore_t));
4072  ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_CUDA;
4073  ((__parsec_chore_t *)self.incarnations)[0].evaluate = NULL;
4074  ((__parsec_chore_t *)self.incarnations)[0].hook = detail::hook_cuda;
4075  ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_CPU;
4076  ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL;
4077  ((__parsec_chore_t *)self.incarnations)[1].hook = detail::hook;
4078  ((__parsec_chore_t *)self.incarnations)[2].type = PARSEC_DEV_NONE;
4079  ((__parsec_chore_t *)self.incarnations)[2].evaluate = NULL;
4080  ((__parsec_chore_t *)self.incarnations)[2].hook = NULL;
4081  } else
4082 #endif // 0
4083  {
// host-only incarnation list: one CPU chore + the PARSEC_DEV_NONE terminator
4084  tc->incarnations = (__parsec_chore_t *)malloc(2 * sizeof(__parsec_chore_t));
4085  ((__parsec_chore_t *)tc->incarnations)[0].type = PARSEC_DEV_CPU;
4086  ((__parsec_chore_t *)tc->incarnations)[0].evaluate = NULL;
4087  ((__parsec_chore_t *)tc->incarnations)[0].hook = &static_reducer_op<i>;
4088  ((__parsec_chore_t *)tc->incarnations)[1].type = PARSEC_DEV_NONE;
4089  ((__parsec_chore_t *)tc->incarnations)[1].evaluate = NULL;
4090  ((__parsec_chore_t *)tc->incarnations)[1].hook = NULL;
4091  }
4092 
4093  /* the reduction task does not alter the termination detection because the target task will execute */
4094  tc->release_task = &parsec_release_task_to_mempool;
4095  tc->complete_execution = NULL;
4096  }
4097  }
4098 
// Same as set_input_reducer(reducer) but additionally fixes the stream size
// for terminal `i` up front (number of elements expected before reduction
// completes) via set_static_argstream_size().
4106  template <std::size_t i, typename Reducer>
4107  void set_input_reducer(Reducer &&reducer, std::size_t size) {
4108  set_input_reducer<i>(std::forward<Reducer>(reducer));
4109  set_static_argstream_size<i>(size);
4110  }
4111 
4112  // Returns reference to input terminal i to facilitate connection --- terminal
4113  // cannot be copied, moved or assigned
// Returned as a pointer because terminals are non-copyable/non-movable.
4114  template <std::size_t i>
4115  std::tuple_element_t<i, input_terminals_type> *in() {
4116  return &std::get<i>(input_terminals);
4117  }
4118 
4119  // Returns reference to output terminal for purpose of connection --- terminal
4120  // cannot be copied, moved or assigned
// Returned as a pointer because terminals are non-copyable/non-movable.
4121  template <std::size_t i>
4122  std::tuple_element_t<i, output_terminalsT> *out() {
4123  return &std::get<i>(output_terminals);
4124  }
4125 
4126  // Manual injection of a task with all input arguments specified as a tuple
// Enabled only for keyed TTs with at least one input value.
4127  template <typename Key = keyT>
4128  std::enable_if_t<!ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
4129  const Key &key, const input_values_tuple_type &args) {
// If the caller passed a convertible-but-different key type, convert once
// and re-dispatch so the rest of the machinery sees the canonical key_type.
4131  if constexpr(!std::is_same_v<Key, key_type>) {
4132  key_type k = key; /* cast that type into the key type we know */
4133  invoke(k, args);
4134  } else {
4135  /* trigger non-void inputs */
4136  set_args(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, key, args);
4137  /* trigger void inputs */
4138  using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
4139  set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
4140  }
4141  }
4142 
4143  // Manual injection of a key-free task and all input arguments specified as a tuple
// Enabled only for keyless TTs with at least one input value.
4144  template <typename Key = keyT>
4145  std::enable_if_t<ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
4146  const input_values_tuple_type &args) {
4148  /* trigger non-void inputs */
4149  set_args(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, args);
4150  /* trigger void inputs */
4151  using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
4152  set_args(void_index_seq{}, ttg::detail::make_void_tuple<void_index_seq::size()>());
4153  }
4154 
4155  // Manual injection of a task that has no arguments
// Enabled only for keyed TTs whose input value tuple is empty.
4156  template <typename Key = keyT>
4157  std::enable_if_t<!ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
4158  const Key &key) {
4160 
// Convert a foreign-but-convertible key type once, then re-dispatch.
4161  if constexpr(!std::is_same_v<Key, key_type>) {
4162  key_type k = key; /* cast that type into the key type we know */
4163  invoke(k);
4164  } else {
4165  /* trigger void inputs */
4166  using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
4167  set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
4168  }
4169  }
4170 
4171  // Manual injection of a task that has no key or arguments
// Enabled only for keyless TTs with an empty input value tuple.
4172  template <typename Key = keyT>
4173  std::enable_if_t<ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke() {
4175  /* trigger void inputs */
4176  using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
4177  set_args(void_index_seq{}, ttg::detail::make_void_tuple<void_index_seq::size()>());
4178  }
4179 
4180  // overrides TTBase::invoke()
4181  void invoke() override {
4182  if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>)
4183  invoke<keyT>();
4184  else
4185  TTBase::invoke();
4186  }
4187 
4188  private:
// Helper for the variadic invoke(): dispatches each argument to the input
// terminal whose index is taken from the index sequence. ttg_parsec::Ptr
// arguments are passed via their existing data copy (no value copy);
// everything else goes through the regular set_arg path.
4189  template<typename Key, typename Arg, typename... Args, std::size_t I, std::size_t... Is>
4190  void invoke_arglist(std::index_sequence<I, Is...>, const Key& key, Arg&& arg, Args&&... args) {
4191  using arg_type = std::decay_t<Arg>;
4192  if constexpr (ttg::meta::is_ptr_v<arg_type>) {
4193  /* add a reference to the object */
4194  auto copy = ttg_parsec::detail::get_copy(arg);
4195  copy->add_ref();
4196  /* reset readers so that the value can flow without copying */
4197  copy->reset_readers();
4198  auto& val = *arg;
4199  set_arg_impl<I>(key, val, copy);
4201  if constexpr (std::is_rvalue_reference_v<Arg>) {
4202  /* if the ptr was moved in we reset it */
4203  arg.reset();
4204  }
4205  } else if constexpr (!ttg::meta::is_ptr_v<arg_type>) {
4206  set_arg<I>(key, std::forward<Arg>(arg));
4207  }
// tail recursion over the remaining (index, argument) pairs
4208  if constexpr (sizeof...(Is) > 0) {
4209  /* recursive next argument */
4210  invoke_arglist(std::index_sequence<Is...>{}, key, std::forward<Args>(args)...);
4211  }
4212  }
4213 
4214  public:
4215  // Manual injection of a task with all input arguments specified as variadic arguments
// Argument count must match the task's inputs exactly (checked at compile time).
4216  template <typename Key = keyT, typename Arg, typename... Args>
4217  std::enable_if_t<!ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
4218  const Key &key, Arg&& arg, Args&&... args) {
4219  static_assert(sizeof...(Args)+1 == std::tuple_size_v<actual_input_tuple_type>,
4220  "Number of arguments to invoke must match the number of task inputs.")
4222  /* trigger non-void inputs */
4223  invoke_arglist(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, key,
4224  std::forward<Arg>(arg), std::forward<Args>(args)...);
4225  //set_args(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, key, args);
4226  /* trigger void inputs */
4227  using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
4228  set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
4229  }
4230 
// Sets whether a potential writer should be deferred while readers still
// hold the data (cf. TTG_PARSEC_DEFER_WRITER at the top of this header).
4231  void set_defer_writer(bool value) {
4232  m_defer_writer = value;
4233  }
4234 
4235  bool get_defer_writer(bool value) {
4236  return m_defer_writer;
4237  }
4238 
4239  public:
// TTBase override: hooks this TT into the world's profiling machinery
// before it becomes executable.
4240  void make_executable() override {
4241  world.impl().register_tt_profiling(this);
4244  }
4245 
// Read-only access to the keymap currently used by this TT.
4248  const decltype(keymap) &get_keymap() const { return keymap; }
4249 
4251  template <typename Keymap>
4252  void set_keymap(Keymap &&km) {
4253  keymap = km;
4254  }
4255 
// Read-only access to the priority map currently used by this TT.
4258  const decltype(priomap) &get_priomap() const { return priomap; }
4259 
// Replaces the priority map; the argument is forwarded (moved if rvalue).
4262  template <typename Priomap>
4263  void set_priomap(Priomap &&pm) {
4264  priomap = std::forward<Priomap>(pm);
4265  }
4266 
// Installs the device map (key -> device). Only valid on a TT that defines
// a device op. If `dm` already returns ttg::device::Device it is stored
// directly; otherwise it is wrapped in a lambda that converts its result
// for whichever device backend this TT supports.
4272  template<typename Devicemap>
4273  void set_devicemap(Devicemap&& dm) {
4274  static_assert(derived_has_device_op(), "Device map only allowed on device-enabled TT!");
4275  if constexpr (std::is_same_v<ttg::device::Device, decltype(dm(std::declval<keyT>()))>) {
4276  // dm returns a Device
4277  devicemap = std::forward<Devicemap>(dm);
4278  } else {
4279  // convert dm return into a Device
// NOTE(review): the listing dropped the return statements of the branches
// below (original lines 4282/4284/4286) — presumably they convert dm(key)
// into a Device for the matching backend; confirm against the full header.
4280  devicemap = [=](const keyT& key) {
4281  if constexpr (derived_has_cuda_op()) {
4283  } else if constexpr (derived_has_hip_op()) {
4285  } else if constexpr (derived_has_level_zero_op()) {
4287  } else {
4288  throw std::runtime_error("Unknown device type!");
4289  }
4290  };
4291  }
4292  }
4293 
// Returns (a copy of) the device map installed via set_devicemap().
4296  auto get_devicemap() { return devicemap; }
4297 
4298  // Register the static_op function to associate it to instance_id
// Registers this TT's static_set_arg in the global instance-id map and, if
// messages for this instance id arrived before registration, drains and
// unpacks them now (under static_map_mutex).
4300  int rank;
4301  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
4302  ttg::trace("ttg_parsec(", rank, ") Inserting into static_id_to_op_map at ", get_instance_id());
4303  static_set_arg_fct_call_t call = std::make_pair(&TT::static_set_arg, this);
4304  auto &world_impl = world.impl();
4305  static_map_mutex.lock();
4306  static_id_to_op_map.insert(std::make_pair(get_instance_id(), call));
// any messages that arrived before this op existed were parked in
// delayed_unpack_actions keyed by instance id — replay them now
4307  if (delayed_unpack_actions.count(get_instance_id()) > 0) {
4308  auto tp = world_impl.taskpool();
4309 
4310  ttg::trace("ttg_parsec(", rank, ") There are ", delayed_unpack_actions.count(get_instance_id()),
4311  " messages delayed with op_id ", get_instance_id());
4312 
// move the pending actions out of the multimap while holding the lock...
4313  auto se = delayed_unpack_actions.equal_range(get_instance_id());
4314  std::vector<static_set_arg_fct_arg_t> tmp;
4315  for (auto it = se.first; it != se.second;) {
4316  assert(it->first == get_instance_id());
4317  tmp.push_back(it->second);
4318  it = delayed_unpack_actions.erase(it);
4319  }
4320  static_map_mutex.unlock();
4321 
// ...then unpack them outside the lock
4322  for (auto it : tmp) {
4323  if(ttg::tracing())
4324  ttg::print("ttg_parsec(", rank, ") Unpacking delayed message (", ", ", get_instance_id(), ", ",
4325  std::get<1>(it), ", ", std::get<2>(it), ")");
4326  int rc = detail::static_unpack_msg(&parsec_ce, world_impl.parsec_ttg_tag(), std::get<1>(it), std::get<2>(it),
4327  std::get<0>(it), NULL);
4328  assert(rc == 0);
4329  free(std::get<1>(it));
4330  }
4331 
4332  tmp.clear();
4333  } else {
4334  static_map_mutex.unlock();
4335  }
4336  }
4337  };
4338 
4339 #include "ttg/make_tt.h"
4340 
4341 } // namespace ttg_parsec
4342 
4348 template <>
4350  private:
4351  ttg_parsec::detail::ttg_data_copy_t *copy_to_remove = nullptr;
4352  bool do_release = true;
4353 
4354  public:
4355  value_copy_handler() = default;
4358  : copy_to_remove(h.copy_to_remove)
4359  {
4360  h.copy_to_remove = nullptr;
4361  }
4362 
4365  {
4366  std::swap(copy_to_remove, h.copy_to_remove);
4367  return *this;
4368  }
4369 
4371  if (nullptr != copy_to_remove) {
4373  if (do_release) {
4374  ttg_parsec::detail::release_data_copy(copy_to_remove);
4375  }
4376  }
4377  }
4378 
4379  template <typename Value>
4380  inline std::conditional_t<std::is_reference_v<Value>,Value,Value&&> operator()(Value &&value) {
4381  constexpr auto value_is_rvref = std::is_rvalue_reference_v<decltype(value)>;
4382  static_assert(value_is_rvref ||
4383  std::is_copy_constructible_v<std::decay_t<Value>>,
4384  "Data sent without being moved must be copy-constructible!");
4385 
4387  if (nullptr == caller) {
4388  ttg::print("ERROR: ttg::send or ttg::broadcast called outside of a task!\n");
4389  }
4390  using value_type = std::remove_reference_t<Value>;
4392  copy = ttg_parsec::detail::find_copy_in_task(caller, &value);
4393  value_type *value_ptr = &value;
4394  if (nullptr == copy) {
4399  copy = ttg_parsec::detail::create_new_datacopy(std::forward<Value>(value));
4400  bool inserted = ttg_parsec::detail::add_copy_to_task(copy, caller);
4401  assert(inserted);
4402  value_ptr = reinterpret_cast<value_type *>(copy->get_ptr());
4403  copy_to_remove = copy;
4404  } else {
4405  if constexpr (value_is_rvref) {
4406  /* this copy won't be modified anymore so mark it as read-only */
4407  copy->reset_readers();
4408  }
4409  }
4410  /* We're coming from a writer so mark the data as modified.
4411  * That way we can force a pushout in prepare_send if we move to read-only tasks (needed by PaRSEC). */
4413  if constexpr (value_is_rvref)
4414  return std::move(*value_ptr);
4415  else
4416  return *value_ptr;
4417  }
4418 
4419  template<typename Value>
4420  inline std::add_lvalue_reference_t<Value> operator()(ttg_parsec::detail::persistent_value_ref<Value> vref) {
4422  if (nullptr == caller) {
4423  ttg::print("ERROR: ttg::send or ttg::broadcast called outside of a task!\n");
4424  }
4426  copy = ttg_parsec::detail::find_copy_in_task(caller, &vref.value_ref);
4427  if (nullptr == copy) {
4428  // no need to create a new copy since it's derived from the copy already
4429  copy = const_cast<ttg_parsec::detail::ttg_data_copy_t *>(static_cast<const ttg_parsec::detail::ttg_data_copy_t *>(&vref.value_ref));
4430  bool inserted = ttg_parsec::detail::add_copy_to_task(copy, caller);
4431  assert(inserted);
4432  copy_to_remove = copy; // we want to remove the copy from the task once done sending
4433  do_release = false; // we don't release the copy since we didn't allocate it
4434  copy->add_ref(); // add a reference so that TTG does not attempt to delete this object
4435  }
4436  return vref.value_ref;
4437  }
4438 
4439  template <typename Value>
4440  inline const Value &operator()(const Value &value) {
4441  static_assert(std::is_copy_constructible_v<std::decay_t<Value>>,
4442  "Data sent without being moved must be copy-constructible!");
4444  if (nullptr == caller) {
4445  ttg::print("ERROR: ttg::send or ttg::broadcast called outside of a task!\n");
4446  }
4448  copy = ttg_parsec::detail::find_copy_in_task(caller, &value);
4449  const Value *value_ptr = &value;
4450  if (nullptr == copy) {
4456  bool inserted = ttg_parsec::detail::add_copy_to_task(copy, caller);
4457  assert(inserted);
4458  value_ptr = reinterpret_cast<Value *>(copy->get_ptr());
4459  copy_to_remove = copy;
4460  }
4462  return *value_ptr;
4463  }
4464 
4465 };
4466 
4467 #endif // PARSEC_TTG_H_INCLUDED
4468 // clang-format on
#define TTG_OP_ASSERT_EXECUTABLE()
Definition: tt.h:276
Edge is used to connect In and Out terminals.
Definition: edge.h:25
A base class for all template tasks.
Definition: tt.h:30
void trace(const T &t, const Ts &...ts)
Like ttg::trace(), but only produces tracing output if this->tracing()==true
Definition: tt.h:186
auto get_instance_id() const
Definition: tt.h:258
virtual void make_executable()=0
Marks this executable.
Definition: tt.h:286
bool tracing() const
Definition: tt.h:177
const std::string & get_name() const
Gets the name of this operation.
Definition: tt.h:217
TTBase(TTBase &&other)
Definition: tt.h:115
void register_input_terminals(terminalsT &terms, const namesT &names)
Definition: tt.h:84
bool is_lazy_pull()
Definition: tt.h:199
const TTBase * ttg_ptr() const
Definition: tt.h:205
void register_output_terminals(terminalsT &terms, const namesT &names)
Definition: tt.h:91
A complete version of void.
Definition: void.h:11
WorldImplT & impl(void)
Definition: world.h:216
int rank() const
Definition: world.h:204
Base class for implementation-specific Worlds.
Definition: world.h:33
void release_ops(void)
Definition: world.h:54
WorldImplBase(int size, int rank)
Definition: world.h:61
bool is_valid(void) const
Definition: world.h:154
Represents a device in a specific execution space.
Definition: device.h:23
uint64_t pack(T &obj, void *bytes, uint64_t pos, detail::ttg_data_copy_t *copy=nullptr)
Definition: ttg.h:1926
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Key &key, const Value &value)
Definition: ttg.h:2300
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > prepare_send(const ttg::span< const Key > &keylist, const Value &value)
Definition: ttg.h:3466
void finalize_argstream_from_msg(void *data, std::size_t size)
Definition: ttg.h:2232
ttg::meta::add_glvalue_reference_tuple_t< ttg::meta::void_to_Void_tuple_t< actual_input_tuple_type > > input_refs_full_tuple_type
Definition: ttg.h:1234
std::tuple_element_t< i, input_terminals_type > * in()
Definition: ttg.h:4115
static constexpr bool derived_has_hip_op()
Definition: ttg.h:1203
void set_keymap(Keymap &&km)
keymap setter
Definition: ttg.h:4252
decltype(keymap) const & get_keymap() const
Definition: ttg.h:4248
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key, Arg &&arg, Args &&... args)
Definition: ttg.h:4217
void print_incomplete_tasks()
Definition: ttg.h:3985
void set_arg_from_msg(void *data, std::size_t size)
Definition: ttg.h:2055
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_arg(const Key &key)
Definition: ttg.h:2641
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > finalize_argstream(const Key &key)
finalizes stream for input i
Definition: ttg.h:3238
virtual ~TT()
Definition: ttg.h:3960
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg(const Key &key, Value &&value)
Definition: ttg.h:2623
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const std::tuple< Ts... > &args)
Definition: ttg.h:3091
keyT key_type
Definition: ttg.h:1226
static constexpr bool derived_has_level_zero_op()
Definition: ttg.h:1212
parsec_thread_mempool_t * get_task_mempool(void)
Definition: ttg.h:1986
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap=priomapT())
Definition: ttg.h:3940
std::enable_if_t<!ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key)
Definition: ttg.h:4157
std::enable_if_t<!std::is_void_v< std::decay_t< Value > >, void > do_prepare_send(const Value &value, RemoteCheckFn &&remote_check)
Definition: ttg.h:3374
typename ttg::terminals_to_edges< output_terminalsT >::type output_edges_type
Definition: ttg.h:1243
void set_arg_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in=nullptr)
Definition: ttg.h:2676
auto get_devicemap()
Definition: ttg.h:4296
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > broadcast_arg(const ttg::span< const Key > &keylist, const Value &value)
Definition: ttg.h:2880
void invoke() override
Definition: ttg.h:4181
task_t * create_new_task(const Key &key)
Definition: ttg.h:2318
void do_release()
Definition: ttg.h:3991
uint64_t unpack(T &obj, void *_bytes, uint64_t pos)
Definition: ttg.h:1913
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
Definition: ttg.h:3953
bool get_defer_writer(bool value)
Definition: ttg.h:4235
void set_arg_from_msg_keylist(ttg::span< keyT > &&keylist, detail::ttg_data_copy_t *copy)
Definition: ttg.h:1994
static void ht_iter_cb(void *item, void *cb_data)
Definition: ttg.h:3975
const auto & get_output_terminals() const
Definition: ttg.h:1267
ttg::meta::drop_void_t< ttg::meta::add_glvalue_reference_tuple_t< input_tuple_type > > input_refs_tuple_type
Definition: ttg.h:1236
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Key &key, Value &&value)
Definition: ttg.h:2288
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key, const input_values_tuple_type &args)
Definition: ttg.h:4128
std::enable_if_t< ttg::meta::is_none_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const Key &key, const std::tuple< Ts... > &args)
Definition: ttg.h:3071
ttg::detail::input_terminals_tuple_t< keyT, input_tuple_type > input_terminals_type
Definition: ttg.h:1227
static void static_set_arg(void *data, std::size_t size, ttg::TTBase *bop)
Definition: ttg.h:1941
void set_input_reducer(Reducer &&reducer, std::size_t size)
Definition: ttg.h:4107
output_terminalsT output_terminals_type
Definition: ttg.h:1242
detail::reducer_task_t * create_new_reducer_task(task_t *task, bool is_first)
Definition: ttg.h:2345
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Value &value)
Definition: ttg.h:2306
std::enable_if_t< ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const input_values_tuple_type &args)
Definition: ttg.h:4145
void set_input_reducer(Reducer &&reducer)
Definition: ttg.h:4026
decltype(priomap) const & get_priomap() const
Definition: ttg.h:4258
ttg::detail::edges_tuple_t< keyT, ttg::meta::decayed_typelist_t< input_tuple_type > > input_edges_type
Definition: ttg.h:1229
std::enable_if_t< ttg::meta::is_none_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const Key &key, const std::tuple< Ts... > &args)
Definition: ttg.h:3059
bool can_inline_data(Value *value_ptr, detail::ttg_data_copy_t *copy, const Key &key, std::size_t num_keys)
Definition: ttg.h:2646
void copy_mark_pushout(detail::ttg_data_copy_t *copy)
Definition: ttg.h:3337
void get_from_pull_msg(void *data, std::size_t size)
Definition: ttg.h:2274
static constexpr int numinvals
Definition: ttg.h:1238
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_arg()
Definition: ttg.h:2635
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap_=priomapT())
Definition: ttg.h:3781
ttg::World get_world() const override final
Definition: ttg.h:1323
void set_defer_writer(bool value)
Definition: ttg.h:4231
void make_executable() override
Marks this executable.
Definition: ttg.h:4240
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(std::shared_ptr< const Value > &valueptr)
Definition: ttg.h:2312
void release_task(task_t *task, parsec_task_t **task_ring=nullptr)
Definition: ttg.h:2573
virtual void release() override
Definition: ttg.h:3989
void set_devicemap(Devicemap &&dm)
Definition: ttg.h:4273
std::enable_if_t< key_is_void, void > finalize_argstream()
finalizes stream for input i
Definition: ttg.h:3290
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_argstream_size(const Key &key, std::size_t size)
Definition: ttg.h:3119
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_argstream_size(std::size_t size)
Definition: ttg.h:3180
void register_static_op_function(void)
Definition: ttg.h:4299
static resultT get(InTuple &&intuple)
Definition: ttg.h:1246
static auto & get(InTuple &&intuple)
Definition: ttg.h:1250
void broadcast_arg_local(Iterator &&begin, Iterator &&end, const Value &value)
Definition: ttg.h:2850
actual_input_tuple_type input_args_type
Definition: ttg.h:1228
void set_priomap(Priomap &&pm)
Definition: ttg.h:4263
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > prepare_send(const Value &value)
Definition: ttg.h:3480
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(Value &&value)
Definition: ttg.h:2294
std::enable_if_t< ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke()
Definition: ttg.h:4173
ttg::meta::drop_void_t< ttg::meta::decayed_typelist_t< input_tuple_type > > input_values_tuple_type
Definition: ttg.h:1235
std::tuple_element_t< i, output_terminalsT > * out()
Definition: ttg.h:4122
void argstream_set_size_from_msg(void *data, std::size_t size)
Definition: ttg.h:2251
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
Definition: ttg.h:3933
ttg::meta::void_to_Void_tuple_t< ttg::meta::decayed_typelist_t< actual_input_tuple_type > > input_values_full_tuple_type
Definition: ttg.h:1232
void set_static_argstream_size(std::size_t size)
Definition: ttg.h:3100
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg(Value &&value)
Definition: ttg.h:2630
void set_arg_local_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in=nullptr, parsec_task_t **task_ring=nullptr)
Definition: ttg.h:2372
static constexpr bool derived_has_device_op()
Definition: ttg.h:1221
static constexpr const ttg::Runtime runtime
Definition: ttg.h:4018
static constexpr bool derived_has_cuda_op()
Definition: ttg.h:1194
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const std::tuple< Ts... > &args)
Definition: ttg.h:3080
void increment_created()
Definition: ttg.h:454
virtual void execute() override
Definition: ttg.h:395
static constexpr int parsec_ttg_rma_tag()
Definition: ttg.h:391
void decrement_inflight_msg()
Definition: ttg.h:457
WorldImpl & operator=(const WorldImpl &other)=delete
void destroy_tpool()
Definition: ttg.h:406
const ttg::Edge & ctl_edge() const
Definition: ttg.h:452
void increment_inflight_msg()
Definition: ttg.h:456
WorldImpl(const WorldImpl &other)=delete
void register_tt_profiling(const TT< keyT, output_terminalsT, derivedT, input_valueTs > *t)
Definition: ttg.h:514
virtual void profile_off() override
Definition: ttg.h:486
void create_tpool()
Definition: ttg.h:333
WorldImpl(int *argc, char **argv[], int ncores, parsec_context_t *c=nullptr)
Definition: ttg.h:271
ttg::Edge & ctl_edge()
Definition: ttg.h:450
MPI_Comm comm() const
Definition: ttg.h:393
bool mpi_support(ttg::ExecutionSpace space)
Definition: ttg.h:500
virtual bool profiling() override
Definition: ttg.h:498
virtual void dag_off() override
Definition: ttg.h:477
virtual void fence_impl(void) override
Definition: ttg.h:553
virtual void dag_on(const std::string &filename) override
Definition: ttg.h:461
static constexpr int parsec_ttg_tag()
Definition: ttg.h:390
virtual void final_task() override
Definition: ttg.h:504
auto * taskpool()
Definition: ttg.h:331
virtual void destroy() override
Definition: ttg.h:419
virtual void profile_on() override
Definition: ttg.h:492
WorldImpl(WorldImpl &&other)=delete
bool dag_profiling() override
Definition: ttg.h:459
auto * execution_stream()
Definition: ttg.h:330
auto * context()
Definition: ttg.h:329
WorldImpl & operator=(WorldImpl &&other)=delete
rma_delayed_activate(std::vector< KeyT > &&key, detail::ttg_data_copy_t *copy, int num_transfers, ActivationCallbackT cb)
Definition: ttg.h:840
constexpr auto data(C &c) -> decltype(c.data())
Definition: span.h:189
typename make_index_sequence_t< I... >::type make_index_sequence
std::integral_constant< bool,(Flags &const_) !=0 > is_const
void set_default_world(WorldT &world)
Definition: world.h:29
void deregister_world(ttg::base::WorldImplBase &world)
bool force_device_comm()
Definition: env.cpp:33
typename input_terminals_tuple< keyT, valuesT... >::type input_terminals_tuple_t
Definition: terminal.h:354
void register_world(ttg::base::WorldImplBase &world)
int num_threads()
Determine the number of compute threads to use by TTG when not given to ttg::initialize
Definition: env.cpp:15
typename edges_tuple< keyT, valuesT >::type edges_tuple_t
Definition: edge.h:191
typename typelist_to_tuple< T >::type typelist_to_tuple_t
Definition: typelist.h:52
ttg_data_copy_t * find_copy_in_task(parsec_ttg_task_base_t *task, const void *ptr)
Definition: ttg.h:630
ttg_parsec::detail::ttg_data_copy_t * get_copy(ttg_parsec::Ptr< T > &p)
Definition: ptr.h:277
bool & initialized_mpi()
Definition: ttg.h:226
parsec_hook_return_t evaluate_level_zero(const parsec_task_t *parsec_task)
Definition: ttg.h:822
ttg::device::Device parsec_device_to_ttg_device(int parsec_id)
Definition: device.h:30
parsec_hook_return_t hook_level_zero(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
Definition: ttg.h:790
int first_device_id
Definition: device.h:12
std::size_t max_inline_size
Definition: ttg.h:192
int find_index_of_copy_in_task(parsec_ttg_task_base_t *task, const void *ptr)
Definition: ttg.h:645
int ttg_device_to_parsec_device(const ttg::device::Device &device)
Definition: device.h:18
const parsec_symbol_t parsec_taskclass_param1
Definition: ttg.h:605
parsec_hook_return_t evaluate_cuda(const parsec_task_t *parsec_task)
Definition: ttg.h:802
bool add_copy_to_task(ttg_data_copy_t *copy, parsec_ttg_task_base_t *task)
Definition: ttg.h:659
constexpr const int PARSEC_TTG_MAX_AM_SIZE
Definition: ttg.h:174
void remove_data_copy(ttg_data_copy_t *copy, parsec_ttg_task_base_t *task)
Definition: ttg.h:673
parsec_hook_return_t hook(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
Definition: ttg.h:759
ttg_data_copy_t * register_data_copy(ttg_data_copy_t *copy_in, parsec_ttg_task_base_t *task, bool readonly)
Definition: ttg.h:937
parsec_hook_return_t evaluate_hip(const parsec_task_t *parsec_task)
Definition: ttg.h:812
const parsec_symbol_t parsec_taskclass_param2
Definition: ttg.h:613
parsec_hook_return_t hook_hip(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
Definition: ttg.h:779
ttg_data_copy_t * create_new_datacopy(Value &&value)
Definition: ttg.h:701
parsec_hook_return_t hook_cuda(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
Definition: ttg.h:768
thread_local parsec_ttg_task_base_t * parsec_ttg_caller
Definition: thread_local.h:12
void transfer_ownership(parsec_ttg_task_t< TT > *me, int device, std::index_sequence< Is... >)
Definition: ttg.h:752
void release_data_copy(ttg_data_copy_t *copy)
Definition: ttg.h:884
void transfer_ownership_impl(ttg_data_copy_t *copy, int device)
Definition: ttg.h:744
const parsec_symbol_t parsec_taskclass_param3
Definition: ttg.h:621
const parsec_symbol_t parsec_taskclass_param0
Definition: ttg.h:597
this contains PaRSEC-based TTG functionality
Definition: fwd.h:18
void ttg_fence(ttg::World world)
Definition: ttg.h:1093
std::tuple< int, void *, size_t > static_set_arg_fct_arg_t
Definition: ttg.h:138
std::map< uint64_t, static_set_arg_fct_call_t > static_id_to_op_map
Definition: ttg.h:136
std::multimap< uint64_t, static_set_arg_fct_arg_t > delayed_unpack_actions
Definition: ttg.h:139
void ttg_finalize()
Definition: ttg.h:1079
void ttg_register_ptr(ttg::World world, const std::shared_ptr< T > &ptr)
Definition: ttg.h:1096
std::mutex static_map_mutex
Definition: ttg.h:137
void ttg_register_callback(ttg::World world, Callback &&callback)
Definition: ttg.h:1110
ttg::Edge & ttg_ctl_edge(ttg::World world)
Definition: ttg.h:1114
void make_executable_hook(ttg::World &)
Definition: ttg.h:1122
void(* static_set_arg_fct_type)(void *, size_t, ttg::TTBase *)
Definition: ttg.h:134
void ttg_initialize(int argc, char **argv, int num_threads=-1, parsec_context_s *=nullptr)
void ttg_register_status(ttg::World world, const std::shared_ptr< std::promise< void >> &status_ptr)
Definition: ttg.h:1105
ttg::World ttg_default_execution_context()
Definition: ttg.h:1089
std::pair< static_set_arg_fct_type, ttg::TTBase * > static_set_arg_fct_call_t
Definition: ttg.h:135
void ttg_execute(ttg::World world)
Definition: ttg.h:1092
void ttg_sum(ttg::World world, double &value)
Definition: ttg.h:1116
top-level TTG namespace contains runtime-neutral functionality
Definition: keymap.h:8
ExecutionSpace
denotes task execution space
Definition: execution.h:17
int size(World world=default_execution_context())
Definition: run.h:89
void abort()
Aborts the TTG program using the default backend's ttg_abort method.
Definition: run.h:62
Runtime
Definition: runtimes.h:15
World default_execution_context()
Accesses the default backend's default execution context.
Definition: run.h:68
TTG_CXX_COROUTINE_NAMESPACE::coroutine_handle< Promise > coroutine_handle
Definition: coroutine.h:24
void print(const T &t, const Ts &... ts)
atomically prints to std::cout a sequence of items (separated by ttg::print_separator) followed by st...
Definition: print.h:130
void print_error(const T &t, const Ts &... ts)
atomically prints to std::cerr a sequence of items (separated by ttg::print_separator) followed by st...
Definition: print.h:138
bool tracing()
returns whether tracing is enabled
Definition: trace.h:28
int rank(World world=default_execution_context())
Definition: run.h:85
ttg::World & get_default_world()
Definition: world.h:80
void trace(const T &t, const Ts &... ts)
Definition: trace.h:43
@ ResumableTask
-> ttg::resumable_task
@ Invalid
not a coroutine, i.e. a standard task function, -> void
@ DeviceTask
-> ttg::device::Task
#define TTG_PARSEC_DATA_FLAG_REGISTERED
Definition: parsec-ext.h:5
Provides (de)serialization of C++ data that can be invoked from C via ttg_data_descriptor.
const Value & operator()(const Value &value)
Definition: ttg.h:4440
std::conditional_t< std::is_reference_v< Value >, Value, Value && > operator()(Value &&value)
Definition: ttg.h:4380
value_copy_handler & operator=(value_copy_handler &&h)
Definition: ttg.h:4364
value_copy_handler(const value_copy_handler &h)=delete
std::add_lvalue_reference_t< Value > operator()(ttg_parsec::detail::persistent_value_ref< Value > vref)
Definition: ttg.h:4420
value_copy_handler & operator=(const value_copy_handler &h)=delete
Computes hash values for objects of type T.
Definition: hash.h:81
task that can be resumed after some events occur
Definition: coroutine.h:53
parsec_hash_table_t tasks_table
Definition: ttg.h:1152
parsec_gpu_task_t * gpu_task
Definition: task.h:14
msg_header_t tt_id
Definition: ttg.h:177
static constexpr std::size_t max_payload_size
Definition: ttg.h:178
msg_t(uint64_t tt_id, uint32_t taskpool_id, msg_header_t::fn_id_t fn_id, int32_t param_id, int sender, int num_keys=1)
Definition: ttg.h:182
unsigned char bytes[max_payload_size]
Definition: ttg.h:179
ttg_parsec_data_flags data_flags
Definition: task.h:136
parsec_hash_table_item_t tt_ht_item
Definition: task.h:94
std::array< stream_info_t, num_streams > streams
Definition: task.h:204
ttg_data_copy_t * copies[num_copies]
Definition: task.h:210
lvalue_reference_type value_ref
Definition: ttvalue.h:88
static void drop_all_ptr()
Definition: ptr.h:117
static void release_task(parsec_ttg_task_base_t *task_base)
Definition: task.h:342
parsec_task_t * get_next_task() const
ttg::span< ttg::iovec > iovec_span()
void transfer_ownership(int access, int device=0)
void set_next_task(parsec_task_t *task)
enum ttg_parsec::msg_header_t::fn_id fn_id_t
uint32_t taskpool_id
Definition: ttg.h:148
std::int8_t num_iovecs
Definition: ttg.h:152
std::size_t key_offset
Definition: ttg.h:150
msg_header_t(fn_id_t fid, uint32_t tid, uint64_t oid, int32_t pid, int sender, int nk)
Definition: ttg.h:160
#define TTG_PROCESS_TT_OP_RETURN(result, id, invoke)
Definition: tt.h:181
int parsec_add_fetch_runtime_task(parsec_taskpool_t *tp, int tasks)
void parsec_taskpool_termination_detected(parsec_taskpool_t *tp)
#define TTG_PARSEC_DEFER_WRITER
Definition: ttg.h:13