ttg.h
Go to the documentation of this file.
1 // clang-format off
2 #ifndef PARSEC_TTG_H_INCLUDED
3 #define PARSEC_TTG_H_INCLUDED
4 
5 /* set up env if this header was included directly */
6 #if !defined(TTG_IMPL_NAME)
7 #define TTG_USE_PARSEC 1
8 #endif // !defined(TTG_IMPL_NAME)
9 
10 /* Whether to defer a potential writer if there are readers.
11  * This may avoid extra copies in exchange for concurrency.
12  * This may cause deadlocks, so use with caution. */
13 #define TTG_PARSEC_DEFER_WRITER false
14 
15 #include "ttg/config.h"
16 
17 #include "ttg/impl_selector.h"
18 
19 /* include ttg header to make symbols available in case this header is included directly */
20 #include "../../ttg.h"
21 
22 #include "ttg/base/keymap.h"
23 #include "ttg/base/tt.h"
24 #include "ttg/base/world.h"
25 #include "ttg/constraint.h"
26 #include "ttg/edge.h"
27 #include "ttg/execution.h"
28 #include "ttg/func.h"
29 #include "ttg/runtimes.h"
30 #include "ttg/terminal.h"
31 #include "ttg/tt.h"
32 #include "ttg/util/env.h"
33 #include "ttg/util/hash.h"
34 #include "ttg/util/meta.h"
35 #include "ttg/util/meta/callable.h"
36 #include "ttg/util/print.h"
37 #include "ttg/util/scope_exit.h"
38 #include "ttg/util/trace.h"
39 #include "ttg/util/typelist.h"
40 
42 
43 #include "ttg/parsec/fwd.h"
44 
45 #include "ttg/parsec/buffer.h"
48 #include "ttg/parsec/devicefunc.h"
49 #include "ttg/parsec/ttvalue.h"
50 #include "ttg/device/task.h"
51 #include "ttg/parsec/parsec_data.h"
52 
53 #include <algorithm>
54 #include <array>
55 #include <cassert>
56 #include <cstring>
57 #include <experimental/type_traits>
58 #include <functional>
59 #include <future>
60 #include <iostream>
61 #include <list>
62 #include <map>
63 #include <memory>
64 #include <mutex>
65 #include <numeric>
66 #include <sstream>
67 #include <string>
68 #include <tuple>
69 #include <vector>
70 
71 // needed for MPIX_CUDA_AWARE_SUPPORT
72 #if defined(TTG_HAVE_MPI)
73 #include <mpi.h>
74 #if defined(TTG_HAVE_MPIEXT)
75 #include <mpi-ext.h>
76 #endif // TTG_HAVE_MPIEXT
77 #endif // TTG_HAVE_MPI
78 
79 
80 #include <parsec.h>
81 #include <parsec/class/parsec_hash_table.h>
82 #include <parsec/data_internal.h>
83 #include <parsec/execution_stream.h>
84 #include <parsec/interfaces/interface.h>
85 #include <parsec/mca/device/device.h>
86 #include <parsec/parsec_comm_engine.h>
87 #include <parsec/parsec_internal.h>
88 #include <parsec/scheduling.h>
89 #include <parsec/remote_dep.h>
90 #include <parsec/utils/mca_param.h>
91 
92 #ifdef PARSEC_HAVE_DEV_CUDA_SUPPORT
93 #include <parsec/mca/device/cuda/device_cuda.h>
94 #endif // PARSEC_HAVE_DEV_CUDA_SUPPORT
95 #ifdef PARSEC_HAVE_DEV_HIP_SUPPORT
96 #include <parsec/mca/device/hip/device_hip.h>
97 #endif // PARSEC_HAVE_DEV_HIP_SUPPORT
98 #ifdef PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT
99 #include <parsec/mca/device/level_zero/device_level_zero.h>
100 #endif //PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT
101 
102 #include <parsec/mca/device/device_gpu.h>
103 #if defined(PARSEC_PROF_TRACE)
104 #include <parsec/profiling.h>
105 #undef PARSEC_TTG_PROFILE_BACKEND
106 #if defined(PARSEC_PROF_GRAPHER)
107 #include <parsec/parsec_prof_grapher.h>
108 #endif
109 #endif
110 #include <cstdlib>
111 #include <cstring>
112 
113 #if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
114 #include <unordered_set>
115 #endif
116 
118 #include "ttg/parsec/thread_local.h"
119 #include "ttg/parsec/ptr.h"
120 #include "ttg/parsec/task.h"
121 #include "ttg/parsec/parsec-ext.h"
122 
123 #include "ttg/device/device.h"
124 
125 #undef TTG_PARSEC_DEBUG_TRACK_DATA_COPIES
126 
127 /* PaRSEC function declarations */
128 extern "C" {
129 void parsec_taskpool_termination_detected(parsec_taskpool_t *tp);
130 int parsec_add_fetch_runtime_task(parsec_taskpool_t *tp, int tasks);
131 }
132 
133 namespace ttg_parsec {
134  typedef void (*static_set_arg_fct_type)(void *, size_t, ttg::TTBase *);
135  typedef std::pair<static_set_arg_fct_type, ttg::TTBase *> static_set_arg_fct_call_t;
136  inline std::map<uint64_t, static_set_arg_fct_call_t> static_id_to_op_map;
137  inline std::mutex static_map_mutex;
138  typedef std::tuple<int, std::unique_ptr<std::byte[]>, size_t> static_set_arg_fct_arg_t;
139  inline std::multimap<uint64_t, static_set_arg_fct_arg_t> delayed_unpack_actions;
140 
141  struct msg_header_t {
142  typedef enum fn_id : std::int8_t {
148  uint32_t taskpool_id = std::numeric_limits<uint32_t>::max();
149  uint64_t op_id = std::numeric_limits<uint64_t>::max();
150  std::size_t key_offset = 0;
152  std::int8_t num_iovecs = 0;
153  bool inline_data = false;
154  int32_t param_id = -1;
155  int num_keys = 0;
156  int sender = -1;
157 
158  msg_header_t() = default;
159 
160  msg_header_t(fn_id_t fid, uint32_t tid, uint64_t oid, int32_t pid, int sender, int nk)
161  : fn_id(fid)
162  , taskpool_id(tid)
163  , op_id(oid)
164  , param_id(pid)
165  , num_keys(nk)
166  , sender(sender)
167  { }
168  };
169 
170  static void unregister_parsec_tags(void *_);
171 
172  namespace detail {
173 
174  constexpr const int PARSEC_TTG_MAX_AM_SIZE = 1 * 1024*1024;
175 
176  struct msg_t {
178  static constexpr std::size_t max_payload_size = PARSEC_TTG_MAX_AM_SIZE - sizeof(msg_header_t);
179  unsigned char bytes[max_payload_size];
180 
181  msg_t() = default;
182  msg_t(uint64_t tt_id,
183  uint32_t taskpool_id,
184  msg_header_t::fn_id_t fn_id,
185  int32_t param_id,
186  int sender,
187  int num_keys = 1)
188  : tt_id(fn_id, taskpool_id, tt_id, param_id, sender, num_keys)
189  {}
190  };
191 
193 
  /// Active-message callback invoked by the PaRSEC comm engine when a TTG
  /// message arrives.  If the destination taskpool is past its "not ready"
  /// state and the target TT is registered in static_id_to_op_map, the message
  /// is dispatched immediately; otherwise the payload is copied and queued in
  /// delayed_unpack_actions for later replay.
  /// @param ce       comm engine (unused here)
  /// @param tag      AM tag (unused here)
  /// @param data     message buffer; begins with a msg_header_t
  /// @param size     message size in bytes
  /// @param src_rank rank of the sender
  /// @param obj      registration context (unused here)
  /// @return 0 if the message was consumed now, 1 if delivery was deferred
  static int static_unpack_msg(parsec_comm_engine_t *ce, uint64_t tag, void *data, long unsigned int size,
                               int src_rank, void *obj) {
    static_set_arg_fct_type static_set_arg_fct;
    parsec_taskpool_t *tp = NULL;
    msg_header_t *msg = static_cast<msg_header_t *>(data);
    uint64_t op_id = msg->op_id;
    tp = parsec_taskpool_lookup(msg->taskpool_id);
    assert(NULL != tp);
    static_map_mutex.lock();
    if (PARSEC_TERM_TP_NOT_READY != tp->tdm.module->taskpool_state(tp)) {
      try {
        auto op_pair = static_id_to_op_map.at(op_id);
        // unlock before dispatching: the set-arg callback may itself need locks
        static_map_mutex.unlock();
        tp->tdm.module->incoming_message_start(tp, src_rank, NULL, NULL, 0, NULL);
        static_set_arg_fct = op_pair.first;
        static_set_arg_fct(data, size, op_pair.second);
        tp->tdm.module->incoming_message_end(tp, NULL);
        return 0;
      } catch (const std::out_of_range &e)
      { /* fall-through */ }
    }
    // target TT not (yet) registered or taskpool not ready: keep a private
    // copy of the payload so it can be replayed when the TT registers
    auto data_cpy = std::make_unique_for_overwrite<std::byte[]>(size);
    memcpy(data_cpy.get(), data, size);
    ttg::trace("ttg_parsec(", ttg_default_execution_context().rank(), ") Delaying delivery of message (", src_rank,
               ", ", op_id, ", ", data_cpy, ", ", size, ")");
    delayed_unpack_actions.insert(std::make_pair(op_id, std::make_tuple(src_rank, std::move(data_cpy), size)));
    static_map_mutex.unlock();
    return 1;
  }
223 
224  static int get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag, void *msg, size_t msg_size,
225  int src, void *cb_data);
226 
227  inline bool &initialized_mpi() {
228  static bool im = false;
229  return im;
230  }
231 
233 
234  } // namespace detail
235 
237  ttg::Edge<> m_ctl_edge;
238  bool _dag_profiling;
239  bool _task_profiling;
240  std::array<bool, static_cast<std::size_t>(ttg::ExecutionSpace::Invalid)>
241  mpi_space_support = {true, false, false};
242 
243  int query_comm_size() {
244  int comm_size;
245  MPI_Comm_size(MPI_COMM_WORLD, &comm_size);
246  return comm_size;
247  }
248 
249  int query_comm_rank() {
250  int comm_rank;
251  MPI_Comm_rank(MPI_COMM_WORLD, &comm_rank);
252  return comm_rank;
253  }
254 
    /// Comm-engine "up" callback: (re)registers the TTG active-message tags
    /// with the (re)started PaRSEC comm engine.
    static void ttg_parsec_ce_up(parsec_comm_engine_t *comm_engine, void *user_data)
    {
      // main AM tag: carries task arguments, up to PARSEC_TTG_MAX_AM_SIZE bytes
      parsec_ce.tag_register(WorldImpl::parsec_ttg_tag(), &detail::static_unpack_msg, user_data, detail::PARSEC_TTG_MAX_AM_SIZE);
      // RMA-completion tag: small control messages only (128 bytes)
      parsec_ce.tag_register(WorldImpl::parsec_ttg_rma_tag(), &detail::get_remote_complete_cb, user_data, 128);
    }
260 
    /// Comm-engine "down" callback: unregisters both TTG active-message tags
    /// before the comm engine shuts down.
    static void ttg_parsec_ce_down(parsec_comm_engine_t *comm_engine, void *user_data)
    {
      parsec_ce.tag_unregister(WorldImpl::parsec_ttg_tag());
      parsec_ce.tag_unregister(WorldImpl::parsec_ttg_rma_tag());
    }
266 
267  public:
268 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
269  int parsec_ttg_profile_backend_set_arg_start, parsec_ttg_profile_backend_set_arg_end;
270  int parsec_ttg_profile_backend_bcast_arg_start, parsec_ttg_profile_backend_bcast_arg_end;
271  int parsec_ttg_profile_backend_allocate_datacopy, parsec_ttg_profile_backend_free_datacopy;
272 #endif
273 
274  WorldImpl(int *argc, char **argv[], int ncores, parsec_context_t *c = nullptr)
275  : WorldImplBase(query_comm_size(), query_comm_rank())
276  , ctx(c)
277  , own_ctx(c == nullptr)
278 #if defined(PARSEC_PROF_TRACE)
279  , profiling_array(nullptr)
280  , profiling_array_size(0)
281 #endif
282  , _dag_profiling(false)
283  , _task_profiling(false)
284  {
286  if (own_ctx) ctx = parsec_init(ncores, argc, argv);
287 
288  /* query MPI device support */
290 #if defined(MPIX_CUDA_AWARE_SUPPORT) && MPIX_CUDA_AWARE_SUPPORT
291  || MPIX_Query_cuda_support()
292 #endif // MPIX_CUDA_AWARE_SUPPORT
293  ) {
294  mpi_space_support[static_cast<std::size_t>(ttg::ExecutionSpace::CUDA)] = true;
295  }
296 
298 #if defined(MPIX_HIP_AWARE_SUPPORT) && MPIX_HIP_AWARE_SUPPORT
299  || MPIX_Query_hip_support()
300 #endif // MPIX_HIP_AWARE_SUPPORT
301  ) {
302  mpi_space_support[static_cast<std::size_t>(ttg::ExecutionSpace::HIP)] = true;
303  }
304 
305 #if defined(PARSEC_PROF_TRACE)
306  if(parsec_profile_enabled) {
307  profile_on();
308 #if defined(PARSEC_TTG_PROFILE_BACKEND)
309  parsec_profiling_add_dictionary_keyword("PARSEC_TTG_SET_ARG_IMPL", "fill:000000", 0, NULL,
310  (int*)&parsec_ttg_profile_backend_set_arg_start,
311  (int*)&parsec_ttg_profile_backend_set_arg_end);
312  parsec_profiling_add_dictionary_keyword("PARSEC_TTG_BCAST_ARG_IMPL", "fill:000000", 0, NULL,
313  (int*)&parsec_ttg_profile_backend_bcast_arg_start,
314  (int*)&parsec_ttg_profile_backend_bcast_arg_end);
315  parsec_profiling_add_dictionary_keyword("PARSEC_TTG_DATACOPY", "fill:000000",
316  sizeof(size_t), "size{int64_t}",
317  (int*)&parsec_ttg_profile_backend_allocate_datacopy,
318  (int*)&parsec_ttg_profile_backend_free_datacopy);
319 #endif
320  }
321 #endif
322 
323 #ifdef PARSEC_PROF_GRAPHER
324  /* check if PaRSEC's dot tracing is enabled */
325  int dot_param_idx;
326  dot_param_idx = parsec_mca_param_find("profile", NULL, "dot");
327 
328  if (dot_param_idx != PARSEC_ERROR) {
329  char *filename;
330  parsec_mca_param_lookup_string(dot_param_idx, &filename);
331  dag_on(filename);
332  }
333 #endif // PARSEC_PROF_GRAPHER
334 
335  if( NULL != parsec_ce.tag_register) {
336  parsec_ce.tag_register(WorldImpl::parsec_ttg_tag(), &detail::static_unpack_msg, this, detail::PARSEC_TTG_MAX_AM_SIZE);
337  parsec_ce.tag_register(WorldImpl::parsec_ttg_rma_tag(), &detail::get_remote_complete_cb, this, 128);
338  }
339 
340  create_tpool();
341  }
342 
343 
    /// @return the PaRSEC context used by this world (owned iff own_ctx)
    auto *context() { return ctx; }
    /// @return the calling thread's PaRSEC execution stream
    auto *execution_stream() { return parsec_my_execution_stream(); }
    /// @return the current PaRSEC taskpool (torn down and recreated on fence)
    auto *taskpool() { return tpool; }
347 
    /// Create and enable a fresh taskpool for the next epoch.
    /// Sets up termination detection, enables all available devices, attaches
    /// the (persistent) profiling array, and synchronizes all ranks before
    /// returning.  Must only be called when no taskpool currently exists.
    void create_tpool() {
      assert(nullptr == tpool);
      tpool = PARSEC_OBJ_NEW(parsec_taskpool_t);
      tpool->taskpool_id = std::numeric_limits<uint32_t>::max();
      tpool->update_nb_runtime_task = parsec_add_fetch_runtime_task;
      tpool->taskpool_type = PARSEC_TASKPOOL_TYPE_TTG;
      tpool->taskpool_name = strdup("TTG Taskpool");
      parsec_taskpool_reserve_id(tpool);

      // enable every device PaRSEC knows about for this taskpool
      tpool->devices_index_mask = 0;
      for(int i = 0; i < (int)parsec_nb_devices; i++) {
        parsec_device_module_t *device = parsec_mca_device_get(i);
        if( NULL == device ) continue;
        tpool->devices_index_mask |= (1 << device->device_index);
      }

#ifdef TTG_USE_USER_TERMDET
      parsec_termdet_open_module(tpool, "user_trigger");
#else // TTG_USE_USER_TERMDET
      parsec_termdet_open_dyn_module(tpool);
#endif // TTG_USE_USER_TERMDET
      tpool->tdm.module->monitor_taskpool(tpool, parsec_taskpool_termination_detected);
      // In TTG, we use the pending actions to denote that the
      // taskpool is not ready, i.e. some local tasks could still
      // be added by the main thread. It should then be initialized
      // to 0, execute will set it to 1 and mark the tpool as ready,
      // and the fence() will decrease it back to 0.
      tpool->tdm.module->taskpool_set_runtime_actions(tpool, 0);
      parsec_taskpool_enable(tpool, NULL, NULL, execution_stream(), size() > 1);

#if defined(PARSEC_PROF_TRACE)
      // reattach the persistent profiling array (survives across fences)
      tpool->profiling_array = profiling_array;
#endif

      // Termination detection in PaRSEC requires to synchronize the
      // taskpool enabling, to avoid a race condition that would keep
      // termination detection-related messages in a waiting queue
      // forever
      MPI_Barrier(comm());

      parsec_taskpool_started = false;
    }
390 
    /* Deleted copy ctor */
    WorldImpl(const WorldImpl &other) = delete;

    /* Deleted move ctor */
    WorldImpl(WorldImpl &&other) = delete;

    /* Deleted copy assignment */
    WorldImpl &operator=(const WorldImpl &other) = delete;

    /* Deleted move assignment */
    WorldImpl &operator=(WorldImpl &&other) = delete;

    /// AM tag used for TTG argument/control messages
    static constexpr int parsec_ttg_tag() { return PARSEC_DSL_TTG_TAG; }
    /// AM tag used for TTG RMA-completion messages
    static constexpr int parsec_ttg_rma_tag() { return PARSEC_DSL_TTG_RMA_TAG; }

    /// @return the communicator spanning this world (always MPI_COMM_WORLD here)
    MPI_Comm comm() const { return MPI_COMM_WORLD; }
409 
    /// Start executing the current taskpool: enqueue it into the context,
    /// register one pending runtime action (cleared again by fence()/destroy()),
    /// mark the taskpool ready, and start the PaRSEC context.
    /// Idempotent once the taskpool has been started.
    virtual void execute() override {
      if (!parsec_taskpool_started) {
        parsec_enqueue(ctx, tpool);
        tpool->tdm.module->taskpool_addto_runtime_actions(tpool, 1);
        tpool->tdm.module->taskpool_ready(tpool);
        [[maybe_unused]] auto ret = parsec_context_start(ctx);
        // ignore ret since all of its nonzero values are OK (e.g. -1 due to ctx already being active)
        parsec_taskpool_started = true;
      }
    }
420 
    /// Tear down the current taskpool: detach the profiling array (it must
    /// survive the taskpool), stop the termination-detection monitor, and
    /// free the taskpool.  Leaves tpool == nullptr.
    void destroy_tpool() {
#if defined(PARSEC_PROF_TRACE)
      // We don't want to release the profiling array, as it should be persistent
      // between fences() to allow defining a TT/TTG before a fence() and schedule
      // it / complete it after a fence()
      tpool->profiling_array = nullptr;
#endif
      assert(NULL != tpool->tdm.monitor);
      tpool->tdm.module->unmonitor_taskpool(tpool);
      parsec_taskpool_free(tpool);
      tpool = nullptr;
    }
433 
    /// Shut the world down: drain outstanding work, free the taskpool,
    /// unregister the comm-engine tags, release profiling storage, and
    /// finalize the PaRSEC context if this world owns it.  Safe to call
    /// only once; afterwards the world is marked invalid.
    virtual void destroy() override {
      if (is_valid()) {
        if (parsec_taskpool_started) {
          // We are locally ready (i.e. we won't add new tasks)
          tpool->tdm.module->taskpool_addto_runtime_actions(tpool, -1);
          ttg::trace("ttg_parsec(", this->rank(), "): final waiting for completion");
          // wait on the whole context only if we own it, otherwise on our taskpool
          if (own_ctx)
            parsec_context_wait(ctx);
          else
            parsec_taskpool_wait(tpool);
        }
        release_ops();
        destroy_tpool();
        if (own_ctx) {
          unregister_parsec_tags(nullptr);
        } else {
          // the context outlives us: defer tag unregistration to its finalization
          parsec_context_at_fini(unregister_parsec_tags, nullptr);
        }
#if defined(PARSEC_PROF_TRACE)
        if(nullptr != profiling_array) {
          free(profiling_array);
          profiling_array = nullptr;
          profiling_array_size = 0;
        }
#endif
        if (own_ctx) parsec_fini(&ctx);
        mark_invalid();
      }
    }
464 
    /// @return the world-wide control edge
    ttg::Edge<> &ctl_edge() { return m_ctl_edge; }

    /// @return the world-wide control edge (const)
    const ttg::Edge<> &ctl_edge() const { return m_ctl_edge; }

    /// Account for a newly created local task with the termination detector.
    void increment_created() { taskpool()->tdm.module->taskpool_addto_nb_tasks(taskpool(), 1); }

    /// Account for an in-flight message (pending runtime action) with the
    /// termination detector.
    void increment_inflight_msg() { taskpool()->tdm.module->taskpool_addto_runtime_actions(taskpool(), 1); }
    void decrement_inflight_msg() { taskpool()->tdm.module->taskpool_addto_runtime_actions(taskpool(), -1); }

    /// @return whether DAG (dot) tracing is currently enabled
    bool dag_profiling() override { return _dag_profiling; }
475 
476  virtual void dag_on(const std::string &filename) override {
477 #if defined(PARSEC_PROF_GRAPHER)
478  if(!_dag_profiling) {
479  profile_on();
480  size_t len = strlen(filename.c_str())+32;
481  char ext_filename[len];
482  snprintf(ext_filename, len, "%s-%d.dot", filename.c_str(), rank());
483  parsec_prof_grapher_init(ctx, ext_filename);
484  _dag_profiling = true;
485  }
486 #else
487  ttg::print("Error: requested to create '", filename, "' to create a DAG of tasks,\n"
488  "but PaRSEC does not support graphing options. Reconfigure with PARSEC_PROF_GRAPHER=ON\n");
489 #endif
490  }
491 
    /// Disable DAG tracing and finalize the dot output.
    /// No-op unless built with PARSEC_PROF_GRAPHER and tracing is active.
    virtual void dag_off() override {
#if defined(PARSEC_PROF_GRAPHER)
      if(_dag_profiling) {
        parsec_prof_grapher_fini();
        _dag_profiling = false;
      }
#endif
    }
500 
    /// Disable task profiling (no-op unless built with PARSEC_PROF_TRACE).
    virtual void profile_off() override {
#if defined(PARSEC_PROF_TRACE)
      _task_profiling = false;
#endif
    }

    /// Enable task profiling (no-op unless built with PARSEC_PROF_TRACE).
    virtual void profile_on() override {
#if defined(PARSEC_PROF_TRACE)
      _task_profiling = true;
#endif
    }

    /// @return whether task profiling is currently enabled
    virtual bool profiling() override { return _task_profiling; }
514 
516  return mpi_space_support[static_cast<std::size_t>(space)];
517  }
518 
    /// Under user-driven termination detection (TTG_USE_USER_TERMDET):
    /// if the taskpool is running, set its task count to zero so the
    /// termination detector can fire, and mark the taskpool as no longer
    /// started.  No-op otherwise.
    virtual void final_task() override {
#ifdef TTG_USE_USER_TERMDET
      if(parsec_taskpool_started) {
        taskpool()->tdm.module->taskpool_set_nb_tasks(taskpool(), 0);
        parsec_taskpool_started = false;
      }
#endif // TTG_USE_USER_TERMDET
    }
527 
528  template <typename keyT, typename output_terminalsT, typename derivedT,
529  typename input_valueTs = ttg::typelist<>, ttg::ExecutionSpace Space>
531 #if defined(PARSEC_PROF_TRACE)
532  std::stringstream ss;
533  build_composite_name_rec(t->ttg_ptr(), ss);
534  ss << t->get_name();
535  register_new_profiling_event(ss.str().c_str(), t->get_instance_id());
536 #endif
537  }
538 
539  protected:
540 #if defined(PARSEC_PROF_TRACE)
541  void build_composite_name_rec(const ttg::TTBase *t, std::stringstream &ss) {
542  if(nullptr == t)
543  return;
544  build_composite_name_rec(t->ttg_ptr(), ss);
545  ss << t->get_name() << "::";
546  }
547 
    /// Ensure the profiling dictionary array has room for event @p position
    /// and register a begin/end keyword pair named @p name there.
    /// Each event occupies two consecutive int slots (start key, end key).
    void register_new_profiling_event(const char *name, int position) {
      if(2*position >= profiling_array_size) {
        // grow in 64-int chunks and zero the new tail so the
        // "slot not yet registered" asserts below stay valid
        size_t new_profiling_array_size = 64 * ((2*position + 63)/64 + 1);
        profiling_array = (int*)realloc((void*)profiling_array,
                                        new_profiling_array_size * sizeof(int));
        memset((void*)&profiling_array[profiling_array_size], 0, sizeof(int)*(new_profiling_array_size - profiling_array_size));
        profiling_array_size = new_profiling_array_size;
        // the taskpool shares the same array; refresh its pointer after realloc
        tpool->profiling_array = profiling_array;
      }

      assert(0 == tpool->profiling_array[2*position]);
      assert(0 == tpool->profiling_array[2*position+1]);
      // TODO PROFILING: 0 and NULL should be replaced with something that depends on the key human-readable serialization...
      // Typically, we would put something like 3*sizeof(int32_t), "m{int32_t};n{int32_t};k{int32_t}" to say
      // there are three fields, named m, n and k, stored in this order, and each of size int32_t
      parsec_profiling_add_dictionary_keyword(name, "fill:000000", 64, "key{char[64]}",
                                              (int*)&tpool->profiling_array[2*position],
                                              (int*)&tpool->profiling_array[2*position+1]);
    }
567 #endif
568 
    /// Collective fence: wait until all tasks of the current epoch have
    /// completed on all ranks, then tear down and recreate the taskpool to
    /// begin a new epoch.
    virtual void fence_impl(void) override {
      int rank = this->rank();
      if (!parsec_taskpool_started) {
        ttg::trace("ttg_parsec::(", rank, "): parsec taskpool has not been started, fence is a simple MPI_Barrier");
        // nothing was submitted yet: a barrier is sufficient
        MPI_Barrier(comm());
        return;
      }
      ttg::trace("ttg_parsec::(", rank, "): parsec taskpool is ready for completion");
      // We are locally ready (i.e. we won't add new tasks)
      tpool->tdm.module->taskpool_addto_runtime_actions(tpool, -1);
      ttg::trace("ttg_parsec(", rank, "): waiting for completion");
      parsec_taskpool_wait(tpool);

      // We need the synchronization between the end of the context and the restart of the taskpool
      // until we use parsec_taskpool_wait and implement an epoch in the PaRSEC taskpool
      // see Issue #118 (TTG)
      MPI_Barrier(comm());

      // recycle: free the finished taskpool and start a fresh epoch
      destroy_tpool();
      create_tpool();
      execute();
    }
