ttg.h
Go to the documentation of this file.
1 // clang-format off
2 #ifndef PARSEC_TTG_H_INCLUDED
3 #define PARSEC_TTG_H_INCLUDED
4 
5 /* set up env if this header was included directly */
6 #if !defined(TTG_IMPL_NAME)
7 #define TTG_USE_PARSEC 1
8 #endif // !defined(TTG_IMPL_NAME)
9 
10 /* Whether to defer a potential writer if there are readers.
11  * This may avoid extra copies in exchange for concurrency.
12  * This may cause deadlocks, so use with caution. */
13 #define TTG_PARSEC_DEFER_WRITER false
14 
15 #include "ttg/config.h"
16 
17 #include "ttg/impl_selector.h"
18 
19 /* include ttg header to make symbols available in case this header is included directly */
20 #include "../../ttg.h"
21 
22 #include "ttg/base/keymap.h"
23 #include "ttg/base/tt.h"
24 #include "ttg/base/world.h"
25 #include "ttg/constraint.h"
26 #include "ttg/edge.h"
27 #include "ttg/execution.h"
28 #include "ttg/func.h"
29 #include "ttg/runtimes.h"
30 #include "ttg/terminal.h"
31 #include "ttg/tt.h"
32 #include "ttg/util/env.h"
33 #include "ttg/util/hash.h"
34 #include "ttg/util/meta.h"
35 #include "ttg/util/meta/callable.h"
36 #include "ttg/util/print.h"
37 #include "ttg/util/scope_exit.h"
38 #include "ttg/util/trace.h"
39 #include "ttg/util/typelist.h"
40 #ifdef TTG_HAVE_DEVICE
41 #include "ttg/device/task.h"
42 #endif // TTG_HAVE_DEVICE
43 
45 
46 #include "ttg/parsec/fwd.h"
47 
48 #include "ttg/parsec/buffer.h"
51 #include "ttg/parsec/devicefunc.h"
52 #include "ttg/parsec/ttvalue.h"
53 
54 #include <algorithm>
55 #include <array>
56 #include <cassert>
57 #include <cstring>
58 #include <experimental/type_traits>
59 #include <functional>
60 #include <future>
61 #include <iostream>
62 #include <list>
63 #include <map>
64 #include <memory>
65 #include <mutex>
66 #include <numeric>
67 #include <sstream>
68 #include <string>
69 #include <tuple>
70 #include <vector>
71 
72 // needed for MPIX_CUDA_AWARE_SUPPORT
73 #if defined(TTG_HAVE_MPI)
74 #include <mpi.h>
75 #if defined(TTG_HAVE_MPIEXT)
76 #include <mpi-ext.h>
77 #endif // TTG_HAVE_MPIEXT
78 #endif // TTG_HAVE_MPI
79 
80 
81 #include <parsec.h>
82 #include <parsec/class/parsec_hash_table.h>
83 #include <parsec/data_internal.h>
84 #include <parsec/execution_stream.h>
85 #include <parsec/interfaces/interface.h>
86 #include <parsec/mca/device/device.h>
87 #include <parsec/parsec_comm_engine.h>
88 #include <parsec/parsec_internal.h>
89 #include <parsec/scheduling.h>
90 #include <parsec/remote_dep.h>
91 
92 #ifdef PARSEC_HAVE_DEV_CUDA_SUPPORT
93 #include <parsec/mca/device/cuda/device_cuda.h>
94 #endif // PARSEC_HAVE_DEV_CUDA_SUPPORT
95 #ifdef PARSEC_HAVE_DEV_HIP_SUPPORT
96 #include <parsec/mca/device/hip/device_hip.h>
97 #endif // PARSEC_HAVE_DEV_HIP_SUPPORT
98 #ifdef PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT
99 #include <parsec/mca/device/level_zero/device_level_zero.h>
100 #endif //PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT
101 
102 #include <parsec/mca/device/device_gpu.h>
103 #if defined(PARSEC_PROF_TRACE)
104 #include <parsec/profiling.h>
105 #undef PARSEC_TTG_PROFILE_BACKEND
106 #if defined(PARSEC_PROF_GRAPHER)
107 #include <parsec/parsec_prof_grapher.h>
108 #endif
109 #endif
110 #include <cstdlib>
111 #include <cstring>
112 
113 #if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
114 #include <unordered_set>
115 #endif
116 
118 #include "ttg/parsec/thread_local.h"
119 #include "ttg/parsec/ptr.h"
120 #include "ttg/parsec/task.h"
121 #include "ttg/parsec/parsec-ext.h"
122 
123 #include "ttg/device/device.h"
124 
125 #include <boost/type_index.hpp>
126 
127 #undef TTG_PARSEC_DEBUG_TRACK_DATA_COPIES
128 
129 /* PaRSEC function declarations */
130 extern "C" {
131 void parsec_taskpool_termination_detected(parsec_taskpool_t *tp);
132 int parsec_add_fetch_runtime_task(parsec_taskpool_t *tp, int tasks);
133 }
134 
135 namespace ttg_parsec {
136  typedef void (*static_set_arg_fct_type)(void *, size_t, ttg::TTBase *);
137  typedef std::pair<static_set_arg_fct_type, ttg::TTBase *> static_set_arg_fct_call_t;
138  inline std::map<uint64_t, static_set_arg_fct_call_t> static_id_to_op_map;
139  inline std::mutex static_map_mutex;
140  typedef std::tuple<int, void *, size_t> static_set_arg_fct_arg_t;
141  inline std::multimap<uint64_t, static_set_arg_fct_arg_t> delayed_unpack_actions;
142 
143  struct msg_header_t {
144  typedef enum fn_id : std::int8_t {
150  uint32_t taskpool_id = std::numeric_limits<uint32_t>::max();
151  uint64_t op_id = std::numeric_limits<uint64_t>::max();
152  std::size_t key_offset = 0;
154  std::int8_t num_iovecs = 0;
155  bool inline_data = false;
156  int32_t param_id = -1;
157  int num_keys = 0;
158  int sender = -1;
159 
160  msg_header_t() = default;
161 
162  msg_header_t(fn_id_t fid, uint32_t tid, uint64_t oid, int32_t pid, int sender, int nk)
163  : fn_id(fid)
164  , taskpool_id(tid)
165  , op_id(oid)
166  , param_id(pid)
167  , num_keys(nk)
168  , sender(sender)
169  { }
170  };
171 
172  static void unregister_parsec_tags(void *_);
173 
174  namespace detail {
175 
176  constexpr const int PARSEC_TTG_MAX_AM_SIZE = 1 * 1024*1024;
177 
178  struct msg_t {
180  static constexpr std::size_t max_payload_size = PARSEC_TTG_MAX_AM_SIZE - sizeof(msg_header_t);
181  unsigned char bytes[max_payload_size];
182 
183  msg_t() = default;
184  msg_t(uint64_t tt_id,
185  uint32_t taskpool_id,
186  msg_header_t::fn_id_t fn_id,
187  int32_t param_id,
188  int sender,
189  int num_keys = 1)
190  : tt_id(fn_id, taskpool_id, tt_id, param_id, sender, num_keys)
191  {}
192  };
193 
195 
  /// Active-message callback invoked by the PaRSEC comm engine when a TTG message arrives.
  /// Looks up the destination TT by operation id and forwards the raw message to its
  /// registered set-arg function. If the TT has not been registered yet (message arrived
  /// early), the payload is copied and queued in delayed_unpack_actions for later delivery.
  /// \return 0 on immediate delivery, 1 if delivery was deferred.
  static int static_unpack_msg(parsec_comm_engine_t *ce, uint64_t tag, void *data, long unsigned int size,
                               int src_rank, void *obj) {
    static_set_arg_fct_type static_set_arg_fct;
    parsec_taskpool_t *tp = NULL;
    msg_header_t *msg = static_cast<msg_header_t *>(data);
    uint64_t op_id = msg->op_id;
    tp = parsec_taskpool_lookup(msg->taskpool_id);
    assert(NULL != tp);
    static_map_mutex.lock();
    try {
      // at() throws std::out_of_range if the TT is not (yet) registered
      auto op_pair = static_id_to_op_map.at(op_id);
      static_map_mutex.unlock();
      // bracket delivery so PaRSEC's termination detection accounts for this message
      tp->tdm.module->incoming_message_start(tp, src_rank, NULL, NULL, 0, NULL);
      static_set_arg_fct = op_pair.first;
      static_set_arg_fct(data, size, op_pair.second);
      tp->tdm.module->incoming_message_end(tp, NULL);
      return 0;
    } catch (const std::out_of_range &e) {
      // TT not registered yet: stash a private copy of the message for deferred delivery
      void *data_cpy = malloc(size);
      assert(data_cpy != 0);
      memcpy(data_cpy, data, size);
      ttg::trace("ttg_parsec(", ttg_default_execution_context().rank(), ") Delaying delivery of message (", src_rank,
                 ", ", op_id, ", ", data_cpy, ", ", size, ")");
      delayed_unpack_actions.insert(std::make_pair(op_id, std::make_tuple(src_rank, data_cpy, size)));
      static_map_mutex.unlock();
      return 1;
    }
  }
224 
225  static int get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag, void *msg, size_t msg_size,
226  int src, void *cb_data);
227 
228  inline bool &initialized_mpi() {
229  static bool im = false;
230  return im;
231  }
232 
234 
235  } // namespace detail
236 
238  ttg::Edge<> m_ctl_edge;
239  bool _dag_profiling;
240  bool _task_profiling;
241  std::array<bool, static_cast<std::size_t>(ttg::ExecutionSpace::Invalid)>
242  mpi_space_support = {true, false, false};
243 
244  int query_comm_size() {
245  int comm_size;
246  MPI_Comm_size(MPI_COMM_WORLD, &comm_size);
247  return comm_size;
248  }
249 
250  int query_comm_rank() {
251  int comm_rank;
252  MPI_Comm_rank(MPI_COMM_WORLD, &comm_rank);
253  return comm_rank;
254  }
255 
  /// Comm-engine "up" callback: (re-)registers the TTG active-message tag and the
  /// RMA-completion tag so incoming messages are routed to the TTG handlers.
  static void ttg_parsec_ce_up(parsec_comm_engine_t *comm_engine, void *user_data)
  {
    parsec_ce.tag_register(WorldImpl::parsec_ttg_tag(), &detail::static_unpack_msg, user_data, detail::PARSEC_TTG_MAX_AM_SIZE);
    parsec_ce.tag_register(WorldImpl::parsec_ttg_rma_tag(), &detail::get_remote_complete_cb, user_data, 128);
  }

  /// Comm-engine "down" callback: unregisters the tags registered in ttg_parsec_ce_up.
  static void ttg_parsec_ce_down(parsec_comm_engine_t *comm_engine, void *user_data)
  {
    parsec_ce.tag_unregister(WorldImpl::parsec_ttg_tag());
    parsec_ce.tag_unregister(WorldImpl::parsec_ttg_rma_tag());
  }
267 
268  public:
269 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
270  int parsec_ttg_profile_backend_set_arg_start, parsec_ttg_profile_backend_set_arg_end;
271  int parsec_ttg_profile_backend_bcast_arg_start, parsec_ttg_profile_backend_bcast_arg_end;
272  int parsec_ttg_profile_backend_allocate_datacopy, parsec_ttg_profile_backend_free_datacopy;
273 #endif
274 
275  WorldImpl(int *argc, char **argv[], int ncores, parsec_context_t *c = nullptr)
276  : WorldImplBase(query_comm_size(), query_comm_rank())
277  , ctx(c)
278  , own_ctx(c == nullptr)
279 #if defined(PARSEC_PROF_TRACE)
280  , profiling_array(nullptr)
281  , profiling_array_size(0)
282 #endif
283  , _dag_profiling(false)
284  , _task_profiling(false)
285  {
287  if (own_ctx) ctx = parsec_init(ncores, argc, argv);
288 
289  /* query MPI device support */
291 #if defined(MPIX_CUDA_AWARE_SUPPORT) && MPIX_CUDA_AWARE_SUPPORT
292  || MPIX_Query_cuda_support()
293 #endif // MPIX_CUDA_AWARE_SUPPORT
294  ) {
295  mpi_space_support[static_cast<std::size_t>(ttg::ExecutionSpace::CUDA)] = true;
296  }
297 
299 #if defined(MPIX_HIP_AWARE_SUPPORT) && MPIX_HIP_AWARE_SUPPORT
300  || MPIX_Query_hip_support()
301 #endif // MPIX_HIP_AWARE_SUPPORT
302  ) {
303  mpi_space_support[static_cast<std::size_t>(ttg::ExecutionSpace::HIP)] = true;
304  }
305 
306 #if defined(PARSEC_PROF_TRACE)
307  if(parsec_profile_enabled) {
308  profile_on();
309 #if defined(PARSEC_TTG_PROFILE_BACKEND)
310  parsec_profiling_add_dictionary_keyword("PARSEC_TTG_SET_ARG_IMPL", "fill:000000", 0, NULL,
311  (int*)&parsec_ttg_profile_backend_set_arg_start,
312  (int*)&parsec_ttg_profile_backend_set_arg_end);
313  parsec_profiling_add_dictionary_keyword("PARSEC_TTG_BCAST_ARG_IMPL", "fill:000000", 0, NULL,
314  (int*)&parsec_ttg_profile_backend_bcast_arg_start,
315  (int*)&parsec_ttg_profile_backend_bcast_arg_end);
316  parsec_profiling_add_dictionary_keyword("PARSEC_TTG_DATACOPY", "fill:000000",
317  sizeof(size_t), "size{int64_t}",
318  (int*)&parsec_ttg_profile_backend_allocate_datacopy,
319  (int*)&parsec_ttg_profile_backend_free_datacopy);
320 #endif
321  }
322 #endif
323 
324  if( NULL != parsec_ce.tag_register) {
325  parsec_ce.tag_register(WorldImpl::parsec_ttg_tag(), &detail::static_unpack_msg, this, detail::PARSEC_TTG_MAX_AM_SIZE);
326  parsec_ce.tag_register(WorldImpl::parsec_ttg_rma_tag(), &detail::get_remote_complete_cb, this, 128);
327  }
328 
329  create_tpool();
330  }
331 
332 
  /// The PaRSEC context backing this world.
  auto *context() { return ctx; }
  /// The calling thread's PaRSEC execution stream.
  auto *execution_stream() { return parsec_my_execution_stream(); }
  /// The current TTG taskpool (destroyed and recreated across fences).
  auto *taskpool() { return tpool; }
336 
  /// Create and configure a fresh PaRSEC taskpool for this world.
  /// Called from the constructor and again after every fence_impl(): the
  /// taskpool is destroyed/recreated to delimit execution epochs.
  void create_tpool() {
    assert(nullptr == tpool);
    tpool = PARSEC_OBJ_NEW(parsec_taskpool_t);
    tpool->taskpool_id = std::numeric_limits<uint32_t>::max();
    tpool->update_nb_runtime_task = parsec_add_fetch_runtime_task;
    tpool->taskpool_type = PARSEC_TASKPOOL_TYPE_TTG;
    tpool->taskpool_name = strdup("TTG Taskpool");
    parsec_taskpool_reserve_id(tpool);

    /* enable every device currently known to PaRSEC for this taskpool */
    tpool->devices_index_mask = 0;
    for(int i = 0; i < (int)parsec_nb_devices; i++) {
      parsec_device_module_t *device = parsec_mca_device_get(i);
      if( NULL == device ) continue;
      tpool->devices_index_mask |= (1 << device->device_index);
    }

#ifdef TTG_USE_USER_TERMDET
    parsec_termdet_open_module(tpool, "user_trigger");
#else // TTG_USE_USER_TERMDET
    parsec_termdet_open_dyn_module(tpool);
#endif // TTG_USE_USER_TERMDET
    tpool->tdm.module->monitor_taskpool(tpool, parsec_taskpool_termination_detected);
    // In TTG, we use the pending actions to denote that the
    // taskpool is not ready, i.e. some local tasks could still
    // be added by the main thread. It should then be initialized
    // to 0, execute will set it to 1 and mark the tpool as ready,
    // and the fence() will decrease it back to 0.
    tpool->tdm.module->taskpool_set_runtime_actions(tpool, 0);
    parsec_taskpool_enable(tpool, NULL, NULL, execution_stream(), size() > 1);

#if defined(PARSEC_PROF_TRACE)
    // reattach the persistent profiling array (see destroy_tpool)
    tpool->profiling_array = profiling_array;
#endif

    // Termination detection in PaRSEC requires to synchronize the
    // taskpool enabling, to avoid a race condition that would keep
    // termination detection-related messages in a waiting queue
    // forever
    MPI_Barrier(comm());

    parsec_taskpool_started = false;
  }
379 
380  /* Deleted copy ctor */
381  WorldImpl(const WorldImpl &other) = delete;
382 
383  /* Deleted move ctor */
384  WorldImpl(WorldImpl &&other) = delete;
385 
386  /* Deleted copy assignment */
387  WorldImpl &operator=(const WorldImpl &other) = delete;
388 
389  /* Deleted move assignment */
390  WorldImpl &operator=(WorldImpl &&other) = delete;
391 
393 
  /// Comm-engine tag used for TTG active messages.
  static constexpr int parsec_ttg_tag() { return PARSEC_DSL_TTG_TAG; }
  /// Comm-engine tag used for TTG RMA-completion messages.
  static constexpr int parsec_ttg_rma_tag() { return PARSEC_DSL_TTG_RMA_TAG; }

  /// The communicator spanning this world (currently always MPI_COMM_WORLD).
  MPI_Comm comm() const { return MPI_COMM_WORLD; }
398 
  /// Start executing the current taskpool: enqueue it into the PaRSEC context,
  /// add the runtime action that keeps it alive until fence(), mark it ready,
  /// and start the context. No-op if already started (until the next fence).
  virtual void execute() override {
    if (!parsec_taskpool_started) {
      parsec_enqueue(ctx, tpool);
      tpool->tdm.module->taskpool_addto_runtime_actions(tpool, 1);
      tpool->tdm.module->taskpool_ready(tpool);
      [[maybe_unused]] auto ret = parsec_context_start(ctx);
      // ignore ret since all of its nonzero values are OK (e.g. -1 due to ctx already being active)
      parsec_taskpool_started = true;
    }
  }
409 
  /// Tear down the current taskpool: detach it from termination detection and free it.
  void destroy_tpool() {
#if defined(PARSEC_PROF_TRACE)
    // We don't want to release the profiling array, as it should be persistent
    // between fences() to allow defining a TT/TTG before a fence() and schedule
    // it / complete it after a fence()
    tpool->profiling_array = nullptr;
#endif
    assert(NULL != tpool->tdm.monitor);
    tpool->tdm.module->unmonitor_taskpool(tpool);
    parsec_taskpool_free(tpool);
    tpool = nullptr;
  }
422 
  /// Shut this world down: wait for outstanding work, release resources,
  /// unregister comm-engine tags, and (if we own it) finalize the PaRSEC context.
  virtual void destroy() override {
    if (is_valid()) {
      if (parsec_taskpool_started) {
        // We are locally ready (i.e. we won't add new tasks)
        tpool->tdm.module->taskpool_addto_runtime_actions(tpool, -1);
        ttg::trace("ttg_parsec(", this->rank(), "): final waiting for completion");
        // if we own the context we can drain it completely; otherwise only wait on our taskpool
        if (own_ctx)
          parsec_context_wait(ctx);
        else
          parsec_taskpool_wait(tpool);
      }
      release_ops();
      destroy_tpool();
      if (own_ctx) {
        // we created the context, so the tags can be unregistered right away
        unregister_parsec_tags(nullptr);
      } else {
        // context belongs to the application; defer tag unregistration to its finalization
        parsec_context_at_fini(unregister_parsec_tags, nullptr);
      }
#if defined(PARSEC_PROF_TRACE)
      if(nullptr != profiling_array) {
        free(profiling_array);
        profiling_array = nullptr;
        profiling_array_size = 0;
      }
#endif
      if (own_ctx) parsec_fini(&ctx);
      mark_invalid();
    }
  }
453 
  /// Control edge of this world (used to trigger pull terminals).
  ttg::Edge<> &ctl_edge() { return m_ctl_edge; }

  const ttg::Edge<> &ctl_edge() const { return m_ctl_edge; }

  /// Account a newly created local task with the taskpool's termination detector.
  void increment_created() { taskpool()->tdm.module->taskpool_addto_nb_tasks(taskpool(), 1); }

  /// Track an in-flight message as a runtime action so the taskpool does not
  /// terminate while communication is still pending.
  void increment_inflight_msg() { taskpool()->tdm.module->taskpool_addto_runtime_actions(taskpool(), 1); }
  void decrement_inflight_msg() { taskpool()->tdm.module->taskpool_addto_runtime_actions(taskpool(), -1); }

  /// Whether DAG (dot-file) profiling is currently enabled.
  bool dag_profiling() override { return _dag_profiling; }
464 
465  virtual void dag_on(const std::string &filename) override {
466 #if defined(PARSEC_PROF_GRAPHER)
467  if(!_dag_profiling) {
468  profile_on();
469  size_t len = strlen(filename.c_str())+32;
470  char ext_filename[len];
471  snprintf(ext_filename, len, "%s-%d.dot", filename.c_str(), rank());
472  parsec_prof_grapher_init(ctx, ext_filename);
473  _dag_profiling = true;
474  }
475 #else
476  ttg::print("Error: requested to create '", filename, "' to create a DAG of tasks,\n"
477  "but PaRSEC does not support graphing options. Reconfigure with PARSEC_PROF_GRAPHER=ON\n");
478 #endif
479  }
480 
  /// Disable DAG profiling and finalize the grapher output. No-op if not enabled
  /// or if PaRSEC lacks PARSEC_PROF_GRAPHER support.
  virtual void dag_off() override {
#if defined(PARSEC_PROF_GRAPHER)
    if(_dag_profiling) {
      parsec_prof_grapher_fini();
      _dag_profiling = false;
    }
#endif
  }
489 
  /// Disable task profiling (only effective when PaRSEC was built with PARSEC_PROF_TRACE).
  virtual void profile_off() override {
#if defined(PARSEC_PROF_TRACE)
    _task_profiling = false;
#endif
  }

  /// Enable task profiling (only effective when PaRSEC was built with PARSEC_PROF_TRACE).
  virtual void profile_on() override {
#if defined(PARSEC_PROF_TRACE)
    _task_profiling = true;
#endif
  }

  /// Whether task profiling is currently enabled.
  virtual bool profiling() override { return _task_profiling; }
503 
505  return mpi_space_support[static_cast<std::size_t>(space)];
506  }
507 
  /// With user-triggered termination detection, declare that no more tasks will
  /// be created so the taskpool can reach termination. No-op otherwise.
  virtual void final_task() override {
#ifdef TTG_USE_USER_TERMDET
    if(parsec_taskpool_started) {
      taskpool()->tdm.module->taskpool_set_nb_tasks(taskpool(), 0);
      parsec_taskpool_started = false;
    }
#endif // TTG_USE_USER_TERMDET
  }
516 
517  template <typename keyT, typename output_terminalsT, typename derivedT, typename input_valueTs = ttg::typelist<>>
519 #if defined(PARSEC_PROF_TRACE)
520  std::stringstream ss;
521  build_composite_name_rec(t->ttg_ptr(), ss);
522  ss << t->get_name();
523  register_new_profiling_event(ss.str().c_str(), t->get_instance_id());
524 #endif
525  }
526 
527  protected:
528 #if defined(PARSEC_PROF_TRACE)
529  void build_composite_name_rec(const ttg::TTBase *t, std::stringstream &ss) {
530  if(nullptr == t)
531  return;
532  build_composite_name_rec(t->ttg_ptr(), ss);
533  ss << t->get_name() << "::";
534  }
535 
  /// Register a (start, end) pair of profiling dictionary keys for the TT
  /// instance at the given position, growing the shared profiling array on demand.
  void register_new_profiling_event(const char *name, int position) {
    if(2*position >= profiling_array_size) {
      // grow in chunks of 64 entries and zero-fill the newly added range
      size_t new_profiling_array_size = 64 * ((2*position + 63)/64 + 1);
      profiling_array = (int*)realloc((void*)profiling_array,
                                      new_profiling_array_size * sizeof(int));
      memset((void*)&profiling_array[profiling_array_size], 0, sizeof(int)*(new_profiling_array_size - profiling_array_size));
      profiling_array_size = new_profiling_array_size;
      // keep the taskpool's view in sync with the (possibly moved) array
      tpool->profiling_array = profiling_array;
    }

    // each position must be registered at most once
    assert(0 == tpool->profiling_array[2*position]);
    assert(0 == tpool->profiling_array[2*position+1]);
    // TODO PROFILING: 0 and NULL should be replaced with something that depends on the key human-readable serialization...
    // Typically, we would put something like 3*sizeof(int32_t), "m{int32_t};n{int32_t};k{int32_t}" to say
    // there are three fields, named m, n and k, stored in this order, and each of size int32_t
    parsec_profiling_add_dictionary_keyword(name, "fill:000000", 64, "key{char[64]}",
                                            (int*)&tpool->profiling_array[2*position],
                                            (int*)&tpool->profiling_array[2*position+1]);
  }
555 #endif
556 
  /// Quiesce this world: wait for all local tasks and in-flight messages to
  /// complete, then recreate the taskpool and restart it so new work can be
  /// submitted after the fence.
  virtual void fence_impl(void) override {
    int rank = this->rank();
    if (!parsec_taskpool_started) {
      ttg::trace("ttg_parsec::(", rank, "): parsec taskpool has not been started, fence is a simple MPI_Barrier");
      MPI_Barrier(comm());
      return;
    }
    ttg::trace("ttg_parsec::(", rank, "): parsec taskpool is ready for completion");
    // We are locally ready (i.e. we won't add new tasks)
    tpool->tdm.module->taskpool_addto_runtime_actions(tpool, -1);
    ttg::trace("ttg_parsec(", rank, "): waiting for completion");
    parsec_taskpool_wait(tpool);

    // We need the synchronization between the end of the context and the restart of the taskpool
    // until we use parsec_taskpool_wait and implement an epoch in the PaRSEC taskpool
    // see Issue #118 (TTG)
    MPI_Barrier(comm());

    destroy_tpool();
    create_tpool();
    execute();
  }
579 
580  private:
581  parsec_context_t *ctx = nullptr;
582  bool own_ctx = false; //< whether I own the context
583  parsec_taskpool_t *tpool = nullptr;
584  bool parsec_taskpool_started = false;
585 #if defined(PARSEC_PROF_TRACE)
586  int *profiling_array;
587  std::size_t profiling_array_size;
588 #endif
589  };
590 
591  static void unregister_parsec_tags(void *_pidx)
592  {
593  if(NULL != parsec_ce.tag_unregister) {
594  parsec_ce.tag_unregister(WorldImpl::parsec_ttg_tag());
595  parsec_ce.tag_unregister(WorldImpl::parsec_ttg_rma_tag());
596  }
597  }
598 
599  namespace detail {
600 
  /* Symbol descriptors for the locals of a TTG task class as seen by PaRSEC:
   * two hash words (HASH0/HASH1) and two key words (KEY0/KEY1), mapped to
   * context indices 0-3. All are standalone globals with no min/max/increment
   * expressions (the values are assigned, not iterated). */
  const parsec_symbol_t parsec_taskclass_param0 = {
    .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
    .name = "HASH0",
    .context_index = 0,
    .min = nullptr,
    .max = nullptr,
    .expr_inc = nullptr,
    .cst_inc = 0 };
  const parsec_symbol_t parsec_taskclass_param1 = {
    .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
    .name = "HASH1",
    .context_index = 1,
    .min = nullptr,
    .max = nullptr,
    .expr_inc = nullptr,
    .cst_inc = 0 };
  const parsec_symbol_t parsec_taskclass_param2 = {
    .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
    .name = "KEY0",
    .context_index = 2,
    .min = nullptr,
    .max = nullptr,
    .expr_inc = nullptr,
    .cst_inc = 0 };
  const parsec_symbol_t parsec_taskclass_param3 = {
    .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
    .name = "KEY1",
    .context_index = 3,
    .min = nullptr,
    .max = nullptr,
    .expr_inc = nullptr,
    .cst_inc = 0 };
633 
635  ttg_data_copy_t *res = nullptr;
636  if (task == nullptr || ptr == nullptr) {
637  return res;
638  }
639  for (int i = 0; i < task->data_count; ++i) {
640  auto copy = static_cast<ttg_data_copy_t *>(task->copies[i]);
641  if (NULL != copy && copy->get_ptr() == ptr) {
642  res = copy;
643  break;
644  }
645  }
646  return res;
647  }
648 
649  inline int find_index_of_copy_in_task(parsec_ttg_task_base_t *task, const void *ptr) {
650  int i = -1;
651  if (task == nullptr || ptr == nullptr) {
652  return i;
653  }
654  for (i = 0; i < task->data_count; ++i) {
655  auto copy = static_cast<ttg_data_copy_t *>(task->copies[i]);
656  if (NULL != copy && copy->get_ptr() == ptr) {
657  return i;
658  }
659  }
660  return -1;
661  }
662 
664  if (task == nullptr || copy == nullptr) {
665  return false;
666  }
667 
668  if (MAX_PARAM_COUNT < task->data_count) {
669  throw std::logic_error("Too many data copies, check MAX_PARAM_COUNT!");
670  }
671 
672  task->copies[task->data_count] = copy;
673  task->data_count++;
674  return true;
675  }
676 
678  int i;
679  /* find and remove entry; copies are usually appended and removed, so start from back */
680  for (i = task->data_count-1; i >= 0; --i) {
681  if (copy == task->copies[i]) {
682  break;
683  }
684  }
685  if (i < 0) return;
686  /* move all following elements one up */
687  for (; i < task->data_count - 1; ++i) {
688  task->copies[i] = task->copies[i + 1];
689  }
690  /* null last element */
691  task->copies[i] = nullptr;
692  task->data_count--;
693  }
694 
695 #if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
696 #warning "ttg::PaRSEC enables data copy tracking"
697  static std::unordered_set<ttg_data_copy_t *> pending_copies;
698  static std::mutex pending_copies_mutex;
699 #endif
700 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
701  static int64_t parsec_ttg_data_copy_uid = 0;
702 #endif
703 
  /// Create a new data copy owning a copy (or moved-in instance) of \c value.
  /// If the value type derives from ttg::TTValue, the value object itself serves
  /// as the copy; otherwise it is wrapped in a ttg_data_value_copy_t.
  /// \throws std::logic_error if the value cannot be copy/move-constructed.
  template <typename Value>
  inline ttg_data_copy_t *create_new_datacopy(Value &&value) {
    using value_type = std::decay_t<Value>;
    ttg_data_copy_t *copy = nullptr;
    if constexpr (std::is_base_of_v<ttg::TTValue<value_type>, value_type> &&
                  std::is_constructible_v<value_type, decltype(value)>) {
      copy = new value_type(std::forward<Value>(value));
    } else if constexpr (std::is_constructible_v<ttg_data_value_copy_t<value_type>, decltype(value)>) {
      copy = new ttg_data_value_copy_t<value_type>(std::forward<Value>(value));
    } else {
      /* we have no way to create a new copy from this value */
      throw std::logic_error("Trying to copy-construct data that is not copy-constructible!");
    }
#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
    // Keep track of additional memory usage
    if(ttg::default_execution_context().impl().profiling()) {
      copy->size = sizeof(Value);
      copy->uid = parsec_atomic_fetch_inc_int64(&parsec_ttg_data_copy_uid);
      parsec_profiling_ts_trace_flags(ttg::default_execution_context().impl().parsec_ttg_profile_backend_allocate_datacopy,
                                      static_cast<uint64_t>(copy->uid),
                                      PROFILE_OBJECT_ID_NULL, &copy->size,
                                      PARSEC_PROFILING_EVENT_COUNTER|PARSEC_PROFILING_EVENT_HAS_INFO);
    }
#endif
#if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
    {
      // record the live copy so release_data_copy can verify it on deletion
      const std::lock_guard<std::mutex> lock(pending_copies_mutex);
      auto rc = pending_copies.insert(copy);
      assert(std::get<1>(rc));
    }
#endif
    return copy;
  }