591 
592  private:
593  parsec_context_t *ctx = nullptr;
594  bool own_ctx = false; //< whether I own the context
595  parsec_taskpool_t *tpool = nullptr;
596  bool parsec_taskpool_started = false;
597 #if defined(PARSEC_PROF_TRACE)
598  int *profiling_array;
599  std::size_t profiling_array_size;
600 #endif
601  };
602 
603  static void unregister_parsec_tags(void *_pidx)
604  {
605  if(NULL != parsec_ce.tag_unregister) {
606  parsec_ce.tag_unregister(WorldImpl::parsec_ttg_tag());
607  parsec_ce.tag_unregister(WorldImpl::parsec_ttg_rma_tag());
608  }
609  }
610 
611  namespace detail {
612 
  /* Symbol descriptors for the four assignment slots of TTG task classes.
   * All four are standalone global symbols with no min/max/increment
   * expressions.  NOTE(review): how the HASH0/HASH1 and KEY0/KEY1 slots are
   * populated is established at the task-construction sites, which are not
   * visible in this chunk — confirm there before relying on their meaning. */
  const parsec_symbol_t parsec_taskclass_param0 = {
    .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
    .name = "HASH0",
    .context_index = 0,
    .min = nullptr,
    .max = nullptr,
    .expr_inc = nullptr,
    .cst_inc = 0 };
  const parsec_symbol_t parsec_taskclass_param1 = {
    .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
    .name = "HASH1",
    .context_index = 1,
    .min = nullptr,
    .max = nullptr,
    .expr_inc = nullptr,
    .cst_inc = 0 };
  const parsec_symbol_t parsec_taskclass_param2 = {
    .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
    .name = "KEY0",
    .context_index = 2,
    .min = nullptr,
    .max = nullptr,
    .expr_inc = nullptr,
    .cst_inc = 0 };
  const parsec_symbol_t parsec_taskclass_param3 = {
    .flags = PARSEC_SYMBOL_IS_STANDALONE|PARSEC_SYMBOL_IS_GLOBAL,
    .name = "KEY1",
    .context_index = 3,
    .min = nullptr,
    .max = nullptr,
    .expr_inc = nullptr,
    .cst_inc = 0 };
645 
    /* NOTE(review): the function signature for this body was lost in the
     * extraction; from the code it scans task->copies[0..data_count) for the
     * copy whose payload pointer equals `ptr` and returns it, or nullptr if
     * the task/ptr is null or no match exists. */
    ttg_data_copy_t *res = nullptr;
    if (task == nullptr || ptr == nullptr) {
      return res;
    }
    // linear scan; the copy array is small (bounded by MAX_PARAM_COUNT)
    for (int i = 0; i < task->data_count; ++i) {
      auto copy = static_cast<ttg_data_copy_t *>(task->copies[i]);
      if (NULL != copy && copy->get_ptr() == ptr) {
        res = copy;
        break;
      }
    }
    return res;
660 
661  inline int find_index_of_copy_in_task(parsec_ttg_task_base_t *task, const void *ptr) {
662  int i = -1;
663  if (task == nullptr || ptr == nullptr) {
664  return i;
665  }
666  for (i = 0; i < task->data_count; ++i) {
667  auto copy = static_cast<ttg_data_copy_t *>(task->copies[i]);
668  if (NULL != copy && copy->get_ptr() == ptr) {
669  return i;
670  }
671  }
672  return -1;
673  }
674 
    /* NOTE(review): the function signature for this body was lost in the
     * extraction; from the code it appends `copy` to `task`'s copy array and
     * returns whether the append happened. */
    if (task == nullptr || copy == nullptr) {
      return false;
    }

    // the per-task copy array has fixed capacity
    if (MAX_PARAM_COUNT < task->data_count) {
      throw std::logic_error("Too many data copies, check MAX_PARAM_COUNT!");
    }

    // append and grow the per-task copy count
    task->copies[task->data_count] = copy;
    task->data_count++;
    return true;
688 
    /* NOTE(review): the function signature for this body was lost in the
     * extraction; from the code it removes `copy` from `task`'s copy array
     * (if present), compacting the remaining entries. */
    int i;
    /* find and remove entry; copies are usually appended and removed, so start from back */
    for (i = task->data_count-1; i >= 0; --i) {
      if (copy == task->copies[i]) {
        break;
      }
    }
    if (i < 0) return;  // not found: nothing to do
    /* move all following elements one up */
    for (; i < task->data_count - 1; ++i) {
      task->copies[i] = task->copies[i + 1];
    }
    /* null last element */
    task->copies[i] = nullptr;
    task->data_count--;
706 
707 #if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
708 #warning "ttg::PaRSEC enables data copy tracking"
709  static std::unordered_set<ttg_data_copy_t *> pending_copies;
710  static std::mutex pending_copies_mutex;
711 #endif
712 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
713  static int64_t parsec_ttg_data_copy_uid = 0;
714 #endif
715 
  /// Create a new data copy holding @p value.
  /// If the decayed value type derives from ttg::TTValue (and is
  /// constructible from the argument) the value is its own copy object and is
  /// constructed directly; otherwise it is wrapped in a
  /// ttg_data_value_copy_t.  Throws std::logic_error when neither
  /// construction is possible.
  template <typename Value>
  inline ttg_data_copy_t *create_new_datacopy(Value &&value) {
    using value_type = std::decay_t<Value>;
    ttg_data_copy_t *copy = nullptr;
    if constexpr (std::is_base_of_v<ttg::TTValue<value_type>, value_type> &&
                  std::is_constructible_v<value_type, decltype(value)>) {
      copy = new value_type(std::forward<Value>(value));
    } else if constexpr (std::is_constructible_v<ttg_data_value_copy_t<value_type>, decltype(value)>) {
      copy = new ttg_data_value_copy_t<value_type>(std::forward<Value>(value));
    } else {
      /* we have no way to create a new copy from this value */
      throw std::logic_error("Trying to copy-construct data that is not copy-constructible!");
    }
#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
    // Keep track of additional memory usage
    if(ttg::default_execution_context().impl().profiling()) {
      copy->size = sizeof(Value);
      copy->uid = parsec_atomic_fetch_inc_int64(&parsec_ttg_data_copy_uid);
      parsec_profiling_ts_trace_flags(ttg::default_execution_context().impl().parsec_ttg_profile_backend_allocate_datacopy,
                                      static_cast<uint64_t>(copy->uid),
                                      PROFILE_OBJECT_ID_NULL, &copy->size,
                                      PARSEC_PROFILING_EVENT_COUNTER|PARSEC_PROFILING_EVENT_HAS_INFO);
    }
#endif
#if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
    {
      // record the new copy; release_data_copy asserts it is erased exactly once
      const std::lock_guard<std::mutex> lock(pending_copies_mutex);
      auto rc = pending_copies.insert(copy);
      assert(std::get<1>(rc));
    }
#endif
    return copy;
  }
749 
750 #if 0
751  template <std::size_t... IS, typename Key = keyT>
752  void invoke_pull_terminals(std::index_sequence<IS...>, const Key &key, detail::parsec_ttg_task_base_t *task) {
753  int junk[] = {0, (invoke_pull_terminal<IS>(
754  std::get<IS>(input_terminals), key, task),
755  0)...};
756  junk[0]++;
757  }
758 #endif // 0
759 
  /// For a non-const argument, mark every PaRSEC data object backing @p arg
  /// as owned by @p device with read-write access, and bump the version of
  /// the primary copy (device_copies[0]) since the task will modify the data.
  /// Const arguments are read-only and left untouched.
  template<typename T>
  inline void transfer_ownership_impl(T&& arg, int device) {
    if constexpr(!std::is_const_v<std::remove_reference_t<T>>) {
      detail::foreach_parsec_data(arg, [&](parsec_data_t *data){
        parsec_data_transfer_ownership_to_copy(data, device, PARSEC_FLOW_ACCESS_RW);
        /* make sure we increment the version since we will modify the data */
        data->device_copies[0]->version++;
      });
    }
  }
770 
771  template<typename TT, std::size_t... Is>
772  inline void transfer_ownership(parsec_ttg_task_t<TT> *me, int device, std::index_sequence<Is...>) {
773  /* transfer ownership of each data */
774  int junk[] = {0,
776  *reinterpret_cast<std::remove_reference_t<std::tuple_element_t<Is, typename TT::input_refs_tuple_type>> *>(
777  me->copies[Is]->get_ptr()), device), 0)...};
778  junk[0]++;
779  }
780 
  /// Host execution hook for a TT task: transfers ownership of all
  /// non-const inputs to device 0 (the host) before running the TT's
  /// host-side op.
  template<typename TT>
  inline parsec_hook_return_t hook(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
    parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
    if constexpr(std::tuple_size_v<typename TT::input_values_tuple_type> > 0) {
      transfer_ownership<TT>(me, 0, std::make_index_sequence<std::tuple_size_v<typename TT::input_values_tuple_type>>{});
    }
    return me->template invoke_op<ttg::ExecutionSpace::Host>();
  }
789 
  /// CUDA execution hook: invokes the TT's CUDA op.
  /// Reaching this hook on a TT without a CUDA op is a bug; report an error.
  template<typename TT>
  inline parsec_hook_return_t hook_cuda(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
    if constexpr(TT::derived_has_cuda_op()) {
      parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
      return me->template invoke_op<ttg::ExecutionSpace::CUDA>();
    } else {
      std::cerr << "CUDA hook called without having a CUDA op!" << std::endl;
      return PARSEC_HOOK_RETURN_ERROR;
    }
  }
800 
  /// HIP execution hook: invokes the TT's HIP op.
  /// Reaching this hook on a TT without a HIP op is a bug; report an error.
  template<typename TT>
  inline parsec_hook_return_t hook_hip(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
    if constexpr(TT::derived_has_hip_op()) {
      parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
      return me->template invoke_op<ttg::ExecutionSpace::HIP>();
    } else {
      std::cerr << "HIP hook called without having a HIP op!" << std::endl;
      return PARSEC_HOOK_RETURN_ERROR;
    }
  }
811 
  /// Level Zero execution hook: invokes the TT's L0 op.
  /// Reaching this hook on a TT without an L0 op is a bug; report an error.
  template<typename TT>
  inline parsec_hook_return_t hook_level_zero(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
    if constexpr(TT::derived_has_level_zero_op()) {
      parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
      return me->template invoke_op<ttg::ExecutionSpace::L0>();
    } else {
      std::cerr << "L0 hook called without having a L0 op!" << std::endl;
      return PARSEC_HOOK_RETURN_ERROR;
    }
  }
822 
823 
  /// CUDA evaluate hook: lets the TT decide whether this task may run on a
  /// CUDA device; falls through to the next incarnation when no CUDA op
  /// exists.
  template<typename TT>
  inline parsec_hook_return_t evaluate_cuda(const parsec_task_t *parsec_task) {
    if constexpr(TT::derived_has_cuda_op()) {
      parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
      return me->template invoke_evaluate<ttg::ExecutionSpace::CUDA>();
    } else {
      return PARSEC_HOOK_RETURN_NEXT;
    }
  }
833 
  /// HIP evaluate hook: lets the TT decide whether this task may run on a
  /// HIP device; falls through to the next incarnation when no HIP op exists.
  template<typename TT>
  inline parsec_hook_return_t evaluate_hip(const parsec_task_t *parsec_task) {
    if constexpr(TT::derived_has_hip_op()) {
      parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
      return me->template invoke_evaluate<ttg::ExecutionSpace::HIP>();
    } else {
      return PARSEC_HOOK_RETURN_NEXT;
    }
  }
843 
  /// Level Zero evaluate hook: lets the TT decide whether this task may run
  /// on an L0 device; falls through to the next incarnation when no L0 op
  /// exists.
  template<typename TT>
  inline parsec_hook_return_t evaluate_level_zero(const parsec_task_t *parsec_task) {
    if constexpr(TT::derived_has_level_zero_op()) {
      parsec_ttg_task_t<TT> *me = (parsec_ttg_task_t<TT> *)parsec_task;
      return me->template invoke_evaluate<ttg::ExecutionSpace::L0>();
    } else {
      return PARSEC_HOOK_RETURN_NEXT;
    }
  }
853 
854 
855  template <typename KeyT, typename ActivationCallbackT>
857  std::vector<KeyT> _keylist;
858  std::atomic<int> _outstanding_transfers;
859  ActivationCallbackT _cb;
861 
862  public:
863  rma_delayed_activate(std::vector<KeyT> &&key, detail::ttg_data_copy_t *copy, int num_transfers, ActivationCallbackT cb)
864  : _keylist(std::move(key)), _outstanding_transfers(num_transfers), _cb(cb), _copy(copy) {}
865 
866  bool complete_transfer(void) {
867  int left = --_outstanding_transfers;
868  if (0 == left) {
869  _cb(std::move(_keylist), _copy);
870  return true;
871  }
872  return false;
873  }
874  };
875 
  /// Completion callback for a single RMA get: unregister the local memory
  /// handle and notify the shared activation object (@p cb_data); the last
  /// completing transfer fires the activation's callback and deletes the
  /// heap-allocated activation.
  template <typename ActivationT>
  static int get_complete_cb(parsec_comm_engine_t *comm_engine, parsec_ce_mem_reg_handle_t lreg, ptrdiff_t ldispl,
                             parsec_ce_mem_reg_handle_t rreg, ptrdiff_t rdispl, size_t size, int remote,
                             void *cb_data) {
    parsec_ce.mem_unregister(&lreg);
    ActivationT *activation = static_cast<ActivationT *>(cb_data);
    if (activation->complete_transfer()) {
      delete activation;
    }
    return PARSEC_SUCCESS;
  }
887 
  /// AM handler for remote-completion notifications: the payload carries the
  /// address of a heap-allocated std::function&lt;void(void)&gt;; invoke it once and
  /// delete it.  NOTE(review): the pointer must refer to an object in this
  /// address space — confirm at the send site.
  static int get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag, void *msg, size_t msg_size,
                                    int src, void *cb_data) {
    std::intptr_t *fn_ptr = static_cast<std::intptr_t *>(msg);
    std::function<void(void)> *fn = reinterpret_cast<std::function<void(void)> *>(*fn_ptr);
    (*fn)();
    delete fn;
    return PARSEC_SUCCESS;
  }
896 
897  template <typename FuncT>
898  static int invoke_get_remote_complete_cb(parsec_comm_engine_t *ce, parsec_ce_tag_t tag, void *msg, size_t msg_size,
899  int src, void *cb_data) {
900  std::intptr_t *iptr = static_cast<std::intptr_t *>(msg);
901  FuncT *fn_ptr = reinterpret_cast<FuncT *>(*iptr);
902  (*fn_ptr)();
903  delete fn_ptr;
904  return PARSEC_SUCCESS;
905  }
906 
  /// Release the reference the completing task holds on @p copy.
  /// Either hands the copy over to a deferred writer task that was waiting
  /// for exclusive access, or — when this was the last reference — deletes
  /// the copy.
  inline void release_data_copy(ttg_data_copy_t *copy) {
    if (copy->is_mutable() && nullptr == copy->get_next_task()) {
      /* current task mutated the data but there are no consumers so prepare
       * the copy to be freed below */
      copy->reset_readers();
    }

    int32_t readers = copy->num_readers();
    if (readers > 1) {
      /* potentially more than one reader, decrement atomically */
      readers = copy->decrement_readers();
    } else if (readers == 1) {
      /* make sure readers drop to zero */
      readers = copy->decrement_readers<false>();
    }
    /* if there was only one reader (the current task) or
     * a mutable copy and a successor, we release the copy */
    if (1 == readers || readers == copy->mutable_tag) {
      // acquire fence before consuming state (next_task, refs) possibly
      // published by a concurrent releaser
      std::atomic_thread_fence(std::memory_order_acquire);
      if (nullptr != copy->get_next_task()) {
        /* Release the deferred task.
         * The copy was mutable and will be mutated by the released task,
         * so simply transfer ownership.
         */
        parsec_task_t *next_task = copy->get_next_task();
        copy->set_next_task(nullptr);
        parsec_ttg_task_base_t *deferred_op = (parsec_ttg_task_base_t *)next_task;
        copy->mark_mutable();
        deferred_op->release_task();
      } else if ((1 == copy->num_ref()) || (1 == copy->drop_ref())) {
        /* we are the last reference, delete the copy */
#if defined(TTG_PARSEC_DEBUG_TRACK_DATA_COPIES)
        {
          // debug bookkeeping: the copy must have been recorded at creation
          const std::lock_guard<std::mutex> lock(pending_copies_mutex);
          size_t rc = pending_copies.erase(copy);
          assert(1 == rc);
        }
#endif
#if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
        // Keep track of additional memory usage
        if(ttg::default_execution_context().impl().profiling()) {
          parsec_profiling_ts_trace_flags(ttg::default_execution_context().impl().parsec_ttg_profile_backend_free_datacopy,
                                          static_cast<uint64_t>(copy->uid),
                                          PROFILE_OBJECT_ID_NULL, &copy->size,
                                          PARSEC_PROFILING_EVENT_COUNTER|PARSEC_PROFILING_EVENT_HAS_INFO);
        }
#endif
        delete copy;
      }
    }
  }
958 
// NOTE(review): the doxygen extraction dropped several source lines of this
// function: line 960 (the signature; presumably something like
// `static ttg_data_copy_t *register_data_copy(ttg_data_copy_t *copy_in,
//  parsec_ttg_task_base_t *task, bool readonly)` -- TODO confirm against the
// repository) and lines 1009-1012 (presumably the atomic exchange that claims
// the copy for the single writer -- TODO confirm). The visible logic registers
// `task` as a reader or writer of `copy_in`, possibly cloning the datum when
// a conflicting writer exists, and returns the copy the task should use
// (NULL-turned-clone on conflict).
 959  template <typename Value>
 961  ttg_data_copy_t *copy_res = copy_in;
 962  bool replace = false;
 963  int32_t readers = copy_in->num_readers();
 964  assert(readers != 0);
 965 
 966  /* try hard to defer writers if we cannot make copies
 967  * if deferral fails we have to bail out */
 968  bool defer_writer = (!std::is_copy_constructible_v<std::decay_t<Value>>) || task->defer_writer;
 969 
 970  if (readonly && !copy_in->is_mutable()) {
 971  /* simply increment the number of readers */
 972  readers = copy_in->increment_readers();
 973  }
 974 
 975  if (readers == copy_in->mutable_tag) {
 976  if (copy_res->get_next_task() != nullptr) {
 977  if (readonly) {
 978  parsec_ttg_task_base_t *next_task = reinterpret_cast<parsec_ttg_task_base_t *>(copy_res->get_next_task());
 979  if (next_task->defer_writer) {
 980  /* there is a writer but it signalled that it wants to wait for readers to complete */
 981  return copy_res;
 982  }
 983  }
 984  }
 985  /* someone is going to write into this copy -> we need to make a copy */
 986  copy_res = NULL;
 987  if (readonly) {
 988  /* we replace the copy in a deferred task if the copy will be mutated by
 989  * the deferred task and we are readonly.
 990  * That way, we can share the copy with other readonly tasks and release
 991  * the deferred task. */
 992  replace = true;
 993  }
 994  } else if (!readonly) {
 995  /* this task will mutate the data
 996  * check whether there are other readers already and potentially
 997  * defer the release of this task to give following readers a
 998  * chance to make a copy of the data before this task mutates it
 999  *
 1000  * Try to replace the readers with a negative value that indicates
 1001  * the value is mutable. If that fails we know that there are other
 1002  * readers or writers already.
 1003  *
 1004  * NOTE: this check is not atomic: either there is a single reader
 1005  * (current task) or there are others, in which we case won't
 1006  * touch it.
 1007  */
 1008  if (1 == copy_in->num_readers() && !defer_writer) {
// NOTE(review): lines 1009-1012 were stripped here by the extraction.
 1013  assert(nullptr == copy_in->get_next_task());
 1014  copy_in->set_next_task(&task->parsec_task);
 1015  std::atomic_thread_fence(std::memory_order_release);
 1016  copy_in->mark_mutable();
 1017  } else {
 1018  if (defer_writer && nullptr == copy_in->get_next_task()) {
 1019  /* we're the first writer and want to wait for all readers to complete */
 1020  copy_res->set_next_task(&task->parsec_task);
 1021  task->defer_writer = true;
 1022  } else {
 1023  /* there are writers and/or waiting already of this copy already, make a copy that we can mutate */
 1024  copy_res = NULL;
 1025  }
 1026  }
 1027  }
 1028 
 1029  if (NULL == copy_res) {
 1030  // can only make a copy if Value is copy-constructible ... so this codepath should never be hit
 1031  if constexpr (std::is_copy_constructible_v<std::decay_t<Value>>) {
 1032  ttg_data_copy_t *new_copy = detail::create_new_datacopy(*static_cast<Value *>(copy_in->get_ptr()));
 1033  if (replace && nullptr != copy_in->get_next_task()) {
 1034  /* replace the task that was deferred */
 1035  parsec_ttg_task_base_t *deferred_op = (parsec_ttg_task_base_t *)copy_in->get_next_task();
 1036  new_copy->mark_mutable();
 1037  /* replace the copy in the deferred task */
 1038  for (int i = 0; i < deferred_op->data_count; ++i) {
 1039  if (deferred_op->copies[i] == copy_in) {
 1040  deferred_op->copies[i] = new_copy;
 1041  break;
 1042  }
 1043  }
 1044  copy_in->set_next_task(nullptr);
 1045  deferred_op->release_task();
 1046  copy_in->reset_readers(); // set the copy back to being read-only
 1047  copy_in->increment_readers<false>(); // register as reader
 1048  copy_res = copy_in; // return the copy we were passed
 1049  } else {
 1050  if (!readonly) {
 1051  new_copy->mark_mutable();
 1052  }
 1053  copy_res = new_copy; // return the new copy
 1054  }
 1055  }
 1056  else {
 1057  throw std::logic_error(std::string("TTG::PaRSEC: need to copy a datum of type") + typeid(std::decay_t<Value>).name() + " but the type is not copyable");
 1058  }
 1059  }
 1060  return copy_res;
 1061  }
1062 
1063  } // namespace detail
1064 
/// Initializes the ttg PaRSEC backend: brings up MPI if needed, creates the
/// default World, scans PaRSEC devices, and reads TTG_MAX_INLINE from the
/// environment. May only be called once per process.
// NOTE(review): the extraction dropped lines 1080, 1087 and 1091; line 1091 is
// presumably `detail::first_device_id = i;` (the then-branch body) -- TODO
// confirm against the repository.
 1065  inline void ttg_initialize(int argc, char **argv, int num_threads, parsec_context_t *ctx) {
 1066  if (detail::initialized_mpi()) throw std::runtime_error("ttg_parsec::ttg_initialize: can only be called once");
 1067 
 1068  // make sure it's not already initialized
 1069  int mpi_initialized;
 1070  MPI_Initialized(&mpi_initialized);
 1071  if (!mpi_initialized) { // MPI not initialized? do it, remember that we did it
 1072  int provided;
 1073  MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
// NOTE(review): `!provided` only detects provided == 0 (MPI_THREAD_SINGLE on
// some implementations); a level such as MPI_THREAD_FUNNELED would pass this
// check. Consider `provided != MPI_THREAD_MULTIPLE` -- TODO confirm intent.
 1074  if (!provided)
 1075  throw std::runtime_error("ttg_parsec::ttg_initialize: MPI_Init_thread did not provide MPI_THREAD_MULTIPLE");
 1076  detail::initialized_mpi() = true;
 1077  } else { // no way to test that MPI was initialized with MPI_THREAD_MULTIPLE, cross fingers and proceed
 1078  }
 1079 
 1081  auto world_ptr = new ttg_parsec::WorldImpl{&argc, &argv, num_threads, ctx};
 1082  std::shared_ptr<ttg::base::WorldImplBase> world_sptr{static_cast<ttg::base::WorldImplBase *>(world_ptr)};
 1083  ttg::World world{std::move(world_sptr)};
 1084  ttg::detail::set_default_world(std::move(world));
 1085 
 1086  // query the first device ID
 1088  for (int i = 0; i < parsec_nb_devices; ++i) {
 1089  bool is_gpu = parsec_mca_device_is_gpu(i);
 1090  if (detail::first_device_id == -1 && is_gpu) {
 1092  } else if (detail::first_device_id > -1 && !is_gpu) {
 1093  throw std::runtime_error("PaRSEC: Found non-GPU device in GPU ID range!");
 1094  }
 1095  }
 1096 
 1097  /* parse the maximum inline size */
 1098  const char* ttg_max_inline_cstr = std::getenv("TTG_MAX_INLINE");
 1099  if (nullptr != ttg_max_inline_cstr) {
 1100  std::size_t inline_size = std::atol(ttg_max_inline_cstr);
 1101  if (inline_size < detail::max_inline_size) { // env var may only shrink the limit
 1102  detail::max_inline_size = inline_size;
 1103  }
 1104  }
 1105 
 1106  bool all_peer_access = true;
 1107  /* check whether all GPUs can access all peer GPUs */
 1108  for (int i = 0; (i < parsec_nb_devices) && all_peer_access; ++i) {
 1109  parsec_device_module_t *idevice = parsec_mca_device_get(i);
 1110  if (PARSEC_DEV_IS_GPU(idevice->type)) {
 1111  parsec_device_gpu_module_t *gpu_device = (parsec_device_gpu_module_t*)idevice;
 1112  for (int j = 0; (j < parsec_nb_devices) && all_peer_access; ++j) {
 1113  if (i != j) { // because no device can access itself, says PaRSEC
 1114  parsec_device_module_t *jdevice = parsec_mca_device_get(j);
 1115  if (PARSEC_DEV_IS_GPU(jdevice->type)) {
 1116  all_peer_access &= (gpu_device->peer_access_mask & (1<<j)) ? true : false;
 1117  }
 1118  }
 1119  }
 1120  }
 1121  }
 1122  detail::all_devices_peer_access = all_peer_access;
 1123  }
/// Tears down the ttg PaRSEC backend: signals termination to the current
/// taskpool, destroys all worlds, and finalizes MPI if ttg_initialize
/// started it.
// NOTE(review): the extraction dropped lines 1127 and 1130 here (possibly
// comments or additional statements) -- TODO confirm against the repository.
 1124  inline void ttg_finalize() {
 1125  // We need to notify the current taskpool of termination if we are in user termination detection mode
 1126  // or the parsec_context_wait() in destroy_worlds() will never complete
 1128  ttg::default_execution_context().impl().final_task();
 1129  ttg::detail::set_default_world(ttg::World{}); // reset the default world
 1131  ttg::detail::destroy_worlds<ttg_parsec::WorldImpl>();
 1132  if (detail::initialized_mpi()) MPI_Finalize(); // only finalize MPI if we initialized it
 1133  }
1135  [[noreturn]]
1136  inline void ttg_abort() { MPI_Abort(ttg_default_execution_context().impl().comm(), 1); std::abort(); }
1137  inline void ttg_execute(ttg::World world) { world.impl().execute(); }
1138  inline void ttg_fence(ttg::World world) { world.impl().fence(); }
1139 
1140  template <typename T>
1141  inline void ttg_register_ptr(ttg::World world, const std::shared_ptr<T> &ptr) {
1142  world.impl().register_ptr(ptr);
1143  }
1144 
1145  template <typename T>
1146  inline void ttg_register_ptr(ttg::World world, std::unique_ptr<T> &&ptr) {
1147  world.impl().register_ptr(std::move(ptr));
1148  }
1149 
1150  inline void ttg_register_status(ttg::World world, const std::shared_ptr<std::promise<void>> &status_ptr) {
1151  world.impl().register_status(status_ptr);
1152  }
1153 
1154  template <typename Callback>
1155  inline void ttg_register_callback(ttg::World world, Callback &&callback) {
1156  world.impl().register_callback(std::forward<Callback>(callback));
1157  }
1158 
1159  inline ttg::Edge<> &ttg_ctl_edge(ttg::World world) { return world.impl().ctl_edge(); }
1160 
1161  inline void ttg_sum(ttg::World world, double &value) {
1162  double result = 0.0;
1163  MPI_Allreduce(&value, &result, 1, MPI_DOUBLE, MPI_SUM, world.impl().comm());
1164  value = result;
1165  }
1166 
1167  inline void make_executable_hook(ttg::World& world) {
1168  MPI_Barrier(world.impl().comm());
1169  }
1170 
/// Broadcasts \c data from \c source_rank to all other ranks of the world
/// in two steps: first the serialized size, then the serialized bytes.
// NOTE(review): the extraction dropped lines 1177, 1183 and 1187 -- presumably
// the serialization calls that (1177) compute BUFLEN from `data`, (1183) pack
// `data` into `buf` on the source, and (1187) unpack `buf` into `data` on the
// receivers -- TODO confirm against the repository. Note that BUFLEN is only
// assigned on the source rank before the first MPI_Bcast fills it in elsewhere.
 1173  template <typename T>
 1174  void ttg_broadcast(::ttg::World world, T &data, int source_rank) {
 1175  int64_t BUFLEN;
 1176  if (world.rank() == source_rank) {
 1178  }
 1179  MPI_Bcast(&BUFLEN, 1, MPI_INT64_T, source_rank, world.impl().comm());
 1180 
 1181  unsigned char *buf = new unsigned char[BUFLEN];
 1182  if (world.rank() == source_rank) {
 1184  }
 1185  MPI_Bcast(buf, BUFLEN, MPI_UNSIGNED_CHAR, source_rank, world.impl().comm());
 1186  if (world.rank() != source_rank) {
 1188  }
 1189  delete[] buf;
 1190  }
1191 
1192  namespace detail {
1193 
/// Non-template base carrying the PaRSEC-side state shared by all TT
/// instantiations (kept out of the class template to avoid duplication).
 1194  struct ParsecTTBase {
 1195  protected:
 1196  // static std::map<int, ParsecBaseTT*> function_id_to_instance;
 1197  parsec_hash_table_t tasks_table; // hash table of this TT's tasks
 1198  parsec_hash_table_t task_constraint_table; // hash table of tasks held back by constraints
 1199  parsec_task_class_t self; // the PaRSEC task class describing this TT
 1200  };
1201 
1202  } // namespace detail
1203 
/// Template-task implementation for the PaRSEC backend. keyT is the task key,
/// output_terminalsT the tuple of output terminal types, derivedT the CRTP
/// user class, input_valueTs a ttg::typelist of input value types, and Space
/// the execution space the user op targets.
// NOTE(review): the extraction dropped line 1205 (the class-head line,
// presumably `class TT : public ttg::TTBase, detail::ParsecTTBase { ...` --
// TODO confirm) and line 1212 (the two branches of the std::conditional_t
// below -- presumably the typelist-to-tuple conversion vs. a control-input
// tuple -- TODO confirm).
 1204  template <typename keyT, typename output_terminalsT, typename derivedT, typename input_valueTs, ttg::ExecutionSpace Space>
 1206  private:
 1208  static_assert(ttg::meta::is_typelist_v<input_valueTs>,
 1209  "The fourth template for ttg::TT must be a ttg::typelist containing the input types");
 1210  // create a virtual control input if the input list is empty, to be used in invoke()
 1211  using actual_input_tuple_type = std::conditional_t<!ttg::meta::typelist_is_empty_v<input_valueTs>,
 1213  using input_tuple_type = ttg::meta::typelist_to_tuple_t<input_valueTs>;
 1214  static_assert(ttg::meta::is_tuple_v<output_terminalsT>,
 1215  "Second template argument for ttg::TT must be std::tuple containing the output terminal types");
 1216  static_assert((ttg::meta::none_has_reference_v<input_valueTs>), "Input typelist cannot contain reference types");
 1217  static_assert(ttg::meta::is_none_Void_v<input_valueTs>, "ttg::Void is for internal use only, do not use it");
 1218 
 1219  parsec_mempool_t mempools; // per-TT memory pool for task allocations
 1220 
 1221  // check for a non-type member named have_cuda_op
 1222  template <typename T>
 1223  using have_cuda_op_non_type_t = decltype(T::have_cuda_op);
 1224 
 1225  template <typename T>
 1226  using have_hip_op_non_type_t = decltype(T::have_hip_op);
 1227 
 1228  template <typename T>
 1229  using have_level_zero_op_non_type_t = decltype(T::have_level_zero_op);
 1230 
 1231  bool alive = true; // cleared when the TT is destroyed/deactivated
 1232 
 1233  static constexpr int numinedges = std::tuple_size_v<input_tuple_type>; // number of input edges
 1234  static constexpr int numins = std::tuple_size_v<actual_input_tuple_type>; // number of input arguments
 1235  static constexpr int numouts = std::tuple_size_v<output_terminalsT>; // number of outputs
 1236  static constexpr int numflows = std::max(numins, numouts); // max number of flows
 1237 
 1238  public:
1240  template<typename DerivedT = derivedT>
1241  static constexpr bool derived_has_cuda_op() {
1242  return Space == ttg::ExecutionSpace::CUDA;
1243  }
1244 
1246  template<typename DerivedT = derivedT>
1247  static constexpr bool derived_has_hip_op() {
1248  return Space == ttg::ExecutionSpace::HIP;
1249  }
1250 
1252  template<typename DerivedT = derivedT>
1253  static constexpr bool derived_has_level_zero_op() {
1254  return Space == ttg::ExecutionSpace::L0;
1255  }
1256 
1258  template<typename DerivedT = derivedT>
1259  static constexpr bool derived_has_device_op() {
1260  return (derived_has_cuda_op<DerivedT>() ||
1261  derived_has_hip_op<DerivedT>() ||
1262  derived_has_level_zero_op<DerivedT>());
1263  }
 1264 
// NOTE(review): the extraction dropped the head of the static_assert whose
// message appears below (lines 1265-1266), as well as the left-hand sides of
// several alias declarations (lines 1271, 1273, 1275, 1277) -- the dangling
// right-hand sides at 1276/1278 presumably belong to aliases such as
// input_values_full_tuple_type / input_refs_full_tuple_type -- TODO confirm
// against the repository.
 1267  "Data sent from a device-capable template task must be serializable.");
 1268 
 1269  using ttT = TT;
 1270  using key_type = keyT;
 1272  using input_args_type = actual_input_tuple_type;
 1274  // if have data inputs and (always last) control input, convert last input to Void to make logic easier
 1276  ttg::meta::void_to_Void_tuple_t<ttg::meta::decayed_typelist_t<actual_input_tuple_type>>;
 1278  ttg::meta::add_glvalue_reference_tuple_t<ttg::meta::void_to_Void_tuple_t<actual_input_tuple_type>>;
 1279  using input_values_tuple_type = ttg::meta::drop_void_t<ttg::meta::decayed_typelist_t<input_tuple_type>>;
 1280  using input_refs_tuple_type = ttg::meta::drop_void_t<ttg::meta::add_glvalue_reference_tuple_t<input_tuple_type>>;
 1281 
 1282  static constexpr int numinvals =
 1283  std::tuple_size_v<input_refs_tuple_type>; // number of input arguments with values (i.e. omitting the control
 1284  // input, if any)
 1285 
 1286  using output_terminals_type = output_terminalsT;
1288 
/// Returns element \p i of the tuple, cast to \p resultT (used to marshal
/// terminal values with an explicit target type).
template <std::size_t i, typename resultT, typename InTuple>
static resultT get(InTuple &&t) {
  return static_cast<resultT>(std::get<i>(std::forward<InTuple>(t)));
}
/// Returns a reference to element \p i of the tuple.
template <std::size_t i, typename InTuple>
static auto &get(InTuple &&t) {
  return std::get<i>(std::forward<InTuple>(t));
}
1297 
 1298  private:
 1299  using task_t = detail::parsec_ttg_task_t<ttT>; // backend task type for this TT
 1300 
// NOTE(review): line 1301 was dropped by the extraction (likely a comment).
 1302  friend task_t;
 1303 
 1304  /* the offset of the key placed after the task structure in the memory from mempool */
 1305  constexpr static const size_t task_key_offset = sizeof(task_t);
 1306 
 1307  input_terminals_type input_terminals; // one terminal per input edge
 1308  output_terminalsT output_terminals; // one terminal per output edge
 1309 
 1310  protected:
 1311  const auto &get_output_terminals() const { return output_terminals; }
 1312 
 1313  private:
// Compile-time dispatch tables: for each input terminal index, store a
// pointer to the corresponding member-function template instantiation so
// that incoming messages can be routed by terminal number at runtime.
// The helper functions exist because the array types are spelled via
// decltype of the arrays they initialize.
 1314  template <std::size_t... IS>
 1315  static constexpr auto make_set_args_fcts(std::index_sequence<IS...>) {
 1316  using resultT = decltype(set_arg_from_msg_fcts);
 1317  return resultT{{&TT::set_arg_from_msg<IS>...}};
 1318  }
 1319  constexpr static std::array<void (TT::*)(void *, std::size_t), numins> set_arg_from_msg_fcts =
 1320  make_set_args_fcts(std::make_index_sequence<numins>{});
 1321 
 1322  template <std::size_t... IS>
 1323  static constexpr auto make_set_size_fcts(std::index_sequence<IS...>) {
 1324  using resultT = decltype(set_argstream_size_from_msg_fcts);
 1325  return resultT{{&TT::argstream_set_size_from_msg<IS>...}};
 1326  }
 1327  constexpr static std::array<void (TT::*)(void *, std::size_t), numins> set_argstream_size_from_msg_fcts =
 1328  make_set_size_fcts(std::make_index_sequence<numins>{});
 1329 
 1330  template <std::size_t... IS>
 1331  static constexpr auto make_finalize_argstream_fcts(std::index_sequence<IS...>) {
 1332  using resultT = decltype(finalize_argstream_from_msg_fcts);
 1333  return resultT{{&TT::finalize_argstream_from_msg<IS>...}};
 1334  }
 1335  constexpr static std::array<void (TT::*)(void *, std::size_t), numins> finalize_argstream_from_msg_fcts =
 1336  make_finalize_argstream_fcts(std::make_index_sequence<numins>{});
 1337 
 1338  template <std::size_t... IS>
 1339  static constexpr auto make_get_from_pull_fcts(std::index_sequence<IS...>) {
 1340  using resultT = decltype(get_from_pull_msg_fcts);
 1341  return resultT{{&TT::get_from_pull_msg<IS>...}};
 1342  }
 1343  constexpr static std::array<void (TT::*)(void *, std::size_t), numinedges> get_from_pull_msg_fcts =
 1344  make_get_from_pull_fcts(std::make_index_sequence<numinedges>{});
 1345 
// Per-input flag: is the input terminal's value type const (read-only)?
 1346  template<std::size_t... IS>
 1347  constexpr static auto make_input_is_const(std::index_sequence<IS...>) {
 1348  using resultT = decltype(input_is_const);
 1349  return resultT{{std::is_const_v<std::tuple_element_t<IS, input_args_type>>...}};
 1350  }
 1351  constexpr static std::array<bool, numins> input_is_const = make_input_is_const(std::make_index_sequence<numins>{});
1352 
 1353  ttg::World world; // the world this TT belongs to
 1354  ttg::meta::detail::keymap_t<keyT> keymap; // key -> owning process rank
 1355  ttg::meta::detail::keymap_t<keyT> priomap; // key -> task priority
 1356  ttg::meta::detail::keymap_t<keyT, ttg::device::Device> devicemap; // key -> preferred device
 1357  // For now use same type for unary/streaming input terminals, and stream reducers assigned at runtime
 1358  ttg::meta::detail::input_reducers_t<actual_input_tuple_type>
 1359  input_reducers;
 1360  std::array<parsec_task_class_t*, numins> inpute_reducers_taskclass = { nullptr }; // task classes for reducer tasks, one per input
 1361  std::array<std::size_t, numins> static_stream_goal = { std::numeric_limits<std::size_t>::max() }; // per-input stream size goal
 1362  int num_pullins = 0; // number of pull-mode input terminals
 1363 
 1364  bool m_defer_writer = TTG_PARSEC_DEFER_WRITER; // defer writers until readers finished (see file header)
 1365 
 1366  std::vector<ttg::meta::detail::constraint_callback_t<keyT>> constraints_check; // invoked before a task may run
 1367  std::vector<ttg::meta::detail::constraint_callback_t<keyT>> constraints_complete; // invoked when a task completed
 1368 
 1369  public:
 1370  ttg::World get_world() const override final { return world; }
 1371 
 1372  private:
1376  template <typename... Args>
1377  auto op(Args &&...args) {
1378  derivedT *derived = static_cast<derivedT *>(this);
1379  using return_type = decltype(derived->op(std::forward<Args>(args)...));
1380  if constexpr (std::is_same_v<return_type,void>) {
1381  derived->op(std::forward<Args>(args)...);
1382  return;
1383  }
1384  else {
1385  return derived->op(std::forward<Args>(args)...);
1386  }
1387  }
1388 
1389  template <std::size_t i, typename terminalT, typename Key>
1390  void invoke_pull_terminal(terminalT &in, const Key &key, detail::parsec_ttg_task_base_t *task) {
1391  if (in.is_pull_terminal) {
1392  auto owner = in.container.owner(key);
1393  if (owner != world.rank()) {
1394  get_pull_terminal_data_from<i>(owner, key);
1395  } else {
1396  // push the data to the task
1397  set_arg<i>(key, (in.container).get(key));
1398  }
1399  }
1400  }
1401 
/// Sends an active message to rank \p owner asking it to push the datum of
/// pull terminal \p i for \p key back to this rank.
// NOTE(review): line 1409 was dropped by the extraction -- presumably the
// remaining msg_t constructor arguments (e.g. the message type tag such as
// MSG_GET_FROM_PULL and the terminal index i) -- TODO confirm against the
// repository.
 1402  template <std::size_t i, typename Key>
 1403  void get_pull_terminal_data_from(const int owner,
 1404  const Key &key) {
 1405  using msg_t = detail::msg_t;
 1406  auto &world_impl = world.impl();
 1407  parsec_taskpool_t *tp = world_impl.taskpool();
 1408  std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), tp->taskpool_id,
 1410  world.rank(), 1);
 1411  /* pack the key */
 1412  size_t pos = 0;
 1413  pos = pack(key, msg->bytes, pos);
 1414  tp->tdm.module->outgoing_message_start(tp, owner, NULL);
 1415  tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
 1416  parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
 1417  sizeof(msg_header_t) + pos);
 1418  }
1419 
1420  template <std::size_t... IS, typename Key = keyT>
1421  void invoke_pull_terminals(std::index_sequence<IS...>, const Key &key, detail::parsec_ttg_task_base_t *task) {
1422  int junk[] = {0, (invoke_pull_terminal<IS>(
1423  std::get<IS>(input_terminals), key, task),
1424  0)...};
1425  junk[0]++;
1426  }
1427 
/// Builds the tuple of input-value references passed to the user op by
/// reinterpreting each data copy's payload pointer as the corresponding
/// input type and casting it to the (possibly rvalue) reference type the
/// op expects.
 1428  template <std::size_t... IS>
 1429  static input_refs_tuple_type make_tuple_of_ref_from_array(task_t *task, std::index_sequence<IS...>) {
 1430  return input_refs_tuple_type{static_cast<std::tuple_element_t<IS, input_refs_tuple_type>>(
 1431  *reinterpret_cast<std::remove_reference_t<std::tuple_element_t<IS, input_refs_tuple_type>> *>(
 1432  task->copies[IS]->get_ptr()))...};
 1433  }
1434 
1435 #ifdef TTG_HAVE_DEVICE
/// PaRSEC GPU-task submit hook: resumes the device coroutine after its
/// input transfers completed, then decides (from the coroutine state)
/// whether PaRSEC must call back again or the task is done.
// NOTE(review): lines 1481-1482 were dropped by the extraction (after the
// static_op call below) -- TODO confirm against the repository what they
// contained.
 1439  static int device_static_submit(parsec_device_gpu_module_t *gpu_device,
 1440  parsec_gpu_task_t *gpu_task,
 1441  parsec_gpu_exec_stream_t *gpu_stream) {
 1442 
 1443  task_t *task = (task_t*)gpu_task->ec;
 1444  // get the device task from the coroutine handle
 1445  ttg::device::Task dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
 1446 
 1447  task->dev_ptr->stream = gpu_stream;
 1448 
 1449  //std::cout << "device_static_submit task " << task << std::endl;
 1450 
 1451  // get the promise which contains the views
 1452  auto dev_data = dev_task.promise();
 1453 
 1454  /* we should still be waiting for the transfer to complete */
 1455  assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_TRANSFER ||
 1456  dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_KERNEL);
 1457 
// Publish the selected device/stream so the user coroutine can launch onto it.
 1458 #if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) && defined(TTG_HAVE_CUDA)
 1459  {
 1460  parsec_cuda_exec_stream_t *cuda_stream = (parsec_cuda_exec_stream_t *)gpu_stream;
 1461  int device = detail::parsec_device_to_ttg_device(gpu_device->super.device_index);
 1462  ttg::device::detail::set_current(device, cuda_stream->cuda_stream);
 1463  }
 1464 #elif defined(PARSEC_HAVE_DEV_HIP_SUPPORT) && defined(TTG_HAVE_HIP)
 1465  {
 1466  parsec_hip_exec_stream_t *hip_stream = (parsec_hip_exec_stream_t *)gpu_stream;
 1467  int device = detail::parsec_device_to_ttg_device(gpu_device->super.device_index);
 1468  ttg::device::detail::set_current(device, hip_stream->hip_stream);
 1469  }
 1470 #elif defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT) && defined(TTG_HAVE_LEVEL_ZERO)
 1471  {
 1472  parsec_level_zero_exec_stream_t *stream;
 1473  stream = (parsec_level_zero_exec_stream_t *)gpu_stream;
 1474  int device = detail::parsec_device_to_ttg_device(gpu_device->super.device_index);
 1475  ttg::device::detail::set_current(device, stream->swq->queue);
 1476  }
 1477 #endif // defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT) && defined(TTG_HAVE_LEVEL_ZERO)
 1478 
 1479  /* Here we call back into the coroutine again after the transfers have completed */
 1480  static_op(&task->parsec_task);
 1483 
 1484  auto discard_tmp_flows = [&](){
 1485  for (int i = 0; i < MAX_PARAM_COUNT; ++i) {
 1486  if (gpu_task->flow[i]->flow_flags & TTG_PARSEC_FLOW_ACCESS_TMP) {
 1487  /* temporary flow, discard by setting it to read-only to avoid evictions */
 1488  const_cast<parsec_flow_t*>(gpu_task->flow[i])->flow_flags = PARSEC_FLOW_ACCESS_READ;
 1489  task->parsec_task.data[i].data_out->readers = 1;
 1490  }
 1491  }
 1492  };
 1493 
 1494  /* we will come back into this function once the kernel and transfers are done */
 1495  int rc = PARSEC_HOOK_RETURN_DONE;
 1496  if (nullptr != task->suspended_task_address) {
 1497  /* Get a new handle for the promise*/
 1498  dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
 1499  dev_data = dev_task.promise();
 1500 
 1501  assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_WAIT_KERNEL ||
 1502  dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT ||
 1503  dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE);
 1504 
 1505  if (ttg::device::detail::TTG_DEVICE_CORO_SENDOUT == dev_data.state() ||
 1506  ttg::device::detail::TTG_DEVICE_CORO_COMPLETE == dev_data.state()) {
 1507  /* the task started sending so we won't come back here */
 1508  //std::cout << "device_static_submit task " << task << " complete" << std::endl;
 1509  discard_tmp_flows();
 1510  } else {
 1511  //std::cout << "device_static_submit task " << task << " return-again" << std::endl;
 1512  rc = PARSEC_HOOK_RETURN_AGAIN;
 1513  }
 1514  } else {
 1515  /* the task is done so we won't come back here */
 1516  //std::cout << "device_static_submit task " << task << " complete" << std::endl;
 1517  discard_tmp_flows();
 1518  }
 1519  return rc;
 1520  }
1521 
/// PaRSEC evaluate hook for device tasks: on first invocation it creates
/// the parsec_gpu_task_t, runs the coroutine once to discover the flows
/// (set up in register_device_memory), builds a per-task task class from
/// those flows, and optionally applies the user devicemap as a data
/// placement hint. Must not be called again once a gpu_task exists.
 1522  static parsec_hook_return_t device_static_evaluate(parsec_task_t* parsec_task) {
 1523 
 1524  task_t *task = (task_t*)parsec_task;
 1525  if (task->dev_ptr->gpu_task == nullptr) {
 1526 
 1527  /* set up a device task */
 1528  parsec_gpu_task_t *gpu_task;
 1529  /* PaRSEC wants to free the gpu_task, because F***K ownerships */
 1530  gpu_task = static_cast<parsec_gpu_task_t*>(std::calloc(1, sizeof(*gpu_task)));
 1531  PARSEC_OBJ_CONSTRUCT(gpu_task, parsec_list_item_t);
 1532  gpu_task->ec = parsec_task;
 1533  gpu_task->task_type = 0; // user task
 1534  gpu_task->last_data_check_epoch = std::numeric_limits<uint64_t>::max(); // used internally
 1535  gpu_task->pushout = 0;
 1536  gpu_task->submit = &TT::device_static_submit;
 1537 
 1538  // one way to force the task device
 1539  // currently this will probably break all of PaRSEC if this hint
 1540  // does not match where the data is located, not really useful for us
 1541  // instead we set a hint on the data if there is no hint set yet
 1542  //parsec_task->selected_device = ...;
 1543 
 1544  /* set the gpu_task so it's available in register_device_memory */
 1545  task->dev_ptr->gpu_task = gpu_task;
 1546 
 1547  /* TODO: is this the right place to set the mask? */
 1548  task->parsec_task.chore_mask = PARSEC_DEV_ALL;
 1549 
 1550  /* copy over the task class, because that's what we need */
 1551  task->dev_ptr->task_class = *task->parsec_task.task_class;
 1552 
 1553  // first invocation of the coroutine to get the coroutine handle
 1554  static_op(parsec_task);
 1555 
 1556  /* when we come back here, the flows in gpu_task are set (see register_device_memory) */
 1557 
 1558  parsec_task_class_t& tc = task->dev_ptr->task_class;
 1559 
 1560  // input flows are set up during register_device_memory as part of the first invocation above
 1561  for (int i = 0; i < MAX_PARAM_COUNT; ++i) {
 1562  tc.in[i] = gpu_task->flow[i];
 1563  tc.out[i] = gpu_task->flow[i];
 1564  }
 1565  tc.nb_flows = MAX_PARAM_COUNT;
 1566 
 1567  /* set the device hint on the data */
 1568  TT *tt = task->tt;
 1569  if (tt->devicemap) {
 1570  int parsec_dev;
 1571  if constexpr (std::is_void_v<keyT>) {
 1572  parsec_dev = detail::ttg_device_to_parsec_device(tt->devicemap());
 1573  } else {
 1574  parsec_dev = detail::ttg_device_to_parsec_device(tt->devicemap(task->key));
 1575  }
 1576  for (int i = 0; i < MAX_PARAM_COUNT; ++i) {
 1577  /* only set on mutable data since we have exclusive access */
 1578  if (tc.in[i]->flow_flags & PARSEC_FLOW_ACCESS_WRITE) {
 1579  parsec_data_t *data = parsec_task->data[i].data_in->original;
 1580  /* only set the preferred device if the host has the latest copy
 1581  * as otherwise we may end up with the wrong data if there is a newer
 1582  * version on a different device. Also, keep fingers crossed. */
 1583  if (data->owner_device == 0) {
 1584  parsec_advise_data_on_device(data, parsec_dev, PARSEC_DEV_DATA_ADVICE_PREFERRED_DEVICE);
 1585  }
 1586  }
 1587  }
 1588  }
 1589 
 1590  /* set the new task class that contains the flows */
 1591  task->parsec_task.task_class = &task->dev_ptr->task_class;
 1592 
 1593  /* select this one */
 1594  return PARSEC_HOOK_RETURN_DONE;
 1595  }
 1596 
 1597  std::cerr << "EVALUATE called on task with assigned GPU task!" << std::endl;
 1598 
 1599  /* not sure if this might happen*/
 1600  return PARSEC_HOOK_RETURN_ERROR;
 1601 
 1602  }
1603 
/// PaRSEC execution hook for device tasks: once PaRSEC has selected a
/// device, hands the prepared gpu_task to the device kernel scheduler of
/// the matching type (CUDA/HIP/Level Zero). Short-circuits if the
/// coroutine already completed or reached its send-out phase.
 1604  static parsec_hook_return_t device_static_op(parsec_task_t* parsec_task) {
 1605  static_assert(derived_has_device_op());
 1606 
 1607  /* when we come in here we have a device assigned and are ready to go */
 1608 
 1609  task_t *task = (task_t*)parsec_task;
 1610 
 1611  if (nullptr == task->suspended_task_address) {
 1612  /* short-cut in case the task returned immediately */
 1613  return PARSEC_HOOK_RETURN_DONE;
 1614  }
 1615 
 1616  // get the device task from the coroutine handle
 1617  auto dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
 1618 
 1619  // get the promise which contains the views
 1620  ttg::device::detail::device_task_promise_type& dev_data = dev_task.promise();
 1621 
 1622  if (dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT ||
 1623  dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE) {
 1624  /* the task jumped directly to send or returned so we're done */
 1625  return PARSEC_HOOK_RETURN_DONE;
 1626  }
 1627 
 1628  parsec_device_gpu_module_t *device = (parsec_device_gpu_module_t*)task->parsec_task.selected_device;
 1629  assert(NULL != device);
 1630 
 1631  task->dev_ptr->device = device;
 1632  parsec_gpu_task_t *gpu_task = task->dev_ptr->gpu_task;
 1633  parsec_execution_stream_s *es = task->tt->world.impl().execution_stream();
 1634 
// Dispatch on the runtime device type; the if-constexpr guards ensure only
// the branch matching this TT's compile-time execution space is emitted.
 1635  switch(device->super.type) {
 1636 
 1637 #if defined(PARSEC_HAVE_DEV_CUDA_SUPPORT)
 1638  case PARSEC_DEV_CUDA:
 1639  if constexpr (Space == ttg::ExecutionSpace::CUDA) {
 1640  /* TODO: we need custom staging functions because PaRSEC looks at the
 1641  * task-class to determine the number of flows. */
 1642  gpu_task->stage_in = parsec_default_gpu_stage_in;
 1643  gpu_task->stage_out = parsec_default_gpu_stage_out;
 1644  return parsec_device_kernel_scheduler(&device->super, es, gpu_task);
 1645  }
 1646  break;
 1647 #endif
 1648 #if defined(PARSEC_HAVE_DEV_HIP_SUPPORT)
 1649  case PARSEC_DEV_HIP:
 1650  if constexpr (Space == ttg::ExecutionSpace::HIP) {
 1651  gpu_task->stage_in = parsec_default_gpu_stage_in;
 1652  gpu_task->stage_out = parsec_default_gpu_stage_out;
 1653  return parsec_device_kernel_scheduler(&device->super, es, gpu_task);
 1654  }
 1655  break;
 1656 #endif // PARSEC_HAVE_DEV_HIP_SUPPORT
 1657 #if defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT)
 1658  case PARSEC_DEV_LEVEL_ZERO:
 1659  if constexpr (Space == ttg::ExecutionSpace::L0) {
 1660  gpu_task->stage_in = parsec_default_gpu_stage_in;
 1661  gpu_task->stage_out = parsec_default_gpu_stage_out;
 1662  return parsec_device_kernel_scheduler(&device->super, es, gpu_task);
 1663  }
 1664  break;
 1665 #endif // PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT
 1666  default:
 1667  break;
 1668  }
 1669  ttg::print_error(task->tt->get_name(), " : received mismatching device type ", (int)device->super.type, " from PaRSEC");
 1670  ttg::abort();
 1671  return PARSEC_HOOK_RETURN_DONE; // will not be reached
 1672  }
1673 #endif // TTG_HAVE_DEVICE
1674 
    /// PaRSEC execution hook: runs (or resumes) the user's op for this task.
    /// Dispatches between three situations:
    ///  - first invocation of an ordinary function or a coroutine that has not started,
    ///  - resumption of a suspended device coroutine (ttg::device::Task),
    ///  - resumption of a suspended host coroutine (ttg::resumable_task).
    /// Always returns PARSEC_HOOK_RETURN_DONE; a not-yet-complete coroutine is
    /// recorded via task->suspended_task_address for a later re-invocation.
    static parsec_hook_return_t static_op(parsec_task_t *parsec_task) {

      task_t *task = (task_t*)parsec_task;
      // non-null address = a coroutine frame exists and must be resumed;
      // without coroutine support this is always nullptr
      void* suspended_task_address =
#ifdef TTG_HAVE_COROUTINE
        task->suspended_task_address;  // non-null = need to resume the task
#else  // TTG_HAVE_COROUTINE
        nullptr;
#endif // TTG_HAVE_COROUTINE
      //std::cout << "static_op: suspended_task_address " << suspended_task_address << std::endl;
      if (suspended_task_address == nullptr) {  // task is a coroutine that has not started or an ordinary function

        ttT *baseobj = task->tt;
        derivedT *obj = static_cast<derivedT *>(baseobj);
        // publish this task as the current caller so sends/broadcasts issued from
        // inside op() can be attributed to it (thread-local, must be null on entry)
        assert(detail::parsec_ttg_caller == nullptr);
        detail::parsec_ttg_caller = static_cast<detail::parsec_ttg_task_base_t*>(task);
        if (obj->tracing()) {
          if constexpr (!ttg::meta::is_void_v<keyT>)
            ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", task->key, ": executing");
          else
            ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : executing");
        }

        // select the op() overload matching void-ness of the key and emptiness of the
        // input tuple; TTG_PROCESS_TT_OP_RETURN also captures a coroutine handle (if any)
        // into suspended_task_address and the kind of coroutine into task->coroutine_id
        if constexpr (!ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
          auto input = make_tuple_of_ref_from_array(task, std::make_index_sequence<numinvals>{});
          TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(task->key, std::move(input), obj->output_terminals));
        } else if constexpr (!ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
          TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(task->key, obj->output_terminals));
        } else if constexpr (ttg::meta::is_void_v<keyT> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
          auto input = make_tuple_of_ref_from_array(task, std::make_index_sequence<numinvals>{});
          TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(std::move(input), obj->output_terminals));
        } else if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>) {
          TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(obj->output_terminals));
        } else {
          ttg::abort();
        }
        detail::parsec_ttg_caller = nullptr;
      }
      else {  // resume the suspended coroutine

#ifdef TTG_HAVE_COROUTINE
        assert(task->coroutine_id != ttg::TaskCoroutineID::Invalid);

#ifdef TTG_HAVE_DEVICE
        if (task->coroutine_id == ttg::TaskCoroutineID::DeviceTask) {
          // rebuild the device-task handle from the stored frame address
          ttg::device::Task coro = ttg::device::detail::device_task_handle_type::from_address(suspended_task_address);
          assert(detail::parsec_ttg_caller == nullptr);
          detail::parsec_ttg_caller = static_cast<detail::parsec_ttg_task_base_t*>(task);
          // TODO: unify the outputs tls handling
          // swap in this TT's output terminals for the duration of the resume
          auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor();
          task->tt->set_outputs_tls_ptr();
          coro.resume();
          if (coro.completed()) {
            coro.destroy();
            suspended_task_address = nullptr;  // signal completion below
          }
          task->tt->set_outputs_tls_ptr(old_output_tls_ptr);
          detail::parsec_ttg_caller = nullptr;
        } else
#endif  // TTG_HAVE_DEVICE
        if (task->coroutine_id == ttg::TaskCoroutineID::ResumableTask) {
          auto ret = static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address));
          assert(ret.ready());
          auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor();
          task->tt->set_outputs_tls_ptr();
          ret.resume();
          if (ret.completed()) {
            ret.destroy();
            suspended_task_address = nullptr;
          }
          else {  // not yet completed
            // leave suspended_task_address as is

            // right now events are not properly implemented, we are only testing the workflow with dummy events
            // so mark the events finished manually; parsec will rerun this task again and it should complete the second time
            auto events = static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address)).events();
            for (auto &event_ptr : events) {
              event_ptr->finish();
            }
            assert(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address).promise().ready());
          }
          task->tt->set_outputs_tls_ptr(old_output_tls_ptr);
          detail::parsec_ttg_caller = nullptr;
          task->suspended_task_address = suspended_task_address;
        }
        else
          ttg::abort();  // unrecognized task id
#else  // TTG_HAVE_COROUTINE
        ttg::abort();  // should not happen
#endif  // TTG_HAVE_COROUTINE
      }
#ifdef TTG_HAVE_COROUTINE
      // persist the (possibly cleared) frame address so a later invocation resumes correctly
      task->suspended_task_address = suspended_task_address;
#endif // TTG_HAVE_COROUTINE
      if (suspended_task_address == nullptr) {  // task fully completed: trace it
        ttT *baseobj = task->tt;
        derivedT *obj = static_cast<derivedT *>(baseobj);
        if (obj->tracing()) {
          if constexpr (!ttg::meta::is_void_v<keyT>)
            ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", task->key, ": done executing");
          else
            ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : done executing");
        }
      }

      return PARSEC_HOOK_RETURN_DONE;
    }