737 
738 #if 0
739  template <std::size_t... IS, typename Key = keyT>
740  void invoke_pull_terminals(std::index_sequence<IS...>, const Key &key, detail::parsec_ttg_task_base_t *task) {
741  int junk[] = {0, (invoke_pull_terminal<IS>(
742  std::get<IS>(input_terminals), key, task),
743  0)...};
744  junk[0]++;
745  }
746 #endif // 0
747 
  /// Transfer ownership of one input data copy to \c device, unless the input
  /// at index I is const (read-only inputs keep their current owner/version).
  template<typename TT, std::size_t I>
  inline void transfer_ownership_impl(ttg_data_copy_t *copy, int device) {
    if constexpr(!std::is_const_v<std::tuple_element_t<I, typename TT::input_values_tuple_type>>) {
      copy->transfer_ownership(PARSEC_FLOW_ACCESS_RW, device);
      copy->inc_current_version();
    }
  }
755 
756  template<typename TT, std::size_t... Is>
757  inline void transfer_ownership(parsec_ttg_task_t<TT> *me, int device, std::index_sequence<Is...>) {
758  /* transfer ownership of each data */
759  int junk[] = {0, (transfer_ownership_impl<TT, Is>(me->copies[Is], device), 0)...};
760  junk[0]++;
761  }
762 
763  template<typename TT>
764  inline parsec_hook_return_t hook(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
765  parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
766  if constexpr(std::tuple_size_v<typename TT::input_values_tuple_type> > 0) {
767  transfer_ownership<TT>(me, 0, std::make_index_sequence<std::tuple_size_v<typename TT::input_values_tuple_type>>{});
768  }
769  return me->template invoke_op<ttg::ExecutionSpace::Host>();
770  }
771 
772  template<typename TT>
773  inline parsec_hook_return_t hook_cuda(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
774  if constexpr(TT::derived_has_cuda_op()) {
775  parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
776  return me->template invoke_op<ttg::ExecutionSpace::CUDA>();
777  } else {
778  std::cerr << "CUDA hook called without having a CUDA op!" << std::endl;
779  return PARSEC_HOOK_RETURN_ERROR;
780  }
781  }
782 
783  template<typename TT>
784  inline parsec_hook_return_t hook_hip(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
785  if constexpr(TT::derived_has_hip_op()) {
786  parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
787  return me->template invoke_op<ttg::ExecutionSpace::HIP>();
788  } else {
789  std::cerr << "HIP hook called without having a HIP op!" << std::endl;
790  return PARSEC_HOOK_RETURN_ERROR;
791  }
792  }
793 
794  template<typename TT>
795  inline parsec_hook_return_t hook_level_zero(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
796  if constexpr(TT::derived_has_level_zero_op()) {
797  parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
798  return me->template invoke_op<ttg::ExecutionSpace::L0>();
799  } else {
800  std::cerr << "L0 hook called without having a L0 op!" << std::endl;
801  return PARSEC_HOOK_RETURN_ERROR;
802  }
803  }
804 
805 
806  template<typename TT>
807  inline parsec_hook_return_t evaluate_cuda(const parsec_task_t *parsec_task) {
808  if constexpr(TT::derived_has_cuda_op()) {
809  parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
810  return me->template invoke_evaluate<ttg::ExecutionSpace::CUDA>();
811  } else {
812  return PARSEC_HOOK_RETURN_NEXT;
813  }
814  }
815 
816  template<typename TT>
817  inline parsec_hook_return_t evaluate_hip(const parsec_task_t *parsec_task) {
818  if constexpr(TT::derived_has_hip_op()) {
819  parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
820  return me->template invoke_evaluate<ttg::ExecutionSpace::HIP>();
821  } else {
822  return PARSEC_HOOK_RETURN_NEXT;
823  }
824  }
825 
826  template<typename TT>
827  inline parsec_hook_return_t evaluate_level_zero(const parsec_task_t *parsec_task) {
828  if constexpr(TT::derived_has_level_zero_op()) {
829  parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
830  return me->template invoke_evaluate<ttg::ExecutionSpace::L0>();
831  } else {
832  return PARSEC_HOOK_RETURN_NEXT;
833  }
834  }
835 
836 
837  template <typename KeyT, typename ActivationCallbackT>
839  std::vector<KeyT> _keylist;
840  std::atomic<int> _outstanding_transfers;
841  ActivationCallbackT _cb;
843 
844  public:
845  rma_delayed_activate(std::vector<KeyT> &&key, detail::ttg_data_copy_t *copy, int num_transfers, ActivationCallbackT cb)
846  : _keylist(std::move(key)), _outstanding_transfers(num_transfers), _cb(cb), _copy(copy) {}
847 
848  bool complete_transfer(void) {
849  int left = --_outstanding_transfers;
850  if (0 == left) {
851  _cb(std::move(_keylist), _copy);
852  return true;
853  }
854  return false;
855  }
856  };
857 
858  template <typename ActivationT>
859  static int get_complete_cb(parsec_comm_engine_t *comm_engine, parsec_ce_mem_reg_handle_t lreg, ptrdiff_t ldispl,
860  parsec_ce_mem_reg_handle_t rreg, ptrdiff_t rdispl, size_t size, int remote,
861  void *cb_data) {
862  parsec_ce.mem_unregister(&lreg);
863  ActivationT *activation = static_cast<ActivationT *>(cb_data);
864  if (activation->complete_transfer()) {
865  delete activation;
866  }
867  return PARSEC_SUCCESS;
868  }
869 
870  static int get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag, void *msg, size_t msg_size,
871  int src, void *cb_data) {
872  std::intptr_t *fn_ptr = static_cast<std::intptr_t *>(msg);
873  std::function<void(void)> *fn = reinterpret_cast<std::function<void(void)> *>(*fn_ptr);
874  (*fn)();
875  delete fn;
876  return PARSEC_SUCCESS;
877  }
878 
879  template <typename FuncT>
880  static int invoke_get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag, void *msg, size_t msg_size,
881  int src, void *cb_data) {
882  std::intptr_t *iptr = static_cast<std::intptr_t *>(msg);
883  FuncT *fn_ptr = reinterpret_cast<FuncT *>(*iptr);
884  (*fn_ptr)();
885  delete fn_ptr;
886  return PARSEC_SUCCESS;
887  }
888 
  /// Release one reference on \c copy after the current task is done with it.
  /// Depending on the reader count this may (a) do nothing (other readers remain),
  /// (b) hand the now-exclusive copy to a deferred writer task, or (c) delete the
  /// copy when the last reference is dropped.
  inline void release_data_copy(ttg_data_copy_t *copy) {
    if (copy->is_mutable() && nullptr == copy->get_next_task()) {
      /* current task mutated the data but there are no consumers so prepare
       * the copy to be freed below */
      copy->reset_readers();
    }

    int32_t readers = copy->num_readers();
    if (readers > 1) {
      /* potentially more than one reader, decrement atomically */
      readers = copy->decrement_readers();
    } else if (readers == 1) {
      /* make sure readers drop to zero */
      readers = copy->decrement_readers<false>();
    }
    /* if there was only one reader (the current task) or
     * a mutable copy and a successor, we release the copy */
    if (1 == readers || readers == copy->mutable_tag) {
      // acquire fence: pairs with the writes of other threads that released readers
      std::atomic_thread_fence(std::memory_order_acquire);
      if (nullptr != copy->get_next_task()) {
        /* Release the deferred task.
         * The copy was mutable and will be mutated by the released task,
         * so simply transfer ownership.
         */
        parsec_task_t *next_task = copy->get_next_task();
        copy->set_next_task(nullptr);
        parsec_ttg_task_base_t *deferred_op = (parsec_ttg_task_base_t *)next_task;
        copy->mark_mutable();
        deferred_op->release_task();
      } else if ((1 == copy->num_ref()) || (1 == copy->drop_ref())) {
        /* we are the last reference, delete the copy */
#if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
        {
          // the copy must have been registered at creation time
          const std::lock_guard<std::mutex> lock(pending_copies_mutex);
          size_t rc = pending_copies.erase(copy);
          assert(1 == rc);
        }
#endif
#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
        // Keep track of additional memory usage
        if(ttg::default_execution_context().impl().profiling()) {
          parsec_profiling_ts_trace_flags(ttg::default_execution_context().impl().parsec_ttg_profile_backend_free_datacopy,
                                          static_cast<uint64_t>(copy->uid),
                                          PROFILE_OBJECT_ID_NULL, &copy->size,
                                          PARSEC_PROFILING_EVENT_COUNTER|PARSEC_PROFILING_EVENT_HAS_INFO);
        }
#endif
        delete copy;
      }
    }
  }
940 
941  template <typename Value>
943  ttg_data_copy_t *copy_res = copy_in;
944  bool replace = false;
945  int32_t readers = copy_in->num_readers();
946  assert(readers != 0);
947 
948  /* try hard to defer writers if we cannot make copies
949  * if deferral fails we have to bail out */
950  bool defer_writer = (!std::is_copy_constructible_v<std::decay_t<Value>>) || task->defer_writer;
951 
952  if (readonly && !copy_in->is_mutable()) {
953  /* simply increment the number of readers */
954  readers = copy_in->increment_readers();
955  }
956 
957  if (readers == copy_in->mutable_tag) {
958  if (copy_res->get_next_task() != nullptr) {
959  if (readonly) {
960  parsec_ttg_task_base_t *next_task = reinterpret_cast<parsec_ttg_task_base_t *>(copy_res->get_next_task());
961  if (next_task->defer_writer) {
962  /* there is a writer but it signalled that it wants to wait for readers to complete */
963  return copy_res;
964  }
965  }
966  }
967  /* someone is going to write into this copy -> we need to make a copy */
968  copy_res = NULL;
969  if (readonly) {
970  /* we replace the copy in a deferred task if the copy will be mutated by
971  * the deferred task and we are readonly.
972  * That way, we can share the copy with other readonly tasks and release
973  * the deferred task. */
974  replace = true;
975  }
976  } else if (!readonly) {
977  /* this task will mutate the data
978  * check whether there are other readers already and potentially
979  * defer the release of this task to give following readers a
980  * chance to make a copy of the data before this task mutates it
981  *
982  * Try to replace the readers with a negative value that indicates
983  * the value is mutable. If that fails we know that there are other
984  * readers or writers already.
985  *
986  * NOTE: this check is not atomic: either there is a single reader
987  * (current task) or there are others, in which we case won't
988  * touch it.
989  */
990  if (1 == copy_in->num_readers() && !defer_writer) {
995  assert(nullptr == copy_in->get_next_task());
996  copy_in->set_next_task(&task->parsec_task);
997  std::atomic_thread_fence(std::memory_order_release);
998  copy_in->mark_mutable();
999  } else {
1000  if (defer_writer && nullptr == copy_in->get_next_task()) {
1001  /* we're the first writer and want to wait for all readers to complete */
1002  copy_res->set_next_task(&task->parsec_task);
1003  task->defer_writer = true;
1004  } else {
1005  /* there are writers and/or waiting already of this copy already, make a copy that we can mutate */
1006  copy_res = NULL;
1007  }
1008  }
1009  }
1010 
1011  if (NULL == copy_res) {
1012  // can only make a copy if Value is copy-constructible ... so this codepath should never be hit
1013  if constexpr (std::is_copy_constructible_v<std::decay_t<Value>>) {
1014  ttg_data_copy_t *new_copy = detail::create_new_datacopy(*static_cast<Value *>(copy_in->get_ptr()));
1015  if (replace && nullptr != copy_in->get_next_task()) {
1016  /* replace the task that was deferred */
1017  parsec_ttg_task_base_t *deferred_op = (parsec_ttg_task_base_t *)copy_in->get_next_task();
1018  new_copy->mark_mutable();
1019  /* replace the copy in the deferred task */
1020  for (int i = 0; i < deferred_op->data_count; ++i) {
1021  if (deferred_op->copies[i] == copy_in) {
1022  deferred_op->copies[i] = new_copy;
1023  break;
1024  }
1025  }
1026  copy_in->set_next_task(nullptr);
1027  deferred_op->release_task();
1028  copy_in->reset_readers(); // set the copy back to being read-only
1029  copy_in->increment_readers<false>(); // register as reader
1030  copy_res = copy_in; // return the copy we were passed
1031  } else {
1032  if (!readonly) {
1033  new_copy->mark_mutable();
1034  }
1035  copy_res = new_copy; // return the new copy
1036  }
1037  }
1038  else {
1039  throw std::logic_error(std::string("TTG::PaRSEC: need to copy a datum of type") + boost::typeindex::type_id<std::decay_t<Value>>().pretty_name() + " but the type is not copyable");
1040  }
1041  }
1042  return copy_res;
1043  }
1044 
1045  } // namespace detail
1046 
/// Initializes the TTG/PaRSEC runtime: brings up MPI (if not already up),
/// creates the default World, discovers GPU devices, and reads tuning knobs.
/// May only be called once per process.
/// @param argc/@param argv command-line arguments, forwarded to MPI and PaRSEC
/// @param num_threads number of compute threads for the PaRSEC context
/// @param ctx an existing PaRSEC context to adopt, or nullptr to create one
/// @throws std::runtime_error on double initialization or insufficient MPI thread support
inline void ttg_initialize(int argc, char **argv, int num_threads, parsec_context_t *ctx) {
  if (detail::initialized_mpi()) throw std::runtime_error("ttg_parsec::ttg_initialize: can only be called once");

  // make sure it's not already initialized
  int mpi_initialized;
  MPI_Initialized(&mpi_initialized);
  if (!mpi_initialized) { // MPI not initialized? do it, remember that we did it
    int provided;
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
    // NOTE(review): `!provided` only rejects MPI_THREAD_SINGLE (== 0); a
    // FUNNELED/SERIALIZED level would pass — consider `provided < MPI_THREAD_MULTIPLE`.
    if (!provided)
      throw std::runtime_error("ttg_parsec::ttg_initialize: MPI_Init_thread did not provide MPI_THREAD_MULTIPLE");
    detail::initialized_mpi() = true;
  } else { // no way to test that MPI was initialized with MPI_THREAD_MULTIPLE, cross fingers and proceed
  }

  // create and install the default World backed by a PaRSEC WorldImpl
  auto world_ptr = new ttg_parsec::WorldImpl{&argc, &argv, num_threads, ctx};
  std::shared_ptr<ttg::base::WorldImplBase> world_sptr{static_cast<ttg::base::WorldImplBase *>(world_ptr)};
  ttg::World world{std::move(world_sptr)};
  ttg::detail::set_default_world(std::move(world));

  // query the first device ID
  // NOTE(review): the line recording the first GPU id (presumably
  // `detail::first_device_id = i;`) is elided in this listing — confirm.
  for (int i = 0; i < parsec_nb_devices; ++i) {
    bool is_gpu = parsec_mca_device_is_gpu(i);
    if (detail::first_device_id == -1 && is_gpu) {
    } else if (detail::first_device_id > -1 && !is_gpu) {
      throw std::runtime_error("PaRSEC: Found non-GPU device in GPU ID range!");
    }
  }

  /* parse the maximum inline size */
  // TTG_MAX_INLINE can only shrink the compiled-in default, never grow it
  const char* ttg_max_inline_cstr = std::getenv("TTG_MAX_INLINE");
  if (nullptr != ttg_max_inline_cstr) {
    std::size_t inline_size = std::atol(ttg_max_inline_cstr);
    if (inline_size < detail::max_inline_size) {
      detail::max_inline_size = inline_size;
    }
  }

  bool all_peer_access = true;
  /* check whether all GPUs can access all peer GPUs */
  for (int i = 0; (i < parsec_nb_devices) && all_peer_access; ++i) {
    parsec_device_module_t *device = parsec_mca_device_get(i);
    if (PARSEC_DEV_IS_GPU(device->type)) {
      parsec_device_gpu_module_t *gpu_device = (parsec_device_gpu_module_t*)device;
      for (int j = 0; (j < parsec_nb_devices) && all_peer_access; ++j) {
        // NOTE(review): suspected bug — this re-tests device i's type instead of
        // device j's; as written, non-GPU peers j are also required to be in
        // peer_access_mask. Probably should query parsec_mca_device_get(j)->type.
        if (PARSEC_DEV_IS_GPU(device->type)) {
          all_peer_access = all_peer_access && (gpu_device->peer_access_mask & (1<<j));
        }
      }
    }
  }
  detail::all_devices_peer_access = all_peer_access;
}
/// Tears down the TTG/PaRSEC runtime: signals termination to the current
/// taskpool, destroys all Worlds, and finalizes MPI if this library started it.
/// NOTE(review): two lines are elided from this listing (doxygen cross-refs);
/// confirm the exact body against the repository.
inline void ttg_finalize() {
  // We need to notify the current taskpool of termination if we are in user termination detection mode
  // or the parsec_context_wait() in destroy_worlds() will never complete
  ttg::default_execution_context().impl().final_task();
  ttg::detail::set_default_world(ttg::World{}); // reset the default world
  ttg::detail::destroy_worlds<ttg_parsec::WorldImpl>();
  // only finalize MPI if ttg_initialize() was the one that called MPI_Init_thread
  if (detail::initialized_mpi()) MPI_Finalize();
}
1114  [[noreturn]]
1115  inline void ttg_abort() { MPI_Abort(ttg_default_execution_context().impl().comm(), 1); std::abort(); }
1116  inline void ttg_execute(ttg::World world) { world.impl().execute(); }
1117  inline void ttg_fence(ttg::World world) { world.impl().fence(); }
1118 
1119  template <typename T>
1120  inline void ttg_register_ptr(ttg::World world, const std::shared_ptr<T> &ptr) {
1121  world.impl().register_ptr(ptr);
1122  }
1123 
1124  template <typename T>
1125  inline void ttg_register_ptr(ttg::World world, std::unique_ptr<T> &&ptr) {
1126  world.impl().register_ptr(std::move(ptr));
1127  }
1128 
1129  inline void ttg_register_status(ttg::World world, const std::shared_ptr<std::promise<void>> &status_ptr) {
1130  world.impl().register_status(status_ptr);
1131  }
1132 
1133  template <typename Callback>
1134  inline void ttg_register_callback(ttg::World world, Callback &&callback) {
1135  world.impl().register_callback(std::forward<Callback>(callback));
1136  }
1137 
1138  inline ttg::Edge<> &ttg_ctl_edge(ttg::World world) { return world.impl().ctl_edge(); }
1139 
1140  inline void ttg_sum(ttg::World world, double &value) {
1141  double result = 0.0;
1142  MPI_Allreduce(&value, &result, 1, MPI_DOUBLE, MPI_SUM, world.impl().comm());
1143  value = result;
1144  }
1145 
1146  inline void make_executable_hook(ttg::World& world) {
1147  MPI_Barrier(world.impl().comm());
1148  }
1149 
/// Broadcasts \p data from \p source_rank to all other ranks of \p world:
/// first the serialized size, then the serialized bytes.
/// NOTE(review): three lines are elided in this listing — presumably
/// (1) computing BUFLEN on the source, (2) serializing \p data into buf, and
/// (3) deserializing buf into \p data on non-source ranks. Confirm against the
/// repository; as shown, BUFLEN is read uninitialized on the source rank.
template <typename T>
void ttg_broadcast(::ttg::World world, T &data, int source_rank) {
  int64_t BUFLEN;
  if (world.rank() == source_rank) {
  }
  // everyone learns the payload size first
  MPI_Bcast(&BUFLEN, 1, MPI_INT64_T, source_rank, world.impl().comm());

  unsigned char *buf = new unsigned char[BUFLEN];
  if (world.rank() == source_rank) {
  }
  MPI_Bcast(buf, BUFLEN, MPI_UNSIGNED_CHAR, source_rank, world.impl().comm());
  if (world.rank() != source_rank) {
  }
  delete[] buf;
}
1170 
1171  namespace detail {
1172 
/// Non-template base of the PaRSEC TT implementation: holds the per-TT PaRSEC
/// bookkeeping that does not depend on the TT's template parameters.
struct ParsecTTBase {
 protected:
  //  static std::map<int, ParsecBaseTT*> function_id_to_instance;
  parsec_hash_table_t tasks_table;            // tasks awaiting arguments, keyed by task key hash
  parsec_hash_table_t task_constraint_table;  // tasks held back by constraints
  parsec_task_class_t self;                   // the PaRSEC task class describing this TT
};
1180 
1181  } // namespace detail
1182 
1183  template <typename keyT, typename output_terminalsT, typename derivedT, typename input_valueTs>
1185  private:
1187  static_assert(ttg::meta::is_typelist_v<input_valueTs>,
1188  "The fourth template for ttg::TT must be a ttg::typelist containing the input types");
1189  // create a virtual control input if the input list is empty, to be used in invoke()
1190  using actual_input_tuple_type = std::conditional_t<!ttg::meta::typelist_is_empty_v<input_valueTs>,
1192  using input_tuple_type = ttg::meta::typelist_to_tuple_t<input_valueTs>;
1193  static_assert(ttg::meta::is_tuple_v<output_terminalsT>,
1194  "Second template argument for ttg::TT must be std::tuple containing the output terminal types");
1195  static_assert((ttg::meta::none_has_reference_v<input_valueTs>), "Input typelist cannot contain reference types");
1196  static_assert(ttg::meta::is_none_Void_v<input_valueTs>, "ttg::Void is for internal use only, do not use it");
1197 
1198  parsec_mempool_t mempools;
1199 
1200  // check for a non-type member named have_cuda_op
1201  template <typename T>
1202  using have_cuda_op_non_type_t = decltype(T::have_cuda_op);
1203 
1204  template <typename T>
1205  using have_hip_op_non_type_t = decltype(T::have_hip_op);
1206 
1207  template <typename T>
1208  using have_level_zero_op_non_type_t = decltype(T::have_level_zero_op);
1209 
1210  bool alive = true;
1211 
1212  static constexpr int numinedges = std::tuple_size_v<input_tuple_type>; // number of input edges
1213  static constexpr int numins = std::tuple_size_v<actual_input_tuple_type>; // number of input arguments
1214  static constexpr int numouts = std::tuple_size_v<output_terminalsT>; // number of outputs
1215  static constexpr int numflows = std::max(numins, numouts); // max number of flows
1216 
1217  public:
1219  static constexpr bool derived_has_cuda_op() {
1220  if constexpr (ttg::meta::is_detected_v<have_cuda_op_non_type_t, derivedT>) {
1221  return derivedT::have_cuda_op;
1222  } else {
1223  return false;
1224  }
1225  }
1226 
1228  static constexpr bool derived_has_hip_op() {
1229  if constexpr (ttg::meta::is_detected_v<have_hip_op_non_type_t, derivedT>) {
1230  return derivedT::have_hip_op;
1231  } else {
1232  return false;
1233  }
1234  }
1235 
1237  static constexpr bool derived_has_level_zero_op() {
1238  if constexpr (ttg::meta::is_detected_v<have_level_zero_op_non_type_t, derivedT>) {
1239  return derivedT::have_level_zero_op;
1240  } else {
1241  return false;
1242  }
1243  }
1244 
/// @return whether the derived TT provides any device (CUDA/HIP/Level Zero) op.
/// NOTE(review): the body line is elided in this listing — presumably it ORs
/// derived_has_cuda_op(), derived_has_hip_op() and derived_has_level_zero_op();
/// confirm against the repository.
static constexpr bool derived_has_device_op() {
}
1249 
1250  using ttT = TT;
1251  using key_type = keyT;
1253  using input_args_type = actual_input_tuple_type;
1255  // if have data inputs and (always last) control input, convert last input to Void to make logic easier
1257  ttg::meta::void_to_Void_tuple_t<ttg::meta::decayed_typelist_t<actual_input_tuple_type>>;
1259  ttg::meta::add_glvalue_reference_tuple_t<ttg::meta::void_to_Void_tuple_t<actual_input_tuple_type>>;
1260  using input_values_tuple_type = ttg::meta::drop_void_t<ttg::meta::decayed_typelist_t<input_tuple_type>>;
1261  using input_refs_tuple_type = ttg::meta::drop_void_t<ttg::meta::add_glvalue_reference_tuple_t<input_tuple_type>>;
1262 
1263  static constexpr int numinvals =
1264  std::tuple_size_v<input_refs_tuple_type>; // number of input arguments with values (i.e. omitting the control
1265  // input, if any)
1266 
1267  using output_terminals_type = output_terminalsT;
1269 
/// Extracts element \c i of a tuple and converts it to \c resultT,
/// preserving the value category of the tuple argument.
template <std::size_t i, typename resultT, typename InTuple>
static resultT get(InTuple &&intuple) {
  auto &&element = std::get<i>(std::forward<InTuple>(intuple));
  return static_cast<resultT>(std::forward<decltype(element)>(element));
};
/// Extracts element \c i of a tuple by lvalue reference.
template <std::size_t i, typename InTuple>
static auto &get(InTuple &&intuple) {
  return std::get<i>(std::forward<InTuple>(intuple));
};
1278 
1279  private:
1280  using task_t = detail::parsec_ttg_task_t<ttT>;
1281 
1283  friend task_t;
1284 
1285  /* the offset of the key placed after the task structure in the memory from mempool */
1286  constexpr static const size_t task_key_offset = sizeof(task_t);
1287 
1288  input_terminals_type input_terminals;
1289  output_terminalsT output_terminals;
1290 
1291  protected:
1292  const auto &get_output_terminals() const { return output_terminals; }
1293 
1294  private:
// --- per-input dispatch tables -------------------------------------------
// Each factory expands a member-function template over all input indices into
// a constexpr array of member-function pointers, so a message carrying an
// input index can be dispatched without a switch.

/// builds the dispatch table for delivering an argument from a message
template <std::size_t... IS>
static constexpr auto make_set_args_fcts(std::index_sequence<IS...>) {
  using resultT = decltype(set_arg_from_msg_fcts);
  return resultT{{&TT::set_arg_from_msg<IS>...}};
}
constexpr static std::array<void (TT::*)(void *, std::size_t), numins> set_arg_from_msg_fcts =
    make_set_args_fcts(std::make_index_sequence<numins>{});

/// builds the dispatch table for setting an argument-stream size from a message
template <std::size_t... IS>
static constexpr auto make_set_size_fcts(std::index_sequence<IS...>) {
  using resultT = decltype(set_argstream_size_from_msg_fcts);
  return resultT{{&TT::argstream_set_size_from_msg<IS>...}};
}
constexpr static std::array<void (TT::*)(void *, std::size_t), numins> set_argstream_size_from_msg_fcts =
    make_set_size_fcts(std::make_index_sequence<numins>{});

/// builds the dispatch table for finalizing an argument stream from a message
template <std::size_t... IS>
static constexpr auto make_finalize_argstream_fcts(std::index_sequence<IS...>) {
  using resultT = decltype(finalize_argstream_from_msg_fcts);
  return resultT{{&TT::finalize_argstream_from_msg<IS>...}};
}
constexpr static std::array<void (TT::*)(void *, std::size_t), numins> finalize_argstream_from_msg_fcts =
    make_finalize_argstream_fcts(std::make_index_sequence<numins>{});