1782 
1783  static parsec_hook_return_t static_op_noarg(parsec_task_t *parsec_task) {
1784  task_t *task = static_cast<task_t*>(parsec_task);
1785 
1786  void* suspended_task_address =
1787 #ifdef TTG_HAVE_COROUTINE
1788  task->suspended_task_address; // non-null = need to resume the task
1789 #else // TTG_HAVE_COROUTINE
1790  nullptr;
1791 #endif // TTG_HAVE_COROUTINE
1792  if (suspended_task_address == nullptr) { // task is a coroutine that has not started or an ordinary function
1793  ttT *baseobj = (ttT *)task->object_ptr;
1794  derivedT *obj = (derivedT *)task->object_ptr;
1795  assert(detail::parsec_ttg_caller == NULL);
1797  if constexpr (!ttg::meta::is_void_v<keyT>) {
1798  TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(task->key, obj->output_terminals));
1799  } else if constexpr (ttg::meta::is_void_v<keyT>) {
1800  TTG_PROCESS_TT_OP_RETURN(suspended_task_address, task->coroutine_id, baseobj->op(obj->output_terminals));
1801  } else // unreachable
1802  ttg:: abort();
1804  }
1805  else {
1806 #ifdef TTG_HAVE_COROUTINE
1807  auto ret = static_cast<ttg::resumable_task>(ttg::coroutine_handle<ttg::resumable_task_state>::from_address(suspended_task_address));
1808  assert(ret.ready());
1809  ret.resume();
1810  if (ret.completed()) {
1811  ret.destroy();
1812  suspended_task_address = nullptr;
1813  }
1814  else { // not yet completed
1815  // leave suspended_task_address as is
1816  }
1817 #else // TTG_HAVE_COROUTINE
1818  ttg::abort(); // should not happen
1819 #endif // TTG_HAVE_COROUTINE
1820  }
1821  task->suspended_task_address = suspended_task_address;
1822 
1823  if (suspended_task_address) {
1824  ttg::abort(); // not yet implemented
1825  // see comments in static_op()
1826  return PARSEC_HOOK_RETURN_AGAIN;
1827  }
1828  else
1829  return PARSEC_HOOK_RETURN_DONE;
1830  }
1831 
1832  template <std::size_t i>
1833  static parsec_hook_return_t static_reducer_op(parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
1834  using rtask_t = detail::reducer_task_t;
1835  using value_t = std::tuple_element_t<i, actual_input_tuple_type>;
1836  constexpr const bool val_is_void = ttg::meta::is_void_v<value_t>;
1837  constexpr const bool input_is_const = std::is_const_v<value_t>;
1838  rtask_t *rtask = (rtask_t*)parsec_task;
1839  task_t *parent_task = static_cast<task_t*>(rtask->parent_task);
1840  ttT *baseobj = parent_task->tt;
1841  derivedT *obj = static_cast<derivedT *>(baseobj);
1842 
1843  auto& reducer = std::get<i>(baseobj->input_reducers);
1844 
1845  //std::cout << "static_reducer_op " << parent_task->key << std::endl;
1846 
1847  if (obj->tracing()) {
1848  if constexpr (!ttg::meta::is_void_v<keyT>)
1849  ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": reducer executing");
1850  else
1851  ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : reducer executing");
1852  }
1853 
1854  /* the copy to reduce into */
1855  detail::ttg_data_copy_t *target_copy;
1856  target_copy = parent_task->copies[i];
1857  assert(val_is_void || nullptr != target_copy);
1858  /* once we hit 0 we have to stop since another thread might enqueue a new reduction task */
1859  std::size_t c = 0;
1860  std::size_t size = 0;
1861  assert(parent_task->streams[i].reduce_count > 0);
1862  if (rtask->is_first) {
1863  if (0 == (parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) {
1864  /* we were the first and there is nothing to be done */
1865  if (obj->tracing()) {
1866  if constexpr (!ttg::meta::is_void_v<keyT>)
1867  ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": first reducer empty");
1868  else
1869  ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : first reducer empty");
1870  }
1871 
1872  return PARSEC_HOOK_RETURN_DONE;
1873  }
1874  }
1875 
1876  assert(detail::parsec_ttg_caller == NULL);
1877  detail::parsec_ttg_caller = rtask->parent_task;
1878 
1879  do {
1880  if constexpr(!val_is_void) {
1881  /* the copies to reduce out of */
1882  detail::ttg_data_copy_t *source_copy;
1883  parsec_list_item_t *item;
1884  item = parsec_lifo_pop(&parent_task->streams[i].reduce_copies);
1885  if (nullptr == item) {
1886  // maybe someone is changing the goal right now
1887  break;
1888  }
1889  source_copy = ((detail::ttg_data_copy_self_t *)(item))->self;
1890  assert(target_copy->num_readers() == target_copy->mutable_tag);
1891  assert(source_copy->num_readers() > 0);
1892  reducer(*reinterpret_cast<std::decay_t<value_t> *>(target_copy->get_ptr()),
1893  *reinterpret_cast<std::decay_t<value_t> *>(source_copy->get_ptr()));
1894  detail::release_data_copy(source_copy);
1895  } else if constexpr(val_is_void) {
1896  reducer(); // invoke control reducer
1897  }
1898  // there is only one task working on this stream, so no need to be atomic here
1899  size = ++parent_task->streams[i].size;
1900  //std::cout << "static_reducer_op size " << size << " of " << parent_task->streams[i].goal << std::endl;
1901  } while ((c = (parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) > 0);
1902  //} while ((c = (--task->streams[i].reduce_count)) > 0);
1903 
1904  /* finalize_argstream sets goal to 1, so size may be larger than goal */
1905  bool complete = (size >= parent_task->streams[i].goal);
1906 
1907  //std::cout << "static_reducer_op size " << size
1908  // << " of " << parent_task->streams[i].goal << " complete " << complete
1909  // << " c " << c << std::endl;
1910  if (complete && c == 0) {
1911  if constexpr(input_is_const) {
1912  /* make the consumer task a reader if its input is const */
1913  target_copy->reset_readers();
1914  }
1915  /* task may not be runnable yet because other inputs are missing, have release_task decide */
1916  parent_task->remove_from_hash = true;
1917  parent_task->release_task(parent_task);
1918  }
1919 
1921 
1922  if (obj->tracing()) {
1923  if constexpr (!ttg::meta::is_void_v<keyT>)
1924  ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": done executing");
1925  else
1926  ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : done executing");
1927  }
1928 
1929  return PARSEC_HOOK_RETURN_DONE;
1930  }
1931 
1932 
1933  protected:
1934  template <typename T>
1935  uint64_t unpack(T &obj, void *_bytes, uint64_t pos) {
1937  uint64_t payload_size;
1938  if constexpr (!dd_t::serialize_size_is_const) {
1939  pos = ttg::default_data_descriptor<uint64_t>::unpack_payload(&payload_size, sizeof(uint64_t), pos, _bytes);
1940  } else {
1941  payload_size = dd_t::payload_size(&obj);
1942  }
1943  pos = dd_t::unpack_payload(&obj, payload_size, pos, _bytes);
1944  return pos;
1945  }
1946 
1947  template <typename T>
1948  uint64_t pack(T &obj, void *bytes, uint64_t pos, detail::ttg_data_copy_t *copy = nullptr) {
1950  uint64_t payload_size = dd_t::payload_size(&obj);
1951  if constexpr (!dd_t::serialize_size_is_const) {
1952  pos = ttg::default_data_descriptor<uint64_t>::pack_payload(&payload_size, sizeof(uint64_t), pos, bytes);
1953  }
1954  pos = dd_t::pack_payload(&obj, payload_size, pos, bytes);
1955  return pos;
1956  }
1957 
1958  static void static_set_arg(void *data, std::size_t size, ttg::TTBase *bop) {
1959  assert(size >= sizeof(msg_header_t) &&
1960  "Trying to unpack as message that does not hold enough bytes to represent a single header");
1961  msg_header_t *hd = static_cast<msg_header_t *>(data);
1962  derivedT *obj = reinterpret_cast<derivedT *>(bop);
1963  switch (hd->fn_id) {
1965  if (0 <= hd->param_id) {
1966  assert(hd->param_id >= 0);
1967  assert(hd->param_id < obj->set_arg_from_msg_fcts.size());
1968  auto member = obj->set_arg_from_msg_fcts[hd->param_id];
1969  (obj->*member)(data, size);
1970  } else {
1971  // there is no good reason to have negative param ids
1972  ttg::abort();
1973  }
1974  break;
1975  }
1977  assert(hd->param_id >= 0);
1978  assert(hd->param_id < obj->set_argstream_size_from_msg_fcts.size());
1979  auto member = obj->set_argstream_size_from_msg_fcts[hd->param_id];
1980  (obj->*member)(data, size);
1981  break;
1982  }
1984  assert(hd->param_id >= 0);
1985  assert(hd->param_id < obj->finalize_argstream_from_msg_fcts.size());
1986  auto member = obj->finalize_argstream_from_msg_fcts[hd->param_id];
1987  (obj->*member)(data, size);
1988  break;
1989  }
1991  assert(hd->param_id >= 0);
1992  assert(hd->param_id < obj->get_from_pull_msg_fcts.size());
1993  auto member = obj->get_from_pull_msg_fcts[hd->param_id];
1994  (obj->*member)(data, size);
1995  break;
1996  }
1997  default:
1998  ttg::abort();
1999  }
2000  }
2001 
2003  inline parsec_thread_mempool_t *get_task_mempool(void) {
2004  auto &world_impl = world.impl();
2005  parsec_execution_stream_s *es = world_impl.execution_stream();
2006  int index = (es->virtual_process->vp_id * es->virtual_process->nb_cores + es->th_id);
2007  return &mempools.thread_mempools[index];
2008  }
2009 
    /// Delivers one received data copy to the i-th input of every task in @p keylist.
    /// A short-lived "dummy" task holds the copy so it can be shared/tracked across
    /// the individual set_arg_local_impl calls, then is released.
    /// @param keylist keys of the local tasks that consume the value
    /// @param copy    the data copy received from the remote sender
    template <size_t i, typename valueT>
    void set_arg_from_msg_keylist(ttg::span<keyT> &&keylist, detail::ttg_data_copy_t *copy) {
      /* create a dummy task that holds the copy, which can be reused by others */
      task_t *dummy;
      parsec_execution_stream_s *es = world.impl().execution_stream();
      parsec_thread_mempool_t *mempool = get_task_mempool();
      dummy = new (parsec_thread_mempool_allocate(mempool)) task_t(mempool, &this->self, this);
      dummy->set_dummy(true);
      // TODO: do we need to copy static_stream_goal in dummy?

      /* set the received value as the dummy's only data */
      dummy->copies[0] = copy;

      /* We received the task on this world, so it's using the same taskpool */
      dummy->parsec_task.taskpool = world.impl().taskpool();

      /* save the current task and set the dummy task */
      auto parsec_ttg_caller_save = detail::parsec_ttg_caller;
      detail::parsec_ttg_caller = dummy;

      /* iterate over the keys and have them use the copy we made */
      parsec_task_t *task_ring = nullptr;  // tasks made runnable here are chained and scheduled in one batch
      for (auto &&key : keylist) {
        // copy-constructible? can broadcast to any number of keys
        if constexpr (std::is_copy_constructible_v<valueT>) {
          set_arg_local_impl<i>(key, *reinterpret_cast<valueT *>(copy->get_ptr()), copy, &task_ring);
        }
        else {
          // not copy-constructible? can move, but only to single key
          static_assert(!std::is_reference_v<valueT>);
          if (std::size(keylist) == 1)
            set_arg_local_impl<i>(key, std::move(*reinterpret_cast<valueT *>(copy->get_ptr())), copy, &task_ring);
          else {
            throw std::logic_error(std::string("TTG::PaRSEC: need to copy a datum of type") + typeid(std::decay_t<valueT>).name() + " but the type is not copyable");
          }
        }
      }

      /* hand the whole ring of now-runnable tasks to the scheduler at once */
      if (nullptr != task_ring) {
        auto &world_impl = world.impl();
        parsec_task_t *vp_task_ring[1] = { task_ring };
        __parsec_schedule_vp(world_impl.execution_stream(), vp_task_ring, 0);
      }

      /* restore the previous task */
      detail::parsec_ttg_caller = parsec_ttg_caller_save;

      /* release the dummy task */
      complete_task_and_release(es, &dummy->parsec_task);
      parsec_thread_mempool_free(mempool, &dummy->parsec_task);
    }
2061 
2062  // there are 6 types of set_arg:
2063  // - case 1: nonvoid Key, complete Value type
2064  // - case 2: nonvoid Key, void Value, mixed (data+control) inputs
2065  // - case 3: nonvoid Key, void Value, no inputs
2066  // - case 4: void Key, complete Value type
2067  // - case 5: void Key, void Value, mixed (data+control) inputs
2068  // - case 6: void Key, void Value, no inputs
2069  // implementation of these will be further split into "local-only" and global+local
2070 
2071  template <std::size_t i>
2072  void set_arg_from_msg(void *data, std::size_t size) {
2073  using valueT = std::tuple_element_t<i, actual_input_tuple_type>;
2074  using msg_t = detail::msg_t;
2075  msg_t *msg = static_cast<msg_t *>(data);
2076  if constexpr (!ttg::meta::is_void_v<keyT>) {
2077  /* unpack the keys */
2078  /* TODO: can we avoid copying all the keys?! */
2079  uint64_t pos = msg->tt_id.key_offset;
2080  uint64_t key_end_pos;
2081  std::vector<keyT> keylist;
2082  int num_keys = msg->tt_id.num_keys;
2083  keylist.reserve(num_keys);
2084  auto rank = world.rank();
2085  for (int k = 0; k < num_keys; ++k) {
2086  keyT key;
2087  pos = unpack(key, msg->bytes, pos);
2088  assert(keymap(key) == rank);
2089  keylist.push_back(std::move(key));
2090  }
2091  key_end_pos = pos;
2092  /* jump back to the beginning of the message to get the value */
2093  pos = 0;
2094  // case 1
2095  if constexpr (!ttg::meta::is_void_v<valueT>) {
2096  using decvalueT = std::decay_t<valueT>;
2097  int32_t num_iovecs = msg->tt_id.num_iovecs;
2098  //bool inline_data = msg->inline_data;
2102  using metadata_t = decltype(descr.get_metadata(std::declval<decvalueT>()));
2103 
2104  /* unpack the metadata */
2105  metadata_t metadata;
2106  pos = unpack(metadata, msg->bytes, pos);
2107 
2108  //std::cout << "set_arg_from_msg splitmd num_iovecs " << num_iovecs << std::endl;
2109 
2110  copy = detail::create_new_datacopy(descr.create_from_metadata(metadata));
2111  } else if constexpr (!ttg::has_split_metadata<decvalueT>::value) {
2112  copy = detail::create_new_datacopy(decvalueT{});
2113 #if 0
2114  // TODO: first attempt at sending directly to the device
2115  parsec_gpu_data_copy_t* gpu_elem;
2116  gpu_elem = PARSEC_DATA_GET_COPY(master, gpu_device->super.device_index);
2117  int i = detail::first_device_id;
2118  int devid = detail::first_device_id;
2119  while (i < parsec_nb_devices) {
2120  if (nullptr == gpu_elem) {
2121  gpu_elem = PARSEC_OBJ_NEW(parsec_data_copy_t);
2122  gpu_elem->flags = PARSEC_DATA_FLAG_PARSEC_OWNED | PARSEC_DATA_FLAG_PARSEC_MANAGED;
2123  gpu_elem->coherency_state = PARSEC_DATA_COHERENCY_INVALID;
2124  gpu_elem->version = 0;
2125  gpu_elem->coherency_state = PARSEC_DATA_COHERENCY_OWNED;
2126  }
2127  if (nullptr == gpu_elem->device_private) {
2128  gpu_elem->device_private = zone_malloc(gpu_device->memory, gpu_task->flow_nb_elts[i]);
2129  if (nullptr == gpu_elem->device_private) {
2130  devid++;
2131  continue;
2132  }
2133  break;
2134  }
2135  }
2136 #endif // 0
2137  /* unpack the object, potentially discovering iovecs */
2138  pos = unpack(*static_cast<decvalueT *>(copy->get_ptr()), msg->bytes, pos);
2139  }
2140 
2141  if (num_iovecs == 0) {
2142  set_arg_from_msg_keylist<i, decvalueT>(ttg::span<keyT>(&keylist[0], num_keys), copy);
2143  } else {
2144  /* unpack the header and start the RMA transfers */
2145 
2146  /* get the remote rank */
2147  int remote = msg->tt_id.sender;
2148  assert(remote < world.size());
2149 
2150  auto &val = *static_cast<decvalueT *>(copy->get_ptr());
2151 
2152  bool inline_data = msg->tt_id.inline_data;
2153 
2154  int nv = 0;
2155  parsec_ce_tag_t cbtag;
2156  /* start the RMA transfers */
2157  auto create_activation_fn = [&]() {
2158  /* extract the callback tag */
2159  std::memcpy(&cbtag, msg->bytes + pos, sizeof(cbtag));
2160  pos += sizeof(cbtag);
2161 
2162  /* create the value from the metadata */
2163  auto activation = new detail::rma_delayed_activate(
2164  std::move(keylist), copy, num_iovecs, [this](std::vector<keyT> &&keylist, detail::ttg_data_copy_t *copy) {
2165  set_arg_from_msg_keylist<i, decvalueT>(keylist, copy);
2166  this->world.impl().decrement_inflight_msg();
2167  });
2168  return activation;
2169  };
2170  auto read_inline_data = [&](auto&& iovec){
2171  /* unpack the data from the message */
2172  ++nv;
2173  std::memcpy(iovec.data, msg->bytes + pos, iovec.num_bytes);
2174  pos += iovec.num_bytes;
2175  };
2176  auto handle_iovec_fn = [&](auto&& iovec, auto activation) {
2177  using ActivationT = std::decay_t<decltype(*activation)>;
2178 
2179  ++nv;
2180  parsec_ce_mem_reg_handle_t rreg;
2181  int32_t rreg_size_i;
2182  std::memcpy(&rreg_size_i, msg->bytes + pos, sizeof(rreg_size_i));
2183  pos += sizeof(rreg_size_i);
2184  rreg = static_cast<parsec_ce_mem_reg_handle_t>(msg->bytes + pos);
2185  pos += rreg_size_i;
2186  // std::intptr_t *fn_ptr = reinterpret_cast<std::intptr_t *>(msg->bytes + pos);
2187  // pos += sizeof(*fn_ptr);
2188  std::intptr_t fn_ptr;
2189  std::memcpy(&fn_ptr, msg->bytes + pos, sizeof(fn_ptr));
2190  pos += sizeof(fn_ptr);
2191 
2192  /* register the local memory */
2193  parsec_ce_mem_reg_handle_t lreg;
2194  size_t lreg_size;
2195  parsec_ce.mem_register(iovec.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iovec.num_bytes, parsec_datatype_int8_t,
2196  iovec.num_bytes, &lreg, &lreg_size);
2197  world.impl().increment_inflight_msg();
2198  /* TODO: PaRSEC should treat the remote callback as a tag, not a function pointer! */
2199  //std::cout << "set_arg_from_msg: get rreg " << rreg << " remote " << remote << std::endl;
2200  parsec_ce.get(&parsec_ce, lreg, 0, rreg, 0, iovec.num_bytes, remote,
2201  &detail::get_complete_cb<ActivationT>, activation,
2202  /*world.impl().parsec_ttg_rma_tag()*/
2203  cbtag, &fn_ptr, sizeof(std::intptr_t));
2204  };
2207  if (inline_data) {
2208  for (auto&& iov : descr.get_data(val)) {
2209  read_inline_data(iov);
2210  }
2211  } else {
2212  auto activation = create_activation_fn();
2213  for (auto&& iov : descr.get_data(val)) {
2214  handle_iovec_fn(iov, activation);
2215  }
2216  }
2217  } else if constexpr (!ttg::has_split_metadata<decvalueT>::value) {
2218  if (inline_data) {
2219  detail::foreach_parsec_data(val, [&](parsec_data_t* data){
2220  read_inline_data(ttg::iovec{data->nb_elts, data->device_copies[data->owner_device]->device_private});
2221  });
2222  } else {
2223  auto activation = create_activation_fn();
2224  detail::foreach_parsec_data(val, [&](parsec_data_t* data){
2225  handle_iovec_fn(ttg::iovec{data->nb_elts, data->device_copies[data->owner_device]->device_private}, activation);
2226  });
2227  }
2228  }
2229 
2230  assert(num_iovecs == nv);
2231  assert(size == (key_end_pos + sizeof(msg_header_t)));
2232 
2233  if (inline_data) {
2234  set_arg_from_msg_keylist<i, decvalueT>(ttg::span<keyT>(&keylist[0], num_keys), copy);
2235  }
2236  }
2237  // case 2 and 3
2238  } else if constexpr (!ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
2239  for (auto &&key : keylist) {
2240  set_arg<i, keyT, ttg::Void>(key, ttg::Void{});
2241  }
2242  }
2243  // case 4
2244  } else if constexpr (ttg::meta::is_void_v<keyT> && !std::is_void_v<valueT>) {
2245  using decvalueT = std::decay_t<valueT>;
2246  decvalueT val;
2247  /* TODO: handle split-metadata case as with non-void keys */
2248  unpack(val, msg->bytes, 0);
2249  set_arg<i, keyT, valueT>(std::move(val));
2250  // case 5 and 6
2251  } else if constexpr (ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
2252  set_arg<i, keyT, ttg::Void>(ttg::Void{});
2253  } else { // unreachable
2254  ttg::abort();
2255  }
2256  }
2257 
2258  template <std::size_t i>
2259  void finalize_argstream_from_msg(void *data, std::size_t size) {
2260  using msg_t = detail::msg_t;
2261  msg_t *msg = static_cast<msg_t *>(data);
2262  if constexpr (!ttg::meta::is_void_v<keyT>) {
2263  /* unpack the key */
2264  uint64_t pos = 0;
2265  auto rank = world.rank();
2266  keyT key;
2267  pos = unpack(key, msg->bytes, pos);
2268  assert(keymap(key) == rank);
2269  finalize_argstream<i>(key);
2270  } else {
2271  auto rank = world.rank();
2272  assert(keymap() == rank);
2273  finalize_argstream<i>();
2274  }
2275  }
2276 
2277  template <std::size_t i>
2278  void argstream_set_size_from_msg(void *data, std::size_t size) {
2279  using msg_t = detail::msg_t;
2280  auto msg = static_cast<msg_t *>(data);
2281  uint64_t pos = 0;
2282  if constexpr (!ttg::meta::is_void_v<keyT>) {
2283  /* unpack the key */
2284  auto rank = world.rank();
2285  keyT key;
2286  pos = unpack(key, msg->bytes, pos);
2287  assert(keymap(key) == rank);
2288  std::size_t argstream_size;
2289  pos = unpack(argstream_size, msg->bytes, pos);
2290  set_argstream_size<i>(key, argstream_size);
2291  } else {
2292  auto rank = world.rank();
2293  assert(keymap() == rank);
2294  std::size_t argstream_size;
2295  pos = unpack(argstream_size, msg->bytes, pos);
2296  set_argstream_size<i>(argstream_size);
2297  }
2298  }
2299 
2300  template <std::size_t i>
2301  void get_from_pull_msg(void *data, std::size_t size) {
2302  using msg_t = detail::msg_t;
2303  msg_t *msg = static_cast<msg_t *>(data);
2304  auto &in = std::get<i>(input_terminals);
2305  if constexpr (!ttg::meta::is_void_v<keyT>) {
2306  /* unpack the key */
2307  uint64_t pos = 0;
2308  keyT key;
2309  pos = unpack(key, msg->bytes, pos);
2310  set_arg<i>(key, (in.container).get(key));
2311  }
2312  }
2313 
    /// Sets the i-th input of the local task identified by @p key, forwarding @p value (nonvoid key, nonvoid value).
    template <std::size_t i, typename Key, typename Value>
    std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg_local(
        const Key &key, Value &&value) {
      set_arg_local_impl<i>(key, std::forward<Value>(value));
    }
2319 
    /// Sets the i-th input of the (single) local task, forwarding @p value (void key, nonvoid value).
    template <std::size_t i, typename Key = keyT, typename Value>
    std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg_local(
        Value &&value) {
      set_arg_local_impl<i>(ttg::Void{}, std::forward<Value>(value));
    }
2325 
    /// Sets the i-th input of the local task identified by @p key from an lvalue @p value (nonvoid key; value is copied).
    template <std::size_t i, typename Key, typename Value>
    std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg_local(
        const Key &key, const Value &value) {
      set_arg_local_impl<i>(key, value);
    }
2331 
    /// Sets the i-th input of the (single) local task from an lvalue @p value (void key; value is copied).
    template <std::size_t i, typename Key = keyT, typename Value>
    std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg_local(
        const Value &value) {
      set_arg_local_impl<i>(ttg::Void{}, value);
    }
2337 
    /// Sets the i-th input of the (single) local task from a shared_ptr-held value (void key; dereferences and copies).
    template <std::size_t i, typename Key = keyT, typename Value>
    std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg_local(
        std::shared_ptr<const Value> &valueptr) {
      set_arg_local_impl<i>(ttg::Void{}, *valueptr);
    }