/// builds the dispatch table for serving pull-terminal data requests
template <std::size_t... IS>
static constexpr auto make_get_from_pull_fcts(std::index_sequence<IS...>) {
  using resultT = decltype(get_from_pull_msg_fcts);
  return resultT{{&TT::get_from_pull_msg<IS>...}};
}
constexpr static std::array<void (TT::*)(void *, std::size_t), numinedges> get_from_pull_msg_fcts =
    make_get_from_pull_fcts(std::make_index_sequence<numinedges>{});

/// per-input flag: is the input declared const (read-only)?
template<std::size_t... IS>
constexpr static auto make_input_is_const(std::index_sequence<IS...>) {
  using resultT = decltype(input_is_const);
  return resultT{{std::is_const_v<std::tuple_element_t<IS, input_args_type>>...}};
}
constexpr static std::array<bool, numins> input_is_const = make_input_is_const(std::make_index_sequence<numins>{});

// --- per-TT state ---------------------------------------------------------
ttg::World world;                                       // the World this TT belongs to
ttg::meta::detail::keymap_t<keyT> keymap;               // key -> owning rank
ttg::meta::detail::keymap_t<keyT> priomap;              // key -> task priority
ttg::meta::detail::keymap_t<keyT, ttg::device::Device> devicemap;  // key -> preferred device
// For now use same type for unary/streaming input terminals, and stream reducers assigned at runtime
ttg::meta::detail::input_reducers_t<actual_input_tuple_type>
    input_reducers;
// NOTE(review): "inpute" is a long-standing typo in this member name; renaming
// would touch all uses, so it is kept as-is here.
std::array<parsec_task_class_t*, numins> inpute_reducers_taskclass = { nullptr };
// NOTE(review): aggregate init sets only element 0 to max(); the remaining
// elements are value-initialized to 0 — confirm this is intended.
std::array<std::size_t, numins> static_stream_goal = { std::numeric_limits<std::size_t>::max() };
int num_pullins = 0;                                    // number of pull-mode input terminals

bool m_defer_writer = TTG_PARSEC_DEFER_WRITER;          // defer writers until readers drain?

std::vector<ttg::meta::detail::constraint_callback_t<keyT>> constraints_check;     // run before task release
std::vector<ttg::meta::detail::constraint_callback_t<keyT>> constraints_complete;  // run at task completion
1349 
1350  public:
1351  ttg::World get_world() const override final { return world; }
1352 
1353  private:
/// CRTP dispatcher: forwards the call to the derived class's \c op() and
/// propagates its return value (void or a coroutine/task handle).
/// The \c Space template parameter selects the execution space; currently the
/// same derived \c op() is invoked regardless of Space (see commented-out branch).
template <ttg::ExecutionSpace Space, typename... Args>
auto op(Args &&...args) {
  derivedT *derived = static_cast<derivedT *>(this);
  //if constexpr (Space == ttg::ExecutionSpace::Host) {
  using return_type = decltype(derived->op(std::forward<Args>(args)...));
  if constexpr (std::is_same_v<return_type,void>) {
    derived->op(std::forward<Args>(args)...);
    return;
  }
  else {
    // non-void: typically a coroutine handle used for suspendable/device tasks
    return derived->op(std::forward<Args>(args)...);
  }
}
1370 
/// For a pull-mode input terminal, fetches the datum for \p key: locally via
/// the terminal's container if this rank owns the key, otherwise by sending a
/// pull request to the owning rank. Push-mode terminals are ignored.
template <std::size_t i, typename terminalT, typename Key>
void invoke_pull_terminal(terminalT &in, const Key &key, detail::parsec_ttg_task_base_t *task) {
  if (in.is_pull_terminal) {
    auto owner = in.container.owner(key);
    if (owner != world.rank()) {
      // remote datum: issue an active-message request to the owner
      get_pull_terminal_data_from<i>(owner, key);
    } else {
      // push the data to the task
      set_arg<i>(key, (in.container).get(key));
    }
  }
}
1383 
/// Sends an active message to rank \p owner requesting the pull-terminal datum
/// for \p key; the reply is eventually dispatched through get_from_pull_msg_fcts.
/// NOTE(review): one argument line of the msg_t constructor is elided in this
/// listing (presumably the message-type tag) — confirm against the repository.
template <std::size_t i, typename Key>
void get_pull_terminal_data_from(const int owner,
                                 const Key &key) {
  using msg_t = detail::msg_t;
  auto &world_impl = world.impl();
  parsec_taskpool_t *tp = world_impl.taskpool();
  std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), tp->taskpool_id,
                                                       world.rank(), 1);
  /* pack the key */
  size_t pos = 0;
  pos = pack(key, msg->bytes, pos);
  // let the termination-detection module account for the outgoing message
  tp->tdm.module->outgoing_message_start(tp, owner, NULL);
  tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
  parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
                    sizeof(msg_header_t) + pos);
}
1401 
1402  template <std::size_t... IS, typename Key = keyT>
1403  void invoke_pull_terminals(std::index_sequence<IS...>, const Key &key, detail::parsec_ttg_task_base_t *task) {
1404  int junk[] = {0, (invoke_pull_terminal<IS>(
1405  std::get<IS>(input_terminals), key, task),
1406  0)...};
1407  junk[0]++;
1408  }
1409 
/// Builds the tuple of input references passed to the user's op() from the
/// task's array of data copies: each copy's raw pointer is reinterpreted as a
/// pointer to the input's value type and converted to the declared reference
/// type (const&/&&) of that input slot.
template <std::size_t... IS>
static input_refs_tuple_type make_tuple_of_ref_from_array(task_t *task, std::index_sequence<IS...>) {
  return input_refs_tuple_type{static_cast<std::tuple_element_t<IS, input_refs_tuple_type>>(
      *reinterpret_cast<std::remove_reference_t<std::tuple_element_t<IS, input_refs_tuple_type>> *>(
          task->copies[IS]->get_ptr()))...};
}
1416 
1417 #ifdef TTG_HAVE_DEVICE
/// PaRSEC GPU-task submit hook: resumes the suspended device coroutine after
/// input transfers are complete, so the user code can launch its kernel(s).
/// Returns PARSEC_HOOK_RETURN_DONE when the task reached the send-out/complete
/// stage, or PARSEC_HOOK_RETURN_AGAIN to be called again after pending
/// kernels/transfers finish.
template <ttg::ExecutionSpace Space>
static int device_static_submit(parsec_device_gpu_module_t *gpu_device,
                                parsec_gpu_task_t *gpu_task,
                                parsec_gpu_exec_stream_t *gpu_stream) {

  task_t *task = (task_t*)gpu_task->ec;
  // get the device task from the coroutine handle
  ttg::device::Task dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);

  task->dev_ptr->stream = gpu_stream;

  //std::cout << "device_static_submit task " << task << std::endl;

  // get the promise which contains the views
  auto dev_data = dev_task.promise();

  /* we should still be waiting for the transfer to complete */
  assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_TRANSFER ||
         dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_KERNEL);

  // publish the (device, stream) pair in TLS so the user code can query it
#if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) && defined(TTG_HAVE_CUDA)
  {
    parsec_cuda_exec_stream_t *cuda_stream = (parsec_cuda_exec_stream_t *)gpu_stream;
    int device = detail::parsec_device_to_ttg_device(gpu_device->super.device_index);
    ttg::device::detail::set_current(device, cuda_stream->cuda_stream);
  }
#endif // defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) && defined(TTG_HAVE_CUDA)

#if defined(PARSEC_HAVE_DEV_HIP_SUPPORT) && defined(TTG_HAVE_HIP)
  {
    parsec_hip_exec_stream_t *hip_stream = (parsec_hip_exec_stream_t *)gpu_stream;
    int device = detail::parsec_device_to_ttg_device(gpu_device->super.device_index);
    ttg::device::detail::set_current(device, hip_stream->hip_stream);
  }
#endif // defined(PARSEC_HAVE_DEV_HIP_SUPPORT) && defined(TTG_HAVE_HIP)

#if defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT) && defined(TTG_HAVE_LEVEL_ZERO)
  {
    parsec_level_zero_exec_stream_t *stream;
    stream = (parsec_level_zero_exec_stream_t *)gpu_stream;
    int device = detail::parsec_device_to_ttg_device(gpu_device->super.device_index);
    ttg::device::detail::set_current(device, stream->swq->queue);
  }
#endif // defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT) && defined(TTG_HAVE_LEVEL_ZERO)

  /* Here we call back into the coroutine again after the transfers have completed */
  static_op<Space>(&task->parsec_task);

  // NOTE(review): two lines are elided from this listing here (doxygen
  // cross-refs) — confirm against the repository.

  /* we will come back into this function once the kernel and transfers are done */
  int rc = PARSEC_HOOK_RETURN_DONE;
  if (nullptr != task->suspended_task_address) {
    /* Get a new handle for the promise*/
    dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
    dev_data = dev_task.promise();

    assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_KERNEL ||
           dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT ||
           dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE);

    if (ttg::device::detail::TTG_DEVICE_CORO_SENDOUT == dev_data.state() ||
        ttg::device::detail::TTG_DEVICE_CORO_COMPLETE == dev_data.state()) {
      /* the task started sending so we won't come back here */
      //std::cout << "device_static_submit task " << task << " complete" << std::endl;
    } else {
      //std::cout << "device_static_submit task " << task << " return-again" << std::endl;
      rc = PARSEC_HOOK_RETURN_AGAIN;
    }
  } else {
    /* the task is done so we won't come back here */
    //std::cout << "device_static_submit task " << task << " complete" << std::endl;
  }
  return rc;
}
1496 
/// PaRSEC evaluate hook for device tasks: on first invocation, allocates and
/// wires up the parsec_gpu_task_t, runs the coroutine once so the flows get
/// registered (via register_device_memory), patches a private task class with
/// those flows, and optionally places device-preference hints on the data.
/// Must only be called once per task (gpu_task must still be null).
template <ttg::ExecutionSpace Space>
static parsec_hook_return_t device_static_evaluate(parsec_task_t* parsec_task) {

  task_t *task = (task_t*)parsec_task;
  if (task->dev_ptr->gpu_task == nullptr) {

    /* set up a device task */
    parsec_gpu_task_t *gpu_task;
    /* PaRSEC wants to free the gpu_task, because F***K ownerships */
    gpu_task = static_cast<parsec_gpu_task_t*>(std::calloc(1, sizeof(*gpu_task)));
    PARSEC_OBJ_CONSTRUCT(gpu_task, parsec_list_item_t);
    gpu_task->ec = parsec_task;
    gpu_task->task_type = 0; // user task
    gpu_task->last_data_check_epoch = 0; // used internally
    gpu_task->pushout = 0;
    gpu_task->submit = &TT::device_static_submit<Space>;

    // one way to force the task device
    // currently this will probably break all of PaRSEC if this hint
    // does not match where the data is located, not really useful for us
    // instead we set a hint on the data if there is no hint set yet
    //parsec_task->selected_device = ...;

    /* set the gpu_task so it's available in register_device_memory */
    task->dev_ptr->gpu_task = gpu_task;

    /* TODO: is this the right place to set the mask? */
    task->parsec_task.chore_mask = PARSEC_DEV_ALL;

    /* copy over the task class, because that's what we need */
    task->dev_ptr->task_class = *task->parsec_task.task_class;

    // first invocation of the coroutine to get the coroutine handle
    static_op<Space>(parsec_task);

    /* when we come back here, the flows in gpu_task are set (see register_device_memory) */

    parsec_task_class_t& tc = task->dev_ptr->task_class;

    // input flows are set up during register_device_memory as part of the first invocation above
    for (int i = 0; i < MAX_PARAM_COUNT; ++i) {
      tc.in[i] = gpu_task->flow[i];
      tc.out[i] = gpu_task->flow[i];
    }
    tc.nb_flows = MAX_PARAM_COUNT;

    /* set the device hint on the data */
    TT *tt = task->tt;
    if (tt->devicemap) {
      int parsec_dev;
      if constexpr (std::is_void_v<keyT>) {
        parsec_dev = detail::ttg_device_to_parsec_device(tt->devicemap());
      } else {
        parsec_dev = detail::ttg_device_to_parsec_device(tt->devicemap(task->key));
      }
      for (int i = 0; i < MAX_PARAM_COUNT; ++i) {
        /* only set on mutable data since we have exclusive access */
        if (tc.in[i]->flow_flags & PARSEC_FLOW_ACCESS_WRITE) {
          parsec_data_t *data = parsec_task->data[i].data_in->original;
          /* only set the preferred device if the host has the latest copy
           * as otherwise we may end up with the wrong data if there is a newer
           * version on a different device. Also, keep fingers crossed. */
          if (data->owner_device == 0) {
            parsec_advise_data_on_device(data, parsec_dev, PARSEC_DEV_DATA_ADVICE_PREFERRED_DEVICE);
          }
        }
      }
    }

    /* set the new task class that contains the flows */
    task->parsec_task.task_class = &task->dev_ptr->task_class;

    /* select this one */
    return PARSEC_HOOK_RETURN_DONE;
  }

  std::cerr << "EVALUATE called on task with assigned GPU task!" << std::endl;

  /* not sure if this might happen*/
  return PARSEC_HOOK_RETURN_ERROR;

}
1579 
/// PaRSEC execution hook for device tasks: once a device has been selected,
/// hands the GPU task to the matching device kernel scheduler (CUDA/HIP/L0).
/// Short-circuits when the coroutine already completed or jumped to send-out.
/// Aborts if the selected device type does not match the compiled-in Space.
template <ttg::ExecutionSpace Space>
static parsec_hook_return_t device_static_op(parsec_task_t* parsec_task) {
  static_assert(derived_has_device_op());

  /* when we come in here we have a device assigned and are ready to go */

  task_t *task = (task_t*)parsec_task;

  if (nullptr == task->suspended_task_address) {
    /* short-cut in case the task returned immediately */
    return PARSEC_HOOK_RETURN_DONE;
  }

  // get the device task from the coroutine handle
  auto dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);

  // get the promise which contains the views
  ttg::device::detail::device_task_promise_type& dev_data = dev_task.promise();

  if (dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT ||
      dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE) {
    /* the task jumped directly to send or returned so we're done */
    return PARSEC_HOOK_RETURN_DONE;
  }

  parsec_device_gpu_module_t *device = (parsec_device_gpu_module_t*)task->parsec_task.selected_device;
  assert(NULL != device);

  task->dev_ptr->device = device;
  parsec_gpu_task_t *gpu_task = task->dev_ptr->gpu_task;
  parsec_execution_stream_s *es = task->tt->world.impl().execution_stream();

  // dispatch to the scheduler of the device type PaRSEC selected; each branch
  // is compiled only when the TT was instantiated for that execution space
  switch(device->super.type) {

#if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT)
    case PARSEC_DEV_CUDA:
      if constexpr (Space == ttg::ExecutionSpace::CUDA) {
        /* TODO: we need custom staging functions because PaRSEC looks at the
         * task-class to determine the number of flows. */
        gpu_task->stage_in = parsec_default_gpu_stage_in;
        gpu_task->stage_out = parsec_default_gpu_stage_out;
        return parsec_device_kernel_scheduler(&device->super, es, gpu_task);
      }
      break;
#endif
#if defined(PARSEC_HAVE_DEV_HIP_SUPPORT)
    case PARSEC_DEV_HIP:
      if constexpr (Space == ttg::ExecutionSpace::HIP) {
        gpu_task->stage_in = parsec_default_gpu_stage_in;
        gpu_task->stage_out = parsec_default_gpu_stage_out;
        return parsec_device_kernel_scheduler(&device->super, es, gpu_task);
      }
      break;
#endif // PARSEC_HAVE_DEV_HIP_SUPPORT
#if defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT)
    case PARSEC_DEV_LEVEL_ZERO:
      if constexpr (Space == ttg::ExecutionSpace::L0) {
        gpu_task->stage_in = parsec_default_gpu_stage_in;
        gpu_task->stage_out = parsec_default_gpu_stage_out;
        return parsec_device_kernel_scheduler(&device->super, es, gpu_task);
      }
      break;
#endif // PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT
    default:
      break;
  }
  ttg::print_error(task->tt->get_name(), " : received mismatching device type ", (int)device->super.type, " from PaRSEC");
  ttg::abort();
  return PARSEC_HOOK_RETURN_DONE; // will not be reached
}
1650 #endif // TTG_HAVE_DEVICE
1651 
1652  template <ttg::ExecutionSpace Space>
1653  static parsec_hook_return_t static_op(parsec_task_t *parsec_task) {
1654 
1655  task_t *task = (task_t*)parsec_task;
1656  void* suspended_task_address =
1657 #ifdef TTG_HAVE_COROUTINE
1658  task->suspended_task_address; // non-null = need to resume the task
1659 #else // TTG_HAVE_COROUTINE
1660  nullptr;
1661 #endif // TTG_HAVE_COROUTINE
1662  //std::cout << "static_op: suspended_task_address " << suspended_task_address << std::endl;
1663  if (suspended_task_address == nullptr) { // task is a coroutine that has not started or an ordinary function
1664 
1665  ttT *baseobj = task->tt;
1666  derivedT *obj = static_cast<derivedT *>(baseobj);
1667  assert(detail::parsec_ttg_caller == nullptr);
1668  detail::parsec_ttg_caller = static_cast<detail::parsec_ttg_task_base_t*>(task);
1669  if (obj->tracing()) {
1670  if constexpr (!ttg::meta::is_void_v<keyT>)
1671  ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", task->key, ": executing");
1672  else
1673  ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : executing");
1674  }
1675 
1676  if constexpr (!ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
1677  auto input = make_tuple_of_ref_from_array(task, std::make_index_sequence<numinvals>{});
1678  TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->template op<Space>(task->key, std::move(input), obj->output_terminals));
1679  } else if constexpr (!ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
1680  TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->template op<Space>(task->key, obj->output_terminals));
1681  } else if constexpr (ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
1682  auto input = make_tuple_of_ref_from_array(task, std::make_index_sequence<numinvals>{});
1683  TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->template op<Space>(std::move(input), obj->output_terminals));
1684  } else if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
1685  TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->template op<Space>(obj->output_terminals));
1686  } else {
1687  ttg::abort();
1688  }
1689  detail::parsec_ttg_caller = nullptr;
1690  }
1691  else { // resume the suspended coroutine
1692 
1693 #ifdef TTG_HAVE_COROUTINE
1694  assert(task->coroutine_id != ttg::TaskCoroutineID::Invalid);
1695 
1696 #ifdef TTG_HAVE_DEVICE
1697  if (task->coroutine_id == ttg::TaskCoroutineID::DeviceTask) {
1698  ttg::device::Task coro = ttg::device::detail::device_task_handle_type::from_address(suspended_task_address);
1699  assert(detail::parsec_ttg_caller == nullptr);
1700  detail::parsec_ttg_caller = static_cast<detail::parsec_ttg_task_base_t*>(task);
1701  // TODO: unify the outputs tls handling
1702  auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor();
1703  task->tt->set_outputs_tls_ptr();
1704  coro.resume();
1705  if (coro.completed()) {
1706  coro.destroy();
1707  suspended_task_address = nullptr;
1708  }
1709  task->tt->set_outputs_tls_ptr(old_output_tls_ptr);
1710  detail::parsec_ttg_caller = nullptr;
1711  } else
1712 #endif // TTG_HAVE_DEVICE
1713  if (task->coroutine_id == ttg::TaskCoroutineID::ResumableTask) {
1714  auto ret = static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address));
1715  assert(ret.ready());
1716  auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor();
1717  task->tt->set_outputs_tls_ptr();
1718  ret.resume();
1719  if (ret.completed()) {
1720  ret.destroy();
1721  suspended_task_address = nullptr;
1722  }
1723  else { // not yet completed
1724  // leave suspended_task_address as is
1725 
  // right now events are not properly implemented; we are only testing the workflow with dummy events
1727  // so mark the events finished manually, parsec will rerun this task again and it should complete the second time
1728  auto events = static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address)).events();
1729  for (auto &event_ptr : events) {
1730  event_ptr->finish();
1731  }
1732  assert(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address).promise().ready());
1733  }
1734  task->tt->set_outputs_tls_ptr(old_output_tls_ptr);
1735  detail::parsec_ttg_caller = nullptr;
1736  task->suspended_task_address = suspended_task_address;
1737  }
1738  else
1739  ttg::abort(); // unrecognized task id
1740 #else // TTG_HAVE_COROUTINE
1741  ttg::abort(); // should not happen
1742 #endif // TTG_HAVE_COROUTINE
1743  }
1744 #ifdef TTG_HAVE_COROUTINE
1745  task->suspended_task_address = suspended_task_address;
1746 #endif // TTG_HAVE_COROUTINE
1747  if (suspended_task_address == nullptr) {
1748  ttT *baseobj = task->tt;
1749  derivedT *obj = static_cast<derivedT *>(baseobj);
1750  if (obj->tracing()) {
1751  if constexpr (!ttg::meta::is_void_v<keyT>)
1752  ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", task->key, ": done executing");
1753  else
1754  ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : done executing");
1755  }
1756  }
1757 
1758  return PARSEC_HOOK_RETURN_DONE;
1759  }
1760 
  /// PaRSEC execution hook for tasks without data inputs ("noarg"):
  /// invokes the TT's op() (keyed or keyless) or resumes a previously
  /// suspended coroutine for this task.
  /// @tparam Space the execution space the op() overload is selected for
  /// @param parsec_task the PaRSEC task to run (actually a task_t)
  /// @return PARSEC_HOOK_RETURN_DONE when finished; suspension is not yet
  ///         supported on this path and aborts
  template <ttg::ExecutionSpace Space>
  static parsec_hook_return_t static_op_noarg(parsec_task_t *parsec_task) {
    task_t *task = static_cast<task_t*>(parsec_task);

    /* a non-null address means a coroutine was suspended earlier and must be resumed */
    void* suspended_task_address =
#ifdef TTG_HAVE_COROUTINE
      task->suspended_task_address; // non-null = need to resume the task
#else  // TTG_HAVE_COROUTINE
      nullptr;
#endif // TTG_HAVE_COROUTINE
    if (suspended_task_address == nullptr) { // task is a coroutine that has not started or an ordinary function
      ttT *baseobj = (ttT *)task->object_ptr;
      derivedT *obj = (derivedT *)task->object_ptr;
      assert(detail::parsec_ttg_caller == NULL);
      // NOTE(review): this listing elides a line here (doxygen extraction);
      // presumably it installs detail::parsec_ttg_caller — confirm upstream.
      /* dispatch on whether the TT is keyed; TTG_PROCESS_TT_OP_RETURN captures
       * a coroutine handle into suspended_task_address if op() suspends */
      if constexpr (!ttg::meta::is_void_v<keyT>) {
        TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->template op<Space>(task->key, obj->output_terminals));
      } else if constexpr (ttg::meta::is_void_v<keyT>) {
        TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->template op<Space>(obj->output_terminals));
      } else // unreachable
        ttg:: abort();
    }
    else {
#ifdef TTG_HAVE_COROUTINE
      /* resume the suspended coroutine; destroy it once it has completed */
      auto ret = static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address));
      assert(ret.ready());
      ret.resume();
      if (ret.completed()) {
        ret.destroy();
        suspended_task_address = nullptr;
      }
      else { // not yet completed
        // leave suspended_task_address as is
      }
#else  // TTG_HAVE_COROUTINE
      ttg::abort(); // should not happen
#endif // TTG_HAVE_COROUTINE
    }
    /* persist the (possibly cleared) suspension point back into the task */
    task->suspended_task_address = suspended_task_address;

    if (suspended_task_address) {
      ttg::abort(); // not yet implemented
      // see comments in static_op()
      return PARSEC_HOOK_RETURN_AGAIN;
    }
    else
      return PARSEC_HOOK_RETURN_DONE;
  }
1810 
  /// PaRSEC hook that performs stream reduction for input terminal `i`:
  /// repeatedly pops queued data copies and folds them into the parent
  /// task's accumulated copy via the registered reducer, then releases the
  /// parent task once the stream goal has been reached.
  /// @tparam i index of the (streaming) input terminal being reduced
  /// @param es the executing PaRSEC execution stream (unused directly here)
  /// @param parsec_task the reducer task (actually a detail::reducer_task_t)
  /// @return always PARSEC_HOOK_RETURN_DONE
  template <std::size_t i>
  static parsec_hook_return_t static_reducer_op(parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
    using rtask_t = detail::reducer_task_t;
    using value_t = std::tuple_element_t<i, actual_input_tuple_type>;
    constexpr const bool val_is_void = ttg::meta::is_void_v<value_t>;
    constexpr const bool input_is_const = std::is_const_v<value_t>;
    rtask_t *rtask = (rtask_t*)parsec_task;
    task_t *parent_task = static_cast<task_t*>(rtask->parent_task);
    ttT *baseobj = parent_task->tt;
    derivedT *obj = static_cast<derivedT *>(baseobj);

    auto& reducer = std::get<i>(baseobj->input_reducers);

    //std::cout << "static_reducer_op " << parent_task->key << std::endl;

    if (obj->tracing()) {
      if constexpr (!ttg::meta::is_void_v<keyT>)
        ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": reducer executing");
      else
        ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : reducer executing");
    }

    /* the copy to reduce into */
    detail::ttg_data_copy_t *target_copy;
    target_copy = parent_task->copies[i];
    assert(val_is_void || nullptr != target_copy);
    /* once we hit 0 we have to stop since another thread might enqueue a new reduction task */
    std::size_t c = 0;
    std::size_t size = 0;
    assert(parent_task->streams[i].reduce_count > 0);
    if (rtask->is_first) {
      /* the first reducer task may find nothing queued yet: decrement and bail
       * if that leaves the count at zero */
      if (0 == (parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) {
        /* we were the first and there is nothing to be done */
        if (obj->tracing()) {
          if constexpr (!ttg::meta::is_void_v<keyT>)
            ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": first reducer empty");
          else
            ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : first reducer empty");
        }

        return PARSEC_HOOK_RETURN_DONE;
      }
    }

    assert(detail::parsec_ttg_caller == NULL);
    detail::parsec_ttg_caller = rtask->parent_task;

    /* drain the queue of pending copies; reduce_count tracks how many pushes
     * are outstanding, so keep going until our decrements hit zero */
    do {
      if constexpr(!val_is_void) {
        /* the copies to reduce out of */
        detail::ttg_data_copy_t *source_copy;
        parsec_list_item_t *item;
        item = parsec_lifo_pop(&parent_task->streams[i].reduce_copies);
        if (nullptr == item) {
          // maybe someone is changing the goal right now
          break;
        }
        source_copy = ((detail::ttg_data_copy_self_t *)(item))->self;
        assert(target_copy->num_readers() == target_copy->mutable_tag);
        assert(source_copy->num_readers() > 0);
        reducer(*reinterpret_cast<std::decay_t<value_t> *>(target_copy->get_ptr()),
                *reinterpret_cast<std::decay_t<value_t> *>(source_copy->get_ptr()));
        detail::release_data_copy(source_copy);
      } else if constexpr(val_is_void) {
        reducer(); // invoke control reducer
      }
      // there is only one task working on this stream, so no need to be atomic here
      size = ++parent_task->streams[i].size;
      //std::cout << "static_reducer_op size " << size << " of " << parent_task->streams[i].goal << std::endl;
    } while ((c = (parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) > 0);
    //} while ((c = (--task->streams[i].reduce_count)) > 0);

    /* finalize_argstream sets goal to 1, so size may be larger than goal */
    bool complete = (size >= parent_task->streams[i].goal);

    //std::cout << "static_reducer_op size " << size
    //          << " of " << parent_task->streams[i].goal << " complete " << complete
    //          << " c " << c << std::endl;
    if (complete && c == 0) {
      if constexpr(input_is_const) {
        /* make the consumer task a reader if its input is const */
        target_copy->reset_readers();
      }
      /* task may not be runnable yet because other inputs are missing, have release_task decide */
      parent_task->remove_from_hash = true;
      parent_task->release_task(parent_task);
    }

    // NOTE(review): this listing elides a line here (doxygen extraction);
    // presumably it resets detail::parsec_ttg_caller — confirm upstream.

    if (obj->tracing()) {
      if constexpr (!ttg::meta::is_void_v<keyT>)
        ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": done executing");
      else
        ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : done executing");
    }

    return PARSEC_HOOK_RETURN_DONE;
  }
1910 
1911 
1912  protected:
  /// Deserializes \p obj from the buffer \p _bytes starting at offset \p pos
  /// using the data descriptor alias dd_t (declared on a line elided from
  /// this listing — presumably a ttg::default_data_descriptor over T).
  /// @param obj the object to unpack into
  /// @param _bytes the serialized buffer
  /// @param pos the offset at which the payload (or its size header) begins
  /// @return the offset just past the consumed payload
  template <typename T>
  uint64_t unpack(T &obj, void *_bytes, uint64_t pos) {
    uint64_t payload_size;
    if constexpr (!dd_t::serialize_size_is_const) {
      /* variable-size payload: the size itself precedes the payload */
      pos = ttg::default_data_descriptor<uint64_t>::unpack_payload(&payload_size, sizeof(uint64_t), pos, _bytes);
    } else {
      /* constant-size payload: compute it from the (default-constructed) object */
      payload_size = dd_t::payload_size(&obj);
    }
    pos = dd_t::unpack_payload(&obj, payload_size, pos, _bytes);
    return pos;
  }