2343 
2344  template <typename Key>
2345  task_t *create_new_task(const Key &key) {
2346  constexpr const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
2347  auto &world_impl = world.impl();
2348  task_t *newtask;
2349  parsec_thread_mempool_t *mempool = get_task_mempool();
2350  char *taskobj = (char *)parsec_thread_mempool_allocate(mempool);
2351  int32_t priority = 0;
2352  if constexpr (!keyT_is_Void) {
2353  priority = priomap(key);
2354  /* placement-new the task */
2355  newtask = new (taskobj) task_t(key, mempool, &this->self, world_impl.taskpool(), this, priority);
2356  } else {
2357  priority = priomap();
2358  /* placement-new the task */
2359  newtask = new (taskobj) task_t(mempool, &this->self, world_impl.taskpool(), this, priority);
2360  }
2361 
2362  for (int i = 0; i < static_stream_goal.size(); ++i) {
2363  newtask->streams[i].goal = static_stream_goal[i];
2364  }
2365 
2366  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": creating task");
2367  return newtask;
2368  }
2369 
2370 
2371  template <std::size_t i>
2373  /* make sure we can reuse the existing memory pool and don't have to create a new one */
2374  static_assert(sizeof(task_t) >= sizeof(detail::reducer_task_t));
2375  constexpr const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
2376  auto &world_impl = world.impl();
2377  detail::reducer_task_t *newtask;
2378  parsec_thread_mempool_t *mempool = get_task_mempool();
2379  char *taskobj = (char *)parsec_thread_mempool_allocate(mempool);
2380  // use the priority of the task we stream into
2381  int32_t priority = 0;
2382  if constexpr (!keyT_is_Void) {
2383  priority = priomap(task->key);
2384  ttg::trace(world.rank(), ":", get_name(), " : ", task->key, ": creating reducer task");
2385  } else {
2386  priority = priomap();
2387  ttg::trace(world.rank(), ":", get_name(), ": creating reducer task");
2388  }
2389  /* placement-new the task */
2390  newtask = new (taskobj) detail::reducer_task_t(task, mempool, inpute_reducers_taskclass[i],
2391  world_impl.taskpool(), priority, is_first);
2392 
2393  return newtask;
2394  }
2395 
2396 
2397  // Used to set the i'th argument
2398  template <std::size_t i, typename Key, typename Value>
2399  void set_arg_local_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in = nullptr,
2400  parsec_task_t **task_ring = nullptr) {
2401  using valueT = std::tuple_element_t<i, input_values_full_tuple_type>;
2402  constexpr const bool input_is_const = std::is_const_v<std::tuple_element_t<i, input_args_type>>;
2403  constexpr const bool valueT_is_Void = ttg::meta::is_void_v<valueT>;
2404  constexpr const bool keyT_is_Void = ttg::meta::is_void_v<Key>;
2405 
2406 
2407  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": received value for argument : ", i);
2408 
2409  parsec_key_t hk = 0;
2410  if constexpr (!keyT_is_Void) {
2411  hk = reinterpret_cast<parsec_key_t>(&key);
2412  assert(keymap(key) == world.rank());
2413  }
2414 
2415  task_t *task;
2416  auto &world_impl = world.impl();
2417  auto &reducer = std::get<i>(input_reducers);
2418  bool release = false;
2419  bool remove_from_hash = true;
2420 #if defined(PARSEC_PROF_GRAPHER)
2421  bool discover_task = true;
2422 #endif
2423  bool get_pull_data = false;
2424  bool has_lock = false;
2425  /* If we have only one input and no reducer on that input we can skip the hash table */
2426  if (numins > 1 || reducer) {
2427  has_lock = true;
2428  parsec_hash_table_lock_bucket(&tasks_table, hk);
2429  if (nullptr == (task = (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk))) {
2430  task = create_new_task(key);
2431  world_impl.increment_created();
2432  parsec_hash_table_nolock_insert(&tasks_table, &task->tt_ht_item);
2433  get_pull_data = !is_lazy_pull();
2434  if( world_impl.dag_profiling() ) {
2435 #if defined(PARSEC_PROF_GRAPHER)
2436  parsec_prof_grapher_task(&task->parsec_task, world_impl.execution_stream()->th_id, 0,
2437  key_hash(make_key(task->parsec_task.taskpool, task->parsec_task.locals), nullptr));
2438 #endif
2439  }
2440  } else if (!reducer && numins == (task->in_data_count + 1)) {
2441  /* remove while we have the lock */
2442  parsec_hash_table_nolock_remove(&tasks_table, hk);
2443  remove_from_hash = false;
2444  }
2445  /* if we have a reducer, we need to hold on to the lock for just a little longer */
2446  if (!reducer) {
2447  parsec_hash_table_unlock_bucket(&tasks_table, hk);
2448  has_lock = false;
2449  }
2450  } else {
2451  task = create_new_task(key);
2452  world_impl.increment_created();
2453  remove_from_hash = false;
2454  if( world_impl.dag_profiling() ) {
2455 #if defined(PARSEC_PROF_GRAPHER)
2456  parsec_prof_grapher_task(&task->parsec_task, world_impl.execution_stream()->th_id, 0,
2457  key_hash(make_key(task->parsec_task.taskpool, task->parsec_task.locals), nullptr));
2458 #endif
2459  }
2460  }
2461 
2462  if( world_impl.dag_profiling() ) {
2463 #if defined(PARSEC_PROF_GRAPHER)
2464  if(NULL != detail::parsec_ttg_caller && !detail::parsec_ttg_caller->is_dummy()) {
2466  char orig_str[32];
2467  char dest_str[32];
2468  if(orig_index >= 0) {
2469  snprintf(orig_str, 32, "%d", orig_index);
2470  } else {
2471  strncpy(orig_str, "_", 32);
2472  }
2473  snprintf(dest_str, 32, "%lu", i);
2474  parsec_flow_t orig{ .name = orig_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2475  .flow_index = 0, .flow_datatype_mask = ~0 };
2476  parsec_flow_t dest{ .name = dest_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2477  .flow_index = 0, .flow_datatype_mask = ~0 };
2478  parsec_prof_grapher_dep(&detail::parsec_ttg_caller->parsec_task, &task->parsec_task, discover_task ? 1 : 0, &orig, &dest);
2479  }
2480 #endif
2481  }
2482 
2483  auto get_copy_fn = [&](detail::parsec_ttg_task_base_t *task, auto&& value, bool is_const){
2484  detail::ttg_data_copy_t *copy = copy_in;
2485  if (nullptr == copy && nullptr != detail::parsec_ttg_caller) {
2487  }
2488  if (nullptr != copy) {
2489  /* retain the data copy */
2490  copy = detail::register_data_copy<valueT>(copy, task, is_const);
2491  } else {
2492  /* create a new copy */
2493  copy = detail::create_new_datacopy(std::forward<Value>(value));
2494  if (!is_const) {
2495  copy->mark_mutable();
2496  }
2497  }
2498  return copy;
2499  };
2500 
2501  if (reducer && 1 != task->streams[i].goal) { // is this a streaming input? reduce the received value
2502  auto submit_reducer_task = [&](auto *parent_task){
2503  /* check if we need to create a task */
2504  std::size_t c = parent_task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
2505  //std::cout << "submit_reducer_task " << key << " c " << c << std::endl;
2506  if (0 == c) {
2507  /* we are responsible for creating the reduction task */
2508  detail::reducer_task_t *reduce_task;
2509  reduce_task = create_new_reducer_task<i>(parent_task, false);
2510  reduce_task->release_task(reduce_task); // release immediately
2511  }
2512  };
2513 
2514  if constexpr (!ttg::meta::is_void_v<valueT>) { // for data values
2515  // have a value already? if not, set, otherwise reduce
2516  detail::ttg_data_copy_t *copy = nullptr;
2517  if (nullptr == (copy = task->copies[i])) {
2518  using decay_valueT = std::decay_t<valueT>;
2519 
2520  /* first input value, create a task and bind it to the copy */
2521  //std::cout << "Creating new reducer task for " << key << std::endl;
2522  detail::reducer_task_t *reduce_task;
2523  reduce_task = create_new_reducer_task<i>(task, true);
2524 
2525  /* protected by the bucket lock */
2526  task->streams[i].size = 1;
2527  task->streams[i].reduce_count.store(1, std::memory_order_relaxed);
2528 
2529  /* get the copy to use as input for this task */
2530  detail::ttg_data_copy_t *copy = get_copy_fn(reduce_task, std::forward<Value>(value), false);
2531 
2532  /* put the copy into the task */
2533  task->copies[i] = copy;
2534 
2535  /* release the task if we're not deferred
2536  * TODO: can we delay that until we get the second value?
2537  */
2538  if (copy->get_next_task() != &reduce_task->parsec_task) {
2539  reduce_task->release_task(reduce_task);
2540  }
2541 
2542  /* now we can unlock the bucket */
2543  parsec_hash_table_unlock_bucket(&tasks_table, hk);
2544  } else {
2545  /* unlock the bucket, the lock is not needed anymore */
2546  parsec_hash_table_unlock_bucket(&tasks_table, hk);
2547 
2548  /* get the copy to use as input for this task */
2549  detail::ttg_data_copy_t *copy = get_copy_fn(task, std::forward<Value>(value), true);
2550 
2551  /* enqueue the data copy to be reduced */
2552  parsec_lifo_push(&task->streams[i].reduce_copies, &copy->super);
2553  submit_reducer_task(task);
2554  }
2555  } else {
2556  /* unlock the bucket, the lock is not needed anymore */
2557  parsec_hash_table_unlock_bucket(&tasks_table, hk);
2558  /* submit reducer for void values to handle side effects */
2559  submit_reducer_task(task);
2560  }
2561  //if (release) {
2562  // parsec_hash_table_nolock_remove(&tasks_table, hk);
2563  // remove_from_hash = false;
2564  //}
2565  //parsec_hash_table_unlock_bucket(&tasks_table, hk);
2566  } else {
2567  /* unlock the bucket, the lock is not needed anymore */
2568  if (has_lock) {
2569  parsec_hash_table_unlock_bucket(&tasks_table, hk);
2570  }
2571  /* whether the task needs to be deferred or not */
2572  if constexpr (!valueT_is_Void) {
2573  if (nullptr != task->copies[i]) {
2574  ttg::print_error(get_name(), " : ", key, ": error argument is already set : ", i);
2575  throw std::logic_error("bad set arg");
2576  }
2577 
2578  /* get the copy to use as input for this task */
2579  detail::ttg_data_copy_t *copy = get_copy_fn(task, std::forward<Value>(value), input_is_const);
2580 
2581  /* if we registered as a writer and were the first to register with this copy
2582  * we need to defer the release of this task to give other tasks a chance to
2583  * make a copy of the original data */
2584  release = (copy->get_next_task() != &task->parsec_task);
2585  task->copies[i] = copy;
2586  } else {
2587  release = true;
2588  }
2589  }
2590  task->remove_from_hash = remove_from_hash;
2591  if (release) {
2592  release_task(task, task_ring);
2593  }
2594  /* if not pulling lazily, pull the data here */
2595  if constexpr (!ttg::meta::is_void_v<keyT>) {
2596  if (get_pull_data) {
2597  invoke_pull_terminals(std::make_index_sequence<std::tuple_size_v<input_values_tuple_type>>{}, task->key, task);
2598  }
2599  }
2600  }
2601 
2603  bool constrained = false;
2604  if (constraints_check.size() > 0) {
2605  if constexpr (ttg::meta::is_void_v<keyT>) {
2606  constrained = !constraints_check[0]();
2607  } else {
2608  constrained = !constraints_check[0](task->key);
2609  }
2610  }
2611  if (constrained) {
2612  // store the task so we can later access it once it is released
2613  parsec_hash_table_insert(&task_constraint_table, &task->tt_ht_item);
2614  }
2615  return !constrained;
2616  }
2617 
2618  template<typename Key = keyT>
2619  std::enable_if_t<ttg::meta::is_void_v<Key>, void> release_constraint(std::size_t cid) {
2620  // check the next constraint, if any
2621  assert(cid < constraints_check.size());
2622  bool release = true;
2623  for (std::size_t i = cid+1; i < constraints_check.size(); i++) {
2624  if (!constraints_check[i]()) {
2625  release = false;
2626  break;
2627  }
2628  }
2629  if (release) {
2630  // no constraint blocked us
2631  task_t *task;
2632  parsec_key_t hk = 0;
2633  task = (task_t*)parsec_hash_table_remove(&task_constraint_table, hk);
2634  assert(task != nullptr);
2635  auto &world_impl = world.impl();
2636  parsec_execution_stream_t *es = world_impl.execution_stream();
2637  parsec_task_t *vp_task_rings[1] = { &task->parsec_task };
2638  __parsec_schedule_vp(es, vp_task_rings, 0);
2639  }
2640  }
2641 
2642  template<typename Key = keyT>
2643  std::enable_if_t<!ttg::meta::is_void_v<Key>, void> release_constraint(std::size_t cid, const std::span<Key>& keys) {
2644  assert(cid < constraints_check.size());
2645  parsec_task_t *task_ring = nullptr;
2646  for (auto& key : keys) {
2647  task_t *task;
2648  bool release = true;
2649  for (std::size_t i = cid+1; i < constraints_check.size(); i++) {
2650  if (!constraints_check[i](key)) {
2651  release = false;
2652  break;
2653  }
2654  }
2655 
2656  if (release) {
2657  // no constraint blocked this task, so go ahead and release
2658  auto hk = reinterpret_cast<parsec_key_t>(&key);
2659  task = (task_t*)parsec_hash_table_remove(&task_constraint_table, hk);
2660  assert(task != nullptr);
2661  if (task_ring == nullptr) {
2662  /* the first task is set directly */
2663  task_ring = &task->parsec_task;
2664  } else {
2665  /* push into the ring */
2666  parsec_list_item_ring_push_sorted(&task_ring->super, &task->parsec_task.super,
2667  offsetof(parsec_task_t, priority));
2668  }
2669  }
2670  }
2671  if (nullptr != task_ring) {
2672  auto &world_impl = world.impl();
2673  parsec_execution_stream_t *es = world_impl.execution_stream();
2674  parsec_task_t *vp_task_rings[1] = { task_ring };
2675  __parsec_schedule_vp(es, vp_task_rings, 0);
2676  }
2677  }
2678 
// Account for a newly-arrived input of `task` and, once all `numins` inputs
// are present, remove the task from `tasks_table` (if still there) and hand
// it to the scheduler: either directly, or appended to *task_ring so the
// caller can submit a whole batch at once.  If inputs are still missing and
// the TT uses lazy pull terminals, trigger the pull when only pull inputs
// remain outstanding.
2679  void release_task(task_t *task,
2680  parsec_task_t **task_ring = nullptr) {
2681  constexpr const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
2682 
2683  /* if remove_from_hash == false, someone has already removed the task from the hash table
2684  * so we know that the task is ready, no need to do atomic increments here */
2685  bool is_ready = !task->remove_from_hash;
2686  int32_t count;
2687  if (is_ready) {
2688  count = numins;
2689  } else {
// atomically count this input's arrival; fetch_inc returns the old value
2690  count = parsec_atomic_fetch_inc_int32(&task->in_data_count) + 1;
2691  assert(count <= self.dependencies_goal);
2692  }
2693 
2694  auto &world_impl = world.impl();
2695  ttT *baseobj = task->tt;
2696 
// all inputs present: the task can be scheduled
2697  if (count == numins) {
2698  parsec_execution_stream_t *es = world_impl.execution_stream();
2699  parsec_key_t hk = task->pkey();
2700  if (tracing()) {
2701  if constexpr (!keyT_is_Void) {
2702  ttg::trace(world.rank(), ":", get_name(), " : ", task->key, ": submitting task for op ");
2703  } else {
2704  ttg::trace(world.rank(), ":", get_name(), ": submitting task for op ");
2705  }
2706  }
2707  if (task->remove_from_hash) parsec_hash_table_remove(&tasks_table, hk);
2708 
// constraints may veto immediate scheduling; a vetoed task is parked in
// task_constraint_table by check_constraints and released later via
// release_constraint
2709  if (check_constraints(task)) {
2710  if (nullptr == task_ring) {
2711  parsec_task_t *vp_task_rings[1] = { &task->parsec_task };
2712  __parsec_schedule_vp(es, vp_task_rings, 0);
2713  } else if (*task_ring == nullptr) {
2714  /* the first task is set directly */
2715  *task_ring = &task->parsec_task;
2716  } else {
2717  /* push into the ring */
2718  parsec_list_item_ring_push_sorted(&(*task_ring)->super, &task->parsec_task.super,
2719  offsetof(parsec_task_t, priority));
2720  }
2721  }
2722  } else if constexpr (!ttg::meta::is_void_v<keyT>) {
// only pull-terminal inputs remain: start the lazy pull now
2723  if ((baseobj->num_pullins + count == numins) && baseobj->is_lazy_pull()) {
2724  /* lazily pull the pull terminal data */
2725  baseobj->invoke_pull_terminals(std::make_index_sequence<std::tuple_size_v<input_values_tuple_type>>{}, task->key, task);
2726  }
2727  }
2728  }
2729 
2730  // cases 1+2
2731  template <std::size_t i, typename Key, typename Value>
2732  std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg(const Key &key,
2733  Value &&value) {
2734  set_arg_impl<i>(key, std::forward<Value>(value));
2735  }
2736 
2737  // cases 4+5+6
2738  template <std::size_t i, typename Key, typename Value>
2739  std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>, void> set_arg(Value &&value) {
2740  set_arg_impl<i>(ttg::Void{}, std::forward<Value>(value));
2741  }
2742 
2743  template <std::size_t i, typename Key = keyT>
2744  std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_arg() {
2745  set_arg_impl<i>(ttg::Void{}, ttg::Void{});
2746  }
2747 
2748  // case 3
2749  template <std::size_t i, typename Key>
2750  std::enable_if_t<!ttg::meta::is_void_v<Key>, void> set_arg(const Key &key) {
2751  set_arg_impl<i>(key, ttg::Void{});
2752  }
2753 
// Decide whether the serialized form of *value_ptr plus one packed key fits
// under detail::max_inline_size, i.e. whether the payload can travel inline
// inside the active message instead of via RMA.
// NOTE(review): num_keys is not referenced in this body and the estimate
// accounts for a single key only -- confirm whether multi-key broadcasts
// should factor num_keys into pack_size.
// NOTE(review): the `copy` parameter is likewise unused in the visible body.
2754  template<typename Value, typename Key>
2755  bool can_inline_data(Value* value_ptr, detail::ttg_data_copy_t *copy, const Key& key, std::size_t num_keys) {
2756  if constexpr (derived_has_device_op()) {
2757  /* don't inline if data is possibly on the device */
2758  return false;
2759  }
2760  /* non-device data */
2761  using decvalueT = std::decay_t<Value>;
2762  bool inline_data = false;
2763  /* check whether to send data in inline */
2764  std::size_t iov_size = 0;
2765  std::size_t metadata_size = 0;
2766  if constexpr (ttg::has_split_metadata<std::decay_t<Value>>::value) {
// split-metadata types report their payload as explicit iovecs
2768  auto iovs = descr.get_data(*const_cast<decvalueT *>(value_ptr));
2769  iov_size = std::accumulate(iovs.begin(), iovs.end(), 0,
2770  [](std::size_t s, auto& iov){ return s + iov.num_bytes; });
2771  auto metadata = descr.get_metadata(*const_cast<decvalueT *>(value_ptr));
2772  metadata_size = ttg::default_data_descriptor<decltype(metadata)>::payload_size(&metadata);
2773  } else {
2774  /* TODO: how can we query the iovecs of the buffers here without actually packing the data? */
2775  metadata_size = ttg::default_data_descriptor<ttg::meta::remove_cvr_t<Value>>::payload_size(value_ptr);
// sum the sizes of all parsec_data_t buffers attached to the value
2776  detail::foreach_parsec_data(*value_ptr, [&](parsec_data_t* data){ iov_size += data->nb_elts; });
2777  }
2778  /* key is packed at the end */
2779  std::size_t key_pack_size = ttg::default_data_descriptor<Key>::payload_size(&key);
2780  std::size_t pack_size = key_pack_size + metadata_size + iov_size;
2781  if (pack_size < detail::max_inline_size) {
2782  inline_data = true;
2783  }
2784  return inline_data;
2785  }
2786 
2787  // Used to set the i'th argument
// If keymap(key) is this rank, delegate to set_arg_local_impl<i>.  Otherwise
// pack `value` (inline inside the active message if small enough, else as
// RMA memory-registration handles for the target to get()) followed by the
// key, and send the message to the owner rank.
2788  template <std::size_t i, typename Key, typename Value>
2789  void set_arg_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in = nullptr) {
2790  int owner;
2791  using decvalueT = std::decay_t<Value>;
2792  using norefvalueT = std::remove_reference_t<Value>;
2793  norefvalueT *value_ptr = &value;
2794 
2795 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2796  if(world.impl().profiling()) {
2797  parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_start, 0, 0, NULL);
2798  }
2799 #endif
2800 
2801  if constexpr (!ttg::meta::is_void_v<Key>)
2802  owner = keymap(key);
2803  else
2804  owner = keymap();
// local case: hand off directly and return early
2805  if (owner == world.rank()) {
2806  if constexpr (!ttg::meta::is_void_v<keyT>)
2807  set_arg_local_impl<i>(key, std::forward<Value>(value), copy_in);
2808  else
2809  set_arg_local_impl<i>(ttg::Void{}, std::forward<Value>(value), copy_in);
2810 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2811  if(world.impl().profiling()) {
2812  parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_end, 0, 0, NULL);
2813  }
2814 #endif
2815  return;
2816  }
2817  // the target task is remote. Pack the information and send it to
2818  // the corresponding peer.
2819  // TODO do we need to copy value?
2820  using msg_t = detail::msg_t;
2821  auto &world_impl = world.impl();
2822  uint64_t pos = 0;
2823  int num_iovecs = 0;
2824  std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
2825  msg_header_t::MSG_SET_ARG, i, world_impl.rank(), 1);
2826 
2827  if constexpr (!ttg::meta::is_void_v<decvalueT>) {
2828 
2829  detail::ttg_data_copy_t *copy = copy_in;
2830  /* make sure we have a data copy to register with */
2831  if (nullptr == copy) {
2833  if (nullptr == copy) {
2834  // We need to create a copy for this data, as it does not exist yet.
2835  copy = detail::create_new_datacopy(std::forward<Value>(value));
2836  // use the new value from here on out
2837  value_ptr = static_cast<norefvalueT*>(copy->get_ptr());
2838  }
2839  }
2840 
// decide whether the payload travels inside the AM (inline) or via RMA
2841  bool inline_data = can_inline_data(value_ptr, copy, key, 1);
2842  msg->tt_id.inline_data = inline_data;
2843 
2844  auto write_header_fn = [&]() {
2845  if (!inline_data) {
2846  /* TODO: at the moment, the tag argument to parsec_ce.get() is treated as a
2847  * raw function pointer instead of a preregistered AM tag, so play that game.
2848  * Once this is fixed in PaRSEC we need to use parsec_ttg_rma_tag instead! */
2849  parsec_ce_tag_t cbtag = reinterpret_cast<parsec_ce_tag_t>(&detail::get_remote_complete_cb);
2850  std::memcpy(msg->bytes + pos, &cbtag, sizeof(cbtag));
2851  pos += sizeof(cbtag);
2852  }
2853  };
// per-iovec handler: copies bytes inline, or registers the memory region and
// appends {registration size, handle, completion-callback pointer} to the msg
2854  auto handle_iovec_fn = [&](auto&& iovec){
2855 
2856  if (inline_data) {
2857  /* inline data is packed right after the tt_id in the message */
2858  std::memcpy(msg->bytes + pos, iovec.data, iovec.num_bytes);
2859  pos += iovec.num_bytes;
2860  } else {
2861 
2866  copy = detail::register_data_copy<decvalueT>(copy, nullptr, true);
2867  parsec_ce_mem_reg_handle_t lreg;
2868  size_t lreg_size;
2869  /* TODO: only register once when we can broadcast the data! */
2870  parsec_ce.mem_register(iovec.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iovec.num_bytes, parsec_datatype_int8_t,
2871  iovec.num_bytes, &lreg, &lreg_size);
2872  auto lreg_ptr = std::shared_ptr<void>{lreg, [](void *ptr) {
2873  parsec_ce_mem_reg_handle_t memreg = (parsec_ce_mem_reg_handle_t)ptr;
2874  parsec_ce.mem_unregister(&memreg);
2875  }};
2876  int32_t lreg_size_i = lreg_size;
2877  std::memcpy(msg->bytes + pos, &lreg_size_i, sizeof(lreg_size_i));
2878  pos += sizeof(lreg_size_i);
2879  std::memcpy(msg->bytes + pos, lreg, lreg_size);
2880  pos += lreg_size;
2881  //std::cout << "set_arg_impl lreg " << lreg << std::endl;
2882  /* TODO: can we avoid the extra indirection of going through std::function? */
2883  std::function<void(void)> *fn = new std::function<void(void)>([=]() mutable {
2884  /* shared_ptr of value and registration captured by value so resetting
2885  * them here will eventually release the memory/registration */
2887  lreg_ptr.reset();
2888  });
2889  std::intptr_t fn_ptr{reinterpret_cast<std::intptr_t>(fn)};
2890  std::memcpy(msg->bytes + pos, &fn_ptr, sizeof(fn_ptr));
2891  pos += sizeof(fn_ptr);
2892  }
2893  };
2894 
// message layout: [metadata][optional RMA header][per-iovec info] then the key
2895  if constexpr (ttg::has_split_metadata<std::decay_t<Value>>::value) {
2897  auto iovs = descr.get_data(*const_cast<decvalueT *>(value_ptr));
2898  num_iovecs = std::distance(std::begin(iovs), std::end(iovs));
2899  /* pack the metadata */
2900  auto metadata = descr.get_metadata(*const_cast<decvalueT *>(value_ptr));
2901  pos = pack(metadata, msg->bytes, pos);
2902  //std::cout << "set_arg_impl splitmd num_iovecs " << num_iovecs << std::endl;
2903  write_header_fn();
2904  for (auto&& iov : iovs) {
2905  handle_iovec_fn(iov);
2906  }
2907  } else if constexpr (!ttg::has_split_metadata<std::decay_t<Value>>::value) {
2908  /* serialize the object */
2909  pos = pack(*value_ptr, msg->bytes, pos, copy);
2910  detail::foreach_parsec_data(value, [&](parsec_data_t *data){ ++num_iovecs; });
2911  //std::cout << "POST pack num_iovecs " << num_iovecs << std::endl;
2912  /* handle any iovecs contained in it */
2913  write_header_fn();
2914  detail::foreach_parsec_data(value, [&](parsec_data_t *data){
2915  handle_iovec_fn(ttg::iovec{data->nb_elts, data->device_copies[data->owner_device]->device_private});
2916  });
2917  }
2918 
2919  msg->tt_id.num_iovecs = num_iovecs;
2920  }
2921 
2922  /* pack the key */
2923  msg->tt_id.num_keys = 0;
2924  msg->tt_id.key_offset = pos;
2925  if constexpr (!ttg::meta::is_void_v<Key>) {
2926  size_t tmppos = pack(key, msg->bytes, pos);
2927  pos = tmppos;
2928  msg->tt_id.num_keys = 1;
2929  }
2930 
// hand the finished message to the communication engine
2931  parsec_taskpool_t *tp = world_impl.taskpool();
2932  tp->tdm.module->outgoing_message_start(tp, owner, NULL);
2933  tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
2934  //std::cout << "set_arg_impl send_am owner " << owner << " sender " << msg->tt_id.sender << std::endl;
2935  parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
2936  sizeof(msg_header_t) + pos);
2937 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2938  if(world.impl().profiling()) {
2939  parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_end, 0, 0, NULL);
2940  }
2941 #endif
// DAG profiling: emit a dependency edge through a throw-away task object
// that exists only so the grapher can render the remote destination
2942 #if defined(PARSEC_PROF_GRAPHER)
2943  if(NULL != detail::parsec_ttg_caller && !detail::parsec_ttg_caller->is_dummy()) {
2945  char orig_str[32];
2946  char dest_str[32];
2947  if(orig_index >= 0) {
2948  snprintf(orig_str, 32, "%d", orig_index);
2949  } else {
2950  strncpy(orig_str, "_", 32);
2951  }
2952  snprintf(dest_str, 32, "%lu", i);
2953  parsec_flow_t orig{ .name = orig_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2954  .flow_index = 0, .flow_datatype_mask = ~0 };
2955  parsec_flow_t dest{ .name = dest_str, .sym_type = PARSEC_SYM_INOUT, .flow_flags = PARSEC_FLOW_ACCESS_RW,
2956  .flow_index = 0, .flow_datatype_mask = ~0 };
2957  task_t *task = create_new_task(key);
2958  parsec_prof_grapher_dep(&detail::parsec_ttg_caller->parsec_task, &task->parsec_task, 0, &orig, &dest);
2959  delete task;
2960  }
2961 #endif
2962  }
2963 
// Deliver `value` as input i to every local task named by the [begin, end)
// key iterator range.  Ready tasks are accumulated into one ring and
// submitted to the scheduler in a single call at the end.
2964  template <int i, typename Iterator, typename Value>
2965  void broadcast_arg_local(Iterator &&begin, Iterator &&end, const Value &value) {
2966 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2967  if(world.impl().profiling()) {
2968  parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_bcast_arg_start, 0, 0, NULL);
2969  }
2970 #endif
2971  parsec_task_t *task_ring = nullptr;
2972  detail::ttg_data_copy_t *copy = nullptr;
2973  if (nullptr != detail::parsec_ttg_caller) {
2975  }
2976 
// share a single data copy across all local deliveries
2977  for (auto it = begin; it != end; ++it) {
2978  set_arg_local_impl<i>(*it, value, copy, &task_ring);
2979  }
2980  /* submit all ready tasks at once */
2981  if (nullptr != task_ring) {
2982  parsec_task_t *vp_task_ring[1] = { task_ring };
2983  __parsec_schedule_vp(world.impl().execution_stream(), vp_task_ring, 0);
2984  }
2985 #if defined(PARSEC_PROF_TRACE) && defined(PARSEC_TTG_PROFILE_BACKEND)
2986  if(world.impl().profiling()) {
// NOTE(review): entry traces bcast_arg_start but exit traces set_arg_end --
// confirm whether a backend_bcast_arg_end event was intended here
2987  parsec_profiling_ts_trace(world.impl().parsec_ttg_profile_backend_set_arg_end, 0, 0, NULL);
2988  }
2989 #endif
2990  }
2991 
// Broadcast `value` as the i'th input to all tasks named in `keylist`.
// Remote keys are grouped by owner rank (keys are sorted so this rank's keys
// come first, then rank+1, ..., wrapping to 0) and one message per owner is
// sent, reusing the packed value payload; local keys are finally handled via
// broadcast_arg_local.
2992  template <std::size_t i, typename Key, typename Value>
2993  std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
2994  void>
2995  broadcast_arg(const ttg::span<const Key> &keylist, const Value &value) {
2996  using valueT = std::tuple_element_t<i, input_values_full_tuple_type>;
2997  auto world = ttg_default_execution_context();
2998  auto np = world.size();
2999  int rank = world.rank();
3000  uint64_t pos = 0;
3001  bool have_remote = keylist.end() != std::find_if(keylist.begin(), keylist.end(),
3002  [&](const Key &key) { return keymap(key) != rank; });
3003 
3004  if (have_remote) {
3005  using decvalueT = std::decay_t<Value>;
3006 
3007  /* sort the input key list by owner and check whether there are remote keys */
3008  std::vector<Key> keylist_sorted(keylist.begin(), keylist.end());
3009  std::sort(keylist_sorted.begin(), keylist_sorted.end(), [&](const Key &a, const Key &b) mutable {
3010  int rank_a = keymap(a);
3011  int rank_b = keymap(b);
3012  // sort so that the keys for my rank are first, rank+1 next, ..., wrapping around to 0
3013  int pos_a = (rank_a + np - rank) % np;
3014  int pos_b = (rank_b + np - rank) % np;
3015  return pos_a < pos_b;
3016  });
3017 
3018  /* Assuming there are no local keys, will be updated while iterating over the keys */
3019  auto local_begin = keylist_sorted.end();
3020  auto local_end = keylist_sorted.end();
3021 
3022  int32_t num_iovs = 0;
3023 
3026  assert(nullptr != copy);
3027 
3028  using msg_t = detail::msg_t;
3029  auto &world_impl = world.impl();
3030  std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
3031  msg_header_t::MSG_SET_ARG, i, world_impl.rank());
3032 
3033  /* check if we inline the data */
3034  /* TODO: this assumes the worst case: that all keys are packed at once (i.e., go to the same remote). Can we do better?*/
3035  bool inline_data = can_inline_data(&value, copy, keylist_sorted[0], keylist_sorted.size());
3036  msg->tt_id.inline_data = inline_data;
3037 
// registrations are cached so each owner's message can reuse them
3038  std::vector<std::pair<int32_t, std::shared_ptr<void>>> memregs;
3039  auto write_iov_header = [&](){
3040  if (!inline_data) {
3041  /* TODO: at the moment, the tag argument to parsec_ce.get() is treated as a
3042  * raw function pointer instead of a preregistered AM tag, so play that game.
3043  * Once this is fixed in PaRSEC we need to use parsec_ttg_rma_tag instead! */
3044  parsec_ce_tag_t cbtag = reinterpret_cast<parsec_ce_tag_t>(&detail::get_remote_complete_cb);
3045  std::memcpy(msg->bytes + pos, &cbtag, sizeof(cbtag));
3046  pos += sizeof(cbtag);
3047  }
3048  };
3049  auto handle_iov_fn = [&](auto&& iovec){
3050  if (inline_data) {
3051  /* inline data is packed right after the tt_id in the message */
3052  std::memcpy(msg->bytes + pos, iovec.data, iovec.num_bytes);
3053  pos += iovec.num_bytes;
3054  } else {
3055  parsec_ce_mem_reg_handle_t lreg;
3056  size_t lreg_size;
3057  parsec_ce.mem_register(iovec.data, PARSEC_MEM_TYPE_NONCONTIGUOUS, iovec.num_bytes, parsec_datatype_int8_t,
3058  iovec.num_bytes, &lreg, &lreg_size);
3059  /* TODO: use a static function for deregistration here? */
3060  memregs.push_back(std::make_pair(static_cast<int32_t>(lreg_size),
3061  /* TODO: this assumes that parsec_ce_mem_reg_handle_t is void* */
3062  std::shared_ptr<void>{lreg, [](void *ptr) {
3063  parsec_ce_mem_reg_handle_t memreg =
3064  (parsec_ce_mem_reg_handle_t)ptr;
3065  //std::cout << "broadcast_arg memunreg lreg " << memreg << std::endl;
3066  parsec_ce.mem_unregister(&memreg);
3067  }}));
3068  //std::cout << "broadcast_arg memreg lreg " << lreg << std::endl;
3069  }
3070  };
3071 
3072  if constexpr (ttg::has_split_metadata<std::decay_t<Value>>::value) {
3074  /* pack the metadata */
3075  auto metadata = descr.get_metadata(value);
3076  pos = pack(metadata, msg->bytes, pos);
3077  auto iovs = descr.get_data(*const_cast<decvalueT *>(&value));
3078  num_iovs = std::distance(std::begin(iovs), std::end(iovs));
3079  memregs.reserve(num_iovs);
3080  write_iov_header();
3081  for (auto &&iov : iovs) {
3082  handle_iov_fn(iov);
3083  }
3084  //std::cout << "broadcast_arg splitmd num_iovecs " << num_iovs << std::endl;
3085  } else if constexpr (!ttg::has_split_metadata<std::decay_t<Value>>::value) {
3086  /* serialize the object once */
3087  pos = pack(value, msg->bytes, pos, copy);
3088  detail::foreach_parsec_data(value, [&](parsec_data_t *data){ ++num_iovs; });
3089  memregs.reserve(num_iovs);
3090  write_iov_header();
3091  detail::foreach_parsec_data(value, [&](parsec_data_t *data){
3092  handle_iov_fn(ttg::iovec{data->nb_elts,
3093  data->device_copies[data->owner_device]->device_private});
3094  });
3095  }
3096 
3097  msg->tt_id.num_iovecs = num_iovs;
3098 
// remember the end of the shared payload; per-owner data is appended from here
3099  std::size_t save_pos = pos;
3100 
3101  parsec_taskpool_t *tp = world_impl.taskpool();
// one iteration per owner rank: skip over the local run, otherwise append
// registrations + this owner's keys and send one message
3102  for (auto it = keylist_sorted.begin(); it < keylist_sorted.end(); /* increment done inline */) {
3103 
3104  auto owner = keymap(*it);
3105  if (owner == rank) {
3106  local_begin = it;
3107  /* find first non-local key */
3108  local_end =
3109  std::find_if_not(++it, keylist_sorted.end(), [&](const Key &key) { return keymap(key) == rank; });
3110  it = local_end;
3111  continue;
3112  }
3113 
3114  /* rewind the buffer and start packing a new set of memregs and keys */
3115  pos = save_pos;
3121  if (!inline_data) {
3122  for (int idx = 0; idx < num_iovs; ++idx) {
3123  // auto [lreg_size, lreg_ptr] = memregs[idx];
3124  int32_t lreg_size;
3125  std::shared_ptr<void> lreg_ptr;
3126  std::tie(lreg_size, lreg_ptr) = memregs[idx];
3127  std::memcpy(msg->bytes + pos, &lreg_size, sizeof(lreg_size));
3128  pos += sizeof(lreg_size);
3129  std::memcpy(msg->bytes + pos, lreg_ptr.get(), lreg_size);
3130  pos += lreg_size;
3131  //std::cout << "broadcast_arg lreg_ptr " << lreg_ptr.get() << std::endl;
3132  /* mark another reader on the copy */
3133  copy = detail::register_data_copy<valueT>(copy, nullptr, true);
3134  /* create a function that will be invoked upon RMA completion at the target */
3135  std::function<void(void)> *fn = new std::function<void(void)>([=]() mutable {
3136  /* shared_ptr of value and registration captured by value so resetting
3137  * them here will eventually release the memory/registration */
3139  lreg_ptr.reset();
3140  });
3141  std::intptr_t fn_ptr{reinterpret_cast<std::intptr_t>(fn)};
3142  std::memcpy(msg->bytes + pos, &fn_ptr, sizeof(fn_ptr));
3143  pos += sizeof(fn_ptr);
3144  }
3145  }
3146 
3147  /* mark the beginning of the keys */
3148  msg->tt_id.key_offset = pos;
3149 
3150  /* pack all keys for this owner */
3151  int num_keys = 0;
3152  do {
3153  ++num_keys;
3154  pos = pack(*it, msg->bytes, pos);
3155  ++it;
3156  } while (it < keylist_sorted.end() && keymap(*it) == owner);
3157  msg->tt_id.num_keys = num_keys;
3158 
3159  tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3160  tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3161  //std::cout << "broadcast_arg send_am owner " << owner << std::endl;
3162  parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
3163  sizeof(msg_header_t) + pos);
3164  }
3165  /* handle local keys */
3166  broadcast_arg_local<i>(local_begin, local_end, value);
3167  } else {
3168  /* handle local keys */
3169  broadcast_arg_local<i>(keylist.begin(), keylist.end(), value);
3170  }
3171  }
3172 
3173  // Used by invoke to set all arguments associated with a task
3174  // Is: index sequence of elements in args
3175  // Js: index sequence of input terminals to set
3176  template <typename Key, typename... Ts, size_t... Is, size_t... Js>
3177  std::enable_if_t<ttg::meta::is_none_void_v<Key>, void> set_args(std::index_sequence<Is...>,
3178  std::index_sequence<Js...>, const Key &key,
3179  const std::tuple<Ts...> &args) {
3180  static_assert(sizeof...(Js) == sizeof...(Is));
3181  constexpr size_t js[] = {Js...};
3182  int junk[] = {0, (set_arg<js[Is]>(key, TT::get<Is>(args)), 0)...};
3183  junk[0]++;
3184  }
3185 
3186  // Used by invoke to set all arguments associated with a task
3187  // Is: index sequence of input terminals to set
3188  template <typename Key, typename... Ts, size_t... Is>
3189  std::enable_if_t<ttg::meta::is_none_void_v<Key>, void> set_args(std::index_sequence<Is...> is, const Key &key,
3190  const std::tuple<Ts...> &args) {
3191  set_args(std::index_sequence_for<Ts...>{}, is, key, args);
3192  }
3193 
3194  // Used by invoke to set all arguments associated with a task
3195  // Is: index sequence of elements in args
3196  // Js: index sequence of input terminals to set
3197  template <typename Key = keyT, typename... Ts, size_t... Is, size_t... Js>
3198  std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...>, std::index_sequence<Js...>,
3199  const std::tuple<Ts...> &args) {
3200  static_assert(sizeof...(Js) == sizeof...(Is));
3201  constexpr size_t js[] = {Js...};
3202  int junk[] = {0, (set_arg<js[Is], void>(TT::get<Is>(args)), 0)...};
3203  junk[0]++;
3204  }
3205 
3206  // Used by invoke to set all arguments associated with a task
3207  // Is: index sequence of input terminals to set
3208  template <typename Key = keyT, typename... Ts, size_t... Is>
3209  std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_args(std::index_sequence<Is...> is,
3210  const std::tuple<Ts...> &args) {
3211  set_args(std::index_sequence_for<Ts...>{}, is, args);
3212  }
3213 
3214  public:
3217  template <std::size_t i>
3218  void set_static_argstream_size(std::size_t size) {
3219  assert(std::get<i>(input_reducers) && "TT::set_static_argstream_size called on nonstreaming input terminal");
3220  assert(size > 0 && "TT::set_static_argstream_size(key,size) called with size=0");
3221 
3222  this->trace(world.rank(), ":", get_name(), ": setting global stream size for terminal ", i);
3223 
3224  // Check if stream is already bounded
3225  if (static_stream_goal[i] < std::numeric_limits<std::size_t>::max()) {
3226  ttg::print_error(world.rank(), ":", get_name(), " : error stream is already bounded : ", i);
3227  throw std::runtime_error("TT::set_static_argstream_size called for a bounded stream");
3228  }
3229 
3230  static_stream_goal[i] = size;
3231  }
3232 
// Set the stream goal for streaming input terminal i of the task identified
// by `key`.  If the key is owned by another rank, forward the request as an
// active message; otherwise find-or-create the task and publish the goal,
// releasing the task if the stream already reached it.
3236  template <std::size_t i, typename Key>
3237  std::enable_if_t<!ttg::meta::is_void_v<Key>, void> set_argstream_size(const Key &key, std::size_t size) {
3238  // preconditions
3239  assert(std::get<i>(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal");
3240  assert(size > 0 && "TT::set_argstream_size(key,size) called with size=0");
3241 
3242  // body
3243  const auto owner = keymap(key);
3244  if (owner != world.rank()) {
3245  ttg::trace(world.rank(), ":", get_name(), ":", key, " : forwarding stream size for terminal ", i);
3246  using msg_t = detail::msg_t;
3247  auto &world_impl = world.impl();
3248  uint64_t pos = 0;
3249  std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
3251  world_impl.rank(), 1);
3252  /* pack the key */
3253  pos = pack(key, msg->bytes, pos);
// the requested goal is packed right after the key
3254  pos = pack(size, msg->bytes, pos);
3255  parsec_taskpool_t *tp = world_impl.taskpool();
3256  tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3257  tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3258  parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
3259  sizeof(msg_header_t) + pos);
3260  } else {
3261  ttg::trace(world.rank(), ":", get_name(), ":", key, " : setting stream size to ", size, " for terminal ", i);
3262 
// find the task under the bucket lock, creating it if it does not exist yet
3263  auto hk = reinterpret_cast<parsec_key_t>(&key);
3264  task_t *task;
3265  parsec_hash_table_lock_bucket(&tasks_table, hk);
3266  if (nullptr == (task = (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk))) {
3267  task = create_new_task(key);
3268  world.impl().increment_created();
3269  parsec_hash_table_nolock_insert(&tasks_table, &task->tt_ht_item);
3270  if( world.impl().dag_profiling() ) {
3271 #if defined(PARSEC_PROF_GRAPHER)
3272  parsec_prof_grapher_task(&task->parsec_task, world.impl().execution_stream()->th_id, 0, *(uintptr_t*)&(task->parsec_task.locals[0]));
3273 #endif
3274  }
3275  }
3276  parsec_hash_table_unlock_bucket(&tasks_table, hk);
3277 
3278  // TODO: Unfriendly implementation, cannot check if stream is already bounded
3279  // TODO: Unfriendly implementation, cannot check if stream has been finalized already
3280 
3281  // commit changes
3282  // 1) "lock" the stream by incrementing the reduce_count
3283  // 2) set the goal
3284  // 3) "unlock" the stream
3285  // only one thread will see the reduce_count be zero and the goal match the size
3286  task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
3287  task->streams[i].goal = size;
3288  auto c = task->streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
// fetch_sub returns the prior value: c == 1 means we were the only holder
3289  if (1 == c && (task->streams[i].size >= size)) {
3290  release_task(task);
3291  }
3292  }
3293  }
3294 
// Void-key overload of set_argstream_size: sets the reduction goal of input
// terminal <i> for the single (keyless) task instance. Remote owners receive a
// forwarded active message; the local path uses the fixed hash-table key 0.
// NOTE(review): original source line 3311 (an argument of the msg_t constructor)
// is missing from this extraction — TODO confirm.
3298  template <std::size_t i, typename Key = keyT>
3299  std::enable_if_t<ttg::meta::is_void_v<Key>, void> set_argstream_size(std::size_t size) {
3300  // preconditions
3301  assert(std::get<i>(input_reducers) && "TT::set_argstream_size called on nonstreaming input terminal");
3302  assert(size > 0 && "TT::set_argstream_size(key,size) called with size=0");
3303 
3304  // body
3305  const auto owner = keymap();
3306  if (owner != world.rank()) {
3307  ttg::trace(world.rank(), ":", get_name(), " : forwarding stream size for terminal ", i);
3308  using msg_t = detail::msg_t;
3309  auto &world_impl = world.impl();
3310  uint64_t pos = 0;
3311  std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
3313  world_impl.rank(), 0);
3314  pos = pack(size, msg->bytes, pos);
3315  parsec_taskpool_t *tp = world_impl.taskpool();
3316  tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3317  tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3318  parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
3319  sizeof(msg_header_t) + pos);
3320  } else {
3321  ttg::trace(world.rank(), ":", get_name(), " : setting stream size to ", size, " for terminal ", i);
3322 
3323  parsec_key_t hk = 0;
3324  task_t *task;
3325  parsec_hash_table_lock_bucket(&tasks_table, hk);
3326  if (nullptr == (task = (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk))) {
3327  task = create_new_task(ttg::Void{});
3328  world.impl().increment_created();
3329  parsec_hash_table_nolock_insert(&tasks_table, &task->tt_ht_item);
3330  if( world.impl().dag_profiling() ) {
3331 #if defined(PARSEC_PROF_GRAPHER)
3332  parsec_prof_grapher_task(&task->parsec_task, world.impl().execution_stream()->th_id, 0, *(uintptr_t*)&(task->parsec_task.locals[0]));
3333 #endif
3334  }
3335  }
3336  parsec_hash_table_unlock_bucket(&tasks_table, hk);
3337 
3338  // TODO: Unfriendly implementation, cannot check if stream is already bounded
3339  // TODO: Unfriendly implementation, cannot check if stream has been finalized already
3340 
3341  // commit changes
3342  // 1) "lock" the stream by incrementing the reduce_count
3343  // 2) set the goal
3344  // 3) "unlock" the stream
3345  // only one thread will see the reduce_count be zero and the goal match the size
3346  task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
3347  task->streams[i].goal = size;
3348  auto c = task->streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
3349  if (1 == c && (task->streams[i].size >= size)) {
3350  release_task(task);
3351  }
3352  }
3353  }
3353 
// Finalizes the stream on input terminal <i> for the task identified by `key`
// (non-void-key overload): the goal is forced to 1 so that the task becomes
// releasable once at least one element has arrived. Forwarded to the owner rank
// when the key is not local. Throws if no task exists, i.e. finalize was called
// on a stream that never received any input.
// NOTE(review): original source line 3368 (an argument of the msg_t constructor)
// is missing from this extraction — TODO confirm.
3356  template <std::size_t i, typename Key>
3357  std::enable_if_t<!ttg::meta::is_void_v<Key>, void> finalize_argstream(const Key &key) {
3358  // preconditions
3359  assert(std::get<i>(input_reducers) && "TT::finalize_argstream called on nonstreaming input terminal");
3360 
3361  // body
3362  const auto owner = keymap(key);
3363  if (owner != world.rank()) {
3364  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": forwarding stream finalize for terminal ", i);
3365  using msg_t = detail::msg_t;
3366  auto &world_impl = world.impl();
3367  uint64_t pos = 0;
3368  std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
3370  world_impl.rank(), 1);
3371  /* pack the key */
3372  pos = pack(key, msg->bytes, pos);
3373  parsec_taskpool_t *tp = world_impl.taskpool();
3374  tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3375  tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3376  parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
3377  sizeof(msg_header_t) + pos);
3378  } else {
3379  ttg::trace(world.rank(), ":", get_name(), " : ", key, ": finalizing stream for terminal ", i);
3380 
3381  auto hk = reinterpret_cast<parsec_key_t>(&key);
3382  task_t *task = nullptr;
3383  //parsec_hash_table_lock_bucket(&tasks_table, hk);
3384  if (nullptr == (task = (task_t *)parsec_hash_table_find(&tasks_table, hk))) {
3385  ttg::print_error(world.rank(), ":", get_name(), ":", key,
3386  " : error finalize called on stream that never received an input data: ", i);
3387  throw std::runtime_error("TT::finalize called on stream that never received an input data");
3388  }
3389 
3390  // TODO: Unfriendly implementation, cannot check if stream is already bounded
3391  // TODO: Unfriendly implementation, cannot check if stream has been finalized already
3392 
3393  // commit changes
3394  // 1) "lock" the stream by incrementing the reduce_count
3395  // 2) set the goal
3396  // 3) "unlock" the stream
3397  // only one thread will see the reduce_count be zero and the goal match the size
3398  task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
3399  task->streams[i].goal = 1;
3400  auto c = task->streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
3401  if (1 == c && (task->streams[i].size >= 1)) {
3402  release_task(task);
3403  }
3404  }
3405  }
3405 
// Void-key overload of finalize_argstream: finalizes the stream on input
// terminal <i> of the single (keyless) task instance (hash-table key 0).
// Forwards to the owner rank when not local; throws if the task was never
// created (i.e. no input ever arrived on the stream).
// NOTE(review): original source line 3420 (an argument of the msg_t constructor)
// is missing from this extraction — TODO confirm.
3408  template <std::size_t i, bool key_is_void = ttg::meta::is_void_v<keyT>>
3409  std::enable_if_t<key_is_void, void> finalize_argstream() {
3410  // preconditions
3411  assert(std::get<i>(input_reducers) && "TT::finalize_argstream called on nonstreaming input terminal");
3412 
3413  // body
3414  const auto owner = keymap();
3415  if (owner != world.rank()) {
3416  ttg::trace(world.rank(), ":", get_name(), ": forwarding stream finalize for terminal ", i);
3417  using msg_t = detail::msg_t;
3418  auto &world_impl = world.impl();
3419  uint64_t pos = 0;
3420  std::unique_ptr<msg_t> msg = std::make_unique<msg_t>(get_instance_id(), world_impl.taskpool()->taskpool_id,
3422  world_impl.rank(), 0);
3423  parsec_taskpool_t *tp = world_impl.taskpool();
3424  tp->tdm.module->outgoing_message_start(tp, owner, NULL);
3425  tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0);
3426  parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast<void *>(msg.get()),
3427  sizeof(msg_header_t) + pos);
3428  } else {
3429  ttg::trace(world.rank(), ":", get_name(), ": finalizing stream for terminal ", i);
3430 
3431  auto hk = static_cast<parsec_key_t>(0);
3432  task_t *task = nullptr;
3433  if (nullptr == (task = (task_t *)parsec_hash_table_find(&tasks_table, hk))) {
3434  ttg::print_error(world.rank(), ":", get_name(),
3435  " : error finalize called on stream that never received an input data: ", i);
3436  throw std::runtime_error("TT::finalize called on stream that never received an input data");
3437  }
3438 
3439  // TODO: Unfriendly implementation, cannot check if stream is already bounded
3440  // TODO: Unfriendly implementation, cannot check if stream has been finalized already
3441 
3442  // commit changes
3443  // 1) "lock" the stream by incrementing the reduce_count
3444  // 2) set the goal
3445  // 3) "unlock" the stream
3446  // only one thread will see the reduce_count be zero and the goal match the size
3447  task->streams[i].reduce_count.fetch_add(1, std::memory_order_acquire);
3448  task->streams[i].goal = 1;
3449  auto c = task->streams[i].reduce_count.fetch_sub(1, std::memory_order_release);
3450  if (1 == c && (task->streams[i].size >= 1)) {
3451  release_task(task);
3452  }
3453  }
3454  }
3454 
// Marks every device-resident parsec_data_t backing `value` as "pushout" on the
// current device task, forcing PaRSEC to move the data back to the host. Used
// when a datum produced on a device must be visible to host-side consumers
// (e.g. host-side reductions or MPI without device support).
// NOTE(review): original source line 3485 — the call that visits all
// parsec_data_t objects of `value` and applies check_parsec_data — is missing
// from this extraction; only the lambda argument at line 3486 is visible. TODO confirm.
3455  template<typename Value>
3456  void copy_mark_pushout(const Value& value) {
3457 
3458  assert(detail::parsec_ttg_caller->dev_ptr && detail::parsec_ttg_caller->dev_ptr->gpu_task);
3459  parsec_gpu_task_t *gpu_task = detail::parsec_ttg_caller->dev_ptr->gpu_task;
3460  auto check_parsec_data = [&](parsec_data_t* data) {
3461  if (data->owner_device != 0) {
3462  /* find the flow */
3463  int flowidx = 0;
3464  while (flowidx < MAX_PARAM_COUNT &&
3465  gpu_task->flow[flowidx]->flow_flags != PARSEC_FLOW_ACCESS_NONE) {
3466  if (detail::parsec_ttg_caller->parsec_task.data[flowidx].data_in->original == data) {
3467  /* found the right data, set the corresponding flow as pushout */
3468  break;
3469  }
3470  ++flowidx;
3471  }
3472  if (flowidx == MAX_PARAM_COUNT) {
3473  throw std::runtime_error("Cannot add more than MAX_PARAM_COUNT flows to a task!");
3474  }
3475  if (gpu_task->flow[flowidx]->flow_flags == PARSEC_FLOW_ACCESS_NONE) {
3476  /* no flow found, add one and mark it pushout */
3477  detail::parsec_ttg_caller->parsec_task.data[flowidx].data_in = data->device_copies[0];
3478  gpu_task->flow_nb_elts[flowidx] = data->nb_elts;
3479  }
3480  /* need to mark the flow RW to make PaRSEC happy */
3481  ((parsec_flow_t *)gpu_task->flow[flowidx])->flow_flags |= PARSEC_FLOW_ACCESS_RW;
3482  gpu_task->pushout |= 1<<flowidx;
3483  }
3484  };
3486  [&](parsec_data_t* data){
3487  check_parsec_data(data);
3488  });
3489  }
3490 
3491 
// Decides whether the datum bound to input terminal <i> must be pushed out to
// the host before sends happen: reductions, missing peer access between devices,
// multiple readers/writers, host-only TTs, or remote successors without
// device-aware MPI all force a pushout (delegated to copy_mark_pushout).
// NOTE(review): several original source lines are missing from this extraction
// (3500-3501: retrieval of the data copy; 3508, 3511, 3521, 3526, 3538-3539,
// 3542, 3547, 3549-3550, 3553, 3557, 3584: parts of the copy-state checks).
// The surviving comments document the intent; consult the original header
// before modifying this logic.
3492  /* check whether a data needs to be pushed out */
3493  template <std::size_t i, typename Value, typename RemoteCheckFn>
3494  std::enable_if_t<!std::is_void_v<std::decay_t<Value>>,
3495  void>
3496  do_prepare_send(const Value &value, RemoteCheckFn&& remote_check) {
3497  constexpr const bool value_is_const = std::is_const_v<std::tuple_element_t<i, input_args_type>>;
3498 
3499  /* get the copy */
3502 
3503  /* if there is no copy we don't need to prepare anything */
3504  if (nullptr == copy) {
3505  return;
3506  }
3507 
3509  bool need_pushout = false;
3510 
3512  /* already marked pushout, skip the rest */
3513  return;
3514  }
3515 
3516  /* TODO: remove this once we support reductions on the GPU */
3517  auto &reducer = std::get<i>(input_reducers);
3518  if (reducer) {
3519  /* reductions are currently done only on the host so push out */
3520  copy_mark_pushout(value);
3522  return;
3523  }
3524 
3525  if constexpr (value_is_const) {
3527  /* The data has been modified previously. If not all devices can access
3528  * their peers then we need to push out to the host so that all devices
3529  * have the data available for reading.
3530  * NOTE: we currently don't allow users to force the next writer to be
3531  * on a different device. In that case PaRSEC would take the host-side
3532  * copy. If we change our restriction we need to revisit this.
3533  * Ideally, PaRSEC would take the device copy if the owner moves... */
3534  need_pushout = !detail::all_devices_peer_access;
3535  }
3536 
3537  /* check for multiple readers */
3540  }
3541 
3543  /* there is a writer already, we will need to create a copy */
3544  need_pushout = true;
3545  }
3546 
3548  } else {
3551  need_pushout = true;
3552  } else {
3554  /* there are readers, we will need to create a copy */
3555  need_pushout = true;
3556  }
3558  }
3559  }
3560 
3561  if constexpr (!derived_has_device_op()) {
3562  need_pushout = true;
3563  }
3564 
3565  /* check if there are non-local successors if it's a device task */
3566  if (!need_pushout) {
3567  bool device_supported = false;
3568  if constexpr (derived_has_cuda_op()) {
3569  device_supported = world.impl().mpi_support(ttg::ExecutionSpace::CUDA);
3570  } else if constexpr (derived_has_hip_op()) {
3571  device_supported = world.impl().mpi_support(ttg::ExecutionSpace::HIP);
3572  } else if constexpr (derived_has_level_zero_op()) {
3573  device_supported = world.impl().mpi_support(ttg::ExecutionSpace::L0);
3574  }
3575  /* if MPI supports the device we don't care whether we have remote peers
3576  * because we can send from the device directly */
3577  if (!device_supported) {
3578  need_pushout = remote_check();
3579  }
3580  }
3581 
3582  if (need_pushout) {
3583  copy_mark_pushout(value);
3585  }
3586  }
3587 
3588  /* check whether a data needs to be pushed out */
3589  template <std::size_t i, typename Key, typename Value>
3590  std::enable_if_t<!ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
3591  void>
3592  prepare_send(const ttg::span<const Key> &keylist, const Value &value) {
3593  auto remote_check = [&](){
3594  auto world = ttg_default_execution_context();
3595  int rank = world.rank();
3596  bool remote = keylist.end() != std::find_if(keylist.begin(), keylist.end(),
3597  [&](const Key &key) { return keymap(key) != rank; });
3598  return remote;
3599  };
3600  do_prepare_send<i>(value, remote_check);
3601  }
3602 
3603  template <std::size_t i, typename Key, typename Value>
3604  std::enable_if_t<ttg::meta::is_void_v<Key> && !std::is_void_v<std::decay_t<Value>>,
3605  void>
3606  prepare_send(const Value &value) {
3607  auto remote_check = [&](){
3608  auto world = ttg_default_execution_context();
3609  int rank = world.rank();
3610  return (keymap() != rank);
3611  };
3612  do_prepare_send<i>(value, remote_check);
3613  }
3614 
3615  private:
3616  // Copy/assign/move forbidden ... we could make it work using
3617  // PIMPL for this base class. However, this instance of the base
3618  // class is tied to a specific instance of a derived class a
3619  // pointer to which is captured for invoking derived class
3620  // functions. Thus, not only does the derived class has to be
3621  // involved but we would have to do it in a thread safe way
3622  // including for possibly already running tasks and remote
3623  // references. This is not worth the effort ... wherever you are
3624  // wanting to move/assign an TT you should be using a pointer.
// TT is deliberately non-copyable and non-movable (see rationale above).
3625  TT(const TT &other) = delete;
3626  TT &operator=(const TT &other) = delete;
3627  TT(TT &&other) = delete;
3628  TT &operator=(TT &&other) = delete;
3629 
3630  // Registers the callback for the i'th input terminal
// Dispatches on (key-ness, value-ness) to install the right set of callbacks
// (send/move/broadcast/prepare_send/setsize/finalize) on the terminal. Cases:
//   1: nonvoid key, nonvoid value   4: void key, nonvoid value
//   2: nonvoid key, void value      5: void key, void value
// (cases 3 and 6 are subsumed, see comments below).
// NOTE(review): a few interstitial lines of the original header (3637, 3639,
// 3658-3660, 3667-3673, 3691-3693, 3700-3703) were elided by this extraction;
// they appear to be comment/separator lines, but confirm against the original.
3631  template <typename terminalT, std::size_t i>
3632  void register_input_callback(terminalT &input) {
3633  using valueT = std::decay_t<typename terminalT::value_type>;
3634  if (input.is_pull_terminal) {
3635  num_pullins++;
3636  }
3638  // case 1: nonvoid key, nonvoid value
3640  if constexpr (!ttg::meta::is_void_v<keyT> && !std::is_void_v<valueT>) {
3641  auto move_callback = [this](const keyT &key, valueT &&value) {
3642  set_arg<i, keyT, valueT>(key, std::forward<valueT>(value));
3643  };
3644  auto send_callback = [this](const keyT &key, const valueT &value) {
3645  set_arg<i, keyT, const valueT &>(key, value);
3646  };
3647  auto broadcast_callback = [this](const ttg::span<const keyT> &keylist, const valueT &value) {
3648  broadcast_arg<i, keyT, valueT>(keylist, value);
3649  };
3650  auto prepare_send_callback = [this](const ttg::span<const keyT> &keylist, const valueT &value) {
3651  prepare_send<i, keyT, valueT>(keylist, value);
3652  };
3653  auto setsize_callback = [this](const keyT &key, std::size_t size) { set_argstream_size<i>(key, size); };
3654  auto finalize_callback = [this](const keyT &key) { finalize_argstream<i>(key); };
3655  input.set_callback(send_callback, move_callback, broadcast_callback,
3656  setsize_callback, finalize_callback, prepare_send_callback);
3657  }
3659  // case 2: nonvoid key, void value, mixed inputs
3661  else if constexpr (!ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
3662  auto send_callback = [this](const keyT &key) { set_arg<i, keyT, ttg::Void>(key, ttg::Void{}); };
3663  auto setsize_callback = [this](const keyT &key, std::size_t size) { set_argstream_size<i>(key, size); };
3664  auto finalize_callback = [this](const keyT &key) { finalize_argstream<i>(key); };
3665  input.set_callback(send_callback, send_callback, {}, setsize_callback, finalize_callback);
3666  }
3668  // case 3: nonvoid key, void value, no inputs
3669  // NOTE: subsumed in case 2 above, kept for historical reasons
3672  // case 4: void key, nonvoid value
3674  else if constexpr (ttg::meta::is_void_v<keyT> && !std::is_void_v<valueT>) {
3675  auto move_callback = [this](valueT &&value) { set_arg<i, keyT, valueT>(std::forward<valueT>(value)); };
3676  auto send_callback = [this](const valueT &value) {
3677  if constexpr (std::is_copy_constructible_v<valueT>) {
3678  set_arg<i, keyT, const valueT &>(value);
3679  }
3680  else {
3681  throw std::logic_error(std::string("TTG::PaRSEC: send_callback is invoked on datum of type ") + typeid(std::decay_t<valueT>).name() + " which is not copy constructible, std::move datum into send/broadcast statement");
3682  }
3683  };
3684  auto setsize_callback = [this](std::size_t size) { set_argstream_size<i>(size); };
3685  auto finalize_callback = [this]() { finalize_argstream<i>(); };
3686  auto prepare_send_callback = [this](const valueT &value) {
3687  prepare_send<i, void>(value);
3688  };
3689  input.set_callback(send_callback, move_callback, {}, setsize_callback, finalize_callback, prepare_send_callback);
3690  }
3692  // case 5: void key, void value, mixed inputs
3694  else if constexpr (ttg::meta::is_void_v<keyT> && std::is_void_v<valueT>) {
3695  auto send_callback = [this]() { set_arg<i, keyT, ttg::Void>(ttg::Void{}); };
3696  auto setsize_callback = [this](std::size_t size) { set_argstream_size<i>(size); };
3697  auto finalize_callback = [this]() { finalize_argstream<i>(); };
3698  input.set_callback(send_callback, send_callback, {}, setsize_callback, finalize_callback);
3699  }
3701  // case 6: void key, void value, no inputs
3702  // NOTE: subsumed in case 5 above, kept for historical reasons
3704  else
3705  ttg::abort();
3706  }
3707 
3708  template <std::size_t... IS>
3709  void register_input_callbacks(std::index_sequence<IS...>) {
3710  int junk[] = {
3711  0,
3712  (register_input_callback<std::tuple_element_t<IS, input_terminals_type>, IS>(std::get<IS>(input_terminals)),
3713  0)...};
3714  junk[0]++;
3715  }
3716 
3717  template <std::size_t... IS, typename inedgesT>
3718  void connect_my_inputs_to_incoming_edge_outputs(std::index_sequence<IS...>, inedgesT &inedges) {
3719  int junk[] = {0, (std::get<IS>(inedges).set_out(&std::get<IS>(input_terminals)), 0)...};
3720  junk[0]++;
3721  }
3722 
3723  template <std::size_t... IS, typename outedgesT>
3724  void connect_my_outputs_to_outgoing_edge_inputs(std::index_sequence<IS...>, outedgesT &outedges) {
3725  int junk[] = {0, (std::get<IS>(outedges).set_in(&std::get<IS>(output_terminals)), 0)...};
3726  junk[0]++;
3727  }
3728 
// NOTE: dead code, disabled with `#if 0` and kept for reference. If re-enabled,
// initialize_flows would set each input flow's access flags to READ or RW based
// on the constness of the corresponding input terminal type.
3729 #if 0
3730  template <typename input_terminals_tupleT, std::size_t... IS, typename flowsT>
3731  void _initialize_flows(std::index_sequence<IS...>, flowsT &&flows) {
3732  int junk[] = {0,
3733  (*(const_cast<std::remove_const_t<decltype(flows[IS]->flow_flags)> *>(&(flows[IS]->flow_flags))) =
3734  (std::is_const_v<std::tuple_element_t<IS, input_terminals_tupleT>> ? PARSEC_FLOW_ACCESS_READ
3735  : PARSEC_FLOW_ACCESS_RW),
3736  0)...};
3737  junk[0]++;
3738  }
3739 
3740  template <typename input_terminals_tupleT, typename flowsT>
3741  void initialize_flows(flowsT &&flows) {
3742  _initialize_flows<input_terminals_tupleT>(
3743  std::make_index_sequence<std::tuple_size<input_terminals_tupleT>::value>{}, flows);
3744  }
3745 #endif // 0
3746 
3747  void fence() override { ttg::default_execution_context().impl().fence(); }
3748 
3749  static int key_equal(parsec_key_t a, parsec_key_t b, void *user_data) {
3750  if constexpr (std::is_same_v<keyT, void>) {
3751  return 1;
3752  } else {
3753  keyT &ka = *(reinterpret_cast<keyT *>(a));
3754  keyT &kb = *(reinterpret_cast<keyT *>(b));
3755  return ka == kb;
3756  }
3757  }
3758 
3759  static uint64_t key_hash(parsec_key_t k, void *user_data) {
3760  constexpr const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
3761  if constexpr (keyT_is_Void || std::is_same_v<keyT, void>) {
3762  return 0;
3763  } else {
3764  keyT &kk = *(reinterpret_cast<keyT *>(k));
3765  using ttg::hash;
3766  uint64_t hv = hash<std::decay_t<decltype(kk)>>{}(kk);
3767  return hv;
3768  }
3769  }
3770 
3771  static char *key_print(char *buffer, size_t buffer_size, parsec_key_t k, void *user_data) {
3772  if constexpr (std::is_same_v<keyT, void>) {
3773  buffer[0] = '\0';
3774  return buffer;
3775  } else {
3776  keyT kk = *(reinterpret_cast<keyT *>(k));
3777  std::stringstream iss;
3778  iss << kk;
3779  memset(buffer, 0, buffer_size);
3780  iss.get(buffer, buffer_size);
3781  return buffer;
3782  }
3783  }
3784 
3785  static parsec_key_t make_key(const parsec_taskpool_t *tp, const parsec_assignment_t *as) {
3786  // we use the parsec_assignment_t array as a scratchpad to store the hash and address of the key
3787  keyT *key = *(keyT**)&(as[2]);
3788  return reinterpret_cast<parsec_key_t>(key);
3789  }
3790 
3791  static char *parsec_ttg_task_snprintf(char *buffer, size_t buffer_size, const parsec_task_t *parsec_task) {
3792  if(buffer_size == 0)
3793  return buffer;
3794 
3795  if constexpr (ttg::meta::is_void_v<keyT>) {
3796  snprintf(buffer, buffer_size, "%s()[]<%d>", parsec_task->task_class->name, parsec_task->priority);
3797  } else {
3798  const task_t *task = reinterpret_cast<const task_t*>(parsec_task);
3799  std::stringstream ss;
3800  ss << task->key;
3801 
3802  std::string keystr = ss.str();
3803  std::replace(keystr.begin(), keystr.end(), '(', ':');
3804  std::replace(keystr.begin(), keystr.end(), ')', ':');
3805 
3806  snprintf(buffer, buffer_size, "%s(%s)[]<%d>", parsec_task->task_class->name, keystr.c_str(), parsec_task->priority);
3807  }
3808  return buffer;
3809  }
3810 
#if defined(PARSEC_PROF_TRACE)
  /// Profiling callback: writes the task's key (or "()" for void keys) into
  /// `dst`, truncated to `size` bytes, for the PaRSEC trace stream.
  static void *parsec_ttg_task_info(void *dst, const void *data, size_t size)
  {
    char *out = reinterpret_cast<char *>(dst);
    if constexpr (ttg::meta::is_void_v<keyT>) {
      snprintf(out, size, "()");
    } else {
      const task_t *task = reinterpret_cast<const task_t *>(data);
      std::stringstream ss;
      ss << task->key;
      snprintf(out, size, "%s", ss.str().c_str());
    }
    return dst;
  }