1925 
  /// Serializes \p obj into the buffer \p bytes starting at offset \p pos,
  /// mirroring unpack(): a size header is written first when the descriptor's
  /// payload size is not a compile-time constant. The dd_t alias is declared
  /// on a line elided from this listing.
  /// @param obj the object to pack
  /// @param bytes the destination buffer
  /// @param pos the offset at which to start writing
  /// @param copy optional data copy whose iovec bookkeeping is reset, since
  ///        payload_size() may have registered iovecs as a side effect
  /// @return the offset just past the written payload
  template <typename T>
  uint64_t pack(T &obj, void *bytes, uint64_t pos, detail::ttg_data_copy_t *copy = nullptr) {
    uint64_t payload_size = dd_t::payload_size(&obj);
    if (copy) {
      /* reset any tracked data, we don't care about the packing from the payload size */
      copy->iovec_reset();
    }

    if constexpr (!dd_t::serialize_size_is_const) {
      /* variable-size payload: prefix it with its size so unpack() can read it */
      pos = ttg::default_data_descriptor<uint64_t>::pack_payload(&payload_size, sizeof(uint64_t), pos, bytes);
    }
    pos = dd_t::pack_payload(&obj, payload_size, pos, bytes);
    return pos;
  }
1941 
  /// Active-message entry point: validates the header and dispatches the
  /// message to the member handler selected by the header's function id and
  /// parameter (terminal) id.
  /// NOTE(review): the `case msg_header_t::...:` labels of this switch were
  /// elided by the doxygen extraction; the handler table used in each group
  /// identifies the intended case.
  /// @param data the raw message, starting with a msg_header_t
  /// @param size the message size in bytes
  /// @param bop the TT instance this message targets (actually a derivedT)
  static void static_set_arg(void *data, std::size_t size, ttg::TTBase *bop) {
    assert(size >= sizeof(msg_header_t) &&
           "Trying to unpack as message that does not hold enough bytes to represent a single header");
    msg_header_t *hd = static_cast<msg_header_t *>(data);
    derivedT *obj = reinterpret_cast<derivedT *>(bop);
    switch (hd->fn_id) {
      /* set-arg message: forward to set_arg_from_msg for terminal param_id */
      if (0 <= hd->param_id) {
        assert(hd->param_id >= 0);
        assert(hd->param_id < obj->set_arg_from_msg_fcts.size());
        auto member = obj->set_arg_from_msg_fcts[hd->param_id];
        (obj->*member)(data, size);
      } else {
        // there is no good reason to have negative param ids
        ttg::abort();
      }
      break;
      }
      /* set-argstream-size message */
      assert(hd->param_id >= 0);
      assert(hd->param_id < obj->set_argstream_size_from_msg_fcts.size());
      auto member = obj->set_argstream_size_from_msg_fcts[hd->param_id];
      (obj->*member)(data, size);
      break;
      }
      /* finalize-argstream message */
      assert(hd->param_id >= 0);
      assert(hd->param_id < obj->finalize_argstream_from_msg_fcts.size());
      auto member = obj->finalize_argstream_from_msg_fcts[hd->param_id];
      (obj->*member)(data, size);
      break;
      }
      /* pull-request message */
      assert(hd->param_id >= 0);
      assert(hd->param_id < obj->get_from_pull_msg_fcts.size());
      auto member = obj->get_from_pull_msg_fcts[hd->param_id];
      (obj->*member)(data, size);
      break;
      }
      default:
        ttg::abort();
    }
  }
1985 
1987  inline parsec_thread_mempool_t *get_task_mempool(void) {
1988  auto &world_impl = world.impl();
1989  parsec_execution_stream_s *es = world_impl.execution_stream();
1990  int index = (es->virtual_process->vp_id * es->virtual_process->nb_cores + es->th_id);
1991  return &mempools.thread_mempools[index];
1992  }
1993 
1994  template <size_t i, typename valueT>
1995  void set_arg_from_msg_keylist(ttg::span<keyT> &&keylist, detail::ttg_data_copy_t *copy) {
1996  /* create a dummy task that holds the copy, which can be reused by others */
1997  task_t *dummy;
1998  parsec_execution_stream_s *es = world.impl().execution_stream();
1999  parsec_thread_mempool_t *mempool = get_task_mempool();
2000  dummy = new (parsec_thread_mempool_allocate(mempool)) task_t(mempool, &this->self, this);
2001  dummy->set_dummy(true);
2002  // TODO: do we need to copy static_stream_goal in dummy?
2003 
2004  /* set the received value as the dummy's only data */
2005  dummy->copies[0] = copy;
2006 
2007  /* We received the task on this world, so it's using the same taskpool */
2008  dummy->parsec_task.taskpool = world.impl().taskpool();
2009 
2010  /* save the current task and set the dummy task */
2011  auto parsec_ttg_caller_save = detail::parsec_ttg_caller;
2012  detail::parsec_ttg_caller = dummy;
2013 
2014  /* iterate over the keys and have them use the copy we made */
2015  parsec_task_t *task_ring = nullptr;
2016  for (auto &&key : keylist) {
2017  // copy-constructible? can broadcast to any number of keys
2018  if constexpr (std::is_copy_constructible_v<valueT>) {
2019  set_arg_local_impl<i>(key, *reinterpret_cast<valueT *>(copy->get_ptr()), copy, &task_ring);
2020  }
2021  else {
2022  // not copy-constructible? can move, but only to single key
2023  static_assert(!std::is_reference_v<valueT>);
2024  if (std::size(keylist) == 1)
2025  set_arg_local_impl<i>(key, std::move(*reinterpret_cast<valueT *>(copy->get_ptr())), copy, &task_ring);
2026  else {
2027  throw std::logic_error(std::string("TTG::PaRSEC: need to copy a datum of type") + boost::typeindex::type_id<std::decay_t<valueT>>().pretty_name() + " but the type is not copyable");
2028  }
2029  }
2030  }
2031 
2032  if (nullptr != task_ring) {
2033  auto &world_impl = world.impl();
2034  parsec_task_t *vp_task_ring[1] = { task_ring };
2035  __parsec_schedule_vp(world_impl.execution_stream(), vp_task_ring, 0);
2036  }
2037 
2038  /* restore the previous task */
2039  detail::parsec_ttg_caller = parsec_ttg_caller_save;
2040 
2041  /* release the dummy task */
2042  complete_task_and_release(es, &dummy->parsec_task);
2043  parsec_thread_mempool_free(mempool, &dummy->parsec_task);
2044  }
2045 
2046  // there are 6 types of set_arg:
2047  // - case 1: nonvoid Key, complete Value type
2048  // - case 2: nonvoid Key, void Value, mixed (data+control) inputs
2049  // - case 3: nonvoid Key, void Value, no inputs
2050  // - case 4: void Key, complete Value type
2051  // - case 5: void Key, void Value, mixed (data+control) inputs
2052  // - case 6: void Key, void Value, no inputs
2053  // implementation of these will be further split into "local-only" and global+local
2054 
  /// Handles an incoming set-arg message for input terminal `i`: unpacks the
  /// destination key(s), then either unpacks the value inline or initiates
  /// one-sided (RMA) transfers for its iovecs, finally feeding the value to
  /// all keys via set_arg_from_msg_keylist / set_arg.
  /// NOTE(review): the doxygen extraction elided the `if constexpr
  /// (ttg::has_split_metadata<decvalueT>::value)` openers (and the
  /// declarations of `copy`/`descr`) in two places below; gaps are marked.
  /// @param data the raw message (a detail::msg_t)
  /// @param size the message size in bytes
  template <std::size_t i>
  void set_arg_from_msg(void *data, std::size_t size) {
    using valueT = std::tuple_element_t<i, actual_input_tuple_type>;
    using msg_t = detail::msg_t;
    msg_t *msg = static_cast<msg_t *>(data);
    if constexpr (!ttg::meta::is_void_v<keyT>) {
      /* unpack the keys */
      /* TODO: can we avoid copying all the keys?! */
      uint64_t pos = msg->tt_id.key_offset;
      uint64_t key_end_pos;
      std::vector<keyT> keylist;
      int num_keys = msg->tt_id.num_keys;
      keylist.reserve(num_keys);
      auto rank = world.rank();
      for (int k = 0; k < num_keys; ++k) {
        keyT key;
        pos = unpack(key, msg->bytes, pos);
        assert(keymap(key) == rank); // all keys must map to this rank
        keylist.push_back(std::move(key));
      }
      key_end_pos = pos;
      /* jump back to the beginning of the message to get the value */
      pos = 0;
      // case 1
      if constexpr (!ttg::meta::is_void_v<valueT>) {
        using decvalueT = std::decay_t<valueT>;
        int32_t num_iovecs = msg->tt_id.num_iovecs;
        //bool inline_data = msg->inline_data;
        // NOTE(review): extraction gap — the split-metadata branch opener and
        // the declarations of `copy` and `descr` are elided here.
        using metadata_t = decltype(descr.get_metadata(std::declval<decvalueT>()));

        /* unpack the metadata */
        metadata_t metadata;
        pos = unpack(metadata, msg->bytes, pos);

        //std::cout << "set_arg_from_msg splitmd num_iovecs " << num_iovecs << std::endl;

        /* split-metadata type: reconstruct the value from metadata only */
        copy = detail::create_new_datacopy(descr.create_from_metadata(metadata));
      } else if constexpr (!ttg::has_split_metadata<decvalueT>::value) {
        /* non-split type: default-construct and unpack the value below */
        copy = detail::create_new_datacopy(decvalueT{});
#if 0
        // TODO: first attempt at sending directly to the device
        parsec_gpu_data_copy_t* gpu_elem;
        gpu_elem = PARSEC_DATA_GET_COPY(master, gpu_device->super.device_index);
        int i = detail::first_device_id;
        int devid = detail::first_device_id;
        while (i < parsec_nb_devices) {
          if (nullptr == gpu_elem) {
            gpu_elem = PARSEC_OBJ_NEW(parsec_data_copy_t);
            gpu_elem->flags = PARSEC_DATA_FLAG_PARSEC_OWNED | PARSEC_DATA_FLAG_PARSEC_MANAGED;
            gpu_elem->coherency_state = PARSEC_DATA_COHERENCY_INVALID;
            gpu_elem->version = 0;
            gpu_elem->coherency_state = PARSEC_DATA_COHERENCY_OWNED;
          }
          if (nullptr == gpu_elem->device_private) {
            gpu_elem->device_private = zone_malloc(gpu_device->memory, gpu_task->flow_nb_elts[i]);
            if (nullptr == gpu_elem->device_private) {
              devid++;
              continue;
            }
            break;
          }
        }
#endif // 0
        /* unpack the object, potentially discovering iovecs */
        pos = unpack(*static_cast<decvalueT *>(copy->get_ptr()), msg->bytes, pos);
        //std::cout << "set_arg_from_msg iovec_begin num_iovecs " << num_iovecs << " distance " << std::distance(copy->iovec_begin(), copy->iovec_end()) << std::endl;
        assert(std::distance(copy->iovec_begin(), copy->iovec_end()) == num_iovecs);
      }

      if (num_iovecs == 0) {
        /* the whole value arrived with the message: deliver immediately */
        set_arg_from_msg_keylist<i, decvalueT>(ttg::span<keyT>(&keylist[0], num_keys), copy);
      } else {
        /* unpack the header and start the RMA transfers */

        /* get the remote rank */
        int remote = msg->tt_id.sender;
        assert(remote < world.size());

        auto &val = *static_cast<decvalueT *>(copy->get_ptr());

        bool inline_data = msg->tt_id.inline_data;

        int nv = 0;
        /* start the RMA transfers */
        auto handle_iovecs_fn =
          [&](auto&& iovecs) {

            if (inline_data) {
              /* unpack the data from the message */
              for (auto &&iov : iovecs) {
                ++nv;
                std::memcpy(iov.data, msg->bytes + pos, iov.num_bytes);
                pos += iov.num_bytes;
              }
            } else {
              /* extract the callback tag */
              parsec_ce_tag_t cbtag;
              std::memcpy(&cbtag, msg->bytes + pos, sizeof(cbtag));
              pos += sizeof(cbtag);

              /* create the value from the metadata; the activation fires once
               * all iovec transfers have completed */
              auto activation = new detail::rma_delayed_activate(
                  std::move(keylist), copy, num_iovecs, [this](std::vector<keyT> &&keylist, detail::ttg_data_copy_t *copy) {
                    set_arg_from_msg_keylist<i, decvalueT>(keylist, copy);
                    this->world.impl().decrement_inflight_msg();
                  });

              using ActivationT = std::decay_t<decltype(*activation)>;

              for (auto &&iov : iovecs) {
                ++nv;
                /* read the remote registration handle and completion fn from the message */
                parsec_ce_mem_reg_handle_t rreg;
                int32_t rreg_size_i;
                std::memcpy(&rreg_size_i, msg->bytes + pos, sizeof(rreg_size_i));
                pos += sizeof(rreg_size_i);
                rreg = static_cast<parsec_ce_mem_reg_handle_t>(msg->bytes + pos);
                pos += rreg_size_i;
                // std::intptr_t *fn_ptr = reinterpret_cast<std::intptr_t *>(msg->bytes + pos);
                // pos += sizeof(*fn_ptr);
                std::intptr_t fn_ptr;
                std::memcpy(&fn_ptr, msg->bytes + pos, sizeof(fn_ptr));
                pos += sizeof(fn_ptr);

                /* register the local memory */
                parsec_ce_mem_reg_handle_t lreg;
                size_t lreg_size;
                parsec_ce.mem_register(iov.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iov.num_bytes, parsec_datatype_int8_t,
                                       iov.num_bytes, &lreg, &lreg_size);
                world.impl().increment_inflight_msg();
                /* TODO: PaRSEC should treat the remote callback as a tag, not a function pointer! */
                //std::cout << "set_arg_from_msg: get rreg " << rreg << " remote " << remote << std::endl;
                parsec_ce.get(&parsec_ce, lreg, 0, rreg, 0, iov.num_bytes, remote,
                              &detail::get_complete_cb<ActivationT>, activation,
                              /*world.impl().parsec_ttg_rma_tag()*/
                              cbtag, &fn_ptr, sizeof(std::intptr_t));
              }
            }
          };
        // NOTE(review): extraction gap — the split-metadata branch opener
        // (declaring `descr`) is elided here.
        handle_iovecs_fn(descr.get_data(val));
        } else if constexpr (!ttg::has_split_metadata<decvalueT>::value) {
          handle_iovecs_fn(copy->iovec_span());
          copy->iovec_reset();
        }

        assert(num_iovecs == nv);
        assert(size == (key_end_pos + sizeof(msg_header_t)));

        if (inline_data) {
          /* inline data was unpacked synchronously: deliver now */
          set_arg_from_msg_keylist<i, decvalueT>(ttg::span<keyT>(&keylist[0], num_keys), copy);
        }
      }
      // case 2 and 3
    } else if constexpr (!ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
      /* control-only input: trigger each key with a void value */
      for (auto &&key : keylist) {
        set_arg<i, keyT, ttg::Void>(key, ttg::Void{});
      }
    }
    // case 4
    } else if constexpr (ttg::meta::is_void_v<keyT> && !std::is_void_v<valueT>) {
      using decvalueT = std::decay_t<valueT>;
      decvalueT val;
      /* TODO: handle split-metadata case as with non-void keys */
      unpack(val, msg->bytes, 0);
      set_arg<i, keyT, valueT>(std::move(val));
    // case 5 and 6
    } else if constexpr (ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
      set_arg<i, keyT, ttg::Void>(ttg::Void{});
    } else { // unreachable
      ttg::abort();
    }
  }
2231 
2232  template <std::size_t i>
2233  void finalize_argstream_from_msg(void *data, std::size_t size) {
2234  using msg_t = detail::msg_t;
2235  msg_t *msg = static_cast<msg_t *>(data);
2236  if constexpr (!ttg::meta::is_void_v<keyT>) {
2237  /* unpack the key */
2238  uint64_t pos = 0;
2239  auto rank = world.rank();
2240  keyT key;
2241  pos = unpack(key, msg->bytes, pos);
2242  assert(keymap(key) == rank);
2243  finalize_argstream<i>(key);
2244  } else {
2245  auto rank = world.rank();
2246  assert(keymap() == rank);
2247  finalize_argstream<i>();
2248  }
2249  }
2250 
2251  template <std::size_t i>
2252  void argstream_set_size_from_msg(void *data, std::size_t size) {
2253  using msg_t = detail::msg_t;
2254  auto msg = static_cast<msg_t *>(data);
2255  uint64_t pos = 0;
2256  if constexpr (!ttg::meta::is_void_v<keyT>) {
2257  /* unpack the key */
2258  auto rank = world.rank();
2259  keyT key;
2260  pos = unpack(key, msg->bytes, pos);
2261  assert(keymap(key) == rank);
2262  std::size_t argstream_size;
2263  pos = unpack(argstream_size, msg->bytes, pos);
2264  set_argstream_size<i>(key, argstream_size);
2265  } else {
2266  auto rank = world.rank();
2267  assert(keymap() == rank);
2268  std::size_t argstream_size;
2269  pos = unpack(argstream_size, msg->bytes, pos);
2270  set_argstream_size<i>(argstream_size);
2271  }
2272  }
2273 
2274  template <std::size_t i>
2275  void get_from_pull_msg(void *data, std::size_t size) {
2276  using msg_t = detail::msg_t;
2277  msg_t *msg = static_cast<msg_t *>(data);
2278  auto &in = std::get<i>(input_terminals);
2279  if constexpr (!ttg::meta::is_void_v<keyT>) {
2280  /* unpack the key */
2281  uint64_t pos = 0;
2282  keyT key;
2283  pos = unpack(key, msg->bytes, pos);
2284  set_arg<i>(key, (in.container).get(key));
2285  }
2286  }
2287 
  /// set_arg_local: non-void key + non-void value (forwarding reference);
  /// forwards to set_arg_local_impl.
  template <std::size_t i, typename Key, typename Value>
  std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg_local(
      const Key &key, Value &&value) {
    set_arg_local_impl<i>(key, std::forward<Value>(value));
  }
2293 
  /// set_arg_local: void key + non-void value (forwarding reference);
  /// substitutes ttg::Void for the key and forwards to set_arg_local_impl.
  template <std::size_t i, typename Key = keyT, typename Value>
  std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg_local(
      Value &&value) {
    set_arg_local_impl<i>(ttg::Void{}, std::forward<Value>(value));
  }
2299 
  /// set_arg_local: non-void key + const lvalue value; forwards to
  /// set_arg_local_impl (value will be copied as needed downstream).
  template <std::size_t i, typename Key, typename Value>
  std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg_local(
      const Key &key, const Value &value) {
    set_arg_local_impl<i>(key, value);
  }
2305 
  /// set_arg_local: void key + const lvalue value; substitutes ttg::Void for
  /// the key and forwards to set_arg_local_impl.
  template <std::size_t i, typename Key = keyT, typename Value>
  std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg_local(
      const Value &value) {
    set_arg_local_impl<i>(ttg::Void{}, value);
  }
2311 
  /// set_arg_local: void key + value held in a shared_ptr; dereferences the
  /// pointer and forwards the (const) value to set_arg_local_impl.
  template <std::size_t i, typename Key = keyT, typename Value>
  std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg_local(
      std::shared_ptr<const Value> &valueptr) {
    set_arg_local_impl<i>(ttg::Void{}, *valueptr);
  }
2317 
2318  template <typename Key>
2319  task_t *create_new_task(const Key &key) {
2320  constexpr const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
2321  auto &world_impl = world.impl();
2322  task_t *newtask;
2323  parsec_thread_mempool_t *mempool = get_task_mempool();
2324  char *taskobj = (char *)parsec_thread_mempool_allocate(mempool);
2325  int32_t priority = 0;
2326  if constexpr (!keyT_is_Void) {
2327  priority = priomap(key);
2328  /* placement-new the task */
2329  newtask = new (taskobj) task_t(key, mempool, &this->self, world_impl.taskpool(), this, priority);
2330  } else {
2331  priority = priomap();
2332  /* placement-new the task */
2333  newtask = new (taskobj) task_t(mempool, &this->self, world_impl.taskpool(), this, priority);
2334  }
2335 
2336  for (int i = 0; i < static_stream_goal.size(); ++i) {
2337  newtask->streams[i].goal = static_stream_goal[i];
2338  }
2339 
2340  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": creating task");
2341  return newtask;
2342  }
2343 
2344 
  /// Factory for the reducer task that folds streamed inputs into input
  /// terminal `i` of \p task.
  /// NOTE(review): the signature line was elided by the doxygen extraction;
  /// call sites (`create_new_reducer_task<i>(task, is_first)`) indicate it is
  /// `detail::reducer_task_t *create_new_reducer_task(task_t *task, bool is_first)`
  /// — confirm upstream.
  template <std::size_t i>
  /* make sure we can reuse the existing memory pool and don't have to create a new one */
  static_assert(sizeof(task_t) >= sizeof(detail::reducer_task_t));
  constexpr const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
  auto &world_impl = world.impl();
  detail::reducer_task_t *newtask;
  parsec_thread_mempool_t *mempool = get_task_mempool();
  char *taskobj = (char *)parsec_thread_mempool_allocate(mempool);
  // use the priority of the task we stream into
  int32_t priority = 0;
  if constexpr (!keyT_is_Void) {
    priority = priomap(task->key);
    ttg::trace(world.rank(), ":", get_name(), " : ", task->key, ": creating reducer task");
  } else {
    priority = priomap();
    ttg::trace(world.rank(), ":", get_name(), ": creating reducer task");
  }
  /* placement-new the task */
  newtask = new (taskobj) detail::reducer_task_t(task, mempool, inpute_reducers_taskclass[i],
                                                 world_impl.taskpool(), priority, is_first);

  return newtask;
  }
2369 
2370 
2371  // Used to set the i'th argument
2372  template <std::size_t i, typename Key, typename Value>
2373  void set_arg_local_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in = nullptr,
2374  parsec_task_t **task_ring = nullptr) {
2375  using valueT = std::tuple_element_t<i, input_values_full_tuple_type>;
2376  constexpr const bool input_is_const = std::is_const_v<std::tuple_element_t<i, input_args_type>>;
2377  constexpr const bool valueT_is_Void = ttg::meta::is_void_v<valueT>;
2378  constexpr const bool keyT_is_Void = ttg::meta::is_void_v<Key>;
2379 
2380 
2381  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": received value for argument : ", i);
2382 
2383  parsec_key_t hk = 0;
2384  if constexpr (!keyT_is_Void) {
2385  hk = reinterpret_cast<parsec_key_t>(&key);
2386  assert(keymap(key) == world.rank());
2387  }
2388 
2389  task_t *task;
2390  auto &world_impl = world.impl();
2391  auto &reducer = std::get<i>(input_reducers);
2392  bool release = false;
2393  bool remove_from_hash = true;
2394 #if defined(PARSEC_PROF_GRAPHER)
2395  bool discover_task = true;
2396 #endif
2397  bool get_pull_data = false;
2398  bool has_lock = false;
2399  /* If we have only one input and no reducer on that input we can skip the hash table */
2400  if (numins > 1 || reducer) {
2401  has_lock = true;
2402  parsec_hash_table_lock_bucket(&tasks_table, hk);
2403  if (nullptr == (task = (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk))) {
2404  task = create_new_task(key);
2405  world_impl.increment_created();
2406  parsec_hash_table_nolock_insert(&tasks_table, &task->tt_ht_item);
2407  get_pull_data = !is_lazy_pull();
2408  if( world_impl.dag_profiling() ) {
2409 #if defined(PARSEC_PROF_GRAPHER)
2410  parsec_prof_grapher_task(&task->parsec_task, world_impl.execution_stream()->th_id, 0,
2411  key_hash(make_key(task->parsec_task.taskpool, task->parsec_task.locals), nullptr));
2412 #endif
2413  }
2414  } else if (!reducer && numins == (task->in_data_count + 1)) {
2415  /* remove while we have the lock */
2416  parsec_hash_table_nolock_remove(&tasks_table, hk);
2417  remove_from_hash = false;
2418  }
2419  /* if we have a reducer, we need to hold on to the lock for just a little longer */
2420  if (!reducer) {
2421  parsec_hash_table_unlock_bucket(&tasks_table, hk);
2422  has_lock = false;
2423  }
2424  } else {
2425  task = create_new_task(key);
2426  world_impl.increment_created();
2427  remove_from_hash = false;
2428  if( world_impl.dag_profiling() ) {
2429 #if defined(PARSEC_PROF_GRAPHER)
2430  parsec_prof_grapher_task(&task->parsec_task, world_impl.execution_stream()->th_id, 0,
2431  key_hash(make_key(task->parsec_task.taskpool, task->parsec_task.locals), nullptr));
2432 #endif
2433  }
2434  }
2435 
2436  if( world_impl.dag_profiling() ) {
2437 #if defined(PARSEC_PROF_GRAPHER)
2438  if(NULL != detail::parsec_ttg_caller && !detail::parsec_ttg_caller->is_dummy()) {
2440  char orig_str[32];
2441  char dest_str[32];
2442  if(orig_index >= 0) {
2443  snprintf(orig_str, 32, "%d", orig_index);
2444  } else {
2445  strncpy(orig_str, "_", 32);
2446  }
2447  snprintf(dest_str, 32, "%lu", i);
2448  parsec_flow_t orig{ .name = orig_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2449  .flow_index = 0, .flow_datatype_mask = ~0 };
2450  parsec_flow_t dest{ .name = dest_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2451  .flow_index = 0, .flow_datatype_mask = ~0 };
2452  parsec_prof_grapher_dep(&detail::parsec_ttg_caller->parsec_task, &task->parsec_task, discover_task ? 1 : 0, &orig, &dest);
2453  }
2454 #endif
2455  }
2456 
2457  auto get_copy_fn = [&](detail::parsec_ttg_task_base_t *task, auto&& value, bool is_const){
2458  detail::ttg_data_copy_t *copy = copy_in;
2459  if (nullptr == copy && nullptr != detail::parsec_ttg_caller) {
2461  }
2462  if (nullptr != copy) {
2463  /* retain the data copy */
2464  copy = detail::register_data_copy<valueT>(copy, task, is_const);
2465  } else {
2466  /* create a new copy */
2467  copy = detail::create_new_datacopy(std::forward<Value>(value));
2468  if (!is_const) {
2469  copy->mark_mutable();
2470  }
2471  }
2472  return copy;
2473  };
2474 
2475  if (reducer && 1 != task->streams[i].goal) { // is this a streaming input? reduce the received value
2476  auto submit_reducer_task = [&](auto *parent_task){
2477  /* check if we need to create a task */
2478  std::size_t c = parent_task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
2479  //std::cout << "submit_reducer_task " << key << " c " << c << std::endl;
2480  if (0 == c) {
2481  /* we are responsible for creating the reduction task */
2482  detail::reducer_task_t *reduce_task;
2483  reduce_task = create_new_reducer_task<i>(parent_task, false);
2484  reduce_task->release_task(reduce_task); // release immediately
2485  }
2486  };
2487 
2488  if constexpr (!ttg::meta::is_void_v<valueT>) { // for data values
2489  // have a value already? if not, set, otherwise reduce
2490  detail::ttg_data_copy_t *copy = nullptr;
2491  if (nullptr == (copy = task->copies[i])) {
2492  using decay_valueT = std::decay_t<valueT>;
2493 
2494  /* first input value, create a task and bind it to the copy */
2495  //std::cout << "Creating new reducer task for " << key << std::endl;
2496  detail::reducer_task_t *reduce_task;
2497  reduce_task = create_new_reducer_task<i>(task, true);
2498 
2499  /* protected by the bucket lock */
2500  task->streams[i].size = 1;
2501  task->streams[i].reduce_count.store(1, std::memory_order_relaxed);
2502 
2503  /* get the copy to use as input for this task */
2504  detail::ttg_data_copy_t *copy = get_copy_fn(reduce_task, std::forward<Value>(value), false);
2505 
2506  /* put the copy into the task */
2507  task->copies[i] = copy;
2508 
2509  /* release the task if we're not deferred
2510  * TODO: can we delay that until we get the second value?
2511  */
2512  if (copy->get_next_task() != &reduce_task->parsec_task) {
2513  reduce_task->release_task(reduce_task);
2514  }
2515 
2516  /* now we can unlock the bucket */
2517  parsec_hash_table_unlock_bucket(&tasks_table, hk);
2518  } else {
2519  /* unlock the bucket, the lock is not needed anymore */
2520  parsec_hash_table_unlock_bucket(&tasks_table, hk);
2521 
2522  /* get the copy to use as input for this task */
2523  detail::ttg_data_copy_t *copy = get_copy_fn(task, std::forward<Value>(value), true);
2524 
2525  /* enqueue the data copy to be reduced */
2526  parsec_lifo_push(&task->streams[i].reduce_copies, &copy->super);
2527  submit_reducer_task(task);
2528  }
2529  } else {
2530  /* unlock the bucket, the lock is not needed anymore */
2531  parsec_hash_table_unlock_bucket(&tasks_table, hk);
2532  /* submit reducer for void values to handle side effects */
2533  submit_reducer_task(task);
2534  }
2535  //if (release) {
2536  // parsec_hash_table_nolock_remove(&tasks_table, hk);
2537  // remove_from_hash = false;
2538  //}
2539  //parsec_hash_table_unlock_bucket(&tasks_table, hk);
2540  } else {
2541  /* unlock the bucket, the lock is not needed anymore */
2542  if (has_lock) {
2543  parsec_hash_table_unlock_bucket(&tasks_table, hk);
2544  }
2545  /* whether the task needs to be deferred or not */
2546  if constexpr (!valueT_is_Void) {
2547  if (nullptr != task->copies[i]) {
2548  ttg::print_error(get_name(), " : ", key, ": error argument is already set : ", i);
2549  throw std::logic_error("bad set arg");
2550  }
2551 
2552  /* get the copy to use as input for this task */
2553  detail::ttg_data_copy_t *copy = get_copy_fn(task, std::forward<Value>(value), input_is_const);
2554 
2555  /* if we registered as a writer and were the first to register with this copy
2556  * we need to defer the release of this task to give other tasks a chance to
2557  * make a copy of the original data */
2558  release = (copy->get_next_task() != &task->parsec_task);
2559  task->copies[i] = copy;
2560  } else {
2561  release = true;
2562  }
2563  }
2564  task->remove_from_hash = remove_from_hash;
2565  if (release) {
2566  release_task(task, task_ring);
2567  }
2568  /* if not pulling lazily, pull the data here */
2569  if constexpr (!ttg::meta::is_void_v<keyT>) {
2570  if (get_pull_data) {
2571  invoke_pull_terminals(std::make_index_sequence<std::tuple_size_v<input_values_tuple_type>>{}, task->key, task);
2572  }
2573  }
2574  }
2575 
2577  bool constrained = false;
2578  if (constraints_check.size() > 0) {
2579  if constexpr (ttg::meta::is_void_v<keyT>) {
2580  constrained = !constraints_check[0]();
2581  } else {
2582  constrained = !constraints_check[0](task->key);
2583  }
2584  }
2585  if (constrained) {
2586  // store the task so we can later access it once it is released
2587  parsec_hash_table_insert(&task_constraint_table, &task->tt_ht_item);
2588  }
2589  return !constrained;
2590  }
2591 
2592  template<typename Key = keyT>
2593  std::enable_if_t<ttg::meta::is_void_v<Key>, void> release_constraint(std::size_t cid) {
2594  // check the next constraint, if any
2595  assert(cid < constraints_check.size());
2596  bool release = true;
2597  for (std::size_t i = cid+1; i < constraints_check.size(); i++) {
2598  if (!constraints_check[i]()) {
2599  release = false;
2600  break;
2601  }
2602  }
2603  if (release) {
2604  // no constraint blocked us
2605  task_t *task;
2606  parsec_key_t hk = 0;
2607  task = (task_t*)parsec_hash_table_remove(&task_constraint_table, hk);
2608  assert(task != nullptr);
2609  auto &world_impl = world.impl();
2610  parsec_execution_stream_t *es = world_impl.execution_stream();
2611  parsec_task_t *vp_task_rings[1] = { &task->parsec_task };
2612  __parsec_schedule_vp(es, vp_task_rings, 0);
2613  }
2614  }
2615 
2616  template<typename Key = keyT>
2617  std::enable_if_t<!ttg::meta::is_void_v<Key>, void> release_constraint(std::size_t cid, const std::span<Key>& keys) {
2618  assert(cid < constraints_check.size());
2619  parsec_task_t *task_ring = nullptr;
2620  for (auto& key : keys) {
2621  task_t *task;
2622  bool release = true;
2623  for (std::size_t i = cid+1; i < constraints_check.size(); i++) {
2624  if (!constraints_check[i](key)) {
2625  release = false;
2626  break;
2627  }
2628  }
2629 
2630  if (release) {
2631  // no constraint blocked this task, so go ahead and release
2632  auto hk = reinterpret_cast<parsec_key_t>(&key);
2633  task = (task_t*)parsec_hash_table_remove(&task_constraint_table, hk);
2634  assert(task != nullptr);
2635  if (task_ring == nullptr) {
2636  /* the first task is set directly */
2637  task_ring = &task->parsec_task;
2638  } else {
2639  /* push into the ring */
2640  parsec_list_item_ring_push_sorted(&task_ring->super, &task->parsec_task.super,
2641  offsetof(parsec_task_t, priority));
2642  }
2643  }
2644  }
2645  if (nullptr != task_ring) {
2646  auto &world_impl = world.impl();
2647  parsec_execution_stream_t *es = world_impl.execution_stream();
2648  parsec_task_t *vp_task_rings[1] = { task_ring };
2649  __parsec_schedule_vp(es, vp_task_rings, 0);
2650  }
2651  }
2652 
    /// Account for a newly arrived input and submit the task once complete.
    ///
    /// Atomically counts this input against the task's dependency goal (unless
    /// the task was already removed from the hash table, which implies it is
    /// ready). When all \c numins inputs have arrived the task is removed from
    /// \c tasks_table (if still present), checked against its constraints, and
    /// either scheduled immediately or appended to the caller-provided ring for
    /// batched scheduling.
    ///
    /// \param task      the task being released
    /// \param task_ring optional out-parameter: if non-null, ready tasks are
    ///                  collected into this ring instead of being scheduled
    ///                  one by one (caller schedules the ring later)
    void release_task(task_t *task,
                      parsec_task_t **task_ring = nullptr) {
      constexpr const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;

      /* if remove_from_hash == false, someone has already removed the task from the hash table
       * so we know that the task is ready, no need to do atomic increments here */
      bool is_ready = !task->remove_from_hash;
      int32_t count;
      if (is_ready) {
        count = numins;
      } else {
        /* fetch_inc returns the previous value, so +1 yields the new count */
        count = parsec_atomic_fetch_inc_int32(&task->in_data_count) + 1;
        assert(count <= self.dependencies_goal);
      }

      auto &world_impl = world.impl();
      ttT *baseobj = task->tt;

      if (count == numins) {
        /* all inputs have arrived: the task can be submitted */
        parsec_execution_stream_t *es = world_impl.execution_stream();
        parsec_key_t hk = task->pkey();
        if (tracing()) {
          if constexpr (!keyT_is_Void) {
            ttg::trace(world.rank(), ":", get_name(), " : ", task->key, ": submitting task for op ");
          } else {
            ttg::trace(world.rank(), ":", get_name(), ": submitting task for op ");
          }
        }
        if (task->remove_from_hash) parsec_hash_table_remove(&tasks_table, hk);

        /* only schedule if no constraint holds the task back;
         * a blocked task is parked in task_constraint_table by check_constraints */
        if (check_constraints(task)) {
          if (nullptr == task_ring) {
            parsec_task_t *vp_task_rings[1] = { &task->parsec_task };
            __parsec_schedule_vp(es, vp_task_rings, 0);
          } else if (*task_ring == nullptr) {
            /* the first task is set directly */
            *task_ring = &task->parsec_task;
          } else {
            /* push into the ring */
            parsec_list_item_ring_push_sorted(&(*task_ring)->super, &task->parsec_task.super,
                                              offsetof(parsec_task_t, priority));
          }
        }
      } else if constexpr (!ttg::meta::is_void_v<keyT>) {
        /* not complete yet: once only the pull inputs are outstanding, trigger the lazy pulls */
        if ((baseobj->num_pullins + count == numins) && baseobj->is_lazy_pull()) {
          /* lazily pull the pull terminal data */
          baseobj->invoke_pull_terminals(std::make_index_sequence<std::tuple_size_v<input_values_tuple_type>>{}, task->key, task);
        }
      }
    }