#endif
3826 
// Hash-table callback bundle (equality, printer, hash) installed on tasks_table
// and task_constraint_table, and published via self.key_functions.
3827  parsec_key_fn_t tasks_hash_fcts = {key_equal, key_print, key_hash};
3828 
// PaRSEC completion hook: finishes a TTG task — resumes a suspended device
// coroutine once more to flush its deferred sends, releases the task's data
// copies, and fires any completion constraints registered on the TT.
// NOTE(review): original source line 3853 (between "set the current task" and
// saving the output TLS pointer — presumably assigning detail::parsec_ttg_caller)
// and line 3872 (the call that actually releases each data copy) are missing
// from this extraction — TODO confirm against the original header.
3829  static parsec_hook_return_t complete_task_and_release(parsec_execution_stream_t *es, parsec_task_t *parsec_task) {
3830 
3831  //std::cout << "complete_task_and_release: task " << parsec_task << std::endl;
3832 
3833  task_t *task = (task_t*)parsec_task;
3834 
3835 #ifdef TTG_HAVE_COROUTINE
3836  /* if we still have a coroutine handle we invoke it one more time to get the sends/broadcasts */
3837  if (task->suspended_task_address) {
3838  assert(task->coroutine_id != ttg::TaskCoroutineID::Invalid);
3839 #ifdef TTG_HAVE_DEVICE
3840  if (task->coroutine_id == ttg::TaskCoroutineID::DeviceTask) {
3841  // get the device task from the coroutine handle
3842  auto dev_task = ttg::device::detail::device_task_handle_type::from_address(task->suspended_task_address);
3843 
3844  // get the promise which contains the views
3845  auto dev_data = dev_task.promise();
3846 
3847  assert(dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT ||
3848  dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_COMPLETE);
3849 
3850  /* execute the sends we stored */
3851  if (dev_data.state() == ttg::device::detail::TTG_DEVICE_CORO_SENDOUT) {
3852  /* set the current task, needed inside the sends */
3854  auto old_output_tls_ptr = task->tt->outputs_tls_ptr_accessor();
3855  task->tt->set_outputs_tls_ptr();
3856  dev_data.do_sends(); // all sends happen here
3857  task->tt->set_outputs_tls_ptr(old_output_tls_ptr);
3858  detail::parsec_ttg_caller = nullptr;
3859  }
3860  dev_task.destroy(); // safe to destroy the coroutine now
3861  }
3862 #endif // TTG_HAVE_DEVICE
3863  /* the coroutine should have completed and we cannot access the promise anymore */
3864  task->suspended_task_address = nullptr;
3865  }
3866 #endif // TTG_HAVE_COROUTINE
3867 
3868  /* release our data copies */
3869  for (int i = 0; i < task->data_count; i++) {
3870  detail::ttg_data_copy_t *copy = task->copies[i];
3871  if (nullptr == copy) continue;
3873  task->copies[i] = nullptr;
3874  }
3875 
3876  for (auto& c : task->tt->constraints_complete) {
3877  if constexpr(std::is_void_v<keyT>) {
3878  c();
3879  } else {
3880  c(task->key);
3881  }
3882  }
3883  return PARSEC_HOOK_RETURN_DONE;
3884  }
3885 
3886  public:
// Primary constructor: registers the TT with the world, wires up its input and
// output terminals, and builds the PaRSEC task class (`self`) that backs it —
// incarnations (CPU/CUDA/HIP/Level-Zero hooks), flows, key functions, mempool,
// and the hash tables used for pending tasks and constraints.
3887  template <typename keymapT = ttg::detail::default_keymap<keyT>,
3888  typename priomapT = ttg::detail::default_priomap<keyT>>
3889  TT(const std::string &name, const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
3890  ttg::World world, keymapT &&keymap_ = keymapT(), priomapT &&priomap_ = priomapT())
3891  : ttg::TTBase(name, numinedges, numouts)
3892  , world(world)
3893  // if using default keymap, rebind to the given world
3894  , keymap(std::is_same<keymapT, ttg::detail::default_keymap<keyT>>::value
3895  ? decltype(keymap)(ttg::detail::default_keymap<keyT>(world))
3896  : decltype(keymap)(std::forward<keymapT>(keymap_)))
3897  , priomap(decltype(keymap)(std::forward<priomapT>(priomap_))) {
3898  // Cannot call these in base constructor since terminals not yet constructed
3899  if (innames.size() != numinedges) throw std::logic_error("ttg_parsec::TT: #input names != #input terminals");
3900  if (outnames.size() != numouts) throw std::logic_error("ttg_parsec::TT: #output names != #output terminals");
3901 
3902  auto &world_impl = world.impl();
3903  world_impl.register_op(this);
3904 
3905  if constexpr (numinedges == numins) {
3906  register_input_terminals(input_terminals, innames);
3907  } else {
3908  // create a name for the virtual control input
3909  register_input_terminals(input_terminals, std::array<std::string, 1>{std::string("Virtual Control")});
3910  }
3911  register_output_terminals(output_terminals, outnames);
3912 
3913  register_input_callbacks(std::make_index_sequence<numinedges>{});
3914  int i;
3915 
3916  memset(&self, 0, sizeof(parsec_task_class_t));
3917 
3918  self.name = strdup(get_name().c_str());
3919  self.task_class_id = get_instance_id();
3920  self.nb_parameters = 0;
3921  self.nb_locals = 0;
3922  //self.nb_flows = numflows;
3923  self.nb_flows = MAX_PARAM_COUNT; // we're not using all flows but have to
3924  // trick the device handler into looking at all of them
3925 
3926  if( world_impl.profiling() ) {
3927  // first two ints are used to store the hash of the key.
3928  self.nb_parameters = (sizeof(void*)+sizeof(int)-1)/sizeof(int);
3929  // second two ints are used to store a pointer to the key of the task.
3930  self.nb_locals = self.nb_parameters + (sizeof(void*)+sizeof(int)-1)/sizeof(int);
3931 
3932  // If we have parameters and locals, we need to define the corresponding dereference arrays
3933  self.params[0] = &detail::parsec_taskclass_param0;
3934  self.params[1] = &detail::parsec_taskclass_param1;
3935 
3936  self.locals[0] = &detail::parsec_taskclass_param0;
3937  self.locals[1] = &detail::parsec_taskclass_param1;
3938  self.locals[2] = &detail::parsec_taskclass_param2;
3939  self.locals[3] = &detail::parsec_taskclass_param3;
3940  }
3941  self.make_key = make_key;
3942  self.key_functions = &tasks_hash_fcts;
3943  self.task_snprintf = parsec_ttg_task_snprintf;
3944 
3945 #if defined(PARSEC_PROF_TRACE)
3946  self.profile_info = &parsec_ttg_task_info;
3947 #endif
3948 
// grow the taskpool's task-class count to cover this class's id
3949  world_impl.taskpool()->nb_task_classes = std::max(world_impl.taskpool()->nb_task_classes, static_cast<decltype(world_impl.taskpool()->nb_task_classes)>(self.task_class_id+1));
3950  // function_id_to_instance[self.task_class_id] = this;
3951  //self.incarnations = incarnations_array.data();
3952 //#if 0
// select the incarnation table: first entry is the device (or CPU) hook,
// terminated by a PARSEC_DEV_NONE sentinel entry
3953  if constexpr (derived_has_cuda_op()) {
3954  self.incarnations = (__parsec_chore_t *)malloc(3 * sizeof(__parsec_chore_t));
3955  ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_CUDA;
3956  ((__parsec_chore_t *)self.incarnations)[0].evaluate = &detail::evaluate_cuda<TT>;
3957  ((__parsec_chore_t *)self.incarnations)[0].hook = &detail::hook_cuda<TT>;
3958  ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_NONE;
3959  ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL;
3960  ((__parsec_chore_t *)self.incarnations)[1].hook = NULL;
3961  } else if constexpr (derived_has_hip_op()) {
3962  self.incarnations = (__parsec_chore_t *)malloc(3 * sizeof(__parsec_chore_t));
3963  ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_HIP;
3964  ((__parsec_chore_t *)self.incarnations)[0].evaluate = &detail::evaluate_hip<TT>;
3965  ((__parsec_chore_t *)self.incarnations)[0].hook = &detail::hook_hip<TT>;
3966 
3967  ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_NONE;
3968  ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL;
3969  ((__parsec_chore_t *)self.incarnations)[1].hook = NULL;
3970 #if defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT)
3971  } else if constexpr (derived_has_level_zero_op()) {
3972  self.incarnations = (__parsec_chore_t *)malloc(3 * sizeof(__parsec_chore_t));
3973  ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_LEVEL_ZERO;
3974  ((__parsec_chore_t *)self.incarnations)[0].evaluate = &detail::evaluate_level_zero<TT>;
3975  ((__parsec_chore_t *)self.incarnations)[0].hook = &detail::hook_level_zero<TT>;
3976 
3977  ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_NONE;
3978  ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL;
3979  ((__parsec_chore_t *)self.incarnations)[1].hook = NULL;
3980 #endif // PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT
3981  } else {
3982  self.incarnations = (__parsec_chore_t *)malloc(2 * sizeof(__parsec_chore_t));
3983  ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_CPU;
3984  ((__parsec_chore_t *)self.incarnations)[0].evaluate = NULL;
3985  ((__parsec_chore_t *)self.incarnations)[0].hook = &detail::hook<TT>;
3986  ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_NONE;
3987  ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL;
3988  ((__parsec_chore_t *)self.incarnations)[1].hook = NULL;
3989  }
3990 //#endif // 0
3991 
3992  self.release_task = &parsec_release_task_to_mempool_update_nbtasks;
3993  self.complete_execution = complete_task_and_release;
3994 
// allocate input flows; access flags are set elsewhere (see commented-out
// initialize_flows above)
3995  for (i = 0; i < MAX_PARAM_COUNT; i++) {
3996  parsec_flow_t *flow = new parsec_flow_t;
3997  flow->name = strdup((std::string("flow in") + std::to_string(i)).c_str());
3998  flow->sym_type = PARSEC_SYM_INOUT;
3999  // see initialize_flows below
4000  // flow->flow_flags = PARSEC_FLOW_ACCESS_RW;
4001  flow->dep_in[0] = NULL;
4002  flow->dep_out[0] = NULL;
4003  flow->flow_index = i;
4004  flow->flow_datatype_mask = ~0;
4005  *((parsec_flow_t **)&(self.in[i])) = flow;
4006  }
4007  //*((parsec_flow_t **)&(self.in[i])) = NULL;
4008  //initialize_flows<input_terminals_type>(self.in);
4009 
4010  for (i = 0; i < MAX_PARAM_COUNT; i++) {
4011  parsec_flow_t *flow = new parsec_flow_t;
4012  flow->name = strdup((std::string("flow out") + std::to_string(i)).c_str());
4013  flow->sym_type = PARSEC_SYM_INOUT;
4014  flow->flow_flags = PARSEC_FLOW_ACCESS_READ; // does PaRSEC use this???
4015  flow->dep_in[0] = NULL;
4016  flow->dep_out[0] = NULL;
4017  flow->flow_index = i;
4018  flow->flow_datatype_mask = (1 << i);
4019  *((parsec_flow_t **)&(self.out[i])) = flow;
4020  }
4021  //*((parsec_flow_t **)&(self.out[i])) = NULL;
4022 
4023  self.flags = 0;
4024  self.dependencies_goal = numins; /* (~(uint32_t)0) >> (32 - numins); */
4025 
4026  int nbthreads = 0;
4027  auto *context = world_impl.context();
4028  for (int i = 0; i < context->nb_vp; i++) {
4029  nbthreads += context->virtual_processes[i]->nb_cores;
4030  }
4031 
4032  parsec_mempool_construct(&mempools, PARSEC_OBJ_CLASS(parsec_task_t), sizeof(task_t),
4033  offsetof(parsec_task_t, mempool_owner), nbthreads);
4034 
4035  parsec_hash_table_init(&tasks_table, offsetof(detail::parsec_ttg_task_base_t, tt_ht_item), 8, tasks_hash_fcts,
4036  NULL);
4037 
4038  parsec_hash_table_init(&task_constraint_table, offsetof(detail::parsec_ttg_task_base_t, tt_ht_item), 8, tasks_hash_fcts,
4039  NULL);
4040  }
4041 
  /// Constructor that places the TT into the default execution context (world).
  /// \param name human-readable name of this template task
  /// \param innames names assigned to the input terminals
  /// \param outnames names assigned to the output terminals
  /// \param keymap callable mapping a key to the owning process rank (defaults to the default keymap)
  /// \param priomap callable mapping a key to a task priority (defaults to the default priomap)
  template <typename keymapT = ttg::detail::default_keymap<keyT>,
            typename priomapT = ttg::detail::default_priomap<keyT>>
  TT(const std::string &name, const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
     keymapT &&keymap = keymapT(ttg::default_execution_context()), priomapT &&priomap = priomapT())
      : TT(name, innames, outnames, ttg::default_execution_context(), std::forward<keymapT>(keymap),
           std::forward<priomapT>(priomap)) {}