2703 
2704  // cases 1+2
2705  template <std::size_t i, typename Key, typename Value>
2706  std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg(const Key &key,
2707  Value &&value) {
2708  set_arg_impl<i>(key, std::forward<Value>(value));
2709  }
2710 
2711  // cases 4+5+6
2712  template <std::size_t i, typename Key, typename Value>
2713  std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg(Value &&value) {
2714  set_arg_impl<i>(ttg::Void{}, std::forward<Value>(value));
2715  }
2716 
2717  template <std::size_t i, typename Key = keyT>
2718  std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_arg() {
2719  set_arg_impl<i>(ttg::Void{}, ttg::Void{});
2720  }
2721 
2722  // case 3
2723  template <std::size_t i, typename Key>
2724  std::enable_if_t<!ttg::meta::is_void_v<Key>, void> set_arg(const Key &key) {
2725  set_arg_impl<i>(key, ttg::Void{});
2726  }
2727 
    /// Decide whether the serialized form of \p value is small enough to be
    /// inlined into the active message instead of transferred via RMA.
    ///
    /// Sums the packed key size, the metadata size, and the total iovec payload
    /// and compares against detail::max_inline_size.
    ///
    /// \param value_ptr pointer to the value that will be sent
    /// \param copy      data copy holding the value's iovecs (used when the type
    ///                  has no split-metadata descriptor)
    /// \param key       a representative key whose packed size is accounted for
    /// \param num_keys  number of keys that will be packed
    ///                  NOTE(review): currently unused — only a single key's pack
    ///                  size is counted; confirm whether multi-key (broadcast)
    ///                  sends should scale key_pack_size by num_keys
    /// \return true if the whole payload fits below detail::max_inline_size
    template<typename Value, typename Key>
    bool can_inline_data(Value* value_ptr, detail::ttg_data_copy_t *copy, const Key& key, std::size_t num_keys) {
      using decvalueT = std::decay_t<Value>;
      bool inline_data = false;
      /* check whether to send data in inline */
      std::size_t iov_size = 0;
      std::size_t metadata_size = 0;
      if constexpr (ttg::has_split_metadata<std::decay_t<Value>>::value) {
        /* NOTE(review): the declaration of `descr` (the split-metadata descriptor)
         * appears to be missing from this listing — confirm against upstream */
        auto iovs = descr.get_data(*const_cast<decvalueT *>(value_ptr));
        iov_size = std::accumulate(iovs.begin(), iovs.end(), 0,
                                   [](std::size_t s, auto& iov){ return s + iov.num_bytes; });
        auto metadata = descr.get_metadata(*const_cast<decvalueT *>(value_ptr));
        metadata_size = ttg::default_data_descriptor<decltype(metadata)>::payload_size(&metadata);
      } else {
        /* TODO: how can we query the iovecs of the buffers here without actually packing the data? */
        metadata_size = ttg::default_data_descriptor<ttg::meta::remove_cvr_t<Value>>::payload_size(value_ptr);
        iov_size = std::accumulate(copy->iovec_begin(), copy->iovec_end(), 0,
                                   [](std::size_t s, auto& iov){ return s + iov.num_bytes; });
      }
      /* key is packed at the end */
      std::size_t key_pack_size = ttg::default_data_descriptor<Key>::payload_size(&key);
      std::size_t pack_size = key_pack_size + metadata_size + iov_size;
      if (pack_size < detail::max_inline_size) {
        inline_data = true;
      }
      return inline_data;
    }
2756 
    // Used to set the i'th argument
    /// Sets input terminal \p i of the task identified by \p key to \p value.
    ///
    /// If the key maps to this rank, delegates to set_arg_local_impl<i>.
    /// Otherwise builds a MSG_SET_ARG active message for the owner: small
    /// payloads are inlined into the message; larger ones are exposed through
    /// communication-engine memory registrations that the receiver fetches via
    /// RMA and then releases through a serialized completion callback.
    ///
    /// \param key     task identifier (ttg::Void for keyless TTs)
    /// \param value   the argument value; may be moved from
    /// \param copy_in optional existing data copy to reuse instead of creating one
    template <std::size_t i, typename Key, typename Value>
    void set_arg_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in = nullptr) {
      int owner;
      using decvalueT = std::decay_t<Value>;
      using norefvalueT = std::remove_reference_t<Value>;
      norefvalueT *value_ptr = &value;

#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
      if(world.impl().profiling()) {
        parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_start, 0, 0, NULL);
      }
#endif

      /* determine the owning rank of this task */
      if constexpr (!ttg::meta::is_void_v<Key>)
        owner = keymap(key);
      else
        owner = keymap();
      if (owner == world.rank()) {
        /* local delivery: no message needed */
        if constexpr (!ttg::meta::is_void_v<keyT>)
          set_arg_local_impl<i>(key, std::forward<Value>(value), copy_in);
        else
          set_arg_local_impl<i>(ttg::Void{}, std::forward<Value>(value), copy_in);
#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
        if(world.impl().profiling()) {
          parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_end, 0, 0, NULL);
        }
#endif
        return;
      }
      // the target task is remote. Pack the information and send it to
      // the corresponding peer.
      // TODO do we need to copy value?
      using msg_t = detail::msg_t;
      auto &world_impl = world.impl();
      uint64_t pos = 0;
      int num_iovecs = 0;
      std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
                                                           msg_header_t::MSG_SET_ARG, i, world_impl.rank(), 1);

      if constexpr (!ttg::meta::is_void_v<decvalueT>) {

        detail::ttg_data_copy_t *copy = copy_in;
        /* make sure we have a data copy to register with */
        if (nullptr == copy) {
          /* NOTE(review): a line looking up the caller's existing copy of this
           * value appears to be missing from this listing — confirm upstream */
          if (nullptr == copy) {
            // We need to create a copy for this data, as it does not exist yet.
            copy = detail::create_new_datacopy(std::forward<Value>(value));
            // use the new value from here on out
            value_ptr = static_cast<norefvalueT*>(copy->get_ptr());
          }
        }

        /* decide between inlining the payload and RMA transfer */
        bool inline_data = can_inline_data(value_ptr, copy, key, 1);
        msg->tt_id.inline_data = inline_data;

        /* packs either the raw bytes (inline) or per-iovec registration handles
         * plus a completion-callback pointer (RMA) into the message buffer */
        auto handle_iovec_fn = [&](auto&& iovecs){

          if (inline_data) {
            /* inline data is packed right after the tt_id in the message */
            for (auto &&iov : iovecs) {
              std::memcpy(msg->bytes + pos, iov.data, iov.num_bytes);
              pos += iov.num_bytes;
            }
          } else {

            /* TODO: at the moment, the tag argument to parsec_ce.get() is treated as a
             * raw function pointer instead of a preregistered AM tag, so play that game.
             * Once this is fixed in PaRSEC we need to use parsec_ttg_rma_tag instead! */
            parsec_ce_tag_t cbtag = reinterpret_cast<parsec_ce_tag_t>(&detail::get_remote_complete_cb);
            std::memcpy(msg->bytes + pos, &cbtag, sizeof(cbtag));
            pos += sizeof(cbtag);

            for (auto &&iov : iovecs) {
              /* retain the copy once per iovec; released by the completion callback */
              copy = detail::register_data_copy<decvalueT>(copy, nullptr, true);
              parsec_ce_mem_reg_handle_t lreg;
              size_t lreg_size;
              /* TODO: only register once when we can broadcast the data! */
              parsec_ce.mem_register(iov.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iov.num_bytes, parsec_datatype_int8_t,
                                     iov.num_bytes, &lreg, &lreg_size);
              auto lreg_ptr = std::shared_ptr<void>{lreg, [](void *ptr) {
                parsec_ce_mem_reg_handle_t memreg = (parsec_ce_mem_reg_handle_t)ptr;
                parsec_ce.mem_unregister(&memreg);
              }};
              int32_t lreg_size_i = lreg_size;
              std::memcpy(msg->bytes + pos, &lreg_size_i, sizeof(lreg_size_i));
              pos += sizeof(lreg_size_i);
              std::memcpy(msg->bytes + pos, lreg, lreg_size);
              pos += lreg_size;
              //std::cout << "set_arg_impl lreg " << lreg << std::endl;
              /* TODO: can we avoid the extra indirection of going through std::function? */
              std::function<void(void)> *fn = new std::function<void(void)>([=]() mutable {
                /* shared_ptr of value and registration captured by value so resetting
                 * them here will eventually release the memory/registration */
                lreg_ptr.reset();
              });
              /* ship the callback's address; the target invokes it when its RMA get completes */
              std::intptr_t fn_ptr{reinterpret_cast<std::intptr_t>(fn)};
              std::memcpy(msg->bytes + pos, &fn_ptr, sizeof(fn_ptr));
              pos += sizeof(fn_ptr);
            }
          }
        };

        if constexpr (ttg::has_split_metadata<std::decay_t<Value>>::value) {
          /* NOTE(review): the declaration of `descr` (split-metadata descriptor)
           * appears to be missing from this listing — confirm upstream */
          auto iovs = descr.get_data(*const_cast<decvalueT *>(value_ptr));
          num_iovecs = std::distance(std::begin(iovs), std::end(iovs));
          /* pack the metadata */
          auto metadata = descr.get_metadata(*const_cast<decvalueT *>(value_ptr));
          pos = pack(metadata, msg->bytes, pos);
          //std::cout << "set_arg_impl splitmd num_iovecs " << num_iovecs << std::endl;
          handle_iovec_fn(iovs);
        } else if constexpr (!ttg::has_split_metadata<std::decay_t<Value>>::value) {
          /* serialize the object */
          //std::cout << "PRE pack num_iovecs " << std::distance(copy->iovec_begin(), copy->iovec_end()) << std::endl;
          pos = pack(*value_ptr, msg->bytes, pos, copy);
          num_iovecs = std::distance(copy->iovec_begin(), copy->iovec_end());
          //std::cout << "POST pack num_iovecs " << num_iovecs << std::endl;
          /* handle any iovecs contained in it */
          handle_iovec_fn(copy->iovec_span());
          copy->iovec_reset();
        }

        msg->tt_id.num_iovecs = num_iovecs;
      }

      /* pack the key */
      msg->tt_id.num_keys = 0;
      msg->tt_id.key_offset = pos;
      if constexpr (!ttg::meta::is_void_v<Key>) {
        size_t tmppos = pack(key, msg->bytes, pos);
        pos = tmppos;
        msg->tt_id.num_keys = 1;
      }

      /* notify the termination detector of the outgoing message, then send */
      parsec_taskpool_t *tp = world_impl.taskpool();
      tp->tdm.module->outgoing_message_start(tp, owner, NULL);
      tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
      //std::cout << "set_arg_impl send_am owner " << owner << " sender " << msg->tt_id.sender << std::endl;
      parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
                        sizeof(msg_header_t) + pos);
#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
      if(world.impl().profiling()) {
        parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_end, 0, 0, NULL);
      }
#endif
#if defined(PARSEC_PROF_GRAPHER)
      /* record the caller->callee dependency edge for the DAG grapher;
       * a transient task object is created just to obtain a task handle */
      if(NULL != detail::parsec_ttg_caller && !detail::parsec_ttg_caller->is_dummy()) {
        /* NOTE(review): the line computing `orig_index` appears to be missing
         * from this listing — confirm upstream */
        char orig_str[32];
        char dest_str[32];
        if(orig_index >= 0) {
          snprintf(orig_str, 32, "%d", orig_index);
        } else {
          strncpy(orig_str, "_", 32);
        }
        snprintf(dest_str, 32, "%lu", i);
        parsec_flow_t orig{ .name = orig_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
                            .flow_index = 0, .flow_datatype_mask = ~0 };
        parsec_flow_t dest{ .name = dest_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
                            .flow_index = 0, .flow_datatype_mask = ~0 };
        task_t *task = create_new_task(key);
        parsec_prof_grapher_dep(&detail::parsec_ttg_caller->parsec_task, &task->parsec_task, 0, &orig, &dest);
        delete task;
      }
#endif
    }
2930 
    /// Locally delivers \p value to input terminal \p i for every key in [begin, end).
    ///
    /// Tasks that become ready are collected into a ring by set_arg_local_impl
    /// and submitted to the scheduler in a single batch at the end.
    ///
    /// \param begin iterator to the first key
    /// \param end   past-the-end key iterator
    /// \param value the value delivered to each task (shared, not moved)
    template <int i, typename Iterator, typename Value>
    void broadcast_arg_local(Iterator &&begin, Iterator &&end, const Value &value) {
#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
      if(world.impl().profiling()) {
        parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_bcast_arg_start, 0, 0, NULL);
      }
#endif
      parsec_task_t *task_ring = nullptr;
      detail::ttg_data_copy_t *copy = nullptr;
      if (nullptr != detail::parsec_ttg_caller) {
        /* NOTE(review): the body of this branch — presumably looking up the
         * caller's existing copy of `value` — appears to be missing from this
         * listing; confirm against upstream */
      }

      for (auto it = begin; it != end; ++it) {
        set_arg_local_impl<i>(*it, value, copy, &task_ring);
      }
      /* submit all ready tasks at once */
      if (nullptr != task_ring) {
        parsec_task_t *vp_task_ring[1] = { task_ring };
        __parsec_schedule_vp(world.impl().execution_stream(), vp_task_ring, 0);
      }
#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
      if(world.impl().profiling()) {
        parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_end, 0, 0, NULL);
      }
#endif
    }
2958 
    /// Broadcasts \p value to input terminal \p i of all tasks in \p keylist.
    ///
    /// Keys owned by this rank are handled via broadcast_arg_local. Remote keys
    /// are sorted by owner (starting at rank+1, wrapping around) and one
    /// MSG_SET_ARG message per owner is sent, carrying all of that owner's keys.
    /// The value payload is serialized/registered once and the message buffer is
    /// rewound to \c save_pos for each owner so only registrations and keys are
    /// re-packed per destination.
    ///
    /// \param keylist keys of the tasks to receive \p value
    /// \param value   the broadcast value (shared; never moved)
    template <std::size_t i, typename Key, typename Value>
    std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
                     void>
    broadcast_arg(const ttg::span<const Key> &keylist, const Value &value) {
      using valueT = std::tuple_element_t<i, input_values_full_tuple_type>;
      auto world = ttg_default_execution_context();
      auto np = world.size();
      int rank = world.rank();
      uint64_t pos = 0;
      /* any key not owned by this rank forces the message path */
      bool have_remote = keylist.end() != std::find_if(keylist.begin(), keylist.end(),
                                                       [&](const Key &key) { return keymap(key) != rank; });

      if (have_remote) {
        using decvalueT = std::decay_t<Value>;

        /* sort the input key list by owner and check whether there are remote keys */
        std::vector<Key> keylist_sorted(keylist.begin(), keylist.end());
        std::sort(keylist_sorted.begin(), keylist_sorted.end(), [&](const Key &a, const Key &b) mutable {
          int rank_a = keymap(a);
          int rank_b = keymap(b);
          // sort so that the keys for my rank are first, rank+1 next, ..., wrapping around to 0
          int pos_a = (rank_a + np - rank) % np;
          int pos_b = (rank_b + np - rank) % np;
          return pos_a < pos_b;
        });

        /* Assuming there are no local keys, will be updated while iterating over the keys */
        auto local_begin = keylist_sorted.end();
        auto local_end = keylist_sorted.end();

        int32_t num_iovs = 0;

        /* NOTE(review): the declaration/initialization of `copy` (the data copy
         * for `value`) appears to be missing from this listing — confirm upstream */
        assert(nullptr != copy);

        using msg_t = detail::msg_t;
        auto &world_impl = world.impl();
        std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
                                                             msg_header_t::MSG_SET_ARG, i, world_impl.rank());

        /* check if we inline the data */
        /* TODO: this assumes the worst case: that all keys are packed at once (i.e., go to the same remote). Can we do better?*/
        bool inline_data = can_inline_data(&value, copy, keylist_sorted[0], keylist_sorted.size());
        msg->tt_id.inline_data = inline_data;

        /* registration handles kept alive until the RMA completion callbacks fire */
        std::vector<std::pair<int32_t, std::shared_ptr<void>>> memregs;
        auto handle_iovs_fn = [&](auto&& iovs){

          if (inline_data) {
            /* inline data is packed right after the tt_id in the message */
            for (auto &&iov : iovs) {
              std::memcpy(msg->bytes + pos, iov.data, iov.num_bytes);
              pos += iov.num_bytes;
            }
          } else {

            /* TODO: at the moment, the tag argument to parsec_ce.get() is treated as a
             * raw function pointer instead of a preregistered AM tag, so play that game.
             * Once this is fixed in PaRSEC we need to use parsec_ttg_rma_tag instead! */
            parsec_ce_tag_t cbtag = reinterpret_cast<parsec_ce_tag_t>(&detail::get_remote_complete_cb);
            std::memcpy(msg->bytes + pos, &cbtag, sizeof(cbtag));
            pos += sizeof(cbtag);

            for (auto &&iov : iovs) {
              parsec_ce_mem_reg_handle_t lreg;
              size_t lreg_size;
              parsec_ce.mem_register(iov.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iov.num_bytes, parsec_datatype_int8_t,
                                     iov.num_bytes, &lreg, &lreg_size);
              /* TODO: use a static function for deregistration here? */
              memregs.push_back(std::make_pair(static_cast<int32_t>(lreg_size),
                                               /* TODO: this assumes that parsec_ce_mem_reg_handle_t is void* */
                                               std::shared_ptr<void>{lreg, [](void *ptr) {
                                                 parsec_ce_mem_reg_handle_t memreg =
                                                     (parsec_ce_mem_reg_handle_t)ptr;
                                                 //std::cout << "broadcast_arg memunreg lreg " << memreg << std::endl;
                                                 parsec_ce.mem_unregister(&memreg);
                                               }}));
              //std::cout << "broadcast_arg memreg lreg " << lreg << std::endl;
            }
          }
        };

        if constexpr (ttg::has_split_metadata<std::decay_t<Value>>::value) {
          /* NOTE(review): the declaration of `descr` (split-metadata descriptor)
           * appears to be missing from this listing — confirm upstream */
          /* pack the metadata */
          auto metadata = descr.get_metadata(value);
          pos = pack(metadata, msg->bytes, pos);
          auto iovs = descr.get_data(*const_cast<decvalueT *>(&value));
          num_iovs = std::distance(std::begin(iovs), std::end(iovs));
          memregs.reserve(num_iovs);
          handle_iovs_fn(iovs);
          //std::cout << "broadcast_arg splitmd num_iovecs " << num_iovs << std::endl;
        } else if constexpr (!ttg::has_split_metadata<std::decay_t<Value>>::value) {
          /* serialize the object once */
          pos = pack(value, msg->bytes, pos, copy);
          num_iovs = std::distance(copy->iovec_begin(), copy->iovec_end());
          handle_iovs_fn(copy->iovec_span());
          copy->iovec_reset();
        }

        msg->tt_id.num_iovecs = num_iovs;

        /* remember where the shared prefix ends; per-owner data is packed after it */
        std::size_t save_pos = pos;

        parsec_taskpool_t *tp = world_impl.taskpool();
        for (auto it = keylist_sorted.begin(); it < keylist_sorted.end(); /* increment done inline */) {

          auto owner = keymap(*it);
          if (owner == rank) {
            /* local keys form one contiguous run (sort order); remember it for later */
            local_begin = it;
            /* find first non-local key */
            local_end =
                std::find_if_not(++it, keylist_sorted.end(), [&](const Key &key) { return keymap(key) == rank; });
            it = local_end;
            continue;
          }

          /* rewind the buffer and start packing a new set of memregs and keys */
          pos = save_pos;
          if (!inline_data) {
            for (int idx = 0; idx < num_iovs; ++idx) {
              // auto [lreg_size, lreg_ptr] = memregs[idx];
              int32_t lreg_size;
              std::shared_ptr<void> lreg_ptr;
              std::tie(lreg_size, lreg_ptr) = memregs[idx];
              std::memcpy(msg->bytes + pos, &lreg_size, sizeof(lreg_size));
              pos += sizeof(lreg_size);
              std::memcpy(msg->bytes + pos, lreg_ptr.get(), lreg_size);
              pos += lreg_size;
              //std::cout << "broadcast_arg lreg_ptr " << lreg_ptr.get() << std::endl;
              /* mark another reader on the copy */
              copy = detail::register_data_copy<valueT>(copy, nullptr, true);
              /* create a function that will be invoked upon RMA completion at the target */
              std::function<void(void)> *fn = new std::function<void(void)>([=]() mutable {
                /* shared_ptr of value and registration captured by value so resetting
                 * them here will eventually release the memory/registration */
                lreg_ptr.reset();
              });
              std::intptr_t fn_ptr{reinterpret_cast<std::intptr_t>(fn)};
              std::memcpy(msg->bytes + pos, &fn_ptr, sizeof(fn_ptr));
              pos += sizeof(fn_ptr);
            }
          }

          /* mark the beginning of the keys */
          msg->tt_id.key_offset = pos;

          /* pack all keys for this owner */
          int num_keys = 0;
          do {
            ++num_keys;
            pos = pack(*it, msg->bytes, pos);
            ++it;
          } while (it < keylist_sorted.end() && keymap(*it) == owner);
          msg->tt_id.num_keys = num_keys;

          /* notify the termination detector, then send this owner's message */
          tp->tdm.module->outgoing_message_start(tp, owner, NULL);
          tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
          //std::cout << "broadcast_arg send_am owner " << owner << std::endl;
          parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
                            sizeof(msg_header_t) + pos);
        }
        /* handle local keys */
        broadcast_arg_local<i>(local_begin, local_end, value);
      } else {
        /* handle local keys */
        broadcast_arg_local<i>(keylist.begin(), keylist.end(), value);
      }
    }
3135 
3136  // Used by invoke to set all arguments associated with a task
3137  // Is: index sequence of elements in args
3138  // Js: index sequence of input terminals to set
3139  template <typename Key, typename... Ts, size_t... Is, size_t... Js>
3140  std::enable_if_t<ttg::meta::is_none_void_v<Key>, void> set_args(std::index_sequence<Is...>,
3141  std::index_sequence<Js...>, const Key &key,
3142  const std::tuple<Ts...> &args) {
3143  static_assert(sizeof...(Js) == sizeof...(Is));
3144  constexpr size_t js[] = {Js...};
3145  int junk[] = {0, (set_arg<js[Is]>(key, TT::get<Is>(args)), 0)...};
3146  junk[0]++;
3147  }
3148 
3149  // Used by invoke to set all arguments associated with a task
3150  // Is: index sequence of input terminals to set
3151  template <typename Key, typename... Ts, size_t... Is>
3152  std::enable_if_t<ttg::meta::is_none_void_v<Key>, void> set_args(std::index_sequence<Is...> is, const Key &key,
3153  const std::tuple<Ts...> &args) {
3154  set_args(std::index_sequence_for<Ts...>{}, is, key, args);
3155  }
3156 
3157  // Used by invoke to set all arguments associated with a task
3158  // Is: index sequence of elements in args
3159  // Js: index sequence of input terminals to set
3160  template <typename Key = keyT, typename... Ts, size_t... Is, size_t... Js>
3161  std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...>, std::index_sequence<Js...>,
3162  const std::tuple<Ts...> &args) {
3163  static_assert(sizeof...(Js) == sizeof...(Is));
3164  constexpr size_t js[] = {Js...};
3165  int junk[] = {0, (set_arg<js[Is], void>(TT::get<Is>(args)), 0)...};
3166  junk[0]++;
3167  }
3168 
3169  // Used by invoke to set all arguments associated with a task
3170  // Is: index sequence of input terminals to set
3171  template <typename Key = keyT, typename... Ts, size_t... Is>
3172  std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...> is,
3173  const std::tuple<Ts...> &args) {
3174  set_args(std::index_sequence_for<Ts...>{}, is, args);
3175  }
3176 
3177  public:
3180  template <std::size_t i>
3181  void set_static_argstream_size(std::size_t size) {
3182  assert(std::get<i>(input_reducers) && "TT::set_static_argstream_size called on nonstreaming input terminal");
3183  assert(size > 0 && "TT::set_static_argstream_size(key,size) called with size=0");
3184 
3185  this->trace(world.rank(), ":", get_name(), ": setting global stream size for terminal ", i);
3186 
3187  // Check if stream is already bounded
3188  if (static_stream_goal[i] < std::numeric_limits<std::size_t>::max()) {
3189  ttg::print_error(world.rank(), ":", get_name(), " : error stream is already bounded : ", i);
3190  throw std::runtime_error("TT::set_static_argstream_size called for a bounded stream");
3191  }
3192 
3193  static_stream_goal[i] = size;
3194  }
3195 
    /// Sets the stream goal of streaming input terminal \p i for task \p key.
    ///
    /// If the key is owned by a remote rank, the request is forwarded as an
    /// active message. Locally, the task is looked up (or created) in
    /// tasks_table under the bucket lock, the goal is committed via a
    /// reduce_count acquire/release pair, and the task is released immediately
    /// if the stream has already reached the goal.
    ///
    /// \param key  the task identifier
    /// \param size the number of stream inputs after which terminal \p i is complete
    template <std::size_t i, typename Key>
    std::enable_if_t<!ttg::meta::is_void_v<Key>, void> set_argstream_size(const Key &key, std::size_t size) {
      // preconditions
      assert(std::get<i>(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal");
      assert(size > 0 && "TT::set_argstream_size(key,size) called with size=0");

      // body
      const auto owner = keymap(key);
      if (owner != world.rank()) {
        ttg::trace(world.rank(), ":", get_name(), ":", key, " : forwarding stream size for terminal ", i);
        using msg_t = detail::msg_t;
        auto &world_impl = world.impl();
        uint64_t pos = 0;
        /* NOTE(review): the msg_header argument line (presumably
         * msg_header_t::MSG_SET_ARGSTREAM_SIZE, i,) appears to be missing from
         * this listing — confirm against upstream */
        std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
                                                             world_impl.rank(), 1);
        /* pack the key */
        pos = pack(key, msg->bytes, pos);
        pos = pack(size, msg->bytes, pos);
        parsec_taskpool_t *tp = world_impl.taskpool();
        tp->tdm.module->outgoing_message_start(tp, owner, NULL);
        tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
        parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
                          sizeof(msg_header_t) + pos);
      } else {
        ttg::trace(world.rank(), ":", get_name(), ":", key, " : setting stream size to ", size, " for terminal ", i);

        auto hk = reinterpret_cast<parsec_key_t>(&key);
        task_t *task;
        parsec_hash_table_lock_bucket(&tasks_table, hk);
        /* find the task or create a placeholder if no input has arrived yet */
        if (nullptr == (task = (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk))) {
          task = create_new_task(key);
          world.impl().increment_created();
          parsec_hash_table_nolock_insert(&tasks_table, &task->tt_ht_item);
          if( world.impl().dag_profiling() ) {
#if defined(PARSEC_PROF_GRAPHER)
            parsec_prof_grapher_task(&task->parsec_task, world.impl().execution_stream()->th_id, 0, *(uintptr_t*)&(task->parsec_task.locals[0]));
#endif
          }
        }
        parsec_hash_table_unlock_bucket(&tasks_table, hk);

        // TODO: Unfriendly implementation, cannot check if stream is already bounded
        // TODO: Unfriendly implementation, cannot check if stream has been finalized already

        // commit changes
        // 1) "lock" the stream by incrementing the reduce_count
        // 2) set the goal
        // 3) "unlock" the stream
        // only one thread will see the reduce_count be zero and the goal match the size
        task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
        task->streams[i].goal = size;
        auto c = task->streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
        /* c == 1 means we were the last holder; if enough inputs already
         * arrived the stream is complete and the task can be released */
        if (1 == c && (task->streams[i].size >= size)) {
          release_task(task);
        }
      }
    }
3257 
/// Sets the size of the input stream of terminal \c i (void-key variant).
/// Forwards the size to the owner rank, or records the goal on the local task.
/// \param size number of inputs the stream should expect; asserted > 0
3260  template <std::size_t i, typename Key = keyT>
3261  std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_argstream_size(std::size_t size) {
3262  // preconditions
3263  assert(std::get<i>(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal");
3264  assert(size > 0 && "TT::set_argstream_size(key,size) called with size=0");
3265 
3266  // body
3267  const auto owner = keymap();
3268  if (owner != world.rank()) {
3269  ttg::trace(world.rank(), ":", get_name(), " : forwarding stream size for terminal ", i);
3270  using msg_t = detail::msg_t;
3271  auto &world_impl = world.impl();
3272  uint64_t pos = 0;
// NOTE(review): one msg_t constructor argument line (source line 3274, presumably
// the msg_header_t command tag and terminal index) is missing from this listing —
// verify against the repository before editing.
3273  std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
3275  world_impl.rank(), 0);
3276  pos = pack(size, msg->bytes, pos);
3277  parsec_taskpool_t *tp = world_impl.taskpool();
3278  tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3279  tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3280  parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
3281  sizeof(msg_header_t) + pos);
3282  } else {
3283  ttg::trace(world.rank(), ":", get_name(), " : setting stream size to ", size, " for terminal ", i);
3284 
// With a void key the single task is stored under the fixed hash-table key 0.
3285  parsec_key_t hk = 0;
3286  task_t *task;
3287  parsec_hash_table_lock_bucket(&tasks_table, hk);
3288  if (nullptr == (task = (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk))) {
3289  task = create_new_task(ttg::Void{});
3290  world.impl().increment_created();
3291  parsec_hash_table_nolock_insert(&tasks_table, &task->tt_ht_item);
3292  if( world.impl().dag_profiling() ) {
3293 #if defined(PARSEC_PROF_GRAPHER)
3294  parsec_prof_grapher_task(&task->parsec_task, world.impl().execution_stream()->th_id, 0, *(uintptr_t*)&(task->parsec_task.locals[0]));
3295 #endif
3296  }
3297  }
3298  parsec_hash_table_unlock_bucket(&tasks_table, hk);
3299 
3300  // TODO: Unfriendly implementation, cannot check if stream is already bounded
3301  // TODO: Unfriendly implementation, cannot check if stream has been finalized already
3302 
3303  // commit changes
3304  // 1) "lock" the stream by incrementing the reduce_count
3305  // 2) set the goal
3306  // 3) "unlock" the stream
3307  // only one thread will see the reduce_count be zero and the goal match the size
3308  task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
3309  task->streams[i].goal = size;
3310  auto c = task->streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
3311  if (1 == c && (task->streams[i].size >= size)) {
3312  release_task(task);
3313  }
3314  }
3315  }
3316 
/// Finalizes the stream of input terminal \c i for task \c key (nonvoid-key variant):
/// forwards the finalize to the owner rank, or sets the local stream goal to 1 so the
/// task is released once at least one input has arrived.
/// \throws std::runtime_error if the stream never received any input on this rank
3318  template <std::size_t i, typename Key>
3319  std::enable_if_t<!ttg::meta::is_void_v<Key>, void> finalize_argstream(const Key &key) {
3320  // preconditions
3321  assert(std::get<i>(input_reducers) && "TT::finalize_argstream called on nonstreaming input terminal");
3322 
3323  // body
3324  const auto owner = keymap(key);
3325  if (owner != world.rank()) {
3326  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": forwarding stream finalize for terminal ", i);
3327  using msg_t = detail::msg_t;
3328  auto &world_impl = world.impl();
3329  uint64_t pos = 0;
// NOTE(review): one msg_t constructor argument line (source line 3331, presumably the
// msg_header_t finalize command tag) is missing from this listing — verify in the repo.
3330  std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
3332  world_impl.rank(), 1);
3333  /* pack the key */
3334  pos = pack(key, msg->bytes, pos);
3335  parsec_taskpool_t *tp = world_impl.taskpool();
3336  tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3337  tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3338  parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
3339  sizeof(msg_header_t) + pos);
3340  } else {
3341  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": finalizing stream for terminal ", i);
3342 
// Unlike set_argstream_size this path does not create the task: finalizing a
// stream that never received data is an error.
3343  auto hk = reinterpret_cast<parsec_key_t>(&key);
3344  task_t *task = nullptr;
3345  //parsec_hash_table_lock_bucket(&tasks_table, hk);
3346  if (nullptr == (task = (task_t *)parsec_hash_table_find(&tasks_table, hk))) {
3347  ttg::print_error(world.rank(), ":", get_name(), ":", key,
3348  " : error finalize called on stream that never received an input data: ", i);
3349  throw std::runtime_error("TT::finalize called on stream that never received an input data");
3350  }
3351 
3352  // TODO: Unfriendly implementation, cannot check if stream is already bounded
3353  // TODO: Unfriendly implementation, cannot check if stream has been finalized already
3354 
3355  // commit changes
3356  // 1) "lock" the stream by incrementing the reduce_count
3357  // 2) set the goal
3358  // 3) "unlock" the stream
3359  // only one thread will see the reduce_count be zero and the goal match the size
3360  task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
3361  task->streams[i].goal = 1;
3362  auto c = task->streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
3363  if (1 == c && (task->streams[i].size >= 1)) {
3364  release_task(task);
3365  }
3366  }
3367  }
3368 
/// Finalizes the stream of input terminal \c i (void-key variant): forwards the
/// finalize to the owner rank, or sets the local stream goal to 1.
/// \throws std::runtime_error if the stream never received any input on this rank
3370  template <std::size_t i, bool key_is_void = ttg::meta::is_void_v<keyT>>
3371  std::enable_if_t<key_is_void, void> finalize_argstream() {
3372  // preconditions
3373  assert(std::get<i>(input_reducers) && "TT::finalize_argstream called on nonstreaming input terminal");
3374 
3375  // body
3376  const auto owner = keymap();
3377  if (owner != world.rank()) {
3378  ttg::trace(world.rank(), ":", get_name(), ": forwarding stream finalize for terminal ", i);
3379  using msg_t = detail::msg_t;
3380  auto &world_impl = world.impl();
3381  uint64_t pos = 0;
// NOTE(review): one msg_t constructor argument line (source line 3383, presumably the
// msg_header_t finalize command tag) is missing from this listing — verify in the repo.
3382  std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
3384  world_impl.rank(), 0);
3385  parsec_taskpool_t *tp = world_impl.taskpool();
3386  tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3387  tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3388  parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
3389  sizeof(msg_header_t) + pos);
3390  } else {
3391  ttg::trace(world.rank(), ":", get_name(), ": finalizing stream for terminal ", i);
3392 
// The single void-key task lives under the fixed hash-table key 0.
3393  auto hk = static_cast<parsec_key_t>(0);
3394  task_t *task = nullptr;
3395  if (nullptr == (task = (task_t *)parsec_hash_table_find(&tasks_table, hk))) {
3396  ttg::print_error(world.rank(), ":", get_name(),
3397  " : error finalize called on stream that never received an input data: ", i);
3398  throw std::runtime_error("TT::finalize called on stream that never received an input data");
3399  }
3400 
3401  // TODO: Unfriendly implementation, cannot check if stream is already bounded
3402  // TODO: Unfriendly implementation, cannot check if stream has been finalized already
3403 
3404  // commit changes
3405  // 1) "lock" the stream by incrementing the reduce_count
3406  // 2) set the goal
3407  // 3) "unlock" the stream
3408  // only one thread will see the reduce_count be zero and the goal match the size
3409  task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
3410  task->streams[i].goal = 1;
3411  auto c = task->streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
3412  if (1 == c && (task->streams[i].size >= 1)) {
3413  release_task(task);
3414  }
3415  }
3416  }
3417 
// NOTE(review): the signature of this member function (on source line(s) dropped by
// the listing before 3420) is missing from this excerpt. The body below iterates the
// PaRSEC data underlying `copy` and, for each datum whose owner is not device 0 (the
// host), marks the flow that carries it for push-out on the current device task.
3419 
3420  assert(detail::parsec_ttg_caller->dev_ptr && detail::parsec_ttg_caller->dev_ptr->gpu_task);
3421  parsec_gpu_task_t *gpu_task = detail::parsec_ttg_caller->dev_ptr->gpu_task;
3422  auto check_parsec_data = [&](parsec_data_t* data) {
3423  if (data->owner_device != 0) {
3424  /* find the flow */
3425  int flowidx = 0;
3426  while (flowidx < MAX_PARAM_COUNT &&
3427  gpu_task->flow[flowidx]->flow_flags != PARSEC_FLOW_ACCESS_NONE) {
3428  if (detail::parsec_ttg_caller->parsec_task.data[flowidx].data_in->original == data) {
3429  /* found the right data, set the corresponding flow as pushout */
3430  break;
3431  }
3432  ++flowidx;
3433  }
3434  if (flowidx == MAX_PARAM_COUNT) {
3435  throw std::runtime_error("Cannot add more than MAX_PARAM_COUNT flows to a task!");
3436  }
3437  if (gpu_task->flow[flowidx]->flow_flags == PARSEC_FLOW_ACCESS_NONE) {
3438  /* no flow found, add one and mark it pushout */
3439  detail::parsec_ttg_caller->parsec_task.data[flowidx].data_in = data->device_copies[0];
3440  gpu_task->flow_nb_elts[flowidx] = data->nb_elts;
3441  }
3442  /* need to mark the flow RW to make PaRSEC happy */
3443  ((parsec_flow_t *)gpu_task->flow[flowidx])->flow_flags |= PARSEC_FLOW_ACCESS_RW;
3444  gpu_task->pushout |= 1<<flowidx;
3445  }
3446  };
3447  copy->foreach_parsec_data(check_parsec_data);
3448  }
3449 
3450 
3451  /* check whether a data needs to be pushed out */
// NOTE(review): this listing dropped several source lines (3460-3461, 3468, 3471,
// 3481, 3486, 3498-3499, 3502, 3507, 3509-3510, 3513, 3517, 3544), including the
// statement that obtains `copy` and the conditions guarding several branches below.
// Do not edit from this excerpt alone — verify against the repository.
3452  template <std::size_t i, typename Value, typename RemoteCheckFn>
3453  std::enable_if_t<!std::is_void_v<std::decay_t<Value>>,
3454  void>
3455  do_prepare_send(const Value &value, RemoteCheckFn&& remote_check) {
3456  using valueT = std::tuple_element_t<i, input_values_full_tuple_type>;
3457  static constexpr const bool value_is_const = std::is_const_v<valueT>;
3458 
3459  /* get the copy */
3462 
3463  /* if there is no copy we don't need to prepare anything */
3464  if (nullptr == copy) {
3465  return;
3466  }
3467 
3469  bool need_pushout = false;
3470 
3472  /* already marked pushout, skip the rest */
3473  return;
3474  }
3475 
3476  /* TODO: remove this once we support reductions on the GPU */
3477  auto &reducer = std::get<i>(input_reducers);
3478  if (reducer) {
3479  /* reductions are currently done only on the host so push out */
3480  copy_mark_pushout(copy);
3482  return;
3483  }
3484 
3485  if constexpr (value_is_const) {
3487  /* The data has been modified previously. If not all devices can access
3488  * their peers then we need to push out to the host so that all devices
3489  * have the data available for reading.
3490  * NOTE: we currently don't allow users to force the next writer to be
3491  * on a different device. In that case PaRSEC would take the host-side
3492  * copy. If we change our restriction we need to revisit this.
3493  * Ideally, PaRSEC would take the device copy if the owner moves... */
3494  need_pushout = !detail::all_devices_peer_access;
3495  }
3496 
3497  /* check for multiple readers */
3500  }
3501 
3503  /* there is a writer already, we will need to create a copy */
3504  need_pushout = true;
3505  }
3506 
3508  } else {
3511  need_pushout = true;
3512  } else {
3514  /* there are readers, we will need to create a copy */
3515  need_pushout = true;
3516  }
3518  }
3519  }
3520 
// A TT without a device op always needs its data on the host.
3521  if constexpr (!derived_has_device_op()) {
3522  need_pushout = true;
3523  }
3524 
3525  /* check if there are non-local successors if it's a device task */
3526  if (!need_pushout) {
3527  bool device_supported = false;
3528  if constexpr (derived_has_cuda_op()) {
3529  device_supported = world.impl().mpi_support(ttg::ExecutionSpace::CUDA);
3530  } else if constexpr (derived_has_hip_op()) {
3531  device_supported = world.impl().mpi_support(ttg::ExecutionSpace::HIP);
3532  } else if constexpr (derived_has_level_zero_op()) {
3533  device_supported = world.impl().mpi_support(ttg::ExecutionSpace::L0);
3534  }
3535  /* if MPI supports the device we don't care whether we have remote peers
3536  * because we can send from the device directly */
3537  if (!device_supported) {
3538  need_pushout = remote_check();
3539  }
3540  }
3541 
3542  if (need_pushout) {
3543  copy_mark_pushout(copy);
3545  }
3546  }
3547 
3548  /* check whether a data needs to be pushed out */
3549  template <std::size_t i, typename Key, typename Value>
3550  std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
3551  void>
3552  prepare_send(const ttg::span<const Key> &keylist, const Value &value) {
3553  auto remote_check = [&](){
3554  auto world = ttg_default_execution_context();
3555  int rank = world.rank();
3556  bool remote = keylist.end() != std::find_if(keylist.begin(), keylist.end(),
3557  [&](const Key &key) { return keymap(key) != rank; });
3558  return remote;
3559  };
3560  do_prepare_send<i>(value, remote_check);
3561  }
3562 
3563  template <std::size_t i, typename Key, typename Value>
3564  std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
3565  void>
3566  prepare_send(const Value &value) {
3567  auto remote_check = [&](){
3568  auto world = ttg_default_execution_context();
3569  int rank = world.rank();
3570  return (keymap() != rank);
3571  };
3572  do_prepare_send<i>(value, remote_check);
3573  }
3574 
3575  private:
3576  // Copy/assign/move are forbidden. We could make them work using
3577  // PIMPL for this base class; however, this instance of the base
3578  // class is tied to a specific instance of a derived class, a
3579  // pointer to which is captured for invoking derived-class
3580  // functions. Thus not only would the derived class have to be
3581  // involved, but it would have to be done in a thread-safe way,
3582  // including for possibly already-running tasks and remote
3583  // references. This is not worth the effort; wherever you are
3584  // tempted to move/assign a TT you should be using a pointer.
3585  TT(const TT &other) = delete;
3586  TT &operator=(const TT &other) = delete;
3587  TT(TT &&other) = delete;
3588  TT &operator=(TT &&other) = delete;
3589 
3590  // Registers the callback for the i'th input terminal
3591  template <typename terminalT, std::size_t i>
3592  void register_input_callback(terminalT &input) {
3593  using valueT = std::decay_t<typename terminalT::value_type>;
3594  if (input.is_pull_terminal) {
3595  num_pullins++;
3596  }
3598  // case 1: nonvoid key, nonvoid value
3600  if constexpr (!ttg::meta::is_void_v<keyT> && !std::is_void_v<valueT>) {
3601  auto move_callback = [this](const keyT &key, valueT &&value) {
3602  set_arg<i, keyT, valueT>(key, std::forward<valueT>(value));
3603  };
3604  auto send_callback = [this](const keyT &key, const valueT &value) {
3605  set_arg<i, keyT, const valueT &>(key, value);
3606  };
3607  auto broadcast_callback = [this](const ttg::span<const keyT> &keylist, const valueT &value) {
3608  broadcast_arg<i, keyT, valueT>(keylist, value);
3609  };
3610  auto prepare_send_callback = [this](const ttg::span<const keyT> &keylist, const valueT &value) {
3611  prepare_send<i, keyT, valueT>(keylist, value);
3612  };
3613  auto setsize_callback = [this](const keyT &key, std::size_t size) { set_argstream_size<i>(key, size); };
3614  auto finalize_callback = [this](const keyT &key) { finalize_argstream<i>(key); };
3615  input.set_callback(send_callback, move_callback, broadcast_callback,
3616  setsize_callback, finalize_callback, prepare_send_callback);
3617  }
3619  // case 2: nonvoid key, void value, mixed inputs
3621  else if constexpr (!ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
3622  auto send_callback = [this](const keyT &key) { set_arg<i, keyT, ttg::Void>(key, ttg::Void{}); };
3623  auto setsize_callback = [this](const keyT &key, std::size_t size) { set_argstream_size<i>(key, size); };
3624  auto finalize_callback = [this](const keyT &key) { finalize_argstream<i>(key); };
3625  input.set_callback(send_callback, send_callback, {}, setsize_callback, finalize_callback);
3626  }
3628  // case 3: nonvoid key, void value, no inputs
3629  // NOTE: subsumed in case 2 above, kept for historical reasons
3632  // case 4: void key, nonvoid value
3634  else if constexpr (ttg::meta::is_void_v<keyT> && !std::is_void_v<valueT>) {
3635  auto move_callback = [this](valueT &&value) { set_arg<i, keyT, valueT>(std::forward<valueT>(value)); };
3636  auto send_callback = [this](const valueT &value) {
3637  if constexpr (std::is_copy_constructible_v<valueT>) {
3638  set_arg<i, keyT, const valueT &>(value);
3639  }
3640  else {
3641  throw std::logic_error(std::string("TTG::PaRSEC: send_callback is invoked on datum of type ") + boost::typeindex::type_id<valueT>().pretty_name() + " which is not copy constructible, std::move datum into send/broadcast statement");
3642  }
3643  };
3644  auto setsize_callback = [this](std::size_t size) { set_argstream_size<i>(size); };
3645  auto finalize_callback = [this]() { finalize_argstream<i>(); };
3646  auto prepare_send_callback = [this](const valueT &value) {
3647  prepare_send<i, void>(value);
3648  };
3649  input.set_callback(send_callback, move_callback, {}, setsize_callback, finalize_callback, prepare_send_callback);
3650  }
3652  // case 5: void key, void value, mixed inputs
3654  else if constexpr (ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
3655  auto send_callback = [this]() { set_arg<i, keyT, ttg::Void>(ttg::Void{}); };
3656  auto setsize_callback = [this](std::size_t size) { set_argstream_size<i>(size); };
3657  auto finalize_callback = [this]() { finalize_argstream<i>(); };
3658  input.set_callback(send_callback, send_callback, {}, setsize_callback, finalize_callback);
3659  }
3661  // case 6: void key, void value, no inputs
3662  // NOTE: subsumed in case 5 above, kept for historical reasons
3664  else
3665  ttg::abort();
3666  }
3667 
3668  template <std::size_t... IS>
3669  void register_input_callbacks(std::index_sequence<IS...>) {
3670  int junk[] = {
3671  0,
3672  (register_input_callback<std::tuple_element_t<IS, input_terminals_type>, IS>(std::get<IS>(input_terminals)),
3673  0)...};
3674  junk[0]++;
3675  }
3676 
3677  template <std::size_t... IS, typename inedgesT>
3678  void connect_my_inputs_to_incoming_edge_outputs(std::index_sequence<IS...>, inedgesT &inedges) {
3679  int junk[] = {0, (std::get<IS>(inedges).set_out(&std::get<IS>(input_terminals)), 0)...};
3680  junk[0]++;
3681  }
3682 
3683  template <std::size_t... IS, typename outedgesT>
3684  void connect_my_outputs_to_outgoing_edge_inputs(std::index_sequence<IS...>, outedgesT &outedges) {
3685  int junk[] = {0, (std::get<IS>(outedges).set_in(&std::get<IS>(output_terminals)), 0)...};
3686  junk[0]++;
3687  }
3688 
// Dead code kept for reference: would derive each input flow's access flags
// (READ vs RW) from the constness of the corresponding input terminal type.
3689 #if 0
3690  template <typename input_terminals_tupleT, std::size_t... IS, typename flowsT>
3691  void _initialize_flows(std::index_sequence<IS...>, flowsT &&flows) {
3692  int junk[] = {0,
3693  (*(const_cast<std::remove_const_t<decltype(flows[IS]->flow_flags)> *>(&(flows[IS]->flow_flags))) =
3694  (std::is_const_v<std::tuple_element_t<IS, input_terminals_tupleT>> ? PARSEC_FLOW_ACCESS_READ
3695  : PARSEC_FLOW_ACCESS_RW),
3696  0)...};
3697  junk[0]++;
3698  }
3699 
3700  template <typename input_terminals_tupleT, typename flowsT>
3701  void initialize_flows(flowsT &&flows) {
3702  _initialize_flows<input_terminals_tupleT>(
3703  std::make_index_sequence<std::tuple_size<input_terminals_tupleT>::value>{}, flows);
3704  }
3705 #endif // 0
3706 
/// Delegates fence() to the default execution context's world implementation.
3707  void fence() override { ttg::default_execution_context().impl().fence(); }
3708 
3709  static int key_equal(parsec_key_t a, parsec_key_t b, void *user_data) {
3710  if constexpr (std::is_same_v<keyT, void>) {
3711  return 1;
3712  } else {
3713  keyT &ka = *(reinterpret_cast<keyT *>(a));
3714  keyT &kb = *(reinterpret_cast<keyT *>(b));
3715  return ka == kb;
3716  }
3717  }
3718 
3719  static uint64_t key_hash(parsec_key_t k, void *user_data) {
3720  constexpr const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
3721  if constexpr (keyT_is_Void || std::is_same_v<keyT, void>) {
3722  return 0;
3723  } else {
3724  keyT &kk = *(reinterpret_cast<keyT *>(k));
3725  using ttg::hash;
3726  uint64_t hv = hash<std::decay_t<decltype(kk)>>{}(kk);
3727  return hv;
3728  }
3729  }
3730 
3731  static char *key_print(char *buffer, size_t buffer_size, parsec_key_t k, void *user_data) {
3732  if constexpr (std::is_same_v<keyT, void>) {
3733  buffer[0] = '\0';
3734  return buffer;
3735  } else {
3736  keyT kk = *(reinterpret_cast<keyT *>(k));
3737  std::stringstream iss;
3738  iss << kk;
3739  memset(buffer, 0, buffer_size);
3740  iss.get(buffer, buffer_size);
3741  return buffer;
3742  }
3743  }
3744 
3745  static parsec_key_t make_key(const parsec_taskpool_t *tp, const parsec_assignment_t *as) {
3746  // we use the parsec_assignment_t array as a scratchpad to store the hash and address of the key
3747  keyT *key = *(keyT**)&(as[2]);
3748  return reinterpret_cast<parsec_key_t>(key);
3749  }
3750 
3751  static char *parsec_ttg_task_snprintf(char *buffer, size_t buffer_size, const parsec_task_t *parsec_task) {
3752  if(buffer_size == 0)
3753  return buffer;
3754 
3755  if constexpr (ttg::meta::is_void_v<keyT>) {
3756  snprintf(buffer, buffer_size, "%s()[]<%d>", parsec_task->task_class->name, parsec_task->priority);
3757  } else {
3758  const task_t *task = reinterpret_cast<const task_t*>(parsec_task);
3759  std::stringstream ss;
3760  ss << task->key;
3761 
3762  std::string keystr = ss.str();
3763  std::replace(keystr.begin(), keystr.end(), '(', ':');
3764  std::replace(keystr.begin(), keystr.end(), ')', ':');
3765 
3766  snprintf(buffer, buffer_size, "%s(%s)[]<%d>", parsec_task->task_class->name, keystr.c_str(), parsec_task->priority);
3767  }
3768  return buffer;
3769  }
3770 
#if defined(PARSEC_PROF_TRACE)
/// Profiling callback: writes the task's key (or "()" for void keys) into the
/// PaRSEC trace buffer \c dst and returns \c dst.
static void *parsec_ttg_task_info(void *dst, const void *data, size_t size) {
  const task_t *task = reinterpret_cast<const task_t *>(data);
  char *out = reinterpret_cast<char *>(dst);

  if constexpr (ttg::meta::is_void_v<keyT>) {
    snprintf(out, size, "()");
  } else {
    std::stringstream ss;
    ss << task->key;
    snprintf(out, size, "%s", ss.str().c_str());
  }
  return dst;
}
#endif
3786 
// Key-function vtable handed to the PaRSEC hash tables: equality, printer, hash.
3787  parsec_key_fn_t tasks_hash_fcts = {key_equal, key_print, key_hash};
3788 
3789  template<std::size_t I>
3790  inline static void increment_data_version_impl(task_t *task) {
3791  if constexpr (!std::is_const_v<std::tuple_element_t<I, typename TT::input_values_tuple_type>>) {
3792  if (task->copies[I] != nullptr){
3793  task->copies[I]->inc_current_version();
3794  }
3795  }
3796  }
3797 
3798  template<std::size_t... Is>
3799  inline static void increment_data_versions(task_t *task, std::index_sequence<Is...>) {
3800  /* increment version of each mutable data */
3801  int junk[] = {0, (increment_data_version_impl<Is>(task), 0)...};
3802  junk[0]++;
3803  }
3804 
/// PaRSEC completion hook: resumes a suspended device coroutine to flush its
/// deferred sends, drops the task's data copies, and runs constraint-complete
/// callbacks. Always returns PARSEC_HOOK_RETURN_DONE.
// NOTE(review): this listing dropped source lines 3833 and 3851 — 3833 presumably
// sets detail::parsec_ttg_caller (it is reset to nullptr below), and 3851 presumably
// releases `copy` before the slot is cleared. Verify against the repository.
3805  static parsec_hook_return_t complete_task_and_release(parsec_execution_stream_t *es, parsec_task_t *parsec_task) {
3806 
3807  //std::cout << "complete_task_and_release: task " << parsec_task << std::endl;
3808 
3809  task_t *task = (task_t*)parsec_task;
3810 
3811 #ifdef TTG_HAVE_COROUTINE
3812  /* if we still have a coroutine handle we invoke it one more time to get the sends/broadcasts */
3813  if (task->suspended_task_address) {
3814  assert(task->coroutine_id != ttg::TaskCoroutineID::Invalid);
3815 #ifdef TTG_HAVE_DEVICE
3816  if (task->coroutine_id == ttg::TaskCoroutineID::DeviceTask) {
3817  /* increment versions of all data we might have modified
3818  * this must happen before we issue the sends */
3819  //increment_data_versions(task, std::make_index_sequence<std::tuple_size_v<typename TT::input_values_tuple_type>>{});
3820 
3821  // get the device task from the coroutine handle
3822  auto dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
3823 
3824  // get the promise which contains the views
3825  auto dev_data = dev_task.promise();
3826 
3827  assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT ||
3828  dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE);
3829 
3830  /* execute the sends we stored */
3831  if (dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT) {
3832  /* set the current task, needed inside the sends */
3834  auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor();
3835  task->tt->set_outputs_tls_ptr();
3836  dev_data.do_sends(); // all sends happen here
3837  task->tt->set_outputs_tls_ptr(old_output_tls_ptr);
3838  detail::parsec_ttg_caller = nullptr;
3839  }
3840  }
3841 #endif // TTG_HAVE_DEVICE
3842  /* the coroutine should have completed and we cannot access the promise anymore */
3843  task->suspended_task_address = nullptr;
3844  }
3845 #endif // TTG_HAVE_COROUTINE
3846 
3847  /* release our data copies */
3848  for (int i = 0; i < task->data_count; i++) {
3849  detail::ttg_data_copy_t *copy = task->copies[i];
3850  if (nullptr == copy) continue;
3852  task->copies[i] = nullptr;
3853  }
3854 
// notify constraints that this task has completed (keyed or keyless form)
3855  for (auto& c : task->tt->constraints_complete) {
3856  if constexpr(std::is_void_v<keyT>) {
3857  c();
3858  } else {
3859  c(task->key);
3860  }
3861  }
3862  return PARSEC_HOOK_RETURN_DONE;
3863  }
3864 
3865  public:
/// Main constructor: registers this TT with \c world, sets up input/output
/// terminals and their callbacks, and builds the PaRSEC task class (`self`) —
/// key functions, incarnations (CPU/CUDA/HIP/Level Zero), flows, mempool, and
/// the task/constraint hash tables.
3866  template <typename keymapT = ttg::detail::default_keymap<keyT>,
3867  typename priomapT = ttg::detail::default_priomap<keyT>>
3868  TT(const std::string &name, const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
3869  ttg::World world, keymapT &&keymap_ = keymapT(), priomapT &&priomap_ = priomapT())
3870  : ttg::TTBase(name, numinedges, numouts)
3871  , world(world)
3872  // if using default keymap, rebind to the given world
3873  , keymap(std::is_same<keymapT, ttg::detail::default_keymap<keyT>>::value
3874  ? decltype(keymap)(ttg::detail::default_keymap<keyT>(world))
3875  : decltype(keymap)(std::forward<keymapT>(keymap_)))
3876  , priomap(decltype(keymap)(std::forward<priomapT>(priomap_))) {
3877  // Cannot call these in base constructor since terminals not yet constructed
3878  if (innames.size() != numinedges) throw std::logic_error("ttg_parsec::TT: #input names != #input terminals");
3879  if (outnames.size() != numouts) throw std::logic_error("ttg_parsec::TT: #output names != #output terminals");
3880 
3881  auto &world_impl = world.impl();
3882  world_impl.register_op(this);
3883 
3884  if constexpr (numinedges == numins) {
3885  register_input_terminals(input_terminals, innames);
3886  } else {
3887  // create a name for the virtual control input
3888  register_input_terminals(input_terminals, std::array<std::string, 1>{std::string("Virtual Control")});
3889  }
3890  register_output_terminals(output_terminals, outnames);
3891 
3892  register_input_callbacks(std::make_index_sequence<numinedges>{});
3893  int i;
3894 
// Build the PaRSEC task class descriptor for this TT from scratch.
3895  memset(&self, 0, sizeof(parsec_task_class_t));
3896 
3897  self.name = strdup(get_name().c_str());
3898  self.task_class_id = get_instance_id();
3899  self.nb_parameters = 0;
3900  self.nb_locals = 0;
3901  //self.nb_flows = numflows;
3902  self.nb_flows = MAX_PARAM_COUNT; // we're not using all flows but have to
3903  // trick the device handler into looking at all of them
3904 
3905  if( world_impl.profiling() ) {
3906  // first two ints are used to store the hash of the key.
3907  self.nb_parameters = (sizeof(void*)+sizeof(int)-1)/sizeof(int);
3908  // seconds two ints are used to store a pointer to the key of the task.
3909  self.nb_locals = self.nb_parameters + (sizeof(void*)+sizeof(int)-1)/sizeof(int);
3910 
3911  // If we have parameters and locals, we need to define the corresponding dereference arrays
3912  self.params[0] = &detail::parsec_taskclass_param0;
3913  self.params[1] = &detail::parsec_taskclass_param1;
3914 
3915  self.locals[0] = &detail::parsec_taskclass_param0;
3916  self.locals[1] = &detail::parsec_taskclass_param1;
3917  self.locals[2] = &detail::parsec_taskclass_param2;
3918  self.locals[3] = &detail::parsec_taskclass_param3;
3919  }
3920  self.make_key = make_key;
3921  self.key_functions = &tasks_hash_fcts;
3922  self.task_snprintf = parsec_ttg_task_snprintf;
3923 
3924 #if defined(PARSEC_PROF_TRACE)
3925  self.profile_info = &parsec_ttg_task_info;
3926 #endif
3927 
3928  world_impl.taskpool()->nb_task_classes = std::max(world_impl.taskpool()->nb_task_classes, static_cast<decltype(world_impl.taskpool()->nb_task_classes)>(self.task_class_id+1));
3929  // function_id_to_instance[self.task_class_id] = this;
3930  //self.incarnations = incarnations_array.data();
3931 //#if 0
// Select the incarnation (device hook) list based on which device op the
// derived TT provides; the list is terminated by a PARSEC_DEV_NONE entry.
3932  if constexpr (derived_has_cuda_op()) {
3933  self.incarnations = (__parsec_chore_t *)malloc(3 * sizeof(__parsec_chore_t));
3934  ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_CUDA;
3935  ((__parsec_chore_t *)self.incarnations)[0].evaluate = &detail::evaluate_cuda<TT>;
3936  ((__parsec_chore_t *)self.incarnations)[0].hook = &detail::hook_cuda<TT>;
3937  ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_NONE;
3938  ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL;
3939  ((__parsec_chore_t *)self.incarnations)[1].hook = NULL;
3940  } else if constexpr (derived_has_hip_op()) {
3941  self.incarnations = (__parsec_chore_t *)malloc(3 * sizeof(__parsec_chore_t));
3942  ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_HIP;
3943  ((__parsec_chore_t *)self.incarnations)[0].evaluate = &detail::evaluate_hip<TT>;
3944  ((__parsec_chore_t *)self.incarnations)[0].hook = &detail::hook_hip<TT>;
3945 
3946  ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_NONE;
3947  ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL;
3948  ((__parsec_chore_t *)self.incarnations)[1].hook = NULL;
3949 #if defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT)
3950  } else if constexpr (derived_has_level_zero_op()) {
3951  self.incarnations = (__parsec_chore_t *)malloc(3 * sizeof(__parsec_chore_t));
3952  ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_LEVEL_ZERO;
3953  ((__parsec_chore_t *)self.incarnations)[0].evaluate = &detail::evaluate_level_zero<TT>;
3954  ((__parsec_chore_t *)self.incarnations)[0].hook = &detail::hook_level_zero<TT>;
3955 
3956  ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_NONE;
3957  ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL;
3958  ((__parsec_chore_t *)self.incarnations)[1].hook = NULL;
3959 #endif // PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT
3960  } else {
3961  self.incarnations = (__parsec_chore_t *)malloc(2 * sizeof(__parsec_chore_t));
3962  ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_CPU;
3963  ((__parsec_chore_t *)self.incarnations)[0].evaluate = NULL;
3964  ((__parsec_chore_t *)self.incarnations)[0].hook = &detail::hook<TT>;
3965  ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_NONE;
3966  ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL;
3967  ((__parsec_chore_t *)self.incarnations)[1].hook = NULL;
3968  }
3969 //#endif // 0
3970 
3971  self.release_task = &parsec_release_task_to_mempool_update_nbtasks;
3972  self.complete_execution = complete_task_and_release;
3973 
// Allocate all MAX_PARAM_COUNT input flows (see nb_flows comment above).
3974  for (i = 0; i < MAX_PARAM_COUNT; i++) {
3975  parsec_flow_t *flow = new parsec_flow_t;
3976  flow->name = strdup((std::string("flow in") + std::to_string(i)).c_str());
3977  flow->sym_type = PARSEC_SYM_INOUT;
3978  // see initialize_flows below
3979  // flow->flow_flags = PARSEC_FLOW_ACCESS_RW;
3980  flow->dep_in[0] = NULL;
3981  flow->dep_out[0] = NULL;
3982  flow->flow_index = i;
3983  flow->flow_datatype_mask = ~0;
3984  *((parsec_flow_t **)&(self.in[i])) = flow;
3985  }
3986  //*((parsec_flow_t **)&(self.in[i])) = NULL;
3987  //initialize_flows<input_terminals_type>(self.in);
3988 
3989  for (i = 0; i < MAX_PARAM_COUNT; i++) {
3990  parsec_flow_t *flow = new parsec_flow_t;
3991  flow->name = strdup((std::string("flow out") + std::to_string(i)).c_str());
3992  flow->sym_type = PARSEC_SYM_INOUT;
3993  flow->flow_flags = PARSEC_FLOW_ACCESS_READ; // does PaRSEC use this???
3994  flow->dep_in[0] = NULL;
3995  flow->dep_out[0] = NULL;
3996  flow->flow_index = i;
3997  flow->flow_datatype_mask = (1 << i);
3998  *((parsec_flow_t **)&(self.out[i])) = flow;
3999  }
4000  //*((parsec_flow_t **)&(self.out[i])) = NULL;
4001 
4002  self.flags = 0;
4003  self.dependencies_goal = numins; /* (~(uint32_t)0) >> (32 - numins); */
4004 
// Size the task mempool by the total number of cores across virtual processes.
4005  int nbthreads = 0;
4006  auto *context = world_impl.context();
4007  for (int i = 0; i < context->nb_vp; i++) {
4008  nbthreads += context->virtual_processes[i]->nb_cores;
4009  }
4010 
4011  parsec_mempool_construct(&mempools, PARSEC_OBJ_CLASS(parsec_task_t), sizeof(task_t),
4012  offsetof(parsec_task_t, mempool_owner), nbthreads);
4013 
4014  parsec_hash_table_init(&tasks_table, offsetof(detail::parsec_ttg_task_base_t, tt_ht_item), 8, tasks_hash_fcts,
4015  NULL);
4016 
4017  parsec_hash_table_init(&task_constraint_table, offsetof(detail::parsec_ttg_task_base_t, tt_ht_item), 8, tasks_hash_fcts,
4018  NULL);
4019  }
4020 
  /// Constructor without an explicit world: delegates to the ttg::World
  /// overload using ttg::default_execution_context().
  /// \param name human-readable name of this TT
  /// \param innames/outnames labels for the input/output terminals
  /// \param keymap maps a key to the owning process (defaulted)
  /// \param priomap maps a key to a task priority (defaulted)
  template <typename keymapT = ttg::detail::default_keymap<keyT>,
            typename priomapT = ttg::detail::default_priomap<keyT>>
  TT(const std::string &name, const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
     keymapT &&keymap = keymapT(ttg::default_execution_context()), priomapT &&priomap = priomapT())
      : TT(name, innames, outnames, ttg::default_execution_context(), std::forward<keymapT>(keymap),
           std::forward<priomapT>(priomap)) {}