4048 
  /// Constructor that additionally wires the given edges to this TT's terminals.
  /// \param inedges edges whose outputs feed this TT's input terminals
  /// \param outedges edges fed by this TT's output terminals
  /// \param name human-readable name of this template task
  /// \param innames names assigned to the input terminals
  /// \param outnames names assigned to the output terminals
  /// \param world the execution context this TT belongs to
  /// \param keymap_ callable mapping a key to the owning process rank
  /// \param priomap callable mapping a key to a task priority
  template <typename keymapT = ttg::detail::default_keymap<keyT>,
            typename priomapT = ttg::detail::default_priomap<keyT>>
  TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name,
     const std::vector<std::string> &innames, const std::vector<std::string> &outnames, ttg::World world,
     keymapT &&keymap_ = keymapT(), priomapT &&priomap = priomapT())
      : TT(name, innames, outnames, world, std::forward<keymapT>(keymap_), std::forward<priomapT>(priomap)) {
    connect_my_inputs_to_incoming_edge_outputs(std::make_index_sequence<numinedges>{}, inedges);
    connect_my_outputs_to_outgoing_edge_inputs(std::make_index_sequence<numouts>{}, outedges);
    // DO NOT MOVE THIS - information about the number of pull terminals is only available
    // after connecting the edges, so callbacks must be registered afterwards.
    if constexpr (numinedges > 0) {
      register_input_callbacks(std::make_index_sequence<numinedges>{});
    }
  }
  /// Edge-wiring constructor that places the TT into the default execution context (world).
  /// \param inedges edges whose outputs feed this TT's input terminals
  /// \param outedges edges fed by this TT's output terminals
  /// \param name human-readable name of this template task
  /// \param innames names assigned to the input terminals
  /// \param outnames names assigned to the output terminals
  /// \param keymap callable mapping a key to the owning process rank
  /// \param priomap callable mapping a key to a task priority
  template <typename keymapT = ttg::detail::default_keymap<keyT>,
            typename priomapT = ttg::detail::default_priomap<keyT>>
  TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name,
     const std::vector<std::string> &innames, const std::vector<std::string> &outnames,
     keymapT &&keymap = keymapT(ttg::default_execution_context()), priomapT &&priomap = priomapT())
      : TT(inedges, outedges, name, innames, outnames, ttg::default_execution_context(),
           std::forward<keymapT>(keymap), std::forward<priomapT>(priomap)) {}
4069 
4070  // Destructor checks for unexecuted tasks
4071  virtual ~TT() {
4072  if(nullptr != self.name ) {
4073  free((void*)self.name);
4074  self.name = nullptr;
4075  }
4076 
4077  for (std::size_t i = 0; i < numins; ++i) {
4078  if (inpute_reducers_taskclass[i] != nullptr) {
4079  std::free(inpute_reducers_taskclass[i]);
4080  inpute_reducers_taskclass[i] = nullptr;
4081  }
4082  }
4083  release();
4084  }
4085 
4086  static void ht_iter_cb(void *item, void *cb_data) {
4087  task_t *task = (task_t *)item;
4088  ttT *op = (ttT *)cb_data;
4089  if constexpr (!ttg::meta::is_void_v<keyT>) {
4090  std::cout << "Left over task " << op->get_name() << " " << task->key << std::endl;
4091  } else {
4092  std::cout << "Left over task " << op->get_name() << std::endl;
4093  }
4094  }
4095 
4097  parsec_hash_table_for_all(&tasks_table, ht_iter_cb, this);
4098  }
4099 
  /// Releases all PaRSEC-side resources held by this TT; idempotent (see do_release()).
  virtual void release() override { do_release(); }
4101 
  /// Tears down the PaRSEC structures owned by this TT: task hash table, mempool,
  /// incarnations array, and all in/out flows; finally deregisters from the world.
  /// Guarded by `alive` so repeated calls (e.g. explicit release() followed by the
  /// destructor) are harmless.
  void do_release() {
    if (!alive) {
      return;
    }
    alive = false;
    /* print all outstanding tasks */
    parsec_hash_table_fini(&tasks_table);
    parsec_mempool_destruct(&mempools);
    // uintptr_t addr = (uintptr_t)self.incarnations;
    // free((void *)addr);
    free((__parsec_chore_t *)self.incarnations);
    // flows were allocated with new and their names with strdup in the constructor
    for (int i = 0; i < MAX_PARAM_COUNT; i++) {
      if (NULL != self.in[i]) {
        free(self.in[i]->name);
        delete self.in[i];
        self.in[i] = nullptr;
      }
      if (NULL != self.out[i]) {
        free(self.out[i]->name);
        delete self.out[i];
        self.out[i] = nullptr;
      }
    }
    world.impl().deregister_op(this);
  }
4128 
4129  static constexpr const ttg::Runtime runtime = ttg::Runtime::PaRSEC;
4130 
  /// Registers a reducer for input terminal \p i and lazily creates the PaRSEC
  /// task class that will run the reduction (static_reducer_op) on the CPU.
  /// \tparam i index of the input terminal the reducer applies to
  /// \param reducer callable combining incoming stream values for terminal i
  template <std::size_t i, typename Reducer>
  void set_input_reducer(Reducer &&reducer) {
    ttg::trace(world.rank(), ":", get_name(), " : setting reducer for terminal ", i);
    std::get<i>(input_reducers) = reducer;

    // create the reducer task class only once per terminal
    parsec_task_class_t *tc = inpute_reducers_taskclass[i];
    if (nullptr == tc) {
      tc = (parsec_task_class_t *)std::calloc(1, sizeof(*tc));
      inpute_reducers_taskclass[i] = tc;

      tc->name = strdup((get_name() + std::string(" reducer ") + std::to_string(i)).c_str());
      tc->task_class_id = get_instance_id();
      tc->nb_parameters = 0;
      tc->nb_locals = 0;
      tc->nb_flows = numflows;

      auto &world_impl = world.impl();

      if( world_impl.profiling() ) {
        // first two ints are used to store the hash of the key.
        tc->nb_parameters = (sizeof(void*)+sizeof(int)-1)/sizeof(int);
        // second two ints are used to store a pointer to the key of the task.
        tc->nb_locals = self.nb_parameters + (sizeof(void*)+sizeof(int)-1)/sizeof(int);

        // If we have parameters and locals, we need to define the corresponding dereference arrays
        tc->params[0] = &detail::parsec_taskclass_param0;
        tc->params[1] = &detail::parsec_taskclass_param1;

        tc->locals[0] = &detail::parsec_taskclass_param0;
        tc->locals[1] = &detail::parsec_taskclass_param1;
        tc->locals[2] = &detail::parsec_taskclass_param2;
        tc->locals[3] = &detail::parsec_taskclass_param3;
      }
      tc->make_key = make_key;
      tc->key_functions = &tasks_hash_fcts;
      tc->task_snprintf = parsec_ttg_task_snprintf;

#if defined(PARSEC_PROF_TRACE)
      tc->profile_info = &parsec_ttg_task_info;
#endif

      // make sure the taskpool knows about this task class id
      world_impl.taskpool()->nb_task_classes = std::max(world_impl.taskpool()->nb_task_classes, static_cast<decltype(world_impl.taskpool()->nb_task_classes)>(self.task_class_id+1));

#if 0
      // FIXME: currently only support reduction on the host
      if constexpr (derived_has_cuda_op()) {
        self.incarnations = (__parsec_chore_t *)malloc(3 * sizeof(__parsec_chore_t));
        ((__parsec_chore_t *)self.incarnations)[0].type = PARSEC_DEV_CUDA;
        ((__parsec_chore_t *)self.incarnations)[0].evaluate = NULL;
        ((__parsec_chore_t *)self.incarnations)[0].hook = detail::hook_cuda;
        ((__parsec_chore_t *)self.incarnations)[1].type = PARSEC_DEV_CPU;
        ((__parsec_chore_t *)self.incarnations)[1].evaluate = NULL;
        ((__parsec_chore_t *)self.incarnations)[1].hook = detail::hook;
        ((__parsec_chore_t *)self.incarnations)[2].type = PARSEC_DEV_NONE;
        ((__parsec_chore_t *)self.incarnations)[2].evaluate = NULL;
        ((__parsec_chore_t *)self.incarnations)[2].hook = NULL;
      } else
#endif // 0
      {
        // host-only incarnation list, terminated by PARSEC_DEV_NONE
        tc->incarnations = (__parsec_chore_t *)malloc(2 * sizeof(__parsec_chore_t));
        ((__parsec_chore_t *)tc->incarnations)[0].type = PARSEC_DEV_CPU;
        ((__parsec_chore_t *)tc->incarnations)[0].evaluate = NULL;
        ((__parsec_chore_t *)tc->incarnations)[0].hook = &static_reducer_op<i>;
        ((__parsec_chore_t *)tc->incarnations)[1].type = PARSEC_DEV_NONE;
        ((__parsec_chore_t *)tc->incarnations)[1].evaluate = NULL;
        ((__parsec_chore_t *)tc->incarnations)[1].hook = NULL;
      }

      /* the reduction task does not alter the termination detection because the target task will execute */
      tc->release_task = &parsec_release_task_to_mempool;
      tc->complete_execution = NULL;
    }
  }
4209 
  /// Registers a reducer for input terminal \p i and sets a static stream size,
  /// i.e. the number of stream items to reduce before the task input is satisfied.
  /// \tparam i index of the input terminal the reducer applies to
  /// \param reducer callable combining incoming stream values for terminal i
  /// \param size number of items expected on the stream
  template <std::size_t i, typename Reducer>
  void set_input_reducer(Reducer &&reducer, std::size_t size) {
    set_input_reducer<i>(std::forward<Reducer>(reducer));
    set_static_argstream_size<i>(size);
  }
4222 
4223  // Returns reference to input terminal i to facilitate connection --- terminal
4224  // cannot be copied, moved or assigned
4225  template <std::size_t i>
4226  std::tuple_element_t<i, input_terminals_type> *in() {
4227  return &std::get<i>(input_terminals);
4228  }
4229 
4230  // Returns reference to output terminal for purpose of connection --- terminal
4231  // cannot be copied, moved or assigned
4232  template <std::size_t i>
4233  std::tuple_element_t<i, output_terminalsT> *out() {
4234  return &std::get<i>(output_terminals);
4235  }
4236 
  // Manual injection of a task with all input arguments specified as a tuple.
  // Enabled only for keyed TTs that take input values; a key of a type
  // convertible to key_type is first converted, then the call recurses once.
  template <typename Key = keyT>
  std::enable_if_t<!ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
      const Key &key, const input_values_tuple_type &args) {
    if constexpr(!std::is_same_v<Key, key_type>) {
      key_type k = key; /* cast that type into the key type we know */
      invoke(k, args);
    } else {
      /* trigger non-void inputs */
      set_args(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, key, args);
      /* trigger void inputs */
      using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
      set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
    }
  }
4253 
  // Manual injection of a key-free task with all input arguments specified as a tuple.
  // Enabled only for void-keyed TTs that take input values.
  template <typename Key = keyT>
  std::enable_if_t<ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
      const input_values_tuple_type &args) {
    /* trigger non-void inputs */
    set_args(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, args);
    /* trigger void inputs */
    using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
    set_args(void_index_seq{}, ttg::detail::make_void_tuple<void_index_seq::size()>());
  }
4265 
  // Manual injection of a task that has a key but no data arguments.
  // A key of a type convertible to key_type is first converted, then the call
  // recurses once.
  template <typename Key = keyT>
  std::enable_if_t<!ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
      const Key &key) {

    if constexpr(!std::is_same_v<Key, key_type>) {
      key_type k = key; /* cast that type into the key type we know */
      invoke(k);
    } else {
      /* trigger void inputs */
      using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
      set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
    }
  }
4281 
  // Manual injection of a task that has neither a key nor data arguments.
  template <typename Key = keyT>
  std::enable_if_t<ttg::meta::is_void_v<Key> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke() {
    /* trigger void inputs */
    using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
    set_args(void_index_seq{}, ttg::detail::make_void_tuple<void_index_seq::size()>());
  }
4290 
4291  // overrides TTBase::invoke()
4292  void invoke() override {
4293  if constexpr (ttg::meta::is_void_v<keyT> && ttg::meta::is_empty_tuple_v<input_values_tuple_type>)
4294  invoke<keyT>();
4295  else
4296  TTBase::invoke();
4297  }
4298 
4299  private:
  /// Recursively dispatches one variadic invoke() argument per step to the input
  /// terminal whose index is the head of the index sequence.
  /// A ttg::Ptr-like argument is passed through by data copy (no value copy);
  /// any other argument is forwarded to set_arg.
  template<typename Key, typename Arg, typename... Args, std::size_t I, std::size_t... Is>
  void invoke_arglist(std::index_sequence<I, Is...>, const Key& key, Arg&& arg, Args&&... args) {
    using arg_type = std::decay_t<Arg>;
    if constexpr (ttg::meta::is_ptr_v<arg_type>) {
      /* add a reference to the object */
      auto copy = ttg_parsec::detail::get_copy(arg);
      copy->add_ref();
      /* reset readers so that the value can flow without copying */
      copy->reset_readers();
      auto& val = *arg;
      set_arg_impl<I>(key, val, copy);
      if constexpr (std::is_rvalue_reference_v<Arg>) {
        /* if the ptr was moved in we reset it */
        arg.reset();
      }
    } else if constexpr (!ttg::meta::is_ptr_v<arg_type>) {
      set_arg<I>(key, std::forward<Arg>(arg));
    }
    if constexpr (sizeof...(Is) > 0) {
      /* recursive next argument */
      invoke_arglist(std::index_sequence<Is...>{}, key, std::forward<Args>(args)...);
    }
  }
4324 
4325  public:
4326  // Manual injection of a task with all input arguments specified as variadic arguments
4327  template <typename Key = keyT, typename Arg, typename... Args>
4328  std::enable_if_t<!ttg::meta::is_void_v<Key> && !ttg::meta::is_empty_tuple_v<input_values_tuple_type>, void> invoke(
4329  const Key &key, Arg&& arg, Args&&... args) {
4330  static_assert(sizeof...(Args)+1 == std::tuple_size_v<actual_input_tuple_type>,
4331  "Number of arguments to invoke must match the number of task inputs.");
4333  /* trigger non-void inputs */
4334  invoke_arglist(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, key,
4335  std::forward<Arg>(arg), std::forward<Args>(args)...);
4336  //set_args(ttg::meta::nonvoid_index_seq<actual_input_tuple_type>{}, key, args);
4337  /* trigger void inputs */
4338  using void_index_seq = ttg::meta::void_index_seq<actual_input_tuple_type>;
4339  set_args(void_index_seq{}, key, ttg::detail::make_void_tuple<void_index_seq::size()>());
4340  }
4341 
  /// Sets whether a potential writer should be deferred while readers of a datum
  /// are active (may avoid copies at the risk of deadlock, see TTG_PARSEC_DEFER_WRITER).
  void set_defer_writer(bool value) {
    m_defer_writer = value;
  }
4345 
4346  bool get_defer_writer(bool value) {
4347  return m_defer_writer;
4348  }
4349 
4350  public:
  /// Marks this TT executable; registers the TT with the world's profiling support.
  void make_executable() override {
    world.impl().register_tt_profiling(this);
  }
4356 
  /// Returns the keymap, i.e. the callable mapping a key to the owning process rank.
  const decltype(keymap) &get_keymap() const { return keymap; }
4360 
4362  template <typename Keymap>
4363  void set_keymap(Keymap &&km) {
4364  keymap = km;
4365  }
4366 
  /// Returns the priority map, i.e. the callable mapping a key to a task priority.
  const decltype(priomap) &get_priomap() const { return priomap; }
4370 
  /// priority-map setter
  /// \param pm callable mapping a key to a task priority
  template <typename Priomap>
  void set_priomap(Priomap &&pm) {
    priomap = std::forward<Priomap>(pm);
  }
4377 
  /// Sets the device map, i.e. the callable selecting the device a task for a
  /// given key should execute on. If \p dm already returns a ttg::device::Device
  /// it is stored directly; otherwise its return value is wrapped into a Device
  /// for the execution space this TT was compiled for.
  template<typename Devicemap>
  void set_devicemap(Devicemap&& dm) {
    //static_assert(derived_has_device_op(), "Device map only allowed on device-enabled TT!");
    if constexpr (std::is_same_v<ttg::device::Device, decltype(dm(std::declval<keyT>()))>) {
      // dm returns a Device
      devicemap = std::forward<Devicemap>(dm);
    } else {
      // convert dm return into a Device
      // NOTE(review): dm is captured by copy ([=]) so the wrapper stays valid
      // after the caller's dm goes out of scope
      devicemap = [=](const keyT& key) {
        if constexpr (derived_has_cuda_op()) {
        } else if constexpr (derived_has_hip_op()) {
        } else if constexpr (derived_has_level_zero_op()) {
        } else {
          throw std::runtime_error("Unknown device type!");
          return ttg::device::Device{};
        }
      };
    }
  }
4405 
  /// Returns the device map (callable selecting the execution device for a key).
  auto get_devicemap() { return devicemap; }