4027 
  /// Edge-based constructor: builds the TT, then wires its terminals to the
  /// given input/output edges and registers the input callbacks.
  /// \param inedges/outedges edges to connect this TT's terminals to
  /// \param world the world this TT lives in
  template <typename keymapT = ttg::detail::default_keymap<keyT>,
            typename priomapT = ttg::detail::default_priomap<keyT>>
  TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name,
     const std::vector<std::string> &innames, const std::vector<std::string> &outnames, ttg::World world,
     keymapT &&keymap_ = keymapT(), priomapT &&priomap = priomapT())
      : TT(name, innames, outnames, world, std::forward<keymapT>(keymap_), std::forward<priomapT>(priomap)) {
    connect_my_inputs_to_incoming_edge_outputs(std::make_index_sequence<numinedges>{}, inedges);
    connect_my_outputs_to_outgoing_edge_inputs(std::make_index_sequence<numouts>{}, outedges);
    //DO NOT MOVE THIS - information about the number of pull terminals is only available after connecting the edges.
    if constexpr (numinedges > 0) {
      register_input_callbacks(std::make_index_sequence<numinedges>{});
    }
  }
  /// Edge-based constructor without an explicit world: delegates to the
  /// ttg::World overload using ttg::default_execution_context().
  template <typename keymapT = ttg::detail::default_keymap<keyT>,
            typename priomapT = ttg::detail::default_priomap<keyT>>
  TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name,
     const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
     keymapT &&keymap = keymapT(ttg::default_execution_context()), priomapT &&priomap = priomapT())
      : TT(inedges, outedges, name, innames, outnames, ttg::default_execution_context(),
           std::forward<keymapT>(keymap), std::forward<priomapT>(priomap)) {}
4048 
4049  // Destructor checks for unexecuted tasks
4050  virtual ~TT() {
4051  if(nullptr != self.name ) {
4052  free((void*)self.name);
4053  self.name = nullptr;
4054  }
4055 
4056  for (std::size_t i = 0; i < numins; ++i) {
4057  if (inpute_reducers_taskclass[i] != nullptr) {
4058  std::free(inpute_reducers_taskclass[i]);
4059  inpute_reducers_taskclass[i] = nullptr;
4060  }
4061  }
4062  release();
4063  }
4064 
4065  static void ht_iter_cb(void *item, void *cb_data) {
4066  task_t *task = (task_t *)item;
4067  ttT *op = (ttT *)cb_data;
4068  if constexpr (!ttg::meta::is_void_v<keyT>) {
4069  std::cout << "Left over task " << op->get_name() << " " << task->key << std::endl;
4070  } else {
4071  std::cout << "Left over task " << op->get_name() << std::endl;
4072  }
4073  }
4074 
    /* walk the task hash table and report (via ht_iter_cb) every task that is still pending */
    parsec_hash_table_for_all(&tasks_table, ht_iter_cb, this);
  }
4078 
  /// Releases all PaRSEC resources owned by this TT (delegates to do_release()).
  virtual void release() override { do_release(); }
4080 
  /// Tears down the PaRSEC state owned by this TT: task hash table, mempool,
  /// chore array, and the per-terminal flow descriptors. Idempotent via the
  /// alive flag, so both release() and the destructor may call it.
  void do_release() {
    if (!alive) {
      return;  // already released
    }
    alive = false;
    /* print all outstanding tasks */
    parsec_hash_table_fini(&tasks_table);
    parsec_mempool_destruct(&mempools);
    // uintptr_t addr = (uintptr_t)self.incarnations;
    // free((void *)addr);
    free((__parsec_chore_t *)self.incarnations);  // malloc'ed at construction
    for (int i = 0; i < MAX_PARAM_COUNT; i++) {
      if (NULL != self.in[i]) {
        free(self.in[i]->name);  // strdup'ed flow name
        delete self.in[i];       // flow allocated with new at construction
        self.in[i] = nullptr;
      }
      if (NULL != self.out[i]) {
        free(self.out[i]->name);
        delete self.out[i];
        self.out[i] = nullptr;
      }
    }
    world.impl().deregister_op(this);  // the world no longer tracks this op
  }
4107 
4108  static constexpr const ttg::Runtime runtime = ttg::Runtime::PaRSEC;
4109 
4115  template <std::size_t i, typename Reducer>
4116  void set_input_reducer(Reducer &&reducer) {
4117  ttg::trace(world.rank(), ":", get_name(), " : setting reducer for terminal ", i);
4118  std::get<i>(input_reducers) = reducer;
4119 
4120  parsec_task_class_t *tc = inpute_reducers_taskclass[i];
4121  if (nullptr == tc) {
4122  tc = (parsec_task_class_t *)std::calloc(1, sizeof(*tc));
4123  inpute_reducers_taskclass[i] = tc;
4124 
4125  tc->name = strdup((get_name() + std::string(" reducer ") + std::to_string(i)).c_str());
4126  tc->task_class_id = get_instance_id();
4127  tc->nb_parameters = 0;
4128  tc->nb_locals = 0;
4129  tc->nb_flows = numflows;
4130 
4131  auto &world_impl = world.impl();
4132 
4133  if( world_impl.profiling() ) {
4134  // first two ints are used to store the hash of the key.
4135  tc->nb_parameters = (sizeof(void*)+sizeof(int)-1)/sizeof(int);
4136  // seconds two ints are used to store a pointer to the key of the task.
4137  tc->nb_locals = self.nb_parameters + (sizeof(void*)+sizeof(int)-1)/sizeof(int);
4138 
4139  // If we have parameters and locals, we need to define the corresponding dereference arrays
4140  tc->params[0] = &detail::parsec_taskclass_param0;
4141  tc->params[1] = &detail::parsec_taskclass_param1;
4142 
4143  tc->locals[0] = &detail::parsec_taskclass_param0;
4144  tc->locals[1] = &detail::parsec_taskclass_param1;
4145  tc->locals[2] = &detail::parsec_taskclass_param2;
4146  tc->locals[3] = &detail::parsec_taskclass_param3;
4147  }
4148  tc->make_key = make_key;
4149  tc->key_functions = &tasks_hash_fcts;
4150  tc->task_snprintf = parsec_ttg_task_snprintf;
4151 
4152 #if defined(PARSEC_PROF_TRACE)
4153  tc->profile_info = &parsec_ttg_task_info;
4154 #endif
4155 
4156  world_impl.taskpool()->nb_task_classes = std::max(world_impl.taskpool()->nb_task_classes, static_cast<decltype(world_impl.taskpool()->nb_task_classes)>(self.task_class_id+1));
4157 
4158 #if 0
4159  // FIXME: currently only support reduction on the host
4160  if constexpr (derived_has_cuda_op()) {
4161  self.incarnations = (__parsec_chore_t *)malloc(3 * sizeof(__parsec_chore_t));
4162  ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_CUDA;
4163  ((__parsec_chore_t *)self.incarnations)[0].evaluate = NULL;
4164  ((__parsec_chore_t *)self.incarnations)[0].hook = detail::hook_cuda;
4165  ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_CPU;
4166  ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL;
4167  ((__parsec_chore_t *)self.incarnations)[1].hook = detail::hook;
4168  ((__parsec_chore_t *)self.incarnations)[2].type = PARSEC_DEV_NONE;
4169  ((__parsec_chore_t *)self.incarnations)[2].evaluate = NULL;
4170  ((__parsec_chore_t *)self.incarnations)[2].hook = NULL;
4171  } else
4172 #endif // 0
4173  {
4174  tc->incarnations = (__parsec_chore_t *)malloc(2 * sizeof(__parsec_chore_t));
4175  ((__parsec_chore_t *)tc->incarnations)[0].type = PARSEC_DEV_CPU;
4176  ((__parsec_chore_t *)tc->incarnations)[0].evaluate = NULL;
4177  ((__parsec_chore_t *)tc->incarnations)[0].hook = &static_reducer_op<i>;
4178  ((__parsec_chore_t *)tc->incarnations)[1].type = PARSEC_DEV_NONE;
4179  ((__parsec_chore_t *)tc->incarnations)[1].evaluate = NULL;
4180  ((__parsec_chore_t *)tc->incarnations)[1].hook = NULL;
4181  }
4182 
4183  /* the reduction task does not alter the termination detection because the target task will execute */
4184  tc->release_task = &parsec_release_task_to_mempool;
4185  tc->complete_execution = NULL;
4186  }
4187  }
4188 
  /// Registers a reducer for terminal \p i and statically fixes the stream
  /// size (number of values expected per key) for that terminal.
  /// \param reducer the reduction callable
  /// \param size number of values to reduce before the input is complete
  template <std::size_t i, typename Reducer>
  void set_input_reducer(Reducer &&reducer, std::size_t size) {
    set_input_reducer<i>(std::forward<Reducer>(reducer));
    set_static_argstream_size<i>(size);
  }
4201 
  // Returns pointer to input terminal i to facilitate connection --- terminal
  // cannot be copied, moved or assigned, so it is handed out by address
  template <std::size_t i>
  std::tuple_element_t<i, input_terminals_type> *in() {
    return &std::get<i>(input_terminals);
  }
4208 
  // Returns pointer to output terminal i for purpose of connection --- terminal
  // cannot be copied, moved or assigned, so it is handed out by address
  template <std::size_t i>
  std::tuple_element_t<i, output_terminalsT> *out() {
    return &std::get<i>(output_terminals);
  }
4215 
  // Manual injection of a task with all input arguments specified as a tuple;
  // a key of a convertible type is first cast to key_type, then the non-void
  // and void inputs are triggered separately via set_args.
  template <typename Key = keyT>
  std::enable_if_t<!ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
      const Key &key, const input_values_tuple_type &args) {
    if constexpr(!std::is_same_v<Key, key_type>) {
      key_type k = key; /* cast that type into the key type we know */
      invoke(k, args);
    } else {
      /* trigger non-void inputs */
      set_args(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, key, args);
      /* trigger void inputs */
      using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
      set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
    }
  }
4232 
  // Manual injection of a key-free task and all input arguments specified as a tuple;
  // triggers the non-void inputs from the tuple, then the void inputs.
  template <typename Key = keyT>
  std::enable_if_t<ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
      const input_values_tuple_type &args) {
    /* trigger non-void inputs */
    set_args(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, args);
    /* trigger void inputs */
    using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
    set_args(void_index_seq{}, ttg::detail::make_void_tuple<void_index_seq::size()>());
  }
4244 
  // Manual injection of a task that has no arguments; a key of a convertible
  // type is first cast to key_type, then only the void inputs are triggered.
  template <typename Key = keyT>
  std::enable_if_t<!ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
      const Key &key) {

    if constexpr(!std::is_same_v<Key, key_type>) {
      key_type k = key; /* cast that type into the key type we know */
      invoke(k);
    } else {
      /* trigger void inputs */
      using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
      set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
    }
  }
4260 
  // Manual injection of a task that has no key or arguments; only the void
  // inputs are triggered.
  template <typename Key = keyT>
  std::enable_if_t<ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke() {
    /* trigger void inputs */
    using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
    set_args(void_index_seq{}, ttg::detail::make_void_tuple<void_index_seq::size()>());
  }
4269 
4270  // overrides TTBase::invoke()
4271  void invoke() override {
4272  if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>)
4273  invoke<keyT>();
4274  else
4275  TTBase::invoke();
4276  }
4277 
 private:
  /// Helper for the variadic invoke(): dispatches each argument to the input
  /// terminal whose index is the matching element of the index_sequence.
  /// Arguments wrapped in a TTG ptr flow through their existing data copy
  /// (no value copy); plain values are forwarded to set_arg.
  template<typename Key, typename Arg, typename... Args, std::size_t I, std::size_t... Is>
  void invoke_arglist(std::index_sequence<I, Is...>, const Key& key, Arg&& arg, Args&&... args) {
    using arg_type = std::decay_t<Arg>;
    if constexpr (ttg::meta::is_ptr_v<arg_type>) {
      /* add a reference to the object */
      auto copy = ttg_parsec::detail::get_copy(arg);
      copy->add_ref();
      /* reset readers so that the value can flow without copying */
      copy->reset_readers();
      auto& val = *arg;
      set_arg_impl<I>(key, val, copy);
      if constexpr (std::is_rvalue_reference_v<Arg>) {
        /* if the ptr was moved in we reset it */
        arg.reset();
      }
    } else if constexpr (!ttg::meta::is_ptr_v<arg_type>) {
      set_arg<I>(key, std::forward<Arg>(arg));
    }
    if constexpr (sizeof...(Is) > 0) {
      /* recursive next argument */
      invoke_arglist(std::index_sequence<Is...>{}, key, std::forward<Args>(args)...);
    }
  }
4303 
 public:
  // Manual injection of a task with all input arguments specified as variadic arguments;
  // the argument count must match the number of task inputs exactly.
  template <typename Key = keyT, typename Arg, typename... Args>
  std::enable_if_t<!ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
      const Key &key, Arg&& arg, Args&&... args) {
    static_assert(sizeof...(Args)+1 == std::tuple_size_v<actual_input_tuple_type>,
                  "Number of arguments to invoke must match the number of task inputs.");
    /* trigger non-void inputs */
    invoke_arglist(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, key,
                   std::forward<Arg>(arg), std::forward<Args>(args)...);
    //set_args(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, key, args);
    /* trigger void inputs */
    using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
    set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
  }
4320 
  /// Controls whether a potential writer is deferred while readers of a copy
  /// exist (see TTG_PARSEC_DEFER_WRITER): may avoid copies, may deadlock.
  void set_defer_writer(bool value) {
    m_defer_writer = value;
  }
4324 
4325  bool get_defer_writer(bool value) {
4326  return m_defer_writer;
4327  }
4328 
 public:
  /// Marks this TT executable; registers the profiling hooks for this TT
  /// with the world implementation first.
  void make_executable() override {
    world.impl().register_tt_profiling(this);
  }
4335 
  /// Returns the keymap currently used to assign keys to processes.
  const decltype(keymap) &get_keymap() const { return keymap; }
4339 
4341  template <typename Keymap>
4342  void set_keymap(Keymap &&km) {
4343  keymap = km;
4344  }
4345 
  /// Returns the priority map currently used to assign priorities to keys.
  const decltype(priomap) &get_priomap() const { return priomap; }
4349 
  /// Sets the priority map: maps a task key to its scheduling priority.
  /// \param pm the new priority map (forwarded)
  template <typename Priomap>
  void set_priomap(Priomap &&pm) {
    priomap = std::forward<Priomap>(pm);
  }
4356 
  /// Sets the device map used to pick the device a task for a given key runs
  /// on; only valid for device-enabled TTs. If \p dm already returns a
  /// ttg::device::Device it is stored directly, otherwise its result is
  /// wrapped in a converting lambda.
  template<typename Devicemap>
  void set_devicemap(Devicemap&& dm) {
    static_assert(derived_has_device_op(), "Device map only allowed on device-enabled TT!");
    if constexpr (std::is_same_v<ttg::device::Device, decltype(dm(std::declval<keyT>()))>) {
      // dm returns a Device
      devicemap = std::forward<Devicemap>(dm);
    } else {
      // convert dm return into a Device, dispatching on the execution space
      // the derived TT supports
      devicemap = [=](const keyT& key) {
        if constexpr (derived_has_cuda_op()) {
        } else if constexpr (derived_has_hip_op()) {
        } else if constexpr (derived_has_level_zero_op()) {
        } else {
          throw std::runtime_error("Unknown device type!");
        }
      };
    }
  }
4383 
  /// Returns the device map currently used to place tasks on devices.
  auto get_devicemap() { return devicemap; }
4387 
4390  template<typename Constraint>
4391  void add_constraint(std::shared_ptr<Constraint> c) {
4392  std::size_t cid = constraints_check.size();
4393  if constexpr(ttg::meta::is_void_v<keyT>) {
4394  c->add_listener([this, cid](){ this->release_constraint(cid); }, this);
4395  constraints_check.push_back([c, this](){ return c->check(this); });
4396  constraints_complete.push_back([c, this](const keyT& key){ c->complete(this); return true; });
4397  } else {
4398  c->add_listener([this, cid](const std::span<keyT>& keys){ this->release_constraint(cid, keys); }, this);
4399  constraints_check.push_back([c, this](const keyT& key){ return c->check(key, this); });
4400  constraints_complete.push_back([c, this](const keyT& key){ c->complete(key, this); return true; });
4401  }
4402  }
4403 
4406  template<typename Constraint>
4407  void add_constraint(Constraint&& c) {
4408  // need to make this a shared_ptr since it's shared between different callbacks
4409  this->add_constraint(std::make_shared<Constraint>(std::forward<Constraint>(c)));
4410  }
4411 
  /// Adds a constraint together with a mapper that translates a task key into
  /// the ordinal the constraint operates on.
  /// \param c the constraint, shared between the registered callbacks
  /// \param map callable mapping a key (or nothing, for keyless TTs) to an ordinal
  template<typename Constraint, typename Mapper>
  void add_constraint(std::shared_ptr<Constraint> c, Mapper&& map) {
    static_assert(std::is_same_v<typename Constraint::key_type, keyT>);
    std::size_t cid = constraints_check.size();
    if constexpr(ttg::meta::is_void_v<keyT>) {
      c->add_listener([this, cid](){ this->release_constraint(cid); }, this);
      constraints_check.push_back([map, c, this](){ return c->check(map(), this); });
      constraints_complete.push_back([map, c, this](){ c->complete(map(), this); return true; });
    } else {
      c->add_listener([this, cid](const std::span<keyT>& keys){ this->release_constraint(cid, keys); }, this);
      constraints_check.push_back([map, c, this](const keyT& key){ return c->check(key, map(key), this); });
      constraints_complete.push_back([map, c, this](const keyT& key){ c->complete(key, map(key), this); return true; });
    }
  }
4429 
4433  template<typename Constraint, typename Mapper>
4434  void add_constraint(Constraint c, Mapper&& map) {
4435  // need to make this a shared_ptr since it's shared between different callbacks
4436  this->add_constraint(std::make_shared<Constraint>(std::forward<Constraint>(c)), std::forward<Mapper>(map));
4437  }
4438 
  // Register the static_op function to associate it to instance_id
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    ttg::trace("ttg_parsec(", rank, ") Inserting into static_id_to_op_map at ", get_instance_id());
    static_set_arg_fct_call_t call = std::make_pair(&TT::static_set_arg, this);
    auto &world_impl = world.impl();
    static_map_mutex.lock();
    static_id_to_op_map.insert(std::make_pair(get_instance_id(), call));
    if (delayed_unpack_actions.count(get_instance_id()) > 0) {
      auto tp = world_impl.taskpool();

      ttg::trace("ttg_parsec(", rank, ") There are ", delayed_unpack_actions.count(get_instance_id()),
                 " messages delayed with op_id ", get_instance_id());

      /* collect the delayed messages under the lock, then replay them outside it */
      auto se = delayed_unpack_actions.equal_range(get_instance_id());
      std::vector<static_set_arg_fct_arg_t> tmp;
      for (auto it = se.first; it != se.second;) {
        assert(it->first == get_instance_id());
        tmp.push_back(it->second);
        it = delayed_unpack_actions.erase(it);
      }
      static_map_mutex.unlock();

      for (auto it : tmp) {
        if(ttg::tracing())
          ttg::print("ttg_parsec(", rank, ") Unpacking delayed message (", ", ", get_instance_id(), ", ",
                     std::get<1>(it), ", ", std::get<2>(it), ")");
        int rc = detail::static_unpack_msg(&parsec_ce, world_impl.parsec_ttg_tag(), std::get<1>(it), std::get<2>(it),
                                           std::get<0>(it), NULL);
        assert(rc == 0);
        free(std::get<1>(it));  // message buffer was heap-allocated when delayed
      }

      tmp.clear();
    } else {
      static_map_mutex.unlock();
    }
  }
4478  };
4479 
4480 #include "ttg/make_tt.h"
4481 
4482 } // namespace ttg_parsec
4483 
/// Specialization of the value-copy handler for the PaRSEC backend: tracks
/// the ttg_data_copy_t registered with the calling task while a value is
/// being sent, and releases it on destruction unless ownership moved away.
template <>
 private:
  // copy registered with the calling task; removed/released on destruction
  ttg_parsec::detail::ttg_data_copy_t *copy_to_remove = nullptr;
  // whether the destructor calls release_data_copy on copy_to_remove
  bool do_release = true;

 public:
  value_copy_handler() = default;
  // move construction: steal the tracked copy from the source handler
    : copy_to_remove(h.copy_to_remove)
  {
    h.copy_to_remove = nullptr;
  }

  // move assignment: exchange tracked copies with the source handler
  {
    std::swap(copy_to_remove, h.copy_to_remove);
    return *this;
  }

  // destructor: release the tracked copy if we still own it
    if (nullptr != copy_to_remove) {
      if (do_release) {
        ttg_parsec::detail::release_data_copy(copy_to_remove);
      }
    }
  }
  /// Sinks a value being sent from within a task: locates the backing
  /// ttg_data_copy_t (or creates one) and returns a reference into that copy
  /// so the value can flow without further copying where possible.
  /// \return rvalue reference into the copy when the value was moved in,
  ///         lvalue reference otherwise
  template <typename Value>
  inline std::conditional_t<std::is_reference_v<Value>,Value,Value&&> operator()(Value &&value) {
    constexpr auto value_is_rvref = std::is_rvalue_reference_v<decltype(value)>;
    using value_type = std::remove_reference_t<Value>;
    static_assert(value_is_rvref ||
                  std::is_copy_constructible_v<std::decay_t<Value>>,
                  "Data sent without being moved must be copy-constructible!");

    if (nullptr == caller) {
      throw std::runtime_error("ERROR: ttg::send or ttg::broadcast called outside of a task!");
    }

    copy = ttg_parsec::detail::find_copy_in_task(caller, &value);
    value_type *value_ptr = &value;
    if (nullptr == copy) {
      copy = ttg_parsec::detail::create_new_datacopy(std::forward<Value>(value));
      bool inserted = ttg_parsec::detail::add_copy_to_task(copy, caller);
      assert(inserted);
      value_ptr = reinterpret_cast<value_type *>(copy->get_ptr());
      copy_to_remove = copy;  // newly created copy: release it when the handler dies
    } else {
      if constexpr (value_is_rvref) {
        /* this copy won't be modified anymore so mark it as read-only */
        copy->reset_readers();
      }
    }
    /* We're coming from a writer so mark the data as modified.
     * That way we can force a pushout in prepare_send if we move to read-only tasks (needed by PaRSEC). */
    if constexpr (value_is_rvref)
      return std::move(*value_ptr);
    else
      return *value_ptr;
  }
4560 
  /// Persistent-value path: the value already derives from a data copy, so no
  /// new copy is allocated; extra references keep TTG from freeing it.
  template<typename Value>
  inline std::add_lvalue_reference_t<Value> operator()(ttg_parsec::detail::persistent_value_ref<Value> vref) {
    if (nullptr == caller) {
      throw std::runtime_error("ERROR: ttg::send or ttg::broadcast called outside of a task!");
    }
    copy = ttg_parsec::detail::find_copy_in_task(caller, &vref.value_ref);
    if (nullptr == copy) {
      // no need to create a new copy since it's derived from the copy already
      copy = const_cast<ttg_parsec::detail::ttg_data_copy_t *>(static_cast<const ttg_parsec::detail::ttg_data_copy_t *>(&vref.value_ref));
      bool inserted = ttg_parsec::detail::add_copy_to_task(copy, caller);
      assert(inserted);
      copy_to_remove = copy; // we want to remove the copy from the task once done sending
      do_release = true; // NOTE(review): flag is set although this copy was not allocated here, so the destructor WILL call release_data_copy — confirm intended
      copy->add_ref(); // add a reference so that TTG does not attempt to delete this object
      copy->add_ref(); // add another reference so that TTG never attempts to free this copy
      if (copy->num_readers() == 0) {
        /* add at least one reader (the current task) */
        copy->increment_readers<false>();
      }
    }
    return vref.value_ref;
  }
4585 
  /// Const-lvalue path: the value cannot be moved; finds the task's copy and
  /// returns a reference to the (possibly relocated) stored value.
  template <typename Value>
  inline const Value &operator()(const Value &value) {
    if (nullptr == caller) {
      throw std::runtime_error("ERROR: ttg::send or ttg::broadcast called outside of a task!");
    }
    copy = ttg_parsec::detail::find_copy_in_task(caller, &value);
    const Value *value_ptr = &value;
    if (nullptr == copy) {
      bool inserted = ttg_parsec::detail::add_copy_to_task(copy, caller);
      assert(inserted);
      value_ptr = reinterpret_cast<Value *>(copy->get_ptr());
      copy_to_remove = copy;  // release the tracked copy when the handler dies
    }
    return *value_ptr;
  }

};
4611 
4612 #endif // PARSEC_TTG_H_INCLUDED
4613 // clang-format on
#define TTG_OP_ASSERT_EXECUTABLE()
Definition: tt.h:276
Edge is used to connect In and Out terminals.
Definition: edge.h:25
A base class for all template tasks.
Definition: tt.h:30
void trace(const T &t, const Ts &...ts)
Like ttg::trace(), but only produces tracing output if this->tracing()==true
Definition: tt.h:186
auto get_instance_id() const
Definition: tt.h:258
virtual void make_executable()=0
Marks this executable.
Definition: tt.h:286
bool tracing() const
Definition: tt.h:177
const std::string & get_name() const
Gets the name of this operation.
Definition: tt.h:217
TTBase(TTBase &&other)
Definition: tt.h:115
void register_input_terminals(terminalsT &terms, const namesT &names)
Definition: tt.h:84
bool is_lazy_pull()
Definition: tt.h:199
const TTBase * ttg_ptr() const
Definition: tt.h:205
void register_output_terminals(terminalsT &terms, const namesT &names)
Definition: tt.h:91
A complete version of void.
Definition: void.h:11
WorldImplT & impl(void)
Definition: world.h:216
int rank() const
Definition: world.h:204
Base class for implementation-specific Worlds.
Definition: world.h:33
void release_ops(void)
Definition: world.h:54
WorldImplBase(int size, int rank)
Definition: world.h:61
bool is_valid(void) const
Definition: world.h:154
Represents a device in a specific execution space.
Definition: device.h:14
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > release_constraint(std::size_t cid, const std::span< Key > &keys)
Definition: ttg.h:2617
uint64_t pack(T &obj, void *bytes, uint64_t pos, detail::ttg_data_copy_t *copy=nullptr)
Definition: ttg.h:1927
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Key &key, const Value &value)
Definition: ttg.h:2301
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > prepare_send(const ttg::span< const Key > &keylist, const Value &value)
Definition: ttg.h:3552
void finalize_argstream_from_msg(void *data, std::size_t size)
Definition: ttg.h:2233
ttg::meta::add_glvalue_reference_tuple_t< ttg::meta::void_to_Void_tuple_t< actual_input_tuple_type > > input_refs_full_tuple_type
Definition: ttg.h:1259
std::tuple_element_t< i, input_terminals_type > * in()
Definition: ttg.h:4205
static constexpr bool derived_has_hip_op()
Definition: ttg.h:1228
void set_keymap(Keymap &&km)
keymap setter
Definition: ttg.h:4342
decltype(keymap) const & get_keymap() const
Definition: ttg.h:4338
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key, Arg &&arg, Args &&... args)
Definition: ttg.h:4307
void print_incomplete_tasks()
Definition: ttg.h:4075
void add_constraint(std::shared_ptr< Constraint > c)
Definition: ttg.h:4391
void set_arg_from_msg(void *data, std::size_t size)
Definition: ttg.h:2056
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_arg(const Key &key)
Definition: ttg.h:2724
void add_constraint(Constraint &&c)
Definition: ttg.h:4407
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > finalize_argstream(const Key &key)
finalizes stream for input i
Definition: ttg.h:3319
virtual ~TT()
Definition: ttg.h:4050
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg(const Key &key, Value &&value)
Definition: ttg.h:2706
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const std::tuple< Ts... > &args)
Definition: ttg.h:3172
keyT key_type
Definition: ttg.h:1251
static constexpr bool derived_has_level_zero_op()
Definition: ttg.h:1237
parsec_thread_mempool_t * get_task_mempool(void)
Definition: ttg.h:1987
void add_constraint(std::shared_ptr< Constraint > c, Mapper &&map)
Definition: ttg.h:4416
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap=priomapT())
Definition: ttg.h:4030
std::enable_if_t<!ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key)
Definition: ttg.h:4247
std::enable_if_t<!std::is_void_v< std::decay_t< Value > >, void > do_prepare_send(const Value &value, RemoteCheckFn &&remote_check)
Definition: ttg.h:3455
typename ttg::terminals_to_edges< output_terminalsT >::type output_edges_type
Definition: ttg.h:1268
std::enable_if_t< ttg::meta::is_void_v< Key >, void > release_constraint(std::size_t cid)
Definition: ttg.h:2593
void add_constraint(Constraint c, Mapper &&map)
Definition: ttg.h:4434
void set_arg_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in=nullptr)
Definition: ttg.h:2759
auto get_devicemap()
Definition: ttg.h:4386
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > broadcast_arg(const ttg::span< const Key > &keylist, const Value &value)
Definition: ttg.h:2962
void invoke() override
Definition: ttg.h:4271
bool check_constraints(task_t *task)
Definition: ttg.h:2576
task_t * create_new_task(const Key &key)
Definition: ttg.h:2319
void do_release()
Definition: ttg.h:4081
uint64_t unpack(T &obj, void *_bytes, uint64_t pos)
Definition: ttg.h:1914
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
Definition: ttg.h:4043
bool get_defer_writer(bool value)
Definition: ttg.h:4325
void set_arg_from_msg_keylist(ttg::span< keyT > &&keylist, detail::ttg_data_copy_t *copy)
Definition: ttg.h:1995
static void ht_iter_cb(void *item, void *cb_data)
Definition: ttg.h:4065
const auto & get_output_terminals() const
Definition: ttg.h:1292
ttg::meta::drop_void_t< ttg::meta::add_glvalue_reference_tuple_t< input_tuple_type > > input_refs_tuple_type
Definition: ttg.h:1261
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Key &key, Value &&value)
Definition: ttg.h:2289
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key, const input_values_tuple_type &args)
Definition: ttg.h:4218
std::enable_if_t< ttg::meta::is_none_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const Key &key, const std::tuple< Ts... > &args)
Definition: ttg.h:3152
ttg::detail::input_terminals_tuple_t< keyT, input_tuple_type > input_terminals_type
Definition: ttg.h:1252
static void static_set_arg(void *data, std::size_t size, ttg::TTBase *bop)
Definition: ttg.h:1942
void set_input_reducer(Reducer &&reducer, std::size_t size)
Definition: ttg.h:4197
output_terminalsT output_terminals_type
Definition: ttg.h:1267
detail::reducer_task_t * create_new_reducer_task(task_t *task, bool is_first)
Definition: ttg.h:2346
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Value &value)
Definition: ttg.h:2307
std::enable_if_t< ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const input_values_tuple_type &args)
Definition: ttg.h:4235
void set_input_reducer(Reducer &&reducer)
Definition: ttg.h:4116
decltype(priomap) const & get_priomap() const
Definition: ttg.h:4348
ttg::detail::edges_tuple_t< keyT, ttg::meta::decayed_typelist_t< input_tuple_type > > input_edges_type
Definition: ttg.h:1254
std::enable_if_t< ttg::meta::is_none_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const Key &key, const std::tuple< Ts... > &args)
Definition: ttg.h:3140
bool can_inline_data(Value *value_ptr, detail::ttg_data_copy_t *copy, const Key &key, std::size_t num_keys)
Definition: ttg.h:2729
void copy_mark_pushout(detail::ttg_data_copy_t *copy)
Definition: ttg.h:3418
void get_from_pull_msg(void *data, std::size_t size)
Definition: ttg.h:2275
static constexpr int numinvals
Definition: ttg.h:1263
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_arg()
Definition: ttg.h:2718
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap_=priomapT())
Definition: ttg.h:3868
ttg::World get_world() const override final
Definition: ttg.h:1351
void set_defer_writer(bool value)
Definition: ttg.h:4321
void make_executable() override
Marks this executable.
Definition: ttg.h:4330
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(std::shared_ptr< const Value > &valueptr)
Definition: ttg.h:2313
void release_task(task_t *task, parsec_task_t **task_ring=nullptr)
Definition: ttg.h:2653
virtual void release() override
Definition: ttg.h:4079
void set_devicemap(Devicemap &&dm)
Definition: ttg.h:4363
std::enable_if_t< key_is_void, void > finalize_argstream()
finalizes stream for input i
Definition: ttg.h:3371
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_argstream_size(const Key &key, std::size_t size)
Definition: ttg.h:3200
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_argstream_size(std::size_t size)
Definition: ttg.h:3261
void register_static_op_function(void)
Definition: ttg.h:4440
static resultT get(InTuple &&intuple)
Definition: ttg.h:1271
static auto & get(InTuple &&intuple)
Definition: ttg.h:1275
void broadcast_arg_local(Iterator &&begin, Iterator &&end, const Value &value)
Definition: ttg.h:2932
actual_input_tuple_type input_args_type
Definition: ttg.h:1253
void set_priomap(Priomap &&pm)
Definition: ttg.h:4353
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > prepare_send(const Value &value)
Definition: ttg.h:3566
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(Value &&value)
Definition: ttg.h:2295
std::enable_if_t< ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke()
Definition: ttg.h:4263
ttg::meta::drop_void_t< ttg::meta::decayed_typelist_t< input_tuple_type > > input_values_tuple_type
Definition: ttg.h:1260
std::tuple_element_t< i, output_terminalsT > * out()
Definition: ttg.h:4212
void argstream_set_size_from_msg(void *data, std::size_t size)
Definition: ttg.h:2252
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
Definition: ttg.h:4023
ttg::meta::void_to_Void_tuple_t< ttg::meta::decayed_typelist_t< actual_input_tuple_type > > input_values_full_tuple_type
Definition: ttg.h:1257
void set_static_argstream_size(std::size_t size)
Definition: ttg.h:3181
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg(Value &&value)
Definition: ttg.h:2713
void set_arg_local_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in=nullptr, parsec_task_t **task_ring=nullptr)
Definition: ttg.h:2373
static constexpr bool derived_has_device_op()
Definition: ttg.h:1246
static constexpr const ttg::Runtime runtime
Definition: ttg.h:4108
static constexpr bool derived_has_cuda_op()
Definition: ttg.h:1219
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const std::tuple< Ts... > &args)
Definition: ttg.h:3161
void increment_created()
Definition: ttg.h:458
virtual void execute() override
Definition: ttg.h:399
static constexpr int parsec_ttg_rma_tag()
Definition: ttg.h:395
void decrement_inflight_msg()
Definition: ttg.h:461
WorldImpl & operator=(const WorldImpl &other)=delete
void destroy_tpool()
Definition: ttg.h:410
const ttg::Edge & ctl_edge() const
Definition: ttg.h:456
void increment_inflight_msg()
Definition: ttg.h:460
WorldImpl(const WorldImpl &other)=delete
void register_tt_profiling(const TT< keyT, output_terminalsT, derivedT, input_valueTs > *t)
Definition: ttg.h:518
virtual void profile_off() override
Definition: ttg.h:490
void create_tpool()
Definition: ttg.h:337
WorldImpl(int *argc, char **argv[], int ncores, parsec_context_t *c=nullptr)
Definition: ttg.h:275
ttg::Edge & ctl_edge()
Definition: ttg.h:454
MPI_Comm comm() const
Definition: ttg.h:397
bool mpi_support(ttg::ExecutionSpace space)
Definition: ttg.h:504
virtual bool profiling() override
Definition: ttg.h:502
virtual void dag_off() override
Definition: ttg.h:481
virtual void fence_impl(void) override
Definition: ttg.h:557
virtual void dag_on(const std::string &filename) override
Definition: ttg.h:465
static constexpr int parsec_ttg_tag()
Definition: ttg.h:394
virtual void final_task() override
Definition: ttg.h:508
auto * taskpool()
Definition: ttg.h:335
virtual void destroy() override
Definition: ttg.h:423
virtual void profile_on() override
Definition: ttg.h:496
WorldImpl(WorldImpl &&other)=delete
bool dag_profiling() override
Definition: ttg.h:463
auto * execution_stream()
Definition: ttg.h:334
auto * context()
Definition: ttg.h:333
WorldImpl & operator=(WorldImpl &&other)=delete
rma_delayed_activate(std::vector< KeyT > &&key, detail::ttg_data_copy_t *copy, int num_transfers, ActivationCallbackT cb)
Definition: ttg.h:845
constexpr auto data(C &c) -> decltype(c.data())
Definition: span.h:189
typename make_index_sequence_t< I... >::type make_index_sequence
std::integral_constant< bool,(Flags &const_) !=0 > is_const
void set_default_world(WorldT &world)
Definition: world.h:29
void deregister_world(ttg::base::WorldImplBase &world)
bool force_device_comm()
Definition: env.cpp:33
typename input_terminals_tuple< keyT, valuesT... >::type input_terminals_tuple_t
Definition: terminal.h:354
void register_world(ttg::base::WorldImplBase &world)
int num_threads()
Determine the number of compute threads to use by TTG when not given to ttg::initialize
Definition: env.cpp:15
typename edges_tuple< keyT, valuesT >::type edges_tuple_t
Definition: edge.h:191
void set_current(int device, Stream stream)
Definition: device.h:124
void reset_current()
Definition: device.h:119
typename typelist_to_tuple< T >::type typelist_to_tuple_t
Definition: typelist.h:52
ttg_data_copy_t * find_copy_in_task(parsec_ttg_task_base_t *task, const void *ptr)
Definition: ttg.h:634
ttg_parsec::detail::ttg_data_copy_t * get_copy(ttg_parsec::Ptr< T > &p)
Definition: ptr.h:277
bool & initialized_mpi()
Definition: ttg.h:228
parsec_hook_return_t evaluate_level_zero(const parsec_task_t *parsec_task)
Definition: ttg.h:827
bool all_devices_peer_access
Definition: ttg.h:233
ttg::device::Device parsec_device_to_ttg_device(int parsec_id)
Definition: device.h:30
parsec_hook_return_t hook_level_zero(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
Definition: ttg.h:795
int first_device_id
Definition: device.h:12
std::size_t max_inline_size
Definition: ttg.h:194
int find_index_of_copy_in_task(parsec_ttg_task_base_t *task, const void *ptr)
Definition: ttg.h:649
int ttg_device_to_parsec_device(const ttg::device::Device &device)
Definition: device.h:18
const parsec_symbol_t parsec_taskclass_param1
Definition: ttg.h:609
parsec_hook_return_t evaluate_cuda(const parsec_task_t *parsec_task)
Definition: ttg.h:807
bool add_copy_to_task(ttg_data_copy_t *copy, parsec_ttg_task_base_t *task)
Definition: ttg.h:663
constexpr const int PARSEC_TTG_MAX_AM_SIZE
Definition: ttg.h:176
void remove_data_copy(ttg_data_copy_t *copy, parsec_ttg_task_base_t *task)
Definition: ttg.h:677
parsec_hook_return_t hook(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
Definition: ttg.h:764
ttg_data_copy_t * register_data_copy(ttg_data_copy_t *copy_in, parsec_ttg_task_base_t *task, bool readonly)
Definition: ttg.h:942
parsec_hook_return_t evaluate_hip(const parsec_task_t *parsec_task)
Definition: ttg.h:817
const parsec_symbol_t parsec_taskclass_param2
Definition: ttg.h:617
parsec_hook_return_t hook_hip(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
Definition: ttg.h:784
ttg_data_copy_t * create_new_datacopy(Value &&value)
Definition: ttg.h:705
parsec_hook_return_t hook_cuda(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
Definition: ttg.h:773
thread_local parsec_ttg_task_base_t * parsec_ttg_caller
Definition: thread_local.h:12
void transfer_ownership(parsec_ttg_task_t< TT > *me, int device, std::index_sequence< Is... >)
Definition: ttg.h:757
void release_data_copy(ttg_data_copy_t *copy)
Definition: ttg.h:889
void transfer_ownership_impl(ttg_data_copy_t *copy, int device)
Definition: ttg.h:749
const parsec_symbol_t parsec_taskclass_param3
Definition: ttg.h:625
const parsec_symbol_t parsec_taskclass_param0
Definition: ttg.h:601
this contains PaRSEC-based TTG functionality
Definition: fwd.h:18
void ttg_fence(ttg::World world)
Definition: ttg.h:1117
std::tuple< int, void *, size_t > static_set_arg_fct_arg_t
Definition: ttg.h:140
std::map< uint64_t, static_set_arg_fct_call_t > static_id_to_op_map
Definition: ttg.h:138
std::multimap< uint64_t, static_set_arg_fct_arg_t > delayed_unpack_actions
Definition: ttg.h:141
void ttg_finalize()
Definition: ttg.h:1103
void ttg_register_ptr(ttg::World world, const std::shared_ptr< T > &ptr)
Definition: ttg.h:1120
std::mutex static_map_mutex
Definition: ttg.h:139
void ttg_register_callback(ttg::World world, Callback &&callback)
Definition: ttg.h:1134
ttg::Edge & ttg_ctl_edge(ttg::World world)
Definition: ttg.h:1138
void make_executable_hook(ttg::World &)
Definition: ttg.h:1146
void(* static_set_arg_fct_type)(void *, size_t, ttg::TTBase *)
Definition: ttg.h:136
void ttg_initialize(int argc, char **argv, int num_threads=-1, parsec_context_s *=nullptr)
void ttg_register_status(ttg::World world, const std::shared_ptr< std::promise< void >> &status_ptr)
Definition: ttg.h:1129
ttg::World ttg_default_execution_context()
Definition: ttg.h:1113
std::pair< static_set_arg_fct_type, ttg::TTBase * > static_set_arg_fct_call_t
Definition: ttg.h:137
void ttg_execute(ttg::World world)
Definition: ttg.h:1116
void ttg_sum(ttg::World world, double &value)
Definition: ttg.h:1140
top-level TTG namespace contains runtime-neutral functionality
Definition: keymap.h:8
ExecutionSpace
denotes task execution space
Definition: execution.h:17
int size(World world=default_execution_context())
Definition: run.h:89
void abort()
Aborts the TTG program using the default backend's ttg_abort method.
Definition: run.h:62
Runtime
Definition: runtimes.h:15
World default_execution_context()
Accesses the default backend's default execution context.
Definition: run.h:68
TTG_CXX_COROUTINE_NAMESPACE::coroutine_handle< Promise > coroutine_handle
Definition: coroutine.h:24
void print(const T &t, const Ts &... ts)
atomically prints to std::cout a sequence of items (separated by ttg::print_separator) followed by st...
Definition: print.h:130
void print_error(const T &t, const Ts &... ts)
atomically prints to std::cerr a sequence of items (separated by ttg::print_separator) followed by st...
Definition: print.h:138
bool tracing()
returns whether tracing is enabled
Definition: trace.h:28
int rank(World world=default_execution_context())
Definition: run.h:85
ttg::World & get_default_world()
Definition: world.h:80
void trace(const T &t, const Ts &... ts)
Definition: trace.h:43
@ ResumableTask
-> ttg::resumable_task
@ Invalid
not a coroutine, i.e. a standard task function, -> void
@ DeviceTask
-> ttg::device::Task
Provides (de)serialization of C++ data that can be invoked from C via ttg_data_descriptor.
const Value & operator()(const Value &value)
Definition: ttg.h:4587
std::conditional_t< std::is_reference_v< Value >, Value, Value && > operator()(Value &&value)
Definition: ttg.h:4521
value_copy_handler & operator=(value_copy_handler &&h)
Definition: ttg.h:4505
value_copy_handler(const value_copy_handler &h)=delete
std::add_lvalue_reference_t< Value > operator()(ttg_parsec::detail::persistent_value_ref< Value > vref)
Definition: ttg.h:4562
value_copy_handler & operator=(const value_copy_handler &h)=delete
Computes hash values for objects of type T.
Definition: hash.h:81
task that can be resumed after some events occur
Definition: coroutine.h:53
parsec_hash_table_t task_constraint_table
Definition: ttg.h:1177
parsec_hash_table_t tasks_table
Definition: ttg.h:1176
parsec_gpu_task_t * gpu_task
Definition: task.h:14
msg_header_t tt_id
Definition: ttg.h:179
static constexpr std::size_t max_payload_size
Definition: ttg.h:180
msg_t(uint64_t tt_id, uint32_t taskpool_id, msg_header_t::fn_id_t fn_id, int32_t param_id, int sender, int num_keys=1)
Definition: ttg.h:184
unsigned char bytes[max_payload_size]
Definition: ttg.h:181
ttg_parsec_data_flags data_flags
Definition: task.h:136
parsec_hash_table_item_t tt_ht_item
Definition: task.h:94
std::array< stream_info_t, num_streams > streams
Definition: task.h:210
ttg_data_copy_t * copies[num_copies]
Definition: task.h:216
lvalue_reference_type value_ref
Definition: ttvalue.h:88
static void drop_all_ptr()
Definition: ptr.h:117
static void release_task(parsec_ttg_task_base_t *task_base)
Definition: task.h:359
parsec_task_t * get_next_task() const
ttg::span< ttg::iovec > iovec_span()
void transfer_ownership(int access, int device=0)
void set_next_task(parsec_task_t *task)
enum ttg_parsec::msg_header_t::fn_id fn_id_t
uint32_t taskpool_id
Definition: ttg.h:150
std::int8_t num_iovecs
Definition: ttg.h:154
std::size_t key_offset
Definition: ttg.h:152
msg_header_t(fn_id_t fid, uint32_t tid, uint64_t oid, int32_t pid, int sender, int nk)
Definition: ttg.h:162
#define TTG_PROCESS_TT_OP_RETURN(result, id, invoke)
Definition: tt.h:181
int parsec_add_fetch_runtime_task(parsec_taskpool_t *tp, int tasks)
void parsec_taskpool_termination_detected(parsec_taskpool_t *tp)
#define TTG_PARSEC_DEFER_WRITER
Definition: ttg.h:13