4409 
4412  template<typename Constraint>
4413  void add_constraint(std::shared_ptr<Constraint> c) {
4414  std::size_t cid = constraints_check.size();
4415  if constexpr(ttg::meta::is_void_v<keyT>) {
4416  c->add_listener([this, cid](){ this->release_constraint(cid); }, this);
4417  constraints_check.push_back([c, this](){ return c->check(this); });
4418  constraints_complete.push_back([c, this](const keyT& key){ c->complete(this); return true; });
4419  } else {
4420  c->add_listener([this, cid](const std::span<keyT>& keys){ this->release_constraint(cid, keys); }, this);
4421  constraints_check.push_back([c, this](const keyT& key){ return c->check(key, this); });
4422  constraints_complete.push_back([c, this](const keyT& key){ c->complete(key, this); return true; });
4423  }
4424  }
4425 
4428  template<typename Constraint>
4429  void add_constraint(Constraint&& c) {
4430  // need to make this a shared_ptr since it's shared between different callbacks
4431  this->add_constraint(std::make_shared<Constraint>(std::forward<Constraint>(c)));
4432  }
4433 
  /// Adds a shared constraint with a mapper: the mapper translates a task key
  /// into the ordinal/value the constraint checks against.
  /// \param c the constraint, shared between the registered callbacks
  /// \param map callable; for keyed TTs invoked as map(key), otherwise map()
  template<typename Constraint, typename Mapper>
  void add_constraint(std::shared_ptr<Constraint> c, Mapper&& map) {
    static_assert(std::is_same_v<typename Constraint::key_type, keyT>);
    std::size_t cid = constraints_check.size();
    if constexpr(ttg::meta::is_void_v<keyT>) {
      c->add_listener([this, cid](){ this->release_constraint(cid); }, this);
      constraints_check.push_back([map, c, this](){ return c->check(map(), this); });
      constraints_complete.push_back([map, c, this](){ c->complete(map(), this); return true; });
    } else {
      c->add_listener([this, cid](const std::span<keyT>& keys){ this->release_constraint(cid, keys); }, this);
      constraints_check.push_back([map, c, this](const keyT& key){ return c->check(key, map(key), this); });
      constraints_complete.push_back([map, c, this](const keyT& key){ c->complete(key, map(key), this); return true; });
    }
  }
4451 
4455  template<typename Constraint, typename Mapper>
4456  void add_constraint(Constraint c, Mapper&& map) {
4457  // need to make this a shared_ptr since it's shared between different callbacks
4458  this->add_constraint(std::make_shared<Constraint>(std::forward<Constraint>(c)), std::forward<Mapper>(map));
4459  }
4460 
4461  // Register the static_op function to associate it to instance_id
4463  int rank;
4464  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
4465  ttg::trace("ttg_parsec(", rank, ") Inserting into static_id_to_op_map at ", get_instance_id());
4466  static_set_arg_fct_call_t call = std::make_pair(&TT::static_set_arg, this);
4467  auto &world_impl = world.impl();
4468  static_map_mutex.lock();
4469  static_id_to_op_map.insert(std::make_pair(get_instance_id(), call));
4470  if (delayed_unpack_actions.count(get_instance_id()) > 0) {
4471  auto tp = world_impl.taskpool();
4472 
4473  ttg::trace("ttg_parsec(", rank, ") There are ", delayed_unpack_actions.count(get_instance_id()),
4474  " messages delayed with op_id ", get_instance_id());
4475 
4476  auto se = delayed_unpack_actions.equal_range(get_instance_id());
4477  std::vector<static_set_arg_fct_arg_t> tmp;
4478  for (auto it = se.first; it != se.second;) {
4479  assert(it->first == get_instance_id());
4480  tmp.push_back(std::move(it->second));
4481  it = delayed_unpack_actions.erase(it);
4482  }
4483  static_map_mutex.unlock();
4484 
4485  for (auto& it : tmp) {
4486  if(ttg::tracing())
4487  ttg::print("ttg_parsec(", rank, ") Unpacking delayed message (", ", ", get_instance_id(), ", ",
4488  std::get<1>(it).get(), ", ", std::get<2>(it), ")");
4489  int rc = detail::static_unpack_msg(&parsec_ce, world_impl.parsec_ttg_tag(), std::get<1>(it).get(), std::get<2>(it),
4490  std::get<0>(it), NULL);
4491  assert(rc == 0);
4492  }
4493 
4494  tmp.clear();
4495  } else {
4496  static_map_mutex.unlock();
4497  }
4498  }
4499  };
4500 
4501 #include "ttg/make_tt.h"
4502 
4503 } // namespace ttg_parsec
4504 
4510 template <>
4512  private:
4513  ttg_parsec::detail::ttg_data_copy_t *copy_to_remove = nullptr;
4514  bool do_release = true;
4515 
4516  public:
4517  value_copy_handler() = default;
4520  : copy_to_remove(h.copy_to_remove)
4521  {
4522  h.copy_to_remove = nullptr;
4523  }
4524 
4527  {
4528  std::swap(copy_to_remove, h.copy_to_remove);
4529  return *this;
4530  }
4531 
4533  if (nullptr != copy_to_remove) {
4535  if (do_release) {
4536  ttg_parsec::detail::release_data_copy(copy_to_remove);
4537  }
4538  }
4539  }
4540 
4541  template <typename Value>
4542  inline std::conditional_t<std::is_reference_v<Value>,Value,Value&&> operator()(Value &&value) {
4543  constexpr auto value_is_rvref = std::is_rvalue_reference_v<decltype(value)>;
4544  using value_type = std::remove_reference_t<Value>;
4545  static_assert(value_is_rvref ||
4546  std::is_copy_constructible_v<std::decay_t<Value>>,
4547  "Data sent without being moved must be copy-constructible!");
4548 
4550  if (nullptr == caller) {
4551  throw std::runtime_error("ERROR: ttg::send or ttg::broadcast called outside of a task!");
4552  }
4553 
4555  copy = ttg_parsec::detail::find_copy_in_task(caller, &value);
4556  value_type *value_ptr = &value;
4557  if (nullptr == copy) {
4562  copy = ttg_parsec::detail::create_new_datacopy(std::forward<Value>(value));
4563  bool inserted = ttg_parsec::detail::add_copy_to_task(copy, caller);
4564  assert(inserted);
4565  value_ptr = reinterpret_cast<value_type *>(copy->get_ptr());
4566  copy_to_remove = copy;
4567  } else {
4568  if constexpr (value_is_rvref) {
4569  /* this copy won't be modified anymore so mark it as read-only */
4570  copy->reset_readers();
4571  }
4572  }
4573  /* We're coming from a writer so mark the data as modified.
4574  * That way we can force a pushout in prepare_send if we move to read-only tasks (needed by PaRSEC). */
4576  if constexpr (value_is_rvref)
4577  return std::move(*value_ptr);
4578  else
4579  return *value_ptr;
4580  }
4581 
4582  template<typename Value>
4583  inline std::add_lvalue_reference_t<Value> operator()(ttg_parsec::detail::persistent_value_ref<Value> vref) {
4585  if (nullptr == caller) {
4586  throw std::runtime_error("ERROR: ttg::send or ttg::broadcast called outside of a task!");
4587  }
4589  copy = ttg_parsec::detail::find_copy_in_task(caller, &vref.value_ref);
4590  if (nullptr == copy) {
4591  // no need to create a new copy since it's derived from the copy already
4592  copy = const_cast<ttg_parsec::detail::ttg_data_copy_t *>(static_cast<const ttg_parsec::detail::ttg_data_copy_t *>(&vref.value_ref));
4593  bool inserted = ttg_parsec::detail::add_copy_to_task(copy, caller);
4594  assert(inserted);
4595  copy_to_remove = copy; // we want to remove the copy from the task once done sending
4596  do_release = true; // we don't release the copy since we didn't allocate it
4597  copy->add_ref(); // add a reference so that TTG does not attempt to delete this object
4598  copy->add_ref(); // add another reference so that TTG never attempts to free this copy
4599  if (copy->num_readers() == 0) {
4600  /* add at least one reader (the current task) */
4601  copy->increment_readers<false>();
4602  }
4603  }
4604  return vref.value_ref;
4605  }
4606 
4607  template <typename Value>
4608  inline const Value &operator()(const Value &value) {
4610  if (nullptr == caller) {
4611  throw std::runtime_error("ERROR: ttg::send or ttg::broadcast called outside of a task!");
4612  }
4614  copy = ttg_parsec::detail::find_copy_in_task(caller, &value);
4615  const Value *value_ptr = &value;
4616  if (nullptr == copy) {
4622  bool inserted = ttg_parsec::detail::add_copy_to_task(copy, caller);
4623  assert(inserted);
4624  value_ptr = reinterpret_cast<Value *>(copy->get_ptr());
4625  copy_to_remove = copy;
4626  }
4628  return *value_ptr;
4629  }
4630 
4631 };
4632 
4633 #endif // PARSEC_TTG_H_INCLUDED
4634 // clang-format on
#define TTG_OP_ASSERT_EXECUTABLE()
Definition: tt.h:277
Edge is used to connect In and Out terminals.
Definition: edge.h:25
A base class for all template tasks.
Definition: tt.h:31
void trace(const T &t, const Ts &...ts)
Like ttg::trace(), but only produces tracing output if this->tracing()==true
Definition: tt.h:187
auto get_instance_id() const
Definition: tt.h:259
virtual void make_executable()=0
Marks this executable.
Definition: tt.h:287
bool tracing() const
Definition: tt.h:178
const std::string & get_name() const
Gets the name of this operation.
Definition: tt.h:218
TTBase(TTBase &&other)
Definition: tt.h:116
void register_input_terminals(terminalsT &terms, const namesT &names)
Definition: tt.h:85
bool is_lazy_pull()
Definition: tt.h:200
const TTBase * ttg_ptr() const
Definition: tt.h:206
void register_output_terminals(terminalsT &terms, const namesT &names)
Definition: tt.h:92
A complete version of void.
Definition: void.h:11
WorldImplT & impl(void)
Definition: world.h:216
int rank() const
Definition: world.h:204
Base class for implementation-specific Worlds.
Definition: world.h:33
void release_ops(void)
Definition: world.h:54
WorldImplBase(int size, int rank)
Definition: world.h:61
bool is_valid(void) const
Definition: world.h:154
Represents a device in a specific execution space.
Definition: device.h:14
void print_incomplete_tasks()
Definition: ttg.h:4096
void add_constraint(Constraint c, Mapper &&map)
Definition: ttg.h:4456
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
Definition: ttg.h:4064
ttg::World get_world() const override final
Definition: ttg.h:1370
static constexpr int numinvals
Definition: ttg.h:1282
uint64_t pack(T &obj, void *bytes, uint64_t pos, detail::ttg_data_copy_t *copy=nullptr)
Definition: ttg.h:1948
void set_priomap(Priomap &&pm)
Definition: ttg.h:4374
void set_input_reducer(Reducer &&reducer)
Definition: ttg.h:4137
std::tuple_element_t< i, input_terminals_type > * in()
Definition: ttg.h:4226
void set_devicemap(Devicemap &&dm)
Definition: ttg.h:4384
void set_keymap(Keymap &&km)
keymap setter
Definition: ttg.h:4363
void add_constraint(std::shared_ptr< Constraint > c)
Definition: ttg.h:4413
void finalize_argstream_from_msg(void *data, std::size_t size)
Definition: ttg.h:2259
void set_input_reducer(Reducer &&reducer, std::size_t size)
Definition: ttg.h:4218
void broadcast_arg_local(Iterator &&begin, Iterator &&end, const Value &value)
Definition: ttg.h:2965
bool check_constraints(task_t *task)
Definition: ttg.h:2602
std::enable_if_t<!std::is_void_v< std::decay_t< Value > >, void > do_prepare_send(const Value &value, RemoteCheckFn &&remote_check)
Definition: ttg.h:3496
ttg::detail::edges_tuple_t< keyT, ttg::meta::decayed_typelist_t< input_tuple_type > > input_edges_type
Definition: ttg.h:1273
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > broadcast_arg(const ttg::span< const Key > &keylist, const Value &value)
Definition: ttg.h:2995
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg(const Key &key, Value &&value)
Definition: ttg.h:2732
task_t * create_new_task(const Key &key)
Definition: ttg.h:2345
virtual ~TT()
Definition: ttg.h:4071
output_terminalsT output_terminals_type
Definition: ttg.h:1286
ttg::detail::input_terminals_tuple_t< keyT, input_tuple_type > input_terminals_type
Definition: ttg.h:1271
void add_constraint(std::shared_ptr< Constraint > c, Mapper &&map)
Definition: ttg.h:4438
static auto & get(InTuple &&intuple)
Definition: ttg.h:1294
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_arg()
Definition: ttg.h:2744
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key, const input_values_tuple_type &args)
Definition: ttg.h:4239
std::enable_if_t< ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke()
Definition: ttg.h:4284
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, keymapT &&keymap=keymapT(ttg::default_execution_context()), priomapT &&priomap=priomapT())
Definition: ttg.h:4044
void set_defer_writer(bool value)
Definition: ttg.h:4342
bool can_inline_data(Value *value_ptr, detail::ttg_data_copy_t *copy, const Key &key, std::size_t num_keys)
Definition: ttg.h:2755
virtual void release() override
Definition: ttg.h:4100
bool get_defer_writer(bool value)
Definition: ttg.h:4346
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_arg(const Key &key)
Definition: ttg.h:2750
std::enable_if_t< ttg::meta::is_none_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const Key &key, const std::tuple< Ts... > &args)
Definition: ttg.h:3177
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(Value &&value)
Definition: ttg.h:2321
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... >, std::index_sequence< Js... >, const std::tuple< Ts... > &args)
Definition: ttg.h:3198
static void ht_iter_cb(void *item, void *cb_data)
Definition: ttg.h:4086
std::enable_if_t< ttg::meta::is_none_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const Key &key, const std::tuple< Ts... > &args)
Definition: ttg.h:3189
parsec_thread_mempool_t * get_task_mempool(void)
Definition: ttg.h:2003
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > set_argstream_size(const Key &key, std::size_t size)
Definition: ttg.h:3237
void do_release()
Definition: ttg.h:4102
void release_task(task_t *task, parsec_task_t **task_ring=nullptr)
Definition: ttg.h:2679
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_argstream_size(std::size_t size)
Definition: ttg.h:3298
const auto & get_output_terminals() const
Definition: ttg.h:1311
static constexpr bool derived_has_device_op()
Definition: ttg.h:1259
std::enable_if_t< ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const input_values_tuple_type &args)
Definition: ttg.h:4256
auto get_devicemap()
Definition: ttg.h:4408
TT(const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap_=priomapT())
Definition: ttg.h:3889
decltype(keymap) const & get_keymap() const
Definition: ttg.h:4359
ttg::meta::drop_void_t< ttg::meta::decayed_typelist_t< input_tuple_type > > input_values_tuple_type
Definition: ttg.h:1279
std::enable_if_t< key_is_void, void > finalize_argstream()
finalizes stream for input i
Definition: ttg.h:3408
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > prepare_send(const ttg::span< const Key > &keylist, const Value &value)
Definition: ttg.h:3592
detail::reducer_task_t * create_new_reducer_task(task_t *task, bool is_first)
Definition: ttg.h:2372
TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector< std::string > &innames, const std::vector< std::string > &outnames, ttg::World world, keymapT &&keymap_=keymapT(), priomapT &&priomap=priomapT())
Definition: ttg.h:4051
uint64_t unpack(T &obj, void *_bytes, uint64_t pos)
Definition: ttg.h:1935
void set_static_argstream_size(std::size_t size)
Definition: ttg.h:3218
static constexpr const ttg::Runtime runtime
Definition: ttg.h:4129
static void static_set_arg(void *data, std::size_t size, ttg::TTBase *bop)
Definition: ttg.h:1958
void set_arg_from_msg_keylist(ttg::span< keyT > &&keylist, detail::ttg_data_copy_t *copy)
Definition: ttg.h:2011
static resultT get(InTuple &&intuple)
Definition: ttg.h:1290
static constexpr bool derived_has_cuda_op()
Definition: ttg.h:1241
void register_static_op_function(void)
Definition: ttg.h:4462
void add_constraint(Constraint &&c)
Definition: ttg.h:4429
std::enable_if_t< ttg::meta::is_void_v< Key >, void > set_args(std::index_sequence< Is... > is, const std::tuple< Ts... > &args)
Definition: ttg.h:3209
typename ttg::terminals_to_edges< output_terminalsT >::type output_edges_type
Definition: ttg.h:1287
void argstream_set_size_from_msg(void *data, std::size_t size)
Definition: ttg.h:2278
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Value &value)
Definition: ttg.h:2333
void set_arg_from_msg(void *data, std::size_t size)
Definition: ttg.h:2072
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(std::shared_ptr< const Value > &valueptr)
Definition: ttg.h:2339
keyT key_type
Definition: ttg.h:1270
void invoke() override
Definition: ttg.h:4292
void make_executable() override
Marks this executable.
Definition: ttg.h:4351
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > release_constraint(std::size_t cid, const std::span< Key > &keys)
Definition: ttg.h:2643
ttg::meta::add_glvalue_reference_tuple_t< ttg::meta::void_to_Void_tuple_t< actual_input_tuple_type > > input_refs_full_tuple_type
Definition: ttg.h:1278
ttg::meta::drop_void_t< ttg::meta::add_glvalue_reference_tuple_t< input_tuple_type > > input_refs_tuple_type
Definition: ttg.h:1280
ttg::meta::void_to_Void_tuple_t< ttg::meta::decayed_typelist_t< actual_input_tuple_type > > input_values_full_tuple_type
Definition: ttg.h:1276
decltype(priomap) const & get_priomap() const
Definition: ttg.h:4369
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key, Arg &&arg, Args &&... args)
Definition: ttg.h:4328
std::enable_if_t<!ttg::meta::is_void_v< Key >, void > finalize_argstream(const Key &key)
finalizes stream for input i
Definition: ttg.h:3356
std::enable_if_t<!ttg::meta::is_void_v< Key > &&ttg::meta::is_empty_tuple_v< input_values_tuple_type >, void > invoke(const Key &key)
Definition: ttg.h:4268
std::tuple_element_t< i, output_terminalsT > * out()
Definition: ttg.h:4233
void set_arg_local_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in=nullptr, parsec_task_t **task_ring=nullptr)
Definition: ttg.h:2399
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Key &key, const Value &value)
Definition: ttg.h:2327
std::enable_if_t<!ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg_local(const Key &key, Value &&value)
Definition: ttg.h:2315
static constexpr bool derived_has_hip_op()
Definition: ttg.h:1247
std::enable_if_t< ttg::meta::is_void_v< Key >, void > release_constraint(std::size_t cid)
Definition: ttg.h:2619
void copy_mark_pushout(const Value &value)
Definition: ttg.h:3456
void get_from_pull_msg(void *data, std::size_t size)
Definition: ttg.h:2301
void set_arg_impl(const Key &key, Value &&value, detail::ttg_data_copy_t *copy_in=nullptr)
Definition: ttg.h:2789
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > set_arg(Value &&value)
Definition: ttg.h:2739
static constexpr bool derived_has_level_zero_op()
Definition: ttg.h:1253
std::enable_if_t< ttg::meta::is_void_v< Key > &&!std::is_void_v< std::decay_t< Value > >, void > prepare_send(const Value &value)
Definition: ttg.h:3606
actual_input_tuple_type input_args_type
Definition: ttg.h:1272
void increment_created()
Definition: ttg.h:469
virtual void execute() override
Definition: ttg.h:410
static constexpr int parsec_ttg_rma_tag()
Definition: ttg.h:406
void decrement_inflight_msg()
Definition: ttg.h:472
WorldImpl & operator=(const WorldImpl &other)=delete
void destroy_tpool()
Definition: ttg.h:421
const ttg::Edge & ctl_edge() const
Definition: ttg.h:467
void increment_inflight_msg()
Definition: ttg.h:471
WorldImpl(const WorldImpl &other)=delete
virtual void profile_off() override
Definition: ttg.h:501
void create_tpool()
Definition: ttg.h:348
WorldImpl(int *argc, char **argv[], int ncores, parsec_context_t *c=nullptr)
Definition: ttg.h:274
ttg::Edge & ctl_edge()
Definition: ttg.h:465
MPI_Comm comm() const
Definition: ttg.h:408
bool mpi_support(ttg::ExecutionSpace space)
Definition: ttg.h:515
void register_tt_profiling(const TT< keyT, output_terminalsT, derivedT, input_valueTs, Space > *t)
Definition: ttg.h:530
virtual bool profiling() override
Definition: ttg.h:513
virtual void dag_off() override
Definition: ttg.h:492
virtual void fence_impl(void) override
Definition: ttg.h:569
virtual void dag_on(const std::string &filename) override
Definition: ttg.h:476
static constexpr int parsec_ttg_tag()
Definition: ttg.h:405
virtual void final_task() override
Definition: ttg.h:519
auto * taskpool()
Definition: ttg.h:346
virtual void destroy() override
Definition: ttg.h:434
virtual void profile_on() override
Definition: ttg.h:507
WorldImpl(WorldImpl &&other)=delete
bool dag_profiling() override
Definition: ttg.h:474
auto * execution_stream()
Definition: ttg.h:345
auto * context()
Definition: ttg.h:344
WorldImpl & operator=(WorldImpl &&other)=delete
rma_delayed_activate(std::vector< KeyT > &&key, detail::ttg_data_copy_t *copy, int num_transfers, ActivationCallbackT cb)
Definition: ttg.h:863
constexpr auto data(C &c) -> decltype(c.data())
Definition: span.h:189
typename make_index_sequence_t< I... >::type make_index_sequence
std::integral_constant< bool,(Flags &const_) !=0 > is_const
void set_default_world(WorldT &world)
Definition: world.h:29
void deregister_world(ttg::base::WorldImplBase &world)
bool force_device_comm()
Definition: env.cpp:33
typename input_terminals_tuple< keyT, valuesT... >::type input_terminals_tuple_t
Definition: terminal.h:354
void register_world(ttg::base::WorldImplBase &world)
int num_threads()
Determine the number of compute threads to use by TTG when not given to ttg::initialize
Definition: env.cpp:15
typename edges_tuple< keyT, valuesT >::type edges_tuple_t
Definition: edge.h:199
void set_current(int device, Stream stream)
Definition: device.h:128
void reset_current()
Definition: device.h:123
constexpr bool probe_all_v
Definition: meta.h:191
constexpr bool is_const_v
Definition: meta.h:325
typename typelist_to_tuple< T >::type typelist_to_tuple_t
Definition: typelist.h:52
ttg_data_copy_t * find_copy_in_task(parsec_ttg_task_base_t *task, const void *ptr)
Definition: ttg.h:646
ttg_parsec::detail::ttg_data_copy_t * get_copy(ttg_parsec::Ptr< T > &p)
Definition: ptr.h:277
bool & initialized_mpi()
Definition: ttg.h:227
parsec_hook_return_t evaluate_level_zero(const parsec_task_t *parsec_task)
Definition: ttg.h:845
bool all_devices_peer_access
Definition: ttg.h:232
ttg::device::Device parsec_device_to_ttg_device(int parsec_id)
Definition: device.h:30
parsec_hook_return_t hook_level_zero(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
Definition: ttg.h:813
int first_device_id
Definition: device.h:12
std::size_t max_inline_size
Definition: ttg.h:192
int find_index_of_copy_in_task(parsec_ttg_task_base_t *task, const void *ptr)
Definition: ttg.h:661
void foreach_parsec_data(Value &&value, Fn &&fn)
Definition: parsec_data.h:9
int ttg_device_to_parsec_device(const ttg::device::Device &device)
Definition: device.h:18
const parsec_symbol_t parsec_taskclass_param1
Definition: ttg.h:621
parsec_hook_return_t evaluate_cuda(const parsec_task_t *parsec_task)
Definition: ttg.h:825
bool add_copy_to_task(ttg_data_copy_t *copy, parsec_ttg_task_base_t *task)
Definition: ttg.h:675
constexpr const int PARSEC_TTG_MAX_AM_SIZE
Definition: ttg.h:174
void remove_data_copy(ttg_data_copy_t *copy, parsec_ttg_task_base_t *task)
Definition: ttg.h:689
parsec_hook_return_t hook(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
Definition: ttg.h:782
ttg_data_copy_t * register_data_copy(ttg_data_copy_t *copy_in, parsec_ttg_task_base_t *task, bool readonly)
Definition: ttg.h:960
parsec_hook_return_t evaluate_hip(const parsec_task_t *parsec_task)
Definition: ttg.h:835
void transfer_ownership_impl(T &&arg, int device)
Definition: ttg.h:761
const parsec_symbol_t parsec_taskclass_param2
Definition: ttg.h:629
parsec_hook_return_t hook_hip(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
Definition: ttg.h:802
ttg_data_copy_t * create_new_datacopy(Value &&value)
Definition: ttg.h:717
parsec_hook_return_t hook_cuda(struct parsec_execution_stream_s *es, parsec_task_t *parsec_task)
Definition: ttg.h:791
thread_local parsec_ttg_task_base_t * parsec_ttg_caller
Definition: thread_local.h:12
void transfer_ownership(parsec_ttg_task_t< TT > *me, int device, std::index_sequence< Is... >)
Definition: ttg.h:772
void release_data_copy(ttg_data_copy_t *copy)
Definition: ttg.h:907
const parsec_symbol_t parsec_taskclass_param3
Definition: ttg.h:637
const parsec_symbol_t parsec_taskclass_param0
Definition: ttg.h:613
this contains PaRSEC-based TTG functionality
Definition: fwd.h:18
void ttg_fence(ttg::World world)
Definition: ttg.h:1138
std::map< uint64_t, static_set_arg_fct_call_t > static_id_to_op_map
Definition: ttg.h:136
std::multimap< uint64_t, static_set_arg_fct_arg_t > delayed_unpack_actions
Definition: ttg.h:139
void ttg_finalize()
Definition: ttg.h:1124
void ttg_register_ptr(ttg::World world, const std::shared_ptr< T > &ptr)
Definition: ttg.h:1141
std::mutex static_map_mutex
Definition: ttg.h:137
void ttg_register_callback(ttg::World world, Callback &&callback)
Definition: ttg.h:1155
ttg::Edge & ttg_ctl_edge(ttg::World world)
Definition: ttg.h:1159
void make_executable_hook(ttg::World &)
Definition: ttg.h:1167
void(* static_set_arg_fct_type)(void *, size_t, ttg::TTBase *)
Definition: ttg.h:134
std::tuple< int, std::unique_ptr< std::byte[]>, size_t > static_set_arg_fct_arg_t
Definition: ttg.h:138
void ttg_initialize(int argc, char **argv, int num_threads=-1, parsec_context_s *=nullptr)
void ttg_register_status(ttg::World world, const std::shared_ptr< std::promise< void >> &status_ptr)
Definition: ttg.h:1150
ttg::World ttg_default_execution_context()
Definition: ttg.h:1134
std::pair< static_set_arg_fct_type, ttg::TTBase * > static_set_arg_fct_call_t
Definition: ttg.h:135
void ttg_execute(ttg::World world)
Definition: ttg.h:1137
void ttg_sum(ttg::World world, double &value)
Definition: ttg.h:1161
top-level TTG namespace contains runtime-neutral functionality
Definition: keymap.h:8
ExecutionSpace
denotes task execution space
Definition: execution.h:17
int size(World world=default_execution_context())
Definition: run.h:89
void abort()
Aborts the TTG program using the default backend's ttg_abort method.
Definition: run.h:62
Runtime
Definition: runtimes.h:15
World default_execution_context()
Accesses the default backend's default execution context.
Definition: run.h:68
TTG_CXX_COROUTINE_NAMESPACE::coroutine_handle< Promise > coroutine_handle
Definition: coroutine.h:24
void print(const T &t, const Ts &... ts)
atomically prints to std::cout a sequence of items (separated by ttg::print_separator) followed by st...
Definition: print.h:130
void print_error(const T &t, const Ts &... ts)
atomically prints to std::cerr a sequence of items (separated by ttg::print_separator) followed by st...
Definition: print.h:138
bool tracing()
returns whether tracing is enabled
Definition: trace.h:28
int rank(World world=default_execution_context())
Definition: run.h:85
ttg::World & get_default_world()
Definition: world.h:80
void trace(const T &t, const Ts &... ts)
Definition: trace.h:43
@ ResumableTask
-> ttg::resumable_task
@ Invalid
not a coroutine, i.e. a standard task function, -> void
@ DeviceTask
-> ttg::device::Task
#define TTG_PARSEC_FLOW_ACCESS_TMP
Definition: parsec-ext.h:8
Provides (de)serialization of C++ data that can be invoked from C via ttg_data_descriptor.
const Value & operator()(const Value &value)
Definition: ttg.h:4608
std::conditional_t< std::is_reference_v< Value >, Value, Value && > operator()(Value &&value)
Definition: ttg.h:4542
value_copy_handler & operator=(value_copy_handler &&h)
Definition: ttg.h:4526
value_copy_handler(const value_copy_handler &h)=delete
std::add_lvalue_reference_t< Value > operator()(ttg_parsec::detail::persistent_value_ref< Value > vref)
Definition: ttg.h:4583
value_copy_handler & operator=(const value_copy_handler &h)=delete
A container for types.
Definition: typelist.h:24
Computes hash values for objects of type T.
Definition: hash.h:81
task that can be resumed after some events occur
Definition: coroutine.h:53
parsec_hash_table_t task_constraint_table
Definition: ttg.h:1198
parsec_hash_table_t tasks_table
Definition: ttg.h:1197
parsec_gpu_task_t * gpu_task
Definition: task.h:14
msg_header_t tt_id
Definition: ttg.h:177
static constexpr std::size_t max_payload_size
Definition: ttg.h:178
msg_t(uint64_t tt_id, uint32_t taskpool_id, msg_header_t::fn_id_t fn_id, int32_t param_id, int sender, int num_keys=1)
Definition: ttg.h:182
unsigned char bytes[max_payload_size]
Definition: ttg.h:179
ttg_parsec_data_flags data_flags
Definition: task.h:136
parsec_hash_table_item_t tt_ht_item
Definition: task.h:94
std::array< stream_info_t, num_streams > streams
Definition: task.h:210
ttg_data_copy_t * copies[num_copies]
Definition: task.h:216
lvalue_reference_type value_ref
Definition: ttvalue.h:88
static void drop_all_ptr()
Definition: ptr.h:117
static void release_task(parsec_ttg_task_base_t *task_base)
Definition: task.h:359
static constexpr int mutable_tag
Definition: ttg_data_copy.h:61
parsec_task_t * get_next_task() const
void set_next_task(parsec_task_t *task)
enum ttg_parsec::msg_header_t::fn_id fn_id_t
uint32_t taskpool_id
Definition: ttg.h:148
std::int8_t num_iovecs
Definition: ttg.h:152
std::size_t key_offset
Definition: ttg.h:150
msg_header_t(fn_id_t fid, uint32_t tid, uint64_t oid, int32_t pid, int sender, int nk)
Definition: ttg.h:160
#define TTG_PROCESS_TT_OP_RETURN(result, id, invoke)
Definition: tt.h:181
int parsec_add_fetch_runtime_task(parsec_taskpool_t *tp, int tasks)
void parsec_taskpool_termination_detected(parsec_taskpool_t *tp)
#define TTG_PARSEC_DEFER_WRITER
Definition: ttg.h:13